이번 실습에서는 SpringBoot 프로젝트에서 SQS에 메시지를 전달하고 받는 실습을 진행해 보겠습니다. SQS는 AWS에서 제공하는 메시지 대기열 서비스로서 대기열(Queue)을 이용하면 메시지를 순차적으로 저장할 수 있으며 대기열에 저장된 메시지는 별도의 프로세스에서 작업을 진행할 수 있습니다.

주로 1:1의 관계로 맵핑되는 작업에서 많이 사용됩니다. 예를 들면 회원 가입 후 가입완료 메일 발송, 고객 주문 완료 후 배송 처리와 같이 주 프로세스가 완료된 이후 추가적으로 발생하는 작업 처리를 위해 많이 사용됩니다. 또는 서비스에 대량의 이벤트가 발생할 때 Queue를 이용하면 작업이 예약되어 처리되기 때문에 대량의 요청에서도 안정적으로 시스템을 운용할 수 있게 됩니다.

참고로 동일 메시지를 다수가 받아야 하는(1:N) 작업일 경우엔 AWS에서 제공하는 아래 서비스 중 하나를 사용하면 됩니다.

  • SNS(Simple Notification Service)
  • MSK(Amazon Managed Streaming for Apache Kafka)
  • Kinesis(Amazon Kinesis Data Streams)

SQS와 관련된 좀더 자세한 내용은 아래 포스팅을 참고하시면 됩니다.

http://3.36.120.4/post/13236/aws-lambda-sqs

SpringBoot에서 SQS 사용하기

Spring 프로젝트에서 SQS를 사용하기 위해서는 AWS에서 제공하는 Java 라이브러리를 사용해도 되고 본 글에서 사용할 Spring Cloud Messaging 라이브러리를 사용해도 됩니다.

build.gradle

프로젝트에 org.springframework.cloud:spring-cloud-starter-aws-messaging 라이브러리를 추가합니다.

plugins {
	id 'org.springframework.boot' version '2.3.9.RELEASE'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

group = 'com.daddyprogrammer'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

ext {
	set('springCloudVersion', "Hoxton.SR10")
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.boot:spring-boot-starter'
	implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
	}
}

test {
	useJUnitPlatform()
}

application.yml 설정 추가

cloud:
  aws:
    region:
      static: 리전 정보(ex:ap-northeast-2)
    stack:
      auto: false
    credentials:
      access-key: IAM에서 발급받은 엑세스키
      secret-key: IAM에서 발급받은 시크릿키

Message Sender/Listener 구현

Spring Cloud 라이브러리를 이용하면 예제의 코드처럼 간단하게 Sender, Listener 구현이 가능합니다.

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageProcessor {

    private final String queueName = "event-collect";
    private QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public MessageProcessor(AmazonSQS amazonSqs) {
        this.queueMessagingTemplate = new QueueMessagingTemplate((AmazonSQSAsync) amazonSqs);
    }

    public void send(String data) {
        Message<String> message = MessageBuilder.withPayload(data).build();
        queueMessagingTemplate.send(queueName, message);
    }

    @SqsListener(value = queueName)
    public void receive(String message) {
        log.info("Event : {}", message);
    }
}

@SqsListener 메시지 삭제 정책

Listener에는 4가지 메시지 삭제 정책(SqsMessageDeletionPolicy)이 있으며 내용은 다음과 같습니다.

  • ALWAYS – 리스너 메서드에 의한 메시지 처리 중 성공 (예외 발생 없음) 또는 실패 (예외 발생)시 항상 메시지를 삭제합니다.
  • NEVER – 메시지를 자동으로 삭제하지 않습니다. 수신 확인(Acknowledgment)로 명시적으로 삭제가 가능합니다.
  • NO_REDRIVE – Redrive policy(DeadLetterQueue)가 정의되지 않은 경우 메시지를 삭제합니다.
  • ON_SUCCESS – 리스너 메서드에 의해 성공적으로 실행되면 메시지를 삭제합니다.

정책을 적용하려면 다음과 같이 @SqsListener에 설정을 추가하면 됩니다. 명시적으로 선언하지 않으면 NO_REDRIVE가 기본 정책으로 사용됩니다.

@SqsListener(value = queueName, deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message) {
    log.info("Event : {}", message);
}

테스트를 위한 Controller 작성

간단히 Queue에 메시지를 넣기 위해 GetMapping으로 endpoint를 만듭니다. 서버를 실행하고 메시지를 발송하면 Listener가 메시지를 받아 콘솔에 로그를 출력하는 것을 확인할 수 있습니다.

@RequiredArgsConstructor
@RestController
public class MessagingController {

