[MLOps] MLFlow with Docker

leehs·2022년 4월 26일
1

mlops

목록 보기
1/2

개요

MLFlow

참고 내용: https://mlflow.org/docs/latest/concepts.html

  • MLFlow는 모델 학습을 반복하면서 사용하는 변화도와 같은 변수와 가중치 그리고 중간 학습률과 정답률 등을 기록하여 학습에 도움을 주는 도구
  • 네가지 요소로 구성됨
    • 추적:
      • 기계 학습 코드를 실행할 때와 나중에 결과를 시각화할 때 매개변수, 코드 버전, 메트릭 및 아티팩트를 로깅하기 위한 API 및 UI
      • 이를 사용하여 다른 사용자의 결과를 비교할 수도 있습니다.
        같은 모델이라도 다른 값을 사용하는 경우 변화를 기록하고 추적할 수 있도록 함
    • 프로젝트:
      • 재사용 가능한 데이터 과학 코드를 패키징하기 위한 표준 형식
      • 소스와 모델 그리고 추적을 프로젝트 별로 구분할 수 있도록 함
      • 프로젝트에서 MLflow Tracking API를 사용하면 MLflow는 프로젝트 버전(예: Git 커밋)과 모든 매개변수를 자동으로 저장
    • 모델:
      • 다양한 방식으로 기계 학습 모델을 패키징하기 위한 방법과 이를 배포하는 데 도움이 되는 다양한 도구를 제공
    • 모델 레지스트리:
      • 전체 수명 주기를 공동으로 관리하기 위해 중앙 집중식 모델 저장소, API 세트 및 UI를 제공
      • 모델과 생성 코드 그리고 매개변수 기록을 한번에 관리하고, 수많은 모델을 공동 작업하고 중앙 관리 할 수 있도록 함

MLFlow 적용 지점

  • 개발 단계의 모델 개발 및 학습 시, 모델 형상 관리
  • 학습 단계의 모델과 매개변수에 따른 정확도 추적
  • 운영 단계의 매개변수와 학습 결과 가중치 등의 아티팩트 형상 관리
  • 프로젝트의 여러 모델 비교, 분석, 관리 등의 기록

MLFlow 아쉬운 점

  • 각각의 학습마다 보여주는 그래프를 하나만 띄울 수 있음
  • 하이퍼 파라미터 자동 튜닝 기능이 없음 (실험 기능으로 제공하나 부족함)
  • 계정 관리가 없음, 권한 관리도 없음
  • 다중 서버 구현이 아직 안됨
  • Docker 공식 지원이 아직 안됨

설치

  • docker-compose 를 사용하여 설치
  • env 파일 사용하여 환경변수 전달

.env file

  • 적절하게 변경하여 사용
DB_NAME=mlflow
DB_USER=mlflow
DB_PW=mlflow123
DB_PORT=5432

AWS_BUCKET_NAME=mlflow
AWS_ACCESS_KEY_ID=mlflow
AWS_SECRET_ACCESS_KEY=mlflow123
AWS_REGION=localhost

MLFLOW_PORT=5000

tracker (server)

  • mlflow 에서 Dockerfile 을 공식 지원하지 않아 직접 작성

FROM continuumio/miniconda3:latest

RUN apt-get update && apt-get install -y git
RUN pip install mlflow psycopg2-binary pymysql boto3

WORKDIR /app
RUN cd /app && git clone https://github.com/mlflow/mlflow-example.git

COPY wait-for-it.sh wait-for-it.sh
RUN chmod +x wait-for-it.sh
wait-for-it.sh 내용 보기
#!/usr/bin/env bash
# Use this script to test if a given TCP host/port are available

WAITFORIT_cmdname=${0##*/}

echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }

usage()
{
    cat << USAGE >&2
Usage:
    $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
    -h HOST | --host=HOST       Host or IP under test
    -p PORT | --port=PORT       TCP port under test
                                Alternatively, you specify the host and port as host:port
    -s | --strict               Only execute subcommand if the test succeeds
    -q | --quiet                Don't output any status messages
    -t TIMEOUT | --timeout=TIMEOUT
                                Timeout in seconds, zero for no timeout
    -- COMMAND ARGS             Execute command with args after the test finishes
USAGE
    exit 1
}

