Snowflakeの操作履歴データを永続化する方法
風音屋アドバイザーの渡部徹太郎(@fetarodc) です。 本ブログではSnowflakeの COPY_HISTORY、TASK_HISTORY、LOGIN_HISTORYそしてQUERY_HISTORY のデータを永続化する方法を説明します。
デフォルト機能では永続化されない
Snowflakeで定期的なデータ取り込みを実現しようとすると、データを取り込むためのCOPY文や、 定期的にタスクを実行するTASKを使うことが多いかと思います。 それらの履歴データであるCOPY_HISTORYや TASK_HISTORY は、このブログ記事を公開した2023年5月時点では、それぞれ「14日」と「7日」しか保存されません。 履歴が消えてしまうと運用観点で不都合が生じます。例えば、COPYやTASKのエラー件数をSLO/SLIに設定して、毎月の件数の増減をモニタリングするには、少なくとも数ヶ月分の履歴が必要となります。
また、ログインの履歴や発行したSQLの履歴を確認できるように LOGIN_HISTORYビューや QUERY_HISTORYビュー が用意されていますが、これらの保管期間も「1年」となっています。監査対応などで数年間のログ保管が求められる場合、デフォルトの設定では保管期間が足りません。
そこで、このブログではSnowflakeの
- LOGIN_HISTORYビュー
- QUERY_HISTORYビュー
- COPY_HISTORY
- TASK_HISTORY
の4つのHISTORYのデータを永続化する方法を説明します。
永続化の方針
- 各種HISTORYを永続化するテーブルを準備。
- 各種HISTORY表示コマンドの結果を、INSERT文を用いて永続化用のテーブルに追記。
- 追記のINSERT文は、Snowflakeに内蔵されている定期的に処理を実行する仕組みである TASK を用いて定期実行。
作成手順
各HISTORYごとに手順は若干異なります。簡単なものから順に説明していきます。
LOGIN_HISTORYビューの永続化
テーブル作成
永続化用のテーブルを作成します。
作成するテーブルは、元データであるsnowflake.account_usage.login_history
のデータ構造と同じです。
CREATE TABLE my_db.my_schema.login_history (
EVENT_ID NUMBER(38,0),
EVENT_TIMESTAMP TIMESTAMP_LTZ(6),
EVENT_TYPE VARCHAR(16777216),
USER_NAME VARCHAR(16777216),
CLIENT_IP VARCHAR(16777216),
REPORTED_CLIENT_TYPE VARCHAR(16777216),
REPORTED_CLIENT_VERSION VARCHAR(16777216),
FIRST_AUTHENTICATION_FACTOR VARCHAR(16777216),
SECOND_AUTHENTICATION_FACTOR VARCHAR(16777216),
IS_SUCCESS VARCHAR(3),
ERROR_CODE NUMBER(38,0),
ERROR_MESSAGE VARCHAR(16777216),
RELATED_EVENT_ID NUMBER(38,0),
CONNECTION VARCHAR(16777216)
);
定期INSERTするTASKの作成
永続化用テーブルに定期的にINSERTするTASKを作成します。
CREATE TASK backup_login_history
WAREHOUSE = MY_WAREHOUSE
SCHEDULE='USING CRON 0 6 * * MON Asia/Tokyo'
USER_TASK_TIMEOUT_MS=300000
ERROR_INTEGRATION = my_notification_integration
AS
INSERT INTO my_db.my_schema.login_history (
SELECT
EVENT_ID,
EVENT_TIMESTAMP,
EVENT_TYPE,
USER_NAME,
CLIENT_IP,
REPORTED_CLIENT_TYPE,
REPORTED_CLIENT_VERSION,
FIRST_AUTHENTICATION_FACTOR,
SECOND_AUTHENTICATION_FACTOR,
IS_SUCCESS,
ERROR_CODE,
ERROR_MESSAGE,
RELATED_EVENT_ID,
CONNECTION
FROM snowflake.account_usage.login_history
WHERE event_id NOT IN (SELECT event_id FROM my_db.my_schema.login_history)
);
SQLの特徴
- EVENT_IDをもとに未挿入のレコードを特定し、挿入しています。
- 毎週月曜の6時に動作させます。
- バックアップ失敗時に気づけるように、ERROR_INTEGRATIONを設定しています。このERROR_INTEGRATIONはAWSのSNS経由で管理者にメールが飛ぶようにしています。
- 処理のタイムアウトを300秒に設定しています。永続化用テーブルのデータ量が増えてくると、未挿入レコードを検索するコストが増え、実行時間が伸びていく可能性がありますが、タイムアウトを設定することでそのような処理の伸びに気づけるようにしています。
動作確認と定期実行開始
TASKを手動で実行し、動作確認します。
EXECUTE TASK backup_login_history;
実行したTASKの結果を確認します。
SELECT * FROM TABLE(information_schema.task_history());
STATEがSUCCEEDEDとなっていれば成功です。
動作確認し問題がないなら、TASKをRESUMEし、定期実行を開始します。
ALTER TASK backup_login_history RESUME;
以下のSQLを実行し、state が started
であることを確認します
SHOW TASKS LIKE '%backup_login_history%';
※以降に紹介するHISTORYにおいても、動作確認と定期実行開始の手順は同じであるため、割愛します。
QUERY_HISTORYビューの永続化
テーブル作成
永続化用のテーブルを作成します。
作成するテーブルは、元データであるsnowflake.account_usage.query_history
のデータ構造と同じです。
CREATE TABLE my_db.my_schema.query_history (
QUERY_ID VARCHAR(16777216),
QUERY_TEXT VARCHAR(16777216),
DATABASE_ID NUMBER(38,0),
DATABASE_NAME VARCHAR(16777216),
SCHEMA_ID NUMBER(38,0),
SCHEMA_NAME VARCHAR(16777216),
QUERY_TYPE VARCHAR(16777216),
SESSION_ID NUMBER(38,0),
USER_NAME VARCHAR(16777216),
ROLE_NAME VARCHAR(16777216),
WAREHOUSE_ID NUMBER(38,0),
WAREHOUSE_NAME VARCHAR(16777216),
WAREHOUSE_SIZE VARCHAR(16777216),
WAREHOUSE_TYPE VARCHAR(16777216),
CLUSTER_NUMBER NUMBER(38,0),
QUERY_TAG VARCHAR(16777216),
EXECUTION_STATUS VARCHAR(16777216),
ERROR_CODE VARCHAR(16777216),
ERROR_MESSAGE VARCHAR(16777216),
START_TIME TIMESTAMP_LTZ(6),
END_TIME TIMESTAMP_LTZ(6),
TOTAL_ELAPSED_TIME NUMBER(38,0),
BYTES_SCANNED NUMBER(38,0),
PERCENTAGE_SCANNED_FROM_CACHE FLOAT,
BYTES_WRITTEN NUMBER(38,0),
BYTES_WRITTEN_TO_RESULT NUMBER(38,0),
BYTES_READ_FROM_RESULT NUMBER(38,0),
ROWS_PRODUCED NUMBER(38,0),
ROWS_INSERTED NUMBER(38,0),
ROWS_UPDATED NUMBER(38,0),
ROWS_DELETED NUMBER(38,0),
ROWS_UNLOADED NUMBER(38,0),
BYTES_DELETED NUMBER(38,0),
PARTITIONS_SCANNED NUMBER(38,0),
PARTITIONS_TOTAL NUMBER(38,0),
BYTES_SPILLED_TO_LOCAL_STORAGE NUMBER(38,0),
BYTES_SPILLED_TO_REMOTE_STORAGE NUMBER(38,0),
BYTES_SENT_OVER_THE_NETWORK NUMBER(38,0),
COMPILATION_TIME NUMBER(38,0),
EXECUTION_TIME NUMBER(38,0),
QUEUED_PROVISIONING_TIME NUMBER(38,0),
QUEUED_REPAIR_TIME NUMBER(38,0),
QUEUED_OVERLOAD_TIME NUMBER(38,0),
TRANSACTION_BLOCKED_TIME NUMBER(38,0),
OUTBOUND_DATA_TRANSFER_CLOUD VARCHAR(16777216),
OUTBOUND_DATA_TRANSFER_REGION VARCHAR(16777216),
OUTBOUND_DATA_TRANSFER_BYTES NUMBER(38,0),
INBOUND_DATA_TRANSFER_CLOUD VARCHAR(16777216),
INBOUND_DATA_TRANSFER_REGION VARCHAR(16777216),
INBOUND_DATA_TRANSFER_BYTES NUMBER(38,0),
LIST_EXTERNAL_FILES_TIME NUMBER(38,0),
CREDITS_USED_CLOUD_SERVICES FLOAT,
RELEASE_VERSION VARCHAR(16777216),
EXTERNAL_FUNCTION_TOTAL_INVOCATIONS NUMBER(38,0),
EXTERNAL_FUNCTION_TOTAL_SENT_ROWS NUMBER(38,0),
EXTERNAL_FUNCTION_TOTAL_RECEIVED_ROWS NUMBER(38,0),
EXTERNAL_FUNCTION_TOTAL_SENT_BYTES NUMBER(38,0),
EXTERNAL_FUNCTION_TOTAL_RECEIVED_BYTES NUMBER(38,0),
QUERY_LOAD_PERCENT NUMBER(38,0),
IS_CLIENT_GENERATED_STATEMENT BOOLEAN,
QUERY_ACCELERATION_BYTES_SCANNED NUMBER(38,0),
QUERY_ACCELERATION_PARTITIONS_SCANNED NUMBER(38,0),
QUERY_ACCELERATION_UPPER_LIMIT_SCALE_FACTOR NUMBER(38,0)
);
定期INSERTするTASKの作成
永続化用テーブルに定期的にINSERTするTASKを作成します。 QUERY_IDをもとに未挿入のレコードを特定し、挿入しています。
CREATE TASK backup_query_history
WAREHOUSE = MY_WAREHOUSE
SCHEDULE='USING CRON 0 6 * * MON Asia/Tokyo'
USER_TASK_TIMEOUT_MS=300000
ERROR_INTEGRATION = my_notification_integration
AS
INSERT INTO my_db.my_schema.query_history (
SELECT
QUERY_ID,
QUERY_TEXT,
DATABASE_ID,
DATABASE_NAME,
SCHEMA_ID,
SCHEMA_NAME,
QUERY_TYPE,
SESSION_ID,
USER_NAME,
ROLE_NAME,
WAREHOUSE_ID,
WAREHOUSE_NAME,
WAREHOUSE_SIZE,
WAREHOUSE_TYPE,
CLUSTER_NUMBER,
QUERY_TAG,
EXECUTION_STATUS,
ERROR_CODE,
ERROR_MESSAGE,
START_TIME,
END_TIME,
TOTAL_ELAPSED_TIME,
BYTES_SCANNED,
PERCENTAGE_SCANNED_FROM_CACHE,
BYTES_WRITTEN,
BYTES_WRITTEN_TO_RESULT,
BYTES_READ_FROM_RESULT,
ROWS_PRODUCED,
ROWS_INSERTED,
ROWS_UPDATED,
ROWS_DELETED,
ROWS_UNLOADED,
BYTES_DELETED,
PARTITIONS_SCANNED,
PARTITIONS_TOTAL,
BYTES_SPILLED_TO_LOCAL_STORAGE,
BYTES_SPILLED_TO_REMOTE_STORAGE,
BYTES_SENT_OVER_THE_NETWORK,
COMPILATION_TIME,
EXECUTION_TIME,
QUEUED_PROVISIONING_TIME,
QUEUED_REPAIR_TIME,
QUEUED_OVERLOAD_TIME,
TRANSACTION_BLOCKED_TIME,
OUTBOUND_DATA_TRANSFER_CLOUD,
OUTBOUND_DATA_TRANSFER_REGION,
OUTBOUND_DATA_TRANSFER_BYTES,
INBOUND_DATA_TRANSFER_CLOUD,
INBOUND_DATA_TRANSFER_REGION,
INBOUND_DATA_TRANSFER_BYTES,
LIST_EXTERNAL_FILES_TIME,
CREDITS_USED_CLOUD_SERVICES,
RELEASE_VERSION,
EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
EXTERNAL_FUNCTION_TOTAL_SENT_ROWS,
EXTERNAL_FUNCTION_TOTAL_RECEIVED_ROWS,
EXTERNAL_FUNCTION_TOTAL_SENT_BYTES,
EXTERNAL_FUNCTION_TOTAL_RECEIVED_BYTES,
QUERY_LOAD_PERCENT,
IS_CLIENT_GENERATED_STATEMENT,
QUERY_ACCELERATION_BYTES_SCANNED,
QUERY_ACCELERATION_PARTITIONS_SCANNED,
QUERY_ACCELERATION_UPPER_LIMIT_SCALE_FACTOR
FROM snowflake.account_usage.query_history
WHERE query_id NOT IN (SELECT query_id FROM my_db.my_schema.query_history)
);
COPY_HISTORYの永続化
テーブル作成
永続化用のテーブルを作成します。
作成するテーブルは、元データであるinformation_schema.copy_history
のデータ構造とほぼ同じものを作りますが、
元データにはユニークなキーがないため、独自にCOPY_HISTORY_ID
という列を追加しています。
CREATE TABLE my_db.my_schema.copy_history(
COPY_HISTORY_ID text PRIMARY KEY, -- 独自に追加
-- 以下の列は元テーブルと同じ構造
FILE_NAME TEXT,
STAGE_LOCATION TEXT,
LAST_LOAD_TIME TIMESTAMP_LTZ,
ROW_COUNT NUMBER,
ROW_PARSED NUMBER,
FILE_SIZE NUMBER,
FIRST_ERROR_MESSAGE TEXT,
FIRST_ERROR_LINE_NUMBER NUMBER,
FIRST_ERROR_CHARACTER_POS NUMBER,
FIRST_ERROR_COLUMN_NAME TEXT,
ERROR_COUNT NUMBER,
ERROR_LIMIT NUMBER,
STATUS TEXT,
TABLE_CATALOG_NAME TEXT,
TABLE_SCHEMA_NAME TEXT,
TABLE_NAME TEXT,
PIPE_CATALOG_NAME TEXT,
PIPE_SCHEMA_NAME TEXT,
PIPE_NAME TEXT,
PIPE_RECEIVED_TIME TIMESTAMP_LTZ
);
定期INSERTするTASKの作成
次に、永続化用テーブルに定期的にINSERTするTASKを作成します。
CREATE TASK backup_copy_history_for_my_target_table
WAREHOUSE = MY_WAREHOUSE
SCHEDULE='USING CRON 0 6 * * MON Asia/Tokyo'
USER_TASK_TIMEOUT_MS=300000
ERROR_INTEGRATION = my_notification_integration
AS
INSERT INTO my_db.my_schema.copy_history (
SELECT
FILE_NAME || to_char(DATE_PART( epoch_second,LAST_LOAD_TIME)) AS COPY_HISTORY_ID,
FILE_NAME,
STAGE_LOCATION,
LAST_LOAD_TIME,
ROW_COUNT,
ROW_PARSED,
FILE_SIZE,
FIRST_ERROR_MESSAGE,
FIRST_ERROR_LINE_NUMBER,
FIRST_ERROR_CHARACTER_POS,
FIRST_ERROR_COLUMN_NAME,
ERROR_COUNT,
ERROR_LIMIT,
STATUS,
TABLE_CATALOG_NAME,
TABLE_SCHEMA_NAME,
TABLE_NAME,
PIPE_CATALOG_NAME,
PIPE_SCHEMA_NAME,
PIPE_NAME,
PIPE_RECEIVED_TIME
FROM table(information_schema.copy_history(
table_name=>'my_db.my_schema.my_target_table',
start_time=> dateadd(month, -1, current_timestamp()))
)
WHERE (FILE_NAME || to_char(DATE_PART( epoch_second,LAST_LOAD_TIME)))
NOT IN (SELECT COPY_HISTORY_ID FROM my_db.my_schema.copy_history)
);
SQLの特徴
- 未挿入レコードを判定するために、ファイル名とロード時間を文字列として結合しユニークなIDを生成し、このユニークなIDを用いて、挿入先のテーブルに未挿入かを判定しています。
- COPY_HISTORYはCOPYが実行される対象テーブルを指定して取得する必要がありますが、このSQLでは対象テーブルとして
my_db.my_schema.my_target_table
を指定しています。 start_time=> dateadd(month, -1, current_timestamp()))
で直近1ヶ月前のデータに絞り込んでいます。
TASK_HISTORYの永続化
テーブル作成
永続化用のテーブルを作成します。
作成するテーブルは、元データであるinformation_schema.copy_history
のデータ構造とほぼ同じものを作りますが、
元このテーブルにはユニークなキーがないため、独自にTASK_HISTORY_ID
という列を追加しています。
CREATE TABLE my_db.my_schema.task_history
(
TASK_HISTORY_ID text PRIMARY KEY, -- 独自に追加
-- 以下の列は元テーブルと同じ構造
QUERY_ID text,
NAME text,
DATABASE_NAME text,
SCHEMA_NAME text,
QUERY_TEXT text,
CONDITION_TEXT text,
STATE text,
ERROR_CODE number,
ERROR_MESSAGE text,
SCHEDULED_TIME timestamp_tz,
QUERY_START_TIME timestamp_tz,
NEXT_SCHEDULED_TIME timestamp_tz,
COMPLETED_TIME timestamp_tz,
ROOT_TASK_ID text,
GRAPH_VERSION number,
RUN_ID number,
RETURN_VALUE text,
SCHEDULED_FROM text
);
定期INSERTするTASKの作成
永続化用テーブルに定期的にINSERTするTASKを作成します。 未挿入レコードを判定するために、タスク名とクエリ開始時間を結合しユニークなIDを生成し、このユニークなIDを用いて、挿入先のテーブルに未挿入かを判定しています。
CREATE TASK backup_task_history
WAREHOUSE = MY_WAREHOUSE
SCHEDULE='USING CRON 0 6 * * MON Asia/Tokyo'
USER_TASK_TIMEOUT_MS=300000
ERROR_INTEGRATION = my_notification_integration
AS
INSERT INTO my_db.my_schema.task_history (
SELECT NAME || to_char(DATE_PART( epoch_second,QUERY_START_TIME)) AS TASK_HISTORY_ID,
QUERY_ID,
NAME,
DATABASE_NAME,
SCHEMA_NAME,
QUERY_TEXT,
CONDITION_TEXT,
STATE,
ERROR_CODE,
ERROR_MESSAGE,
SCHEDULED_TIME,
QUERY_START_TIME,
NEXT_SCHEDULED_TIME,
COMPLETED_TIME,
ROOT_TASK_ID,
GRAPH_VERSION,
RUN_ID,
RETURN_VALUE,
SCHEDULED_FROM
FROM TABLE(information_schema.task_history())
WHERE QUERY_START_TIME IS NOT NULL
AND NAME || to_char(DATE_PART( epoch_second,QUERY_START_TIME))
NOT IN (SELECT TASK_HISTORY_ID FROM my_db.my_schema.task_history)
);
まとめ
本ブログでは、Snowflakeの
- LOGIN_HISTORYビュー
- QUERY_HISTORYビュー
- COPY_HISTORY
- TASK_HISTORY
の4つのHISTORYを永続化する方法を説明しました。
HISTORYの種類によって複雑さは若干異なりますが、Snowflakeの標準機能であるTASKのみで簡単に設定できるので、まだ永続化していないのであればぜひ試してみてください。