NTTドコモR&Dの技術ブログです。

Cloud Composer×Cloud Run Jobでnotebook実行を自動化した件

TL;DR

  • データ分析ツールの利用実績を定期的に社内で報告しており、利用データの集計と格納をJupyter Notebook(以下notebook)で実施していた。
  • Google CloudでCloud Composer、Cloud Run Job、papermillを使ってnotebookのスケジュール実行を実現した。
  • スケジュール実行処理を一箇所にまとめるため、Cloud Composerを採用した。
  • Airflow公式のCloudRunJobOperatorが2023年9月にリリースされ、簡単にDAGを組めるようになったので使ってみた。

自己紹介

  • NTTドコモ データプラットフォーム部(以下DP部)黒須です。私は今年の9月までd払いやdカードのデータ分析の仕事をしており、業務ではpythonとSQLしか触ったことがありませんでした。 この先のキャリアを考えて分析以外にクラウドの知識も身に着けたいと思い、10月からGoogle Cloudの仕事に携わっております。
  • NTTドコモでは様々なデータ分析ツールを取り入れることでサービス価値の向上を目指しています。今回自分がGoogle Cloudの仕事に携わるにあたり、データ分析ツールの利用ログ集計をGoogle Cloudを使ってスケジュール化してみたのでご紹介します。

背景

  • データ分析ツールの利用実績を定期的に社内で報告しており、利用データの集計と格納をnotebookで実施していた。
  • notebookのスケジュール実行をVertex AI Workbenchで実施していたが、VPC内で複数のVertex AI CustomJobが同時実行できない制約があった。
  • 上の制約とチーム内でスケジュール実行処理をCloud Composerに集約していることから、Cloud Composerでのスケジュール実行を実施した。

構成図

  • papermillが入ったDockerコンテナ(参考:こちらの記事の3.)でCloud Storage内のnotebookを実行するCloud Run Jobを登録し、Cloud Composerからジョブをスケジュール実行しております。

用語

  • papermill・・・notebookファイルをバッチ実行することができるNetflixで作られたOSS
  • Cloud Run ・・・Google Cloudにおけるサーバーレスコンテナコンピューティングサービス、papermillによるnotebook実行をジョブとして登録するために使用
  • Cloud Composer・・・Airflowで構成されたフルマネージドのワークフローオーケストレーションサービス、Cloud Runのジョブをスケジュール実行するために使用

1. papermill入りDockerコンテナ

  • DP部のデータサイエンティストはnotebookを使用することが多く、papermillを使うとnotebookファイルをそのまま定期的なジョブのタスクファイルとして利用できるため、私のチームではpapermillが入ったDockerコンテナを使用しております。
  • papermillでnotebookを実行する際のpythonの環境については、依存関係の管理がしやすいことからpoetryで環境構築をしております。
  • Dockerファイルの例を以下に示します。エントリーポイントでpoetryの環境構築をし、引数として与えられたnotebookをpapermillで実行しています。
FROM python:3.10-bullseye as python-base

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        build-essential \
        git 

ENV HOME=/home/user

# poetry用にパスを通す
ENV PATH="$HOME/.local/bin:$PATH"

FROM python-base as builder-base

# poetryのインストール 
RUN curl -sSL https://install.python-poetry.org | python    

# poetryで仮想環境を構築し、ipykernelにsklearnというカーネル名で登録
WORKDIR "${HOME}/poetry_envs/sklearn"
COPY pyproject.toml pyproject.toml
RUN poetry install --sync --with XXXXX
RUN poetry run python -m ipykernel install --user --name sklearn

WORKDIR $HOME
ENV INPUT_NOTEBOOK=input.ipynb \
    OUTPUT_NOTEBOOK=out.ipynb \
    POETRY_KERNEL=sklearn \

# 仮想環境を構築し、notebookファイルをpapermillで実行
ENTRYPOINT poetry run -C "${HOME}/poetry_envs/${POETRY_KERNEL}" papermill  $INPUT_NOTEBOOK $OUTPUT_NOTEBOOK -k  $POETRY_KERNEL

