이 연재글은 AWS 람다(lambda) 실습의 8번째 글입니다.

이번 시간에는 AWS SQS와 Lambda 함수를 이용하여 작업을 비동기로 처리하는 방법에 대해 실습해 보겠습니다. SQS란 AWS에서 제공하는 Simple Queue Service의 약자입니다. Queue(메시지 대기열)는 아래 그림과 같이 한쪽에선 데이터가 들어가고 한쪽에선 데이터가 나가는 파이프 형태의 구조체입니다.

일반적인 애플리케이션으로 치자면 RabbitMQ나 Kafka 같은 메시징 시스템에서 제공하는 Queue와 비슷합니다. AWS는 이러한 Queue의 기능을 쉽게 사용할 수 있도록 SQS라는 서비스로 제공하고 있습니다. 또한 kafka와 같은 고성능 메시징 시스템은 AWS에서 따로 서비스를 제공하고 있습니다.
– Amazon Managed Streaming for Apache Kafka(Amazon MSK)

비동기로 작업을 처리하는 목적

비동기 작업이라는 것은 어떤 목적을 달성하기 위한 여러 가지 연관된 작업이 있을때 그 중에 순차적으로 처리될 필요가 없는 작업을 뜻합니다. 실습에서 비동기 대상이 되는 작업은 다음과 같이 정의할 수 있습니다.

주(Main) 작업과 관련은 있지만 별도로 처리되어도 되며 ,
주 작업처럼 실시간으로 처리되어 결과를 즉시 반환하지 않아도 되는 작업

빠른 응답속도

웹서비스는 최대한 사용자에게 긍정적인 사용자 경험을 제공하기 위하여 빠른 응답속도가 필수적입니다. 사용자의 요청을 빠르게 처리하기 위해서는 즉시 처리 후 응답해야 하는 메인 작업 외에는 비동기로 처리할 수 있도록 변경하여 응답속도를 높여야 합니다.

웹서비스에서 발생할 수 있는 비동기 작업의 몇가지 유형

예시1) 특정 주제에 대한 글 작성 -> 부가 작업 : 나를 구독한 사용자들에게 알림 푸시 발송

예시2) 특정 컨텐츠에 대한 좋아요, 댓글 작성 -> 부가 작업 : 발생한 사용자 액션에 대한 로그 기록

예시3) 사용자 프로필 이미지 변경 -> 부가 작업 : 원본 이미지를 바탕으로 다양한 사이즈의 썸네일 생성

위의 예시 중 부가 작업은 비동기 작업 대상입니다. 주 작업과 부가 작업을 한꺼번에 처리하고 빠르게 응답하는 것이 베스트지만, 그렇게 하기 힘든 경우가 위의 예시처럼 꽤 많이 존재합니다. 예제와 같은 경우 부가 작업 때문에 주 작업의 처리 속도가 영향을 받거나, 주 작업은 이미 처리되었는데 응답을 주지 못하는 상황이 발생할 수 있습니다 . 따라서 가능하면 주 작업을 빠르게 완료시키고 부가 작업은 백그라운드 Job에게 맡긴 다음 빠르게 응답결과를 주도록 개발하는 경우가 많습니다. 부가 작업이 heavy 하고 오래 걸리는 작업이라면 더욱 이런 방식의 처리가 필요합니다.

비동기 처리의 목적은 클라이언트에게 빠른 응답을 주기 위해서 주 작업 처리에 영향을 미치지 않게 부가 작업을 별도의 작업하에 안전하게 수행하는 것입니다.

비동기처리에 Queue를 사용하는 이유?

부하 분산

만약 웹서비스에서 비동기로 처리해야 할 작업이 실시간으로 처리된다면, 동일한 요청이 100개가 들어오면 부가 작업도 100개를 실시간으로 처리해야 합니다. 서비스가 잘되어 해당 요청이 1,000건 10,000건 지속적으로 늘어난다면 리소스를 많이 사용하는 부가 작업일 경우 시스템 확장을 위해 많은 비용을 투자해야 합니다. 이러한 작업을 Queue를 이용하여 비동기 작업으로 대체하면 한 번에 처리할 수 있는 작업량을 서버 측에서 컨트롤 할 수 있게 되므로, 요청이 급속하게 늘어나더라도 응답 지연을 최대한 늦출수 있게 됩니다.

