NTTドコモR&Dの技術ブログです。

MLOpsパイプラインをSnowflakeのみで実現してみた

1. 概要

NTTドコモ データプラットフォーム部の外山です。私たちのチームでは、4000万規模のユーザーを対象とした機械学習モデルを計50本開発・運用しており、効果的なマーケティングやプロダクト改善を実施する事業部を支えています。

これまで、AutoML製品のDataRobotを核にMLOpsパイプラインを構築していましたが、Snowflakeを効果的に利用できていないことでスコアリングに時間を要してしまったり、パイプラインが複雑になっていました。
SnowflakeのみでMLを実現できるSnowpark MLが登場し、さらにSnowflake Python APIを効果的に利用することで、SnowflakeネイティブにMLOpsを実現できるようになったことを踏まえ、MLOpsパイプラインを刷新することとしました。

本記事では、各機能のチュートリアルやハンズオンに留まらず、SnowflakeにおけるMLOpsパイプラインの設計に注目し、ご紹介したいと思います。 なお、本記事の掲載の取組については、株式会社シイエヌエスの支援メンバーである小倉さんに詳細検討・実装を進めてもらっており、以下は、小倉さんに執筆いただいています。

2. 構築したMLOpsパイプライン

処理フロー及び基盤設計
実現したいMLOpsパイプラインの処理フローは以下の通りです。

  • モデリング
    1. 正解ラベル作成
    2. 正解データのサンプリング
    3. 特徴量選択
    4. カテゴリ変数のエンコーディング
    5. 訓練データとテストデータへの分割
    6. 学習
    7. 学習済みモデルの保存
    8. 学習済みモデルの精度評価
  • スコアリング前処理
    1. スコアリングデータセット抽出
    2. カテゴリ変数のエンコーディング
    3. スコアリング(確率予測)

これらの機能を実現するための既存パイプラインでは大きく以下2点の課題がありました。

  1. スコアリングサーバへのAPI連携において、メモリ負荷が高い&データ転送処理が何度も必要なため実行に時間がかかってしまう
  2. 実行基盤のMWAAにて、EKSの維持・管理が発生し、管理コストがかかっている

nttdocomo-developers.jp

一方、新方式では、Snowflakeネイティブなパイプラインを構築し、実行環境をSnowflakeに集約する構成を検討しました。 それにより、既存MLOpsパイプラインの課題であった実行時間の短縮と管理コストの低減を実現しています。また、AutoML製品を使用しないため、ライセンス費用も不要となりました。

ToBeアーキテクチャー

3. 実装におけるポイント

機械学習(ML)については、PythonのSnowpark MLライブラリを介してLightGBMによる学習・スコアリング処理を実装することで、Snowflakeネイティブなモデル構築を実現しています。 オペレーション(Ops)をSnowflakeネイティブに実現するにはいくつかのポイントがあり、実例とともにご紹介したいと思います。

3.1. Pythonストアドプロシージャ

2.の処理フローの各処理を、Snowpark MLライブラリを用いてPythonコードで扱うために、SnowflakeのPythonストアドプロシージャを利用しています。

実装したPythonコードは下記のようなディレクトリ構成となっています。Pythonストアドプロシージャの中で実行する処理を関数化し、別モジュール(ここではmodeling.py)に切り出すことで、複数のストアドプロシージャ間でコードが再利用可能となります。また、ストアドプロシージャとしてデプロイする関数自体のコード量を減らすことができ、可読性の向上も実現しています。

.
├src/                               # Pythonコードを格納するディレクトリ
│└ mlops_modules/                  # Pythonコード内でimportする際に指定するパス
│ ├ stored_procedures.py          # ストアドプロシージャとしてデプロイする関数を定義したファイル
│ └ modeling.py                   # ストアドプロシージャ内で呼び出す自作関数を定義したファイル
└ notebooks/                        # 外部Python環境(Sagemaker Notebook)から実行するNotebookファイルを格納するディレクトリ
 ├ S3アップロード.ipynb            # PythonソースコードをS3(外部ステージ)にアップロードするノートブック
 ├ ストアドプロシージャ作成.ipynb  # ストアドプロシージャをデプロイするノートブック
 └ DAG作成.ipynb                   # DAG Taskをデプロイするノートブック

