はじめに
- Airflowなどのようなワークフローツールからdbtを実行するためにはいくつかの選択肢があります(例えば、AWS公式にはMWAAのワーカーから実行するための構成例が示されていますが、dbtのパッケージと干渉するため特定のバージョンでのdbtを実行することができません)この記事では、MWAAからECS(Fargate)上でdbtを実行する構成を例示します。
Dockerイメージの作成
- ECSにデプロイされるDockerイメージを作成します。開発環境のdbtと依存するPythonパッケージをインストールするのにpipenv syncを利用しています
- 以下はDockerfileの例です
# Top level build args
ARG build_for=linux/amd64
##
# base image (abstract)
##
FROM --platform=$build_for python:3.10.7-slim-bullseye as base
# System setup
RUN apt-get update \
&& apt-get dist-upgrade -y \
&& apt-get install -y --no-install-recommends \
git \
jq \
curl \
unzip \
sudo \
ssh-client \
software-properties-common \
make \
build-essential \
ca-certificates \
libpq-dev \
&& apt-get clean \
&& rm -rf \
/var/lib/apt/lists/* \
/tmp/* \
/var/tmp/*
# Env vars
ENV PYTHONIOENCODING=utf-8
ENV LANG=C.UTF-8
# Update python
RUN python -m pip install --upgrade pip setuptools wheel --no-cache-dir
# pipenv sync
COPY Pipfile ./
COPY Pipfile.lock ./
RUN python -m pip install pipenv==2022.12.19
RUN python -m pipenv sync --system
COPY dbt_prj /usr/app/dbt
COPY profiles.yml /root/.dbt/
COPY job /usr/app/job
WORKDIR /usr/app/job/
RUN chmod -R 755 /usr/app/job/
ENTRYPOINT [ "/bin/sh", "-c" ]
- dbt-postgresをインストールします。(この例ではpipenvですが、他の仮想環境構築ツールを利用しても問題ありません)
pipenv install dbt-postgres==1.3.1
- dbtという名前のプロジェクト名でdbtを初期化します
dbt init dbt
- 次にDocker実行時に呼び出されるスクリプトファイル(dbt/scripts/run_dbt.sh)を作成します。実行時にデータベースの認証情報をAWS Secretsから取得しています
#!/bin/bash
set -e
dbt run --profiles-dir /root/.dbt --project-dir /usr/app/dbt
- dbtのプロファイル(profiles.yml)は環境変数から認証情報を設定するようにします。必要に応じてローカル環境で実行するための認証情報を設定することも可能です。
dbt_app:
outputs:
default:
connect_timeout: 10
dbname: denchi_db
host: "{{ env_var('DBT_HOST') }}"
keepalives_idle: 0
password: "{{ env_var('DBT_PASSWORD') }}"
port: 5432
schema: public
threads: 1
type: postgres
user: "{{ env_var('DBT_USER') }}"
target: default
環境構築
- ここではTerraformでセットアップを行いますが、環境によって異なるため一部のみを例示します。環境設定に必要なパラメータは変数として設定しています。
- 以下はMWAAを構成します。一部手動で対応する必要があることに注意してください。以下のスクリプトで作成されるIAMロールにecs実行時のロールへのPassRole権限とecsの実行権限を追加する必要があります。
provider "aws" {
region = var.region
}
module "mwaa" {
source = "aws-ia/mwaa/aws"
name = var.name
airflow_version = "2.2.2"
environment_class = "mw1.small"
vpc_id = var.vpc_id
private_subnet_ids = [var.mwaa_subnet_a,var.mwaa_subnet_c]
min_workers = 1
max_workers = 10
webserver_access_mode = "PRIVATE_ONLY"
logging_configuration = {
dag_processing_logs = {
enabled = true
log_level = "INFO"
}
scheduler_logs = {
enabled = true
log_level = "INFO"
}
task_logs = {
enabled = true
log_level = "INFO"
}
webserver_logs = {
enabled = true
log_level = "INFO"
}
worker_logs = {
enabled = true
log_level = "INFO"
}
}
airflow_configuration_options = {
"core.load_default_connections" = "false"
"core.load_examples" = "false"
"webserver.dag_default_view" = "tree"
"webserver.dag_orientation" = "TB"
"logging.logging_level" = "INFO"
}
}
- ECSを構成します。以下は簡略化のため省いていますが、VPCエンドポイントは必要に応じて追加するとよいでしょう
- また、dbtのプロファイルに設定される認証情報をSecretsManagerから設定しているため、SecretsのGet権限が必要になります。
provider "aws" {
region = var.region
}
locals {
ecs_cluster_name = "dbt-app-cluster"
app_name = "dbt-app"
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
####################################################
# ECS Cluster
####################################################
resource "aws_ecs_cluster" "app-cluster" {
name = local.ecs_cluster_name
}
resource "aws_ecs_cluster_capacity_providers" "app-cluster_capacity_providers" {
cluster_name = local.ecs_cluster_name
capacity_providers = ["FARGATE"]
default_capacity_provider_strategy {
base = 1
weight = 100
capacity_provider = "FARGATE"
}
}
####################################################
# ECS IAM Role
####################################################
resource "aws_iam_role" "ecs_task_execution_role" {
name = "ecs_task_execution_role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = { Service = "ecs-tasks.amazonaws.com" }
Action = "sts:AssumeRole"
}
]
})
managed_policy_arns = [
"arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
"arn:aws:iam::aws:policy/AmazonSSMReadOnlyAccess"
]
}
resource "aws_iam_role_policy" "ecs_execution_policy" {
name = "ecs_task_execution_role_policy"
role = aws_iam_role.ecs_task_execution_role.id
policy = file("./roles/ecs_task_execution_role.json")
}
####################################################
# ECS Task Container Log Groups
####################################################
resource "aws_cloudwatch_log_group" "dbt_app" {
name = "/ecs/${local.app_name}"
retention_in_days = 30
}
####################################################
# ECS Task Definition
####################################################
resource "aws_ecs_task_definition" "app-task" {
family = "app-task"
requires_compatibilities = ["FARGATE"]
cpu = 256
memory = 512
network_mode = "awsvpc"
execution_role_arn = aws_iam_role.ecs_task_execution_role.arn
task_role_arn = aws_iam_role.ecs_task_execution_role.arn
container_definitions = jsonencode([
{
name = "dbt-job"
image = "xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/dbt-app:latest"
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-region : "ap-northeast-1"
awslogs-group : aws_cloudwatch_log_group.dbt_app.name
awslogs-stream-prefix : "ecs"
}
}
secrets = [{
name = "DBT_HOST"
valueFrom = "${var.secrets_manager_arn}:db_host::"
},{
name = "DBT_USER"
valueFrom = "${var.secrets_manager_arn}:db_user::"
},{
name = "DBT_PASSWORD"
valueFrom = "${var.secrets_manager_arn}:db_password::"
}]
}
])
runtime_platform {
operating_system_family = "LINUX"
cpu_architecture = "X86_64"
}
}
DAGの構築
- MWAAにデプロイするdagファイルを作成します。
ネットワーク設定はAirflowのVariableから設定しています。
from http import client
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
import boto3
CLUSTER_NAME="dbt-app-cluster"
CONTAINER_NAME="dbt-job"
LAUNCH_TYPE="FARGATE"
NETWORK_CONFIG = Variable.get("NETWORK_CONFIG", deserialize_json=True)
with DAG(
dag_id = "ecs_fargate_dag",
schedule_interval=None,
catchup=False,
start_date=days_ago(1)
) as dag:
client=boto3.client('ecs')
ecs_operator_task = ECSOperator(
task_id = "ecs_operator_task",
dag=dag,
cluster=CLUSTER_NAME,
task_definition="app-task",
launch_type=LAUNCH_TYPE,
overrides={
"containerOverrides":[
{
"name":CONTAINER_NAME
}
]
},
network_configuration = NETWORK_CONFIG
)
Key:
NETWORK_CONFIG
Value:
{
"awsvpcConfiguration": {
"subnets": ["subnet-xxxxxxxxxxx", "subnet-xxxxxxxxx"],
"securityGroups": ["sg-xxxxxxxxxxxxxxx"]
}
}
実行
- 最後にDockerをビルドしてECRにpushすれば、DAGからdbtを実行することができます。必要に応じて、コマンドを上書きすることも可能です。
- 今回はDockerイメージをデプロイするための方法について言及していませんが、AWSサービスのCodeBuildやCodePipelineを利用してECRにDockerイメージをビルドするCICDパイプラインを構築することも可能です。
