Flink로 시작하는 Stream processing 4 - Testing

Andy (Yoon Yong) Shin·2021년 10월 24일
1
post-thumbnail

개요

개발자마다 조금식 다르겠지만, 제가 처음 개발을 시작 했을때, 제 코드는 완벽할줄 알았습니다. 하지만, 1년 2년 지나고, 다수에 프로젝트를 경험하고, 구현해야 하는 기능에 복잡도가 올라가는 순간, 제일 먼저 의심해야 하는건 제 코드라는 것을 빠르게 깨닫았습니다. 개발에서 테스트는 중요한 역할을 합니다. 가장 기본적인 것으로는, 구현한 기능이 제대로 작동을 하는지 확인하는 방식으로 사용되지만, 구현한 테스트 코드는 추후에 지속적으로 가동되며, 테스트 하는 코드에 기능적인 변화가 있는지 감지하는 화재 경보기 같은 역할도 합니다. (e.g. 새로운 기능을 위해 변화된 코드가 기존 기능이 잘 작동되는 지 확인) 위 두가지도 테스트 코드를 작성해야하는 중요한 이유이지만, 제 개인적인 생각으로 개발자가 테스트 코드를 작성해야 하는 또 다른 이유는, 테스트 코드는 개발자에게 자신에 코드에 대한 자신감을 만들어줍니다. 이번 글에서는, 대용량 데이터를 stream processing 방식으로 처리하게 해주는 Flink에서는 어떤방식으로 테스트가 가능한지에 대해 적어보려합니다.

이 글에서는 아래 library를 사용하여, 테스트 코드 예제를 작성 하였습니다.

java 1.8
flink 1.13.2
junit 5.7.1
assertj 3.20.0
testcontainers 1.16.0

Code Test 구성

개발을 하다보면 여러가지 테스트에 방식이 있지만, 이 글에서는 Flink는 어떵방식으로 integration test와 unit test를 진행하는 지에 대해 알아보겠습니다. 테스트 방식 이외에도 테스트 자체에서는 보통 아래에 3개에 종류로 test를 분류합니다.

Positive Case - 해당 테스트에 의도는, 구현한 코드가 정상적으로 작동해야 하는 상황을 테스트 합니다.

Negative Case - 해당 테스트에 의도는, 구현한 코드가 정상적으로 작동하면 안되는 상황을 테스트 합니다.

Exception Case - 의도적으로 에러를 내뱉는 상황을 테스트합니다.

보통 테스트 코드를 작성한다면, 구현된 코드가 실제로 production으로 출시가된 후에 격을 모든 상황을 테스트 코드로 작성하는 것으로 목표로 해야 하지만, 제 개발자 커리어에서는 단 한번도 개발자에게 테스트 코드를 여유롭게 짤수 있는 시간이 주어지지 않다보니, 제 나름대로 테스트 코드를 구성할때에 시간이 부족하다면, 아래에 rule을 기준으로 우선순위을 만들어, 최대한 할수 있는 만큼 진행 후 출시하는 방식으로 현재는 개발하고 있습니다.

  1. 가장 영향력 잇고, 반복적이며, 사람이 테스트 하기 힘든 기능 - 불편한 진실이긴 하지만, 프로젝트를 진행할때, 개발적인 시간이 부족해지면 제일 먼저 버려지는게 테스트 코드를 작성하는 것입니다. 그렇다면, 최소한 production에 나가더라도, 가장 문제가 생겨서는 안되는 부분만은 테스트 코드를 만듬으로서, 피해를 최소화 해야 합니다. e.g. 비밀번호 최소 길이 8자 이상 보다는 결제 기능에 대한 테스트가 우선!
  2. 정해진 시나리오 먼저 - 위에 미리 설명드린 바와 같이 production에서 어떤 상황으로 코드가 실행이 될지 모르기 때문에, 모든 시나리오를 생각하고 테스트 코드로 옴기기에는 많은 시간과 노력이 필요 합니다. 그러므로, 모든 상황을 억지로 생각해내기 보다는 지금 당장 생각난 시나리오만 빠르게 테스트 코드로 작성하고 빠르게 넘어가는 게 시간절약에 도움이 됩니다. (대체로 생각이 안나는 경우는 그만큼 중요하지 않은 시나리오이기 때문일 것입니다.)
  3. Edge case 시나리오 지속적으로 - 마지막은 위 두가지 우선순위로 진행될시 추후 프로젝트에 문제가 생길 확률이 높기에, 보완하기 위한 rule 입니다. 보통 생각하지도 못한 상황에 test case를 edge case라 칭하며, 일반적이지 않는 상황을 말합니다. 해당 상황들은 추후 테스트 커버리지를 보다 촘촘하게 만들기 위해 지속적으로 운영과 함께 병행 생성되어야 합니다. e.g. 비밀번호 길이을 99999로 하려는 사용자.

버그 발견 → 테스트 케이스로 코드 작성 → 테스트를 통과하기 위한 코드 수정 → CT / CI / CD 로 설정