    private final MessageProcessor messageProcessor;

    @GetMapping("/send")
    public void sendMessage(@RequestParam String message) {
        messageProcessor.send(message);
    }
}

localhost:8080/send?message=helloSQS

2021-03-13 00:16:36.608  INFO 83435 --- [enerContainer-2] c.d.messaging.MessageProcessor           : Event : helloSQS

성능 개선

대기열의 메시지 처리중 시간이 오래 걸리는 작업이 발생하면 이후로 대기중인 메시지들은 처리가 지연될 수 있습니다. 병목 현상을 최대한 줄이기 위해서 다음과 같이 동시에 처리할 수 있는 쓰레드 풀의 개수를 설정할 수 있습니다. 해당 쓰레드가 어떤것인지 판별하기 위해 ThreadNamePrefix를 지정하여 로그상으로 확인이 가능합니다.

@EnableSqs
@Configuration
@EnableAutoConfiguration(exclude = {ContextInstanceDataAutoConfiguration.class})
public class SQSConfiguration {

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor asyncTaskExecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setTaskExecutor(asyncTaskExecutor);
        return factory;
    }

    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
        asyncTaskExecutor.setCorePoolSize(20);
        asyncTaskExecutor.setMaxPoolSize(50);
        asyncTaskExecutor.setQueueCapacity(5);
        asyncTaskExecutor.setThreadNamePrefix("jobThread-");
        asyncTaskExecutor.initialize();
        return asyncTaskExecutor;
    }
}

corePoolSize : 최초 생성되는 스레드 사이즈이며 해당 사이즈로 최소 스레드가 유지.
maximumPoolSize : 해당 풀에 최대로 유지할 수 있는 스레드 개수.
queueCapacity : corePoolSize보다 스레드가 많아졌을 경우, 남는 스레드가 없을때 대기하는 큐. 해당 큐에 담을공간이 꽉차면 maximumPoolSize까지 점차 스레드 개수가 늘어난다.

설정 적용 전

쓰레드(enerContainer-번호)를 보면 10개 정도의 쓰레드가 생성되어 작업을 처리하는 것을 확인할 수 있습니다.

2021-03-13 01:55:03.824  INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor           : Event : helloSQS-18
2021-03-13 01:55:03.824  INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor           : Event : helloSQS-3
2021-03-13 01:55:08.877  INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor           : Event : helloSQS-14
2021-03-13 01:55:08.877  INFO 97718 --- [enerContainer-3] c.d.messaging.MessageProcessor           : Event : helloSQS-30
2021-03-13 01:55:08.877  INFO 97718 --- [enerContainer-7] c.d.messaging.MessageProcessor           : Event : helloSQS-18
2021-03-13 01:55:08.877  INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor           : Event : helloSQS-3
2021-03-13 01:55:13.923  INFO 97718 --- [enerContainer-4] c.d.messaging.MessageProcessor           : Event : helloSQS-7
2021-03-13 01:55:13.923  INFO 97718 --- [enerContainer-9] c.d.messaging.MessageProcessor           : Event : helloSQS-4
2021-03-13 01:55:13.923  INFO 97718 --- [enerContainer-7] c.d.messaging.MessageProcessor           : Event : helloSQS-27
2021-03-13 01:55:13.923  INFO 97718 --- [enerContainer-3] c.d.messaging.MessageProcessor           : Event : helloSQS-29
2021-03-13 01:55:13.923  INFO 97718 --- [enerContainer-5] c.d.messaging.MessageProcessor           : Event : helloSQS-22

설정 적용 후

기본 20개의 쓰레드로 시작하며 maximumPoolSize까지 쓰레드 개수가 점차 증가되되면서 메시지를 처리하는 것을 확인할 수 있습니다.

2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-10] c.d.messaging.MessageProcessor           : Event : helloSQS-11
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-16] c.d.messaging.MessageProcessor           : Event : helloSQS-10
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-22] c.d.messaging.MessageProcessor           : Event : helloSQS-2
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-18] c.d.messaging.MessageProcessor           : Event : helloSQS-25
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-23] c.d.messaging.MessageProcessor           : Event : helloSQS-16
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-19] c.d.messaging.MessageProcessor           : Event : helloSQS-20
2021-03-13 01:58:57.143  INFO 98274 --- [   jobThread-15] c.d.messaging.MessageProcessor           : Event : helloSQS-9
2021-03-13 01:58:57.143  INFO 98274 --- [    jobThread-6] c.d.messaging.MessageProcessor           : Event : helloSQS-15

실습에서 사용한 소스는 아래 Github에서 자세하게 확인할 수 있습니다.

https://github.com/codej99/spring-sqs-messaging.git