유연한 실패건 재시도

HTTP 통신 중 발생하는 작업은 불의의 사고로 네트워크 통신이 중단되거나 서버 내부의 처리가 어떠한 이유로 실패하게 될 경우엔 작업을 다시 처리하기가 힘들고 복잡합니다. (처리한다 해도 부가적인 시간이 필요하므로 응답속도에 영향을 주게 됩니다.) 하지만 Queue를 사용하면 작업에 실패했을 때 시스템이 정상화된 후나 시간적인 여유가 있을 때 다시 처리하는 방식을 쓸 수 있으므로 좀 더 유연하게 실패 상황을 컨트롤 할 수 있습니다.

또한 SQS를 사용하면 정해진 횟수 이상 처리가 실패했을 때 해당 작업들을 따로 모아서 처리할 수 있는 방법(DeadLetter)도 제공합니다.

AWS SQS는 이러한 상황을 처리하는데 적합한 solution입니다. 또한 SQS를 Lambda 함수의 트리거로 설정하면 메시지를 정확하고 빠르게 lambda 함수로 전달할 수 있어 매우 유연하게 작업을 처리할 수 있습니다.

SQS는 두가지 모드의 Queue를 제공하는데 표준과 FIFO Queue입니다. 차이점은 Queue가 처리될때 순차적으로 처리되냐 아니냐의 차이입니다. Queue에 저장된 작업이 순서와 상관없이 처리되도 상관없으면 표준 Queue를 이용하면 되고 처리 순서가 중요하다면 FIFO Queue를 이용하면 됩니다. 작업의 성향에 따라 적절한 방식의 Queue를 생성해서 사용하면 됩니다. 참고로 처리 속도로만 따지면 제약이 없는 일반 Queue의 성능이 좀 더 좋습니다.

역할 생성

lambda함수에서 SQS 리소스를 사용할 수 있도록 역할을 하나 생성합니다.

IAM 접속

https://console.aws.amazon.com/iam/home#/roles

역할 – 역할 만들기 클릭

사용 사례 – Lambda 선택후 다음:권한 클릭

권한 정책 연결 – SQS검색 – AmazoneSQSFullAccess / AWSLambdaBasicExecutionRole 선택 – 다음:태그 버튼 클릭

태그 추가(선택사항) 변경 없음 – 다음:검토 클릭
검토 – 역할 이름 작성 – lambda-sqs – 역할 만들기 클릭

lambda-sqs 역할 검색 – 상세 화면에서 역할 ARN 복사
arn:aws:iam::478069740483:role/lambda-sqs

SQS 생성

실습에서는 표준 Queue를 생성해서 사용합니다. AWS console의 SQS 화면에서 아래와 같이 대기열을 하나 생성합니다. 구성은 변경 없이 기본 값을 사용하겠습니다.

SQS에 접근할 수 있는 계정 및 역할을 설정합니다. 실습에서는 root와 위에서 생성한 role 추가하였습니다.

SQS 메시지 처리 Lambda 생성

serverless 명령어로 lambda함수를 생성합니다. 그리고 npm으로 node 개발을 위한 초기 환경을 세팅합니다. serverless-prune-plugin은 배포할 때 과거 버전의 lambda 함수를 최신 n개만 남기고 삭제해 주는 plugin입니다.

$ sls create --template aws-nodejs --path lambda-sqs --name lambda-sqs
$ cd lambda-sqs
$ npm init
$ npm i aws-sdk
$ sls plugin install -n serverless-prune-plugin

SQS Event 데이터 생성

