[Apache Kafka] Kafka Connect : Source DB과 Target DB 연동 및 동기화

2024. 1. 5. 12:44
반응형

 

이번 포스팅에서는 Kafka Connector를 설치하고, 데이터베이스와 같은 Source들 사이에서 데이터 이동을 Kafka Connect를 연결하여 수행해보려고 한다.

 

 

Kafka에 대한 대략적인 개념은 해당 포스팅에 작성하였으므로 참고하면 좋을 것 같다.

https://sjh9708.tistory.com/151

 

[Apache Kafka] Kafka의 주요 개념과 구성 요소

Apache Kafka는 데이터의 실시간 스트리밍 및 이벤트 처리에 유용한 도구로서 활용된다. Kafka는 대규모 분산 시스템에서 안정적이고 확장 가능한 메시지 큐와 이벤트 스트리밍 플랫폼으로 사용되

sjh9708.tistory.com

 

 


Kafka Connect

 

 

Connect를 통해 Source System(특정 데이터 소스, 스토리지)에서 Target System(다른 데이터 소스, 스토리지)으로 별도의 소스코드 없이 데이터를 이동시킬 수 있다. 

 

Kafka Connect Source

  • 외부 소스(예: 데이터베이스, 파일, 메시지 큐)로부터 데이터를 가져와 Kafka 토픽으로 데이터를 전송하는 커넥터.
  • Producer의 역할을 수행한다.
  • Source 커넥터는 데이터를 추출하여 Kafka 토픽에 쓰기 작업을 수행한다.
  • Source 커넥터에서 처리되기 전에 데이터 변환 작업을 수행할 수 있다.

Kafka Connect Sink

  • Kafka 토픽으로부터 데이터를 읽어와 외부 저장소(예: 데이터베이스, 파일 시스템)에 데이터를 쓰는 커넥터.
  • Consumer의 역할을 수행한다.
  • Sink 커넥터는 Kafka 토픽에서 데이터를 소비하여 외부 저장소로 데이터를 전송.
  • Sink 커넥터에서 처리되기 전에 데이터 변환 작업을 수행할 수 있다.

 

 

예시

 

1. Source System(MySQL)에서 변경된 데이터를 Source Connector가 읽어오고, 이때 Source Connector에서 데이터를 적절한 형식으로 변환한다.

2. Source Connector에서는 변환된 데이터를 Kafka 토픽으로 전송하기 전에 직렬화한다. 이 직렬화된 데이터가 Kafka Producer API를 통해 Kafka 클러스터에 전송된다. 직렬화 방식을 임의로 지정할 수 있으나, 역직렬화 할 때와 동일해야 한다.

3. Kafka Cluster은 해당 Topic의 파티션에 바이트 형식으로 데이터를 저장한다.

4. Sink Connector가 Topic에서 데이터를 소비할 때 Producer가 사용한 직렬화 형식과 일치하는 역직렬화 방식을 사용하여 데이터를 변환한다.

5. Kafka 토픽에서 데이터를 소비하고 Target System인 MongoDB에 저장한다.

 

 

주요 사용처

  • 데이터를 Kafka로 가져오거나 Kafka에서 다른 시스템으로 데이터를 전송하는 ETL (Extract, Transform, Load) 작업을 수행
  • 데이터 소스 연결: RDBMS, 로그 파일, 메시지 큐 등의 다양한 데이터 소스에서 데이터를 Kafka로 가져올 때 사용
  • 데이터 타겟 연결: Kafka에서 데이터를 가져와 다른 저장소 또는 시스템으로 전송할 때 사용

 

 


환경 설정

 

Kafka Connector 설치

 

http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

 

다음 URL에서 다운로드 후 압축을 해제하자.

 

 

JDBC Connector 

 

Kafka Connect는 다양한 소스 및 대상 시스템과 통합할 수 있는 Connector를 제공한다.

JDBC Connector는 그 중 하나로, 데이터베이스와의 상호 작용을 쉽게 할 수 있도록 도와주는 Connector이다.

