Flink로 시작하는 Stream processing 3 - 다양한 Transformation Operator

Andy (Yoon Yong) Shin·2021년 10월 9일
0
post-thumbnail

개요

지난 시리즈에 이어서 요번에는 Flink가 기본적으로 제공해주는 transformation operator에 대해서 알아보려고 합니다. 기존에 소개해드렸던, source, transformation 그리고 sink operator 중에 source 그리고 sink 같은 경우, 사실 데이터를 처리하기 위해 데이터를 가져오고 결과를 저장하기 위한 것이기에, data engineering 기준으로는 메인은 실제로 데이터를 처리하는 transformation operator에 있다고 생각합니다. 현재 Flink 버젼 1.14 기준, DataStream API, DataSet API (will be deprecated) 그리고 Table API 로 3가지의 종류에 API 가 존재 합니다. DataStream API와 DataSet API가 Core API이면서 먼저 출시 되었으며, Flink도 요새 대세에 맞춰 SQL로 쉽게 데이터 처리를 가능하게 하는 high level API인 Table API가 추가로 출시가 되었습니다. Flink 1.12 기준 DataSet API는 Table API로 대체가 될 예정인거 같습니다. 개인적으로는 SQL 보다는 DataStream/DataSet API가 코드적인 표현력이 더 좋고 생각 하기 때문에 이번 시리즈에는 deprecated 되겟지만, DataSet API에 대해서 알아 보려 합니다. (deprecated 된다하더라도, DataSet API를 사용하여, 만든 Table API이기 때문에 내부적으로는 남아 있을거라 예상합니다 ㅎ)

Transformation

Source와 sink operator가 시작과 끝을 담당한다면, Transformation같은 경우 중간 다리역할을 담당 하다 보니, stream으로 생각하면, 모든 transformation operator는 데이터가 들어오는 input 과 output이 transformation operator별로 정해져 있으며, 이는 SQL에 table relation과 비슷합니다. 아래와 같이 정의 할수 있습니다.

  • one to one - 1개의 데이터가 들어오면, 1개의 데이터가 나간다.
  • one to many - 1개의 데이터가 들어도면, N개의 데이터가 나간다.
  • many to one - N개의 데이터가 들어오면, 1개의 데이터가 나간다.

아래 부터는 operator들의 설명을 할까 합니다. Operator 제목 옆에 IN → OUT으로 데이터의 변환 갯수를 나타내었으며, 추가로 어떤 DataSet에 사용할 수 있는지도 같이 명시 하였습니다. DataSet의 종류는

  • DataSet - 일반적인 DataSet
  • Grouped DataSet - Data들이 특정 key로 합쳐진 DataSet
  • Tuple DataSet - Data가 Tuple 일때.

Map (1 → 1) - DataSet

Map operator는 one to one operator로 1개의 데이터가 들오면, 1개의 데이터만 나갈 수 있습니다. 강제적으로 데이터를 모양을 변경과 동시에 1 대 1에 확실성이 필요 하다면, map operator를 사용 하시면 됩니다.

코드 사용 예제 - String 을 Int로 변환 하는 예제

// Source: "11", "12", "13" 
// Transformation: String to Int
// 결과: 11, 12, 13 
env.fromElements("11", "12", "13")
.map(Integer::parseInt)
.print();

FlatMap (1 → N) - DataSet

Map 과 비슷한 작업을 하지만, one to many operator 입니다. 1개의 데이터로 n개의 데이터를 만들수 있습니다.

코드 사용 예제

// Source: 0, 1, 2 
// Transformation: 넘어온 숫자 만큼 print 하기. (0 은 무시됨)
// 결과: 
// 1
// 2
// 2
env.fromElements(0, 1, 2)
.flatMap((Integer in, Collector<Integer> collector) -> {
    IntStream.range(0, in)
            .forEach((i) -> {
                collector.collect(in);
            });
})
// 아래 return type 같은 경우 java에서만 사용,
// scala에서는 바로 lambda function이 잘 알아듣지만 
// java는... 명시적으로 type을 말해주지않으면 알아 먹질 않는다..
.returns(Types.INT) 
.print();

MapPartition (N → N) - DataSet

FlatMap과 비슷하게 one to many operator 이며, 구동 방법 또한 동일합니다. 다면 flat-map과의 차이점이라면, flat-map 같은 경우 넘어오는 데이터당 지정한 데이터 transform이 실행 되는 반면, map-partition은 해당 partition의 전체를 iterate 할수 있는 interable을 전달 받아 실행 되기 때문에 보다 높은 control이 가능 합니다. Map-partition 같은 경우 bounded stream에서만 사용가능 합니다.