실습에서 lambda 함수는 SQS에 인입된 메시지를 받아 처리를 해야 하는데 로컬에서 개발 중인 lambda 함수는 SQS trigger를 설정할 방법이 없습니다. 그래서 lambda 함수는 SQS에서 메시지가 전달되었다고 가정하고 테스트를 진행해야 합니다. lambda 함수로 전달되는 SQS의 메시지 형식은 정해져 있고 serverless에서는 메시지 파일을 lambda에 주입하는 명령어 옵션을 제공하므로 이 방법을 이용하여 로컬에서도 손쉽게 개발 및 테스트를 할 수 있습니다.

SQS에서 lambda로 전달되는 이벤트 메시지 format은 AWS Console의 lambda 서비스에서 알 수 있습니다. 이미 작성되어있는 lambda 함수 아무거나 상세 정보로 들어가면 위쪽 상단의 테스트 이벤트 선택 박스가 있는데 클릭하고 테스트 이벤트 구성을 선택하면 다음과 같은 화면을 볼 수 있습니다.

이 상태에서 hello-world가 선택되어 있는 이벤트 템플릿 박스를 클릭하면 lambda함수에 트리거로 연결할 수 있는 서비스들이 주르륵 나오는데, 여기서 Amazon SQS를 선택합니다.

선택하면 아래와 같이 텍스트 박스에 JSON 데이터가 출력됩니다. 이 JSON이 SQS 이벤트가 발생했을때 lambda 함수로 전달되는 event 객체입니다. 출력된 JSON을 복사하여 개발 프로젝트에 신규 파일을 생성합니다.

{
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": "Hello from SQS!",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1523232000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1523232000001"
      },
      "messageAttributes": {},
      "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-southeast-1:123456789012:MyQueue",
      "awsRegion": "ap-southeast-1"
    }
  ]
}

프로젝트 root에 test 디렉터리를 만들고 sample.json을 생성한다음 위 내용을 붙여넣기 합니다. JSON 파일 내용중 SQS 메시지는 body에 담겨서 전달되므로 해당 메시지를 적당히 수정합니다.

handler.js는 다음과 같이 수정합니다.

module.exports.hello = async event => {
  console.log(event.Records[0].body);
  return {
    statusCode: 200,
    body: JSON.stringify(
      {
        message: 'Go Serverless v1.0! Your function executed successfully!',
        input: event,
      },
      null,
      2
    ),
  };
};

위에서 작성한 JSON 파일을 lambda에 주입하여 테스트 하려면 다음과 같이 명령어를 실행하면 됩니다.

$ sls invoke local -f [펑션명] -p [테스트 json 파일]

console 출력 내용을 보면 SQS sample.json의 body 내용이 출력되고 event 객체에 테스트 json의 내용이 주입되어 결과로 출력되는 것을 확인할 수 있습니다.

$ sls invoke local -f hello -p test/sample.json
{"Hello from SQS!":"bbb"}
{
    "statusCode": 200,
    "body": "{\n  \"message\": \"Go Serverless v1.0! Your function executed successfully!\",\n  \"input\": {\n    \"Records\": [\n      {\n        \"messageId\": \"19dd0b57-b21e-4ac1-bd88-01bbb068cb78\",\n        \"receiptHandle\": \"MessageReceiptHandle\",\n        \"body\": \"{\\\"Hello from SQS!\\\":\\\"bbb\\\"}\",\n        \"attributes\": {\n          \"ApproximateReceiveCount\": \"1\",\n          \"SentTimestamp\": \"1523232000000\",\n          \"SenderId\": \"123456789012\",\n          \"ApproximateFirstReceiveTimestamp\": \"1523232000001\"\n        },\n        \"messageAttributes\": {},\n        \"md5OfBody\": \"7b270e59b47ff90a553787216d55d91d\",\n        \"eventSource\": \"aws:sqs\",\n        \"eventSourceARN\": \"arn:aws:sqs:ap-southeast-1:123456789012:MyQueue\",\n        \"awsRegion\": \"ap-southeast-1\"\n      }\n    ]\n  }\n}"
}

