NTTドコモR&Dの技術ブログです。

Iceberg × dbt × BigQuery × Snowflakeでつくる、マルチDWHパイプライン

1. はじめに

NTTドコモ データプラットフォーム部の田中です。

近年、データ基盤のオープン化と相互運用性を背景に、OTF(Open Table Format)が話題になっています。 OTFは、クラウドストレージにあるデータレイク上のファイル群をDWHのテーブルのように安全かつ一貫性を保って扱うことができるオープン仕様であり、複数エンジンからの共通利用が可能になります。 その主要なフォーマットの一つであるApache Icebergは、主要クラウドDWH/エンジンでのネイティブ対応が進んでおり、ドコモ内でも活用検討がなされているところです。

本記事では、ドコモのデータパイプラインで、SnowflakeとBigQueryをまたぐ横断的な構成を実現した話をします。
今回はApache Iceberg Tableを用いて、Snowflakeにある上流データをBigQueryで加工しつつ、分析結果をSnowflakeに連携するパイプラインを、dbtを用いて実装しました。SnowflakeとBigQuery間のIceberg連携・Icebergのdbt実装における多くの知見が得られたので、実装内容とともに紹介できればと思います。

なお、本記事における取り組みは株式会社シイエヌエスの小倉さんと協働で行っており、以下は小倉さんに執筆いただいています。

2. 背景と目的

2.1 課題意識

ドコモでは、様々なお客様の声のテキストデータから課題を発見し、サービス改善を支援する分析ツールを開発・運用しており、以下のような機能を有しています。

  • ポジティブ/ネガティブ判定
  • お客様の声のクラスタリングや属性分類
  • 結果のダッシュボード表示と可視化

本ツールで扱うデータは、ドコモ全社のデータ利活用基盤であるSnowflakeに格納されており、dbtを使ったパイプラインで加工しています。

今回、BigQueryを用いた新たな分析基盤を構築し、ハイブリッド検索やBQML等、Google Cloudの機能を活用したより高度な課題分析やモニタリングを実現したいというニーズがあり、Snowflakeとの相互のデータ連携を実現する必要がありました。
またお客様の声分析の重要性が高まっているため、今後他の基盤からもデータを参照しやすい形を目指します。

2.2 実現したいこと

このような背景のもと、以下の要件を満たすデータパイプラインを構築しました。

  • SnowflakeとBigQueryをまたいだ処理が可能であること
  • dbtで処理を実装し、開発・運用の効率性を維持すること
  • データストレージとDWHを疎結合にし、複数エンジンから同じデータを参照できること

3. Icebergを用いたパイプラインの構成

今回構築したデータパイプラインは下記の図の通りです。

大まかに2つのパイプラインがあり、そのうち下記のIcebergに関わる実装を今回解説します。

①Snowflakeから上流データをIceberg連携するパイプライン

  • Snowflake → Iceberg出力: dbt-snowflakeのIceberg Tableフォーマットを使い、SnowflakeのApache Icebergテーブルを作成します。
  • Iceberg → BigQuery読込み: Snowflakeから最新のメタデータファイルパスを取得し、取得したパスを用いてBigQueryにApache Iceberg external tablesを作成します。

②BigQueryで分析処理を施したデータをIceberg連携するパイプライン

  • BigQuery → Iceberg出力: dbtのpre-hookpost-hookを使って、BigLake tables for Apache Icebergを作成します。
  • Iceberg → Snowflake読込み: version-hint.textを読み込んで最新のメタデータファイルパスを取得し、取得したパスを用いてSnowflakeにIceberg Tableを作成します。

4. ①SnowflakeからのIceberg連携パイプライン

ここでは、Snowflakeにある上流データを、Snowflake→Icebergとして出力し、そのデータをBigQueryから参照するパイプラインについて、実際のコード例を交えて解説します。

4.1 Snowflake → Iceberg出力

このステップでは、dbt-snowflakeのIceberg Table機能を活用して、Snowflake上のデータをApache Iceberg形式でオブジェクトストレージに出力します。これにより、Snowflakeに依存しない形でデータを保存し、他のDWHからもアクセス可能な状態を実現します。

