220902

HyeonKi Jo·2022년 9월 2일
0

오늘의 할일

  • Pipeline 구축하기
  • 계정 합치기

먼저 인프라 구축하기

ECR 생성

  • Lambda container Image를 업로드할 ECR 프라이빗 리포지토리 2개 생성

이미지 업로드


  • ImageProcess, TextMining 두 이미지 모두 업로드했다.

S3 생성


객체 잠금

  • 이번에는 객체 잠금을 활성화 해본다.
  • GCP Vision API를 잠금하고 읽기만 가능하도록 해본다.
  • 객체 잠금을 활성화하면 추가 구성을 설정해야한다.
  • 새 객체를 보호하지 않고, Credential만 보호할 예정이라 기본 보존옵션을 비활성화한다.
  • 이미지가 업로드 될 Images폴더와 GCP Vision API Credential.json을 업로드해줬다.
  • GCP Credential을 객체 잠금해준다.
  • 보존 모드를 거버넌스모드로 설정해서 특정한 상황에서는 지우거나 변경할 수 있도록 설정해준다.
  • 규정 준수 모드는 보존기간동안 아무도 삭제할 수 없게된다. 오직 계정을 닫는 방법 뿐인것 같다.

RDS 생성

  • MYSQL 8.0.23버전을 사용한다.
    • 8.0.23버전까지 AuroraDB로 Migration이 가능하다.
  • RDS를 프리티어로 생성해준다.
  • 퍼블릭 액세스를 해제해준다.
  • 가용 영역을 ap-northeast-2c로 설정한다.
  • 이후, TextProcess Lambda function에서 VPC를 연결해줄 때, 이 RDS 가용영역과 맞춰줘야한다.
  • 위 연결되어있는 보안그룹은 3306포트가 열려있는 보안그룹이다.
  • 초기 데이터베이스를 생성해준다.

Lambda함수 생성


  • 위와같이 두 Lambdafunction 을 생성해주었다.

ImageProcess function 설정

트리거

  • S3에 모든 object create events에 트리거를 연결해준다.
  • Images/ 로 Prefix를 줘서, Images폴더에 업로드되는 이미지만 처리하도록 한다.
  • 또, Suffix에 .jpg를 줘서 jpg파일만 처리하도록 한다.
  • 설정완료

대상

  • 비동기식 호출로 대상을 호출한다.
    • 스트림 호출은 함수에 매핑된 Kinesis 또는 DynamoDB스트림을 호출할 수 있다.
    • 또, 실패시에만 호출되는 것 같다.
  • 우리는 Lambda함수가 성공했을때 호출해야한다.
  • 대상을 Lambda함수로, TextProcess를 호출해준다.
  • 최종 모습

구성

  • 기본 설정에서 제한시간을 1분으로 준다.

TextProcess

일반구성

  • 제한시간을 1분으로 늘려준다.

VPC 연결

  • 아까 RDS에 연결했던 가용영역에 맞춰서 Public, Private 서브넷을 연결해줬다.
  • Private서브넷만 연결해줘도 되지만, 가용성의 이유로 2개의 서브넷을 선택해줘야되기 때문에 둘다 연결해 준 것이다.

Code Pipeline 구축

CodeCommit

CodeCommit 생성

  • CodeCommit Repository 생성
  • 각각의 HTTPS URL을 복제해서 push 해준다.
  • git clone <HTTPS URL>
  • git add .
  • git commit -m "<Message>"
  • git push
  • (만약 branch 문제로 안된다면 git push origin <branch>)
  • ImageProcess_repo 업로드
  • TextProcess_repo 업로드

CodeCommit에서 변경사항 체크리스트

ImageProcess

  • app.py에서 GCP Vision API Credential 이름 및 위치
  • buildspec.yaml에서 ECR 푸시명령어와 Lambda 이미지 배포 명령어
    • 주로 ECR 이름, Lambdafunction 이름을 확인하면 된다.

