自己紹介
この記事はドコモR&D Advent Calendar 2025の16日目の記事です。
初めまして!NTTドコモ サービスイノベーション部 ビッグデータ基盤担当の竹鼻です。入社3年目になります。社内のビッグデータ基盤の保守運用や機能の開発に携わる担当をしています。本記事ではデータベースのSaaSであるSnowflakeのタスクとストアドプロシージャという機能についてお話ししようと思います。
1.はじめに
Snowflakeでは、ストアドプロシージャを使ってSQLの実行等のデータ処理をし、タスクを使ってそのプロシージャの処理を定期実行することが可能です。 本記事ではタスクとストアドプロシージャの簡単な機能の紹介と、その応用方法や躓いたポイントを紹介します。
2.ストアドプロシージャについて
ストアドプロシージャは、Snowflake環境において一連のデータ操作ロジックを手続き的に記述し、再利用可能なオブジェクトとして格納するための機能です。 一度作成することにより、そのストアドプロシージャを呼び出すだけで、記載した処理の実行が可能です。
まずはストアドプロシージャを作成してみましょう! どのような処理でも問題ないのですが、今回は例としてテーブルの作成とデータのINSERT処理をしてみようと思います。
CREATE OR REPLACE PROCEDURE snowflake_test.my_test.my_test_procedure(test_data VARCHAR)
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
-- 1. テーブルの作成 (もし存在しなければ)
CREATE OR REPLACE TABLE snowflake_test.my_test.test_table (
name VARCHAR,
created_at TIMESTAMP_NTZ
);
-- 2. サンプルデータの挿入
INSERT INTO snowflake_test.my_test.test_table(name, created_at) VALUES (:test_data, CURRENT_TIMESTAMP());
-- 3. 実行結果のメッセージを返す
RETURN 'テーブル snowflake_test.my_test.test_table が作成され、サンプルデータが挿入されました。';
END;
$$;
作成後に下記コマンドを叩くことで、ストアドプロシージャを呼び出すことができます!
CALL snowflake_test.my_test.my_test_procedure('今日は');
これにより、テーブルの作成がされデータのINSERTがされます。
SELECT * FROM snowflake_test.my_test.test_table;

このようにtest_tableテーブルが作成され、レコードが挿入されていることが分かります。
3.タスクの作成
次に、Snowflakeのタスク機能についてご紹介します。
タスクとは、Snowflake内でスケジュールに基づいたSQL操作を自動実行するための機能です。具体的には、単一のSQL文、またはストアドプロシージャの呼び出しを、指定した間隔や時間で繰り返し実行するように設定できます。
では、実際に先ほど作成したストアドプロシージャを呼び出すタスクを作成してみましょう!
CREATE OR REPLACE TASK snowflake_test.my_test.my_test_task WAREHOUSE = 'YOUR_WAREHOUSE_NAME' SCHEDULE = 'USING CRON 0 9 * * * Asia/Tokyo' AS -- 実行するストアドプロシージャの呼び出し CALL snowflake_test.my_test.my_test_procedure();
作成直後ではタスクは停止状態のため、ALTERコマンドを実行して起動する必要があります。
ALTER TASK snowflake_test.my_test.my_test_task RESUME;
ここでタスクで指定したウェアハウスはストアドプロシージャ内での処理についても使用されます。
上記が基本的なストアドプロシージャとタスクの作成方法と使用方法になります。
3.タスクグラフについて
次にタスクとストアドプロシージャの応用として、タスクグラフを作成してみます。 Snowflakeのタスクグラフとは、ルートタスクを起点に、依存関係を持つタスク群をループしない一方向(DAG)で順序立てて実行するための機能になります。 下記図のように、ルートタスクの後に、タスクA、タスクBを並列実行し、タスクAとタスクBの処理が終わった後にタスクCの処理をすることができます。 このタスクグラフのメリットとして、一つ一つの処理をモジュール化して管理をしやすくなる点とタスク同士の並列処理が可能なため、シーケンシャルでない処理がある場合は、 実行時間の短縮をすることができます。

