Delta Live Tablesでリアルタイムストリーミング処理

IoTデータをリアルタイムに取り込む

  • データブリックスのサンプルデータにJSON形式のIoTデータがありますが、このようなIoTデータをリアルタイムで取り込むパイプライン処理をDelta Live Tableで実現します
  • Delta Live Tables はノートブックのセルで対話的に実行するように設計されていません。 以下のソースコードの@dlt.tableデコレーターは、bronze_sensors関数によって返された DataFrame の結果を含むテーブルを作成するように Delta Live Tables に指示します。後でDelta Live Tablesのパイプライン処理のソースとしてこのノートブックを設定します。デフォルトでは関数名がテーブル名となります。
  • spark.readStream.format("cloudFiles") はAutoLoaderという機能でクラウドストレージ上のファイル(この例ではJSON形式のファイル)をインクリメンタルにロードします。
  • "cloudFiles.schemaLocation"オプションは、スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が追加されるとテーブル スキーマを進化させることができます。これにより、スキーマの変更を手動で追跡して適用する必要がなくなります。入力データに対するスキーマの変更を追跡するように指定したディレクトリにスキーマ情報を保存します。新しい列を検出すると、 UnknownFieldExceptionで処理が停止します。しかしスキーマ情報に新しい列が追加されているので、自動的に再起動するように設定することで自動でスキーマが進化するワークフローを構成します。
  • "cloudFiles.schemaHints"オプションは、既知で予期しているスキーマ情報を推論されたスキーマに適用できます。
  • "cloudFiles.useIncrementalListing"オプションは、導入するために条件はありますが、クラウドストレージのディレクトリを増分で捜査(全捜査ではない)して新しいファイルを検出するため、大量のファイルを処理する場合に高速に動作します。検出された新たなファイルのレコードは常にブロンズテーブルに追加挿入されます。
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 読み込み元データのパス
SENSORS_SRC_DATA_DIR = '/databricks-datasets/iot-stream/data-device/'
# チェックポイントのパス
SENSORS_CHECKPOINT_DIR = 's3://clue-technologies-demo/iot_sample/checkpoint/data-device'

@dlt.table
def bronze_sensors():
    return (
      # ストリーミングソースからインクリメンタルにロードされる
      spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", SENSORS_CHECKPOINT_DIR)
          .option("cloudFiles.schemaHints","id bigint, device_id integer, user_id integer, calories_burnt decimal(10,2), miles_walked decimal(10,2), num_steps decimal(10,2), timestamp timestamp")
          .option("cloudFiles.useIncrementalListing", True)
          .load(SENSORS_SRC_DATA_DIR)
    )

UPSERT処理で変更情報を取り込む

  • Delta Live TablesはCDC(Change Data Capture)をサポートしています。以下のようにDLTのAPIのapply_changes()関数を定義します。
  • keysはレコードを一意に特定するカラムの組み合わせを指定します。デフォルトの設定(SCDのType1)だと、指定されたキーにマッチするtargetのテーブル(以下例はsilver_sensors)のレコードを更新します。マッチするレコードが存在しない場合は、追加します(つまりUPSERTを実現します)
dlt.create_target_table('silver_sensors', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
dlt.apply_changes(target='silver_sensors',
                  source='bronze_sensors',
                  keys=['Id', 'user_id', 'device_id'],
                  sequence_by='timestamp')

Delta Live Tablesのセットアップ

  • ワークフローのDelta Live Tablesタブ画面より「Create pipline」ボタンをクリックしてDelta Live Tablesのセットアップを行います。
  • Pipeline modeはTriggeredとContinuousを指定します。(バッチ処理かリアルタイム処理かを設定で切り替え可能)
  • Triggeredは処理が完了すると自動的に停止します。TriggeredはさらにDevelopmentとProductionモードのどちらかを選択して実行します。Developmentはパイプライン処理後、デフォルトでは2時間後にコンピュート(DLT Compute)を停止します。この時間はpipelines.clusterShutdown.delayで変更が可能です。主に開発中はこのモードを選択することで、起動時間のオーバーヘッドを回避することができます。Productionはパイプライン実行後にすぐに停止します。
  • Continuousは、手動で停止するまで継続的に実行されます。データソースに到着した新しいデータを処理するため、最新の状態に保つことができます。pipelines.trigger.intervalパラメータで更新頻度を設定することが可能です。(以下例では10秒毎に更新)
@dlt.table(
  spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def bronze_sensors():
    return (

Delta Live Tablesを実行する

  • Startボタンをクリックするとパイプラインが実行されます。(Schedule設定の可能です)
  • リソースの準備が完了すると以下のグラフが表示されます。初回のロードで100万件を処理していることが表示されています。
  • データエクスプローラーから作成されたテーブルを選択して Lineage タブをクリックし、 See lineage graph ボタンをクリックすると、データリネージ(データの来歴)グラフを表示することができます。

リアルタイムダッシュボードのデモ

  • Databricksのブログ記事では今回DeltaLiveTablesでセットアップしたIoTデータを元にDashアプリケーションでデータを可視化するデモを公開しています
  • デモではDatabricks SQL connector for Pythonで直接Delta Live Tablesで作成したStreaming tableに接続し、データを表示しており、リアルタイムストリーミングダッシュボードを実現します。
  • 以下の手順でデモアプリケーションをセットアップして実行します。(本手順ではパッケージ管理ツールのpipenvを利用しています)
git clone git@github.com:plotly/dash-dbx-sql.git
pipenv install -r ./requirements.txt
  • 依存関係で問題があり、flaskをuninstallして2.2.2をinstallしました。
  • .envファイルを作成し、Databricks SQLの接続情報を設定します
SERVER_HOSTNAME="xxxxxxxxxxxxxxxx.cloud.databricks.com"
HTTP_PATH="/sql/1.0/warehouses/9999999999999999"
ACCESS_TOKEN="dapixxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  • Dashアプリケーションを実行します。
pipenv run python app.py
  • 体感ですが、Deltaキャッシュを利用しているため、2回目以降のレスポンスはかなり早くなりました(2~3秒程度)

まとめ

  • IoTデータをバッチで取り込むのかリアルタイムに取り込むのかは更新頻度の要件とコスト見合いで設定で運用を切り替えられます。またAutoLoaderのスキーマ進化の機能でスキーマのメンテナンスコストを削減し最新の状態に維持することができます。
  • 増分の変更をキャプチャし、UPSERTします。今回紹介できませんでしたが、更新履歴を保持したい場合はSCD Type2の設定を行うことで対応が可能です。
  • Delta Live TablesはDatabricksのETLパイプラインを宣言的に定義することで、処理の見通しがよくなります。今回紹介しなかった機能もまだまだ多数(データ品質を確保するためのルール定義など)あり、機能追加のUpdateが早く、注目の機能となっているため、今後も検証結果をまとめていきたいと思います。