事前準備

まず、Snowflake側でEXTERNAL VOLUMEを作成し、dbt_project.ymlでIceberg機能を有効化します。

CREATE OR REPLACE EXTERNAL VOLUME gcs_iceberg_volume
STORAGE_LOCATIONS = (
    (
        NAME = 'asia-northeast1'  -- STORAGE_LOCATIONの任意の名称
        STORAGE_PROVIDER = 'GCS'
        STORAGE_BASE_URL = 'gcs://my-bucket/iceberg-from-snowflake/'
            -- ここで指定したパス及び配下のフォルダに作成可能
    )
);
# dbt_project.yml
flags:
  enable_iceberg_materializations: True

vars:
  # SnowflakeのEXTERNAL VOLUMEオブジェクト名
  external_volume_name: 'gcs_iceberg_volume'

実装方法

dbtモデルファイルでは、configブロックでIceberg Tableの設定を行います。

-- snowflake_project/models/marts/customer_voice.sql
{{
    config(
        materialized='table',
        table_format='iceberg',
        external_volume=var('external_volume_name'),
    )
}}

SELECT
    voice_id,
    voice_text,
    reception_datetime::TIMESTAMP_NTZ(6) AS reception_datetime,
     -- Iceberg Tableで利用できるTimestamp型に変換
FROM
    {{ ref('stg_customer_voice') }}

技術的ポイント

  1. データ型の考慮: Iceberg Tableでの対応精度に合わせて、Timestamp型はTIMESTAMP_NTZ(6)に明示的に変換しています。
  2. 変数の活用: external_volume_nameを変数化することで、開発・商用での切替えや、複数テーブルでの利用が容易になります。

4.2 Iceberg → BigQuery読込み

4.1でSnowflakeから出力したIcebergテーブルを、BigQueryから読み込むためBigLake外部テーブルを作成します。

Snowflakeネイティブカタログ管理のIcebergのデータをBigQueryから参照するには、最新のメタデータファイルパスの情報をBigQueryに渡す必要があります。

実装方法

最新のメタデータファイルパスはSnowflakeのSYSTEM$GET_ICEBERG_TABLE_INFORMATION関数で取得できます。 取得したパスを使ってBigQuery側に外部テーブルを作成するため、一連の処理をPythonスクリプトに実装しました。

import snowflake.connector
from google.cloud import bigquery

def get_iceberg_metadata_path(conn, table_name: str) -> str:
    """最新のメタデータファイルパスを取得"""

    cursor = conn.cursor()
    query = f"""
        SELECT
            JSON_EXTRACT_PATH_TEXT(
                SYSTEM$GET_ICEBERG_TABLE_INFORMATION('{table_name}'),
                'metadataLocation'
            ) AS metadata_file_path
    """
    
    cursor.execute(query)
    result = cursor.fetchone()
    cursor.close()
    
    return result[0]

def create_biglake_iceberg_table(bq_client, table_id: str, metadata_file_path: str):
    """BigLake外部テーブルを作成"""

    # メタデータファイルパスの形式を調整
    if metadata_file_path.startswith("gcs://"):
        metadata_file_path = metadata_file_path.replace("gcs://", "gs://")
    
    query = f"""
        CREATE OR REPLACE EXTERNAL TABLE
            `poc_dataset.{table_id}`
        OPTIONS(
            format = 'ICEBERG',
            uris = ['{metadata_file_path}']
        )
    """
    
    job = bq_client.query(query)
    job.result()

snowflake_connection = snowflake.connector(<接続パラメータ>)
bigquery_client = bigquery.Client()

# メタデータファイルパスを取得
metadata_path = get_iceberg_metadata_path(snowflake_connection, 'customer_voice')

# BigQuery外部テーブルを作成
create_biglake_iceberg_table(bbq_clientq_client, 'customer_voice_from_snowflake', metadata_path)