데이터베이스의 테이블에서 변경된 데이터를 추출하여 Kafka 토픽으로 전송하거나, Kafka 토픽에서 받아온 데이터를 데이터베이스에 적재할 수 있다.

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

JDBC Connector 설치를 위해서 마찬가지로 다운로드 이후 압축을 해제하자.

 

 

 

 

디렉터리는 Kafka를 설치한 디렉터리에 kafka-connector 디렉터리를 생성한 후,

하위 경로에 Kafka Connect와 JDBC Connector 압축 해제한 디렉터리들을 넣어 주었다.

 

 

 

JDBC Connector 설정

 

설치한 JDBC 커넥터 디렉터리 내부의 lib의 경로를 복사하자.

 

 

 

Kafka Connect에서 사용 가능한 Connector 플러그인으로 JDBC Connector를 추가해주는 과정이다.

JDBC Connector의 경우, 해당 Connector가 제공하는 라이브러리를 포함하는 디렉터리를 지정한다. 

이를 통해서 Kafka Connect는 해당 디렉터리에서 JDBC Connector 라이브러리를 로드하고 사용할 수 있게 됩다.

 

Kafka Connector 내부의 etc/kafka/connect-distributed.properties의 plugin.path를 위에서 복사한 JDBC Connector lib 폴더로 설정해주자.

 

 

 

 

MariaDB 사용을 위해서 드라이버 복사

이번 포스팅에서는 Kafka Connector를 MariaDB와 연동해 볼 예정이다.

이를 위해서 MariaDB를 설치한 이후 해당 드라이버를 Kafka에 설정해주자.

 

 

Mac을 기준으로 ~/.m2/repository/org/mariadb/jdbc/mariadb-java-client/2.7.2 디렉터리에 들어가보면 mariadb-java-client-2.7.2.jar이 있을 것이다. 해당 드라이버를 복사하자.

 

 

 

이후 Kafka Connector 디렉터리 내부의 /share/java/kafka 디렉터리 내에 복사한 드라이버를 추가해주자.

 

 

 

 

 

데이터베이스 준비

 

Kafka Connect를 위한 Source System와 Target System으로 두 개의 데이터베이스를 사용할 것이다.

이번 포스팅에서는 3306 포트의 로컬 MariaDB에서 두 개의 데이터베이스 간에 Connector를 통해 Publish/Consume을 수행하려고 한다.

 

 

 

따라서 Source System DB와 Target  System DB를 각각 생성해주고, Source DB에는 임의의 테이블을 생성해주었다.

이번 포스팅의 목적은 source_db의 users 테이블에 데이터가 추가되면, 이를 감지하여 target_db에 해당 데이터를 이동시켜 동기화시켜주는 것이다.

 


Kafka Connector 실행

 

Connector를 설정하기 이전에 아래의 포스팅에서 실행했던 Kafka Server와 Zookeeper를 먼저 실행해주자

https://sjh9708.tistory.com/152

 

[Apache Kafka] 설치와 Client(Producer/Consumer) 간 Message 테스트

해당 포스팅에서는 Kafka를 설치하고, CLI를 통해 Publisher, Consumer 간의 메시지 교환을 확인해보려고 한다. Kafka에 대한 대략적인 개념은 해당 포스팅에 작성하였으므로 참고하면 좋을 것 같다. https:

sjh9708.tistory.com

 

 

이제 Kafka Connector를 실행해보자.

 

 ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

인자로 위에서 JDBC 관련 설정을 해주었던 Properties 파일을 지정해주자. 파일에 정의된 설정을 기반으로 Kafka Connect가 분산 모드로 시작된다.

 

 

 

 

Topic을 조회해보면 Kafka Connect와 관련된 내용들이 추가된 것을 확인할 수 있다.

 

  • connect-configs : Worker(Kafka Connect 작업자)의 구성 정보를 저장
  • connect-offsets : Worker가 토픽의 각 파티션에 대해 테스크가 마지막으로 성공적으로 처리한 오프셋을 저장하며. Failure가 발생하고 다시 시작할 때 어디까지 성공적으로 처리했는지 추적할 수 있다.
  • connect-status : Worker의 상태 정보를 저장. 작업자의 상태, 테스크의 실행 여부, 성공 또는 실패 여부 등

 

 


