Snowflakeのエラー通知をTerraformで設定する

Snowflakeのエラー通知設定

  • SnowflakeのタスクやSnowpipeでエラーが発生した場合、Snowflakeのクラウド通知統合でクラウドのメッセージ通知サービスと連携することでSlackやメールなどにエラー通知を行うことができます。本稿ではAWSのSNSでSlackへエラー通知する場合の設定手順を例示します
  1. AWSの設定
    • SNSトピックの作成
    • IAMロールの作成(SnowflakeからSNSへ通知するための権限を許可)
    • Snowflake側のクラウド通知統合作成後のユーザーARNと外部IDを信頼関係ポリシーに反映
  2. Snowflakeの設定
    • クラウド通知統合を作成
    • TaskもしくはSnowpipeの作成時もしくは既存の場合は変更し、クラウド通知統合を設定する
  3. Slackへの通知設定
    • SNSからのメッセージを受信してSlackへ通知するためのLambda関数を実装
    • SNSのサブスクリプションに実装したLambdaを設定

Terraformでリソースを作成する

  • 本稿では、1~3の設定をTerraformで作成する。Terraformのソースコードは、GitHubで公開しています

1.AWSの設定

  • エラー通知に必要となるリソースをTerraformで作成します。SNSトピックを作成するTerraformは以下のようになります。SNSトピックの名前はTerraform外部変数(sns_topic_name)で定義します
resource "aws_sns_topic" "snowflake_notification" {
  name = "${var.aws_sns_topic_name}"
}
  • IAMロールを作成します。SnowflakeからのSNSへのアクセスを許可する設定を行うため、信頼関係ポリシーを定義したテンプレートファイルに後述するSnowflakeのクラウド通知統合で払い出されたIAMユーザーのARNと外部IDを設定します
resource "aws_iam_role" "snowflake_notification" {
  name = "${var.snowflake_notif_iam_role_name}"
  assume_role_policy = templatefile(
    "./roles/snowflake_assume_policy.json",
    {
      sns_user_arn = snowflake_notification_integration.notif_int.aws_sns_iam_user_arn,
      external_id = snowflake_notification_integration.notif_int.aws_sns_external_id
    }
  )
}

resource "aws_iam_role_policy" "snowflake_notification" {
  name = "snowflake-notification-policy"
  role = aws_iam_role.snowflake_notification.name
  policy = templatefile(
    "./roles/snowflake_notification.json",
    {
      sns_arn = aws_sns_topic.snowflake_notification.arn
    }
  )
}
  • 信頼関係ポリシーのテンプレートファイル(./roles/snowflake_assume_policy.json)を配置します
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "${sns_user_arn}"
                
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "${external_id}"
                }
            }
        }
    ]
}
  • IAMロールのポリシーのテンプレートファイル(./roles/snowflake_notification.json)は、作成したSNSトピックに対してsns:Publishアクションを許可します
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "${sns_arn}"
            ]
        }
    ]
}
  • 次にSlackへ通知を行うLambda関数の実行時に許可するポリシーを定義します。Lambda関数のログをCloudWatchLogsに出力させるため、ログ出力の許可設定を行います。ここでは、ソースコードは割愛します(GitHubのソースコードを参照)

2.Snowflakeの設定

  • Snowflakeのクラウド通知統合をTerraformで作成します。SNSへのアクセスを許可するIAMロールARNとSNSトピックARNを設定する必要がありますが、それぞれ先ほど定義したIAMロールとSNSトピック名を元に決め打ちで定義することができます
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

resource "snowflake_notification_integration" "notif_int" {
  name     = var.snowflake_notif_int_name
  comment  = "Snowpipe・タスク エラー通知のためのnotification integration"

  enabled   = true
  type      = "QUEUE"
  direction = "OUTBOUND"

  # AWS_SNS
  notification_provider = "AWS_SNS"
  aws_sns_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/${var.snowflake_notif_iam_role_name}"
  aws_sns_topic_arn = "arn:aws:sns:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}"
}
  • snowflakeプロバイダーの設定はaccount_idとsnowflake_userは変数で定義しています。今回のソースコードでは、パスワードもしくは秘密鍵のパス情報は環境変数で設定しているため、コード上での定義はしていません。環境変数はSnowflakeプロバイダーの公式ページを参考に設定してください