이번에는 lambda 함수를 aws에 배포하고 SQS 트리거를 실제로 설정한 다음 메시지를 발행해보겠습니다.
원격에 배포하려면 먼저 개발 pc에 aws credential을 설정 해야 합니다. 그리고 포스트 하단의 awslogs 설치를 통해 원격에 배포한 lambda 실행 로그를 개발 pc에서 확인할 수 있으니 포스팅을 참고하여 설정 해주시기 바랍니다.

일단 serverless.yml을 열고 다음과 같이 배포될 region 및 role을 추가합니다. 실습에서는 서울 region을 사용할 것이므로 ap-northeast-2를 추가합니다. 설정을 추가하지 않고 배포하면 기본으로 미국동부(버지니아 북부) us-east-1으로 배포가 됩니다. role은 위에서 생성한 role의 arn을 적습니다.

service: lambda-sqs
provider:
  name: aws
  runtime: nodejs12.x
  region: ap-northeast-2
  role: arn:aws:iam::478069740483:role/lambda-sqs
functions:
  hello:
    handler: handler.hello
plugins:
  - serverless-prune-plugin

이제 serverless deploy 명령으로 lambda 함수를 배포합니다. 실습에서는 stage 정보를 dev로 주고 배포하였습니다.

$ sls deploy -s dev

배포 완료되면 aws console의 lambda 서비스에서 업로드된 lambda 함수를 확인할 수 있습니다. 함수 이름을 클릭하면 상세화면으로 이동하고 트리거를 추가할 수 있습니다.

구성 – 디자이너 “+ 트리거 추가” 클릭

SQS 선택

sampleQueue 선택

SQS trigger 설정 완료

메시지 발행

AWS console에서 온라인으로 SQS에 메시지 발행이 가능합니다. SQS 서비스 화면으로 이동하여 sampleQueue를 체크 후 메뉴 – 작업 – 메시지 전송 및 수신을 선택합니다.

본문에 발행할 메시지 내용을 작성하고 메시지 전송을 클릭합니다.

trigger를 설정해 두었으므로 메시지가 발행되면 자동으로 lambda함수가 실행됩니다. 로그 확인을 위해 cloudwatch로 이동합니다. 왼쪽 메뉴 로그-로그 그룹을 선택하고 아래와 같이 lambda함수를 검색합니다.

검색된 항목을 클릭하면 상세 내용이 출력되고 아래 로그 스트림에서 관련 로그를 확인 할 수 있습니다.

lambda 함수가 SQS에서 메시지를 전달받아 출력하고 종료된 로그를 확인할 수 있습니다.

만약 awslogs를 이용하여 개발 pc에서 로그를 확인하고 싶으면 다음과 같이 터미널에서 명령을 실행하면 됩니다.

$ awslogs get /aws/lambda/lambda-sqs-dev-hello -S -G —watch

lambda 서버스의 모니터링 탭에서는 여러가지 지표에 대한 정보를 확인할 수 있습니다.

RestAPI에서 SQS 메시지 발행하기

AWS gateway와 lambda함수를 활용하여 RestAPI를 만들고 내부에서 SQS로 메시지를 발행하는 간단한 프로그램을 작성합니다.

serverless 명령어로 빠르게 lambda함수를 하나 생성합니다.

$ sls create --template aws-nodejs --path lambda-rest --name lambda-rest
$ cd lambda-rest
$ npm init
$ npm i aws-sdk
$ sls plugin install -n serverless-prune-plugin

handler.js 수정

Restapi에 전달된 Request body 데이터를 console log로 출력하고 SQS에 메시지를 발행하는 간단한 로직을 작성합니다.

