Follow US
facebook twitter
NO. 303
TECH BLOG

じげんにおけるVertex AI Pipelines導入事例(Kubeflow Pipelines v2対応)

12.18.2024

はじめに

株式会社じげんで開発Unitデータ基盤チームにジョインしてデータ分析・機械学習に従事しているBun-chanです。主な業務としてはレコメンドの改善、特に機械学習パイプラインの構築を行っておりまして、住まいや中古車、求人などの4つのサービスで横断的に開発しています。

私のGitHubアカウントはこちらになります。博士課程在学中に独立して、某国立大学で客員助教をしながら機械学習エンジニアとして従事しております。

https://github.com/bamboo-nova

弊社ではMLOpsの一環でVertex AI Pipelinesを用いた機械学習パイプラインの構築を今年の頭から進めていた経緯があり、実際に複数のサービスで導入して成功事例やノウハウも固まってきました。しかし、Vertex AI PipelinesにおいてKubeflowのv2 SDKで実装されているテックブログは少なく、私自身もGCPの公式マニュアルと格闘したりと色々と苦労したことがありました。少しでも他の方々にとってパイプライン導入の参入障壁が低くなるように、本記事ではパイプラインの構築を決断した経緯や構築方法、実際にパイプラインを導入したことで得られた恩恵などについてお話ししたいと思います。

機械学習パイプライン導入の背景

従来の機械学習サービスの構築

私は去年の2月からデータサイエンティストとして加入しましたが、最初に頼まれたのはLTR(Leraning to Rank、ランキング機械学習)の枠組みで物件単位でレコメンドのためのスコア付けを行う学習モデルの構築をLightGBMで実装することでした。当時はチームで開発できる人数も限られていたため、手動で手軽にデプロイできることを目指して、Vertex AIで提供されてるカスタム予測ルーチン (CPR) [1]を使用して実装を行い、モデルのデプロイを実施しました。カスタム予測ルーチンではHTTP サーバーの設定やコンテナのゼロからのビルドを行うことなく、前処理コードと後処理コードを使用して簡単にカスタム コンテナをビルドして学習モデルをデプロイすることができます。このアプローチを流用して他のサービスでも類似したLTR用のモデルを横展開できるようになりました。

結果として、当時3名しかいなかったチームにも関わらず、合計5つのサービスで機械学習プロダクトの横展開を可能にし、1年半で30回前後のABテストを実施できるようになりました。また、実際にABテストで年間換算で億単位のインパクトを出せるような事例も生み出せるようになりました。また、Two-Towerなどの異なる手法を用いたレコメンドエンジンでも同様の手順でサービスへのデプロイができるようになりました。

当時の課題と解決策

このように機械学習プロジェクトが回るようにはなったのですが、将来的なチームとしての運用を考えた際、下記の問題点が徐々に浮き彫りになってきました。

実験の再現が難しい

一人が複数のプロジェクトを担当するような状態だったため、徐々に過去の検証結果を振り返ることが難しくなってきました。モデルを学習するために必要なパラメータが多かったりすると管理が煩雑になってしまうためです。

例えば、ABテスト終了後にリバートして過去のモデルを再度デプロイするケースです。再学習したい場合、再現方法がなんだったのか分からなくなってしまうリスクが顕在化してきました。

メンバーがいなくなった際の引き継ぎ問題

もしメンバーが休んだり辞職した際に、コードやパラメータを全て引き継ぐ必要があり、引き継ぎが簡単に遂行できないリスクがありました。特に、パイプラインとして自動化されておらずノートブックによる継承のみの場合、ノートブックが上から順番にセルを実行されてない場合はうまく再現できなかったり、コンテナベースではないためライブラリの依存関係が変わっていてエラーが起きて再現できないリスクが生まれていました。

モデル生成に手間がかかる

機械学習パイプラインを導入する前は全て手動で学習モデルの構築とデプロイを実施していました。

その際、頻繁にABテストを回そうとすると手動実行に時間がかかったり、ノートブックを常に気にして業務をしなければならなかったり、不用意にインスタンスを立ち上げたままになったりして開発コストが無駄にかかってしまうような事象が起きていました。また、こうしたローカルやVertex AIのワークベンチのようなノートブックインスタンスに依存した開発環境だと、複数の実験を回す際に学習リソースが制限されてしまうため、複数のタスクを一人で回す際に支障になっていました。

分業ができない