아래 코드 예제를 보시면 알겠지만, Map-parition 같은 경우 flat-map 보다 넓은 범위에서 stream에 코드를 관장 하기 때문에 보다 강력하지만, 위험 하기도 합니다. 자칫 잘못하면, 바로 OOM (Out of memory) 에러가 뜰수도 있습니다.

코드 사용 예제 - flat-map 과 동일한 결과를 내지만 조금 다른 map-partition

// Source: 0, 1, 2 
// Transformation: 넘어온 숫자 만큼 print 하기. (0 은 무시됨)
// 결과: 
// 1
// 2
// 2
env.fromElements(0, 1, 2)
.mapPartition((Iterable<Integer> list, Collector<Integer> collector) -> {
    list.forEach(in -> {
        IntStream.range(0, in)
                .forEach((i) -> {
                    collector.collect(in);
                });
    });
})
// 아래 return type 같은 경우 java에서만 사용,
// scala에서는 바로 lambda function이 잘 알아듣지만 
// java는... 명시적으로 type을 말해주지않으면 알아 먹질 않는다..
.returns(Types.INT)
.print();

Filter (N → N) - DataSet

filter는 이름과 동일하게, 정해놓은 condition이 true로 계산된다면, 데이터를 다음 stream으로 넘기는 operator입니다.

코드 사용 예제

// Source: 0, 1, 2 
// Transformation: 2 미만의 숫자만
// 결과: 
// 0
// 1
env.fromElements(0, 1, 2)
.filter(i -> i < 2)
.print();

Reduce (1 → 1) - Group DataSet

reduce는 DataSet에 존재하는 모든 item을 하나로 합칠 수 있습니다. 합치기 위해 필요한 연산은 개발자가 직접 정의해, 여러가지 계산식을 구현할 수 있습니다.

코드 사용 예제

// Source: 1, 2, 3 
// Transformation: 모두 더하기
// 결과: 
// 6
env.fromElements(1, 2, 3)
.reduce((ReduceFunction<Integer>) Integer::sum)
.print();

GroupReduce (N → N) - DataSet, Group DataSet

reduce와 비슷한 계산방법을 가지고 있지만, reduce의 IN 과 OUT은 같은 type이어야 하는 반면, GroupCombine은 원하는 type으로 변환하여, 내보낼 수 있으며, flatMap과 mapParition 과 동일한 느낌으로, GroupReduce는 전체 element를 interate 할수 있는 interable이 주어진다.

추가로, 대용량을 처리하게된다면, 한번에 1개의 컴퓨터에서 처리하기에는 메모리적으로 비쌀수 있기에 중간 결과값을 (partial result) 서버끼리 통신 없이 계산하는 Combine function까지 만들면, 보다 성능적으로 이점을 챙길수 있습니다. GroupReduce는 기본적으로는 해당 기능을 사용하지 않지만, GroupReduceFunction을 만들때, GroupCombineFunction도 같이 implement 함으로서 combine method를 통해 중간 결과값을 개발자가 원하는 방식으로 계산할 수 있습니다. 한가지 제약이 있다면, combine method의 IN과 OUT type은 reduce에 IN으로 둘다 고정하여, 정의 해야 합니다. (아니면 에러 발생)

코드 사용 예제

// Source: 1, 2, 3 
// Transformation: 모두 더하기
// 결과: 
// 6
env.fromElements(1, 2, 3)
.reduceGroup(((GroupReduceFunction<Integer, Long>) (values, out) -> {
    Long sum = 0L;
    for(Integer i : values){
        sum += i;
    }

    out.collect(sum);
}))
.returns(Types.LONG)
.print();

// 또는 combine기능을 넣기 위해 아래와 같이 정의를 해야 합니다.

class IntSummer implements GroupReduceFunction<Integer, Long>, GroupCombineFunction<Integer, Integer> {

    @Override
    public void combine(Iterable<Integer> values, Collector<Integer> out) throws Exception {
        Integer sum = 0;
        for (Integer i : values) {
            sum += i;
        }

        out.collect(sum);
    }

    @Override
    public void reduce(Iterable<Integer> values, Collector<Long> out) throws Exception {
        Long sum = 0L;
        for (Integer i : values) {
            sum += i;
        }

        out.collect(sum);
    }
}

