Debezium을 사용하여 SQL Server에 CDC 구성하기

dambyul·2023년 12월 4일
2
post-thumbnail

debezium을 활용하여 Apache Kafka에 DB의 변경 이력을 보내려고 한다. 이번 글에서는 SQL Server에서 구성하는 방법에 대해서 다뤄보았으며, 외의 DB에 대해서도 하나씩 다룰 예정이다.

참고문서

시작하기에 앞서 해당 글은 이미 SQL Server와 Apache Kafka 및 kafka connect의 설치는 되어있다는 가정하에 작성되었으므로 참고. 추후 kafka connect의 구성에 대한 글도 작성 예정에 있음.

SQL Server 설정

기본적으로는 SQL Server는 CDC가 비활성화되어있으며, CDC 전용 설정이 존재하므로 해당 설정을 활성화해야 한다.

설정 조회

Database와 Table 모두 CDC가 활성화되어 있어야 하며, 아래 쿼리를 사용하여 현재의 상태 값을 조회할 수 있다. 0이 비활성화, 1이 활성화 상태다.

select
  name,
  is_cdc_enabled
from sys.databases;	

select
  name,
  is_tracked_by_cdc 
from sys.tables;

Database CDC 활성

Table에 CDC를 적용하기 전 해당 테이블이 존재하는 Database에도 CDC를 활성화해야한다. 처음 설정해 볼 때 테이블에만 적용하면 되는 것 아닐까? 했는데 둘 다 적용이 필요하다고 한다. 아래 쿼리를 사용하여 간단하게 활성화할 수 있다.

USE DB이름
GO
EXEC sys.sp_cdc_enable_db
GO

Table CDC 활성

DB에 CDC 활성이 완료되었다면 테이블에 대해서도 활성화를 진행한다. 여러 개의 테이블에 적용할 때 USE DB이름처음 한번만 실행해도 된다.

USE DB이름
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'스키마이름',
@source_name   = N'테이블이름', 
@role_name     = N'Role이름',  
@filegroup_name = N'Filegroup이름',
@supports_net_changes = 0
GO

role_name의 경우 sysadmin이나 db_owner role은 이미 CDC 테이블에 대한 권한이 존재하므로, CDC 구성에 사용될 계정이 이 role에 포함되는 경우 NULL로 설정하면 되며 다른 role에 대해서 CDC를 구성을 할 경우에만 해당 role을 입력하면 된다.

filegroup_name은 CDC 테이블이 생성될 테이블 스페이스를 지정해 주는 것으로, 이미 존재하는 테이블 스페이스만 설정 가능하다.

권한 확인 및 CDC 설정 정보

CDC가 제대로 활성화되었다면 CDC라는 스키마에 변경 내용이 저장되게 된다. Debezium에서 커넥터 설정에 입력될 DB 계정으로 로그인하여 CDC 스키마 및 내부의 테이블에 Select 권한이 제대로 있는지 확인한다.

아래 쿼리를 사용하여 DB 및 Table에 적용된 CDC 설정을 한 번에 확인하는 방법도 있다.

USE DB이름;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

커넥터 설정

DB 설정이 완료되었다면, 다음은 kafka connect에서 커넥터를 구성해야 한다. 설정할 수 있는 항목이 많으니 아래 예시 말고도 본인 입맛에 맞춰서 해당 문서를 참고하여 설정하면 된다.

{
  "name": "azure-source-connector",
  "config": {
	"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
	"database.encrypt": "false",
    "database.hostname": "127.0.0.1", 
    "database.port": "1433", 
    "database.user": "cdcuser", 
    "database.password": "password", 
    "database.names": "DBname", 
    "topic.prefix": "MSSQL", 
    "table.include.list": "dbo.TABLE_NAME", 
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9096, localhost:9097", 
    "schema.history.internal.kafka.topic": "sh-history",
	"schema.history.internal.store.only.captured.tables.ddl": "true",
	"schema.history.internal.store.only.captured.databases.ddl": "true"
  }
}
  • name : 커넥터 이름
  • connector.class : 커넥터에 사용된 클래스로, debezium SQL Server 커넥터를 입력한다.
  • database.encrypt : DB 접속 시에 SSL을 사용하지 않을 경우 false 입력, 사용할 경우 database.ssl.truststore,database.ssl.truststore.password 설정이 필요하다.
  • database.hostname : DB Host
  • database.port : DB Port
  • database.user : DB User
  • database.password : DB Password
  • database.names : DB 이름. DB가 여러 개라면 경우 쉼표로 구분
  • topic.prefix : 토픽 구분자로 실제 topic은 prefix.schema.table 와 같은 형태로 사용된다.
  • table.include.list : CDC를 적용할 테이블 리스트
  • schema.history.internal.kafka.bootstrap.servers : 스키마 내역 정보를 저장할 카프카 borker 서버 리스트. 여러 개라면 쉼표로 구분
  • schema.history.internal.kafka.topic : 스키마 내역 정보를 저장할 토픽
  • schema.history.internal.store.only.captured.tables.ddl : table.include.list로 설정한 테이블에 대한 스키마 내역 정보만 저장할 것이라면 true 입력
  • schema.history.internal.store.only.captured.databases.ddl : database.names
    로 CDC가 적용된 DB에 대한 스키마 내역 정보만 저장할 것이라면 true 입력

생성한 커넥터는 curl 형태로 kafka connect가 실행되고 있는 http://localhost:8083/connectors에 POST request를 하면 실행할 수 있다.

토픽 생성

위 커넥터를 실행하기 위해서 추가로 생성해야 하는 토픽 정보는 아래와 같다

  1. topic.prefix 토픽 (예를 들어 prefix를 MS로 생성했을 경우, MS라는 토픽을 만들어야 한다)
  2. CDC를 적용한 각 테이블에 대한 토픽 : prefix.DB이름.스키마이름.테이블이름 각각 생성한다)
  3. schema history에 사용되는 토픽 : schema.history.internal.kafka.topic에 입력한 값을 토픽으로 생성
profile
자칭 개발자

0개의 댓글