'use strict';
const AWS = require('aws-sdk'); 
AWS.config.update({region: 'ap-northeast-2'});
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});
module.exports.hello = (event, context, callback) => {
  const body = JSON.parse(event.body);
  console.info("Data Saved >>>> %s - %s - %s - %s",body.user_id, body.user_nick, body.contents_title, body.message);
  let params = {
    MessageBody: event.body,
    QueueUrl: "https://sqs.ap-northeast-2.amazonaws.com/478069740483/sampleQueue"
  };
  sqs.sendMessage(params).promise()
    .then((data) => {
      console.info("SQS Send Message Success", data.MessageId);
      const response = {
        statusCode: 200,
        body: JSON.stringify(
          {
            message: 'Go Serverless v1.0! Your function executed successfully!',
            input: event,
          },
          null,
          2
        ),
      };
      callback(null, response);
    })
    .catch((err) => {
      console.info("SQS Send Message Error", err);
      callback(err);
    })
};

serverless.yml 수정

아래와 같이 serverless.yml을 수정합니다. region과 role은 위에서 생성한 lambda-sqs와 동일하게 설정합니다. 그리고 api gateway 리소스를 자동으로 생성하기 위해 functions handler하위에 events.http를 작성합니다.

service: lambda-rest
provider:
  name: aws
  runtime: nodejs12.x
  region: ap-northeast-2
  role: arn:aws:iam::478069740483:role/lambda-sqs
functions:
  hello:
    handler: handler.hello
    events:
      - http:
          path: contents/message
          method: post
plugins:
  - serverless-prune-plugin

로컬 테스트를 위해 아래 포맷의 JSON 파일을 생성합니다. body에서 데이터를 받아 처리 할 것이므로 body안에 테스트 데이터를 세팅합니다.

{
    "body": "{\n    \"user_id\":\"id123321\", \n    \"user_nick\":\"kooong\", \n    \"contents_title\":\"king\",  \n    \"message\":\"kong\"\n}",
    "resource": "/{proxy+}",
    "path": "/path/to/resource",
    "httpMethod": "POST",
    "isBase64Encoded": true,
    "queryStringParameters": {
      "foo": "bar"
    },
    "multiValueQueryStringParameters": {
      "foo": [
        "bar"
      ]
    },
    "pathParameters": {
      "proxy": "/path/to/resource"
    },
    "stageVariables": {
      "baz": "qux"
    },
    "headers": {
      "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
      "Accept-Encoding": "gzip, deflate, sdch",
      "Accept-Language": "en-US,en;q=0.8",
      "Cache-Control": "max-age=0",
      "CloudFront-Forwarded-Proto": "https",
      "CloudFront-Is-Desktop-Viewer": "true",
      "CloudFront-Is-Mobile-Viewer": "false",
      "CloudFront-Is-SmartTV-Viewer": "false",
      "CloudFront-Is-Tablet-Viewer": "false",
      "CloudFront-Viewer-Country": "US",
      "Host": "1234567890.execute-api.ap-southeast-1.amazonaws.com",
      "Upgrade-Insecure-Requests": "1",
      "User-Agent": "Custom User Agent String",
      "Via": "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)",
      "X-Amz-Cf-Id": "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA==",
      "X-Forwarded-For": "127.0.0.1, 127.0.0.2",
      "X-Forwarded-Port": "443",
      "X-Forwarded-Proto": "https"
    },
    "multiValueHeaders": {
      "Accept": [
        "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
      ],
      "Accept-Encoding": [
        "gzip, deflate, sdch"
      ],
      "Accept-Language": [
        "en-US,en;q=0.8"
      ],
      "Cache-Control": [
        "max-age=0"
      ],
      "CloudFront-Forwarded-Proto": [
        "https"
      ],
      "CloudFront-Is-Desktop-Viewer": [
        "true"
      ],
      "CloudFront-Is-Mobile-Viewer": [
        "false"
      ],
      "CloudFront-Is-SmartTV-Viewer": [
        "false"
      ],
      "CloudFront-Is-Tablet-Viewer": [
        "false"
      ],
      "CloudFront-Viewer-Country": [
        "US"
      ],
      "Host": [
        "0123456789.execute-api.ap-southeast-1.amazonaws.com"
      ],
      "Upgrade-Insecure-Requests": [
        "1"
      ],
      "User-Agent": [
        "Custom User Agent String"
      ],
      "Via": [
        "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)"
      ],
      "X-Amz-Cf-Id": [
        "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA=="
      ],
      "X-Forwarded-For": [
        "127.0.0.1, 127.0.0.2"
      ],
      "X-Forwarded-Port": [
        "443"
      ],
      "X-Forwarded-Proto": [
        "https"
      ]
    },
    "requestContext": {
      "accountId": "123456789012",
      "resourceId": "123456",
      "stage": "prod",
      "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
      "requestTime": "09/Apr/2015:12:34:56 +0000",
      "requestTimeEpoch": 1428582896000,
      "identity": {
        "cognitoIdentityPoolId": null,
        "accountId": null,
        "cognitoIdentityId": null,
        "caller": null,
        "accessKey": null,
        "sourceIp": "127.0.0.1",
        "cognitoAuthenticationType": null,
        "cognitoAuthenticationProvider": null,
        "userArn": null,
        "userAgent": "Custom User Agent String",
        "user": null
      },
      "path": "/prod/path/to/resource",
      "resourcePath": "/{proxy+}",
      "httpMethod": "POST",
      "apiId": "1234567890",
      "protocol": "HTTP/1.1"
    }
  }