wait_for()
{
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    else
        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
    fi
    WAITFORIT_start_ts=$(date +%s)
    while :
    do
        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
            WAITFORIT_result=$?
        else
            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
            WAITFORIT_result=$?
        fi
        if [[ $WAITFORIT_result -eq 0 ]]; then
            WAITFORIT_end_ts=$(date +%s)
            echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
            break
        fi
        sleep 1
    done
    return $WAITFORIT_result
}

wait_for_wrapper()
{
    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    else
        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    fi
    WAITFORIT_PID=$!
    trap "kill -INT -$WAITFORIT_PID" INT
    wait $WAITFORIT_PID
    WAITFORIT_RESULT=$?
    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    fi
    return $WAITFORIT_RESULT
}

# process arguments
while [[ $# -gt 0 ]]
do
    case "$1" in
        *:* )
        WAITFORIT_hostport=(${1//:/ })
        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
        shift 1
        ;;
        --child)
        WAITFORIT_CHILD=1
        shift 1
        ;;
        -q | --quiet)
        WAITFORIT_QUIET=1
        shift 1
        ;;
        -s | --strict)
        WAITFORIT_STRICT=1
        shift 1
        ;;
        -h)
        WAITFORIT_HOST="$2"
        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
        shift 2
        ;;
        --host=*)
        WAITFORIT_HOST="${1#*=}"
        shift 1
        ;;
        -p)
        WAITFORIT_PORT="$2"
        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
        shift 2
        ;;
        --port=*)
        WAITFORIT_PORT="${1#*=}"
        shift 1
        ;;
        -t)
        WAITFORIT_TIMEOUT="$2"
        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
        shift 2
        ;;
        --timeout=*)
        WAITFORIT_TIMEOUT="${1#*=}"
        shift 1
        ;;
        --)
        shift
        WAITFORIT_CLI=("$@")
        break
        ;;
        --help)
        usage
        ;;
        *)
        echoerr "Unknown argument: $1"
        usage
        ;;
    esac
done

if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
    echoerr "Error: you need to provide a host and port to test."
    usage
fi

WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}

# Check to see if timeout is from busybox?
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)

WAITFORIT_BUSYTIMEFLAG=""
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
    WAITFORIT_ISBUSY=1
    # Check if busybox timeout uses -t flag
    # (recent Alpine versions don't support -t anymore)
    if timeout &>/dev/stdout | grep -q -e '-t '; then
        WAITFORIT_BUSYTIMEFLAG="-t"
    fi
else
    WAITFORIT_ISBUSY=0
fi

if [[ $WAITFORIT_CHILD -gt 0 ]]; then
    wait_for
    WAITFORIT_RESULT=$?
    exit $WAITFORIT_RESULT
else
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        wait_for_wrapper
        WAITFORIT_RESULT=$?
    else
        wait_for
        WAITFORIT_RESULT=$?
    fi
fi

if [[ $WAITFORIT_CLI != "" ]]; then
    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
        exit $WAITFORIT_RESULT
    fi
    exec "${WAITFORIT_CLI[@]}"
else
    exit $WAITFORIT_RESULT
fi

 

s3 (minio)

  • 라이센스가 2021년 5월부터 AGPLv3 로 변경됨에 따라 주의하여 사용

docker-compose

version: "2.1"
services:
  s3:
    container_name: s3
    image:  minio/minio:RELEASE.2022-03-08T22-28-51Z
    restart: unless-stopped
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=${AWS_ACCESS_KEY_ID}
      - MINIO_ROOT_PASSWORD=${AWS_SECRET_ACCESS_KEY}
    entrypoint: sh
    command: -c 'mkdir -p /data/${AWS_BUCKET_NAME} && minio server /data --console-address ":9001"'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 15s
      timeout: 20s
      retries: 3
    volumes:
      - minio:/data

  db:
    image: postgres:14.2
    restart: unless-stopped
    container_name: mlflow-db
    expose:
      - ${DB_PORT}
    environment:
      - POSTGRES_DB=${DB_NAME}
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PW}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready", "-U", "${DB_USER}"]
      interval: 15s
      timeout: 10s
      retries: 5
    volumes:
      - db:/var/lib/postgresql/data

  tracker:
    container_name: tracker
    image: mlflow-tracker
    restart: unless-stopped
    build:
      context: ./
      dockerfile: Dockerfile
    depends_on:
      - "s3"
      - "db"
    ports:
      - ${MLFLOW_PORT}:${MLFLOW_PORT}
    environment:
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_DEFAULT_REGION=${AWS_REGION}
      - MLFLOW_S3_ENDPOINT_URL=http://s3:9000
      - BACKEND=postgresql://${DB_USER}:${DB_PW}@db:${DB_PORT}/${DB_NAME}
    networks:
      - public
      - internal
      - traefik
    entrypoint: >
      /bin/sh -c "
        /app/wait-for-it.sh db:5432 --timeout=60 -- \
        mlflow server --port ${MLFLOW_PORT} --host 0.0.0.0 \
          --backend-store-uri $${BACKEND} \
          --default-artifact-root s3://${AWS_BUCKET_NAME}/ 
      "
    healthcheck:
      test: ["CMD", "wget", "-O/dev/null", "-q", "http://localhost:${MLFLOW_PORT}/"]
      interval: 15s
      timeout: 15s
      retries: 3