시간이 촉박한 상황에서도 위와 같이 진행한다면, 궁극적으로는 완성도 높은 테스트를 가진 프로젝트로 개발을 마무리할 수 있습니다.

Flink용 Test

Flink도 결국엔 일반적인 코딩과 다를게 없기에, unit test 까지는 별반 차이가 없이 진행이 됩니다. 다만, 그 이상으로 integration test 그리고 모든 환경을 갖춘 테스트를 가게 된다면, 일반적인 방식으로는 테스트하기 힘들어집니다. 저는 보통 2가지 이유 때문에, 해당 상황에 맞닫들이게 된다고 생각합니다.

  1. Flink는 일반적으로 실행되는 방식과 다르게, job이라는 단위로 개발이 되며, 해당 파일을 이미 작동중인 Flink cluster라는 다수에 서버로 구성된 cluster에 올라가 실행이 되는 형식이기에 flink cluster가 존재하지 않는 상황에서 비슷한 환경에서 테스트를 구동하기 힘듭니다.
  2. Flink의 코드는 보통 stream 처리로, 전체적인 시스템에서는 중간에 자리 잡혀있으며, 데이터에 흐름을 담당하면서, 다양한 데이터 전송 기술에 노출되기 때문에, 흔히 외부 연동 테스트로 사용되는 embeded db인 h2 database로만 온전히 테스트하기에는 모든 테스트를 production과 비슷한 상황으로 진행할수가 없습니다. (e.g. Kafka, SQLs, NoSQLs, s3, minio, RabbitMQ, etc...)

하여, 완벽하진 않지만, 위 두가지 문제를 풀기위해 제가 사용했던 flink test 방식을 공유하고자 합니다.

Unit Test - JUnit

Flink에 unit test는 일반적인 테스트 코드와 별반 차이가 없습니다. 하지만, source, transformation, sink operator를 구현하면서, 갖춰야 할 Flink operator만에 interface가 존재하기 때문에 해당 인터페이스를 테스트하기에 애매한 상황이 되는 케이스가 많습니다. 하여, 저같은 경우 operator를 단위 테스트 하기 위해, 간단하게 얇은 추상화 layer를 사용하는 편입니다.

아래는 아주 간단한 요구사항을 가지고 만든 예제입니다.

요구사항: List 에서 길이가 5인 String만 String으로 내려 보낸다.

예제 코드: 테스트를 고려 하지 않은 Flatmap operator

@Slf4j
@NoArgsConstructor
public class BlogFlatList implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
					if (value.length() == 5)
            out.collect(value);
    }
}

예제 코드: 테스트를 고려한 Flatmap operator


@Slf4j
@NoArgsConstructor
public class BlogFlatList implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (checkStringLength(value))
            out.collect(value);
    }

		// 아래 main logic만 테스트 진행가능.
    private boolean checkStringLength(String input) {
        return input.length() == 5;
    }
}

사실, 테스트를 고려하지 않은 Flatmap을 테스트할 방법이 없는 것은 아닙니다. Mocking library로 flatMap method에 Collector를 Inject하는 방식으로 가능하겠지만, 테스트를 하기위한 준비코드가 길어지며, 위 간단한 layering은 Flink가 제공한 operator interface에 logic을 뭉텅이 넣는 것을 방지하며, 다른 팀원에게 코드를 인수인계 할때도, 가장 기본적인 JUnit만으로 테스트를 확인가능하기 때문에, 저희 팀에 프로젝트에서는 layering하는 방식을 사용하고 있습니다.

Flink에 Job이나, 여러개의 커스텀 Flink operator를 stream processing 형식으로 테스트하려면 결국에는 Flink cluster 를 통해 해당 stream을 실행해봐야 합니다. 다행히도, Flink는 embeded형식으로 test로서 flink cluster를 생성할수 있는 test library를 제공 해주며, 해당 library는 아래와 같이 import하면 사용이 가능합니다.

testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
testImplementation "org.apache.flink:flink-test-utils_${scalaBinaryVersion}:${flinkVersion}"

위 Library 를 import 하게 된다면, MiniClusterWithClientResource 라는 객체를 만들수 있으며, 일반적으로 job을 실행하듯이 test에 자동으로 MiniClusterWithClientResource 에 task slot을 사용하여, 테스트를 진행합니다. 아래는 예제 코드는 위와 동일한 테스트이지만, Job으로 변환하여 진행된 integration test입니다.

@Slf4j
public class IntegrationTest {
    private static final int PARALLELISM = 1;

    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    @DisplayName("should have 2 resulting event")
    void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.NONE);
				// 두개의 이벤트만 글자가 5글자 이기 때문에, 2개의 이벤트가 결과적으로 나와야함
        env.fromElements("should not go through", "test1", "test2", "should not go through")
                .flatMap(new BlogFlatList())
                .addSink(new TestSink<>("result"));

        JobExecutionResult jobResult = env.execute();

        Map<String, Object> accumMap = jobResult.getAllAccumulatorResults();
        List<String> output = (List<String>) accumMap.get("result");

        Assertions.assertThat(output.size()).isEqualTo(2);
    }
}