로컬 테스트

Request body정보를 console에 출력하고 SQS에 메시지를 발행한다음 결과 ID가 출력됩니다.

$ sls invoke local -f hello -p test/gateway.json
Data Saved >>>> id123321 - kooong - king - kong
Success eef6f8e7-1ac4-4a21-a466-fd05e7c3996a

서버 배포

$ sls deploy -s dev
Serverless: Stack update finished...
Service Information
service: lambda-rest
stage: dev
region: ap-northeast-2
stack: lambda-rest-dev
resources: 12
api keys:
  None
endpoints:
  POST - https://lr0ye6whjf.execute-api.ap-northeast-2.amazonaws.com/dev/contents/message
functions:
  hello: lambda-rest-dev-hello
layers:
  None

lambda-sqs 배포와 동일하지만 배포가 완료된 후 aws console의 API gateway에 접속해보면 자동으로 리소스가 생성되어 있는 것을 확인할 수 있습니다. 자동으로 dev 스테이지까지 배포가 되어있으며 배포 로그 상으로도 생성된 endpoint 정보를 확인할 수 있습니다.

이제 아래 endpoint에 body 데이터를 실어서 POST로 호출하면 SQS에 메시지가 발행되고 트리거를 통해 lambda가 실행됩니다.
POST – https://lr0ye6whjf.execute-api.ap-northeast-2.amazonaws.com/dev/contents/message

SQS에 발행된 메시지 처리

SQS에서 발행된 메시지를 log로 확인하기 위해 lambda-sqs 프로젝트의 handler.js를 아래와 같이 수정합니다. SQS 메시지는 event.Records[0].body값을 읽으면 알수 있습니다. 메시지는 JSON String이므로 JSON.parse를 통해 JSON객체로 변환한 후에 항목들을 조회하여 console log로 출력합니다.

lambda-sqs – handler.js 수정

module.exports.hello = async event => {
  console.log("--------------1-----------------")
  const body = JSON.parse(event.Records[0].body); 
  console.log(body);
  console.log("--------------2-----------------")
  console.log("Data Received >>>> %s - %s - %s - %s",body.user_id, body.user_nick, body.contents_title, body.message);
  console.log("--------------3-----------------")
  return {
    statusCode: 200,
    body: JSON.stringify(
      {
        message: 'Go Serverless v1.0! Your function executed successfully!',
        input: event,
      },
      null,
      2
    ),
  };
};

적용한 내용을 배포합니다.

sls deploy -s dev

테스트

RestAPI에 요청을 보내어 SQS에 메시지를 발행하고 트리거로 lambda 함수가 실행되는지 테스트 해보겠습니다. RestAPI 테스트를 위해 AWS Gateway에 접속합니다. 로컬에서 Postman이나 curl을 이용해도 되지만 aws에서 테스트 화면을 제공하므로 그것을 이용해 보겠습니다. gateway 서비스로 접속하여 dev-lambda-rest를 선택하고 상세 화면에서 테스트할 리소스의 POST를 클릭하면 아래와 같은 화면이 출력됩니다.

