1. 背景
NTTドコモ データプラットフォーム部の外山です。NTTドコモでは、ビジネスユーザーに使いやすいデータ活用環境を実現するために、Streamlitアプリを介したデータ活用を推進しています。
お客様の声を踏まえ、改善する事業運営を目指す中で、一つ一つのお客様の声のデータに、デモグラ情報やポジティブ評価、ネガティブ評価を行い、課題解決に向けたStreamlitアプリを作成していますが、アプリケーション上で処理させる上で、内容が多く、寄せられる声をできるだけ素早く利用することが難しい状況でした。
私たちのチームでは、レスポンス向上や開発者の負荷軽減等を目的として、各アプリ専用のデータパイプラインの開発・運用を行ってきました。
これまでも、独自のデータ品質テストやデータ鮮度チェックを作成・運用してきましたが、実装するための工数や、修正の手間が大きく、 高鮮度データの場合、品質テストすり抜けてしまったときのリカバリーにも時間を要すため、今回dbtのテスト機能を用いて、網羅的なテストパターン設計、設定することを行いました。
本記事では、データエンジニアを悩ませつづける運用の手助けとなるdbtのテスト機能を実例ベースにご紹介したいと思います。 なお、本記事の掲載の取組については、株式会社シイエヌエスの⽀援メンバーである⼩倉さんに詳細検討・実装を進めてもらっており、以下は、⼩倉さんに執筆いただいています。
2. 構築したデータパイプライン
本アプリにおけるデータパイプラインは以下のように構築しました。このパイプラインでは、上流システムからDWH(Snowflake)に取り込まれるデータをアプリ向けに加工し、提供しています。
3. テスト実装
チームで「可能な限り網羅的なテストを実施する」という方針を確認し、以下の手順でテストを実装しています。
- 本パイプラインで実装するモデル、カラムを表に一覧化する
- キー項目のテストを記載する
- アプリ側の要件から、Mart層に必要なテストを記載する
- 上流データの異常パターンを想定し、Source、Staging層に必要なテストを記載する
- エラー発生時の対応シナリオを考え、テストの過不足を見つける
上記のもと、主キー確認などに利用する基本的なテストから、dbt_utils、dbt_expectationsを用いた本件ならではのテスト実装イメージをご紹介いたします。
基本的なテスト
Staging層、Mart層のモデルに対しては、基本的に全モデルにキー項目で一意に特定できることを確認するテストを実装しました。
単一キーの場合はそのカラムにnot_null
とunique
テストを追加しました。
models: - name: テーブル名 columns: - name: カラム名 tests: - unique - not_null
複合キーの場合はdbt_utils
のunique_combination_of_columns
テストを実装しました。
また、複合キーの各列に対するnot_null
テストも追加しました。
models: - name: テーブル名 tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: - カラムA - カラムB columns: - name: カラムA tests: - not_null - name: カラムB tests: - not_null
また、入ってくるカテゴリが決まっているカテゴリ列に対してはaccepted_values
テストを実装しました。
models: - name: テーブル名 columns: - name: カラム名 tests: - accepted_values: values: - 'a' - 'b' - 'c' - 'd' - 'e' - null
source freshness
パイプラインに取り込むお客様の声データがロードされていることを確かめるため、各sourceに対してfreshnessを使い、鮮度の基準を定義しました。 warning水準とerror水準を設定することで、運用時のSLOに適した内容に設定を行っています。
sources: - name: ソース名 database: データベース名 schema: スキーマ名 tables: - name: テーブル名 freshness: warn_after: {count: 24, period: hour} error_after: {count: 30, period: day} loaded_at_field: データ作成日
dbt_expectations
を使ったテスト
dbt組み込みのテストやdbt_utils
だけでは実装できないテストを実装するため、dbt_expectations
というパッケージを使用してテストを実装しました。
カラムレイアウトに変更がないことのテスト
上流データのテーブルに意図しない変更(カラム名変更、カラム削除等)を検知するため、sourceに対してexpect_table_columns_to_contain_set
を使ったテストを実装しました。
sources: - name: ソース名 database: データベース名 schema: スキーマ名 tables: - name: テーブル名 tests: # カラムレイアウトチェック - dbt_expectations.expect_table_columns_to_contain_set: column_list: - カラムA - カラムB - カラムC …
取込み対象が1行以上存在することのテスト
上流システムからDWHに処理対象レコードが取り込まれていないことを早期に検出する必要がありました。そこで、Staging層のモデルに対してexpect_table_row_count_to_be_between
によるテストを実装し、処理対象レコード数のテストを実装しました。
models: - name: テーブル名 tests: - dbt_expectations.expect_table_row_count_to_be_between: min_value: 1
数値データの異常を検出するテスト
感情分析APIから出力されるポジティブ・ネガティブスコア、マグニチュードの値が定義の範囲内であることをチェックするため、expect_column_values_to_be_between
を使ったテストを実装しました。
sources: - name: ソース名 database: データベース名 schema: スキーマ名 tables: - name: テーブル名 columns: - name: ポジネガスコア tests: - dbt_expectations.expect_column_values_to_be_between: min_value: -1 max_value: 1 - name: マグニチュードスコア tests: - dbt_expectations.expect_column_values_to_be_between: min_value: 0
テキストデータの異常を検出するテスト
お客様の声のテキストデータの品質担保のためのテストも実装しました。
expect_column_value_lengths_to_be_between
を使った空文字列の検出、expect_column_values_to_not_match_regex
を使ったスペースのみの文字列の検出を実装しました。
models: - name: テーブル名 columns: - name: テキストカラム名 tests: - not_null - dbt_expectations.expect_column_value_lengths_to_be_between: min_value: 1 # 文字列長が0(=空文字列)の場合はエラー - dbt_expectations.expect_column_value_lengths_to_be_between: min_value: 10 # 文字列長が10未満の場合は警告 severity: warn - dbt_expectations.expect_column_values_to_not_match_regex: regex: "^[\\\\s ]+$" # 半角スペースまたは全角スペースのみの文字列はエラーとする
4. 工夫したポイント・ハマったポイント
外部APIからの取得値に対するテスト
今回、感情分析のためにGoogle CloudのNatural Language APIを使用しています。
APIを叩く部分について、今回はdbt外のJupyterNotebookを実行し、その中で呼び出す方式を取りました。
そのため、そのままではPythonコードの中で手動でテストの仕組みを実装する必要がありました。
そこで、APIからの取得値を格納したテーブルをdbtのMart層にsourceとして定義し、dbt testの枠組みでテストできるようにしました。
StagingモデルのMaterialization
当初、JupyterNotebookで読み込むStagingモデルのMaterializationにはview
を指定していました。
一日の処理上限を意識し、レコード数を絞る必要があった中で、VIEWではアクセスするたびに抽出結果が変わり、入力レコードと出力レコードが不一致となる場合がありました。
そこで、StagingモデルのMaterializationではtable
を設定することにしました。
正規表現のコンパイル
正規表現による半角・全角スペースのみのテキストの検出を実装しましたが、
実行されるSQLへのコンパイル結果が想定と異なるものになりました。
dbt-snowflakeにおいては、expect_column_values_to_not_match_regex
を使った
正規表現のテストはregexp_instr
関数を使ったSQLにコンパイルされます。
当初の実装
- Yamlファイルでの定義:
^[\\s ]+$
- SQLへのコンパイル結果:
^[\s ]+$
当初の実装では、Snowflakeにおける''
で囲まれた文字列の仕様上、半角スペースやタブにはマッチせず、
文字列s
にはマッチする実装となっていたため、下記の通り修正を行い、半角スペース・全角スペースのみの文字列を検出できるようにしました。
修正後の実装
- Yamlファイルでの定義:
^[\\\\s ]+$
- SQLへのコンパイル結果:
^[\\s ]+$
テスト基準の厳格さについて
運用開始後、テストの設計基準が厳しすぎたことによるエラーがいくつか発生しました。
- 古い日付のデータが含まれていることを検出するテストについて、想定よりも古いデータも取り込まれる場合があった
- 短い文字列を検出するテストについて、テキスト処理の結果短い文字列が含まれる場合もある仕様だった
事前にテストデータでの確認は行っていたものの、テキスト長さやデータ日付の基準については、データ提供元と明確に認識合わせをすべきと改めて実感しました。
5. 今後に向けて
さらなる改善
microbatch戦略の使用
本アプリ向けのMartモデルでは、dbtのIncrementalモデルを使って過去データを蓄積する方式としています。Incremental戦略にはdelete+insert
を採用しています。
日次の単位で処理を行っているため、dbt coreバージョン1.9で追加されたmicrobatch
戦略を採用することで、処理を高速化したり、過去データ・遅延データの再作成を容易にできるのではないかと考えています。
dbt Python modelの使用
今回、外部APIを叩く箇所は別のPython実行ツールに出して、Sourceを経由することでdbtのパイプラインに取り込んでいます。
この部分にdbt Python modelsを活用することで、
dbtの枠組みの中で必要な処理を実行しつつ、dbt testによる処理結果のチェックも行うことができるのではと考えています。
まとめ
dbt組み込みのテストに加えて、dbt_utilsやdbt_expectationsの実装を導入することで、データの特性に応じた変更を効果的に捉えられるようになりました。その結果、次のような効果を実感しています。
- 通常は1か月かけて開発していたケースが多いのですが、独自にカスタムしたテストの実装がほぼ不要なため、実装にかかる工数を低く抑えることができ、2週間程度で初期リリースを行うことができました。
- データの鮮度に比例して運用面での不安が増すこともありますが、網羅的なテストを組み込んでいるため、パイプラインの終了通知を確認するだけで運用ができ、安心感を持って運用を行えています。
データパイプラインはユーザーから直接見えにくく、日の目を浴びにくいものですが、データ品質が高いことはアプリの信頼性に直結し、また、非同期処理によりユーザー体験そのものを改善する可能性を秘めています。 NTTドコモでは、多くのStreamlitアプリケーションがデータ活用を支えています。今回得られた知見を水平展開することで、お客様起点の事業運営にさらに貢献できればと思います。