では実際に、タスクグラフを作成してみましょう!
CREATE OR REPLACE TASK snowflake_test.my_test.root_task
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
SCHEDULE = 'USING CRON 0 9 * * * Asia/Tokyo'
AS
CALL snowflake_test.my_test.my_test_procedure('root_task');
CREATE OR REPLACE TASK snowflake_test.my_test.task_A
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
AFTER snowflake_test.my_test.root_task
AS
CALL snowflake_test.my_test.my_test_procedure('task_A');
CREATE OR REPLACE TASK snowflake_test.my_test.task_B
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
AFTER snowflake_test.my_test.root_task
AS
CALL snowflake_test.my_test.my_test_procedure('task_B');
CREATE OR REPLACE TASK snowflake_test.my_test.task_C
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
AFTER snowflake_test.my_test.task_A, snowflake_test.my_test.task_B
AS
CALL snowflake_test.my_test.my_test_procedure('task_C');
上記のようにルートタスク以外の子タスクにはAFTER句を使ってグラフの作成をすることができます! コンソールからタスクの詳細画面にいくと下記の図のようにタスクグラフが作成されていることを確認できます。

また、上記の例ではルートタスクにSCHEDULE句で定期実行の設定をしていますが、 設定をせずに、EXECUTE TASKを使うことで、タスクグラフを呼び出すことも可能です。
ここで、タスクグラフ自体を並列実行したいケースがあるかと思います。 そのような場合はルートタスクの設定にALLOW_OVERLAPPING_EXECUTIONの設定をTRUEにしましょう! この設定をすることで、ルートタスクの実行は重なることはありませんが、子タスクの実行は並列して処理されることになります。
CREATE OR REPLACE TASK snowflake_test.my_test.root_task
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
ALLOW_OVERLAPPING_EXECUTION = True
AS
CALL snowflake_test.my_test.my_test_procedure_1('root_task');
5.タスク同士での値の受け渡し
4章でタスクグラフの作成をしましたが、タスク同士で値の受け渡しが可能です。 使用用途としては例えば、ルートタスクで得られた結果を元に子タスクの処理を変更したいといったケースが挙げられるかと思います。
下記がタスクグラフで子タスクに値わたしを実装するサンプルコードになります。
CREATE OR REPLACE PROCEDURE snowflake_test.my_test.root_task_procedure()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
--SYSTEM$SET_RETURN_VALUEに与える変数
today_date STRING;
BEGIN
today_date := (SELECT CURRENT_DATE()::STRING);
CALL SYSTEM$SET_RETURN_VALUE(:today_date);
RETURN today_date;
END;
$$;
CREATE OR REPLACE PROCEDURE snowflake_test.my_test.task_A_procedure()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
test_data VARCHAR;
BEGIN
CREATE OR REPLACE TABLE snowflake_test.my_test.test_table (
name VARCHAR,
created_at TIMESTAMP_NTZ
);
test_data := SYSTEM$GET_PREDECESSOR_RETURN_VALUE('ROOT_TASK_2');
INSERT INTO snowflake_test.my_test.test_table (name, created_at) VALUES (:test_data, CURRENT_TIMESTAMP());
END;
$$;
--タスクグラフの作成
CREATE OR REPLACE TASK snowflake_test.my_test.root_task_2
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
AS
CALL snowflake_test.my_test.root_task_procedure();
CREATE OR REPLACE TASK snowflake_test.my_test.task_A_2
WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
AFTER snowflake_test.my_test.root_task_2
AS
CALL snowflake_test.my_test.task_A_procedure();
--子タスクの起動
ALTER TASK snowflake_test.my_test.task_A_2 RESUME;
--タスクグラフの実行
EXECUTE TASK snowflake_test.my_test.root_task_2;
--結果の確認
SELECT * FROM snowflake_test.my_test.test_table;
snowflake_test.my_test.test_tableに実行日の日付のレコードが挿入されていれば、成功になります。

また注意ポイントとして、my_test.task_A_procedureにて記載しているSYSTEM$GET_PREDECESSOR_RETURN_VALUE('ROOT_TASK_2');の箇所で ROOT_TASK_1を小文字で設定するとエラーが出ますので、大文字で設定するようにしましょう。 最初はエラーの原因がわからず、初めて遭遇した時はかなり原因究明に時間がかかりました。
6.まとめ
ここまで読んでいただきありがとうございました。 Snowflakeのタスクとストアドプロシージャを組み合わせることで、柔軟なデータ処理が実現できるということが分かりました。 公式ドキュメントには、まだ把握しきれていない興味深い機能や設定が数多く存在しており、さらに深く調べていくと、新しい発見がありそうです。 この記事を通じて、少しでも皆様の参考になれば幸いです。
みなさま良いお年を!