provider "snowflake" {
  account  = "${var.snowflake_account_id}"
  user     = "${var.snowflake_user}"
  role     = "SYSADMIN"
}

3.Slackへの通知設定

  • これまで定義したリソースでSnowflakeのクラウド通知統合とAWSのSNSトピックの連携ができているため、エラー通知をSNSへパブリッシュして通知を行うことはできるが、Snowflakeの通知メッセージはJSON形式となっているため、メッセージを整形してSlackへ通知を行うLambda関数を定義します
  • Lambdaのソースコードは Snowpipeとタスクのエラー通知をサクッと設定するよ の記事を参考にさせていただきました
  • ソースコードをlambda/srcパス上に配置して、以下のTerraformコードでデプロイします
  • SlackAPIに設定するWeb hook urlは環境変数として定義しています
data archive_file "slack_notification" {
  type        = "zip"
  source_dir  = "lambda/src"
  output_path = "lambda/build/slack_notification.zip"
}

resource "aws_lambda_function" "slack_notification" {
  filename           = "${data.archive_file.slack_notification.output_path}"
  function_name      = "slack_notification"
  description        = "Lambda function that receives Snowflake errors from SNS and notifies Slack."
  role               = aws_iam_role.slack_notification.arn
  handler            = "lambda_function.lambda_handler"
  source_code_hash   = data.archive_file.slack_notification.output_base64sha256
  runtime            = "python3.8"
  timeout            = 60
  environment {
    variables = {
      LOG_LEVEL      = var.log_level
      WEBHOOK_URL    = var.webhook_url
      SLACK_CHANNEL  = var.slack_channel
      SLACK_USERNAME = var.slack_username
    }
  }
}
  • SNSからLambdaへ連携するためにSNSのサブスクリプションとしてLambda関数を設定します
resource "aws_sns_topic_subscription" "slack_notification" {
  topic_arn = aws_sns_topic.snowflake_notification.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.slack_notification.arn
}

resource "aws_lambda_permission" "slack_notification" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.slack_notification.arn
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.snowflake_notification.arn
}

エラー通知の確認

  • terraform applyを実行してリソースを作成します。ここまでの設定で通知設定は完了しました。動作を確認するため、Snowflakeのタスクにクラウド通知統合を設定してエラーの動作を確認してみましょう
  • 外部ステージからログファイルをCOPY INTOで取り込むタスクを定義しています。ERROR_INTEGRATIONに作成したクラウド通知統合の名前を設定します。外部ステージなどの設定方法は本稿では割愛します
CREATE OR REPLACE TASK SUGA_EXT_STAGE_LOAD_TASK
    SCHEDULE = 'USING CRON 00 08 * * * Asia/Tokyo'
    WAREHOUSE = COMPUTE_WH
    ERROR_INTEGRATION = aws_sns_notification
    as
        EXECUTE IMMEDIATE $$
            BEGIN
              USE ROLE LOAD_TASK_ROLE;
              USE DATABASE ANALYTICS;
              USE SCHEMA PUBLIC;
              COPY INTO LOG FROM @LOG_EXT_STAGE;
              RETURN 'success';
            END;
        $$;
  • エラーを発生させるために外部ステージのパス上に画像データを配置してタスクを手動で実行してみます
  • imformation_schame.task_historyでタスク実行時にエラーが発生していることを確認します
ALTER TASK SUGA_EXT_STAGE_LOAD_TASK RESUME;
EXECUTE TASK SUGA_EXT_STAGE_LOAD_TASK;
SELECT * FROM table(information_schema.task_history(result_limit=> 10000)) WHERE NAME = 'SUGA_EXT_STAGE_LOAD_TASK';
  • Slackへエラー通知されることが確認できました

まとめ

  • TerraformでSnowflakeのクラウド通知統合とAWS SNSを連携し、Lamda関数でSlackで通知する設定をおこないました。Snowflakeのクラウド通知統合は上記のタスクの他、Snowpipeやストアドプロシージャからメッセージを通知することができるため、本稿の設定を参考にしていただければと思います