// 위 사용한 test를 위한 sink
public class TestSink<OUT> extends RichSinkFunction<OUT> {

    private final String name;

    public TestSink(String name) {
        this.name = name;
    }

    public TestSink() {
        this("results");
    }

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator(name, new ListAccumulator<OUT>());
    }

    @Override
    public void invoke(OUT value, Context context) {
        getRuntimeContext().getAccumulator(name).add(value);
    }

    public List<OUT> getResults(JobExecutionResult jobResult) {
        return jobResult.getAccumulatorResult(name);
    }
}

위에서 주목해야 할것은 TestSink인데요. 해당 sink operator는 stream 작업이 끝난 후 assert로 확인이 가능하게, in-memory state 저장소에, 결과물을 저장해줍니다. TestSink를 생성할때 넘겨준 "result"가 결과물이 저장될 state 저장소 key-value pair에 key라고 보시면 될거 같습니다.

Full Environment Test - Testcontainers

Integration test 까지 진행하다보니, 실제 주변 환경들 까지 다 사용한 테스트를 진행하고 싶어져서 찾다 보니 TestContainers라는 java library를 찾게 되었습니다. 사용하는 방법은 생각보다 쉬우며, 당연히 dependency로 docker가 실행하는 컴퓨터에 설치가 되어 있어야 합니다. 사용방법은 아래와 같습니다. 아래 테스트 같은 경우 환경설정 부분 때문에 생각보다 코드가 길어졌지만, 위 integration test에서 fromElements에서 kafka source로 변환된거 밖에 없습니다.

@Slf4j
public class IntegrationTest {
    private static final int PARALLELISM = 1;

    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .setNumberTaskManagers(1)
                            .build());

    protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
        testKafkaFunctionality(bootstrapServers, 1, 1);
    }

    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
        try (
                AdminClient adminClient = AdminClient.create(ImmutableMap.of(
                        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
                ));

                KafkaProducer<String, String> producer = new KafkaProducer<>(
                        ImmutableMap.of(
                                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                                ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
                        ),
                        new StringSerializer(),
                        new StringSerializer()
                )

        ) {
            String topicName = "messages";
            Collection<NewTopic> topics = singletonList(new NewTopic(topicName, partitions, (short) rf));
            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
            producer.send(new ProducerRecord<>(topicName, "1", "should not go through")).get();
            producer.send(new ProducerRecord<>(topicName, "1", "test1")).get();
            producer.send(new ProducerRecord<>(topicName, "1", "test2")).get();
            producer.send(new ProducerRecord<>(topicName, "1", "should not go through")).get();
            producer.send(new ProducerRecord<>(topicName, "1", "end")).get();
        }
    }

    @Test
    @DisplayName("should have 2 resulting event")
    void test1() throws Exception {
        KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
        kafka.start();
        testKafkaFunctionality(kafka.getBootstrapServers());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.NONE);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafka.getBootstrapServers());
        properties.setProperty("group.id", "messages-group-1");

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("messages",
                new TestDeserializer(), properties);

        consumer.setStartFromEarliest();

        env.addSource(consumer)
                .flatMap(new BlogFlatList())
                .addSink(new TestSink<>("result"));

        JobExecutionResult jobResult = env.execute();

        Map<String, Object> accumMap = jobResult.getAllAccumulatorResults();
        List<String> output = (List<String>) accumMap.get("result");
        Assertions.assertThat(output.size()).isEqualTo(2);
    }
}

마치며

오늘은 Flink 코드를 작성할때, 사용하는 여러가지 테스트 방법에 대해 적어봤습니다. 저 같은 경우 대용량의 데이터를 여러가지로 복잡한 Logic으로 처리하다 보면, 어느순간 내가 만든 과거에 코드들이 잘작동하지 않는 거 같아 길을 잃을때가 많았습니다. 하지만 비즈니스 로직이 변한게 아니라면, 테스트 코드 로직또한 변하지 않기에, 간단히 예전에 만들었던 테스트들을 다시 구동하여, 개발의 처음으로 되돌아가 data quality를 쌓는 일들을 방지하고 있습니다. (context switching은 너무 비효울적이다!) 현업에서 복잡한 데이터 처리에 대한 검증을 위해 많은 테스트를 진행 하고 있지만, data engineer로서 data quality는 한순간에 만들어지는 것이 아니라 테스트로서 차근차근 쌓음으로서 만들어진다고 생각합니다. 다음 글은 Flink에 watermark와 timestamp에 대해서 적도록 하겟습니다!

2개의 댓글

comment-user-thumbnail
2021년 10월 25일

Flink는 잘 모르지만 가장 먼저 의심해야 하는 것은 스스로 작성한 코드라는 것, 테스트에서도 우선순위가 필요하다는 것 배워가는군요. 좋은 글 감사합니다.

1개의 답글