env.fromElements(1, 2, 3)
.reduceGroup(new IntSummer())
.returns(Types.LONG)
.print();

GroupCombine (N → N) - DataSet, Group DataSet

GroupReduce 에서 중간 결과 계산으로 성능 개선하면서 강제로 동일한 type을 가져간 문제를 해결하기 위해 나온 GroupCombine입니다. 아래 예제 코드를 확인 하시면, Long type으로 변하는 시점을 정할수 있게 됩니다.

코드 사용 예제

// Source: 1, 2, 3 
// Transformation: 모두 더하기
// 결과: 
// 6
env.fromElements(1, 2, 3)
// 각 서버에서 중간 결과 합계 - (서버끼리 통신 최소화 greedy 방식으로 최대한 local memory로 정산 하고 넘겨줌)
.combineGroup((GroupCombineFunction<Integer, Long>) (values, out) -> {
    Long sum = 0L;
    for (Integer i : values) {
        sum += i;
    }

    out.collect(sum);
})
.returns(Types.LONG)
// 최종 결과 합계 
.reduceGroup(((GroupReduceFunction<Long, Long>) (values, out) -> {
    Long sum = 0L;
    for (Long i : values) {
        sum += i;
    }

    out.collect(sum);
}))
.returns(Types.LONG)
.print();

Aggregate (N → N) - Tuple DataSet, Group DataSet

Aggregate은 기본적으로 SUM, MIN 그리고 MAX 작업을 제공 하며, SQL GROUP BY와 거의 흡사하게 동작 합니다. 한번에 두개 field 이상의 aggregate 계산이 필요하다면, 아래 예제 코드 처럼 and 로서 실행이 가능합니다. .aggregate.and 와 .aggregate.aggregate는 전혀 다른 결과물을 가져오며, .aggregate.aggregate는 순차적으로 진행 되어 원하는 값이 나오지 않을 경우가 큽니다.

코드 사용 예제

// Source: 
// "1", 5, 10L
// "1", 6, 9L
// "1", 7, 8L
// "2", 8, 7L
// "2", 9, 6L
// "3", 10, 5L
// Transformation: 필드 1 모두 더하고, 필드 2 최소값만 유지
// 결과: 
// (1,18,8)
// (2,17,6)
// (3,10,5)
env.fromElements(
        new Tuple3<>("1", 5, 10L),
        new Tuple3<>("1", 6, 9L),
        new Tuple3<>("1", 7, 8L),
        new Tuple3<>("2", 8, 7L),
        new Tuple3<>("2", 9, 6L),
        new Tuple3<>("3", 10, 5L)
)
.groupBy(0)
.aggregate(Aggregations.SUM, 1)
.and(Aggregations.MIN, 2)
.print();

MinBy / MaxBy (N → N) - Tuple DataSet, Group DataSet

Tuple에서 선택된 필드가 가장 min 또는 max 한 element로 선택되어 넘긴다.

코드 사용 예제

// Source: 
// "1", 5, 10L
// "1", 6, 9L
// "1", 7, 8L
// "2", 8, 7L
// "2", 9, 6L
// "3", 10, 5L
// Transformation: 필드 1 최소 값의 element만 내보낸다. 같을 경우 필드 2 사용
// 결과: 
// (1,5,10)
// (2,8,7)
// (3,10,5)
env.fromElements(
        new Tuple3<>("1", 5, 10L),
        new Tuple3<>("1", 6, 9L),
        new Tuple3<>("1", 7, 4L),
        new Tuple3<>("2", 8, 7L),
        new Tuple3<>("2", 9, 6L),
        new Tuple3<>("3", 10, 5L)
)
.groupBy(0)
.minBy(1, 2)
.print();

Projection (1 → 1) - Tuple DataSet

Tuple로 넘어오는 event에 필요한 field만 선택해서 보내는 기능

코드 사용 예제

// Source: (0, 1, 2), (1, 2, 3), (2, 3, 4) 
// Transformation: tuple 2번째와 0번째  element만 들고 간다.
// 결과: 
// (2,0)
// (3,1)
// (4,2)
env.fromElements(
new Tuple3<>(0, 1, 2),
new Tuple3<>(1, 2, 3),
new Tuple3<>(2, 3, 4)
)
.project(2, 0)
.print();

Distinct (N → N) DataSet

SQL에서도 사용되는 distinct입니다. DataSet에서 동일한 데이터는 삭제하고, uniq 한 데이터만 내보냅니다.