当初はチーム内の開発者が三人しかいなかったため、プロジェクト毎に最後まで一貫して遂行することが多かったです。しかし、最近ではインターンを含めた従業員が複数名参画してきたり、新卒が開発Unitデータ基盤チームにジョインする予定もあり、チームの規模がどんどん拡大してきました。こうなると必然的に開発者の得意分野などを考慮して、プロジェクトを分業したり、一部のプロセスのみを改修するようなタスクが発生してきます。しかし、ノートブックによる管理だとコードを全てを把握して一つずつ動かす必要があります。インターンや新規で入ってきた方にいきなり全てのコードを把握させるのは時間コストがもったいないため、必要な箇所だけを変更して動くようにして分業したいニーズが生まれました。

成果物の品質管理の問題

当時はノートブックでの運用が原則だったため、性能指標の保存やソースコードの保守性が問題になってきていました。特に、ノートブックだとGithub上でレビューができないため、linterも通さないことからコードの機密性が疎かになることが多く、全員が同じ機能なのに別々の関数名や機能のミスに気づかずにデプロイするようなリスクが潜んでいました。

以上の課題を踏まえて、弊チームではMLOpsの導入が必要不可欠であると判断して2024年の頭から機械学習パイプラインの導入を推進し、下記のように分解された課題に対して改善を行いました。

  • きちんと検証を評価できるように、ブランチやディレクトリ構成の運用ルールを統一化させる
    • 規約を定めることで、タスクを分解して何の機能を追加してるのか等を把握しやすいようにしました。
  • 再現ができるようにする
    • Vertex AI Experimentsを用いて評価指標を管理できるようにしました。
  • 類似した処理を何度も書いたり、可読性が低下したり、動作しないノートブックが出てこないようにする
    • ソースコードによってモジュール化して、linterを通すようにしました。
  • 分業ができるようにして、モデルの細かい仕様などを把握せずとも特定の機能の改良のみ学習および評価ができるようにする
    • コンポーネントごとに処理を独立させることで解決しました。

MLOpsについて

ここで、MLOpsについて簡単に説明して、我々の現在のMLOpsにおける立ち位置について簡単に補足したいと思います。

MLOpsでは、下記が大きな目標になっています。

  • DevOpsからの派生で、機械学習のワークフローを効率化させることを目的としている
  • 下記のワークフローを構築する際に、本番適用に時間がかかったり実験管理が面倒だったりするため、それらをMLOpsによって解決する

Google Cloud Platformでは機械学習のワークフローについて、下記のようにプロセスの自動化をレベル別で定義しています。[2]

レベル0: 手動プロセス

MLパイプラインが自動化を含まない場合はレベル0として扱われます。従来の我々のフェーズだとレベル0に相当しており、モデルの構築とデプロイを全て手動で実施していた形になります。

レベル1: MLパイプラインの自動化

MLパイプライン自体が自動化されている場合、レベル1として扱われます。若干項目が多いですが、レベル1に必要な要素として下記を挙げています。

  • 迅速な実験
    • 機械学習実験における前処理や学習などの各ステップがコンポーネントとして統合的に管理されている状態になっている
    • ステップ間の移行が自動化されていて、実験の迅速な反復が可能かつ本番環境への移行も容易な状態になっている
  • モデルの継続的トレーニング(CT)
    • モデルは本番環境で自動的に訓練されて、トリガーに基づいて新しいデータが使用されている状態になっている
  • 実験環境と運用環境の対称性
    • 開発や実験環境で使用されるパイプラインが、stagingおよびproduction環境でも使用されている
  • コンポーネントおよびパイプラインのモジュール化されたコード
    • MLパイプラインを構築するためには、コンポーネントが再利用可能で、構成可能で、他のMLパイプライン間で共有可能である必要がある
    • パイプライン内のコンポーネントは独立しており、異なる環境やライブラリを持つことができる。また、再現性がどの環境でも確保されている。
    • ノートブックのようなEDA的な扱いではなく、ソースコードとしてモジュール化されている必要がある。
  • モデルの継続的デリバリー
    • 新しいデータでトレーニングされた新しいモデルに対して予測サービスを継続的に提供できる
  • パイプラインのデプロイメント
    • レベル0では、トレーニング済みモデルを予測サービスとしてプロダクションにデプロイする。レベル1では、トレーニングパイプライン全体をデプロイし、自動的かつ定期的に実行されてトレーニング済みモデルを予測サービスとして提供する

