AWS를 이용해 MSA 아키텍처 구축(구성) #2

임상규·2023년 10월 30일
1

AWS

목록 보기
28/33
post-thumbnail

OpenAPI를 활용한 REST API 만들기

OpenAPI
"개방된 API", 누구나 사용될 수 있도록 API의 엔드포인트가 개방

OpenAPI / OpenAPI Specification (OAS)
RESTful API를 기 정의된 규칙에 API spec을 json이나 yaml로 표현하는 방식
RESTful API 디자인에 대항 정의 표준
예전에는 Swagger 2.0와 같은 이름으로 불렸다가, 3.0 버전부터 OpenAPI 3.0 Specification으로 지칭

OpenAPI: 이전에 Swagger Specification으로 알려진 Specification 자체
Swagger: OpenAPI를 Implement하기 위한 도구

FastAPI는 API를 정의하기 위해 OpenAPI 표준을 사용하여 모든 API로 "스키마"를 생성

from fastapi import FastAPI
app = FastAPI()

@app.get("/")
async def root():
	return {"message": "Hello World"}

경로 매개변수

  • Python 형식 문자열에서 사용하는 것과 동일한 구문으로 경로 "매개 변수" 또는 "변수"를 선언
@app.get("/items/{item_id}")
async def read_item(item_id):
  • 표준 Python 유형 주석을 사용하여 함수에서 경로 매개변수의 유형을 선언
@app.get("/items/{item_id}")
async def read_item(item_id: int):

쿼리 매개변수

  • 경로 매개변수의 일부가 아닌 다른 함수 매개변수를 선언하면 자동으로 "쿼리" 매개변수로 해석
http://127.0.0.1:8000/items/?skip=0&limit=10
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
	return fake_items_db[skip : skip + limit]

요청 본문

  • 클라이언트(예: 브라우저)에서 API로 데이터를 보내야 하는 경우 요청 본문으로 보냄
  • 요청 본문은 클라이언트 API로 보낸 데이터
  • 응답 본문은 API가 클라이언트에 보내는 데이터
  • Pydantic에서 BaseModel 가져오기 → 데이터 모델 만들기 → 매개변수로 선언
from pydantic import BaseModel

class Item(BaseModel):
	name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
@app.pst("/items/")
async def create_item(item: Item):
	return item

Models와 MySQL 연동

RDS 생성

기본설정은 위와 같이 구성하고 VPC 설정과 보안그룹 설정을 해주었다.

데이터베이스 생성

MySQl Workbench를 이용하여 엔드포인트를 통해 RDS에 연결하였다.

  1. 아래 쿼리를 실행하여 데이터베이스를 생성 후, 데이터베이스를 선택
SHOW DATABASES;
CREATE DATABASE my_msa_db;
USE my_msa_db;
  1. 아래 쿼리를 실행하여 tb_product 테이블 생성
CREATE TABLE tb _product ( product_ id INT PRIMARY KEY AUTO_ INCREMENT, product_img
VARCHAR(50), product_ name VARCHAR(50), product_ desc VARCHAR(50), price INT, delivery_fee
INT, uploaded DATETIME, seller INT);
  1. 모카루에 접속하여 더미데이터 생성
  2. 같은 방식으로 tb_user, tb_order 테이블을 생성하고 더미데이터 삽입

MySQL 데이터베이스 연결

종속성 설치

pip install sqlalchemy
pip install pymysql

Requirement.txt 업데이트

pydantic==1.10.4
sqlalchemy==1.4.46
pymysql==1.0.2

Docker 빌드

docker build -t order-app --platform linux/amd64 .

app 디렉토리에 database.py 파일 생성 후, 아래 코드 작성

from sqlalchemy import create_engine 
from sqlalchemy.ext.declarative import declarative base 
from salalchemy.orm import sessionmaker

DATABASE_URL = 'mysq|+pymysq|://<db-user>:<db-password>@<db-cluster-endpoint>:3306/my_msa_db'

engine = create engine(
	DATABASE URL
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative base()

➡️ 'engine = create_engine(DATABASE_URL)': 데이터베이스 연결 URL을 사용하여
   SQLAlchemy 엔진을 생성한다.
   이 엔진은 데이터베이스와의 연결을 나타내며 모든 데이터베이스 작업을 처리한다.
   SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine):
   sessionmaker 클래스를 사용하여 세션 생성기를 만든다.
   이 세션 생성기는 데이터베이스와의 세션을 생성할 때 사용된다.
   autocommit 및 autoflush 매개 변수는 세션 동작을 제어하는 데 사용되고, bind 매개 변수는
   이 세션을 어떤 엔진과 연결할 것인지 지정한다. 여기서는 위에서 생성한 engine을 사용한다.
   Base = declarative_base(): SQLAlchemy의 declarative_base 클래스를 사용하여
   데이터베이스 모델을 정의할 기본 클래스를 생성한다.
   이 클래스를 상속하여 데이터베이스 테이블을 정의할 수 있습니다.

