MWAAからECSでdbtを実行する

はじめに

  • Airflowなどのようなワークフローツールからdbtを実行するためにはいくつかの選択肢があります(例えば、AWS公式にはMWAAのワーカーから実行するための構成例が示されていますが、dbtのパッケージと干渉するため特定のバージョンでのdbtを実行することができません)この記事では、MWAAからECS(Fargate)上でdbtを実行する構成を例示します。

Dockerイメージの作成

  • ECSにデプロイされるDockerイメージを作成します。開発環境のdbtと依存するPythonパッケージをインストールするのにpipenv syncを利用しています
  • 以下はDockerfileの例です
# Top level build args
ARG build_for=linux/amd64

##
# base image (abstract)
##
FROM --platform=$build_for python:3.10.7-slim-bullseye as base

# System setup
RUN apt-get update \
  && apt-get dist-upgrade -y \
  && apt-get install -y --no-install-recommends \
    git \
    jq \
    curl \
    unzip \
    sudo \
    ssh-client \
    software-properties-common \
    make \
    build-essential \
    ca-certificates \
    libpq-dev \
  && apt-get clean \
  && rm -rf \
    /var/lib/apt/lists/* \
    /tmp/* \
    /var/tmp/*

# Env vars
ENV PYTHONIOENCODING=utf-8
ENV LANG=C.UTF-8

# Update python
RUN python -m pip install --upgrade pip setuptools wheel --no-cache-dir

# pipenv sync
COPY Pipfile ./
COPY Pipfile.lock ./
RUN python -m pip install pipenv==2022.12.19
RUN python -m pipenv sync --system

COPY dbt_prj /usr/app/dbt
COPY profiles.yml /root/.dbt/

COPY job /usr/app/job

WORKDIR /usr/app/job/

RUN chmod -R 755 /usr/app/job/

ENTRYPOINT [ "/bin/sh", "-c" ]
  • dbt-postgresをインストールします。(この例ではpipenvですが、他の仮想環境構築ツールを利用しても問題ありません)
  pipenv install dbt-postgres==1.3.1
  • dbtという名前のプロジェクト名でdbtを初期化します
  dbt init dbt
  • 次にDocker実行時に呼び出されるスクリプトファイル(dbt/scripts/run_dbt.sh)を作成します。実行時にデータベースの認証情報をAWS Secretsから取得しています
  #!/bin/bash
  set -e

  dbt run --profiles-dir /root/.dbt --project-dir /usr/app/dbt
  • dbtのプロファイル(profiles.yml)は環境変数から認証情報を設定するようにします。必要に応じてローカル環境で実行するための認証情報を設定することも可能です。
  dbt_app:
    outputs:
      default:
        connect_timeout: 10
        dbname: denchi_db
        host: "{{ env_var('DBT_HOST') }}"
        keepalives_idle: 0
        password: "{{ env_var('DBT_PASSWORD') }}"
        port: 5432
        schema: public
        threads: 1
        type: postgres
        user: "{{ env_var('DBT_USER') }}"
    target: default

環境構築

  • ここではTerraformでセットアップを行いますが、環境によって異なるため一部のみを例示します。環境設定に必要なパラメータは変数として設定しています。
  • 以下はMWAAを構成します。一部手動で対応する必要があることに注意してください。以下のスクリプトで作成されるIAMロールにecs実行時のロールへのPassRole権限とecsの実行権限を追加する必要があります。
  provider "aws" {
    region = var.region
  }

  module "mwaa" {
    source = "aws-ia/mwaa/aws"

    name                 = var.name
    airflow_version      = "2.2.2"
    environment_class    = "mw1.small"

    vpc_id                = var.vpc_id
    private_subnet_ids    = [var.mwaa_subnet_a,var.mwaa_subnet_c]

    min_workers           = 1
    max_workers           = 10
    webserver_access_mode = "PRIVATE_ONLY"

    logging_configuration = {
      dag_processing_logs = {
        enabled   = true
        log_level = "INFO"
      }

      scheduler_logs = {
        enabled   = true
        log_level = "INFO"
      }

      task_logs = {
        enabled   = true
        log_level = "INFO"
      }

      webserver_logs = {
        enabled   = true
        log_level = "INFO"
      }

      worker_logs = {
        enabled   = true
        log_level = "INFO"
      }
    }

    airflow_configuration_options = {
      "core.load_default_connections" = "false"
      "core.load_examples"            = "false"
      "webserver.dag_default_view"    = "tree"
      "webserver.dag_orientation"     = "TB"
      "logging.logging_level"         = "INFO"
    }
  }
  • ECSを構成します。以下は簡略化のため省いていますが、VPCエンドポイントは必要に応じて追加するとよいでしょう
  • また、dbtのプロファイルに設定される認証情報をSecretsManagerから設定しているため、SecretsのGet権限が必要になります。
  provider "aws" {
    region = var.region
  }

  locals {
    ecs_cluster_name           = "dbt-app-cluster"
    app_name                   = "dbt-app"
  }

  data "aws_caller_identity" "current" {}
  data "aws_region" "current" {}

  ####################################################
  # ECS Cluster
  ####################################################
  resource "aws_ecs_cluster" "app-cluster" {
    name               = local.ecs_cluster_name
  }

  resource "aws_ecs_cluster_capacity_providers" "app-cluster_capacity_providers" {
    cluster_name       = local.ecs_cluster_name
    capacity_providers = ["FARGATE"]
    default_capacity_provider_strategy {
      base              = 1
      weight            = 100
      capacity_provider = "FARGATE"
    }
  }

  ####################################################
  # ECS IAM Role
  ####################################################
  resource "aws_iam_role" "ecs_task_execution_role" {
    name               = "ecs_task_execution_role"
    assume_role_policy = jsonencode({
      Version   = "2012-10-17"
      Statement = [
        {
          Effect    = "Allow"
          Principal = { Service = "ecs-tasks.amazonaws.com" }
          Action    = "sts:AssumeRole"
        }
      ]
    })
    managed_policy_arns = [
      "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
      "arn:aws:iam::aws:policy/AmazonSSMReadOnlyAccess"
    ]
  }
  resource "aws_iam_role_policy" "ecs_execution_policy" {
    name   = "ecs_task_execution_role_policy"
    role   = aws_iam_role.ecs_task_execution_role.id
    policy = file("./roles/ecs_task_execution_role.json")
  }

  ####################################################
  # ECS Task Container Log Groups
  ####################################################

  resource "aws_cloudwatch_log_group" "dbt_app" {
    name              = "/ecs/${local.app_name}"
    retention_in_days = 30
  }

  ####################################################
  # ECS Task Definition
  ####################################################

  resource "aws_ecs_task_definition" "app-task" {
    family                   = "app-task"
    requires_compatibilities = ["FARGATE"]
    cpu                      = 256
    memory                   = 512
    network_mode             = "awsvpc"
    execution_role_arn       = aws_iam_role.ecs_task_execution_role.arn
    task_role_arn            = aws_iam_role.ecs_task_execution_role.arn
    container_definitions = jsonencode([
      {
        name      = "dbt-job"
        image     = "xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/dbt-app:latest"
        logConfiguration = {
          logDriver = "awslogs"
          options   = {
            awslogs-region : "ap-northeast-1"
            awslogs-group : aws_cloudwatch_log_group.dbt_app.name
            awslogs-stream-prefix : "ecs"
          }
        }
        secrets =  [{
          name = "DBT_HOST"
          valueFrom =  "${var.secrets_manager_arn}:db_host::"
        },{
          name = "DBT_USER"
          valueFrom =  "${var.secrets_manager_arn}:db_user::"
        },{
          name = "DBT_PASSWORD"
          valueFrom =  "${var.secrets_manager_arn}:db_password::"
        }]
      }
    ])
    runtime_platform {
      operating_system_family = "LINUX"
      cpu_architecture        = "X86_64"
    }
  }

DAGの構築

  • MWAAにデプロイするdagファイルを作成します。
    ネットワーク設定はAirflowのVariableから設定しています。
  from http import client
  from airflow import DAG
  from airflow.providers.amazon.aws.operators.ecs import ECSOperator
  from airflow.utils.dates import days_ago
  from airflow.models import Variable
  import boto3

  CLUSTER_NAME="dbt-app-cluster"
  CONTAINER_NAME="dbt-job"
  LAUNCH_TYPE="FARGATE"
  NETWORK_CONFIG = Variable.get("NETWORK_CONFIG", deserialize_json=True)

  with DAG(
      dag_id = "ecs_fargate_dag",
      schedule_interval=None,
      catchup=False,
      start_date=days_ago(1)
  ) as dag:
      client=boto3.client('ecs')
      ecs_operator_task = ECSOperator(
          task_id = "ecs_operator_task",
          dag=dag,
          cluster=CLUSTER_NAME,
          task_definition="app-task",
          launch_type=LAUNCH_TYPE,
          overrides={
              "containerOverrides":[
                  {
                      "name":CONTAINER_NAME
                  }
              ]
          },
          network_configuration = NETWORK_CONFIG
      )
Key:
  NETWORK_CONFIG
Value:
  {
    "awsvpcConfiguration": {
      "subnets": ["subnet-xxxxxxxxxxx", "subnet-xxxxxxxxx"],
      "securityGroups": ["sg-xxxxxxxxxxxxxxx"]
    }
  }

実行

  • 最後にDockerをビルドしてECRにpushすれば、DAGからdbtを実行することができます。必要に応じて、コマンドを上書きすることも可能です。
  • 今回はDockerイメージをデプロイするための方法について言及していませんが、AWSサービスのCodeBuildやCodePipelineを利用してECRにDockerイメージをビルドするCICDパイプラインを構築することも可能です。