TFX: 시작하기 튜토리얼 - 시작 파이프라인

Tae-Kyun Kim·2022년 4월 2일
0

TFX 공식 홈페이지

펭귄 데이터 세트를 사용한 간단한 TFX 파이프라인 튜토리얼

개요

  • 간단한 TFX 파이프라인을 실행하는 짧은 자습서
  • 데이터 가져오기, 모델 학습 및 학습된 모델 내보내기와 같은 가장 최소한의 ML 워크플로
  • 세 가지 필수 TFX 구성요소: ExampleGen, Trainer 및 Pusher

TFX 파이프라인 이해하기

TFX 파이프라인 이해하기 | TensorFlow

노트북 코드

TFX 설치하기

pip install -U tfx
  • tfx 설치
  • 설치 후에 런타임을 다시 시작해야함. Colab이 패키지를 로드하는 방식 때문!

Tenosorflow 및 TFX 버전 확인

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
>>>>>>>>>>>>>>>>
TensorFlow version: 2.6.2
TFX version: 1.4.0

변수 설정

import os

PIPELINE_NAME = "penguin-simple"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.
  • 파이프라인을 정의하는데 사용되는 몇 가지 변수가 있음
  • 이러한 변수를 원하는 대로 사용 지정 가능
  • 기본적으로 파이프라인의 모든 출력은 현재 디렉터리 아래에 생성

os.path.join

  • os.path.join이 좋은 이유

os.path.join 함수는 운영체제에 맞게 폴더 구분자를 다뤄서 경로를 생성

# Windows
import os

os.path.join('a', 'b', 'c')
>>>
a\b\c

→ 사실 그동안 슬래쉬 그으면 되는걸 왜 쓰나 했는데 운영체제에 따라 달라지는 것을 고려한 것!

  • 그런데 어차피 주피터노트북에서 사용하면 os가 같은것이 보장된게 아닌가?
  • 아직도 꼭 왜 써야 하는지 모르겠다!

absl

Google의 내부 코드베이스의 가장 근반이되는 부분으로부터 만들어진 C++ 라이브러리의 오픈소스 모음!

예시 데이터 준비

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

[urllib](https://docs.python.org/3/library/urllib.html#module-urllib)

URLs와 함께 작동하기 위해 몇가지 모듈들을 모은 패키지

  • requests 라이브러리와 큰 차이는 없음

[tempfile](https://docs.python.org/ko/3/library/tempfile.html)

임시 파일과 디렉토리 생성

  • 임시 파일이면 만들어졌다가 없어지는건가?

파이프라인 생성

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2 

Tensorflow Transform

TensorFlow로 데이터를 전처리하기 위한 라이브러리

  • 다음과 같이 전체 패스가 필요한 데이터에 유용
    • 평균 및 표준 편차로 입력 값을 정규화
    • 모든 입력 값에 대해 어휘를 생성하여 문자열을 정수로 변환
    • 관찰된 데이터 분포를 기반으로 수레를 버킷에 할당하여 부동 소수점을 정수로 변환
  • Apache Beam과 Apache Arrow가 종속성으로 필요한 것이 특징

tfx_bsl.public.tfxio

TFXIO는 모든 TFX 라이브러리 및 구성 요소가 공유하는 공통 메모리 내 데이터 표현과 이러한 표현을 생성하기 위한 I/O 추상화 계층을 정의

schema_pb2

Main aliases: tfmd.proto.v0.schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
  • 우리가 스키마를 생성하지 않기 때문에 대신에 feature spec을 생성함.
    • 스키마를 생성한다면 tfx에서 schemaGEN을 사용해주는건가?

tf.io.FixedLenFeature

고정된 길이의 input feature를 파싱하기 위한 구성요소

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()

학습을 위한 특징들과 라벨을 생성함

  • file_pattern: 인풋 tfrecord 파일들의 경로들 혹은 패턴의 리스트
  • data_accessor : input을 RecordBatch로 전환하기 위한 DataAccessor
  • shcema : 인풋 데이터의 스키마
  • batch_size: 단일 배치에서 결합하기위해 리턴된 데이터셋의 연이은 요소의 수를 대표 → 우리가 일반적으로 아는 배치사이즈. 말로 푸니까 되게 어렵네
  • return: 데이터셋 튜플 (features, indices)
    • features → Tensors 딕셔너리
    • indices → 라벨 인덱스들의 단일 텐서
def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model
  • 펭귄 데이터를 분류하기 위한 DNN Keras Model 생성
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
  • 이 스키마는 주로 SchemaGen 혹은 pipeline의 작성자에 의해 제공된 manually-curated version임
  • 스키마는 만약에 Transform component가 사용되면 TFT graph 에서 파생됨
  • 둘중 하나가 누락된 경우, schema_from_feature_spec 은 매우 단순한 feature_spce에서부터 스키마를 제공하는데 사용될 수 있음. 하지만 리턴된 스키마는 매우 primitive(원시적)할 것임
  • 학습 결과는 fn_args.serving_model_dir 폴더에 저장됨

파이프라인 정의 작성

TFX 파이프라인을 생성하는 함수를 정의.

Pipeline 객체는 하나의 TFX pipeline을 대표함. 이 파이프라인은 TFX가 지원하는 pipeline orchestration system 중 하나를 이용하여 작동시킬 수 있음

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

파이프라인 실행

  • TFX는 파이프라인을 실행하기 위한 여러 오케스트레이터 지원.
  • 이 튜토리얼에서는 LocalDagRunner 를 사용
  • DAG
    • 방향성 순환 그래프.
    • TFX 파이프라인을 나타내기도 함
  • LocalDagRunner : 개발 및 디버깅을 위한 빠른 반복을 제공
  • Kubeflow Pipelines 및 Apache Airflow를 비롯한 다른 오케스트레이터도 지원
  • 다른 오케스트레이션 시스템에 대해서 배우기
  • LocalDagRunner 를 생성하고 우리가 이미 정의한 함수에 의해 생성된 Pipeline 객체를 전달함. 파이프라인은 곧장 실행되고 ML model training을 포함하여 pipeline의 진행 로그를 볼 수 있음
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
  • 만약 성공적으로 작동한다면 INFO:absl:Component Pusher is finished. 를 마지막 로그에서 볼 수 있음. 왜냐하면 Pusher 컴포넌트는 파이프라인의 마지막 구성요소임
  • pusher component는 학습된 모델을 SERVING_MODEL_DIR ( serving_model/penguin-simple) 에 푸시함

0개의 댓글