코드 사용 예제

// Source: 1,1,1,2,2,3
// Transformation: distinct
// 1
// 2
// 3
env.fromElements(1,1,1,2,2,3)
                .distinct()
                .print();

Join (2 → 1) - DataSet, Tuple DataSet

SQL에 JOIN과 동일한 역할을 합니다. 특정 Key를 통해, 데이터가 합쳐집니다. 기본적으로 데이터는 Tuple 형식으로 합쳐집니다. 하지만 with를 사용하면, 원하는 객체로도 transform이 가능 합니다. 아래 예제를 보시면 알겠지만, Join 같은 경우 inner join에 느낌에 가까운 형식으로 join이 되어서, 매칭이 되지 않은 element는 다음 pipeline으로 넘어 가지 않습니다.

코드 사용 예제

// Source1 : 
// ("1", 1),
// ("2", 2),
// ("3", 3),
// ("4", 4)
// Source2 : 
// ("2", 22),
// ("3", 33)
// Transformation: source1과 2의 join
// 결과: 
// ((2,2),(2,22))
// ((3,3),(3,33))
DataSet<Tuple2<String, Integer>> stream1 = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3),
        new Tuple2<>("4", 4)
);

DataSet<Tuple2<String, Integer>> stream2 = env.fromElements(
        new Tuple2<>("2", 22),
        new Tuple2<>("3", 33)
);

stream1.join(stream2)
        .where("f0")
        .equalTo("f0")
        .print();

OuterJoin (N → N) - DataSet, Tuple DataSet

SQL에 LEFT RIGHT FULL OUTER JOIN과 같습니다. 다만, 매칭되지 않은 outer row에 대해서 어떻게 동일한 결과 type으로 transformation을 해야 하는 지에 대한 assigner가 필요하기에 JoinFunction을 추가로 with 로 정의 해야지만 사용이 가능 합니다.

코드 사용 예제


// 결과
// leftOuterJoin
// (1,1,-1)
// (2,2,22)
// (3,3,33)

// rightOuterJoin
// (2,2,22)
// (3,3,33)
// (4,-1,44)

// fullOuterJoin
// (1,1,-1)
// (2,2,22)
// (3,3,33)
// (4,-1,44)

DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
      new Tuple2<>("1", 1),
      new Tuple2<>("2", 2),
      new Tuple2<>("3", 3)

);

DataSet<Tuple2<String, Integer>> rightStream = env.fromElements(
      new Tuple2<>("2", 22),
      new Tuple2<>("3", 33),
      new Tuple2<>("4", 44)
);

leftStream.fullOuterJoin(rightStream) // leftOuterJoin, rightOuterJoin
      .where("f0")
      .equalTo("f0")
      .with(new CombineAssigner())
      .print();

class CombineAssigner
        implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
    @Override
    public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {

        String key = "";
        Integer firstf1 = -1;
        Integer secondf1 = -1;

        if (first != null) {
            key = first.f0;
            firstf1 = first.f1;
        }

        if (second != null) {
            key = second.f0;
            secondf1 = second.f1;
        }

        return new Tuple3<>(key, firstf1, secondf1);
    }

}

Cross (N → N) - DataSet, Tuple DataSet

SQL Cross join과 동일하게 작동합니다. Left dataSet과 right dataSet에 모든 유니크한 조합 값을 돌려줍니다. 해당 join 기능도 기본적으로는 양쪽 dataSet을 tuple로서 합쳐 결과를 돌려줍니다.

코드 사용 예제

// 결과
// ((1,1),(2,22))
// ((2,2),(2,22))
// ((3,3),(2,22))
// ((1,1),(3,33))
// ((2,2),(3,33))
// ((3,3),(3,33))
// ((1,1),(4,44))
// ((2,2),(4,44))
// ((3,3),(4,44))
DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
          new Tuple2<>("1", 1),
          new Tuple2<>("2", 2),
          new Tuple2<>("3", 3)

);

DataSet<Tuple2<String, Integer>> rightStream = env.fromElements(
        new Tuple2<>("2", 22),
        new Tuple2<>("3", 33),
        new Tuple2<>("4", 44)
);

leftStream.cross(rightStream)
        .print();

CoGroup (N → N) - DataSet, Tuple DataSet