現状の我々のチームだと定期実行は実施されていませんが、それ以外のレベル1の項目については概ね達成されている状態だと言えます。ちなみに、定期実行を含めた自動化のためのCI/CDを導入していない理由としては、かチーム人数が限られているため、多くのMLパイプラインを管理する必要がないことやパイプラインの新しい実装が頻繁にデプロイされないためです。そのため、将来的に本番環境で多くのMLパイプラインを管理する場合、MLパイプラインのビルド、テスト、デプロイを自動化するCI/CDを組み込む必要があります。

また、定期実行する際には、二つの方法があります。一つは、Cloud Functionsの関数をHTTPトリガーを使用してデプロイして、Cloud Schedulerジョブを作成することでパイプラインを定期実行することが可能です[3]。もう一つは、Pipeline内でScheduler APIを用いて定期実行する方法です[4]。後者の方が実装しやすいかもしれません。

レベル2: CI/CDパイプラインの自動化

自動化されたMLパイプラインと自動化されたCI/CDが構築されている状態で、理想的な開発環境であると言えます。どの機械学習開発チームも、成功事例が蓄積してメンバーが増えて成熟していくと最終的にレベル2に収束していくと思います。

開発設計

弊社ではGCPのVertex AIを使って機械学習を実装していたので、同じサービスとして提供されているVertex AI Pipelinesを用いて構築することにしました。

Vertex AI PipelinesはKubeflowを用いて開発を進めていきますが、我々のチームではKFP v2で構築を行っています。古い記事だとKFP v1での構築が紹介されているものが多いので注意してください。個人的には、KFP v1系のサポートが2024年12月で終了する見込み[5]のため、v2で構築した方が良いと思います。本ブログでは、 kfp==2.7.0 を使用しています。

https://cloud.google.com/vertex-ai/docs/supported-frameworks-list?hl=ja#pipelines

コンポーネントについて

パイプラインでは前処理や学習などの独立したコンポーネントで構成されて、それらが順番に連携しながら処理が進みます。コンポーネントの構築方法についてですが、大きく三種類あります[6]。今回は、我々がチーム開発で使用してる Lightweight Python Components と Container Components について簡単に説明したいと思います。

Lightweight Python Components

Lightweight Python Componentsはウェブ上の記事でも紹介されていることが多く、pipelinesを実行するpythonファイルに下記のようにコンポーネントの処理内容をPythonの関数として実装できます。

@dsl.component(
  packages_to_install=["pandas"],
  base_image="python:3.11.5-slim-bullseye"
)
def sample(
    input_path: dsl.Input[dsl.Dataset],
    output_path: dsl.Output[dsl.Dataset]
) -> None:
    import os
    import pandas as pd
    df = pd.read_csv(f"{input_path.path}/sample.csv")
    df["rand_seed"] = 42    
    # output_pathにデータフレームが保存される
    df.to_csv(output_path.path, index=False)

しかし、この方法だと関数の外で定義される変数が使用できなかったり、パッケージを関数内で呼び出す必要があるため、複雑な処理を入れるほどパイプラインのコードの可読性が下がります。

Container Components

こちらは最もシンプルな方法で、イメージとコマンドラインで引数をして処理を実行します。事前に必要なイメージをビルドしてGARにプッシュしておいて、下記のような実装をすることで実現できます。

@dsl.container_component
def sample(
    input_dir: dsl.Input[dsl.Dataset],
    output_dir: dsl.Output[dsl.Dataset]
) -> dsl.ContainerSpec:
    return dsl.ContainerSpec(
        image=<GARのImage URIを指定>,
        command=["python", "preprocess/main.py"],
        args=[
            "--input_dataset_uri", input_dir.path,
            "--output_dataset_uri", output_dir.path
        ],
    )

制約事項としては、Kubeflow Pipelines のメタデータ管理の機能の利用が著しく困難になることが知られています。例えば、Vertex AI ExperimentsをContainer Componentsを用いて使用する際に入出力のメタデータを管理する時にKubeflowで管理されている型から外れエラーになる事象を確認しています。
対策としては、できるだけContaiener Componentsで実装を行い、Vertex AI Experimentsでの指標の管理やデプロイなどのコンポーネントに対しては、処理が複雑にはならないのでLightweight Python Componentsで実装することで回避しています。

フォルダ構成

