반응형

 

 

마이크로서비스의 독립적인 서비스들은 각각 자체 데이터 스토어를 가지며 서로 통신하여 전체 시스템을 구성한다.

데이터 동기화는 이러한 독립적인 마이크로서비스 간에 데이터 일관성을 유지하고 서로의 상태를 동기화하기 위해 중요한 측면을 띄고 있다.

 

 

 


E-Commerce 어플리케이션에서 예를 들면 주문 관리 서비스와 재고 관리 서비스가 각각의 데이터베이스를 가질 때, 주문이 발생하면 재고를 감소시켜야 한다.

 

이 때, 이벤트 기반의 Message Broker, 혹은 Message Queueing Service는 이런 데이터의 동기화에 유용하게 사용될 수 있다.

 

위 사례에서, Order-service가 주문을 처리함과 동시에 Message Broker에게 이벤트를 Produce하면서, Catalog-service는 해당 이벤트를 Listen하고 있다가 발생 시, Consume하여 재고 감소 처리를 이루어낼 수 있다.

 

Apache Kafka는 Message Broker의 종류 중 하나로, 현재 광범위하게 사용되고 있는 서비스이다. 카프카를 이용하면 이벤트 기반의 처리를 통해 마이크로서비스간의 일관성을 유지하는 데에 큰 역할을 할 수 있다.

 

 


Apache Kafka

 

카프카에 대한 대략적인 주요 개념들은 다음 포스팅에서 확인할 수 있다.

만약, Broker, Client(Consumer, Producer)에 관한 내용을 알지 못한다면 참고하면 좋을 것 같다.

 

https://sjh9708.tistory.com/151

 

[Apache Kafka] Kafka의 주요 개념과 구성 요소

Apache Kafka는 데이터의 실시간 스트리밍 및 이벤트 처리에 유용한 도구로서 활용된다. Kafka는 대규모 분산 시스템에서 안정적이고 확장 가능한 메시지 큐와 이벤트 스트리밍 플랫폼으로 사용되

sjh9708.tistory.com

 


Spring에서의 Kafka 적용하기

 

이제 Spring에서 Order-service와 Catalog-service를 작성하면서 위의 시나리오를 구현해 보도록 하자.

Order-service는 이벤트를 발행해야 하므로 Producer가 될 것이고, Catalog-service는 이벤트를 구독하여 소비해야 하므로 Consume가 될 것이다.

 

 

 

Dependency와 환경설정

 

Consumer와 Producer 둘 다 Kafka와 관련된 같은 의존성을 추가해 주어야 하고, 관련 설정을 추가해주자.

 

 

pom.xml

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

 

다음 Kafka 관련 의존성을 추가해주자.

 

 

 

Kafka 서버 가동 및 Topic 추가

Kafka 서버가 켜져있어야 하고, 이벤트의 Topic을 추가해주어야 한다.

해당 글을 참고하여 Kafka를 설치하고, 서버와 Zookeeper를 실행한 후, "example-catalog-topic"이라는 이름의 토픽을 추가해주자.

 

https://sjh9708.tistory.com/152

 

[Apache Kafka] 설치와 Client(Producer/Consumer) 간 Message 교환

해당 포스팅에서는 Kafka를 설치하고, CLI를 통해 Publisher, Consumer 간의 메시지 교환을 확인해보려고 한다. Kafka에 대한 대략적인 개념은 해당 포스팅에 작성하였으므로 참고하면 좋을 것 같다. https:

sjh9708.tistory.com

 


Producer 작성하기

 

 

▶ /messagequeue/KafkaProducerConfig.java

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //Serialize 방법 지정
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

Kafka Producer 연결 객체를 만들기 위한 Config 파일을 작성해주자.

@EnableKafka를 통해 관련된 Bean들을 등록하고 관련 설정을 활성화해 줄 수 있다.

 

BOOTSTRAP_SERVERS_CONFIG

bootstrap-servers는 Kafka 클러스터에 연결하기 위한 초기 연결 지점을 지정한다.

Kafka를 설치하여 클러스터 서버를 시작할 때, 디폴트 bootstrap-server의 포트는 9092 포트이다. 만약 다른 포트, 혹은 다른 머신에서 카프카를 돌리고 있다면 해당 설정을 바꾸어주면 된다.

 

KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG

메세지는 카프카에 저장될 때 직렬화되고 역직렬화되어 돌려받는다. 

Producer는 카프카에 메시지를 생산하는 입장이므로 직렬화에 대한 방법을 지정해주어야 한다.

 

 

 

▶ /messagequeue/KafkaProducer.java

@Service
@Slf4j
@AllArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    // 주입받은 KafkaTemplate을 사용하여 Kafka에 메시지를 전송하는 send 메서드
    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            // OrderDto 객체를 JSON 문자열로 직렬화
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch(JsonProcessingException e) {
            e.printStackTrace();
        }

        // KafkaTemplate을 사용하여 지정된 토픽에 JSON 문자열을 전송
        kafkaTemplate.send(topic, jsonInString);

        // 로깅을 통해 전송된 데이터를 기록
        log.info("Kafka Producer send data " + orderDto);

        // 전송된 OrderDto 객체를 반환
        return orderDto;
    }
}

 

