-
MySQL Binlog CDC 구현(2) - Binlog 변환개발 2024. 12. 26. 06:14
지난 포스팅에 이어 Binlog 를 파싱하여 이벤트를 발행하는 과정입니다.
Binlog는 크게 다음과 같은 정보를 가지고 있습니다.
- 어떠한 EventType의 Binlog 인지
- 몇번째 컬럼의 데이터인지를 나타내는 Bitset
- 데이터 정보 등등
컬럼명에 대한 정보가 없으니 원본 데이터로 파싱하기란 불가능합니다.
따라서 MySQL의 Metadata를 통해 컬럼의 인덱스와 컬럼명을 알고 있어야 적절히 컬럼명과 데이터를 매핑할 수 있습니다.
초기화
@PostConstruct public void init() throws SQLException { try (Connection connection = jdbcTemplate.getDataSource().getConnection()) { ResultSet resultSet = connection.getMetaData() .getTables(null, null, "%", new String[]{"TABLE"}); while (resultSet.next()) { String tableName = resultSet.getString("TABLE_NAME"); if (!tables.contains(tableName)) { continue; } ResultSet columns = connection.getMetaData() .getColumns(null, null, tableName, "%"); List<String> columnInfos = new ArrayList<>(); while (columns.next()) { String columnName = columns.getString("COLUMN_NAME"); columnInfos.add(columnName); } tableColumnCache.put(tableName, columnInfos); } } }
- 메타데이터 조회:
- getTables 메서드로 데이터베이스의 테이블 목록을 가져옵니다.
- SHOW FULL TABLES FROM [database] 쿼리가 내부적으로 실행됩니다.
- 컬럼 정보 조회:
- 조회한 테이블 이름을 기준으로 getColumns 메서드로 컬럼 정보를 가져옵니다.
- 해당 컬럼 정보를 캐시(tableColumnCache)에 저장합니다.
- 필요한 테이블 필터링:
- 특정 테이블만 처리하기 위해 조건(tables.contains)으로 필터링합니다.
Binlog 데이터 추출
Binlog 데이터에서 필요한 정보를 추출하기 위한 커스텀 클래스 RowsEventData를 사용합니다.
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; import java.io.Serializable; import java.util.BitSet; import java.util.List; import java.util.Map.Entry; import lombok.Getter; public class RowsEventData { private final Event event; @Getter private final long tableId; @Getter private final BitSet includedColumns; private final Object rows; public RowsEventData(Event event) { this.event = event; this.tableId = extractTableId(); this.includedColumns = extractIncludedColumns(); this.rows = extractRows(); } public List<Entry<Serializable[], Serializable[]>> getUpdateRows() { if (event.getData() instanceof UpdateRowsEventData) { return (List<Entry<Serializable[], Serializable[]>>) rows; } throw new IllegalStateException("Rows are not of type UpdateRowsEventData"); } public List<Serializable[]> getRows() { return (List<Serializable[]>) rows; } private long extractTableId() { if (event.getData() instanceof WriteRowsEventData data) { return data.getTableId(); } if (event.getData() instanceof UpdateRowsEventData data) { return data.getTableId(); } if (event.getData() instanceof DeleteRowsEventData data) { return data.getTableId(); } else { throw new IllegalArgumentException( "Unsupported event type: " + event.getHeader().getEventType()); } } private BitSet extractIncludedColumns() { if (event.getData() instanceof WriteRowsEventData data) { return data.getIncludedColumns(); } if (event.getData() instanceof UpdateRowsEventData data) { return data.getIncludedColumns(); } if (event.getData() instanceof DeleteRowsEventData data) { return data.getIncludedColumns(); } else { throw new IllegalArgumentException( "Unsupported event type: " + event.getHeader().getEventType()); } } private Object extractRows() { if (event.getData() instanceof WriteRowsEventData data) { return data.getRows(); } if (event.getData() instanceof UpdateRowsEventData data) { return data.getRows(); } if (event.getData() instanceof DeleteRowsEventData data) { return data.getRows(); } else { throw new IllegalArgumentException( "Unsupported event type: " + event.getHeader().getEventType()); } } }
- Binlog 이벤트 데이터 처리:
- tableId: 이벤트의 테이블 식별자.
- includedColumns: 이벤트에 포함된 컬럼 비트셋(BitSet).
- rows: 실제 변경된 데이터.
- 이벤트 유형별 처리:
- WriteRowsEventData, UpdateRowsEventData, DeleteRowsEventData 각각에 대해 데이터를 추출합니다.
이벤트 발행
Binlog 데이터와 메타데이터를 사용해 이벤트를 발행합니다.
private void publishNotificationInsertEvent(RowsEventData data, String tableName) { Map<String, Object> rowList = getRowList(data, tableName); NotificationInsertEvent notificationInsertEvent = this.objectMapper.convertValue(rowList, NotificationInsertEvent.class); this.eventPublisher.publishEvent(notificationInsertEvent); } private Map<String, Object> getRowList(RowsEventData data, String tableName) { Map<String, Object> rowList = new HashMap<>(); List<Serializable[]> rows = data.getRows(); for (Serializable[] rowValues : rows) { rowList = mapRowData(rowValues, tableName); } return rowList; } private Map<String, Object> mapRowData(Serializable[] rowValues, String tableName) { Map<String, Object> row = new HashMap<>(); for (int i = 0; i < rowValues.length; i++) { String columnName = this.tableMetadataCache.getColumnInfo(tableName, i); row.put(columnName, rowValues[i]); } return row; }
- Row 데이터 매핑:
- RowsEventData에서 추출한 데이터를 컬럼 정보와 매핑하여 Map으로 변환합니다.
- 이벤트 객체 생성:
- ObjectMapper를 사용해 Map을 특정 이벤트 객체(NotificationInsertEvent)로 변환합니다.
- 이벤트 발행:
- 변환된 이벤트를 eventPublisher로 발행합니다.
결과
2024-12-26 06:12:32.613 [pool-3-thread-1] INFO com.e205.cdc.CDCEventPublisher - publish notification event NotificationInsertEvent(id=3, memberId=1, title=Comment Notification, body=Comment Notification, type=comment) 2024-12-26 06:12:32.622 [pool-3-thread-1] INFO com.e205.cdc.CDCEventPublisher - publish notification event NotificationInsertEvent(id=4, memberId=1, title=Route Notification, body=Route Notification, type=route)
정리
- 초기화 시점에서 테이블 및 컬럼 정보를 캐싱.
- Binlog 이벤트에서 데이터를 추출.
- 컬럼 정보를 매핑하여 이벤트 객체로 변환.
- 이벤트를 발행.
'개발' 카테고리의 다른 글
MySQL Binlog CDC 구현(1) - Binlog 읽기 (0) 2024.12.05 AOP를 활용하여 로깅 기능 개발 (1) 2024.11.13 findAll vs Stream vs Batch처리 비교 (0) 2024.11.07 JPA N+1 감지 기능 구현기 (0) 2024.10.30 JOOQ 테스트 환경 분리하기 (1) 2024.09.25 - 메타데이터 조회: