ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • MySQL Binlog CDC 구현(2) - Binlog 변환
    개발 2024. 12. 26. 06:14

    지난 포스팅에 이어 Binlog 를 파싱하여 이벤트를 발행하는 과정입니다.

     

    Binlog는 크게 다음과 같은 정보를 가지고 있습니다.

    - 어떠한 EventType의 Binlog 인지

    - 몇번째 컬럼의 데이터인지를 나타내는 Bitset

    - 데이터 정보 등등

     

    컬럼명에 대한 정보가 없으니 원본 데이터로 파싱하기란 불가능합니다.

    따라서 MySQL의 Metadata를 통해 컬럼의 인덱스와 컬럼명을 알고 있어야 적절히 컬럼명과 데이터를 매핑할 수 있습니다.


    초기화

      @PostConstruct
      public void init() throws SQLException {
        try (Connection connection = jdbcTemplate.getDataSource().getConnection()) {
          ResultSet resultSet = connection.getMetaData()
              .getTables(null, null, "%", new String[]{"TABLE"});
          while (resultSet.next()) {
            String tableName = resultSet.getString("TABLE_NAME");
    
            if (!tables.contains(tableName)) {
              continue;
            }
    
            ResultSet columns = connection.getMetaData()
                .getColumns(null, null, tableName, "%");
            List<String> columnInfos = new ArrayList<>();
            while (columns.next()) {
              String columnName = columns.getString("COLUMN_NAME");
              columnInfos.add(columnName);
            }
            tableColumnCache.put(tableName, columnInfos);
          }
        }
      }

     

    • 메타데이터 조회:
      • getTables 메서드로 데이터베이스의 테이블 목록을 가져옵니다.
      • SHOW FULL TABLES FROM [database] 쿼리가 내부적으로 실행됩니다.
    • 컬럼 정보 조회:
      • 조회한 테이블 이름을 기준으로 getColumns 메서드로 컬럼 정보를 가져옵니다.
      • 해당 컬럼 정보를 캐시(tableColumnCache)에 저장합니다.
    • 필요한 테이블 필터링:
      • 특정 테이블만 처리하기 위해 조건(tables.contains)으로 필터링합니다.

    Binlog 데이터 추출

     

    Binlog 데이터에서 필요한 정보를 추출하기 위한 커스텀 클래스 RowsEventData를 사용합니다.

    import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
    import com.github.shyiko.mysql.binlog.event.Event;
    import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
    import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
    import java.io.Serializable;
    import java.util.BitSet;
    import java.util.List;
    import java.util.Map.Entry;
    import lombok.Getter;
    
    public class RowsEventData {
    
      private final Event event;
      @Getter
      private final long tableId;
      @Getter
      private final BitSet includedColumns;
      private final Object rows;
    
      public RowsEventData(Event event) {
        this.event = event;
        this.tableId = extractTableId();
        this.includedColumns = extractIncludedColumns();
        this.rows = extractRows();
      }
    
      public List<Entry<Serializable[], Serializable[]>> getUpdateRows() {
        if (event.getData() instanceof UpdateRowsEventData) {
          return (List<Entry<Serializable[], Serializable[]>>) rows;
        }
        throw new IllegalStateException("Rows are not of type UpdateRowsEventData");
      }
    
      public List<Serializable[]> getRows() {
        return (List<Serializable[]>) rows;
      }
    
      private long extractTableId() {
        if (event.getData() instanceof WriteRowsEventData data) {
          return data.getTableId();
        }
        if (event.getData() instanceof UpdateRowsEventData data) {
          return data.getTableId();
        }
        if (event.getData() instanceof DeleteRowsEventData data) {
          return data.getTableId();
        } else {
          throw new IllegalArgumentException(
              "Unsupported event type: " + event.getHeader().getEventType());
        }
      }
    
      private BitSet extractIncludedColumns() {
        if (event.getData() instanceof WriteRowsEventData data) {
          return data.getIncludedColumns();
        }
        if (event.getData() instanceof UpdateRowsEventData data) {
          return data.getIncludedColumns();
        }
        if (event.getData() instanceof DeleteRowsEventData data) {
          return data.getIncludedColumns();
        } else {
          throw new IllegalArgumentException(
              "Unsupported event type: " + event.getHeader().getEventType());
        }
      }
    
      private Object extractRows() {
        if (event.getData() instanceof WriteRowsEventData data) {
          return data.getRows();
        }
        if (event.getData() instanceof UpdateRowsEventData data) {
          return data.getRows();
        }
        if (event.getData() instanceof DeleteRowsEventData data) {
          return data.getRows();
        } else {
          throw new IllegalArgumentException(
              "Unsupported event type: " + event.getHeader().getEventType());
        }
      }
    }

     

     

    • Binlog 이벤트 데이터 처리:
      • tableId: 이벤트의 테이블 식별자.
      • includedColumns: 이벤트에 포함된 컬럼 비트셋(BitSet).
      • rows: 실제 변경된 데이터.
    • 이벤트 유형별 처리:
      • WriteRowsEventData, UpdateRowsEventData, DeleteRowsEventData 각각에 대해 데이터를 추출합니다.

    이벤트 발행

    Binlog 데이터와 메타데이터를 사용해 이벤트를 발행합니다.

     

      private void publishNotificationInsertEvent(RowsEventData data, String tableName) {
        Map<String, Object> rowList = getRowList(data, tableName);
        NotificationInsertEvent notificationInsertEvent = this.objectMapper.convertValue(rowList, NotificationInsertEvent.class);
        this.eventPublisher.publishEvent(notificationInsertEvent);
      }
    
      private Map<String, Object> getRowList(RowsEventData data, String tableName) {
        Map<String, Object> rowList = new HashMap<>();
        List<Serializable[]> rows = data.getRows();
    
        for (Serializable[] rowValues : rows) {
          rowList = mapRowData(rowValues, tableName);
        }
        return rowList;
      }
    
    
      private Map<String, Object> mapRowData(Serializable[] rowValues, String tableName) {
        Map<String, Object> row = new HashMap<>();
        for (int i = 0; i < rowValues.length; i++) {
          String columnName = this.tableMetadataCache.getColumnInfo(tableName, i);
          row.put(columnName, rowValues[i]);
        }
        return row;
      }
    • Row 데이터 매핑:
      • RowsEventData에서 추출한 데이터를 컬럼 정보와 매핑하여 Map으로 변환합니다.
    • 이벤트 객체 생성:
      • ObjectMapper를 사용해 Map을 특정 이벤트 객체(NotificationInsertEvent)로 변환합니다.
    • 이벤트 발행:
      • 변환된 이벤트를 eventPublisher로 발행합니다.
    1.  

    결과

    2024-12-26 06:12:32.613 [pool-3-thread-1] INFO  com.e205.cdc.CDCEventPublisher - publish notification event NotificationInsertEvent(id=3, memberId=1, title=Comment Notification, body=Comment Notification, type=comment)
    2024-12-26 06:12:32.622 [pool-3-thread-1] INFO  com.e205.cdc.CDCEventPublisher - publish notification event NotificationInsertEvent(id=4, memberId=1, title=Route Notification, body=Route Notification, type=route)

     

     

    정리

    1. 초기화 시점에서 테이블 및 컬럼 정보를 캐싱.
    2. Binlog 이벤트에서 데이터를 추출.
    3. 컬럼 정보를 매핑하여 이벤트 객체로 변환.
    4. 이벤트를 발행.
Designed by Tistory.