반응형

 

 

 

 

 

 

이전 포스팅에서 서로 다른 서비스들 간의 데이터 동기화를 Kafka를 이용하여 수행해보았다.

이번에는 같은 서비스가 여러개의 인스턴스를 가지는 경우, 데이터의 불일치를 해결하는 일부 방법에 대해 살펴보려고 한다.

사실 해당 내용은 공부를 하면서 여러가지 의문사항이 많이 남았는데 이 내용은 마지막에 서술하려고 한다.

 


문제점

 

Microservice에서는 여러개의 서비스 뿐 만 아니라, 같은 서비스가 여러개의 서버로 Scale-out되어 여러 인스턴스가 동작하고 있을 수 있다. 이들이 각자의 데이터베이스를 가지고 동작한다면 데이터 동기화의 문제가 발생할 수 있다.

 

예를 들어 Order-service의 주문을 100건 한다고 치면, 위의 3개의 Order-service들이 요청을 각각 나누어 처리하게 될 것이다.

그런데 주문은 각자의 데이터베이스에 저장이 되므로, 만약 주문 조회를 하게 된다면 전체 100건이 흩어져 있으므로 데이터 동기화의 문제가 발생한다는 것이다.

 

 

 

공부한 내용에서는 Kafka를 사용하여 서비스들은 주문 정보를 데이터베이스가 아니라 Kafka Topic으로 전송하게 되고

데이터베이스는 Sink Connector를 통해 Topic의 데이터를 가져오는 방식으로 단일 데이터베이스로 사용하는 패턴으로 해당 문제를 다루었다. 

 

 


 

Kafka Sink Connect와 Topic 등록

 

echo '
{
    "name":"my-order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/my_database",
        "connection.user":"root",
        "connection.password":"1111",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

사용할 데이터베이스에 Topic과, 이와 관련된 Sink Connector 설정을 진행하도록 하자.

해당 Post 요청을 통해서, orders라는 토픽으로 포멧에 맞는 메시지가 들어오면 Sink Connector를 통해 자동으로 데이터베이스에 Insert되도록 설정해주었다.

 

 

 

Kafka Connect에 대한 내용은 다음 포스팅을 참고하자.

https://sjh9708.tistory.com/153

 

 

[Apache Kafka] Kafka Connect : Source DB과 Target DB 연동 및 동기화

이번 포스팅에서는 Kafka Connector를 설치하고, 데이터베이스와 같은 Source들 사이에서 데이터 이동을 Kafka Connect를 연결하여 수행해보려고 한다. Kafka에 대한 대략적인 개념은 해당 포스팅에 작성하

sjh9708.tistory.com

 

 


Kafka의 메시지 포멧

 

 

 

Kafka의 메시지 형식은 직렬화 형식을 지정함에 따라 달라지겠지만, 기본적인 Arvo 형식은 위와 같은 포멧을 따른다. 

따라서 우리는 Sink Connector가 사용할 수 있는 형식인 상기의 포멧으로 메시지를 Produce해야 한다는 것을 기억하자.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


 

OrderService 수정

 

 

스키마 작성

@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}

 

@Data
@Builder 
public class Payload {
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

@Data
@AllArgsConstructor
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

 

Consumer가 Sink Connector라면, Producer은 직렬화하여 Kafka 클러스터에 보내기 이전에, 데이터의 형식이나 구조를 Sink Connector가 지원하는 형태로 변환해야 한다. 따라서 데이터를 Kafka 토픽에 보내기 전에는 이미 지정된 형식에 맞추어 변환하기 위해 다음과 같은 Arvo 스키마와 페이로드를 가진 형태로 데이터를 변환하기 위한 자료구조를 만들어 주었다.

 

 

 

 

Producer 작성

 

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //Serialize 방법 지정
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

이전 포스팅에서도 만들었던 KafkaClient를 사용하기 위한 Producer 설정을 하는 설정파일을 작성해주자.

@EnableKafka를 통해 관련된 Bean들을 등록하고 관련 설정을 활성화해 줄 수 있다.

 

 

 

@Service
@Slf4j
public class OrderProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }


    List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price")
    );

    private Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();



    public OrderDto send(String topic, OrderDto orderDto){
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        }
        catch(JsonProcessingException e){
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer send data " + kafkaOrderDto);


        return orderDto;
    }
}

 

Spring에서 Kafka로 메시지를 보내기 위한 클래스를 작성해주자.

send 메서드는 OrderDto 객체를 Kafka 토픽으로 전송한다.

 

Sink Connector가 메시지를 처리하려면 해당 형식에 맞게 데이터를 변환할 필요가 있다고 하였다. OrderDto 객체를 Avro 스키마를 사용하여 Kafka 토픽으로 전송하기 위한 Arvo 스키마로 변환한 후, 이를 JSON으로 직렬화한 후 KafkaTemplate을 통해서 Topic에 메시지를 Produce하는 코드를 작성해주었다.

 

 

 

Controller 수정

 

@RestController
@RequestMapping("/order-service")
@AllArgsConstructor
public class OrderController {

    private final Environment env;
    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;
    private final OrderProducer orderProducer;

    /* order-service/{user_id}/orders/ */
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@RequestBody RequestOrder order, @PathVariable("userId") String userId){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(order, OrderDto.class);
        orderDto.setUserId(userId);

        /* Use JPA */
//        OrderDto createdOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* Kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());
        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);


        /* Kafka Messaging Queue에 전달 */
        orderProducer.send("orders", orderDto);
        kafkaProducer.send("example-catalog-topic", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);  //201 Return

    }


}

 

