이 연재글은 Database Migration 실습의 2번째 글입니다.

DynamoDB는 어떤 규모에서도 10 millisecond 미만의 성능을 제공하는 AWS에서 제공하는 완전 관리형 Nosql 데이터베이스 입니다. 실습에서는 dynamoDB 가 제공하는 Stream에 대하여 알아보겠습니다. dynamoDB stream을 이용하면 dynamoDB에 상태 변경(Insert, Delete, Update)이 발생했을때 변경 데이터 스트림을 전달받을 수 있습니다. dynamoDB의 변경 사항에 대한 실시간 데이터 처리가 필요한 경우 stream은 유용하게 사용될 수 있습니다. 또한 AWS Lambda의 trigger를 이용하면 dynamoDB의 stream 이벤트를 쉽게 전달받을 수 있어 후속처리를 간편하게 할 수 있습니다.

Data Flow

dynamoDB의 변경 스트림을 AWSLambda에서 trigger event로 전달받아 카프카에 발행합니다. 카프카의 Topic을 구독 중인 Consumer는 스트리밍 데이터를 전달받아 이형의 데이터 플랫폼에 전달하게 됩니다.

DynamoDB Stream 활성화

dynamodb table 생성 시 stream은 disable상태입니다. 상태 변경 데이터를 전달받으려면 먼저 stream을 활성화해야 합니다. 그러나 Lambda함수의 trigger로 dynamodb를 설정하면 stream이 자동으로 enable 되므로 해당 작업은 필요 없습니다.

Manage DynamoDB steam 버튼을 클릭하면 아래와 같이 Stream 방식을 선택할 수 있습니다. 실습에서는 변경 데이터와 기존 데이터(New and Old Images)를 스트림에서 모두 전달받도록 설정합니다.

Stream 데이터 확인을 위한 간단한 Lambda 프로그램 작성

Lambda의 trigger로 dynomoDB를 설정하면 변경 스트림을 event 객체로 전달받을 수 있습니다.

module.exports.main = event => {
    if(event) {
        console.log(JSON.stringify(event));
    }
}

Lambda Trigger로 DynamoDB 선택

Lambda Designer화면을 열고 Add trigger를 선택한 다음 Select a trigger 셀렉트 박스에서 DynamoDB를 선택합니다.

Trigger configuration 화면에서 상태 변경 스트림을 전달받을 DynamoDB table을 선택한 다음 Add를 눌러 trigger를 활성화 합니다. Batch size는 한 번에 읽어 처리할 수 있는 스트림 개수입니다. 그리고 Starting position은 상태 변경 스트림을 어느 부분부터 읽을 것인가인데 Latest를 선택하여 최근 발생한 이벤트의 스트림을 전달받도록 설정합니다. additional settings를 열면 Lambda에서 스트림 처리 실패 시 SQS나 SNS로 보낼 수 있도록 설정도 가능합니다. 실습에서는 대부분 default 값을 사용하지만 Production 환경을 세팅할 시에는 메뉴얼을 참고하여 환경에 맞는 적절한 값을 적용하여 사용하면 됩니다.

trigger 추가가 완료되면 아래와 같이 Designer를 통해 연결된 정보를 확인할 수 있습니다.

DynamoDB Stream format

trigger 추가가 완료되면 Lambda함수 log를 통해 어떤 형식으로 dynamodb stream이 전달되는지 확인할 수 있습니다. 변경사항의 eventName에 따라 아래와 같이 데이터가 출력됩니다.

  • 데이터 추가시 : eventName = INSERT, 결과에 NewImage 데이터 출력
  • 데이터 수정시 : eventName = MODIFY, 결과에 NewImage , OldImage 데이터 출력
  • 데이터 삭제시 : eventName = REMOVE, 결과에 OldImage 데이터 출력

eventName = INSERT

{
  "Records": [
    {
      "eventID": "c4ca4238a0b923820dcc509a6f75849b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-2",
      "dynamodb": {
       "Keys": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          },
          "info": {
            "M": {
              "gender": {
                "S": "female"
              },
              "favorite": {
                "L": [
                  {
                    "S": "music"
                  },
                  {
                    "S": "computer"
                  }
                ]
              },
              "age": {
                "N": "40"
              }
            }
          }
        },
        "ApproximateCreationDateTime": 1428537600,
        "SequenceNumber": "4421584500000000017450439091",
        "SizeBytes": 26,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-2:123456789012:table/dynamo-test/stream/2015-06-27T00:48:05.899"
    }
  ]
}

