Flink로 시작하는 Stream processing 5 - Window Operator pt1

Andy (Yoon Yong) Shin·2021년 11월 7일
1
post-thumbnail

개요

오늘은 개인적으로 Stream processing의 꽃이라고 생각하는 window operator에 대해 적어 보려고 합니다. Window operator는 간단하게 설명하자면, stream으로 흘들어 오는 이벤트를 묶어서 처리 하는 것입니다. 실시간을 들어오는 이벤트를 묶어서 처리해야 하는 여러가지 상황이 있지만, 아래와 같은 상황에 사용할 수 있습니다.

  • 각종 sensory data를 사용하여, 이상징후를 체크 할때 - 생각해보면 당연하지만, sensor를 통해 수집되는 데이터는 종류에 따라 다르지만, +- 오차가 존재하며, 생각 보다 noise가 많습니다. 따라서 windowing을 통해 mean, median, mode, lower quantile, upper quantile 그리고 지속시간과 같은 간단한 통계를 통해 보다 확실한 이상징후를 감지할 수 있습니다.
  • 웹사이트에 접속한 사용자의 세션을 처리할때 - 사실 일반적인 웹사이트에서 수용하는 트래픽으로는 굳이 Flink의 windowing을 사용할 필요성이 없지만, 트래픽의 양이 몰리게 된다면, 잠시 session을 저장하기 위해 생기는 외부 I/O를 대한 부담이 많이 커져서, 제 경험을 일반화 시킬 수 없지만, 당시 1번에 외부 I/O가 전체 성능에 30%를 감소시킬 정도로 부담이 컸습니다. 하지만 window operator를 사용한다면, 세션 시작에 한번 불러온 데이터를 세션이 닫히기전까지, 사용가능 하기 때문에 최소화 가능하게 됩니다.

위 상황들 말고도 여러 상황에서 window operator는 유효하게 사용 가능하며, 상황만 알맞은 곳에 적용한다면, 제 개인적으로는, 트래픽 대비 서버 사용량도 효율적으로 대폭 감소 시킨 경험이 있습니다.

Key란?

Window operaton은 keyed와 non-keyed가 존재 하는데, 결국 window operator는 특정 이벤트를 묶어서 처리해주는 기능이라면, key는 어떤 방식으로 이벤트가 같이 묶여서 처리되는 지에 대한 partition 설정이라고 보실수 있습니다. Key를 설정하게 된다면, 이후 이벤트 처리에서 같은 키들은 묶여서 처리되며, 다른 키는 결코 같이 묶일수 없으며, (이벤트 처리후 join을 강제적으로 하지 않는 이상...) 분리되어서 처리된다고 볼수 있고, Non-keyed로 진행한다면, 전체의 이벤트가 window operator가 작업이 되는 동안에 같이 window로 묶일 가능성이 생깁니다. (무조건 묶이는게 아니라, 그후에 추가로 묶이는 logic에 의해 결정됨) 추가로, key로 진행 된 window 같은 경우 여러 parition으로 나뉘어 지기 때문에, 각각의 key window로 병렬 처리가 가능합니다.

아래에 예제 코드는 간단하게, keyed window와 non-keyed window의 선언법 입니다.

// keyed window
stream
	.keyBy(...)
	.window(...)

// non-keyed window
stream
	.windowAll(...)

Window Assigner란?

