digdagのworkflowをCIでrun throughする

2019-11-17·
Aki Ariga
Aki Ariga
· 3 min read

以前、kawasaki.rbやマジフェスでdigdagのworkflowのデバッグやテストどうすればいいんだ〜という話をしました。

How do you debug/test your Workflow?

この話は、でかいデータと戯れるworkflowをどうdebug、テストすればいいのかという話から、PyCharmとDockerでdebugするという話をしましたが、今回はworkflowを実際にrun throughできるようにするためのtipsとそれを実現するためのcookiecutterを作ったので紹介しようと思います。

cookiecutter-digdag

chezou/cookiecutter-digdag

cookiecutterはPythonで書かれたプロジェクトを作るためのテンプレートです。内部ではPythonのtemplate engineであるjinja2を使っており、普通HTMLをレンダーするために使うものをconfiguration fileやfile名、directoryに使ってしまおうと言うものです。

$ pip install --user cookiecutter

でcookiecutterをインストールしたら、

cookiecutter https://github.com/chezou/cookiecutter-digdag

とすれば、ディレクトリ以下に新しいpy> operatorを使ったdigdagプロジェクトができます。

my_project
├── README.md
├── config
   ├── params.test.yml     <- Configuration file for run through test. Mirror params.yml except for `td.database`
   └── params.yml          <- Configuration file for production
├── awesome_workflow.dig    <- Main workflow to be executed
├── ingest.dig              <- Data ingestion workflow
├── py_scripts              <- Python scripts directory
   ├── __init__.py
   ├── data.py             <- Script to upload data to Arm Treasure Data
   └── my_script.py        <- Main script to execute e.g. Data enrichment, ML training
├── queries                 <- SQL directory
├── run_test.sh             <- Test shell script for local run through test
└── test.dig                <- Test workflow for local run through test

run through testをCIで走らせるためのポイント

これだけだとふーんなんですけど、ポイントとしては test.dig に詰まっています。

_export:
  !include : config/params.yml

_error:
  td_ddl>:
  drop_databases: ["${td.database}"]

+ingest:
  call>: ingest.dig

+execute:
  call>: {{cookiecutter.main_workflow}}.dig

+cleanup:
  td_ddl>:
  drop_databases: ["${td.database}"]

特徴は

  1. config/params.yml にDB名の設定を指定する
  2. ingest.dig でデータのDBの作成とデータの書き込みをする
  3. {{cookiecutter.main_workflow}}.dig (デフォルトは awesome_workflow.dig )でcookiecutter使用時に指定されるメインのワークフローをcallする
  4. cleanup_error taskでDBを削除する

という4点です。

実際には、テストは run_test.sh を実行することで行われるのですが、その中身を見てみましょう。

mv config/params.test.yml config/params.yml
{{cookiecutter.digdag_path}} run test.dig

digdag_path はデフォルト digdagとなり、local modeでdigdagを実行します。

mv config/params.test.yml config/params.ymlparams.test.yml に差し替えをするのですが、

# Target Database on Treasure Data
td:
  database: "{{cookiecutter.database}}_${session_uuid.replace(/-/g,'_')}"
  table: {{cookiecutter.table}}

このように、 _${session_uuid.replace(/-/g,'_')}とユニークな文字列を作ることで、一時的なDB名を作るようにしています。

このやり方をすることで、メインのワークフローに極力触ることなく、CIでlocalモードのdigdagを使い走り切るかのテストができます。実際にこの構成で行っているworkflowの例はtreasure-boxesレポジトリにあるので、参考にしてください。


Back to home

Aki Ariga
Authors
Principal Software Engineer
Interested in Machine Learning, ML Ops, and Data driven business. If you like my blog post, I’m glad if you can buy me a tea 😉

Related