ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • MySQL Binlog CDC 구현(1) - Binlog 읽기
    개발 2024. 12. 5. 04:28

    Binlog란?

    Binlog는 Binary Log라고도 불리며, 데이터베이스에서 발생하는 변경 사항(예: 테이블 생성, 데이터 변경 등)을 기록한 이벤트(Event)를 포함하는 로그 파일입니다.

    아래는 MySQL 공식 문서에서 발췌한 설명입니다:

    The binary log contains “events” that describe database changes such as table creation operations or changes to table data.
    (출처: MySQL 8.4 Reference Manual)

    이처럼, Binlog는 데이터베이스의 변경 작업을 이벤트로 기록하며, 데이터 복구와 복제(Replication)와 같은 중요한 작업에서 핵심적인 역할을 합니다.


    Binlog를 활용한 CDC 구현

    저는 이 Binlog를 사용하여 데이터베이스 변경 사항을 기반으로 비즈니스 로직 처리캐시 데이터 동기화를 수행하는 CDC(Change Data Capture) 기능을 구현했습니다.

     

    구현한 내용을 설명하기에 앞서 주요 클래스들을 설명하겠습니다.

    • BinlogReader : Binlog Client가 있는 클래스로 Binlog 를 읽고 로직 처리를 위임하는 클래스입니다.
    • CDCEventPublisher : BinlogReader 가 받아온 Event를 처리하는 클래스입니다.
    • BinlogPositionTracker : 서버가 예기치 않게 종료될 경우 마지막 읽은 Binlog 부터 처리하기 위해 Binlog 위치를 추적하는 클래스입니다.
    • TableMetadataCache : Binlog 는 변경이 일어난 데이터에 대해서 컬럼명 대신 컬럼의 Bitset을 포함하고 있습니다. 이를 기반으로 데이터를 변환하기 위해 테이블 정보를 저장하고 있는 클래스입니다. 서버 시작시 초기화가 이루어집니다.

    1. 의존성 추가 및 MySQL 설정

    Binlog를 활용하기 위해 다음과 같은 오픈소스 프로젝트를 확인할 수 있었습니다:

    1. Shyikomysql-binlog-connector-java:
      이 프로젝트는 MySQL Binlog 기반 프로젝트의 부모 프로젝트라 할 수 있습니다. 현재는 유지 관리가 중단되었지만, 많은 Binlog 관련 프로젝트가 이 프로젝트를 기반으로 개발되었습니다.
    2. Osheroffmysql-binlog-connector-java:
      Shyiko의 프로젝트를 포크하여 지속적으로 개발되고 있는 프로젝트입니다. 비교적 최신 MySQL 버전과의 호환성을 제공하며, 활성적으로 유지 관리되고 있어 이 프로젝트를 선택하기로 결정했습니다.
    implementation 'com.zendesk:mysql-binlog-connector-java:0.30.1'

     

    Binlog 에 대한 설정 파일

    [mysqld]
    # Binlog 파일이 저장될 경로를 설정합니다.
    # /var/lib/mysql/binlog 디렉토리에 Binlog 파일이 생성됩니다.
    log_bin=/var/lib/mysql/binlog
    
    # 서버 ID를 설정합니다.
    # MySQL 복제 환경에서 각 서버를 고유하게 식별하기 위해 사용됩니다.
    # 1 이상의 고유한 값을 설정해야 합니다.
    server_id=1
    
    # Binlog에 기록할 데이터베이스를 지정합니다.
    # 여기서는 "dandi"라는 데이터베이스의 변경 사항만 Binlog에 기록됩니다.
    # 여러 데이터베이스를 지정하려면 binlog-do-db 옵션을 반복적으로 추가하면 됩니다.
    binlog-do-db=dandi
    
    # Binlog에 기록되는 행(row) 데이터의 이미지 형태를 설정합니다.
    # FULL: 행의 모든 열 데이터를 기록합니다 (기본값).
    # MINIMAL: 필요한 최소 데이터만 기록.
    # NOBLOB: BLOB 및 TEXT 필드 제외.
    binlog_row_image=FULL

     


    2.  Binlog Event 구조 파악

     

    Binlog connector 는 Binlog 를 하나의 Event 로 처리하고 있습니다.

    public class Event implements Serializable {
    
        private EventHeader header;
        private EventData data;
    
        // 생략
    }

     

    • EventHeader : Binlog 가 어떠한 이벤트를 나타내는 지에 대한 헤더 정보를 담는 인터페이스 입니다.
    • EventData : 이벤트에 대한 데이터의 인터페이스 입니다.

    그리고 소스코드를 통해 위 두 인터페이스를 구현하고 있는 클래스들은 어떤 것이 있는 지 파악했습니다.

     

    EventHeader

    EventHeader 인터페이스의 구현체를 확인해본 결과 EventHeaderV4 밖에 없었습니다.

    EventHeaderV4는 필드로 시간 정보, 이벤트 타입, 다음 Binlog 포지션에 대한 정보 등을 가지고 있습니다.

    package com.github.shyiko.mysql.binlog.event;
    
    public class EventHeaderV4 implements EventHeader {
    
        // v1 (MySQL 3.23)
        private long timestamp; // 시간 정보
        private EventType eventType; // 이벤트 타입
        private long serverId;
        private long eventLength;
        // v3 (MySQL 4.0.2-4.1)
        private long nextPosition; // 다음 로그 파일 포지션
        private int flags;
    	
        // 생략
    }

    이벤트 타입은 Enum 클래스로 이벤트 타입에 대한 정보들이 선언되어 있었습니다. 이를 토대로 INSERT, UPDATE, DELETE 등 변경 감지가 필요한 이벤트 타입을 적절히 선택하여 코드를 작성하였습니다.

    package com.github.shyiko.mysql.binlog.event;
    
    import java.util.stream.Stream;
    
    public enum EventType {
    
    	// 생략
    	
        /**
         * Describes inserted rows (within a single table).
         * Used in case of RBR (5.1.18+).
         */
        EXT_WRITE_ROWS(30),
        /**
         * Describes updated rows (within a single table).
         * Used in case of RBR (5.1.18+).
         */
        EXT_UPDATE_ROWS(31),
        /**
         * Describes deleted rows (within a single table).
         * Used in case of RBR (5.1.18+).
         */
        EXT_DELETE_ROWS(32),
    
    	// 생략
    
    }

     


    3.  BinlogReader 클래스

    public void start() {
      this.client.registerEventListener(this::processEvent);
      try {
        this.client.connect();
      } catch (IOException e) {
        log.error("Failed to connect to Binlog server", e);
      }
    }
    
    private void processEvent(Event event) {
      EventHeaderV4 header = event.getHeader();
      String currentBinlog = this.client.getBinlogFilename();
      long currentPosition = header.getPosition(); // 다음 Binlog 위치
      this.tracker.updatePosition(currentBinlog, currentPosition); // Binlog 중복 처리 방지
      EventType eventType = header.getEventType();
    
      // 헤더의 이벤트 타입으로 알맞게 처리
      switch (eventType) {
        case TABLE_MAP -> this.eventPublisher.saveTableInfo(event); // 테이블 정보 저장
        case WRITE_ROWS, EXT_WRITE_ROWS,
             UPDATE_ROWS, EXT_UPDATE_ROWS,
             DELETE_ROWS, EXT_DELETE_ROWS ->
            this.eventPublisher.processRowEvent(event, eventType);
      }
    }

     

    4. 실행

    Binlog를 활용하여 회원 데이터를 저장하고 이벤트를 읽어오는 기능을 테스트하기 위해, 간단한 Runner 클래스를 작성했습니다. 디버그 모드로 실행하여 Binlog를 정상적으로 읽어오는지 확인했습니다.

    @RequiredArgsConstructor
    @Component
    public class AppRunner implements ApplicationRunner {
    
      private final MemberRepository memberRepository;
    
      @Transactional
      @Override
      public void run(ApplicationArguments args) {
        this.memberRepository.save(
            Member.builder()
                .nickname("test")
                .status(EmailStatus.VERIFIED)
                .memberStatus(MemberStatus.ACTIVE)
                .createdAt(LocalDateTime.now())
                .foundItemAlarm(true)
                .lostItemAlarm(true)
                .commentAlarm(true)
                .build()
        );
      }
    }

     

    정상적으로 Binlog를 읽어오는 것을 확인할 수 있습니다.

     

    그러나 아래의 결과에서 문제점이 발견되었습니다:

    1. Binlog에는 몇 번째 컬럼에 어떤 데이터가 들어갔는지에 대한 정보만 기록되어 있습니다.
    2. 컬럼명은 포함되어 있지 않아 데이터를 해석하기 어렵습니다.

     

    몇번째 컬럼에 어떤 데이터가 들어갔는지에 대한 정보만 있을 뿐, 컬럼명은 포함되어 있지 않습니다.

    다음 포스팅에서는 위 데이터를 컬럼명을 포함한 원본 데이터로 변환하는 방법과 구현 과정을 다룰 예정입니다.

Designed by Tistory.