BigQuery Streaming Buffer

개굴로그·2023년 3월 16일
0

오늘 대시보드 서비스를 하나 배포했다.
대시보드 하나를 위해서 진짜 수 많은 배포 작업이 있었다..

Debezium Connector, Kafka-Streams, google-flunetd, airflow, google log router, bigquery 등,,,

간단한 대시보드 하나 런칭한다고 매우 바빴다 :(


1. 문제

뭐 어쨋든 모든 배포를 끝내고 AIRFLOW가 잘 돌아가는지 확인하려고 했는데 아래와 같은 오류가 발생했다.

[2023-03-16, 15:16:27 KST] {base.py:68} INFO - Using connection ID 'bigquery-connection' for task execution.
[2023-03-16, 15:16:28 KST] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/cdc_dag_v2.py", line 123, in _merge_dw
    result = conn.query(query=query).result()
  File "/home/airflow/.local/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1499, in result
    do_get_result()
  File "/home/airflow/.local/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/home/airflow/.local/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/home/airflow/.local/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1489, in do_get_result
    super(QueryJob, self).result(retry=retry, timeout=timeout)
  File "/home/airflow/.local/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 728, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/google/api_core/future/polling.py", line 137, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 UPDATE or DELETE statement over table GCP_PROJECT.DATASET.TABLE would affect rows in the streaming buffer, which is not supported

대충 에러를 해석해보자면, 내가 실행하려는 쿼리가 타겟이 되는 테이블의 streaming buffer에 영향을 끼칠 수도 있기에 쿼리를 실행시킬 수 없다는 것이다.


2. 상황

일단 에러에 대해서 깊게 파헤치기 전에, 내가 어떤 상황이였는지 먼저 설명하도록 하겠다.

Debezium

새로이 서비스가 배포되면서 MySQL에도 새로운 테이블이 생성되었다.
BE팀에서는 이전 데이터를 이 새로운 테이블에 마이그레이션을 진행하였다. 이에 따라, 새로운 테이블임에도 불구하고 옛 데이터들이 해당 테이블에 존재하였다.

이를 Debezium을 가지고 CDC를 하는데, snapshot.modeschema_only 로 설정하였다.
(나중에 이와 관련하여 공부를 해봐야겠다..)

기존에 존재하던 Debezium Connector의 Config 만 REST API(PUT)을 통해 업데이트 하는 방식이었다.
.

Kafka

내가 기대한 것은, 새로 생긴 테이블의 옛 데이터들 또한 Kafka의 Topic으로 들어오는 것이었다.

  1. 카테고리 정보를 담은 테이블
    1. 데이터의 추가 및 변경이 거의 일어나지 않는 테이블
    2. 그러나 초기에 마이그레이션 된 데이터들도 토픽에 쌓이지 않았다.
  2. 이력을 담은 테이블
    1. MySQL에서 집계한 카운트와 Kafka Topic에 쌓인 데이터의 수가 서로 달랐다.

카프카에 데이터가 쌓이지 않으면 대시보드가 제대로 나오지 않을 수 있다.


원래부터라면 이 파트부터 파헤쳐보아야 하는게 맞지만,
1. DB 마이그레이션 작업은 타 팀의 업무였다.
2. 운영 환경 배포 및 모니터링이 급했다.

그래서 다른 방법을 가지고 일단 대시보드의 기능 확인을 해보려 했다.
.

Bigquery Streaming Insert

사실 대시보드의 데이터는 Bigquery에 생성한 DW 테이블에서 가져오는 것이다.

Kafka에서 옛 데이터가 아닌 새로 들어오는 CDC 데이터들은 잘 들어오고 있었기에 DW 테이블에 옛 데이터를 밀어 넣어야겠다고 결론을 지었다.

Datagrip을 활용하여 MySQL에서 이력 데이터를 조회한 후 해당 데이터를 JSON 포맷으로 export 하였다.
그 Json 파일을 가지고 Bigquery의 Streaming Insert 기능을 활용하여 옛 데이터들을 모두 DW 테이블로 성공적으로 밀어넣었다.

그 결과, 대시보드에서는 엣 데이터들이 제대로 표현되었다.
.

Airflow

그리고 나서 1시간 짜리 Airflow 배치가 도는데, 저 위에 있던 에러가 발생했던 것이다...


3. BigQuery Streaming Buffer

위 그림은 BigQuery에서 Streaming Insert이 일어날 때 BigQuery 내부적인 컴포넌트 구조를 보여주는 것이다.
(아래 링크를 달아두었으니 자세한 내용은 거기서!)

Logical Table 부분을 보면 Steraming Buffer와 Columnar Storage가 보인다.

내가 밀어넣은 데이터는 Columnar Storage가 아니라, Streaming Buffer로 들어가 있는 상태인 것이다.

여기서 중요한건 Streaming Buffer에 데이터가 있을 때는 해당 테이블에는 다음과 같은 제약 사항이 생긴다.
1. UPDATE 및 DELETE가 안된다.
2. SELECT 및 INSERT만 가능하다.
3. INSERT의 경우 Buffer의 처리량을 초과할 경우 에러가 발생한다.


근데 에러가 난 이유는, 내가 돌리려고 했던 쿼리가 MERGE 쿼리였기 때문.. UPDATE도 하고 INSERT도 한다...

4. 해결

그러면 이를 해결하기 위해서는 Streaming Buffer가 비어있어야 한다는 것인데, 어떻게 비울까?

  1. 일정 시간이 지나서 버퍼가 비워질때까지 기다린다.
    1. 최소 시간(분) 동안 버퍼가 유지되고 그 이후에는 지워진다.
  2. 버퍼가 꽉차서 비워질때까지 기다린다.

그냥 기다렸다.. 오래 기다리지는 않았다.

기다리다 보니 해당 테이블의 버퍼가 비워진 것을 확인할 수 있었다.
(BigQuery UI에서 테이블 세부정보를 통해 확인할 수 있었다.)

그니까 결론적으로 내가 뭐 이 에러를 해결하기 위해 취한 액션은 없다는 거다~

하지만 배치가 뻑이 났으니 그 뻑난 부분은 센스있게 잘 처리했다!!!



5. 참고

  1. Life of a BigQuery streaming insert
  2. Streaming Insert

0개의 댓글