dbt Coreでdatabricksの変換処理を実行してみた

dbtはデータウェアハウス内で変換処理(Transformation)を実行することができるツールでELT(Extract,Load,Transform)のTを担当します。データウェアハウス(Redshift,BigQuery,Snowflake,Databricksなど)にあるRawDataを変換して結果を書き出す処理を実行します

dbt

dbtはSaaS製品のdbt CloudとCLIベースのdbt Coreから利用することができます。この記事ではdbt Coreからdatabricks上で変換処理を実行してみます。

dbt Coreのセットアップ

前提としてローカルの開発マシンに以下をインストールしておきます。

  • Python 3.7以降(本記事では Python 3.8)
  • Python 仮想環境を作成するためのユーティリティ(本記事では pipenv)
pipenv --python 3.8
pipenv install dbt-databricks

以下を実行し、仮想環境をアクティブにしてdbtの初期化コマンド(dbt init)を実行します。
dbt_demoはプロジェクト名になります。任意の値を設定してください。

pipenv shell
dbt init dbt_demo

初期化コマンドを実行すると対話形式でコマンドを入力していきます。
hostはDatabricksワークスペースのURLを入力します。
http_pathはSQLエンドポイントのConnection details画面に表示されているHTTP pathを指定します。
tokenは、Databricksユーザーのパーソナルアクセストークンを設定します。
schemaは、出力モデルを作成するスキーマを指定します。

04:53:33  Running with dbt=1.2.2
Which database would you like to use?
[1] databricks
[2] spark

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
host (yourorg.databricks.com): xxxxxxxxxxx.cloud.databricks.com
http_path (HTTP Path): /sql/1.0/endpoints/99999999999999aa
token (dapiXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX): 
[1] use Unity Catalog
[2] not use Unity Catalog
Desired unity catalog option (enter a number): 2
schema (default schema that dbt will build objects in): dbt_demo
threads (1 or more) [1]: 2
04:57:31  Profile dbt_demo written to C:\Users\xxxxx\.dbt\profiles.yml using target's profile_template.yml and your supplied values. Run 'dbt debug' to validate the connection.
04:57:31  
Your new dbt project "dbt_demo" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

  https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

  https://community.getdbt.com/

Happy modeling!

Happy modeling!が見えたら初期化が終わっていますので、疎通確認をしてみましょう。
プロジェクトフォルダ(今回はdbt_demo)に移動してdbt debugを実行します。

dbt debug

All checks passed!が表示されたら、疎通は完了しています。

作成するデータフロー

以下の商品マスタ結合後データモデルカテゴリ別売上集計モデルを作成してみます。

rawスキーマに入力元となるテーブルとデータを作成しておきます。
売上明細テーブル(SALES_DETAIL)

CREATE TABLE raw.SALES_DETAIL (
    SALES_DATE_TIME TIMESTAMP NOT NULL,
    ITEM_CODE VARCHAR(13) NOT NULL,
    AMOUNT INT NOT NULL
);

INSERT INTO raw.SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO raw.SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO raw.SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO raw.SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO raw.SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO raw.SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);

商品マスタ(ITEM_INFO)

CREATE TABLE raw.ITEM_INFO (
    ITEM_CODE VARCHAR(13) NOT NULL,
    ITEM_NAME VARCHAR(128),
    CATEGORY_CODE CHAR(4) NOT NULL,
    CATEGORY_NAME VARCHAR(128),
    UNIT_PRICE INT NOT NULL,
    REGISTERED_DATE DATE NOT NULL,
    BEGIN_DATE DATE NOT NULL,
    END_DATE DATE NOT NULL
);