TextProcess

  • buildspec.yaml에서의 ECR 및 Lambdafunc 이름과 명령어 확인
  • Dockerfile에서의 환경변수 확인
    • 특히 RDS 엔드포인트를 확인해줘야 한다.

CodeBuild

  • 빌드 프로젝트를 생성해준다.

Image Process



TextProcess




  • 빌드 성공

Codebuild Test

  • 이미지를 업로드 해봤다.
  • ImageProcess 작업완료
  • TextProcess Error발생
  • 그러나 SQL오류로 보이고, 이전 TextProcessing은 잘되었다.

에러 확인

  • rds_config.py에서 Nutrition에 오타가 있었다.
  • rds_connect.py에서도 에러출력 코드를 더해 다시 push, build한다.

rds_config.py

#config file containing credentials for RDS MySQL instance
# for test
import os
rds_endpoint = os.environ['RDS_ENDPOINT']
db_username = os.environ['USERNAME']
db_password = os.environ['PASSWORD']
db_name = os.environ['DB_NAME']
table_name = os.environ['TABLE_NAME']

rds_keys={
    'Item_id': ['', 'varchar(200)'],
    'Item_URL': ['', 'varchar(200)'],
    'Item_key': ['',  'varchar(200)'],
    '계란': ['0', 'boolean default 0'],
    '우유': ['0', 'boolean default 0'],
    '땅콩': ['0', 'boolean default 0'],
    '견과류': ['0', 'boolean default 0'],
    '밀': ['0', 'boolean default 0'],
    '갑각류': ['0', 'boolean default 0'],
    '대두': ['0', 'boolean default 0'],
    '메밀': ['0', 'boolean default 0'],
    '육류': ['0', 'boolean default 0'],
    '생선': ['0', 'boolean default 0'],
    '과일': ['0', 'boolean default 0'],
    'Nutrition': ['', 'varchar(1000)'],
    'Ingredient' : ['', 'varchar(2000)']
}

rds_connect.py

def dict_to_query(dic):
    keys, vals = [], []

    for key, val in dic.items():
        if(val):
            keys.append(key)
            vals.append('\"'+str(val)+('\"')) 

    return ((', ').join(keys), (', ').join(vals))


def Insert_RDS(text_data, ITEM_KEY, bucket):
    import json
    import sys
    import logging
    import rds_config
    import pymysql
    import os
    #rds settings
    rds_endpoint  = rds_config.rds_endpoint
    name = rds_config.db_username
    password = rds_config.db_password
    db_name = rds_config.db_name
    table_name = rds_config.table_name

    # dict_keys(['Item_id', 'Item_URL', 'Item_key', '계란', '우유', '땅콩', '견과류', '밀', '갑각류', '대두', '메밀', '육류', '생선', '과일', 'Nutirition', 'Ingredient']
    item_dict = {k:v[0] for k, v in rds_config.rds_keys.items()}

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Connect to RDS
    try:
        conn = pymysql.connect(host=rds_endpoint, user=name, passwd=password, db=db_name, connect_timeout=5)
        print('connected')
    except pymysql.MySQLError as e:
        logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
        logger.error(e)
        sys.exit(1)

    logger.info("SUCCESS: Connection to RDS MySQL instance succeeded")


    #Init Table
    try:
        ### CREATE TABLE
        sql_create_option = []
        for k, v in rds_config.rds_keys.items():
            sql_create_option.append(k+' '+v[1])
        sql_create_option = (', ').join(sql_create_option)
        print('init table :',sql_create_option )
        with conn.cursor() as cur:
            cur.execute("create table if not exists "+table_name+" ( "+sql_create_option+" )")
            conn.commit()
        print('init table successed')
    except pymysql.MySQLError as e:
        logger.error("ERROR: Init Table Error")
        logger.error(e)
        sys.exit(2)


    #Set Item_ID
    item_dict['Item_id'] = ITEM_KEY.split('/')[-1]
    print(item_dict['Item_id'])

    '''
    #Init ROW
    data_keys, data_vals = dict_to_query(item_dict)
    try:
        with conn.cursor() as cur:
            cur.execute('insert into '+table_name+' ('+data_keys+') values('+data_vals+') WHERE NOT EXIST (SELECT Item_id FROM '+table_name+' WHERE Item_id = '+item_dict['Item_id']+';')
            conn.commit()
        print("Added items from RDS MySQL table")

    except:
        logger.error("ERROR: Init Row Error")
        sys.exit(3)
    '''

    # Item INSERT
    ## fit text_data to item_dict
    for k, v in text_data.allergy_dict.items():
        item_dict[k] = v

    item_dict['Nutrition'] = ('|').join(text_data.nutrition_list)
    item_dict['Ingredient'] = ('|').join(text_data.ingredient_list)
    print(item_dict)

    data_keys, data_vals = dict_to_query(item_dict)
    print('insert into '+table_name+' ('+data_keys+') values('+data_vals+')')

    try:
        with conn.cursor() as cur:
            cur.execute('insert into '+table_name+' ('+data_keys+') values('+data_vals+')')
            conn.commit()
        print("Added items from RDS MySQL table")

    except pymysql.err.InternalError as e:
    	code, msg = e.args
    	logger.error("ERROR: Insert Fail. Code:",code," message:", msg)
    	sys.exit(2)

  • Item이 잘 올라갔다.
  • Mysql에서도 확인할 수 있다.