フォルダ構成は[7]の記事で紹介されている構成を参考にして構築しています。一例として、Two-Towerモデルの機械学習パイプラインは下記のようなフォルダ構成で運用しています。

pipelines
├── components
│   ├── configs
│   │   ├── Conversion
│   │   │   └── default.yaml
│   │   ├── Deploy
│   │   │   └── default.yaml
│   │   ├── Evaluation
│   │   │   └── default.yaml
│   │   ├── Preprocess
│   │   │   └── default.yaml
│   │   ├── Training
│   │   │   └── default.yaml
│   │   ├── default.yaml
│   │   └── model
│   │       ├── features.yml
│   │       ├── test_failed_features.yml
│   │       └── test_features.yml
│   ├── convert
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── __init__.py
│   │   ├── based_model
│   │   │   ├── __init__.py
│   │   │   └── algorithm.py
│   │   ├── main.py
│   │   ├── poetry.lock
│   │   ├── pyproject.toml
│   │   └── tests
│   │       ├── __init__.py
│   │       └── test_conversion.py
│   ├── demo
│   │   └── demo.ipynb
│   ├── deploy
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── deploy
│   │   │   └── __init__.py
│   │   ├── main.py
│   │   ├── poetry.lock
│   │   └── pyproject.toml
│   ├── evaluate
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── __init__.py
│   │   ├── based_model
│   │   │   ├── __init__.py
│   │   │   └── algorithm.py
│   │   ├── main.py
│   │   ├── poetry.lock
│   │   ├── pyproject.toml
│   │   └── tests
│   │       ├── __init__.py
│   │       └── test_evaluate.py
│   ├── preprocess
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── __init__.py
│   │   ├── based_model
│   │   │   ├── __init__.py
│   │   │   └── algorithm.py
│   │   ├── main.py
│   │   ├── poetry.lock
│   │   ├── pyproject.toml
│   │   └── tests
│   │       ├── __init__.py
│   │       └── test_preprocess.py
│   ├── pyproject.toml
│   ├── serving
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── __init__.py
│   │   ├── poetry.lock
│   │   ├── pyproject.toml
│   │   └── server.py
│   ├── train
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   ├── __init__.py
│   │   ├── based_model
│   │   │   ├── __init__.py
│   │   │   ├── dataset.py
│   │   │   ├── model.py
│   │   │   └── trainer.py
│   │   ├── main.py
│   │   ├── poetry.lock
│   │   ├── pyproject.toml
│   │   └── tests
│   │       ├── __init__.py
│   │       └── test_train.py
│   └── utils
│       ├── __init__.py
│       ├── config.py
│       ├── dataframe_utils.py
│       ├── gcp_hogehoge.py
│       └── health_check.py
├── docker-compose.yaml
├── .env
├── pipeline.py
├── poetry.toml
├── pyproject.toml
└── README.md

まず、components/(preprocess, convert, train, evaluate, serving)ディレクトリ以下にコンテナを定義して順番に処理が走るようにしており、Container Componentsで実装されています。各コンポーネントの処理内容としては、下記のようになっています。

  • preprocess
    • 訓練データの前処理を実施する
  • convert
    • 前処理されたデータをtfrecords形式に変換して保存する
  • train
    • tfrecordsに変換された訓練データを用いてTwo-Towerモデルを学習する
  • evaluate
    • テストデータに対する評価指標を算出して、定性評価のためのデモ環境を提供する
  • serving
    • デプロイ時に必要となる学習モデルを搭載したAPIコンテナを作る

一方で、Vertex AI Experimentsを用いて性能指標を記録するコンポーネント(metrics)とデプロイのコンポーネント(deploy)に対しては、Container Componentsで実装すると先述したKubeflow Pipelines のメタデータ管理の制約によるエラーが発生してしまうため、Lightweight Python Componentsで可読性を損ねない形でシンプルに実装しています。

  • metrics
    • evaluateの後に性能指標を管理する
  • deploy
    • 学習されたモデルを基に本番環境にデプロイする

各コンポーネントは下記のような構成になっています。

├── Dockerfile
├── README.md
├── __init__.py
├── based_model
│   ├── __init__.py
│   └── algorithm.py
├── main.py
├── poetry.lock
├── pyproject.toml
└── tests
    ├── __init__.py
    └── test_preprocess.py