INSERT INTO raw.ITEM_INFO VALUES ('4922010001000','ミルクチョコレートM','1600','チョコレート菓子',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO raw.ITEM_INFO VALUES ('4922010001000','ミルクチョコレートM','1600','チョコレート菓子',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO raw.ITEM_INFO VALUES ('4922010001001','PREMIUM アソートチョコレート','1600','チョコレート菓子',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO raw.ITEM_INFO VALUES ('4922010001002','アーモンドクランチミニ','1600','チョコレート菓子',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO raw.ITEM_INFO VALUES ('4922020002000','カップ麺 しょうゆ','1401','カップ麺',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO raw.ITEM_INFO VALUES ('4922020002001','カップ麺 塩','1401','カップ麺',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO raw.ITEM_INFO VALUES ('4922020002002','カップ麺 カレー','1401','カップ麺',120,'2017-04-01','2017-04-01','2019-12-31');

入力データの定義ファイルを作成します。プロジェクトフォルダの下にmodelsフォルダがあるので、その直下にsource.yml という名前で作成します。

version: 2

sources:
  - name: staging
    schema: raw
    tables:
      - name: item_info
        description: '商品マスタ'
      - name: sales_detail
        description: '売上明細'

次にmodels フォルダの下に出力モデル用のフォルダmartsを作成して、その直下にfct_sales.sqlファイルを作成します。
configという設定内にmaterialized='table'という定義がありますが、これはこのモデルをテーブルとして作成するという意味です。これを指定しないとviewとして作成されます。
file_formatの設定でdeltaとしています。これはこのモデルをDeltaフォーマットのテーブルとして作成することを設定しています。

{{ config(
  materialized='table',
  file_format='delta'
) }}
with
sales_detail as (
	select * from {{ source('staging', 'sales_detail') }}
),
item_info as (
	select * from {{ source('staging', 'item_info') }}
),
joined_item as (
	select
	  sales_date_time,
	  sales_detail.item_code as item_code,
	  item_name,
	  category_code,
	  category_name,
	  amount,
	  unit_price,
	  amount * unit_price as selling_price
	  from sales_detail left join item_info
	  on sales_detail.item_code = item_info.item_code
	  and sales_detail.sales_date_time >= item_info.begin_date
	  and sales_detail.sales_date_time <= item_info.end_date
)

select * from joined_item

それから、martsフォルダの下にもう一つのモデルcategory_summary.sqlを作成します。

select category_code,
	category_name,
	sum(amount) as amount,
	sum(selling_price) as selling_price
	from  {{ ref('fct_sales') }}
	group by category_code, category_name

models下にデフォルトで作成されているexampleフォルダは削除しておきます。ここまでで、以下のようなファイル構成になっています。

プロジェクトフォルダ下にあるdbt_project.ymlファイルの一番下にある2行(exampleの定義)も削除しておきます。

models:
  dbt_demo:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

コマンドによるdbtの実行

これまでの設定でモデル定義が完成していますので、仮想環境以下のコマンドプロンプトからdbt runを実行します。

(.venv) xxxxx\dbt_demo>dbt run
11:58:14  Running with dbt=1.2.2
11:58:14  Found 2 models, 0 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
11:58:14
11:58:15  Concurrency: 2 threads (target='dev')
11:58:15  
11:58:15  1 of 2 START table model dbt_demo.fct_sales .................................... [RUN]
11:58:20  1 of 2 OK created table model dbt_demo.fct_sales ............................... [OK in 4.81s]
11:58:20  2 of 2 START view model dbt_demo.category_summary .............................. [RUN]
11:58:22  2 of 2 OK created view model dbt_demo.category_summary ......................... [OK in 1.74s]
11:58:22  
11:58:22  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 7.47 seconds (7.47s).
11:58:22
11:58:22  Completed successfully
11:58:22
11:58:22  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

dbt_demo.fct_salesはtableモデルとして、dbt_demo.category_summaryはviewモデルとして作成されていますね。

データモデルをテストする

出力モデルcategory_summarycategory_codecategory_nameはそれぞれユニークかつNullでないことをテストするため、marts フォルダの下にモデル定義ファイルmarts.ymlを作成します。
それぞれのカラムにuniquenot_null定義を設定します。

version: 2

models:
  - name: category_summary
    description: カテゴリ別売上集計
    columns:
      - name: category_code
        description: カテゴリーコード
        tests:
          - unique
          - not_null
      - name: category_name
        description: カテゴリー名
        tests:
          - unique
          - not_null
      - name: amount
        description: 数量
      - name: selling_price
        description: 売上金額

仮想環境以下のコマンドプロンプトからdbt testを実行します。

(.venv) xxxxx\dbt_demo>dbt test
12:31:52  Running with dbt=1.2.2
12:31:52  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
12:31:52
12:31:53  Concurrency: 2 threads (target='dev')
12:31:53  
12:31:53  1 of 4 START test not_null_category_summary_category_code ...................... [RUN]
12:31:53  2 of 4 START test not_null_category_summary_category_name ...................... [RUN]
12:31:54  1 of 4 FAIL 1 not_null_category_summary_category_code .......................... [FAIL 1 in 0.82s]
12:31:54  3 of 4 START test unique_category_summary_category_code ........................ [RUN]
12:31:54  2 of 4 FAIL 1 not_null_category_summary_category_name .......................... [FAIL 1 in 1.28s]
12:31:54  4 of 4 START test unique_category_summary_category_name ........................ [RUN]
12:31:54  3 of 4 PASS unique_category_summary_category_code .............................. [PASS in 0.77s]
12:31:55  4 of 4 PASS unique_category_summary_category_name .............................. [PASS in 1.11s]
12:31:55  
12:31:55  Finished running 4 tests in 0 hours 0 minutes and 3.16 seconds (3.16s).
12:31:55
12:31:55  Completed with 2 errors and 0 warnings:
12:31:55  
12:31:55  Failure in test not_null_category_summary_category_code (models\marts\marts.yml)
12:31:55    Got 1 result, configured to fail if != 0
12:31:55
12:31:55    compiled SQL at target\compiled\dbt_demo\models\marts\marts.yml\not_null_category_summary_category_code.sql
12:31:55
12:31:55  Failure in test not_null_category_summary_category_name (models\marts\marts.yml)
12:31:55    Got 1 result, configured to fail if != 0
12:31:55
12:31:55    compiled SQL at target\compiled\dbt_demo\models\marts\marts.yml\not_null_category_summary_category_name.sql
12:31:55
12:31:55  Done. PASS=2 WARN=0 ERROR=2 SKIP=0 TOTAL=4

エラーとなっているのが確認できると思います。

データを確認してみると、トランザクション(sales_detail)のSALES_DATE_TIME2017-03-31`のデータがマスタ(item_info)の結合に失敗しているようです。

変数を使って動的にデータ抽出条件を設定する

入力モデルの売上明細の抽出条件に変数を指定してみます。marts\fct_sales.sqlの8行目に条件を追加してみます。

{{ config(
  materialized='table',
  file_format='delta'
) }}
with
sales_detail as (
	select * from {{ source('staging', 'sales_detail') }}
	where sales_date_time between '{{ var("date") }} 00:00:00' and '{{ var("date") }} 23:59:59'

dbt runで実行する際に変数を設定するオプションを追加して実行します。

(.venv) xxxxx\dbt_demo>dbt run --vars "date: '2017-04-01'"

同様にdbt testにもオプションを追加して実行してみます。

(.venv) xxxxx\dbt_demo>dbt test --vars "date: '2017-04-01'"
12:55:49  Running with dbt=1.2.2
12:55:49  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
12:55:49
12:55:50  Concurrency: 2 threads (target='dev')
12:55:50  
12:55:50  1 of 4 START test not_null_category_summary_category_code ...................... [RUN]
12:55:50  2 of 4 START test not_null_category_summary_category_name ...................... [RUN]
12:55:51  1 of 4 PASS not_null_category_summary_category_code ............................ [PASS in 1.10s]
12:55:51  2 of 4 PASS not_null_category_summary_category_name ............................ [PASS in 1.10s]
12:55:51  3 of 4 START test unique_category_summary_category_code ........................ [RUN]
12:55:51  4 of 4 START test unique_category_summary_category_name ........................ [RUN]
12:55:52  3 of 4 PASS unique_category_summary_category_code .............................. [PASS in 1.02s]
12:55:52  4 of 4 PASS unique_category_summary_category_name .............................. [PASS in 1.12s]
12:55:52  
12:55:52  Finished running 4 tests in 0 hours 0 minutes and 3.14 seconds (3.14s).
12:55:52
12:55:52  Completed successfully
12:55:52
12:55:52  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4

今度はテストにパスしていることを確認できました。

ドキュメントを生成する

dbt docs generateコマンドを実行すると、models下にある定義を元にドキュメントを作成してくれます。
targetフォルダ下にドキュメントファイル(html形式)が作成されます。

(.venv) xxxxx\dbt_demo>dbt docs generate --vars "date: '2017-04-01'"
13:01:03  Running with dbt=1.2.2
13:01:04  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
13:01:04
13:01:04  Concurrency: 2 threads (target='dev')
13:01:04  
13:01:04  Done.
13:01:04  Building catalog
13:01:06  Catalog written to xxxxx\dbt_demo\target\catalog.json

ローカルホストでドキュメントを閲覧するにはdbt docs servコマンドを実行します。
marts.yml設定ファイルで定義したdescriptionもドキュメントに反映されます。

dbtは変換処理で定義したsourcerefを使うとソール、モデル間の依存関係を可視化(リネージュ)してくれます。

まとめ

dbtCoreでDatabricks上での変換処理を実行し、データモデル(fct_salesとcategory_summary)を作成しました。
作成したモデルのテストを実行し、ドキュメントの作成もしてみました。
変数宣言をSQL内の条件に追加して実行時に動的に渡す方法も試してみました。
まだまだ色々な機能があるので、確認してみたいと思います。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です