주문 애플리케이션

  1. models 디렉토리 추가
  2. models 디렉토리에 tb_product 파일 생성하고 아래 코드 작성
from sqlalchemy import Column, TEXT, INT, DATETIME
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TbProduct(Base):
    __tablename__ = "tb_product"

    product_id = Column(INT, nullable=False, autoincrement=True, primary_key=True)
    product_img = Column(TEXT, nullable=True)
    product_name = Column(TEXT, nullable=True)
    product_desc = Column(TEXT, nullable=True)
    price = Column(INT, nullable=True)
    delivery_fee = Column(INT, nullable=True)
    uploaded = Column(DATETIME, nullable=True)
    seller = Column(INT, nullable=True)
  1. schemas 디렉토리 추가
  2. schemas 디렉토리에 product.py 파일 생성하고 아래 코드 작성
from typing import Optional
from pydantic import BaseModel
from datetime import datetime

class Product(BaseModel):
    product_id : int
    product_img : str
    product_name : str
    product_desc : Optional[str] = None
    price : int
    delivery_fee : int
    uploaded : datetime
    seller : int

    class Config:
        orm_mode = True
  1. crud 디렉토리 추가
  2. crud 디렉토리에 prouct_crud.py 파일 생성하고 아래 코드 작성
from sqlalchemy.orm import Session
from app.models.tb_product import TbProduct

def read_products(db: Session):
    return db.query(TbProduct).all()
  1. 같은 방식으로 tb_order.py, order.py, order_crud.py 생성하고 코드 작성
  • order_crud.py
import uuid
from datetime import datetime

from sqlalchemy.orm import Session
from app.models.tb_order import TbOrder
from app.schemas.order import OrderCreate

def create_order(order: OrderCreate, db):
    order.order_id = str(uuid.uuid1())

    created = datetime.now()
    order.created = created.strftime("%Y-%m-%d %H:%M:%s")

    db_order = TbOrder(**order.dict())
    db.add(db_order)
    db.commit()
    db.refresh(db_order)

    return db_order
  • tb_order.py
from sqlalchemy import Column, TEXT, INT, DATETIME
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TbOrder(Base):
    __tablename__ = "tb_order"

    order_id = Column(TEXT, nullable=False, primary_key=True)
    product_id = Column(INT, nullable=False)
    user_id = Column(INT, nullable=False)
    created = Column(DATETIME, nullable=True)%
  • order.py
import uuid
from datetime import datetime

from typing import Optional
from pydantic import BaseModel

class Order(BaseModel):
    order_id : Optional[str] = str(uuid.uuid1())
    product_id : int
    user_id : int
    created : Optional[str] = datetime.now().strftime("%Y-%m-%d %H:%M:%s")

    class Config:
        orm_mode = True

class OrderCreate(Order):
    pass

Amazon MQ

MQ란?: Amazon MQ

Amazon MQ 브로커 생성

브로커 엔진은 RabbitMQ로 생성하였다.

배포 모드는 단일 인스턴스 브로커로 설정하였고

위와 같이 구성하였다.

MQ 브로커에 애플리케이션 연결

AMQP 프로토콜을 사용해서 Message Queue와 통신할 계획이다.

AMQP: 클라이언트가 서버에 요청을 보낼 수 있고 서버가 클라이언트에 요청을 보낼 수 있는
    양방향 RPC 프로토콜

또, RabbitMQ를 연동하기위해 Pika 라이브러리를 사용할 것이다.

Pika

Python RPC 클라이언트
Python에서 AMQP 0-9-1 프로토콜을 사용하게 해주는 라이브러리
Pika는 각 비동기 연결 어댑터에서 IO루프를 구현하거나 확장

  • 특정 이벤트가 완료될 때 호출하려는 콜백 메서드를 전달하려는 Pika와 비동기식으로 인터페이스
  • pika.credentials 모듈은 클래스가 생성될 때, 사용자 이름과 암호를
    ConnectionParmeter 클래스에 전달하는 메커니즘 제공
  • 연결 정보를 연결 어댑터로 전달할 수 있는 두 가지 유형의 연결 매개변수 클래스
  • RabbitMQ 2.0부터 클라이언트 측 Channel.Flow가 제거됨

주문 애플리케이션

  1. 종속성 추가
pip install pika
  1. Requirement.txt 업데이트
pika==1.3.1
  1. Docker 빌드
docker build -t order-app --platform linux/amd64 .
  1. connection 관련 파일을 위한 conn 폴더를 생성
mkdir conn
  1. pika_client.py 파일 생성 후 아래 코드를 작성