作成したコードをSnowflakeTaskで利用するには、SnowparkのStored Procedures APIを使って、ステージ経由でSnowflakeにデプロイする必要があります。内部ステージを使うことが多いですが、今回は外部ステージ経由での転送を行っています。以下では、外部ステージへのコード連携とそのデプロイを実例ベースでご紹介します。

ユースケース
コード定義のうち、機械学習モデル開発部分をサンプルとしてご紹介します。 こちらのstored_procedure.pyでは、ストアドプロシージャとしてデプロイする処理をPython関数で定義しており、ポイントは以下の通りです。

  • Python関数で定義した処理をそのままストアドプロシージャとしてデプロイ可能です。
  • ストアドプロシージャとして定義する関数は第一引数にsnowpark.Sessionを指定する必要があります。
  • modeling.pyに定義した関数をimportして使用できます。
  • モデル名、モデリング実行年月をパラメータとすることで、再利用可能にしています。

stored_procedures.py

import snowflake.snowpark as snowpark
from mlops_modules.modeling import (
    prepare_train_test_df,
    train_and_save_model,
)

def execute_modeling(
    session: snowpark.Session,
    model_name: str,
    modeling_yyyymm: str,
) -> str:
    """前処理済みデータセットを用いてモデルのトレーニングを行う。
    
    Args:
        session (snowpark.Session): Pythonストアドプロシージャの第一引数
        model_name (str): モデル名
        modeling_yyyymm (str): モデリング実施年月

    Returns:
        str: 作成したモデルファイルの名前
    """

    modeling_dataset_df = session.table('MY_MODELING_DATASET_TABLE')

    # 訓練データとテストデータに分割する自作関数
    train_df, test_df = prepare_train_test_df(
        session=session,
        df=modeling_dataset_df,
        model_name=model_name,
        modeling_yyyymm=modeling_yyyymm,
    )

    # 学習・モデル保存を行う自作関数
    train_and_save_model(
        session=session,
        train_df=train_df,
        model_name=model_name,
        modeling_yyyymm=modeling_yyyymm,
    )

    return f'{model_name}の学習が完了しました'

外部ステージ連携
上記コードにおいて定義したPython関数execute_modelingの処理内容を外部ステージに連携し、後続のデプロイ時に利用します。 具体的には、S3アップロード.ipynbで、Sagemakerノートブックインスタンスから、外部ステージに対応するS3バケットの所定のパスにboto3を使ってアップロードします。

notebooks/S3アップロード.ipynb

import shutil
import boto3

# src.zipに圧縮
zip_path = shutil.make_archive(
    '../s3_upload/src',
    format='zip',
    root_dir='../src'
)
# 外部ステージに対応するS3バケットにアップロード
s3_client = boto3.client('s3')
s3_client.upload_file(
    Filename=zip_path,
    Bucket='my-s3-bucket',
    Key='src.zip'
)

デプロイ作業
デプロイ時には、snowpark.Session.sproc.register関数を使用します。外部ステージに連携したPython関数(execute_modeling)を引数に指定し、ストアドプロシージャとしてデプロイします。

notebooks/ストアドプロシージャ作成.ipynb

import snowflake.snowpark as snowpark
from mlops_modules.stored_procedures import execute_modeling

# セッション生成(パラメータは省略)
session = snowpark.Session.builder.configs(my_parameters).create()

# ストアドプロシージャをデプロイ
session.sproc.register(
    func=execute_modeling,
    name='EXECUTE_MODELING_SPROC',
    is_permanent=True,
    replace=True,
    stage_location='@MY_EXTERNAL_STAGE',
    imports=['@MY_EXTERNAL_STAGE/src.zip'],
)

