TL;DR
- データ分析ツールの利用実績を定期的に社内で報告しており、利用データの集計と格納をJupyter Notebook(以下notebook)で実施していた。
- Google CloudでCloud Composer、Cloud Run Job、papermillを使ってnotebookのスケジュール実行を実現した。
- スケジュール実行処理を一箇所にまとめるため、Cloud Composerを採用した。
- Airflow公式のCloudRunJobOperatorが2023年9月にリリースされ、簡単にDAGを組めるようになったので使ってみた。
自己紹介
- NTTドコモ データプラットフォーム部(以下DP部)黒須です。私は今年の9月までd払いやdカードのデータ分析の仕事をしており、業務ではpythonとSQLしか触ったことがありませんでした。
この先のキャリアを考えて分析以外にクラウドの知識も身に着けたいと思い、10月からGoogle Cloudの仕事に携わっております。
- NTTドコモでは様々なデータ分析ツールを取り入れることでサービス価値の向上を目指しています。今回自分がGoogle Cloudの仕事に携わるにあたり、データ分析ツールの利用ログ集計をGoogle Cloudを使ってスケジュール化してみたのでご紹介します。
背景
- データ分析ツールの利用実績を定期的に社内で報告しており、利用データの集計と格納をnotebookで実施していた。
- notebookのスケジュール実行をVertex AI Workbenchで実施していたが、VPC内で複数のVertex AI CustomJobが同時実行できない制約があった。
- 上の制約とチーム内でスケジュール実行処理をCloud Composerに集約していることから、Cloud Composerでのスケジュール実行を実施した。
構成図
- papermillが入ったDockerコンテナ(参考:こちらの記事の3.)でCloud Storage内のnotebookを実行するCloud Run Jobを登録し、Cloud Composerからジョブをスケジュール実行しております。
用語
- papermill・・・notebookファイルをバッチ実行することができるNetflixで作られたOSS
- Cloud Run ・・・Google Cloudにおけるサーバーレスコンテナコンピューティングサービス、papermillによるnotebook実行をジョブとして登録するために使用
- Cloud Composer・・・Airflowで構成されたフルマネージドのワークフローオーケストレーションサービス、Cloud Runのジョブをスケジュール実行するために使用
1. papermill入りDockerコンテナ
- DP部のデータサイエンティストはnotebookを使用することが多く、papermillを使うとnotebookファイルをそのまま定期的なジョブのタスクファイルとして利用できるため、私のチームではpapermillが入ったDockerコンテナを使用しております。
- papermillでnotebookを実行する際のpythonの環境については、依存関係の管理がしやすいことからpoetryで環境構築をしております。
- Dockerファイルの例を以下に示します。エントリーポイントでpoetryの環境構築をし、引数として与えられたnotebookをpapermillで実行しています。
FROM python:3.10-bullseye as python-base
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
build-essential \
git
ENV HOME=/home/user
ENV PATH="$HOME/.local/bin:$PATH"
FROM python-base as builder-base
RUN curl -sSL https://install.python-poetry.org | python
WORKDIR "${HOME}/poetry_envs/sklearn"
COPY pyproject.toml pyproject.toml
RUN poetry install --sync --with XXXXX
RUN poetry run python -m ipykernel install --user --name sklearn
WORKDIR $HOME
ENV INPUT_NOTEBOOK=input.ipynb \
OUTPUT_NOTEBOOK=out.ipynb \
POETRY_KERNEL=sklearn \
ENTRYPOINT poetry run -C "${HOME}/poetry_envs/${POETRY_KERNEL}" papermill $INPUT_NOTEBOOK $OUTPUT_NOTEBOOK -k $POETRY_KERNEL
2. Cloud Run
- Cloud Runにはサービスとジョブがありますが、今回はスケジュール実行で使うためジョブを選択しました。
- Cloud Run Jobは従来のCloud Runと異なり、httpリクエストによらずコンテナさえあれば簡単にジョブ実行することができる便利なサービスです。
- 本記事ではCloud Storage上のnotebookを1のDockerコンテナで実行するジョブを作成しております。ジョブを作成するコード例を以下に示します。
!pip install google-cloud-run
from google.cloud import run_v2
from pytz import timezone
import pandas as pd
import yaml
project_id = "xxxxxx"
network_name = "xxxxxx"
subnet_name = "xxxxxx"
connector = "xxxxxx"
service_account = "xxxxxx"
region = "xxxxxx"
client = run_v2.JobsClient()
parent = f"projects/{project_id}/locations/{region}"
_default_image_uri = f"{region}-docker.pkg.dev/{project_id}/path/image"
_default_kernel = "sklearn"
cpu = "2"
memory = "8Gi"
max_run_duration = "3600s"
job_name = "XXXXX"
input_notebook = "gs://bucket/path/to/notebook.ipynb"
output_notebook = "gs://bucket/path/to/notebook_out.ipynb"
image_uri = _default_image_uri
kernel =_default_kernel
job = run_v2.Job()
job.template.template.max_retries = 1
container = run_v2.Container()
container.image = image_uri
container.env = [
{"name": "INPUT_NOTEBOOK","value": input_notebook},
{"name": "OUTPUT_NOTEBOOK","value": output_notebook},
{"name": "POETRY_KERNEL","value": kernel},
]
resources = run_v2.ResourceRequirements()
resources.limits = {
"memory": memory,
"cpu": cpu
}
container.resources = resources
job.template.template.containers = [container]
job.template.template.timeout = max_run_duration
job.template.template.service_account = service_account
vpc_access = run_v2.VpcAccess()
vpc_access.connector = f"projects/{project_id}/locations/{region}/connectors/{connector}"
egress = run_v2.VpcAccess.VpcEgress(1)
vpc_access.egress = egress
job.template.template.vpc_access = vpc_access
request = run_v2.CreateJobRequest(
parent=parent,
job=job,
job_id=job_name,
)
operation = client.create_job(request=request)
response = operation.result()
3. Cloud Composer
- Cloud Composerの利用者はDAGとよばれる.pyファイルをCloud Storageの指定のバケット下に置くことでワークフローを追加することができます。
- こちらのDAGで2のジョブをスケジュール実行します。今回は10月10日から毎日9時に実行するDAGファイルを書いております。
import json
import requests
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from datetime import datetime
from google.cloud import secretmanager
DAG_NAME = "XXXXX"
with DAG(
'DAG_NAME',
description='',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 10, 10),
catchup=False
) as dag:
pochi_admin_usagelog = CloudRunExecuteJobOperator(
task_id='task id',
project_id='project_id',
region='region',
job_name='cloudrun_job_name'
dag=dag
)
あとがき
- 今まで分析しか触れていなかったため、クラウドを使った処理の自動化はピタゴラスイッチ感がすごく楽しかったです。
- 今回ネットワークやCloud Composerの設定は他のメンバーがすでに実施していたので、それらも含めて自分でできるようになりたいです。
- NTTドコモでは専門性を活かして業務をしつつ、未経験の分野を伸ばせる機会があり、望んだキャリア形成がしやすいと思っております。
興味がある方は採用情報をご確認ください。
- 今回クラウド未経験の自分を受け入れてくれたチームの皆様にはとても感謝しております。