이제 컨트롤러에서, 기존 JPA를 사용하여 데이터베이스에 직접 접근하는 것 대신 Kafka에 Message를 보내 데이터베이스에 Insert 처리를 위임하도록 하자. 이는 모듈간의 의존성과 결합도를 낮추고, 비동기성 측면에서 유리해진다.

 

과정을 정리해보면 다음과 같다.

 

1. orderProducer의 send 메서드를 호출

2. Kafka Topic에 Message를 Produce

3. Kafka Cluster은 Topic에 저장

4. Sink Connect에 의해 Topic의 값을 데이터베이스로 가져와 저장

 

 

 

 

이제 단일 데이터베이스에 저장이 되는 것을 확인할 수 있다.

 

 

 

 


단일 데이터베이스 사용에서 Message Queue 사용 이유?

 

 

이런 생각을 해 볼수도 있다. 그냥 어차피 단일 데이터베이스를 사용할 것이라면 바로 연결하여 JPA 등을 통해서 사용하면 되지 않을까?

전적으로 맞는 내용이다. Message Queue를 통한 처리는 비동기적 처리의 이점을 고려한 것으로, 다음의 이점을 생각해볼 수 있다.

 

직접적인 데이터베이스 접근에서는 동기 상태로 서비스가 진행되어야 함으로 인한 트래픽과 지연 문제 발생 가능성이 있고, 다른 서비스가 데이터를 사용하는 경우 Locking으로 인한 Delay 발생 가능성이 있다.

 

Message Queue를 사용할 경우에는 애플리케이션단에서는 데이터의 처리를 Kafka로 전달하였기 때문에, 비동기 방식으로 다른 처리를 진행할 수 있게 된다. 또한 Kafka는 클러스터 기반의 분산 시스템으로 구성되어 있기 때문에 Fault Tolerance하여 데이터 전달에 대한 보증을 할 수 있다.

 

따라서 대용량 DB 처리나 여러 서비스로부터의 동기화가 필요할 경우에는 위의 예제처럼 Message Queue Service와 같은 서비스가 도입을 고려할 수 있다.

 

 

 

 


숙제와 의문점

필자는 해당 방식이 분명 활용될 수 있겠지만, 여러가지 극복해야 할 점이 있다고 생각하였다. 아래의 내용은 개인적인 생각을 정리한 것으로 신빙성이 없음을 확인해두자.

 

 

1. Scale out, 여러개의 데이터베이스 인스턴스가 필요해지면?

 

지금까지 다룬 내용은 단일 데이터베이스 사용을 통해 해결한 것이지만 실제로 Microservice의 도입 목적 중 하나는 트래픽을 나누어 분산처리하는 것이 있다.

이를 위해서는 데이터베이스 또한 같은 서비스일지라도 여러개의 데이터베이스 인스턴스를 운영하게 될 상황을 고려하지 않을 수 없을 것이다. 이 때, 같은 서비스에서 분산된 여러개의 데이터베이스 동기화의 문제가 숙제가 될 것 같다. 해당 과정에서 Message Queuing Service가 큰 역할을 할 수 있을 것 같지만 복잡한 과정을 수행해야 할 것 같다.

 

CQRS 패턴

데이터 조회와 데이터 갱신을 나누는 CQRS 패턴이 있다고 하는데 이를 활용하면 구조를 정립하는 데에 도움이 될 수도 있다고 생각한다.

가장 단순하게 생각해서, 데이터를 저장하는 서비스에는 각각 분산하여 저장해두고, 데이터를 조회하는 서비스에만 Messaging Queueing Service 등을 통해 저장된 데이터를 모아 동기화하여 전체 데이터를 Read-only로 조회 가능하도록 나누는 방식을 활용할 수 있을 것 같다.

 

 

 

2. 일관성과 트랜잭션

 

기존 Monolothic 방식에서는 일련의 작업들을 트랜잭션으로 처리하여, 원자성을 띄고, 데이터의 일관성과 작업 순서를 보장할 수 있었다.

 

그런데 만약 Kafka와 같이 Event 기반이라면 비동기이므로, 메세지가 비동기적으로 전달되므로 순서대로 처리되지 않을 수 있다.

 

또한, 이벤트 중 하나가 실패했을 시의, 전체 이벤트의 오류처리 혹은 롤백을 어떻게 처리해야 할 지도 고민해보아야 한다. 

  • Message의 전송이 실패할 수도 있다. 이를 위해서는 Kafka의 트랜잭션을 활용할 수 있다고 한다.
  • Message 전송은 성공했으나, 데이터베이스 처리에서 문제가 발생할 수 있다. 여러 개의 서로 다른 서비스에서 수행되는 트랜잭션인 분산 트랜잭션 및 이를 사용한 Saga Pattern을 사용할 수 있을 것 같다.

 

Saga Pattern

분산된 마이크로서비스 아키텍처에서 장애 복구 및 데이터 일관성을 관리하기 위한 패턴이라고 한다.

이 패턴은 긴 트랜잭션을 단계별로 나누어 여러 마이크로서비스 간의 통신을 통해 완료하며, 각 단계는 성공하거나 실패하면 롤백되거나 보상 트랜잭션을 수행할 수 있다.

전체 트랜잭션을 관리하고 추적하는 Saga Orchestrator을 두어, 마이크로서비스간의 상호작용을 조정하는 역할을 한다.

어떤 단계에서 문제가 발생하면 Saga Orchestrator는 롤백 혹은 보상 트랜잭션을 시작하여 상태를 복구한다고 한다.

 

 

 

 

 

 

마이크로서비스는 공부하면서 느끼지만 매우 어렵다.... 기존 Monolothic 아키텍쳐에서 사용된 것 이외에 많은 것들을 고려해야 하는 것 같다.

 

 

 

 

 

 

반응형

BELATED ARTICLES

more