이 연재글은 KAFKA 정보 모음의 3번째 글입니다.

Springboot 프로젝트에서 kafka를 사용하여 Producer, Consumer 구현 시 라이브러리에 따라 구현하는 방식이 상이하여 관련 내용을 정리하고자 포스팅을 하게 되었습니다. 자주 사용하는 라이브러리로는 spring-kafka, spring-cloud-starter-stream-kafka 두 가지가 있습니다. 심플하게 사용하기에는 spring-kafka가 적당 합니다. spring-cloud-starter-stream-kafka의 경우는 구현이 살짝 더 복잡한데 범용으로 사용하기 위해 의존성을 낮추고 캡슐화가 되어있기 때문입니다.

예를 들자면 spring-kafka의 경우는 kafka 플랫폼에서만 사용할 수 있지만 spring-cloud-starter-stream를 이용하여 작성한 코드는 kafka에서 rabbitmq 등 다른 stream 플랫폼으로 전환할 때 라이브러리만 spring-cloud-starter-stream-rabbit으로 교체하면 거의 코드 수정 없이 손쉽게 전환할 수 있습니다.

spring-kafka를 이용한 이벤트 메시지 처리

build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    // 생략...
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer 예제 – 문자열 메시지 발행

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage(String payload) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", payload);
        producer.send(message);
    }
}

Consumer 예제 – 문자열 메시지 소비

@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "domain-event")
    public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
        System.out.println("Received Headers : "+headers);
        System.out.println("Received Payloads : "+message);
    }
}
/**
Received Headers : {kafka_offset=385, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@682362d8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608911503694, kafka_groupId=consumer-group-1}
Received Payloads : message
**/

객체 메시지 Producer/Consumer

application.yml

value-serializer를 JsonSerializer로 수정, value-deserializer를 JsonDeserializer로 수정, spring.json.trusted.packages 추가

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.spring.kafka.domain.model
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

객체 생성

package com.spring.kafka.domain.model;
import lombok.*;

@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String id;
    private String name;
    private int age;
}

Producer

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage(User user) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, User> message = new ProducerRecord<>("domain-event-user", user);
        producer.send(message);
    }
}

Consumer

@Slf4j
@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "domain-event")
    public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
        log.debug("Received Headers : "+headers);
        log.debug("Received Payloads : "+user.toString());
    }
}
/**
Received Headers : {kafka_offset=384, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ab7d56c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608910676656, kafka_groupId=consumer-group-1}
Received Payloads : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/

spring-cloud-kafka를 이용한 이벤트 메시지 처리

build.gradle

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}

ext {
    set('springCloudVersion', "Hoxton.SR9")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    // 생략
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

// 나머지 내용 생략

application.yml

spring:  
  cloud.stream:
    bindings:
      domainEventString-in-0:
        content-type: text/plain
        destination: domain-event-string
        group: consumer-group-string
      domainEventString-out-0:
        destination: domain-event-string
        group: consumer-group-string
      domainEventModel-in-0:
        destination: domain-event-model
        group: consumer-group-model
      domainEventModel-out-0:
        destination: domain-event-model
        group: consumer-group-model
    kafka:
      binder:
        brokers: localhost:9092,localhost:9094,localhost:9095
        configuration:
          auto.offset.reset: earliest
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  cloud:
    stream:
      function:
        definition: domainEventString;domainEventModel

Producer 예제 – 문자열 메시지 발행

@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {

    private final StreamBridge streamBridge;

    public void sendMessageBySpringCloud(String payload) {
        streamBridge.send("domainEventString-out-0", payload);
    }
}

Consumer 예제 – 문자열 메시지 소비

@Slf4j
@Component
public class KafkaSpringCloudMessageConsumer {
    @Bean
    Consumer<String> domainEventString() {
        return input -> {
            System.out.println("Received Message : " + input);
        };
    }
}

/**
[결과] 
Received Message : message
**/

Producer 예제 – 객체 메시지 발행

객체 생성

@Getter
@Builder
@ToString
public class User {
    private String id;
    private String name;
    private int age;
}

Producer

@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {

    private final StreamBridge streamBridge;

    public void sendMessageBySpringCloud(User payload) {
        streamBridge.send("domainEventModel-out-0", payload);
    }
}

Consumer 예제 – 객체 메시지 소비

@Slf4j
@Component
public class KafkaSpringCloudMessageConsumer {
    @Bean
    Consumer<User> domainEventModel() {
        return input -> {
            System.out.println("Received Message : " + input);
        };
    }
}
/**
Received Message : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/

실습한 소스는 아래 GitHub에서 확인할 수 있습니다.

https://github.com/codej99/SpringKafkaConsumer.git

참고 자료

https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#_overview

더 보기

연재글 이동[이전글] Apache kafka Installation by docker
[다음글] Kafka Consumer retry 및 deadletter 처리 방법