Code Pipeline

Codepipeline 생성

ImageProcess





  • 파이프라인 생성 성공

TextProcess





  • 파이프라인 생성 성공

Pipeline 테스트

ImageProcess

  • git push
  • CodeCommit에 새로 push하니 Pipeline도 빌드중이다.

TextProcess

  • 벌서 ImageProcess가 완료됬고, TextProcess가 작업중이다.
  • 실행 완료

이미지 업로드

  • ImageProcess에 추가로 입력한 출력을 볼 수 있다.
  • TextProcess또한 추가로 입력한 출력이 나왔고, RDS에 잘 들어갔다.

계정 합칠때를 대비해 권한 정리하기

사용자 MLops

AWS 관리형 정책

  • AmazonEC2ContainerRegistryPowerUser
  • IAMFullAccess
  • SecretsManagerReadWrite
  • AWSCodeBuildAdminAccess
  • AWSCodeCommitFullAccess
  • AWSCloud9Administrator
  • AmazonS3FullAccess
  • AWSCodePipeline_FullAccess
  • AWSLambda_FullAccess

추가한 정책 (ECR_CREATE_and_REPLICA)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ecr:CreateRepository",
                "ecr:ReplicateImage",
                "ecr:SetRepositoryPolicy"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogStreams"
            ],
            "Resource": "*"
        }
    ]
}

my-s3-role-for-lambda

AWS 관리형 정책

추가 정책

my-s3-policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:CreateLogStream"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::s3.cocudeny/*"
        }
    ]
}

lambda-vpc-policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeNetworkInterfaces",
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeInstances",
                "ec2:AttachNetworkInterface"
            ],
            "Resource": "*"
        }
    ]
}

codebuild-Image_Processing-service-role

AWS관리형 정책

  • AmazonEC2ContainerRegistryPowerUser
  • AWSLambdaBasicExecutionRole

추가 정책

S3-to-RDS-CICD-role

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "ecr:SetRepositoryPolicy",
                "ecr:GetRepositoryPolicy"
            ],
            "Resource": "arn:aws:ecr:<region>:<Account>:repository/<ECR Repo name>/"
        },
        {
            "Sid": "ECRPullPolicy",
            "Effect": "Allow",
            "Action": [
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage"
            ],
            "Resource": [
                "arn:aws:ecr:<region>:<Account>:repository/<ECR Repo name>/"
            ]
        },
        {
            "Sid": "ECRAuthPolicy",
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken"
            ],
            "Resource": [
                "arn:aws:ecr:<region>:<Account>:repository/<ECR Repo name>/"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:UpdateFunctionCode"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
profile
Talking Potato

0개의 댓글