技術的ポイント

  1. メタデータファイルパスの形式: SnowflakeからはGCSパスがgcs://形式で返されますが、BigQueryではgs://形式が必要なため、変換処理を行います。
  2. CREATE OR REPLACE: 既存のテーブルがある場合は置き換えることで、メタデータの更新に対応します。

ここまでの手順で、SnowflakeのIcebergテーブルがBigQueryからアクセス可能となり、BigQueryの機能を活用したデータ処理が実現できます。

5. ②BigQueryからのIceberg連携パイプライン

続いて、BigQueryにて分析処理したデータを、BigQuery→Icebergとして出力し、そのデータをSnowflakeから参照するパイプラインについて解説します。

5.1 BigQuery → Iceberg出力

BigQueryで加工したデータをApache Iceberg形式で出力します。実装時点では、dbt-bigqueryがIcebergフォーマットに未対応だったため、dbtのpost-hook機能を活用した、力技での実装となりました。

実装方法

実装時、dbtの標準的なconfig指定ではIceberg Tableの作成ができないが、作成済みのIcebergに対するデータ挿入はできるという状況でした。

よって、下記の段階に分けた実装を行いました。 - 初回実行時などIceberg Tableが存在しないときは、dbtのpost-hook機能を使ってBigLake tables for Apache IcebergのDDLを直接実行し、空のIceberg Tableを作る - dbtのIncrementalモデルを用いて、すでに存在するIceberg TableにMERGE処理をする

Step1: intermediateモデルのpost-hookで空のIceberg Tableを作成

-- bigquery_project/models/intermediate/int_customer_voice_for_insert.sql
{{
    config(
        materialized="table",
        post_hook="""
            CREATE TABLE IF NOT EXISTS
                `{{ target.database }}.{{ target.schema }}.customer_voice_analyze_result`
            CLUSTER BY reception_datetime
            WITH CONNECTION
                `projects/{{ target.database }}/locations/asia-northeast1/connections/{{ var('connection_name') }}`
            OPTIONS (
                file_format = 'PARQUET',
                table_format = 'ICEBERG',
                storage_uri = 'gs://{{ var('bucket_name') }}/iceberg_from_bigquery/customer_voice_analyze_result/'
            )
            AS
                SELECT *
                FROM {{ this }}
                LIMIT 0
        """
    )
}}

Step2: martsモデルでIceberg Tableへのデータ挿入

-- bigquery_project/models/marts/customer_voice_analyze_result.sql
{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key=["voice_id"],
        post_hook="EXPORT TABLE METADATA FROM {{ this }}"
    )
}}

SELECT *
FROM {{ ref('voice_sentiment_for_insert') }}

技術的ポイント

  1. post-hookでのDDL実行: 実装時点ではdbtの標準機能では対応できなかったため、post-hookで直接BigQueryのDDLを実行しました。
  2. WITH CONNECTION: BigLake tables for Apache Icebergでは、事前に作成したConnectionオブジェクトを指定する必要があります。
  3. LIMIT 0でのスキーマ作成: 初回実行時はLIMIT 0で空のテーブルを作成し、後続のIncremental処理でデータを投入します。
  4. 変数の活用: connection_namebucket_nameを変数化することで、環境間での設定切り替えを容易にします。

ここまでの実装により、dbt-bigqueryの制約を回避しつつ、BigQueryからIceberg形式でのデータ出力を実現できました。

5.2 Iceberg → Snowflake読込み

このステップでは、BigQueryで加工・出力されたIcebergテーブルを、Snowflakeから読み込み可能にします。BigQueryで作成されたIcebergメタデータを参照して、Snowflake側でIcebergテーブルを作成し、分析結果を全社基盤で利用可能にします。

実装方法

BigQueryからSnowflakeへのIcebergテーブル読み込みでは、GCS上のversion-hint.textファイルを活用して最新のメタデータファイルパスを特定し、Snowflake側でIcebergテーブルを作成します。

from google.cloud import storage
import snowflake.connector