Kafka에 메시지를 전송하는 기능을 수행하는 클래스를 만들어주자.

KafkaTemplate는, Key, Value 쌍의 형태로 구성된다.

send 메서드는 토픽과 데이터(여기서는 주문 관련 데이터이다.)를 받아 KafkaTemplate을 이용하여 메시지를 특정 토픽으로 전송하는 역할을 수행한다.

 

 

 

▶ /controller/OrderController.java

@RestController
@RequestMapping("/order-service")
@AllArgsConstructor
public class OrderController {

    private final Environment env;
    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;


    /* order-service/{user_id}/orders/ */
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@RequestBody RequestOrder order, @PathVariable("userId") String userId){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(order, OrderDto.class);
        orderDto.setUserId(userId);

        /* Use JPA */
        OrderDto createdOrder = orderService.createOrder(orderDto);
        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* Kafka Messaging Queue에 전달 */
        kafkaProducer.send("example-catalog-topic", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);  //201 Return

    }

 

이제 주문을 처리하는 로직에서, 재고 감소를 위해서 Catalog-service로 메시지를 보내야 한다.

앞에서 작성했던 Producer send 메서드를 호출하여 Kafka에게 Message를 Produce하였다.

해당 과정으로 Kafka의 example-catalog-topic이라는 토픽에 orderDto가 직렬화되어 보내질 것이다.

 

 

 


Consumer 작성하기

 

이제 Producer가 생산한 메시지가 있는 토픽을 구독하고, 이를 수신할 Consumer에 해당하는 재고 서비스를 작성해보자.

 

 

▶ /messagequeue/KafkaConsumerconfig.java

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Kafka 컨슈머를 생성하기 위한 ConsumerFactory 빈을 정의
    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> properties = new HashMap<>();

        // 부트스트랩 서버 주소 설정
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Kafka Consumer Group의 고유한 식별자를 설정
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");

        // 메시지 키 및 값의 디시리얼라이저를 설정
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // 메시지를 수신하는 KafkaListenerContainerFactory 빈 정의
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        // ConcurrentKafkaListenerContainerFactory를 생성
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // 위에서 정의한 ConsumerFactory를 설정하여 KafkaListenerContainerFactory에 주입.
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        // Kafka 메시지를 병렬로 처리하기 위한 설정을 추가할 수 있음(스레드 풀 크기, 에러 핸들링..)
     
        return kafkaListenerContainerFactory;
    }
}

 

마찬가지로 Kafka에 관련된 연결 설정과, 메시지를 수신하는 ConcurrentKafkaListenerContainerFactory 빈을 정의하고 있다. 이는 @KafkaListener에서 메시지를 수신할 때 사용된다.

 

KEY_DESERIALIZER_CLASS_CONFIG / VALUE_DESERIALIZER_CLASS_CONFIG

Consumer는 Producer와 반대로 카프카에 메시지를 받아야하는 입장이므로 역직렬화에 대한 방법을 지정해주어야 한다.

 

GROUP_ID_CONFIG

동일한 컨슈머 그룹에 속한 여러 컨슈머 인스턴스가 Kafka 토픽에서 메시지를 소비할 때 함께 동작하도록 하는 개념이다.

만약 Consumer 인스턴스가 여러 개가 있다면, 이를 묶어 1개의 팀이라고 생각하고 특정 토픽에 대한 파티션의 업무를 분할하는 것이다.

 

해당 개념은 알아두어야 할 것이, 만약 그룹을 지정하지 않는다면 Consumer 인스턴스가 3개가 있다면 하나의 Message에 대해서 3개의 인스턴스에서 중복하여 3번 메시지를 수신받을 것이고, 이에 따른 처리도 중복되어 일어날 것이다.

 

이를 방지하기 위해 컨슈머들을 그룹으로 묶어서, 중복 처리의 방지와 파티션 분할을 통한 분산처리를 하기 위해서 지정하는 것이다.

 

 

 

▶ /messagequeue/KafkaConsumer.java

@Service
@Slf4j
@AllArgsConstructor
public class KafkaConsumer {
    private final CatalogRepository catalogRepository;

    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("Kafka Message: -> " + kafkaMessage);
        
        // 수신한 JSON 형식의 메시지를 Map으로 변환
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        }
        catch(JsonProcessingException ex){
            ex.printStackTrace();
        }
        String productId = (String)map.get("productId");
        CatalogEntity entity = catalogRepository.findByProductId(productId);
        if(entity != null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            catalogRepository.save(entity);
        }
    }

}

 

@KafkaLister 어노테이션을 통해 example-catalog-topic에 대해서 구독하고, 메시지를 수신받을 수 있게 된다.

수신한 메시지는 JSON 형태이므로 이를 사용할 수 있는 형태로 바꾸면서, 메시지를 수신받을 때의 비즈니스 로직을 작성해준다.

따라서 Product의 재고량을 감소시키는 로직을 작성해주었다.

 

 

 

 


결과

 

아래 로그처럼 Order-Service가 메시지를 보내면, 이를 Catalog-service가 수신받아서 처리하는 것을 확인할 수 있다.

데이터베이스를 확인해보면 주문이 이루어질 때, 재고량도 감소한 것을 확인할 수 있었다.

 

 

 

 

 

 

 

반응형

BELATED ARTICLES

more