Window assigner는 어떻게 넘어온 이벤트가 어떤 형식으로 window를 생성하고 닫는 지에 대한 정의를 합니다. Flink에서는 기본적으로 아래 4가지의 window를 제공합니다.

  • Tumbling Windows - tumbling window 선언할때 지정한 시간만큼 window를 열고 닫고, 해당 시간에 들어온 이벤트는 묶어서 처리되게 됩니다. 해당 시간은 UTC 기준으로 window가 조정되며, 두번째 파라미터로 UTC 기준 오프셋으로 원하는 시간 기준으로 offset을 조정 가능하다.
    // 5 초간 윈도우가 열리고 닫힌다.
    stream
        .keyBy()
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    
    // 한국 시간 기준
    stream
        .keyBy()
        .window(TumblingEventTimeWindows.of(Time.seconds(5), Time.hours(9))))

  • Sliding Windows - tumbling window와 비숫하지만, 이전 window와 다음 window가 겹치게 되어, 이전 window event에 대한 정보도 볼수 있어서 유용하게 사용할 수 있습니다. 첫번째 parameter는 window에 길이이고, 두번째 parameter는 얼마나 자주 window가 만들어지는 지에 대한 parameter 입니다. 마지막 parameter는 tumbling window와 동일하게 UTC 기준 시간을 조정하기 위에 offset으로 사용됩니다.
    // 10초간 윈도우가 열리고, 5초 마다 새로운 window가 시작된다.
    stream
        .keyBy()
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    
    // 한국 시간 기준
    stream
        .keyBy()
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5), Time.hours(9))))

  • Session window - Tumbling, sliding window와 다르게, window에 새로운 이벤트가 들오는 기준으로 window의 생성과 끝이 결정이 됩니다.
    ```java
    // 10분간 새로운 이벤트가 윈도우에 들어오지 않을경우 윈도우가 닫힌다.
    stream
        .keyBy()
    		.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    ```
  • Global window - global window는 그냥 백지라고 보시면 될거 같습니다. 해당 window는 같은 key를 모두 한 window에 할당하며, 열기만 할뿐 닫지 않습니다. 하지만, 그이후에 정의 되는 trigger 설정에 따라 원하는 window 방식으로 동작할수 있게 설정이 가능 합니다. 즉 위에 제공된 tumbling, sliding 그리고 session window가 아닌 clean한 상태에서 새로운 종류의 window를 만들고 싶으시다면, 사용할수 있을거 같습니다.
    //
    stream
        .keyBy()
        .window(GlobalWindows.create())

이미지 출처: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

Window Function이란?

Window function은 이전에 적은 window로서 뭉친 이벤트를 실제로 처리하는 logic 정하는 곳 입니다. 최초로는 아래와 같이 3가지 처리방식을 제공 합니다.

  • Reduce Function - 해당 function 같은 경우 window로 묶여진 이벤트 2개가 합쳐져서 최종적으로 내보내야하는 type으로 어떻게 계산되는지에 대한 logic을 사용할수 있습니다. Reduce function 같은 경우, incremental 하게 계산되기 때문에, 성능적인 이점이 있습니다. 단점으로는 들어오는 INPUT 과 OUTPUT이 동일한 type이어야 합니다.
// 튜플에 2번째를 sum 하는 reduce function
steam
		.keyBy()
    .window()
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • Aggregate Function - Reduce보다 조금 더 사용성이 확대 된 버젼으로 총 3가지 type인 input, accumulator, output을 선언하여야 사용가능 합니다. input은 들어오는 data의 종류이며, accumulator는 중간 집계용도라고 볼수 있습니다. output은 마지막으로 내보내야 할 결과물이며, window가 닫힐때 해당 type을 stream 밑으로 내려 보내야 합니다. Reduce와 동일하게, incremental 형식으로 계산되며, input type과 output type을 다르게 지정할 수 있어 보다, 다양한 용도로 사용가능 합니다. 단점은 4가지의 method를 선언해야 하기 때문에 reduce보다 코드를 추가적으로 적어야 합니다.
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

// 평균을 구하는 aggregate function
stream
		.keyBy()
    .window()
    .aggregate(new AverageAggregate());
  • Process Function - Process function은 window가 trigger시 (닫히거나 안닫힐수도 있음) 해당 window에 모든 이벤트를 iterate할 수 있는 function이 주어지기 때문에, 만능에 가까운 기능이지만, 모든 이벤트를 window에 머금고 있어야 하기 때문에, 그만큼 리소스를 더 잡아 먹습니다. (incremetal 하게 계산산되지 않음)
// 윈도우에 몇개에 이벤트가 넘어 왔는 지 string으로 output 내보내는 process function
stream
	.keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

마치며

Flink window process를 적다 보니 part1 과 part2로 나누게 되었습니다. 공식 문서에서도 이야기 하듯이 window는 Flink에서 가장 메인이 되는 기능이기도 하며, 다른 stream에서는 디테일하게 제공되지 않는 기능이기도 합니다. (제가 Flink를 선택한 가장 큰 이유이기도 하구요 ㅎ) 다음 파트는 Flink를 통해 제공된 window가 아닌 어떻게 커스텀하게 나만의 window operator 만들 수 있는 지 대해, 적어보겠습니다.

3개의 댓글

comment-user-thumbnail
2022년 2월 18일

좋은 글 감사합니다.

답글 달기
comment-user-thumbnail
2022년 2월 18일

window 라는 개념이 이벤트의 time 기간에 따라 혹은 갯수에 따라 이벤트들을 묶는 단위로 알고 있습니다.
이때 window 가 실생활의 창문(window)이라는 의미로써 사용된게 맞을까요 ??

답글 달기
comment-user-thumbnail
2022년 10월 13일

다음 파트 숨참고 존버중입니다.....!!!

답글 달기