def read_version_hint(gs_client, table_name: str) -> str:
    bucket = gs_client.bucket('my-bucket')
    blob = bucket.blob(f"iceberg_from_bigquery/{table_name}/metadata/version-hint.text")
    
    # ファイルの中身を文字列として取得
    version = blob.download_as_text().strip()
    return version

def build_metadata_file_path(gs_client, table_name: str) -> str:
    # 最新バージョンを取得
    version = read_version_hint(gs_client, table_name)
    
    # メタデータファイルパスを構築
    metadata_path = f"{table_name}/metadata/v{version}.metadata.json"
    
    return metadata_path

def create_snowflake_iceberg_table(conn, table_name: str, metadata_file_path: str):
    cursor = conn.cursor()
    query = f"""
        CREATE OR REPLACE ICEBERG TABLE {table_name}
        EXTERNAL_VOLUME = 'gcs_bigquery_volume'
        CATALOG = 'bigquery_iceberg_catalog'
        METADATA_FILE_PATH = '{metadata_file_path}'
    """
    
    cursor.execute(query)
    cursor.close()


# Cloud Storageに接続
gcs_client = storage.Client()

# メタデータファイルパスを構築
metadata_file_path = build_metadata_file_path(gcs_client, 'customer_voice_analyze_result')

# Snowflakeに接続
snowflake_connection = snowflake.connector(<接続パラメータ>)

# Iceberg Tableを作成
create_snowflake_iceberg_table(snowflake_connection, 'analyze_result_from_bigquery', metadata_file_path)

技術的ポイント

  1. version-hint.textの活用: BigLake tables for Apache Icebergでは、version-hint.textファイルに最新のメタデータバージョンが記録されており、これを読み込むことで最新状態を取得できます。
  2. メタデータファイルパスの構築: バージョン情報を使ってv{version}.metadata.json形式でメタデータファイルパスを構築します。
  3. EXTERNAL VOLUMEとCATALOG: Snowflake側では、事前に設定したEXTERNAL VOLUMEとCATALOGを指定してIcebergテーブルを作成します。
  4. CREATE OR REPLACE: 既存のテーブルがある場合は置き換えることで、メタデータの更新に対応します。

この実装により、BigQueryで加工された分析結果がSnowflakeからもアクセス可能となり、全社データ基盤での活用が実現できます。

Apache Icebergを介することで、両DWHの機能を最大限活用しながら、ベンダーロックインを回避した柔軟なデータ処理基盤が完成します。

6. 今後に向けての課題

dbt-bigquery の Iceberg サポート標準化

dbt-bigqueryのバージョン1.10より、BigLake Metastoreを使ったIceberg Table作成が標準でサポートされるようになりました。今回のような力技を使わない実装が可能となります。

GCS 上のオブジェクトライフサイクル設定

各テーブルごとにフォルダを分けているとはいえ、Iceberg のスナップショットや古いメタデータの肥大化を防ぐため、GCS のライフサイクルルール(例:30 日以上経過した metadata ファイルをアーカイブ/削除)を組み込む設計検討が必要です。

REST API カタログ活用

現状はSnowflake、BigQueryそれぞれの手段で最新のメタデータファイルパスを取得しています。

将来的には Apache Iceberg の REST カタログや BigLake の Metastore カタログなど、共通的かつ分散型にも対応できるメタデータ基盤に移行することで、複数エンジン間の同期性や管理を向上できます。

7. おわりに

本記事では、Snowflake × BigQuery × dbt × Iceberg を組み合わせ、複数のDWHから相互にデータを連携するデータパイプラインについてご紹介しました。

この仕組みによって、

  • Snowflakeを全社基盤として維持しつつ、
  • BigQueryのML機能やハイブリッド検索を組み込み、
  • さらに両者間で結果をシームレスに共有する

という柔軟なデータ活用が可能になり、また今後の発展性も高まります。

今後も、実運用に向けたストレージ管理やメタデータカタログ整備を進めつつ、マルチクラウド時代に適したアーキテクチャを検証していきたいと考えています。