なお、imports=[f'@MY_EXTERNAL_STAGE/src.zip']の指定により、外部ステージにアップロードしたモジュールをimportして使用できます。
また、引数でnameで指定したEXECUTE_MODELING_SPROCという名前でストアドプロシージャが作成される形です。

3.2. Snowflake TaskによるDAG実装

現行方式ではAirflowを利用し、DAGの実装を行っていましたが、新方式では、Snowflake Python APIでSnowflake Taskを利用しています。 Snowflake Python APIを使用することで、ほぼAirflowに準ずる形でSnowflake Taskをコードベースで実装、デプロイが可能となります。

今回、DAGを作成するに辺り、計50モデルの学習・スコアリングを実装する必要があります。 モデリング処理を効率的に処理するために、モデル単位でタスクをデプロイすることでモデル単位での並列実行を行っていますが、実行コードをループさせ、デプロイできる部分が強力でした。

実装サンプルはDAG作成.ipynbとなります。各タスクにおいて、3.1.で作成したEXECUTE_MODELING_SPROCを含むストアドプロシージャをそれぞれ実行しています。schedule引数にsnowflake.core.task.Cronオブジェクトを指定することで、スケジュール実行も可能であり、タスクをスケジュールしたくない場合は、schedule=Noneと指定します。

notebooks/DAG作成.ipynb

from snowflake.core import Root
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import (
    DAG,
    DAGTask,
    DAGOperation,
    CreateMode,
)
import snowflake.snowpark as snowpark

# セッション生成(パラメータは省略)
session = snowpark.Session.builder.configs(my_parameters).create()

# 対象とするモデル名一覧
model_names = ['MODEL_A', 'MODEL_B', 'MODEL_C']

# DAGを定義するオブジェクト
# この時点ではSnowflake側へのデプロイはされない
with DAG(
    name='MY_MODELING_DAG',
    schedule=Cron('0 9 10 * *', 'Asia/Tokyo'),  # 毎月10日 9:00に自動実行
    warehouse='MY_WAREHOUSE',
    user_task_timeout_ms=3600 * 1000,  # タイムアウトは1時間に設定
    session_parameters={'query_tag': 'my_query_tag'},
) as dag:

    # モデルごとにタスクを生成
    for model_name in model_names:

        # モデリングに使用するデータセットを作成するTask
        task1 = DAGTask(
            name=f'{model_name}_CREATE_MODELING_DATASET_TASK',
            definition=f'''
                call CREATE_MODELING_DATASET_SPROC('{model_name}', to_char(current_date(), 'YYYYMM'))
            ''',
            warehouse='MY_WAREHOUSE',
            user_task_timeout_ms=3600 * 1000,  # タイムアウトは1時間に設定
            session_parameters={'query_tag': 'my_query_tag'},
        )

        # モデリング実行・モデル保存を行うTask
        task2 = DAGTask(
            name=f'{model_name}_EXECUTE_MODELING_TASK',
            definition=f'''
                call EXECUTE_MODELING_SPROC('{model_name}', to_char(current_date(), 'YYYYMM'))
            ''',
            warehouse='MY_WAREHOUSE',
            user_task_timeout_ms=3600 * 1000,  # タイムアウトは1時間に設定
            session_parameters={'query_tag': 'my_query_tag'},
        )

        # モデルの評価を行うTask
        task3 = DAGTask(
            name=f'{model_name}_EVALUATE_MODEL_TASK',
            definition=f'''
                call EVALUATE_MODEL_SPROC('{model_name}', to_char(current_date(), 'YYYYMM'))
            ''',
            warehouse='MY_WAREHOUSE',
            user_task_timeout_ms=3600 * 1000,  # タイムアウトは1時間に設定
            session_parameters={'query_tag': 'my_query_tag'},
        )

        # タスクの依存関係を作成
        task1 >> task2 >> task3

# snowflake.core APIでオブジェクトを操作するエントリーポイント
root = Root(session)
# スキーマを指定する
schema = root.databases['MY_DATABASE'].schemas['MY_SCHEMA']
# DAGを操作するAPIオブジェクト
op = DAGOperation(schema)

