[AWS] Bulk data DB 삽입하기

그냥·2022년 8월 1일
0

AWS

목록 보기
9/9

이전에 Lambda - SQS 메시지 보내기에 대해서 알아보았다. 이번 포스팅에서는 Lambda - SQS - Lambda - DB로 연결하는 방법과 한 번에 많은 데이터(Bulk data)를 DB에 삽입하는 방법에 대해서 알아보려고 한다.


1. SQS - Trigger Lambda 연결

이전에는 Lambda > SQS 방향으로 연결했다면, 이번에는 SQS > Lambda(Lambda Consumer) 방향으로 연결하는 하는 것에 대해서 알아보자.

그리고 Bulk Data를 Lambda Consumer에서 처리하는 방법에 대해서도 알아보자.

1) SQS - Lambda 연결

  • Lambda 함수에 접속한다.
  • [함수 개요] 안에 있는 [+ 트리거 추가]를 클릭한다.
  • [트리거 구성]에서 SQS를 검색해서 선택한다.
  • SQS 대기열은 연결할 대기열을 클릭한다.
  • 배치 크기는 10개로 한다.
  • [연결]을 클릭한다.
  • 연결되면 함수개요에 [SQS]가 추가된 것을 확인할 수 있다.

위와 같이 SQS 트리거로 Lambda가 연결되었을 때 코드의 lambda_handler(event, context) 함수에서 event로 SQS의 Record가 들어온다.

Record의 양식은 아래 url에서 확인가능하다.

Amazon SQS에서 Lambda 사용



2. Lambda Consumer

SQS 큐의 Record를 받는 Lambda를 Lambda Consumer라고 한다. 이는 위에서 SQS의 Trigger로 연결된 Lambda이다. 해당 Lambda에서는 Q에서 전달된 Bulk Data를 DB에 보내는 방법에 대해서 알아보고자 한다.


1) cursor.executemany

cursor.executemany는 쿼리문 중 value에 들어가는 값들을 시퀀스 데이터 형태로 받아서 한 번에 DB로 전달하는 메서드이다. 해당 메서드를 사용하여 SQS에서 받은 Bulk Data를 DB에 전달하고자 한다.

Syntax

cursor.executemany(쿼리문, 시퀀스형 파라미터)

예제

data = [
  ('Jane', date(2005, 2, 12)),
  ('Joe', date(2006, 5, 23)),
  ('John', date(2010, 10, 3)),
]
stmt = "INSERT INTO employees (first_name, hire_date) VALUES (%s, %s)"
cursor.executemany(stmt, data)
  • ('Jane', date(2005, 2, 12)) 처럼 튜플로, 혹은 리스트로 값을 넣어도 된다.

-->

예제의 결과

INSERT INTO employees (first_name, hire_date)
VALUES ('Jane', '2005-02-12'), ('Joe', '2006-05-23'), ('John', '2010-10-03')

2) 코드

import json
import pymysql
import dbinfo
import pymysql

def lambda_handler(event, context):

	# 1
    connection = pymysql.connect(
    host = dbinfo.db_host,
    user = dbinfo.db_username,
    passwd = dbinfo.db_password,
    db = dbinfo.db_name,
    port = dbinfo.db_port
    )
    cursor = connection.cursor()
    
    # 2
    data = []
    i = 0
    
    # 3
    for record in event['Records']:
        i += 1
        payload = json.loads(record['body'])
        values = [
            str(payload["ip"]), 
            str(payload["now"]), 
            int(payload["random_number"])
        ]

        data.append(values)
        
        if i % 1000 == 0:
            query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%d)"
            cursor.executemany(query, data)
            connection.commit()
            data = []
    # 4
    query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%s)"
    cursor.executemany(query, data)
    connection.commit()
    connection.close()
# 1
    connection = pymysql.connect(
    host = dbinfo.db_host,
    user = dbinfo.db_username,
    passwd = dbinfo.db_password,
    db = dbinfo.db_name,
    port = dbinfo.db_port
    )
    cursor = connection.cursor()
 
- pymysql을 사용해서 DB와 연동하는 코드이다.
# 2
    data = []
    i = 0
    
- data : 이후 bulk로 value 값을 만들 떄 필요한 리스트를 미리 선언한 것이다.
- i : for문을 돌려서 i가 N일 때 Bulk Data를 DB에 넣을 것이다. 
      그 때 필요한 i 값을 미리 선언한 것이다.
# 3
for record in event['Records']:
        i += 1
        payload = json.loads(record['body'])
        values = [
            str(payload["ip"]), 
            str(payload["now"]), 
            int(payload["random_number"])
        ]

        data.append(values)
        
        if i % 1000 == 0:
            query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%d)"
            cursor.executemany(query, data)
            connection.commit()
            data = []
            
- event['Records']: SQS에서 오는 값에서 필요한 값을 뽑아 내기 위한 인덱싱 작업이다. 
  만약 Q에 10개의 데이터가 있었을 때 event['Records'] 밑에 10개의 record가 있다., 해당 for문은 10개 record에서 하나씩 뽑아서 데이터를 뽑는 코드이다.
- payload: json 값으로 되어 있는 record['body']dict로 변환
- values: 거기서 post로 날린 body 값을 추출하여 values 안에 넣음
- data.append(values): for문 바깥에서 선언한 data에 values를 append., 데이터들을 하나의 리스트에 모으는 작업이다.
 
- if i % 1000 = 0: i가 1000의 배수일 때마다 아래 로직이 실행
- cursor.executemany(query, data): for문을 돌면서 만들어진 
  시퀀스 데이터가 query에 넣어져 쿼리문이 실행됨.
- connection.commit(), data = []: 쿼리 실행을 저장하고, data를 초기화 해줌
# 4
query = "INSERT INTO ips(ip, created_at, random_number) values(%s,%s,%s)"
cursor.executemany(query, data)
connection.commit()
connection.close()

- 만약 i가 1000의 배수가 아니라서 for문이 끝났을 때 
  나머지 data를 DB에 넣어주는 코드이다.

0개의 댓글