[Spring Cloud] MSA : (8) Apache Kafka 연결 : 데이터베이스 동기화
이전 포스팅에서 서로 다른 서비스들 간의 데이터 동기화를 Kafka를 이용하여 수행해보았다.
이번에는 같은 서비스가 여러개의 인스턴스를 가지는 경우, 데이터의 불일치를 해결하는 일부 방법에 대해 살펴보려고 한다.
사실 해당 내용은 공부를 하면서 여러가지 의문사항이 많이 남았는데 이 내용은 마지막에 서술하려고 한다.
문제점
Microservice에서는 여러개의 서비스 뿐 만 아니라, 같은 서비스가 여러개의 서버로 Scale-out되어 여러 인스턴스가 동작하고 있을 수 있다. 이들이 각자의 데이터베이스를 가지고 동작한다면 데이터 동기화의 문제가 발생할 수 있다.
예를 들어 Order-service의 주문을 100건 한다고 치면, 위의 3개의 Order-service들이 요청을 각각 나누어 처리하게 될 것이다.
그런데 주문은 각자의 데이터베이스에 저장이 되므로, 만약 주문 조회를 하게 된다면 전체 100건이 흩어져 있으므로 데이터 동기화의 문제가 발생한다는 것이다.
공부한 내용에서는 Kafka를 사용하여 서비스들은 주문 정보를 데이터베이스가 아니라 Kafka Topic으로 전송하게 되고
데이터베이스는 Sink Connector를 통해 Topic의 데이터를 가져오는 방식으로 단일 데이터베이스로 사용하는 패턴으로 해당 문제를 다루었다.
Kafka Sink Connect와 Topic 등록
echo '
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/my_database",
"connection.user":"root",
"connection.password":"1111",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
사용할 데이터베이스에 Topic과, 이와 관련된 Sink Connector 설정을 진행하도록 하자.
해당 Post 요청을 통해서, orders라는 토픽으로 포멧에 맞는 메시지가 들어오면 Sink Connector를 통해 자동으로 데이터베이스에 Insert되도록 설정해주었다.
Kafka Connect에 대한 내용은 다음 포스팅을 참고하자.
https://sjh9708.tistory.com/153
[Apache Kafka] Kafka Connect : Source DB과 Target DB 연동 및 동기화
이번 포스팅에서는 Kafka Connector를 설치하고, 데이터베이스와 같은 Source들 사이에서 데이터 이동을 Kafka Connect를 연결하여 수행해보려고 한다. Kafka에 대한 대략적인 개념은 해당 포스팅에 작성하
sjh9708.tistory.com
Kafka의 메시지 포멧
Kafka의 메시지 형식은 직렬화 형식을 지정함에 따라 달라지겠지만, 기본적인 Arvo 형식은 위와 같은 포멧을 따른다.
따라서 우리는 Sink Connector가 사용할 수 있는 형식인 상기의 포멧으로 메시지를 Produce해야 한다는 것을 기억하자.
OrderService 수정
스키마 작성
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
Consumer가 Sink Connector라면, Producer은 직렬화하여 Kafka 클러스터에 보내기 이전에, 데이터의 형식이나 구조를 Sink Connector가 지원하는 형태로 변환해야 한다. 따라서 데이터를 Kafka 토픽에 보내기 전에는 이미 지정된 형식에 맞추어 변환하기 위해 다음과 같은 Arvo 스키마와 페이로드를 가진 형태로 데이터를 변환하기 위한 자료구조를 만들어 주었다.
Producer 작성
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //Serialize 방법 지정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
이전 포스팅에서도 만들었던 KafkaClient를 사용하기 위한 Producer 설정을 하는 설정파일을 작성해주자.
@EnableKafka를 통해 관련된 Bean들을 등록하고 관련 설정을 활성화해 줄 수 있다.
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
private Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
}
catch(JsonProcessingException e){
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer send data " + kafkaOrderDto);
return orderDto;
}
}
Spring에서 Kafka로 메시지를 보내기 위한 클래스를 작성해주자.
send 메서드는 OrderDto 객체를 Kafka 토픽으로 전송한다.
Sink Connector가 메시지를 처리하려면 해당 형식에 맞게 데이터를 변환할 필요가 있다고 하였다. OrderDto 객체를 Avro 스키마를 사용하여 Kafka 토픽으로 전송하기 위한 Arvo 스키마로 변환한 후, 이를 JSON으로 직렬화한 후 KafkaTemplate을 통해서 Topic에 메시지를 Produce하는 코드를 작성해주었다.
Controller 수정
@RestController
@RequestMapping("/order-service")
@AllArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer;
private final OrderProducer orderProducer;
/* order-service/{user_id}/orders/ */
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@RequestBody RequestOrder order, @PathVariable("userId") String userId){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(order, OrderDto.class);
orderDto.setUserId(userId);
/* Use JPA */
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* Kafka */
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
/* Kafka Messaging Queue에 전달 */
orderProducer.send("orders", orderDto);
kafkaProducer.send("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder); //201 Return
}
}
이제 컨트롤러에서, 기존 JPA를 사용하여 데이터베이스에 직접 접근하는 것 대신 Kafka에 Message를 보내 데이터베이스에 Insert 처리를 위임하도록 하자. 이는 모듈간의 의존성과 결합도를 낮추고, 비동기성 측면에서 유리해진다.
과정을 정리해보면 다음과 같다.
1. orderProducer의 send 메서드를 호출
2. Kafka Topic에 Message를 Produce
3. Kafka Cluster은 Topic에 저장
4. Sink Connect에 의해 Topic의 값을 데이터베이스로 가져와 저장
이제 단일 데이터베이스에 저장이 되는 것을 확인할 수 있다.
단일 데이터베이스 사용에서 Message Queue 사용 이유?
이런 생각을 해 볼수도 있다. 그냥 어차피 단일 데이터베이스를 사용할 것이라면 바로 연결하여 JPA 등을 통해서 사용하면 되지 않을까?
전적으로 맞는 내용이다. Message Queue를 통한 처리는 비동기적 처리의 이점을 고려한 것으로, 다음의 이점을 생각해볼 수 있다.
직접적인 데이터베이스 접근에서는 동기 상태로 서비스가 진행되어야 함으로 인한 트래픽과 지연 문제 발생 가능성이 있고, 다른 서비스가 데이터를 사용하는 경우 Locking으로 인한 Delay 발생 가능성이 있다.
Message Queue를 사용할 경우에는 애플리케이션단에서는 데이터의 처리를 Kafka로 전달하였기 때문에, 비동기 방식으로 다른 처리를 진행할 수 있게 된다. 또한 Kafka는 클러스터 기반의 분산 시스템으로 구성되어 있기 때문에 Fault Tolerance하여 데이터 전달에 대한 보증을 할 수 있다.
따라서 대용량 DB 처리나 여러 서비스로부터의 동기화가 필요할 경우에는 위의 예제처럼 Message Queue Service와 같은 서비스가 도입을 고려할 수 있다.
한계점
이 방식은 데이터가 분산 저장하지 못하고 동기화로 인해 결국 병합된다. 따라서 물리적으로는 분산 데이터베이스 인스턴스를 가지지만, 논리적으로는 중앙 저장소에 집중되는 형태를 가진다. 따라서 DB 부하 분산이나 진정한 MSA 확장성에는 한계가 존재한다.
AWS와 같은 클라우드의 환경이라면 AWS AuroraDB와 같은 분산 데이터베이스 서비스를 사용해서 비교적 쉽게 사용할 수 있겠지만, 온프레미스 환경이라면 직접 분산 데이터베이스 서버를 구축해야 할 것이다.