volumes:
  db:
  minio:

build

  • Dockerfile 로 docker 이미지를 생성
docker build -t mlflow-tracker .
  • docker-compose 로 서비스를 실행
docker-compose up -d
  • 웹 브라우저에서 서버 접속

사용법

python

  • 아래와 같이 import
import mlflow
import mlflow.sklearn
import boto3
...
  • 가중치를 저장하는데 S3(S3 Compatibility)를 사용

  • 아래의 환경변수 설정

    • MLFLOW_S3_ENDPOINT_URL: MLFLOW에서 가중치를 저장하는 S3 서버의 주소
    • AWS_ACCESS_KEY_ID: S3의 ACCESS KEY 값
    • AWS_SECRET_ACCESS_KEY: S3의 ACCESS SECRET KEY 값
  • 코드에서 아래 설정을 적용

  • 작성자에 따라 해당 값을 별도 Config 로 관리

# Tracking 서버 주소
remote_server_uri = "http://127.0.0.1:5000"
mlflow.set_tracking_uri(remote_server_uri)
# 프로젝트 이름
experiment_name = "mlflow_test"
mlflow.set_experiment(experiment_name)
  • 만들어진 experiment 정보는 아래와 같이 확인 가능

experiment = mlflow.get_experiment_by_name(experiment_name)
print("Experiment_id: {}".format(experiment.experiment_id))
print("Artifact Location: {}".format(experiment.artifact_location))
print("Tags: {}".format(experiment.tags))
print("Lifecycle_stage: {}".format(experiment.lifecycle_stage))

logging

  • mlflow.start_run()

    • 서버와의 연결 세션을 생성하고 해당 실험을 진행함을 직접 선언하는 코드
    • 해당 코드를 사용하지 않더라도, 이후에 log_param 등을 사용하면 자동으로 시작
    # ... 설정 ...
    mlflow_ctx = mlflow.start_run()
    # ...학습 진행...
  • mlflow.log_param()

    • 파라미터 값을 기록하는 용도
    • lr, gamma 등의 값을 기록하는데 사용
    mlflow.log_param("lr",lr)
    mlflow.log_param("gamma",gamma)  
  • mlflow.log_metric()

    • 지표 값을 기록하는 용도
    • loss 값이나 quality 값 등을 기록하는데 사용
    mlflow.log_metric(key="epoch_train_avg_loss", value=epoch_train_avg_loss, step=i)
    mlflow.log_metric(key="epoch_val_avg_loss", value=epoch_val_avg_loss, step=i)
    mlflow.log_metric(key="best_val_loss", value=best_val_loss, step=i)
  • mlflow.pytorch.log_model()

    • 학습된 모델을 저장하는 용도
    • S3 에 업로드를 하기 때문에 다른 작업에 비해 오래 걸림
    for i in range(num_epoch):
      ...
      if i%50==50-1:
      mlflow.pytorch.log_model(model, "model-"+str(i))
      ...
  • mlflow.pytorch.log_state_dict()

    • 모델 중 state_dict 만 저장하는 용도
    • 아래 예제는 pytorch 를 사용하는 경우에 해당
    for i in range(num_epoch):
      ...
      if epoch_val_avg_loss < best_val_loss:
        mlflow.pytorch.log_state_dict(model.state_dict(), artifact_path="best-state-dict-model")

참고

0개의 댓글