import pika
import json

from app.schemas.order import Order, OrderCreate
from fastapi.encoders import jsonable_encoder

rabbitmq_user = '<user>'
rabbitmq_password = '<password>'
rabbitmq_broker_id = '<broker-id>'
region = 'ap-northeast-2'
MQ_URL = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"

params = pika.URLParameters(MQ_URL)

class PikaClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()


    def is_open(self):
        return self.channel.is_open

➡️ 'user', 'password', 'broker-id'를 입력한다.

  1. main.py에 아래코드 추가
from .conn.pika_client import PikaClient

...

mq_client = PikaClient()


...

@app.get("/is_open_channel")
def is_open_channel():
    return mq_client.is_open()

Rest API - 연결테스트

MQ를 활용한 이벤트 발행, 구독 테스트

아키텍처

주문 페이지에서 REST Call을 통해서 주문이 발생하게 되면 order-app(back-end)에서는
pika client를 통해서 Amazon MQ (Rabbit MQ)에 메시지를 동기적으로 전달.

그러면 관리자 페이지에서 주문 내역을 실시간으로 업데이트 하기 위해 WebSocket을 통해서
Front-end / Backe-end 간 양방향 통신을 지원.

주문 애플리케이션

  1. 주문 이벤트를 발행하기 위해 pika_client.py에 아래 코드 추가
def pub_order(self, orderCreare):
        queue = 'orders'

        self.channel.queue_declare(queue)

        messgage = json.dumps(jsonable_encoder(orderCreare))

        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=messgage
        )

        return messgage

➡️ 채널을 통해 연결할 Queue를 선언, Publisher 함수를 통해서 지정한 큐에 주문 내역을
   Json String으로 인코딩해서 이벤트 발행.

  1. 상품 주문 API 호출 시,
    1) Amazon RDS에 주문 내역을 입력하고
    2) Amazon MQ에 이벤트를 발행한다.
main.py

@app.post("/order")
def create_order(order: order.OrderCreate, db: Session = Depends(get_db)):
     order_crud.create_order(order, db)

     order.order_id = str(uuid.uuid1())
     order.created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    new_order = order_crud.get_order(order, db)
    return mq_client.pub_order(new_order)

관리자 애플리케이션

  1. 비동기 처리를 위해 종속성들을 설치
pip install asyncio
pip install aio-pika
pip install websockets
  1. Requirements.txt 업데이트
asyncio==3.4.3
aio-pika==8.3.0
websockets==10.4
  1. Docker 빌드
docker build -t admin-app
  1. 주문 이벤트를 구독하기 위해 main.py에 아래 코드 작성
@app.websocket("/admin/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    connection = await connect(MQ_URL)
    channel = await connection.channel()

    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue('orders')

    async def on_message(message: IncomingMessage):
        async with message.process():
            await websocket.send_text(f"{message.body}")

    await queue.consume(on_message)

    while True:
        await queue.consume(on_message)
        await asyncio.sleep(1)

➡️ Fast API는 다음과 같은 단순한 형태로 WebSocket을 지원한다.
   기존의 WebSocket 어노테이션을 사용하고 함수의 인자로 WebSocket을 선언하면
   웹소켓을 위한 설정이 끝난다.
   aio-pika 라이브러리로 주문 페이지와 동일하게 커넥션을 만들고 채널을 가져온다.
   Queue를 선언해주고 on-message 함수를 통해서 메세지가 컨슘될 때 콜백을 처리할 수 있다.
   기존에 사용하지 않았던 지시자들이 2개가 사용되는데 async, awiat 이다.
   async는 해당함수를 비동기로 처리하고 await는 해당 구문이 처리될 때 까지
   해당 함수의 실행을 기다린다.

이벤트 발행, 구독 테스트

  1. 먼저 주문페이지를 ECR에 업데이트하고 ECS에서 해당 도커 프로세스를 종료한다.
    ➡️ 서비스는 해당 작업에 레플리케이션을 유지하기위해서 자동으로 재시작하게 되고 latest 태그를 통해서 새로 반영된 이미지로 컨테이너를 새로 띄운다.
    1) ECR 업데이트는 리포지토리 생성 이 링크에서 생성했던 것과 동일하게 푸쉬해준다.
    2) ECS 클러스터로 들어가 실행중인 태스크를 중지하게 되면 자동으로 재시작 된다.
  2. docker-compose up 명령어를 통해 관리자 페이지를 실행한다.
  3. ALB 엔드포인트를 통해 접근한 주문 페이지에서 상품을 주문한다.
  4. 관리자 페이지 > 주문 내역에서 페이지를 새로 고침하지 않고 업데이트 되는것을 확인한다.
profile
Cloud Engineer / DevOps Engineer

0개의 댓글