# 定義したDAGをデプロイする処理
op.deploy(dag, mode=CreateMode.or_replace)

3.3. 外部ステージを使ったモデル管理

開発時には、Model Resistoryがプレビュー版であったため、別の方法で学習済みモデルを管理する必要がありました。そこで、今回は、外部ステージを使ってモデル管理をしています。具体的には、joblibを使ってシリアライズしたモデルを、session.file.put_streamを使って保存し、session.file.get_streamを使って読み込み利用しています。外部ステージがS3バケットに対応しているので、バージョニング機能を使って履歴を保持し、モデル管理ができます。

import io
import joblib
from snowflake.ml.modeling.lightgbm.lgbm_classifier import LGBMClassifier
import snowflake.snowpark as snowpark

# セッション生成(パラメータは省略)
session = snowpark.Session.builder.configs(my_parameters).create()

# 保存するモデル(学習処理は省略)
model = LGBMClassifier()

# モデルを保存するパス(外部ステージ上)
model_path = '@MY_EXTERNAL_STAGE/models/my_lgbm_classifier.joblib'

# モデルを保存するサンプルコード
input_stream = io.BytesIO()
joblib.dump(model, input_stream)
session.file.put_stream(
    input_stream=input_stream,
    stage_location=model_path,
    auto_compress=False,
)

# モデルを読み込むサンプルコード
input_stream = io.BytesIO()
file_descriptor = session.file.get_stream(model_path)
loaded_model = joblib.load(file_descriptor)

4. 切り替え効果

新MLOpsパイプラインでの合計の処理時間は約6時間となり、旧パイプラインでの約36時間程度から大幅に削減することができました。 主な要因としては、実行環境がSnowflake内に集約され、データの転送が減ったことや、移管にあたっての冗長な処理の削減、Snowparkライブラリの導入による処理効率化も寄与していると考えられます。 また、サービスリソースをSnowflakeのみに集約できたことで、管理コストも低減させることができました。

5. 今後に向けて

さらなる改善
開発中にもSnowflakeの新機能がリリースされたこともあり、今回ご紹介した機能構成を更に今後は改善していきたいと考えています。その中でポイントとなる内容をご紹介します。

1. 内部ステージの使用
今回は、外部ステージを仕様したため、各DAGTaskの定義でcall文でストアドプロシージャを呼び出すものとなっていますが、内部ステージを使えば、各DAGTaskの定義にsnowflake.core.task.StoredProcedureCallを使うことができ、よりPythonicな記述が可能となります。

2. Model Resistoryの活用
開発開始当初はプレビュー版であったSnowflake Model Registryを使うことで、外部ステージでのモデル管理よりも優れたモデル管理が可能となります。

3. Snowflake Notebook&Git integration
今回の構成では、開発環境はSagemaker Notebookインスタンス、コード管理はCodeCommitを利用しており、開発部分についてはSnowflake外のAWSサービスを利用しています。Snowflake NotebookGitリポジトリ統合を使うことで、開発部分についてもSnowflakeの基盤内に組み込むことができ、よりSnowflakeネイティブな構成を実現することができると考えています。

まとめ

これまでSnowflakeの利用用途としてはDWH製品としてのものが主でしたが、今回の開発体験を通じて、ワークロードを実現する単一のデータプラットフォームとしての力を実感しました。 モダンデータスタックでは多様なSaaS製品を組み合わせて活用基盤を構築する方法が主流ですが、Snowflakeの機能のみで実現するシンプルなパイプラインにも強みがあると考えています。 一方で、各種ライブラリのドキュメントや日本語情報が不十分であったり、実装を進める中で細かな困りごとがいくつか発生したりと、まだ途上にある技術であることも感じています。 今後も、より多様なケースに対応可能なプラットフォームとして進化してくことを期待し、ユーザとしても技術向上・社内展開に努めていきたいと思います。