2. Cloud Run

  • Cloud Runにはサービスとジョブがありますが、今回はスケジュール実行で使うためジョブを選択しました。
  • Cloud Run Jobは従来のCloud Runと異なり、httpリクエストによらずコンテナさえあれば簡単にジョブ実行することができる便利なサービスです。
  • 本記事ではCloud Storage上のnotebookを1のDockerコンテナで実行するジョブを作成しております。ジョブを作成するコード例を以下に示します。
!pip install google-cloud-run
from google.cloud import run_v2
from pytz import timezone
import pandas as pd
import yaml

project_id = "xxxxxx"  
network_name = "xxxxxx" 
subnet_name = "xxxxxx" 
connector = "xxxxxx" 
service_account = "xxxxxx" 
region = "xxxxxx" 
client = run_v2.JobsClient()
parent = f"projects/{project_id}/locations/{region}"
_default_image_uri = f"{region}-docker.pkg.dev/{project_id}/path/image"
_default_kernel = "sklearn"

# CPU数
cpu = "2"
# メモリ
memory = "8Gi"
# 実行時間
max_run_duration = "3600s"

# job parameters
job_name        = "XXXXX"  #cloudrunjob名
input_notebook  = "gs://bucket/path/to/notebook.ipynb"  #GCS内の実行するnotebook
output_notebook = "gs://bucket/path/to/notebook_out.ipynb"  #notebookの実行結果
image_uri       = _default_image_uri
kernel          =_default_kernel

job = run_v2.Job()

job.template.template.max_retries = 1

container = run_v2.Container()
container.image = image_uri

# papermillコンテナを実行するための引数
container.env = [
    {"name": "INPUT_NOTEBOOK","value": input_notebook},
    {"name": "OUTPUT_NOTEBOOK","value": output_notebook},
    {"name": "POETRY_KERNEL","value": kernel},
]

# cloudrunを実行する際のパラメータ
resources = run_v2.ResourceRequirements()
resources.limits = {
    "memory": memory,
    "cpu": cpu
    }
container.resources = resources
job.template.template.containers = [container]
job.template.template.timeout = max_run_duration
job.template.template.service_account = service_account
vpc_access = run_v2.VpcAccess()
vpc_access.connector = f"projects/{project_id}/locations/{region}/connectors/{connector}"
egress = run_v2.VpcAccess.VpcEgress(1)
vpc_access.egress = egress
job.template.template.vpc_access = vpc_access

# ジョブ作成のリクエストを作って投げる
request = run_v2.CreateJobRequest(
    parent=parent,
    job=job,
    job_id=job_name,
)
operation = client.create_job(request=request)
response = operation.result()

3. Cloud Composer

  • Cloud Composerの利用者はDAGとよばれる.pyファイルをCloud Storageの指定のバケット下に置くことでワークフローを追加することができます。
  • こちらのDAGで2のジョブをスケジュール実行します。今回は10月10日から毎日9時に実行するDAGファイルを書いております。
import json
import requests
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from datetime import datetime
from google.cloud import secretmanager

# params
DAG_NAME = "XXXXX"

with DAG(
    'DAG_NAME',
    description='',
    schedule_interval='0 0 * * *',  #毎日9時
    start_date=datetime(2023, 10, 10),
    catchup=False
) as dag:
    pochi_admin_usagelog = CloudRunExecuteJobOperator(
    task_id='task id',
    project_id='project_id',
    region='region',
    job_name='cloudrun_job_name'  ##cloudrunで実行するjob名,
    dag=dag
)

あとがき

  • 今まで分析しか触れていなかったため、クラウドを使った処理の自動化はピタゴラスイッチ感がすごく楽しかったです。
  • 今回ネットワークやCloud Composerの設定は他のメンバーがすでに実施していたので、それらも含めて自分でできるようになりたいです。
  • NTTドコモでは専門性を活かして業務をしつつ、未経験の分野を伸ばせる機会があり、望んだキャリア形成がしやすいと思っております。 興味がある方は採用情報をご確認ください。
  • 今回クラウド未経験の自分を受け入れてくれたチームの皆様にはとても感謝しております。