위 화면에서 테스트(번개표시)를 클릭하면 웹 콘솔에서 해당 리소스로 요청을 보낼 수 있는 화면이 출력됩니다. 요청 본문에 body값을 세팅하고 테스트 버튼을 클릭하면 해당 리소스에 대한 요청이 수행됩니다. 오른쪽 화면에서는 응답 본문과 로그를 확인할 수 있습니다.

SQS에 메시지가 한건 발행되면 lambda-sqs에서 해당 메시지를 처리하게 됩니다. 관련 로그를 cloudwatch에서 확인하면 다음과 같이 출력됩니다.

START RequestId: ac025291-21df-5a2a-a2f3-050e4e3caf6f Version: $LATEST
2020-08-01T13:27:00.384Z  ac025291-21df-5a2a-a2f3-050e4e3caf6f  INFO  --------------1-----------------
2020-08-01T13:27:00.386Z  ac025291-21df-5a2a-a2f3-050e4e3caf6f  INFO  {
  user_id: 'id123321',
  user_nick: 'kooong',
  contents_title: 'king',
  message: 'kong'
}
2020-08-01T13:27:00.386Z  ac025291-21df-5a2a-a2f3-050e4e3caf6f  INFO  --------------2-----------------
2020-08-01T13:27:00.386Z  ac025291-21df-5a2a-a2f3-050e4e3caf6f  INFO  Data Received >>>> id123321 - kooong - king - kong
2020-08-01T13:27:00.386Z  ac025291-21df-5a2a-a2f3-050e4e3caf6f  INFO  --------------3-----------------
END RequestId: ac025291-21df-5a2a-a2f3-050e4e3caf6f
REPORT RequestId: ac025291-21df-5a2a-a2f3-050e4e3caf6f  Duration: 4.58 ms  Billed Duration: 100 ms  Memory Size: 1024 MB  Max Memory Used: 64 MB  Init Duration: 114.32 ms

DeadLetter

SQS에 발행된 메시지를 lambda함수가 처리하는 도중 문제가 발생하여 처리를 못하는 경우가 발생할 수 있습니다. 이런 상황이 발생하면 처리 못한 메시지는 다시 SQS의 사용 가능한 메시지 상태로 변경되어 trigger 된 lambda를 재 호출하게 됩니다. 만약 처리 중 일시적으로 발생한 오류라면 다시 처리되는 과정에서 자연스럽게 SQS 메시지가 사라지는데 SQS 메시지 자체에 문제가 있거나 시스템 문제가 오랫동안 지속될 경우 메시지가 삭제되지 않고 무한정 lambda 함수를 호출하는 상황이 발생하게 됩니다.

이를 처리하기 위해 SQS는 DeadLetter라는 Queue를 제공합니다. 메시지 처리 시 특정 횟수 이상 실패할 경우 더 이상 해당 메시지를 처리하지 않고 DeadLetter Queue로 이동시켜 별도 처리를 할 수 있도록 제공하는 것입니다.

DeadLetter 설정은 하나의 SQS를 더 생성하고 원본 SQS의 DeadLetter로 설정하면 됩니다. SQS 생성 방법은 위에서 설명한 sampleQueue와 동일하기 때문에 생략하겠습니다.

실습에서는 sampleQueueDL을 DeadLetter용으로 하나 생성하였습니다. 생성이 완료되면 sampleQueue 상세 화면으로 들어가 편집을 클릭합니다. 하단으로 스크롤을 내리면 배달 못한 편지 대기열 항목이 있는데 해당 항목을 활성화하고 생성한 sampleQueueDL을 지정합니다. 최대 수신수는 3으로 설정하였습니다. lambda에서 3번 재시도를 하고 성공하지 못할 경우 메시지를 sampleQueueDL로 이동시키라는 의미입니다.

설정이 완료되면 sampleQueue의 상세화면에서 등록된 배달 못한 편지 대기열을 확인할 수 있습니다.