以下、各ファイルの役割を解説します。

  • based_model フォルダに中核となる algorithm.py などの処理が入っています
    • 学習コードはデータセットの加工とモデル定義、学習の三つに細かく分けています
  • main.py では、based_model で実行された出力結果を保存することがメインになっており、Container Componentsではコマンドラインで引数を指定して実行しています。こちらはパイプラインで実行するモードとコンテナ内で挙動をテストするモードの二つに分けて実装しています。debugという引数を指定するだけでコンテナ内でも実行した結果をdev環境のGCSに保存されるので、中間成果物を確認することができます
  • poetry.lock および pyproject.toml
    • コンポーネントで使用されるライブラリはpoetryで管理しています

また、特徴量は components/configs/model/features.yaml で下記のような形で定義されています。

price:
  type: float
  tower: query
  method:
    - log_normalize(対数変換して正規化)
  maximum: 300000
  minimum: 0
  missing_val: 0

これによって、どの特徴量に対してどんな欠損値補完や前処理をするかパターン化できるようにしています。components/configs/model 以下のその他のファイルはテスト用に使用しています。

ちなみに、ノートブックでEDAがしたい方向けに、demoフォルダに各コンポーネントを逐次実行できるノートブックも作成しています。コンポーネントの中身で何をしてるか分からない方向けに色々触れる環境として提供しています。

Container Componentsで実行する処理のパラメータはcomponents/configsでyamlファイルで定義して、hydra[8]を用いてパラメータを管理しています。パイプライン実行時に書き換えたい引数については、パイプラインを呼び出す部分で下記のように引数を指定することで異なる結果で実行できるようにしています。

@dsl.pipeline(name=f"pipeline-sample", description="Vertex Piplines sample", pipeline_root=<GCSのバケットを指定>)
def pipeline(
    learning_rate: float = 0.01,
    epoch: int = 30,
    validation_freq: int = 1,
    use_candidate_sampling_probability: bool = True
) -> None:

各コンポーネントでは単体テストを導入することで、最低限のテストが通るようになっています。テストのカバレッジはcoverage[9]モジュールで確認できるようになっています。また、linterについてはpysen[10]を導入してisort, black, flake8, mypyを一括でチェックできるようにしています。
以上の構成でパイプラインは構成されており、パイプラインを実行するだけであれば、Vertex AI Workbenchを立ち上げて、下記だけで動きます。

# 認証情報の設定(指定されたレジストリへのアクセスに必要な認証情報を追加)
$ gcloud auth configure-docker -docker.pkg.dev --quiet
# .envがあるディレクトリに移動して、環境変数を設定
$ source ./.env
# 各コンポーネントのイメージをビルドしてGARへプッシュ
$ docker compose build
$ docker compose push

# パイプラインを実行する
$ python pipeline.py

ちなみに、先ほど例に出したフォルダ構成を基に実行されたパイプラインは、下記の手順で実行されるようになります。

実装例

流石に実際に利用されているソースコードを公開するのは難しいため、今回は前処理から学習を実施して、得られた指標をExperimentsで記録するまでのパイプラインの実装例としてお見せしたいと思います。

Container Componentの作成方法

コンポーネントの作成方法についてですが、下記の二つの手順で構築できます。

  • コンポーネントを作成する
  • パイプラインを構築してコンパイルする

コンポーネントを作成する

まず、下記のようにコンポーネントを作成します。前処理だと下記のようになります。

@dsl.container_component
def preprocess(output_dir: dsl.Output[dsl.Dataset]) -> dsl.ContainerSpec:

    return dsl.ContainerSpec(

        image="<GARのImage URI>",

        command=["python", "preprocess/main.py"],

        args=["--output_dataset_uri", output_dir.path],

    )

パイプラインを構築してコンパイルする

次に、create_custom_training_job_op_from_component でコンポーネント仕様を指定し、作成したコンポーネントからカスタム トレーニング ジョブを作成するようにします。

@dsl.pipeline(name="", description="Vertex Piplines sample", pipeline_root="<GCSのバケット名>")
def pipeline() -> None:
    """Pipeline Setting."""
    # dataset_uriに相当するものは自動的にpipelines上で生成される
    preprocess_op = create_custom_training_job_op_from_component(
        preprocess,
        machine_type="n1-standard-4",
    )

    # dsl.Output以外の引数がある場合はここで指定する
    preprocess_task = preprocess_op()
    # デフォルトでTrueなので、一度成功するとキャッシュが維持されてしまう。変更した処理が反映されないのでFalseにする
    # preprocess_task.set_caching_options(False)

