
Cloud Storage To Snowflake: Schema Drift Handling With Snowflake Scripting

Author: Ganesh Goel



Overview

Data ingestion from cloud storage involves transferring data from systems such as AWS S3, Azure Blob Storage, or Google Cloud Storage into Snowflake. The data may arrive in various formats and structures, and its schema may drift over time as the source system changes. Handling schema drift requires a flexible data pipeline that can adapt to structural changes, using techniques such as schema inference and data validation. Proper handling of schema drift keeps the data useful and reliable for downstream applications and analysis.


This article shows how to transfer data from a cloud storage platform (here, S3) to Snowflake and manage its schema drift effectively using Snowflake Scripting. It also covers creating a log table that tracks every file ingested during execution.


Prerequisites
  • An external stage and storage integration are already set up in Snowflake.

  • The required file format has already been created. (A minimal sketch of both prerequisites follows.)
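
For reference, here is a minimal sketch of these prerequisites, assuming an S3 bucket; the integration, stage, role ARN, and format names below are placeholders, not fixed parts of this pipeline:

-- Storage integration and external stage (names and locations are illustrative)
CREATE STORAGE INTEGRATION S3_INT
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<role-name>'
    STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/');

CREATE STAGE RAW.S3_STAGE
    URL = 's3://<bucket>/<path>/'
    STORAGE_INTEGRATION = S3_INT;

-- CSV file format used later by the COPY INTO statements
CREATE FILE FORMAT RAW.CSV_FILE_FORMAT
    TYPE = CSV
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"';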

Implementation

This process consists of creating three stored procedures:

  • Master procedure - Fetches the list of files to be loaded from the stage.

  • Driver procedure - Determines which operation is needed (Create, Insert, Alter) and passes control to the child procedure accordingly.

  • Child procedure - Performs the actual data ingestion into the tables.


PS: This flow can be modified to suit your requirements.


Steps for creating the stored procedures to set up the Ingestion process

Master Procedure

1. Create the log table


This table keeps track of the status of every file loaded.


create table if not exists metadata.S3_RAW_LOAD_LOGS (
    ID                  number identity(1,1),
    FILE_NAME           varchar,
    TABLE_NAME          varchar,
    FILE_PATH           varchar,
    FILE_CREATION_DATE  varchar,
    START_TIME          datetime,
    END_TIME            datetime,
    STATUS              varchar,
    ROWS_STAGED         varchar,
    MODE_OF_OPERATION   varchar,
    LAST_EXECUTED_QUERY varchar,
    MESSAGE             varchar
);
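
Once the pipeline is running, this table doubles as an audit trail; a quick illustrative query:

-- Most recent loads, newest first
select file_name, table_name, status, rows_staged, mode_of_operation, start_time, end_time
from metadata.S3_RAW_LOAD_LOGS
order by start_time desc
limit 20;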


2. Fetch files to be loaded


Here we fetch the list of files present in the stage and filter out the ones that were already loaded successfully, so that the same file doesn’t get loaded again.


Replace {{STAGE_NAME}} with the name of the stage you have created and {{STAGE_LOCATION}} with the path configured inside the stage.


LIST @{{STAGE_NAME}};

SELECT query_id into :query_id
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE QUERY_TEXT = 'LIST @{{STAGE_NAME}}'
ORDER BY START_TIME DESC
LIMIT 1;

let Fetch_files_query varchar :=
    'select split_part("name",\'/\',-1) file_name,
            replace("name",\'{{STAGE_LOCATION}}\',\'\') path,
            "last_modified" date
     from table(result_scan(\'' || :query_id || '\'))
     where "name" not in (select distinct file_path
                          from metadata.S3_RAW_LOAD_LOGS
                          where status = \'SUCCESS\' and file_path is not NULL)
     order by replace(date,\' GMT\')::date';

files_list := (execute immediate :Fetch_files_query);
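
As a side note, instead of searching QUERY_HISTORY for the LIST statement, the same query id can usually be captured with LAST_QUERY_ID() right after the LIST runs. A minimal sketch, assuming no other statement executes in between:

LIST @{{STAGE_NAME}};
-- LAST_QUERY_ID() returns the id of the immediately preceding statement in the session
let query_id varchar := (SELECT LAST_QUERY_ID());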



3. Iterate over the files and insert into the log table


Each file is now loaded into its target table. The table name is derived from the file name by stripping the trailing date stamp and extension; for example, a file named employee_20230101.csv maps to the table EMPLOYEE. (Note that RTRIM strips by character set, so table names ending in any of the characters '_01234567890.csv' would also get trimmed; adjust the derivation logic if your naming convention requires it.)


let c cursor for files_list;
open c;
for i in c do
    table_name    := upper(rtrim(i.FILE_NAME,'_01234567890.csv'));
    file_path     := i.PATH;
    last_modified := i.DATE;
    file_name     := i.FILE_NAME;

    insert into metadata.S3_RAW_LOAD_LOGS
        (file_name, file_path, table_name, file_creation_date, start_time, status)
    values
        (:file_name, '{{STAGE_LOCATION}}' || :file_path, :table_name,
         :last_modified, getdate(), 'RUNNING');

    call raw.sp_Load_raw_data_driver(:table_name, :file_path, :raw_schema_name);
end for;
close c;


Master Procedure Code

create or replace procedure RAW.SP_LOAD_RAW_DATA_MASTER()
returns varchar
language sql
execute as caller
as
$$
declare
    files_list resultset;
    file_path  varchar;
    query_id   varchar;
begin
    create table if not exists metadata.S3_RAW_LOAD_LOGS (
        ID                  number identity(1,1),
        FILE_NAME           varchar,
        TABLE_NAME          varchar,
        FILE_PATH           varchar,
        FILE_CREATION_DATE  varchar,
        START_TIME          datetime,
        END_TIME            datetime,
        STATUS              varchar,
        ROWS_STAGED         varchar,
        MODE_OF_OPERATION   varchar,
        LAST_EXECUTED_QUERY varchar,
        MESSAGE             varchar
    );

    let table_name      varchar := '';
    let raw_schema_name varchar := '{{SCHEMA_NAME}}';
    let last_modified   varchar := '';
    let file_name       varchar := '';

    -- Mark any earlier run that never finished as FAILED
    update metadata.S3_RAW_LOAD_LOGS set status = 'FAILED' where status = 'RUNNING';

    LIST @{{STAGE_NAME}};

    select query_id into :query_id
    from table(INFORMATION_SCHEMA.QUERY_HISTORY())
    where QUERY_TEXT = 'LIST @{{STAGE_NAME}}'
    order by START_TIME desc
    limit 1;

    let Fetch_files_query varchar :=
        'select split_part("name",\'/\',-1) file_name,
                replace("name",\'{{STAGE_LOCATION}}\',\'\') path,
                "last_modified" date
         from table(result_scan(\'' || :query_id || '\'))
         where "name" not in (select distinct file_path
                              from metadata.S3_RAW_LOAD_LOGS
                              where status = \'SUCCESS\' and file_path is not NULL)
         order by replace(date,\' GMT\')::date';

    files_list := (execute immediate :Fetch_files_query);

    let c cursor for files_list;
    open c;
    for i in c do
        table_name    := upper(rtrim(i.FILE_NAME,'_01234567890.csv'));
        file_path     := i.PATH;
        last_modified := i.DATE;
        file_name     := i.FILE_NAME;

        insert into metadata.S3_RAW_LOAD_LOGS
            (file_name, file_path, table_name, file_creation_date, start_time, status)
        values
            (:file_name, '{{STAGE_LOCATION}}' || :file_path, :table_name,
             :last_modified, getdate(), 'RUNNING');

        call raw.sp_Load_raw_data_driver(:table_name, :file_path, :raw_schema_name);
    end for;
    close c;

    return 'EXECUTION_COMPLETE';
end;
$$;


Driver Procedure

1. Create table


Here we create a helper file format with a deliberately mismatched field delimiter, so that the entire header row of the file is read into a single column ($1).


For example:

CREATE OR REPLACE FILE FORMAT RAW.HEADER_FF
    TYPE = CSV
    FIELD_DELIMITER = '%%%$$$$'
    RECORD_DELIMITER = '\n'
    SKIP_HEADER = 0;
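
With this format in place, the header extraction can be sanity-checked directly; the file path below is illustrative:

-- The mismatched delimiter forces the whole header row into $1
select $1 as header_row
from @{{STAGE_NAME}}/some/path/employee_20230101.csv
    (file_format => 'RAW.HEADER_FF')
limit 1;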


Now, check whether the table already exists. If it doesn’t, create it.


fetch_headers_query :=
    'select replace($1,\',\',\' varchar, \') headers
     from \'@{{STAGE_NAME}}/' || :file_path || '\'
     (file_format => \'{{HEADER_FILE_FORMAT}}\')
     limit 1';
execute_fetch_headers := (execute immediate :fetch_headers_query);
let cur cursor for execute_fetch_headers;

select count(*) count into :table_exists
from information_schema.tables
where table_name = :table_name and table_schema = :raw_schema_name;

for j in cur do
    headers_list := j.HEADERS;
end for;

if (:table_exists = 0) then  -- table doesn't exist
    create_raw_query := 'CREATE TABLE ' || :raw_schema_name || '.' || :table_name ||
        '(' || :headers_list || ' varchar,
          LOAD_TIMESTAMP timestamp default getdate(),
          LOAD_FILE_NAME varchar)';
    execute immediate :create_raw_query;
    call raw.SP_LOAD_RAW_DATA_CHILD(:headers_list, :raw_schema_name, :table_name, :file_path, 'CREATE & INSERT');


2. Check for schema drift


Check whether the number of columns in the Snowflake table matches the number of columns in the file. The two counts are combined with a UNION, which de-duplicates identical rows, so a single surviving row means the counts match.


select count(*) = 1 into :column_matched
from (
    select count(column_name) count
    from information_schema.columns
    where table_name = :table_name
      and table_schema = :raw_schema_name
      and column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
    union
    select count(t.value)
    from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
);
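
To see why this works, here is a minimal, self-contained illustration with hypothetical counts:

-- UNION de-duplicates: two equal counts collapse into a single row,
-- so count(*) = 1 signals a match.
select count(*) = 1 as column_matched
from (
    select 7 as cnt   -- e.g., columns in the Snowflake table
    union
    select 7          -- e.g., columns in the file header
);  -- returns TRUE; with 7 and 8 it would return FALSE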

1. Number of columns matched

If the column counts are the same, compare an MD5 hash of the alphabetically sorted column lists on both sides to check whether the column names themselves are identical. The same UNION trick applies: a single surviving row (match_headers = 1) means the hashes match.


if (:column_matched = TRUE) then  -- same number of columns, so compare md5
    select count(*) into :match_headers
    from (
        select md5(listagg(column_name,',') within group (order by column_name)) md5
        from information_schema.columns
        where table_name = :table_name
          and table_schema = :raw_schema_name
          and column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
        union
        select md5(listagg(value,',') within group (order by value)) md5
        from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
    );
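
As a quick illustration of the hashing step (the column names are hypothetical): sorting before LISTAGG makes the hash independent of column order, so only the set of names matters.

-- This hashes 'EMAIL,ID,NAME'; the same set of names in any
-- original order produces the same hash.
select md5(listagg(c, ',') within group (order by c)) as col_hash
from (select 'NAME' as c union all select 'ID' union all select 'EMAIL');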

a. Structure is the same


If the hashes match, the file contains exactly the same columns as the Snowflake table, so the data can be inserted directly.


if (:match_headers = 1) then  -- md5 values match
    call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'INSERT W/O UPDATE');


b. Structure has changed


If the hashes don’t match, the file structure has changed: columns have probably been added, removed, or renamed. The table is altered before inserting.


call raw.sp_Load_raw_data_child(:headers_list,:raw_schema_name,:table_name,:file_path,'ALTER & INSERT');


2. Number of columns mismatched


This indicates schema drift, but an ALTER may still be unnecessary: if the columns in the source file are a subset of the columns in Snowflake, you can optionally skip altering the table. To decide, first compare the column counts on both sides, i.e., the Snowflake table and the S3 file.


select target_count < source_count into :column_count
from (
    select count(column_name) target_count, max(source_count) source_count
    from information_schema.columns t1
    cross join (
        select count(t.value) source_count
        from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
    ) as t2
    where t1.table_name = :table_name
      and t1.table_schema = :raw_schema_name
      and t1.column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
);


a. Source has fewer columns than target


This means the source file’s columns may be a subset of the target’s. Check whether that is actually the case.


if (column_count = FALSE) then  -- target has more columns than source
    select count(column_name) = 0 into :column_is_subset
    from (
        select t.value column_name
        from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
        where column_name not in (
            select upper(column_name) column_name
            from information_schema.columns
            where table_name = :table_name and table_schema = :raw_schema_name
        )
    );


i. Source headers are a subset of target


This means one or more columns have been removed from the source file; the remaining columns still exist in the target, so no ALTER is needed.

if (:column_is_subset = TRUE) then  -- source headers are a subset of target headers
    call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'INSERT W/O UPDATE');


ii. Source headers are not a subset of target


This means one or more columns have been removed from the source file and some new columns have been added, so the table must be altered first.


call raw.sp_Load_raw_data_child(:headers_list,:raw_schema_name,:table_name,:file_path,'ALTER & INSERT');


b. Source has more columns than target


The file structure has changed, with probable column additions/deletions/renamings, so the table is altered before inserting.


call raw.sp_Load_raw_data_child(:headers_list,:raw_schema_name,:table_name,:file_path,'ALTER & INSERT');


Driver Procedure Code

create or replace procedure raw.sp_Load_raw_data_driver(table_name varchar, file_path varchar, raw_schema_name varchar)
returns varchar
language sql
execute as caller
as
$$
declare
    execute_fetch_headers resultset;
begin
    let create_raw_query := '';
    let column_matched boolean := FALSE;
    let column_count boolean := FALSE;
    let column_is_subset boolean := FALSE;
    let table_exists int := 0;
    let match_headers int := 0;
    let headers_list varchar := '';

    -- Read the header row as a single line and turn it into a column list
    let fetch_headers_query varchar :=
        'select replace($1,\',\',\' varchar, \') headers
         from \'@{{STAGE_NAME}}/' || :file_path || '\'
         (file_format => \'{{HEADER_FILE_FORMAT}}\')
         limit 1';
    execute_fetch_headers := (execute immediate :fetch_headers_query);
    let cur cursor for execute_fetch_headers;

    select count(*) count into :table_exists
    from information_schema.tables
    where table_name = :table_name and table_schema = :raw_schema_name;

    for j in cur do
        headers_list := j.headers;
    end for;

    if (:table_exists = 0) then  -- table doesn't exist
        create_raw_query := 'CREATE TABLE ' || :raw_schema_name || '.' || :table_name ||
            '(' || :headers_list || ' varchar, LOAD_TIMESTAMP timestamp default getdate(), LOAD_FILE_NAME varchar)';
        execute immediate :create_raw_query;
        call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'CREATE & INSERT');
    else  -- table already exists
        select count(*) = 1 into :column_matched
        from (
            select count(column_name) count
            from information_schema.columns
            where table_name = :table_name
              and table_schema = :raw_schema_name
              and column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
            union
            select count(t.value)
            from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
        );

        if (:column_matched = TRUE) then  -- same number of columns, so compare md5
            select count(*) into :match_headers
            from (
                select md5(listagg(column_name,',') within group (order by column_name)) md5
                from information_schema.columns
                where table_name = :table_name
                  and table_schema = :raw_schema_name
                  and column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
                union
                select md5(listagg(value,',') within group (order by value)) md5
                from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
            );

            if (:match_headers = 1) then  -- md5 values match
                call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'INSERT W/O UPDATE');
            else  -- md5 values don't match
                call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'ALTER & INSERT');
            end if;
        else  -- column counts differ
            select target_count < source_count into :column_count
            from (
                select count(column_name) target_count, max(source_count) source_count
                from information_schema.columns t1
                cross join (
                    select count(t.value) source_count
                    from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
                ) as t2
                where t1.table_name = :table_name
                  and t1.table_schema = :raw_schema_name
                  and t1.column_name not in ('LOAD_FILE_NAME','LOAD_TIMESTAMP')
            );

            if (column_count = FALSE) then  -- target has more columns than source
                select count(column_name) = 0 into :column_is_subset
                from (
                    select t.value column_name
                    from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
                    where column_name not in (
                        select upper(column_name) column_name
                        from information_schema.columns
                        where table_name = :table_name and table_schema = :raw_schema_name
                    )
                );

                if (:column_is_subset = TRUE) then  -- source headers are a subset of target headers
                    call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'INSERT W/O UPDATE');
                else  -- source headers are not a subset of target headers
                    call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'ALTER & INSERT');
                end if;
            else  -- target has fewer columns than source
                call raw.sp_Load_raw_data_child(:headers_list, :raw_schema_name, :table_name, :file_path, 'ALTER & INSERT');
            end if;
        end if;
    end if;

    return 'DRIVER_COMPLETE';
end;
$$;


Child Procedure

Now create the child procedure with conditional blocks for each mode of operation ('CREATE & INSERT', 'INSERT W/O UPDATE', 'ALTER & INSERT'). The procedure first creates a transient staging table matching the structure of the incoming file, then runs COPY INTO against the staging table, and finally inserts from staging into the target table (altering the target first when the mode requires it).
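
Conceptually, for a hypothetical EMPLOYEE table with columns ID and NAME, the child procedure ends up executing statements along these lines (a simplified sketch; the real procedure builds these strings dynamically):

-- 1. Transient staging table shaped like the incoming file
CREATE OR REPLACE TRANSIENT TABLE RAW.STAGING_TABLE (ID varchar, NAME varchar)
    DATA_RETENTION_TIME_IN_DAYS = 0;

-- 2. Load the file into staging
COPY INTO RAW.STAGING_TABLE
FROM @{{STAGE_NAME}}/employee_20230101.csv
file_format = (format_name = 'raw.CSV_FILE_FORMAT');

-- 3. Insert from staging into the target, stamping the source file name
INSERT INTO RAW.EMPLOYEE (ID, NAME, LOAD_FILE_NAME)
SELECT ID, NAME, 'employee_20230101.csv' FROM RAW.STAGING_TABLE;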


Child Procedure Code

CREATE OR REPLACE PROCEDURE raw.sp_Load_raw_data_child(HEADERS_LIST VARCHAR, RAW_SCHEMA_NAME VARCHAR, TABLE_NAME VARCHAR, FILE_PATH VARCHAR, MODE VARCHAR)
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
declare
    execute_copy_rows_loaded resultset;
    query_executed varchar;
    rows_affected  varchar;
begin
    let STAGING_TABLE_NAME varchar := 'STAGING_TABLE';

    let copy_staging_query varchar :=
        ('COPY INTO ' || :raw_schema_name || '.' || :STAGING_TABLE_NAME ||
         ' FROM @{{STAGE_NAME}}/' || :file_path ||
         ' file_format = (format_name = \'raw.CSV_FILE_FORMAT\')');

    let create_staging_query varchar :=
        ('CREATE OR REPLACE TRANSIENT TABLE ' || :raw_schema_name || '.' || :STAGING_TABLE_NAME ||
         '(' || :headers_list || ' varchar) DATA_RETENTION_TIME_IN_DAYS=0');

    -- Build the ALTER statement for any file columns missing from the target table
    let alter_raw_query varchar := (
        select 'alter table ' || :raw_schema_name || '.' || :table_name ||
               ' add ' || listagg(column_name || ' varchar', ',')
        from (
            select t.value column_name
            from table(split_to_table(upper(replace(replace(:headers_list,' varchar',''),' ','')),',')) as t
            where upper(column_name) not in (
                select upper(column_name) column_name
                from information_schema.columns
                where table_name = :table_name and table_schema = :raw_schema_name
            )
        )
    );

    let insert_query varchar := '';

    if (:mode = 'INSERT W/O UPDATE' or :mode = 'CREATE & INSERT') then
        query_executed := :create_staging_query;
        execute immediate :create_staging_query;

        query_executed := :copy_staging_query;
        execute_copy_rows_loaded := (execute immediate :copy_staging_query);

        select listagg(column_name, ',') into :headers_list
        from information_schema.columns
        where table_name = :staging_table_name and table_schema = :raw_schema_name;

        insert_query := ('INSERT INTO ' || :raw_schema_name || '.' || :table_name ||
                         '(' || :headers_list || ',LOAD_FILE_NAME) SELECT ' || :headers_list ||
                         ',\'' || :file_path || '\' FROM ' || :raw_schema_name || '.' || :STAGING_TABLE_NAME);
        query_executed := :insert_query;
        execute immediate :insert_query;

        let c cursor for execute_copy_rows_loaded;
        open c;
        for i in c do
            rows_affected := i.rows_loaded;
        end for;
        close c;
    elseif (:mode = 'ALTER & INSERT') then
        query_executed := :alter_raw_query;
        execute immediate :alter_raw_query;

        query_executed := :create_staging_query;
        execute immediate :create_staging_query;

        query_executed := :copy_staging_query;
        execute_copy_rows_loaded := (execute immediate :copy_staging_query);

        select listagg(column_name, ',') into :headers_list
        from information_schema.columns
        where table_name = :staging_table_name and table_schema = :raw_schema_name;

        insert_query := ('INSERT INTO ' || :raw_schema_name || '.' || :table_name ||
                         '(' || :headers_list || ',LOAD_FILE_NAME) SELECT ' || :headers_list ||
                         ',\'' || :file_path || '\' FROM ' || :raw_schema_name || '.' || :STAGING_TABLE_NAME);
        query_executed := :insert_query;
        execute immediate :insert_query;

        let c cursor for execute_copy_rows_loaded;
        open c;
        for i in c do
            rows_affected := i.rows_loaded;
        end for;
        close c;
    end if;

    update metadata.S3_RAW_LOAD_LOGS
    set end_time = getdate(),
        status = 'SUCCESS',
        rows_staged = :rows_affected,
        mode_of_operation = :mode,
        last_executed_query = :query_executed
    where START_TIME in (select max(START_TIME)
                         from metadata.S3_RAW_LOAD_LOGS
                         where FILE_PATH = '{{STAGE_LOCATION}}' || :file_path);

    return 'SUCCESS';
exception
    when other then
        update metadata.S3_RAW_LOAD_LOGS
        set end_time = getdate(),
            status = 'FAILED',
            MESSAGE = :SQLERRM,
            mode_of_operation = :mode,
            last_executed_query = :query_executed
        where START_TIME in (select max(START_TIME)
                             from metadata.S3_RAW_LOAD_LOGS
                             where FILE_PATH = '{{STAGE_LOCATION}}' || :file_path);
        return 'FAILED';
end;
$$;
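
To run the pipeline end to end, call the master procedure; it cascades into the driver and child procedures. If you want it on a schedule, a Snowflake task works well. A minimal sketch (the task name, warehouse, and schedule below are illustrative):

-- One-off run
call RAW.SP_LOAD_RAW_DATA_MASTER();

-- Optional: scheduled ingestion via a task
create or replace task RAW.TSK_LOAD_RAW_DATA
    warehouse = INGEST_WH
    schedule = '60 MINUTE'
as
    call RAW.SP_LOAD_RAW_DATA_MASTER();

alter task RAW.TSK_LOAD_RAW_DATA resume;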