DeadLetter 테스트

전송된 메시지를 DeadLetter로 이동시키기 위해서는 처리 lambda에서 작업을 실패하게 해야 합니다. 가장 간단한 방법은 형식에 맞지 않는 메시지를 발송하는 것입니다. 일단 SQS 메시지를 처리하는 lambda-sqs를 아래와 같이 변경하여 메시지 값을 읽을 수 없을 경우 Error를 발생시키도록 수정합니다.

'use strict';
module.exports.hello = (event, context, callback) => {
  console.log("--------------1-----------------")
  const body = JSON.parse(event.Records[0].body); 
  console.log(body);
  console.log("--------------2-----------------")
  console.log("Data Received >>>> %s - %s - %s - %s",body.user_id, body.user_nick, body.contents_title, body.message);
  console.log("--------------3-----------------")
  if(!body.user_id || !body.user_nick || !body.contents_title || !body.message)
    return callback("Messsage Error");
  return {
    statusCode: 200,
    body: JSON.stringify(
      {
        message: 'Go Serverless v1.0! Your function executed successfully!',
        input: event,
      },
      null,
      2
    ),
  };
};

수정한 lambda-sqs를 배포한 다음 아래와 같이 gateway 테스트 화면에서 형식에 맞지 않는 JSON을 입력하고 테스트를 시도합니다.

lambda-sqs 로그를 확인하면 아래와 같은 오류가 총 3번 발생한 후 Queue가 비워지는 것을 확인할 수 있습니다.

START RequestId: 146badd5-771e-5356-a210-fa4a03aa2c28 Version: $LATEST
2020-08-01T13:47:42.274Z  146badd5-771e-5356-a210-fa4a03aa2c28  INFO  --------------1-----------------
2020-08-01T13:47:42.274Z  146badd5-771e-5356-a210-fa4a03aa2c28  INFO  { user_id: 'id123321' }
2020-08-01T13:47:42.274Z  146badd5-771e-5356-a210-fa4a03aa2c28  INFO  --------------2-----------------
2020-08-01T13:47:42.274Z  146badd5-771e-5356-a210-fa4a03aa2c28  INFO  Data Received >>>> id123321 - undefined - undefined - undefined
2020-08-01T13:47:42.274Z  146badd5-771e-5356-a210-fa4a03aa2c28  INFO  --------------3-----------------
2020-08-01T13:47:42.275Z  146badd5-771e-5356-a210-fa4a03aa2c28  ERROR  Invoke Error   {"errorType":"Error","errorMessage":"Messsage Error","stack":["Error: Messsage Error","    at _homogeneousError (/var/runtime/CallbackContext.js:12:12)","    at postError (/var/runtime/CallbackContext.js:29:54)","    at callback (/var/runtime/CallbackContext.js:41:7)","    at /var/runtime/CallbackContext.js:104:16","    at Runtime.module.exports.hello [as handler] (/var/task/handler.js:11:12)","    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 146badd5-771e-5356-a210-fa4a03aa2c28
REPORT RequestId: 146badd5-771e-5356-a210-fa4a03aa2c28  Duration: 5.10 ms  Billed Duration: 100 ms  Memory Size: 1024 MB  Max Memory Used: 64 MB

처리되지 못한 메시지는 DeadLetterQueue로 이동했음을 확인할 수 있습니다.

DeadLetter에 메시지가 발행되면 해당 메시지를 확인 후 처리해야 하므로 DeadLetter를 trigger로 하는 lambda를 작성하여 알람이나 메일을 보내도록 처리하면 됩니다.

실습에서 작성한 코드는 아래 github에서 확인할 수 있습니다.

https://github.com/codej99/LambdaTriggerSQS

https://github.com/codej99/LambdaRestSQS

연재글 이동[이전글] aws lambda 개발하기(7) – CircleCI를 이용하여 자동 배포하기
[다음글] aws lambda 개발하기(9) – Lambda Layer 이용하여 배포 사이즈 줄이기