パイプラインのコンポーネント間の引数の引き渡し

例えば、preprocessからtrainへ前処理されたファイルを指定するときは、下記のようにすることで実現できます。

@dsl.pipeline(name="", description="Vertex Piplines sample", pipeline_root="<GCSのバケット名>")
def pipeline(
    learning_rate: float = 0.01,
    epoch: int = 30
) -> None:
    """Pipeline Setting."""
    # dataset_uriに相当するものは自動的にpipelines上で生成されるので必要ない
    preprocess_op = create_custom_training_job_op_from_component(
        preprocess,
        machine_type="n1-standard-32",
    )
    preprocess_task = preprocess_op()

    train_op = create_custom_training_job_op_from_component(
        train,
        machine_type="c2-standard-16",
    )
    train_task = train_op(
        learning_rate=learning_rate,
        epoch=epoch,
        input_path=preprocess_task.outputs["output_path"],
    )

ここでは、input_pathで前処理で出力された成果物のパスを指定しています。

以上をまとめると、前処理から学習を実施して、評価指標をVertex AI Experimentsで記録する手順は下記のように定義できます。事前にGARに対応するイメージをプッシュする必要があるので注意してください。

from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_op_from_component
from kfp import compiler, dsl

@dsl.container_component
def preprocess(output_dir: dsl.Output[dsl.Dataset]) -> dsl.ContainerSpec:
    return dsl.ContainerSpec(
        image="<GARのImage URI>",
        command=["python", "preprocess/main.py"],
        args=["--output_dataset_uri", output_dir.path],
    )

# モデルファイルと評価指標のjsonファイルが出力される想定で記述しています。
@dsl.container_component
def train(
    learning_rate: float,
    epoch: int,
    input_dir: dsl.Input[dsl.Dataset],
    output_model_dir: dsl.Output[dsl.Model],
    output_metric_dir: dsl.Output[dsl.Dataset],
) -> dsl.ContainerSpec:
    return dsl.ContainerSpec(
        image="<GARのImage URI>",
        command=["python", "train/main.py"],
        args=[
            "--learning_rate", learning_rate,
            "--epoch", epoch,
            "--input_uri", input_dir.path,
            "--output_model_uri", output_model_dir.path,
            "--output_metric_uri", output_metric_dir.path,
        ],
    )

# ここは前述した理由でLightweight Python Componentsで実装しています
@dsl.component(base_image="python:3.11.5-slim-bullseye")
def metrics(
    metrics_path: dsl.Input[dsl.Dataset],
    summary_metrics: dsl.Output[dsl.Metrics],
) -> None:
    import json
    # trainで出力されたjsonで定義された評価指標を記録する

    with open(f"{metrics_path.path}/summary_metrics.json") as f:
        eval_result = json.load(f)

    # 指標を記録する
    for key, val in eval_result.items():
        summary_metrics.log_metric(key, val)

@dsl.pipeline(name="", description="Vertex Piplines sample", pipeline_root="<GCSのバケット名>")
def pipeline(
    learning_rate: float = 0.01,
    epoch: int = 30
) -> None:
    """Pipeline Setting."""
    # dataset_uriに相当するものは自動的にpipelines上で生成されるので必要ない
    preprocess_op = create_custom_training_job_op_from_component(
        preprocess,
        machine_type="n1-standard-4",
    )
    preprocess_task = preprocess_op()
    # preprocess_task.set_caching_options(False)

    train_op = create_custom_training_job_op_from_component(
        train,
        machine_type="c2-standard-16",
    )
    train_task = train_op(
        learning_rate=learning_rate,
        epoch=epoch,
        input_dir=preprocess_task.outputs["output_dir"],
    )
    # train_task.set_caching_options(False)

    metrics_task = metrics(metrics_path=train_task.outputs["output_metric_dir"])
    # metrics_task.set_caching_options(False)

if __name__ == "__main__":
    # 初期化
    aiplatform.init(
        project="",
        location="",
    )


    # コンパイル
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="<パイプラインの設定が保存されたjsonファイル名を指定>"
    )

    # ジョブを定義
    job = aiplatform.PipelineJob(
        display_name="<パイプラインの表示名>",
        template_path="<パイプラインの設定が保存されたjsonファイル名を指定>",
        pipeline_root="<結果を保存するGCSバケット>",
        # enable_caching=False,
    )

    # パイプラインを実行
    job.submit(
        experiment="<実験名>",  # Experimentsを使用するときは追加する
        # service_account="<特定のサービスアカウントを使う場合は指定>"
    )