eventName = MODIFY

{
  "Records": [
    {
      "eventID": "c81e728d9d4c2f636f067f89cc14862c",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-2",
      "dynamodb": {
       "Keys": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          },
          "info": {
            "M": {
              "gender": {
                "S": "male"
              },
              "favorite": {
                "L": [
                  {
                    "S": "music"
                  },
                  {
                    "S": "computer"
                  }
                ]
              },
              "age": {
                "N": "20"
              }
            }
          }
        },
        "OldImage": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          },
          "info": {
            "M": {
              "gender": {
                "S": "female"
              },
              "favorite": {
                "L": [
                  {
                    "S": "music"
                  },
                  {
                    "S": "computer"
                  }
                ]
              },
              "age": {
                "N": "40"
              }
            }
          }
        },
        "ApproximateCreationDateTime": 1428537600,
        "SequenceNumber": "4421584500000000017450439092",
        "SizeBytes": 59,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-2:123456789012:table/dynamo-test/stream/2015-06-27T00:48:05.899"
    }
  ]
}

eventName = REMOVE

{
  "Records": [
    {
      "eventID": "c44c2fd0d761602fdc53fbcbfd3f8f7a",
      "eventName": "REMOVE",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-2",
      "dynamodb": {
        "ApproximateCreationDateTime": 1606204953,
        "Keys": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          }
        },
        "OldImage": {
          "name": {
            "S": "leelala"
          },
          "id": {
            "N": "2"
          },
          "info": {
            "M": {
              "gender": {
                "S": "female"
              },
              "favorite": {
                "L": [
                  {
                    "S": "music"
                  },
                  {
                    "S": "computer"
                  }
                ]
              },
              "age": {
                "N": "40"
              }
            }
          }
        },
        "SequenceNumber": "3175000000000003543299768",
        "SizeBytes": 83,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-2:593352196761:table/dynamo-test/stream/2020-11-23T14:13:21.748"
    }
  ]
}

CheckPoint

  • DynamoDB 스트림은 최대 24시간만 저장됩니다.
  • Lambda 함수 트리거로 dynamodb를 설정하면 dynamodb의 stream은 자동으로 활성화 됩니다.
  • 다른 vpc내에 있는 서비스가 dynamoDB에 접속하기 위해서는 vpc endpoint설정이 필요합니다. (https://aws.amazon.com/ko/blogs/korea/new-vpc-endpoints-for-dynamodb/)

DynamoDB Stream Kafka에 발행

Lambda trigger로 전달된 stream 데이터를 kafka의 topic으로 발행(publish)하는 node.js 코드를 작성합니다. 테스트로 작성한 코드이므로 작동하지 않을 수 있습니다.

const aws = require("aws-sdk");
aws.config.update({
    region: 'ap-northeast-2'
})
const kafka = require("kafka-node");
module.exports.main = event => {
    if (event) {
        console.log(JSON.stringify(event));
        // publish stream to kafka cluster
        const Producer = kafka.Producer,
            KeyedMessage = kafka.KeyedMessage,
            client = new kafka.KafkaClient({kafkaHost: 'broker-host:broker-port'}),
            producer = new Producer(client),
            km = new KeyedMessage('key', 'message'),
            payloads = [
                {topic: 'dynamoStream', messages: JSON.stringify(event.Records)}
            ];
        producer.on('ready', function () {
            producer.send(payloads, function (err, data) {
                console.log(data);
            });
        });
        producer.on('error', function (err) {
        })
    }
}

[참고]

https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kafka.html

https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html

https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/GettingStarted.NodeJs.03.html

https://amazonmsk-labs.workshop.aws/en/overview/overview.html

https://medium.com/signal9/node-js-에서-dynamodb-streams-다루기-63bf5c4609a0

연재글 이동[이전글] AuroraDB Migration – Using Kafka as a target for Database Migration Service
[다음글] Amazon DocumentDB(MongoDB) Stream