이 연재글은 Database Migration 실습의 4번째 글입니다.

이전 실습까지는 데이터 마이그레이션을 위해 Database에서 제공하는 binlog나 DynamoDB/MongoDB에서 제공하는 Change Stream을 통해 변경 데이터를 처리할 수 있었습니다. 하지만 이렇게 시스템적으로 지원을 받지 못하는 경우는 어떻게 해야할까요. 여러가지 방법이 있겟지만 이번 장에서는 Transactional Outbox Pattern을 사용하는 방법에 대해 살펴보겠습니다.

AWS의 Serverless AuroraDB는 인스턴스 및 용량 관리의 복잡성을 크게 줄여주는 장점이 있어 최근 많이 사용되는 추세입니다. 그런데 Serverless AuroraDB의 경우 Classic AuroraDB에서 제공하는 binlog를 사용할 수가 없습니다. 즉 현재 시점에서는 AWS DMS(Database Migration Service)를 사용할 수 없어 이형의 플랫폼으로 Database를 마이그레이션 해야할 경우 어려움에 봉착하게 됩니다. 결국 시스템 측면에서 도움을 받을 수 없으면 소프트웨어 측면에서 문제를 해결할 수 밖에 없습니다.

Transactional Outbox Pattern

Transactional Outbox는 특정 도메인 객체(테이블)의 변경사항이 발생할때 단일 트랜잭션내에서 동일 데이터베이스 내의 Outbox 테이블에 관련 정보를 저장하여 후속 이벤트를 처리하는 방법입니다. 해당 시나리오는 다음과 같습니다.

  • 동일 Database내에 변경사항을 기록하는 Outbox Table을 생성
  • 단일 트랜잭션 내에서 데이터 변경 + OutBox로 변경사항을 기록하는 액션을 수행. 동일 Database내에서 단일 트랙잭션으로 데이터를 처리하므로 변경 데이터를 일관성있게 처리 할 수 있음
  • Outbox 테이블에 기록된 변경 사항은 Message Relay를 통해 Message Broker로 발행
  • Message Broker에 발행된 내역은 Outbox 테이블에서 삭제
  • 타 플랫폼에서는 Message Broker를 구독하여 변경 데이터를 전달받아 마이그레이션을 진행

Event Flow

Data Flow

Create Outbox Table

마이그레이션 대상의 변경 이력을 저장하기 위한 Outbox 테이블을 생성합니다.

FieldTypeCommentSample
idbigintPrimaryKey /AutoIncrement1000
aggregate_idvarchar순서 처리가 필요한 데이터를 Grouping하는 id( ex. user_id)happydaddy
aggregate_typevarchar변경이 발생한 도메인(테이블) 이름LICENSE
payloadtext도메인 entity 변경사항(JSON){“id”:42,”user_id”:”happydaddy”,”episode_id”:14562,”expire_at”:”2021-01-01T15:23:38Z”}
event_typevarchar발생한 이벤트 (INSERT, UPDATE, DELETE …)INSERT
created_at_datetimedatetime이벤트 발생시간2020-12-01 23:38:39.616869

Spring ApplicationEvent를 통한 Outbox 이벤트 처리

Java Spring 환경으로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.

https://github.com/codej99/TransactionalOutbox.git

Application Event를 발행하는 event publisher 생성

@Component
public class EventPublisher implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void publish(OutBoxEvent outboxEvent) {
        this.publisher.publishEvent(outboxEvent);
    }
}

비즈니스 로직 처리시 테이블 변경사항이 발생하면 단일 트랜잭션 내에서 Outbox Event를 발행

@RequiredArgsConstructor
@Service
public class LicenseService implements RegisterLicenseUseCase {

    private final RegisterLicensePort registerLicensePort;
    private final EventPublisher eventPublisher;

    @Override
    @Transactional
    public void registerLicense(License license) {
        LicenseEntity licenseEntity = LicenseEntity.builder().userid(license.getUserid()).episodeId(license.getEpisodeId()).expireAtDatetime(license.getExpireAtDatetime()).build();
        registerLicensePort.register(licenseEntity);
        eventPublisher.publish(EventUtils.createLicenseEvent(licenseEntity, EventUtils.EventType.INSERT));
    }
}

이벤트 리스너가 데이터베이스 변경 이벤트를 Outbox 테이블에 기록

@RequiredArgsConstructor
@Service
public class EventService {

    private final SaveOutBoxPort saveOutBoxPort;

    @EventListener
    public void handleOutBoxEvent(OutBoxEvent event) {
        OutBoxEntity outBox = OutBoxEntity.builder()
                .aggregateId(event.getAggregateId())
                .aggregateType(event.getAggregateType().name())
                .eventType(event.getEventType().name())
                .payload(event.getPayload().toString())
                .createdAtDatetime(LocalDateTime.now())
                .build();
        saveOutBoxPort.save(outBox);
    }
}

Message Broker로 변경사항(Outbox)을 발행하는 Message Relay 작성

node로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.

https://github.com/codej99/TransactionalOutboxMessageRelay.git

Outbox 테이블에 기록된 DB 변경 이벤트 조회

const publishOutbox = () => {
    const connection = createConnection();
    const sql = `SELECT
                    id,
                    aggregate_id,
                    aggregate_type,
                    event_type,
                    payload,
                    created_at_datetime
                FROM outbox
                ORDER BY id asc`;
    connection.query(sql, function (err, result) {
        if (result) {
            pushToKafka(result);
        } else {
            console.error(`publishOutbox failed - ${err}`);
        }
    });
    connection.end(function (err) {
    });
};

kafka topic에 변경사항 발행

const pushToKafka = (rows) => {
    const Producer = kafka.Producer,
        client = new kafka.KafkaClient({kafkaHost: process.env.KAFKA_BROKER}),
        producer = new Producer(client, {partitionerType: 3});

    let payloads = [];
    for (let i = 0; i < rows.length; i++) {
        rows[i].payload = JSON.parse(rows[i].payload);
        console.log(rows[i].id + " : " + JSON.stringify(rows[i]));
        payloads.push({
            topic: process.env.KAFKA_TOPIC,
            messages: JSON.stringify(rows[i]),
            key: rows[i].aggregate_id
        })
    }
    producer.on("ready", function () {
        producer.send(payloads, function (err, result) {
            if (result) {
                console.info(`[SUCCESS] publishKafka - ${JSON.stringify(result)}`);
                deleteOutbox(rows);
            } else {
                console.error(`publishKafka failed - ${err}`);
            }
            producer.close();
        })
    });
    producer.on("error", function (err) {
        console.error(`Unknown Error Occured : ${err}`);
    });
};

발행한 이벤트를 Outbox에서 삭제

const deleteOutbox = (rows) => {
    const connection = createConnection();
    for (let i = 0; i < rows.length; i++) {
        const sql = `DELETE FROM outbox WHERE id=${rows[i].id}`;
        connection.query(sql, function (err, result) {
            if (result)
                console.info(`[SUCCESS] deleteOutbox - ${JSON.stringify(result)}`);
            else
                console.error(`deleteOutbox failed - ${err}`);
        });
    }
    connection.end(function (err) {
    });
};

참고

https://dzone.com/articles/implementing-the-outbox-pattern

https://www.popit.kr/msa에서-메시징-트랜잭션-처리하기

https://medium.com/contino-engineering/publishing-events-to-kafka-using-a-outbox-pattern-867a48e29d35

연재글 이동[이전글] Amazon DocumentDB(MongoDB) Stream
[다음글] AWS Database Migration Service – Migration auroradb to kafka