위 flatMap 과 mapParition의 관계를 join과 비슷하게 가지고 있는 CoGroup입니다. 두개의 dataSet을 어떻게 join 할지 양쪽 전체 dataSet 가지고 사용자가 정의 할수 있습니다. 추가로 join과 다르게 원하는 dataSet 숫자와 별개로 원하는 만큼에 결과도 내려보낼수 있습니다.

코드 사용 예제


// 결과
// (1,1,-1)
// (2,2,22)
// (3,3,33)
// (4,-1,44)

DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3)

);

DataSet<Tuple2<String, Integer>> rightStream = env.fromElements(
        new Tuple2<>("2", 22),
        new Tuple2<>("3", 33),
        new Tuple2<>("4", 44)
);

leftStream.coGroup(rightStream)
        .where("f0")
        .equalTo("f0")
        .with(new OuterJoinCoGrouper())
        .print();

class OuterJoinCoGrouper
        implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {

    @Override
    public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {

        Map<String, Tuple3<String, Integer, Integer>> resultMap = new HashMap<>();

        for (Tuple2<String, Integer> item : first) {
            resultMap.put(item.f0, new Tuple3<>(item.f0, item.f1, -1));
        }

        for (Tuple2<String, Integer> item : second) {
            if (resultMap.containsKey(item.f0)) {
                Tuple3<String, Integer, Integer> existingTuple = resultMap.get(item.f0);
                existingTuple.setField(item.f1, 2);
            } else {
                resultMap.put(item.f0, new Tuple3<>(item.f0, -1, item.f1));
            }
        }

        resultMap.forEach((key, value) -> out.collect(value));
    }
}

Union (1 → 1) - DataSet, Tuple DataSet

SQL에서 널리 알려져있는 union과 같은 기능을 합니다. 두개의 dataSet을 1개로 합쳐 줍니다. (이어 붙이기)

코드 사용 예제

// 결과:
//        (1,1)
//        (2,2)
//        (3,3)
//        (2,22)
//        (3,33)
//        (4,44)
DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3)

);

DataSet<Tuple2<String, Integer>> rightStream = env.fromElements(
        new Tuple2<>("2", 22),
        new Tuple2<>("3", 33),
        new Tuple2<>("4", 44)
);

leftStream.union(rightStream)
        .print();

Rebalance (1 → 1) - DataSet, Tuple DataSet, Group DataSet

Data에 행하는 transformation 보다는, 병렬로 처리시, 불균등한 workload 분배를 재분배 하는 logic 입니다.

코드 사용 예제

// 결과: 동일함
//        (1,1)
//        (2,2)
//        (3,3)
DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3)

);

leftStream
        .rebalance()
        .print();

Hash-Partition, Range-Partition, Sort Partition

Parition은 실질적으로 dataSet에 특별한 transformation을 적용하는 것이 아니지만, 해당 data가 병렬로 처리될때 어떤 형식으로 각각에 task slot에 분산될지에 대한 선언입니다.

e.g. DataSet을 8의 processor에 분할하여, data 처리를 할대 어떤 기준으로 데이터를 나누게 될지.

코드 사용 예제

// 결과: 내부적인 데이터 split 전략에 대한 선언이기 때문에 결과는 동일함
//        (1,1)
//        (2,2)
//        (3,3)
DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3)

);

// Hash
leftStream
        .partitionByHash(0)
        .print();

// Range
leftStream
        .partitionByRange(0)
        .print();

// Sort
leftStream
        .sortPartition(0, Order.DESCENDING)
        .print();

First-n (N → N) - DataSet, Tuple DataSet, Group DataSet

간단한 operator로 첫번째 N개의 데이터를 내려보낸다.

코드 사용 예제

// 결과: 
//        (1,1)
//        (2,2)
DataSet<Tuple2<String, Integer>> leftStream = env.fromElements(
        new Tuple2<>("1", 1),
        new Tuple2<>("2", 2),
        new Tuple2<>("3", 3)

);

leftStream
        .partitionByHash(0)
        .first(2)
        .print();

마치며

요번에는 이제 곧 legacy로 숨겨질 Flink DataSet API의 transformation operator들을 살펴 보았습니다. 해당 API가 숨겨질 지언정, 내부적으로는 Table API를 구현하기 위해 사용되고 있기 때문에 배워두면, Flink가 SQL API가 어떤 형식으로 구동 하는지 파악할수 있기에 좋은 거 같습니다. 다음 시간에는 Flink에 DataStream API에 꽃이라고 생각 하는 window operator에 대해 자세히 알아보려고 합니다. 감사합니다!

0개의 댓글