[AWS] AWS를 이용해 데이터 처리를 해보자! - 2

초이지수·2022년 6월 16일
0

PROJECT

목록 보기
6/9

🤸‍♂️ SQS와 Lambda Function을 활용해 메시지 등록, 조회 처리 (Batch Insert)를 해보자!


👻 시작 전에 참고!

🌵 SQS(Simple Queue Service)

마이크로 서비스와 분산 시스템, 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 완전관리형 메시지 대기열 서비스

🌵 Data batch process(SQS)를 사용하는 이유?

쉽게 비동기처리라고 생각하면 된다!

대용량 데이터를 주고

  1. 서버로 데이터를 요청했을때 서버가 언제 그 요청에대한 응답을 줄지 모르는데 마냥 기다릴 수 없고
  2. 제한시간 내에 처리할 수 있는 데이터는 한정적이기 때문에,

리턴으로 API Gateway에 첫번째 람다가 잘 받았다는 연락을 먼저 해주고
두번째 람다가 받은 대용량 데이터를 worker들이 동등하게 일처리 할 수 있게 로직을 짜는 것!

람다 함수 한 개로 한 번에 데이터를 우르르 보내는 게 아니라, SQS를 통해 한 줄서기로 보내주면 worker들이 동등하게 일처리를 하고 제한 시간내에 안정적으로 메시지 처리를 할 수 있다!


👻 구현해야하는 로직!

팀원과,, 함께 그림으로 그려가면서 로직을 파악했다!

🙋‍♀️ 1. A람다

(1) code 작성

import json
import boto3

def lambda_handler(event, context):
    client = boto3.client("sqs")
    
    ip = event['requestContext']['identity']['sourceIp']
    request_time = event['requestContext']['requestTime']
    
    response = client.send_message(
        QueueUrl = "https://sqs.ap-northeast-2.amazonaws.com/703474273090/load-test",
        MessageBody = json.dumps((ip, request_time))
    )
            
    return {
        'statusCode': response["ResponseMetadata"]["HTTPStatusCode"],
        'body': json.dumps(response['ResponseMetadata']['HTTPStatusCode'])
    }

(2) REST API 게이트웨이 트리거로 걸기


🙋‍♀️ 2. B람다

(1) code 작성

  • lambda 코드
import psycopg2
import sys
import boto3
import os
import json

conn = psycopg2.connect( 
    host = os.environ['HOST'],
    dbname = os.environ['DBNAME'],
    user = os.environ['USER'],
    password = os.environ['PASSWORD'],
    port = os.environ['PORT'],
    )

def lambda_handler(event, context):
    val = []
    records = event['Records']
    for record in records:
        val.append(json.loads(record['body']))
    
    print(val)

    cursor = conn.cursor()
    
    psl = "INSERT INTO lambda_logs (ip_address, access_time) VALUES (%s, %s)"

    cursor.executemany(psl, val)
    conn.commit()
    
    return{
        'statusCode':200,
        'body':json.dumps('success')
    }

(2) SQS 트리거로 걸기

환경변수 설정하기!

  • DBNAME = RDS 데이터베이스 이름
  • HOST = RDS
  • PASSWORD = RDS 만들 때 내가 설정한 비밀번호
  • PORT = RDS 내가 설정한 포트
  • USER = RDS 만들 때 설정한 username

(3) B람다 함수에 레이어 추가!

Lambda 함수에서 import 지원하지 않는 모듈 설정
저번에 했던 거 추가해주기!

(4) IAM 권한 추가

  • 역할 - B람다 선택 - 권한추가 - AmazonSQSFullAccess

(5) Test 코드 event에서 긁어서 테스트해보기!

{
  "Records": [
    {
      "messageId": "6d0cf2d8-348c-49e5-8537-a81ca2af0846",
      "receiptHandle": "AQEBnrZStxAoC6BV0iHyrP624JPso87YpjQgSYJ8i8ul98mbw7dEXtfbha9cQZ21kCnfrv6bt333hK/t9SZgRnwXXuo8dkg4732vn2xNMKM8xv7DCWzXDLam55XLJzq3UiQFIkZdANJhg/Yf+x3sLkRtnz4d7k9ZUj98i3+hHRCD5+WDaoLUzEoxzgASul7ccyOp75iskDNoi5oLvmTeeTHoPXtuw4QBsQ7dxeiIAv3OusJDk3fcmxSRQdKK5DqZxnzD4rKfZl8vU4241JGMzKNVmR3u9Tq15xZ6HSCPDaufCK0idgHJnZnh90Om/3HsIyBSXV1+cQZ9yBMTdyh6q9QpR5P2SQBgPJyBFmwdoQStvOpcJgmAPHi2bZf+OlmcU3OCvXjV+NEo5oUqhbgME8COrw==",
      "body": "{\"client_ip\": \"127.0.0.1\", \"request_time\": \"09/Apr/2015:12:34:56 +0000\"}",
      "attributes": {
        "ApproximateReceiveCount": "6",
        "SentTimestamp": "1655103586901",
        "SenderId": "AROA2HSSVRNBCIQSBSLDZ:logs-3",
        "ApproximateFirstReceiveTimestamp": "1655103586901"
      },
      "messageAttributes": {},
      "md5OfBody": "dfabdcc0018f9979c421515d718bbe6d",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-northeast-2:703474273090:logs",
      "awsRegion": "ap-northeast-2"
    }
  ]
}

🙋‍♀️ 3. 로컬에서 SQS 대용량 메시지 테스트해보기!

import requests
from random import random
from concurrent.futures import ThreadPoolExecutor

def post_url(args):
    return requests.post(args[0], data=args[1])

for i in range(10):
    form_data = {"ikria":random()}
    list_of_urls = [("https://ghcrol2iwf.execute-api.ap-northeast-2.amazonaws.com/default/logs-3",form_data)]*1000

    with ThreadPoolExecutor(max_workers=20) as pool:
        response_list = list(pool.map(post_url,list_of_urls))

    for response in response_list:
        print(response)

    print(f"{(i+1)*1000}번 성공")

for문이 아닌 한 번에 대량으로 메시지가 들어갈 수 있게 코드를 짜야한다!
우리는 한 번에 10,000개의 메세지가 들어가도록 코드를 작성했다!


✔️ AWS 실습하면서 제일 중요한 점 세가지!

  1. 결제 대시보드 최소! 3시간에 한 번씩 확인하기. EC2 리전은 꼭 한 지역만!!!!!!
  2. 1년 지나기 전에 결제로 등록한 카드 아예 없애버리기
  3. 비밀번호는 기본 비밀번호가 아닌

참고한 사이트

https://www.youtube.com/watch?v=nhEFJgIhvuk
http://labs.brandi.co.kr/2018/02/16/leesg.html

profile
닫혀 있어서 벽인 줄 알고 있지만, 사실은 문이다.

0개의 댓글