Source Connector 추가

 

이제 Source System으로부터 변화를 감지하여 Kafka Cluster로 데이터를 보내줄 Source Connector 설정을 해주자.

 

echo '
{
    "name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/source_db",
        "connection.user":"root",
        "connection.password":"1111",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
	}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

위와 같은 HTTP 요청을 통해서 Kafka Connect에 JDBC Source Connect 설정을 추가해준다.

  • connector.class: 사용할 커넥터 클래스를 지정(JDBC 소스 커넥터)
  • connection.url: MariaDB 데이터베이스에 연결하는 데 사용되는 JDBC 연결 URL
  • connection.user / connection.password: 데이터베이스에 연결하는 데 사용되는 계정
  • mode: "incrementing": 변경 감지 모드를 지정. 이 경우 Row의 증가를 통해 변경을 감지한다.
  • incrementing.column.name : 증가하는 열의 이름을 지정 ("incrementing"으로 설정된 경우 팔요)
  • table.whitelist: 데이터를 추출할 테이블을 지정
  • topic.prefix: 생성되는 Kafka 토픽의 이름 접두사를 지정
  • tasks.max: 병렬로 실행될 작업의 최대 수를 지정

 

 

현재 Connector의 리스트 조회하기

curl -X GET http://localhost:8083/connectors

 

 

Topic 리스트 확인

 

리스트에 등록한 Source Connect의 토픽이 추가된 것을 확인할 수 있다.

 

 

 

 

Source DB에 데이터 추가해보기

 

 

Source DB에 데이터를 추가해보자.

해당 Source Connect Topic을 Consume하는 클라이언트를 실행해보면 해당 데이터의 변경을 감지하여 데이터를 받아오는 것을 확인할 수 있다.

 

클라이언트가 받아온 데이터를 살펴보면 Arvo 형식의 메시지로, Type과 Fields, Payload로 구성된 것을 확인할 수 있다.

 

 

 


Sink Connector 추가

 

앞에서 Source System으로부터 변화를 감지하여 Kafka Cluster로 데이터를 보내줄 Source Connector 설정을 해주었다.

이제 변경된 데이터를 Consume하여, 다른 Target Database에 자동으로 동기화되도록 Sink Connector를 설정해보자.

 

 

echo '
{
    "name":"my-sink-connect",
    "config": {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/target_db",
        "connection.user":"root",
        "connection.password":"1111",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

Target Connector와 유사하게 Sink Connect 설정을 추가해준다.

  • name: Connector의 고유한 이름 설정
  • connector.class: 사용할 Connector 클래스를 지정
  • auto.create 및 auto.evolve: 데이터베이스 테이블이 존재하지 않을 경우 자동으로 생성하도록 설정 및 스키마의 자동 갱신 허용
  • delete.enabled: false로 설정하여, Connector는 기존 레코드를 삭제하지 않고 새로운 레코드를 추가 또는 갱신
  • tasks.max: 한 번에 실행할 Task의 최대 수를 설정
  • topics: 소비할 Kafka 토픽을 지정한다. 여기서는 "my_topic_users"로 설정되어 있다.

 

Target Source 확인

Sink Connector를 추가했을 때, Auto Create 옵션에 의해 토픽의 이름으로 테이블이 자동 생성되었다.

만약 테이블의 이름을 따로 지정하고 싶으면 table.name.format을 Sink Connector를 만들 때 지정하면 된다.

 

 

 

 

 


테스트

 

이제 Source DB에 User 테이블에 데이터가 추가가 되면, Target DB는 이를 Kafka를 통해 받아 자동으로 테이블에 데이터가 추가된다.

 

이처럼 Kafka Connect를 사용하여 연결해두면 Source System(특정 데이터 소스, 스토리지)에서 Target System(다른 데이터 소스, 스토리지)으로 소스코드 없이 데이터를 이동시킬 수 있다. 

 

반응형

BELATED ARTICLES

more