Snowflakeのエラー通知設定
- SnowflakeのタスクやSnowpipeでエラーが発生した場合、Snowflakeのクラウド通知統合でクラウドのメッセージ通知サービスと連携することでSlackやメールなどにエラー通知を行うことができます。本稿ではAWSのSNSでSlackへエラー通知する場合の設定手順を例示します
- AWSの設定
- SNSトピックの作成
- IAMロールの作成(SnowflakeからSNSへ通知するための権限を許可)
- Snowflake側のクラウド通知統合作成後のユーザーARNと外部IDを信頼関係ポリシーに反映
- Snowflakeの設定
- クラウド通知統合を作成
- TaskもしくはSnowpipeの作成時もしくは既存の場合は変更し、クラウド通知統合を設定する
- Slackへの通知設定
- SNSからのメッセージを受信してSlackへ通知するためのLambda関数を実装
- SNSのサブスクリプションに実装したLambdaを設定
Terraformでリソースを作成する
- 本稿では、1~3の設定をTerraformで作成する。Terraformのソースコードは、GitHubで公開しています
1.AWSの設定
- エラー通知に必要となるリソースをTerraformで作成します。SNSトピックを作成するTerraformは以下のようになります。SNSトピックの名前はTerraform外部変数(
sns_topic_name
)で定義します
resource "aws_sns_topic" "snowflake_notification" {
name = "${var.aws_sns_topic_name}"
}
- IAMロールを作成します。SnowflakeからのSNSへのアクセスを許可する設定を行うため、信頼関係ポリシーを定義したテンプレートファイルに後述するSnowflakeのクラウド通知統合で払い出されたIAMユーザーのARNと外部IDを設定します
resource "aws_iam_role" "snowflake_notification" {
name = "${var.snowflake_notif_iam_role_name}"
assume_role_policy = templatefile(
"./roles/snowflake_assume_policy.json",
{
sns_user_arn = snowflake_notification_integration.notif_int.aws_sns_iam_user_arn,
external_id = snowflake_notification_integration.notif_int.aws_sns_external_id
}
)
}
resource "aws_iam_role_policy" "snowflake_notification" {
name = "snowflake-notification-policy"
role = aws_iam_role.snowflake_notification.name
policy = templatefile(
"./roles/snowflake_notification.json",
{
sns_arn = aws_sns_topic.snowflake_notification.arn
}
)
}
- 信頼関係ポリシーのテンプレートファイル(
./roles/snowflake_assume_policy.json
)を配置します
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "${sns_user_arn}"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "${external_id}"
}
}
}
]
}
- IAMロールのポリシーのテンプレートファイル(
./roles/snowflake_notification.json
)は、作成したSNSトピックに対してsns:Publish
アクションを許可します
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": [
"${sns_arn}"
]
}
]
}
- 次にSlackへ通知を行うLambda関数の実行時に許可するポリシーを定義します。Lambda関数のログをCloudWatchLogsに出力させるため、ログ出力の許可設定を行います。ここでは、ソースコードは割愛します(GitHubのソースコードを参照)
2.Snowflakeの設定
- Snowflakeのクラウド通知統合をTerraformで作成します。SNSへのアクセスを許可するIAMロールARNとSNSトピックARNを設定する必要がありますが、それぞれ先ほど定義したIAMロールとSNSトピック名を元に決め打ちで定義することができます
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
resource "snowflake_notification_integration" "notif_int" {
name = var.snowflake_notif_int_name
comment = "Snowpipe・タスク エラー通知のためのnotification integration"
enabled = true
type = "QUEUE"
direction = "OUTBOUND"
# AWS_SNS
notification_provider = "AWS_SNS"
aws_sns_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/${var.snowflake_notif_iam_role_name}"
aws_sns_topic_arn = "arn:aws:sns:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}"
}
- snowflakeプロバイダーの設定はaccount_idとsnowflake_userは変数で定義しています。今回のソースコードでは、パスワードもしくは秘密鍵のパス情報は環境変数で設定しているため、コード上での定義はしていません。環境変数はSnowflakeプロバイダーの公式ページを参考に設定してください
provider "snowflake" {
account = "${var.snowflake_account_id}"
user = "${var.snowflake_user}"
role = "SYSADMIN"
}
3.Slackへの通知設定
- これまで定義したリソースでSnowflakeのクラウド通知統合とAWSのSNSトピックの連携ができているため、エラー通知をSNSへパブリッシュして通知を行うことはできるが、Snowflakeの通知メッセージはJSON形式となっているため、メッセージを整形してSlackへ通知を行うLambda関数を定義します
- Lambdaのソースコードは Snowpipeとタスクのエラー通知をサクッと設定するよ の記事を参考にさせていただきました
- ソースコードをlambda/srcパス上に配置して、以下のTerraformコードでデプロイします
- SlackAPIに設定するWeb hook urlは環境変数として定義しています
data archive_file "slack_notification" {
type = "zip"
source_dir = "lambda/src"
output_path = "lambda/build/slack_notification.zip"
}
resource "aws_lambda_function" "slack_notification" {
filename = "${data.archive_file.slack_notification.output_path}"
function_name = "slack_notification"
description = "Lambda function that receives Snowflake errors from SNS and notifies Slack."
role = aws_iam_role.slack_notification.arn
handler = "lambda_function.lambda_handler"
source_code_hash = data.archive_file.slack_notification.output_base64sha256
runtime = "python3.8"
timeout = 60
environment {
variables = {
LOG_LEVEL = var.log_level
WEBHOOK_URL = var.webhook_url
SLACK_CHANNEL = var.slack_channel
SLACK_USERNAME = var.slack_username
}
}
}
- SNSからLambdaへ連携するためにSNSのサブスクリプションとしてLambda関数を設定します
resource "aws_sns_topic_subscription" "slack_notification" {
topic_arn = aws_sns_topic.snowflake_notification.arn
protocol = "lambda"
endpoint = aws_lambda_function.slack_notification.arn
}
resource "aws_lambda_permission" "slack_notification" {
statement_id = "AllowExecutionFromSNS"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.slack_notification.arn
principal = "sns.amazonaws.com"
source_arn = aws_sns_topic.snowflake_notification.arn
}
エラー通知の確認
- terraform applyを実行してリソースを作成します。ここまでの設定で通知設定は完了しました。動作を確認するため、Snowflakeのタスクにクラウド通知統合を設定してエラーの動作を確認してみましょう
- 外部ステージからログファイルをCOPY INTOで取り込むタスクを定義しています。
ERROR_INTEGRATION
に作成したクラウド通知統合の名前を設定します。外部ステージなどの設定方法は本稿では割愛します
CREATE OR REPLACE TASK SUGA_EXT_STAGE_LOAD_TASK
SCHEDULE = 'USING CRON 00 08 * * * Asia/Tokyo'
WAREHOUSE = COMPUTE_WH
ERROR_INTEGRATION = aws_sns_notification
as
EXECUTE IMMEDIATE $$
BEGIN
USE ROLE LOAD_TASK_ROLE;
USE DATABASE ANALYTICS;
USE SCHEMA PUBLIC;
COPY INTO LOG FROM @LOG_EXT_STAGE;
RETURN 'success';
END;
$$;
- エラーを発生させるために外部ステージのパス上に画像データを配置してタスクを手動で実行してみます
- imformation_schame.task_historyでタスク実行時にエラーが発生していることを確認します
ALTER TASK SUGA_EXT_STAGE_LOAD_TASK RESUME;
EXECUTE TASK SUGA_EXT_STAGE_LOAD_TASK;
SELECT * FROM table(information_schema.task_history(result_limit=> 10000)) WHERE NAME = 'SUGA_EXT_STAGE_LOAD_TASK';
- Slackへエラー通知されることが確認できました
まとめ
- TerraformでSnowflakeのクラウド通知統合とAWS SNSを連携し、Lamda関数でSlackで通知する設定をおこないました。Snowflakeのクラウド通知統合は上記のタスクの他、Snowpipeやストアドプロシージャからメッセージを通知することができるため、本稿の設定を参考にしていただければと思います