実際の成功事例

実際にパイプラインを導入した結果、主に下記のような恩恵が得られるようになりました。

ソースコードのレビュー文化が生まれた

今までノートブックによる開発だったのでPRでレビューすることが難しかったですが、ソースコードで管理するようになったため、PRでのレビューができるようになりました。これによってソースコードの機密性が向上するだけではなく、実行されるまでに細かいバグをレビュワーが見つけて回避しやすい体制を整えることができました。

分業が可能になり、担当を細かく分担できるようになった

従来は一人で前処理から学習、デプロイを一貫して担当しなければならなかったため、学習コストが大きい欠点がありました。しかし、パイプラインを構築したことで、出力の型が適切であれば担当コンポーネントの処理のみを理解すれば良い状態を作ることができるため、ドメイン知識がない新規ジョインの方でも低い学習コストで機能を追加したり改善することが可能になりました。

パイプラインとして全体の実行が容易になった

今までは前処理から学習、デプロイをノートブックで上から順番に実行しなければならなかったため、例えば誰かが上から順番にノートブックを実行してなかった場合に開発の再現ができずエラーになったりするリスクがありました。しかし、パイプラインを構築したことで同じ手順でコンポーネントが動くようになっているため、実行が容易になっただけではなく、人手による予期しないバグやエラーに対しても頑健になりました。

今後の展望

現状だと学習モデルを改善してABテストを実施することに注力していますが、今後学習モデルがある程度確立したら、モデル学習からデプロイまでの手順を自動的に定期実行できる方向で進めていきたいと考えています。最終的にはgithub actionsを導入して、パイプラインで定期的に実行されて出てきた指標が問題なければ上長が判断してデプロイする運用にできると理想だと思います。

また、複数のサービスを横展開していく上である程度共通して利用される前処理や後処理も確立されてきたため、それらをサービス共通のパッケージとして提供して呼び出せる方式にするのも良いかもしれません。

まとめ

今回は、じげんの開発Unitデータ基盤チームにおけるMLOps事例として、Vertex AI Pipelinesを用いた機械学習パイプラインの導入経緯と構築方法などについて解説させて頂きました。将来的に機械学習チームでパイプラインを導入する予定の方々にとって少しでも参考になれば幸いです。

開発Unitデータ基盤チームでは、随時開発メンバーを募集しています。私たちのチームでは特定のサービスにおけるPoCに留まらず、成功した事例を他のサービスへ横展開して事業成長させることを大きな一つの目的として動いています。したがって、「点」としての開発ではなく、「面」としての開発を想定したタスクに取り組めるのは他の企業にはない特徴だと思います。もしこのような環境に興味を持ってくださる方がいましたら、じげんに応募してもらえたらと思います。

参考文献

[1] https://cloud.google.com/vertex-ai/docs/predictions/custom-prediction-routines?hl=ja

[2] https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning

[3] https://future-architect.github.io/articles/20230213a/

[4] https://tech.layerx.co.jp/entry/2023/11/16/185944

[5] https://cloud.google.com/vertex-ai/docs/supported-frameworks-list?hl=ja#pipelines

[6] https://zenn.dev/asei/articles/introduction-to-kfp-v2

[7] https://zenn.dev/dhirooka/articles/71a5fc473baefb

[8] https://hydra.cc/

[9] https://github.com/nedbat/coveragepy

[10] https://github.com/pfnet/pysen


SHARE
  • facebook
  • twitter

