こんにちは!小澤です。
皆さんは、Snowflakeのステージ上にあるファイルから、テーブルへデータをロードしたことはありますか?
今回は「ステージ上のファイルが変更された際に、再度ロードして変更内容を反映したい」ときの実装方法をご紹介します。
本記事でわかること
- ステージ上のファイルの変更をテーブルに反映する方法
- 全件差分更新の実装方法
- MERGE文の使い方
ファイルの種類と変更内容
今回は、下記のような「id」と「name」を持つCSVデータを考えます。主キーは、「id」とします。
id,name
1,tanaka
2,suzuki
3,sato
ファイルの種類
変更内容を含んだファイルは、大きく分けて2種類に分けられます。
全件ファイル
変更があった行を含む、テーブルのすべての行が含まれているファイルです。
id,name
1,tanaka
2,suzuki
3,sato
4,hayashi
差分ファイル
変更があった行のみが含まれているファイルです。
id,name
4,hayashi
変更内容
ファイルの変更内容は大きく分けて3つあります。
INSERT
レコードが新規追加されます。
id,name
1,tanaka
2,suzuki
3,sato
4,hayashi ※新規追加
UPDATE
既存のレコードの内容が更新されます(主キーで判定)。
id,name
1,tanaka
2,suzuki
3,satoda ※更新
4,hayashi
DELETE
既存のレコードが削除されます。
id,name
1,tanaka
3,satoda
4,hayashi
※この場合2, suzukiが削除されています。
設計
今回は下記の図のような流れで実現します!
※論理削除フラグがないと削除判定が困難であるため、「差分ファイルの削除」は対象外とします。

①ステージ上のファイルから、WORKテーブルにCOPY INTOでロードします。
②ファイルに含まれていないレコードを削除レコードと判断し、本テーブルから削除します(※全件ファイルの場合のみ)。
③MERGEを使用し、本テーブルの主キーと一致しているWORKテーブルのレコードは更新、一致しないレコードは新規追加します。
ポイントは、WORKテーブルに一度COPY INTOでロードしてからMERGEを使用する点です。COPY INTOでは直接MERGEを使用できないため、このような設計にしています。
準備
まずは、使用するオブジェクトの準備をします。
データベースの作成
USE ROLE SYSADMIN;
CREATE DATABASE HANASNOW;
スキーマの作成
CREATE SCHEMA HANASNOW.ZS_KOUSHIN;
WORKテーブルの作成
CREATE TABLE HANASNOW.ZS_KOUSHIN.AAA_WORK (
id NUMBER PRIMARY KEY, -- 主キーに設定
name VARCHAR
);
テーブルの作成&初期データ投入
CREATE TABLE HANASNOW.ZS_KOUSHIN.AAA LIKE HANASNOW.ZS_KOUSHIN.AAA_WORK;
INSERT INTO HANASNOW.ZS_KOUSHIN.AAA VALUES
(1, 'tanaka'),
(2, 'suzuki'),
(3, 'sato')
;
ステージの作成
CREATE STAGE HANASNOW.ZS_KOUSHIN.STG_AAA;
ファイルフォーマットの作成
CREATE FILE FORMAT HANASNOW.ZS_KOUSHIN.FF_CSV_DEFAULT
TYPE = CSV
FIELD_DELIMITER = ','
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ENCODING = 'SHIFTJIS';
;
次に、データの準備をします。
CSVファイルをステージにアップロード
Snowsight上の「ステージにファイルをロード」機能を使用して、AAA_zenken.csvとAAA_sabun.csvをステージにアップロードします。
サイドバーから「Add data」を選択

「Load files into a Stage」を選択

ステージを指定し、ファイルをアップロードする

実装
ファイルの存在確認
ファイルがステージ上に存在していることを確認します。
LS @HANASNOW.ZS_KOUSHIN.STG_AAA;

ステージ上にファイルが存在することが確認できました!
全件ファイルの反映
まずは、全件ファイルの変更内容をテーブルに反映させます!
WORKテーブルにCOPY INTO
-- WORKテーブルを空にする
DELETE FROM HANASNOW.ZS_KOUSHIN.AAA_WORK;
-- WORKテーブルにロード
COPY INTO HANASNOW.ZS_KOUSHIN.AAA_WORK
FROM @HANASNOW.ZS_KOUSHIN.STG_AAA/AAA_zenken.csv
FILE_FORMAT = (FORMAT_NAME = 'HANASNOW.ZS_KOUSHIN.FF_CSV_DEFAULT');
WORKテーブル確認
SELECT * FROM HANASNOW.ZS_KOUSHIN.AAA_WORK;

WORKテーブルに全件ファイルの変更内容が入っていることが確認できました!
WORKテーブルに存在しないレコードを本テーブルから削除
DELETE FROM HANASNOW.ZS_KOUSHIN.AAA t
WHERE NOT EXISTS (
SELECT 1 FROM HANASNOW.ZS_KOUSHIN.AAA_WORK s
WHERE t.id = s.id
);

削除されていることが確認できました!
MERGEで本テーブルに変更内容を反映
MERGE INTO HANASNOW.ZS_KOUSHIN.AAA t
USING HANASNOW.ZS_KOUSHIN.AAA_WORK s
ON t.id = s.id
WHEN MATCHED THEN -- 主キーが一致した場合(更新レコード)
UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN -- 主キーが一致しなかった場合(新規追加レコード)
INSERT (id, name) values (s.id, s.name)
;

変更内容が反映されていることが確認できました!
差分ファイルの反映
差分ファイルの反映は、全件ファイルとほとんど同じです!
※赤文字の点が異なります。
-- WORKテーブルを空にする
DELETE FROM HANASNOW.ZS_KOUSHIN.AAA_WORK;
-- WORKテーブルにCOPY INTOする
COPY INTO HANASNOW.ZS_KOUSHIN.AAA_WORK
FROM @HANASNOW.ZS_KOUSHIN.STG_AAA/AAA_sabun.csv
FILE_FORMAT = (FORMAT_NAME = 'HANASNOW.ZS_KOUSHIN.FF_CSV_DEFAULT');
-- 差分ファイルのため削除処理はなし
-- MERGEで本テーブルに変更内容を反映
MERGE INTO HANASNOW.ZS_KOUSHIN.AAA t
USING HANASNOW.ZS_KOUSHIN.AAA_WORK s
ON t.id = s.id
WHEN MATCHED THEN -- 主キーが一致した場合(更新レコード)
UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN -- 主キーが一致しなかった場合(新規追加レコード)
INSERT (id, name) values (s.id, s.name)
;

変更内容が反映されていることが確認できました!
まとめ
想定通り、全件ファイルの場合は新規追加・更新・削除が行われ、
差分ファイルの場合は、新規追加・更新が行われることが確認できました!
- 全件ファイルの場合は、存在しないレコードが削除レコードであること
- WORKテーブルにCOPY INTOして、本テーブルにMERGEを行うこと
この2点を意識することで、全件ファイル・差分ファイルともに、変更内容をテーブルに反映させることができます。
今回は以上です!ありがとうございました!



コメント