Kinesis를 활용한 데이터 수집 방법

Simon Kim·2022년 6월 30일
0

최근 이직한 회사에서 서비스 중인 아키텍처의 구조는 Serverless 기반의 아키텍처로 구성되어 있다.
(EC2 기반의 서비스 구성을 주로 경험한 나로썬 Serverless 기반 서비스 경험이 없어 정말로 많이 공부하고 있는중이다.ㅠㅠ)
주로 AWS 기반 완전 관리형 서비스로 구성되어 있다보니 Kinesis, ECS, AWS CI/CD, Monitoring Tools(X-ray, CloudTrail 등) 같은 서비스를 운영하는데 어려움이 있다.
(그래도 최대한 빠른 운영 이슈를 해결한 것으로 보아서는 이제는 많이 적응된 듯 하다.ㅠㅠ)
그래서인가.. 최근에 데이터 수집 API에 대한 아키택처 개선 미션을 받게 되었고, 이를 위하여 아키텍처에 대한 AS-IS / TO-BE를 분석하였다.


1. 기존 아키텍처 구조

대한민국에서 운영하고 있는 대부분의 서비스는 사용하는 서비스와 도구에 따라 세부 구성은 다를 수 있으나 전체적인 아키텍처는 아래 이미지와 같은 구조로 구성되어 있을 것이다. 물론 내가 담당하고 있는 아키텍처도 유사하다고 생각한다.

위의 아키텍처의 구성을 serverless로 구성되어 있는 서비스의 아키텍처는 아래 이미지처럼 구성되어 있으며, 처리 순서는 다음과 같다.

  • 클라이언트의 요청은 API Gateway에서 요청받는다.
  • API Gateway에서는 요청받은 데이터를 Queuing하여 비동기 처리를 요청하고, 클라이언트에게 처리 결과를 응답한다.
  • Queing된 메시지(데이터)는 Lambda에서 순차적으로 처리한다. 처리 시 데이터 유효성 검사 및 기본값 설정이 필요한 데이터를 가공한다.
  • Lambda에서 처리된 결과를 Kinesis Data Stream / firehose를 통해 S3의 객체로 저장한다.

처음 해당 아키텍처를 분석했을 때 구성이 잘되어 있어서 튜닝 포인트를 찾지 못했는데, 같이 일하고 있는 실무자님의 의견으로는 아래 두가지 이슈가 존재하여 아키텍처 튜닝이 필요하다는 의견을 주셨다.
1. 데이터 전송 프로세스를 보면 비효율적인 전송 프로세스가 존재하여 불필요한 비용 지출이 발생하고 있다.
2. 단순 데이터 수집인데, 대용량의 데이터가 아님에도 불구하고 간헐적으로 5xx 에러가 발생하여 원인 파악 및 대응이 어렵다.
(특히 API Gateway에서 504 Timed out 에러발생이 간헐적으로 발생한다.)

결론적으로, 비용 효율성 및 간헐적인 이슈사항을 명확하고 빠른 대응을 하기 위하여 중복처리가 되는 구성 요소를 통합하여 아키텍처를 좀더 예쁘게 만드는 것이 필요한 것이다.

이러한 의견을 접한 후 아키텍처를 좀더 자세하게분석해보니 SQS와 Lambda의 역활을 Kinesis 서비스 하나만으로도 충분히 처리가 가능한 것을 확인하였고, 데이터 수집을 Kinesis만을 이용하여 수집할 수 있도록 아키택처를 튜닝하기로 결정하였다.


AWS Kinesis

Kinesis는 AWS에서 제공하는 완전관리형 서비스로 실시간 데이터를 쉽게 수집/분석/처리 할 수 있도록 제공하고 있다. 따라서 Kinesis를 손쉽게 연결할 수 있도록 API Gateway에서 통합 기능으로 제공하고 있으며, SQS처럼 이벤트 방식의 비동기 처리를 수행하기 때문에 SQS와 동일한 역활을 수행할 수 있다.

튜닝 포인트를 결정했기 때문에 API Gateway에서 Kinesis로 다이랙트 연결할 수 있는 방법을 찾아보니 AWS에서 관련 가이드를 제공해주고 있어 해당 내용을 기준으로 진행하였다.
(https://docs.aws.amazon.com/ko_kr/apigateway/latest/developerguide/integrating-api-with-aws-services-kinesis.html)

사전준비. Resource 권한 설정
API Gateway에서 Kinesis작업을 직접 호출하기 위해서는 IAM에서 역활을 추가하여 API Gateway resource에 부여한다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:Get*",
                "kinesis:List*",
                "kinesis:Describe*"
            ],
            "Resource": "*"
        }
    ]
}     

1. API Gateway 생성

아래 이미지처럼 데이터 수집 API를 생성한 후 Method Execution에서 데이터를 전달할 AWS Service를 Kinesis로 설정하고, 사전에 준비한 권한 및 Action을 설정한다.

또한, Http Headers색션에서 헤더를 추가한다.

 Content-Type:'application/x-amz-json-1.1'

마지막으로, Integration Request의 Mapping Templates 색셩에서 매핑템플릿을 추가한다.
아래 Parameter는 Kinesis로 전달할 때 필수적인 요소들이며, application/json 형태로 요청해야 한다.

{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($input.json('$.Data'))",
    "PartitionKey": "$input.path('$.PartitionKey')"
}

2. Kinesis Data Stream 생성

API Gateway를 통해 데이터를 전달받을 Kinesis Data Stream을 생성한다. Kinesis Data Stream은 On demand와 provisioning 두가지 타입을 제공하기 때문에, 데이터 특성에 따라 타입을 선택하면 된다.

3. Kinesis firehose 생성

수집된 데이터를 S3에 저장하기 위하여 Delivery 서비스를 제공하는 firehose를 생성한다. firehose는 Data Stream에서 전달한 데이터를 원하는 형태로 가공 후 전달할 수 있는 매개체 역활을 한다. 나의 경우에는 수집 데이터를 1분 단위 parquet 파일로 묶어서 S3에 수집하려고 한다. Source는 Kinesis Data Stream으로 지정하고 Destination을 S3의 원하는 bucket으로 지정한다.

4. 결과

1,2,3번을 모두 진행하고 난 후 API Gateway에서 아래 이미지와 같이 테스트하면 요청한 데이터가 정상적으로 S3에 저장되는 것을 확인할 수 있다.


번외

1) 데이터 이슈로 인한 에러 발생 시 확인 방법
요청한 데이터의 유효성 이슈나 잘못된 데이터가 들어 올 경우 firehose에서는 Destination Error log를 별도로 남긴다. 이렇게 에러가 발생 시 원인을 쉽게 찾을 수 있고, 빠른 대응이 가능하게 된다.

2) firehose의 부가 기능
firehose에서는 Destination으로 convert하기 전에 데이터 transformation을 할 수 있도록 Transform source records with AWS Lambda 함수 기능을 제공하고 있다. 만약 요청 데이터에 대한 전처리가 필요하거나 유효성 검사 등 중간에 데이터에 대한 처리가 필요하다면 해당 기능을 적극 활용하면 된다.

profile
다양한 주제를 심플하고 명확하게 정리해 보려는 연차만 많은 IT 잡부입니다. 사람들과의 소통을 사랑합니다.~^^

0개의 댓글