Image sources: links or AIFFEL course materials.
Left ➡️ right: titles predicted to match your interests are placed furthest to the left.
Top ➡️ bottom: the series you are most likely to be interested in is placed at the top.
Images: even the thumbnail images themselves are recommended!
Reference: how to use the Kaggle API
In the API section, click "Create New Token" to issue a token.
Set up in GCP: search for "Marketplace" > search for "colab".
Enable the required APIs: click "Enable".
us-west4-a
n1-standard-8 (8 vCPUs, 4 cores, 30 GB memory)
NVIDIA T4
How to resolve the warning "You have exceeded 1 GPU in your global GPU quota" ➡️ a paid account must be activated!
- Click the "Quota page"
- The issue occurs because the current quota is set to 0, so raise it to 1
- However, the quota cannot be adjusted without a paid account, so switch to a paid account
- After activation, the quota can be adjusted
Connect the runtime to a custom GCE VM
!pip install kaggle
!kaggle datasets download -d netflix-inc/netflix-prize-data
!unzip "netflix-prize-data.zip" -d netflix-prize-data
The loss is set to MSE.
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

data_dir = "netflix-prize-data"
data = load_netflix_data(data_dir)
dataset = NetflixDataset(data)
data_loader = DataLoader(dataset, batch_size=512, shuffle=True)
num_movies = data["Movie_Id"].max()
model = RecommendationNetwork(num_movies=num_movies, emb_size=100).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
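`load_netflix_data`, `NetflixDataset`, and `RecommendationNetwork` are defined earlier in the course notebook and not reproduced here. A minimal sketch consistent with how they are used below (per-user sliding windows of movie IDs, with `ratings[:, -1]` as the regression target) — the internals are assumptions, not the course's actual code:

```python
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset

class NetflixDataset(Dataset):
    """Hypothetical: per-user sliding windows of (movie ids, ratings).
    The training loop uses ratings[:, -1] as the target for model(movies)."""

    def __init__(self, data: pd.DataFrame, seq_len: int = 10):
        self.samples = []
        for _, group in data.groupby("Cust_Id"):
            movies = group["Movie_Id"].tolist()
            ratings = group["Rating"].tolist()
            for i in range(len(movies) - seq_len + 1):
                self.samples.append((
                    torch.tensor(movies[i:i + seq_len], dtype=torch.long),
                    torch.tensor(ratings[i:i + seq_len], dtype=torch.float32),
                ))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


class RecommendationNetwork(nn.Module):
    """Hypothetical: embed the movie-id window, average the embeddings,
    and regress a rating with a small MLP."""

    def __init__(self, num_movies: int, emb_size: int = 100):
        super().__init__()
        self.embedding = nn.Embedding(num_movies + 1, emb_size)  # +1: ids are 1-based
        self.head = nn.Sequential(nn.Linear(emb_size, 64), nn.ReLU(), nn.Linear(64, 1))

    def forward(self, movies: torch.Tensor) -> torch.Tensor:
        emb = self.embedding(movies).mean(dim=1)  # (batch, emb_size)
        return self.head(emb).squeeze(-1)         # (batch,) predicted rating
```

`load_netflix_data` would then be whatever parser turns the Netflix Prize text files into a DataFrame with `Cust_Id`, `Movie_Id`, and `Rating` columns.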
def calculate_accuracy(y_pred: torch.Tensor, y_true: torch.Tensor) -> float:
    # Round predictions to the nearest integer rating, then compute accuracy
    # (assumes integer ground-truth ratings).
    y_pred_rounded = torch.round(y_pred)
    correct_predictions = (y_pred_rounded == y_true).float().sum()
    accuracy = correct_predictions / y_true.shape[0]
    return accuracy.item()
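A quick sanity check of the rounding-based accuracy (the function is repeated here so the snippet runs standalone):

```python
import torch

def calculate_accuracy(y_pred: torch.Tensor, y_true: torch.Tensor) -> float:
    # Same logic as above: round to the nearest integer rating, average the matches.
    y_pred_rounded = torch.round(y_pred)
    correct_predictions = (y_pred_rounded == y_true).float().sum()
    return (correct_predictions / y_true.shape[0]).item()

y_pred = torch.tensor([3.4, 2.4])  # rounds to [3., 2.]
y_true = torch.tensor([3.0, 3.0])
print(calculate_accuracy(y_pred, y_true))  # → 0.5
```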
loss_values: list[float] = []
accuracy_values: list[float] = []
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    total_accuracy = 0.0
    count_batches = 0
    for movies, ratings in tqdm(data_loader):
        movies = movies.to(device)
        ratings = ratings.to(device)
        target = ratings[:, -1]
        optimizer.zero_grad()
        outputs = model(movies)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        accuracy = calculate_accuracy(outputs, target)
        total_loss += loss.item()
        total_accuracy += accuracy
        count_batches += 1
    avg_loss = total_loss / count_batches
    avg_accuracy = total_accuracy / count_batches
    loss_values.append(avg_loss)
    accuracy_values.append(avg_accuracy)
    print(f"Epoch {epoch+1}: Loss = {avg_loss}, Accuracy = {avg_accuracy}")
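The per-epoch `loss_values` and `accuracy_values` collected above can then be plotted. A minimal sketch (the function name and file name are arbitrary, not from the course code):

```python
import matplotlib
matplotlib.use("Agg")  # render off-screen, e.g. on a headless VM
import matplotlib.pyplot as plt

def plot_training_curves(loss_values, accuracy_values, path="training_curves.png"):
    # Side-by-side loss and accuracy curves, one point per epoch.
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
    ax1.plot(loss_values)
    ax1.set(title="MSE loss per epoch", xlabel="epoch")
    ax2.plot(accuracy_values)
    ax2.set(title="Rounded-rating accuracy per epoch", xlabel="epoch")
    fig.savefig(path)
    plt.close(fig)
```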
Set an additional GPU quota (recommended)
Raising "GPUs" and "NVIDIA T4 GPUs" to about 3 is recommended
- For now, proceed with 1
Netflix review data
Data Loader
NetflixRecommendation
model.mar
Vertex AI
Users, Movie, Rating, Date
%%writefile model.py
%%writefile handler.py
torch.save(model.state_dict(), "model.pth")
class Context:
    # Minimal stand-in for the TorchServe context passed to handler.initialize()
    def __init__(self):
        self.system_properties = {
            "model_dir": "."
        }

context = Context()
!pip install torchserve torch-model-archiver
from handler import NetflixRecommendation
netflix_recommendation = NetflixRecommendation()
netflix_recommendation.initialize(Context())
sample_input = {"movies": [123, 456, 789, 100, 101, 102, 103, 104, 105, 106]}
preprocessed = netflix_recommendation.preprocess([[sample_input]])
predicted_rating = netflix_recommendation.inference(preprocessed)
result = netflix_recommendation.postprocess(predicted_rating)
print(f"Predicted Rating: {result}")
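The contents of `handler.py` are not reproduced in these notes. A hypothetical sketch that matches the `initialize`/`preprocess`/`inference`/`postprocess` calls above — the internals are assumptions, and in particular loading `model.pth` is only indicated, not implemented:

```python
# handler.py -- hypothetical sketch of the custom TorchServe handler
import torch

class NetflixRecommendation:
    """Handler whose method names match the TorchServe handler contract."""

    def initialize(self, context):
        # The real handler would load model.pth from
        # context.system_properties["model_dir"] into RecommendationNetwork;
        # omitted here so the sketch stays self-contained.
        self.model_dir = context.system_properties["model_dir"]
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None

    def preprocess(self, data):
        # Unwrap the nested request payload into a (1, seq_len) tensor of movie ids,
        # matching the preprocess([[sample_input]]) call in the notebook.
        request = data[0][0]
        return torch.tensor([request["movies"]], dtype=torch.long)

    def inference(self, inputs):
        assert self.model is not None, "load the trained model in initialize()"
        with torch.no_grad():
            return self.model(inputs.to(self.device))

    def postprocess(self, outputs):
        # Plain floats so the response is JSON-serializable.
        return [float(o) for o in outputs]
```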
!torch-model-archiver \
--model-name model \
--version 1.0 \
--model-file model.py \
--serialized-file model.pth \
--handler handler.py \
--export-path . \
--force
%%writefile config.properties
inference_address=http://0.0.0.0:9080
management_address=http://0.0.0.0:9081
metrics_address=http://0.0.0.0:9082
disable_system_metrics=true
number_of_netty_threads=32
job_queue_size=1000
model_store=/home/model-server/model-store
from google.cloud import storage, aiplatform
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file("credentials.json")
storage_client = storage.Client(credentials=credentials)
bucket = storage_client.bucket(BUCKET_NAME)
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    credentials=credentials,
)
model_path = f"gs://{BUCKET_NAME}/models"
registry_model = aiplatform.Model.upload(
    display_name="Netflix Recommender",
    artifact_uri=model_path,
    serving_container_image_uri="{}/trainer:1.0.0",
    is_default_version=True,
    version_aliases=["v1"],
    version_description="A netflix rating classification model",
    serving_container_predict_route="/predictions/model",
    serving_container_health_route="/ping",
)
DEPLOY_COMPUTE = "n1-standard-2"
DEPLOY_ACCELERATOR = "NVIDIA_TESLA_T4"
endpoint = aiplatform.Endpoint.create(
    display_name="netflix-recommendation",
    project=PROJECT_ID,
    location=LOCATION,
)
deployment = registry_model.deploy(
    endpoint=endpoint,
    machine_type=DEPLOY_COMPUTE,
    min_replica_count=1,
    max_replica_count=1,
    accelerator_type=DEPLOY_ACCELERATOR,
    accelerator_count=1,
    traffic_percentage=100,
    sync=True,
)
instances = [{"movies": [100, 101, 102, 103, 104, 105, 106, 107, 108, 400]}]
endpoint.predict(instances=instances)  # instances is already a list; don't wrap it again
This can also be checked in the Logs Explorer.
endpoint.undeploy_all()
endpoint.delete()
registry_model.delete()
No continuous training ❌
Data preprocessing runs in Colab
No configuration (version) management
When using Colab, the dataset the model uses is volatile
Note: as before, the runtime must be connected to the custom GCE VM.
credentials = service_account.Credentials.from_service_account_file("credentials.json")
!gcloud auth login --cred-file=credentials.json
import os
from google.colab import userdata
os.environ["KAGGLE_USERNAME"] = userdata.get("KAGGLE_USERNAME")
os.environ["KAGGLE_KEY"] = userdata.get("KAGGLE_KEY")
!pip install kaggle
!kaggle datasets download -d netflix-inc/netflix-prize-data
!unzip "netflix-prize-data.zip" -d netflix-prize-data
Gold level (medallion architecture)
Use the upload_blob function (same as before)
csv_file_name = "netflix_prize_data.csv"
data.to_csv(csv_file_name, index=False)
upload_blob(csv_file_name, csv_file_name)
load_to_bigquery(f"gs://{BUCKET_NAME}/{csv_file_name}")
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root"
BUCKET_PATH = f"gs://{BUCKET_NAME}"
MODEL_DIR = f"{BUCKET_PATH}/model"
DATA_DIR = f"{BUCKET_PATH}/data"
DEPLOY_COMPUTE = "n1-standard-2"
DEPLOY_ACCELERATOR = "NVIDIA_TESLA_T4"
@component(base_image="python:3.11", packages_to_install=["google-cloud-aiplatform"])
def deploy_model_op(project_id: str, model_path: str):
    from google.cloud import aiplatform

    DEPLOY_COMPUTE = "n1-standard-2"
    DEPLOY_ACCELERATOR = "NVIDIA_TESLA_T4"

    model = aiplatform.Model.upload(
        project=project_id,
        display_name="netflix-recommender",
        artifact_uri=model_path,
        serving_container_image_uri="asia-northeast3-docker.pkg.dev/gde-project-aicloud/mlops-quicklab/trainer:1.0.3",
    )
    endpoint = model.deploy(
        machine_type=DEPLOY_COMPUTE,
        min_replica_count=1,
        max_replica_count=1,
        accelerator_type=DEPLOY_ACCELERATOR,
        accelerator_count=1,
        traffic_percentage=100,
    )
@pipeline(name="netflix-recommender-pipeline", pipeline_root=PIPELINE_ROOT)
def netflix_recommender_pipeline(
    project_id: str,
    dataset_id: str,
    table_id: str,
    gcs_bucket_name: str,
    gcs_bucket_path: str,
):
    export_op = export_data_to_gcs(
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        gcs_bucket_path=gcs_bucket_path,
    )
    train_op = (
        train_model(
            project_id=project_id,
            gcs_bucket_name=gcs_bucket_name,
            gcs_bucket_path=gcs_bucket_path,
            data_path=export_op.output,
        )
        .set_memory_limit("32Gi")
    )
    deploy_model_op(project_id=project_id, model_path=train_op.output)
job.submit
The "Service Account User" role must be registered!
Once every node has run, the pipeline finishes.
Node 2 in progress
The final node failed
full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
try:
    client.get_table(full_table_id)
    print("Table {} already exists.".format(full_table_id))
except Exception:  # NotFound: the table does not exist yet, so create it
    schema = [
        bigquery.SchemaField(name="movie_id", field_type="STRING"),
        bigquery.SchemaField(name="watch_time", field_type="INTEGER"),
        bigquery.SchemaField(name="date", field_type="INTEGER"),
    ]
    table = bigquery.Table(full_table_id, schema=schema)
    client.create_table(table)
    print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
def insert_new_line(watch_time: int, current_datetime: datetime.datetime) -> None:
    rows_to_insert = [
        {
            "movie_id": str(uuid.uuid4()),
            "watch_time": int(watch_time),
            "date": int(current_datetime.timestamp()),
        },
    ]
    errors = client.insert_rows_json(full_table_id, rows_to_insert)
    if errors != []:
        print("Encountered errors while inserting rows: {}".format(errors))
total_iterations = 10000
current_datetime = datetime.datetime.utcnow()
for i in tqdm(range(total_iterations)):
    if i >= total_iterations // 2:
        mean, std_dev = 4800, 1200
        watch_time = int(np.random.normal(mean, std_dev))
        watch_time = np.clip(watch_time, 2400, 7200)
    else:
        mean, std_dev = 2500, 1250
        watch_time = int(np.random.normal(mean, std_dev))
        watch_time = np.clip(watch_time, 0, 5000)
    insert_new_line(watch_time, current_datetime=current_datetime)
    added_seconds = np.random.uniform(1.0, 120.0)
    current_datetime += datetime.timedelta(seconds=added_seconds)
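The loop above deliberately writes two regimes: the second half shifts watch_time upward, which is exactly the drift that Facets surfaces below. A quick standalone check of the simulated distributions (seed and sample sizes are arbitrary choices for illustration):

```python
import numpy as np

rng = np.random.default_rng(42)

# First half of the simulation: mean 2500, clipped to [0, 5000]
early = np.clip(rng.normal(2500, 1250, size=5000), 0, 5000)
# Second half: mean 4800, clipped to [2400, 7200]
late = np.clip(rng.normal(4800, 1200, size=5000), 2400, 7200)

print(round(early.mean()), round(late.mean()))  # a clear upward mean shift
```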
import base64
from facets_overview.generic_feature_statistics_generator import GenericFeatureStatisticsGenerator
from IPython.display import display, HTML
gfsg = GenericFeatureStatisticsGenerator()
proto = gfsg.ProtoFromDataFrames([
    {"name": "current", "table": df_current},
    {"name": "base", "table": df_base},
])
protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
HTML_TEMPLATE = """
<script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script>
<link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html" >
<facets-overview id="elem"></facets-overview>
<script>
document.querySelector("#elem").protoInput = "{protostr}";
</script>"""
html = HTML_TEMPLATE.format(protostr=protostr)
display(HTML(html))
Reuse the mlflow part from the earlier section.
version: "3.8"
services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.2.1
    ports:
      - "5000:5000"
    environment:
      - TZ=UTC
    command: ["mlflow", "ui", "--host", "0.0.0.0"]
  trainer-image:
    image: trainer:latest
    build:
      context: .
      dockerfile: Dockerfile
    command: echo "Building trainer image"
  trainer1:
    image: trainer:latest
    depends_on:
      - mlflow
      - trainer-image
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - BATCH_SIZE=64
      - LEARNING_RATE=0.01
      - NN_DIM_HIDDEN=128
  trainer2:
    image: trainer:latest
    depends_on:
      - mlflow
      - trainer-image
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - BATCH_SIZE=32
      - LEARNING_RATE=0.02
      - NN_DIM_HIDDEN=512
  trainer3:
    image: trainer:latest
    depends_on:
      - mlflow
      - trainer-image
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - BATCH_SIZE=128
      - LEARNING_RATE=0.005
      - NN_DIM_HIDDEN=64
  trainer4:
    image: trainer:latest
    depends_on:
      - mlflow
      - trainer-image
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - BATCH_SIZE=128
      - LEARNING_RATE=0.5
      - NN_DIM_HIDDEN=256
  trainer5:
    image: trainer:latest
    depends_on:
      - mlflow
      - trainer-image
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - BATCH_SIZE=16
      - LEARNING_RATE=0.05
      - NN_DIM_HIDDEN=196
$ docker-compose up
Settings > Developer settings
Under Tokens (classic), click Generate new token and configure it
$ export CR_PAT={paste the token here}
$ echo $CR_PAT | docker login ghcr.io -u USERNAME --password-stdin
$ docker-compose up
On macOS, AirPlay Receiver must be turned off to free port 5000
When training completes, the elapsed time is recorded under Duration
Performance can be checked under Metrics
To monitor with htop, it must be installed first
$ brew install htop
Then simply typing `htop` runs it
Reference: 2-5. Going further: load balancing
Scale-up
⭐️ Scale-out ⭐️
Medallion architecture
No Runner distribution optimization ❌
No streaming processing ❌
Inefficient data transfer between tasks
No data security ❌
Using the CI and CD stages of the software development lifecycle to automate the validation and deployment of code changes as one continuous process
CI
CD
CI/CD workflow structure
Efficient configuration of GPU resources
Better initial cost and time efficiency
Monitoring, logging, and similar features are provided out of the box, with no separate setup required
A/B testing is possible