이전 글
[1편] MSA 이벤트 발행 - 이론
MSA에서 비지니스 객체의 변경에 대한 이벤트를 발행할 때, 비지니스 객체 변경과 이벤트 발행이 하나의 트랜잭션으로 실행되어야하는 이유를 설명했다. 또한 이를 위한 방법으로 Outbox 패턴을 소개했다.
[그림] 강의 서비스 아키텍처 - 강의 게시 요청 상세
위 아키텍처 그림은 강사(Teacher)가 강의 게시 요청을 했을 때 강의 서비스가 어떻게 작동하는지에 대한 아키텍처를 그림으로 그린 것이다. 우선 애플리케이션 단에서 보면 1) DB 어댑터를 통해 강의 posted 값을 1로 변경하는 비지니스 로직 (CourseEntity, CourseRepository)과 2) 이벤트 메시지 발행을 위해 OUTBOX 테이블에 이벤트를 쓰는 로직 (이벤트 발행기/리스너, OutboxEvent, OutboxEventRepository)이 있다. 그리고 이 두 개의 로직은 하나의 트랜잭션으로 실행된다. 그 다음 OUTBOX 테이블에 쓰여진 이벤트를 이벤트 채널에 발행하기 위해서 debezium을 사용한다. Debezium은 OUTBOX 테이블의 트랜잭션 로그의 변경분을 하나의 메시지로 kafka 메시지 브로커에 발행한다.References:
강의 서비스 애플리케이션은 Spring으로 구현했다. 코드가 길어져 핵심 로직이 잘 안보일 수도 있어 롬복(Lombok 이란?)을 적용하여 getter, setter, 생성자 등을 어노테이션으로 대체했다.
- /courses/{courseId}/post
- /courses/{courseId}/unpost
@RestController
@RequestMapping("/courses")
public class CourseServiceApiController {
@Autowired
private CourseService courseService;
...
@PostMapping("/{courseId}/post")
public CourseResponseDTO postCourseById(@PathVariable("courseId") String courseId) {
courseService.postCourse(courseId);
return new CourseResponseDTO(courseId);
}
@PostMapping("/{courseId}/unpost")
public CourseResponseDTO unpostCourseById(@PathVariable("courseId") String courseId) {
courseService.unpostCourse(courseId);
return new CourseResponseDTO(courseId);
}
}
@AllArgsConstructor
@Service
public class CourseService {
private CourseRepository courseRepository;
private ApplicationEventPublisher eventBus;
...
@Transactional
public void postCourse(String courseId) {
CourseEntity course = courseRepository.findById(courseId).orElseThrow(() -> new CourseNotExistError());
course.post();
courseRepository.save(course);
eventBus.publishEvent(CoursePostedEvent.of(course));
}
...
}
@Component
@AllArgsConstructor
public class OutboxListener {
private OutboxEventRepository repository;
@EventListener
public void onExportedEvent(Outboxable event) {
OutboxEvent outboxEvent = OutboxEvent.from(event);
repository.save(outboxEvent);
repository.delete(outboxEvent);
}
}
public interface Outboxable {
String getAggregateId();
String getAggregateType();
String getPayload();
String getType();
}
public class CoursePostedEvent implements Outboxable {
private static ObjectMapper MAPPER = new ObjectMapper();
private final String id;
private final JsonNode payload;
public CoursePostedEvent(String id, JsonNode payload) {
this.id = id;
this.payload = payload;
}
public static CourseCreatedEvent of(CourseEntity course) {
return new CourseCreatedEvent(course.getCourseId(), MAPPER.valueToTree(course));
}
@Override
public String getAggregateId() {
return String.valueOf(id);
}
@Override
public String getAggregateType() {
return CourseEntity.class.getSimpleName().replace("Entity", "");
}
@Override
public String getPayload() {
try {
return MAPPER.writeValueAsString(payload);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
@Override
public String getType() {
return this.getClass().getSimpleName();
}
}
@Getter(AccessLevel.PUBLIC)
@Setter
@Entity
@AllArgsConstructor
@RequiredArgsConstructor
@Table(name ="outbox")
public class OutboxEvent {
@Id
@GeneratedValue
@Type(type = "uuid-char")
private UUID id;
@Column(nullable = false)
@NonNull
private String aggregateType;
@Column(nullable = false)
@NonNull
private String aggregateId;
@Column(nullable = false)
@NonNull
private String type;
@Column(nullable = false, length = 1048576) //e.g. 1 MB max
@NonNull
private String payload;
@Column(nullable = false, updatable = false, insertable = false, columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
private Timestamp timestamp;
public static OutboxEvent from(Outboxable event) {
return new OutboxEvent(
event.getAggregateType(),
event.getAggregateId(),
event.getType(),
event.getPayload()
);
}
}
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, Long> {
}
References:
version: '3'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3307:3306
environment:
- MYSQL_ROOT_PASSWORD=admin
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
volumes:
- /Users/jimin/mysql/data:/var/lib/mysql
zookeeper:
container_name: zookeeper
image: debezium/zookeeper:1.4
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
container_name: kafka
image: debezium/kafka:1.4
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
ports:
- "9092:9092"
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
connect:
container_name: connect
image: debezium/connect:1.4
depends_on:
- kafka
- mysql
links:
- kafka:kafka
- mysql:mysql
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
Docker compose를 통해 각 컨테이너를 띄우고, connect가 제대로 실행되고 있는지 확인한다.
$ docker-compose -f docker-compose.yml up -d
$ curl http://localhost:8083/
{"version":"2.6.1","commit":"6b2021cd52659cef","kafka_cluster_id":"SdINnaM-QIyh4ZTJcUtvmA"}
use mysql;
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
등록 JSON의 각 프로퍼티에 대한 설명: https://debezium.io/documentation/reference/stable/connectors/mysql.html#_required_debezium_mysql_connector_configuration_properties
$ curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"mysqluser","database.password":"mysqlpw","database.server.id":"184054","database.server.name":"dbserver1","table.include.list":"course.outbox","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.allowPublicKeyRetrieval":"true"}}'
{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"mysqluser","database.password":"mysqlpw","database.server.id":"184054","database.server.name":"dbserver1","table.include.list":"course.outbox","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.allowPublicKeyRetrieval":"true","name":"outbox-connector"},"tasks":[],"type":"source"}
등록 JSON
{
"name":"outbox-connector",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"tasks.max":"1",
"database.hostname":"mysql",
"database.port":"3306",
"database.user":"mysqluser",
"database.password":"mysqlpw",
"database.server.id":"184054",
"database.server.name":"dbserver1",
"table.include.list":"course.outbox",
"database.history.kafka.bootstrap.servers":"kafka:9092",
"database.history.kafka.topic":"course.outbox",
"database.allowPublicKeyRetrieval":"true"
}
}
$ curl -H "Accept:application/json" localhost:8083/connectors/
["outbox-connector"]
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/outbox-connector
HTTP/1.1 200 OKector
Date: Thu, 16 Feb 2023 13:18:37 GMT
Content-Type: application/json
Content-Length: 567
Server: Jetty(9.4.33.v20201020)
{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.allowPublicKeyRetrieval":"true","database.user":"mysqluser","database.server.id":"184054","tasks.max":"1","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.server.name":"dbserver1","database.port":"3306","database.hostname":"mysql","database.password":"mysqlpw","name":"outbox-connector","table.include.list":"course.outbox"},"tasks":[{"connector":"outbox-connector","task":0}],"type":"source"}
나중에 필요할 때 삭제
$ curl --location --request DELETE 'http://localhost:8083/connectors/outbox-connector'
sh-4.2$ ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
course.outbox
dbserver1
dbserver1.course.outbox
my_connect_configs
my_connect_offsets
my_connect_statuses
sh-4.2$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.course.outbox --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":null,"after":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598822000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":13396,"row":0,"thread":518,"query":null},"op":"c","ts_ms":1676598823103,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"after":null,"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598823000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":14352,"row":0,"thread":518,"query":null},"op":"d","ts_ms":1676598823156,"transaction":null}}
null
카프카의 메시지를 beautify한 것이다.
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"aggregate_type"
},
{
"type":"string",
"optional":false,
"field":"aggregate_id"
},
{
"type":"string",
"optional":false,
"field":"type"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.data.Json",
"version":1,
"field":"payload"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"default":"1970-01-01T00:00:00Z",
"field":"timestamp"
}
],
"optional":true,
"name":"dbserver1.course.outbox.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"aggregate_type"
},
{
"type":"string",
"optional":false,
"field":"aggregate_id"
},
{
"type":"string",
"optional":false,
"field":"type"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.data.Json",
"version":1,
"field":"payload"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"default":"1970-01-01T00:00:00Z",
"field":"timestamp"
}
],
"optional":true,
"name":"dbserver1.course.outbox.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"dbserver1.course.outbox.Envelope"
},
"payload":{
"before":null,
"after":{
"id":"33319dbb-358c-4f06-8e60-c82b81eab769",
"aggregate_type":"Course",
"aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe",
"type":"CoursePostedEvent",
"payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}",
"timestamp":"2023-02-17T01:53:42Z"
},
"source":{
"version":"1.4.2.Final",
"connector":"mysql",
"name":"dbserver1",
"ts_ms":1676598822000,
"snapshot":"false",
"db":"course",
"table":"outbox",
"server_id":1,
"gtid":null,
"file":"binlog.000015",
"pos":13396,
"row":0,
"thread":518,
"query":null
},
"op":"c",
"ts_ms":1676598823103,
"transaction":null
}
}
보다시피 하나의 메시지의 양만해도 방대하다. 우리가 관심있는 정보는 payload 부분인데 schema 내용만 해도 메시지가 엄청 길어진다. 또한 지금은 강의 서비스가 강의에 대한 이벤트만 발행하고 있는데 하나의 서비스가 여러 애그리커트에 대한 이벤트를 발행할 수도 있다. 예를 들어, 아래 그림처럼 주문 서비스 (Order service)가 주문 이벤트 (Order event)와 고객 이벤트 (Customer event)를 발행할 수 있다. 즉, 하나의 OUTBOX 테이블에서 서로 다른 종류의 이벤트를 구분하여 다른 topic으로 이벤트를 발행해야 하는 것이다. 그 밖에도 메시지 브로커를 통해 메시지를 소비하다 보면 다양한 이유로 중복된 메시지가 수신되기도 한다. 이는 다음 편에서 다루도록 한다.
[그림] 사용자 서비스 이벤트 아키텍처
다음 글
[3편] MSA 이벤트 발행 - Debezium Transformations
다음 편에서는 카프카 메시지에서 관심없는 부분을 없애 메시지 사이즈를 줄이고 하나의 OUTBOX 테이블에서 여러 토픽으로 메시지를 라우팅하는 방법에 대해 알아보고자 한다.