現在募集中の関連求人はこちら

  • Jobs 【中途】《開発Unit》データサイエンティスト

    summary
    • 募集概要

      開発ユニットは5つの事業部(求人・住まい・自動車・ライフサポート・パートナーソリューション)が展開する20以上のライフメディアプラットフォームに横断的に関わる開発組織です。
      これらの事業部のうち、複数のデータ分析・機械学習案件を担当していただきます。

    • 仕事内容

      各サービスの検索・レコメンドの改善になります。

      豊富なログデータをもとに、まずは弊社で実績のある手法の横展開を行っていただきます。

      力量に応じて、新たな手法を論文等を読み調査し、その試験的な導入まで担っていただく可能性があります。

      また、今後機械学習の運用をより組織的に行っていくにあたり、MLOpsの基盤作りにも携わることになります。
      その他、必要に応じて全社的なデータ基盤の整備や、Lookerによるデータ可視化も行います。

      また、マーケターやセールスによるLookerやBigQueryを用いた分析の補助を行っていただくこともあります。

       

      <主に使用している技術等>
      *言語
      Python・SQL
      *環境
      インフラ: GCP
      データ分析基盤: BigQuery・trocco
      機械学習基盤: Vertex AI・BigQuery ML
      *プロジェクト管理
      Jira・GitHub
      *支給マシン
      MacBook Pro 16inch

       

      <仕事の魅力>
      ・多種多様なWebデータをもとにログ収集・加工・機械学習・デプロイ・システム化までの全体を担うことで、単なるモデル作成に留まらない経験を得られます。
      ・検索やレコメンドに関する知識・経験を修得できます。
      ・自らの施策の実績を売上の改善という明確な形で積むことができます。
      ・マーケターやセールスなどビジネスサイドとの協業により、Web領域におけるビジネス感覚を得ることができます。
      ・クラウド(GCP)ベースのデータ基盤に精通できます。

       

      <組織体制>
      ・インフラチーム:2名
      ・クリエイティブチーム:2名+委託
      ・ディレクターチーム:3名+委託
      ・データ分析基盤チーム:2名+委託
      ※こちらのデータ分析基盤チームへの配属となります。
      現在は男性社員2名のチームとなります。

    • 応募条件

      【必須スキル】
      ・Pythonによる機械学習モデル開発の経験
      ・複数人でのサービス開発の経験
      ・専門的な英文ドキュメントを読み解く能力

       

      【歓迎スキル】
      ・Kaggle等の機械学習コンペの参加・入賞経験
      ・STEM(特に情報・数学・統計系専攻)の学士・修士・博士号
      ・SQL(DBは問わない)の経験

       

      【求める人物像】
      ・答えの見えない状況で粘り強く考え続けることが得意な方

    • 雇用形態

      正社員

    • 勤務地

      東京都港区虎ノ門3-4-8

    • 勤務時間

      フレックスタイム制
      ・コアタイム:10:00~16:00
      ・フレキシブルタイム:(始業)7:00~10:00 (終業)16:00~22:00
      ・1日の標準労働時間:8時間
      ※所属長の判断により、10:00~19:00等の固定シフトとなる可能性がございます。

    • 休日・休暇

      完全週休2日制(土日祝)、夏季休暇、年末年始休暇
      年次有給休暇(時間単位での取得も可能)

    • 想定給与

      月給:375,000円~666,667円
      ※固定残業手当(45時間分):97,544円~173,411円を含む。
      ※45時間を超過した時間外労働の残業手当は追加支給。
      ※採用時のポジションにより、試用期間終了後、
      別途役職手当・管理監督者手当を支給する場合あり。

      上記はあくまで想定であり、ご経験・スキルを考慮して決定いたします。
      (給与改定年4回)

    • 待遇

      【保険】
      各種社会保険完備(雇用・労災・健康・厚生年金)

       

      【交通費】
      全額支給

       

      【受動喫煙防止のための取組み】
      屋内原則禁煙(喫煙室あり)

       

      【福利厚生・社内制度】
      ・関東ITソフトウェア健康保険組合
      ・ベネフィットステーション
      ・社内クラブ活動支援
      ・リファラル採用決定でインセンティブ支給
      ・誕生日月にAmazonギフトカード1万円分支給
      ・社内交流イベント(全社会、シャッフルランチ等)
      ・N-minutes(1日20分間昼寝ができる)
      ・資格補助制度
      ・服装自由
      ・在宅勤務制度有 等

    • 選考フロー

      書類選考 → 一次面接 → 適性検査+二次面接 →(最終面接)→ 内定

      ※面接はオンラインで実施いたします。
      ※適性検査については一次面接通過のご連絡後、最終面接までにオンラインで受験いただきます。(15分程度)
      また、選考の合否は適性検査結果のみで判断されることはありません。
      ※決裁権限者や特定のポジションでの採用の場合、内定通知前にリファレンスチェックを実施させて頂いております。ご理解頂けますと幸いです。

    • よくあるご質問

      こちらをご覧くださいFAQ

Top