Authors: Ganesh Goel & Navanil Roy
Overview:
There are multiple ways of implementing a data-driven load; this article highlights one such method.
This article focuses on FULL (TRUNCATE) LOADS and INCREMENTAL LOADS. The source used here is SQL Server and the destination is Snowflake; however, the same approach can be replicated with other systems with minimal changes.
Use Case:
The upstream is SQL Server and you need to move data from over 100 tables into Snowflake on a schedule. The approach described here lets you move all of those tables into Snowflake with a single pipeline, based on the type of load you want for each table.
This implementation requires two orchestration jobs (parent and child). Both orchestration jobs for the metadata-driven load are described below.


Prerequisites:
Environment setup is done with the correct configuration
Roles and a warehouse set up in Snowflake for Matillion to use
A Master Load Table that will contain the details of the source and destination
Implementation:
Please follow the steps below to achieve the target result.
Snowflake:
Create a table in Snowflake that will contain both the source and the target description. For example, to load the table [dbo].[Division] from SQL Server to Snowflake, the entry should look similar to the first row; a sketch of the table definition and a sample entry are also given after the column descriptions below.
Significance of the columns:
SOURCE_TABLE_NAME : Name of the table in the upstream (source) system
TARGET_TABLE_NAME : Name of the table that you want Matillion to create and load in Snowflake
LOAD_TYPE : Type of load (e.g. FULL or INCREMENTAL)
INCREMENTAL_COLUMN : Name of the column on which the incremental load is to be done; NULL for full loads. (For multiple incremental columns, add a new record for the same source and target table.)
LAST_LOADED_VALUE : Used only for incremental loads. Initially it will be NULL; it is updated automatically as soon as the table is loaded by Matillion and keeps updating each time data is loaded into the table.
IS_ACTIVE : Indicates whether the table is currently in use, i.e. whether the pipeline should pick it up.
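As an illustration only, a minimal sketch of the Master table DDL and a sample entry for [dbo].[Division] is given below. The VARCHAR types, the FULL/INCREMENTAL literals, and the target table and column names (DIVISION, DIVISION_ID) are assumptions; adjust them to your environment.
create table "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING" (
    SOURCE_TABLE_NAME  varchar,  -- e.g. '[dbo].[Division]'
    TARGET_TABLE_NAME  varchar,  -- e.g. 'DIVISION' (hypothetical)
    LOAD_TYPE          varchar,  -- 'FULL' or 'INCREMENTAL'
    INCREMENTAL_COLUMN varchar,  -- NULL for full loads
    LAST_LOADED_VALUE  varchar,  -- starts as NULL, maintained by the pipeline
    IS_ACTIVE          varchar   -- 'TRUE' / 'FALSE'
);

insert into "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
values ('[dbo].[Division]', 'DIVISION', 'INCREMENTAL', 'DIVISION_ID', NULL, 'TRUE');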

Create a Parent Orchestration job:
Please follow the steps below to set up the parent pipeline in Matillion.
1. Create Job Variables
Create three job variables to store the source table name, the target table name, and the load type.

2. Use the Table Iterator component
This will allow you to fetch data from the Master table (source table name, target table name, and load type) and store it in the job variables. A sample query to fetch data from the Master table is given below:
select distinct source_table_name, target_table_name, LOAD_TYPE from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
where IS_ACTIVE='TRUE'
After selecting the columns, map each one to its corresponding job variable in the component's column mapping.
Create a Child Orchestration job:
1. Create Job and Grid Variables
Create the job variables that will be used later in the pipeline: for example, source_table_job, target_table_job, incremental_query_job, and current_value_job, plus a variable to hold the result of the table-existence check.

Create one grid variable with one column; it will hold the incremental column names fetched from the Master table:

2. Use If Component
This component will check the type of load (full or incremental) and, based on the result, route the pipeline into one of two branches.
For Full Load:
1. Use Database Query Component
Write a SQL query that uses the job variable as the source table (as shown in the screenshot given below), so that the source table value is updated with every run.
Choose the target table in the form of a job variable (as shown in the screenshot given below), so that the target table value is updated with every run.
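As an illustration only, a minimal sketch of such a full-load query is given below; it reuses the audit columns from the incremental query shown later in this article, and ${source_table_job} is resolved by Matillion at run time:
select *, CURRENT_TIMESTAMP CREATION_DATE, 'ETL_USER' CREATED_BY, 'NULL' UPDATED_AT, 'NULL' UPDATED_BY
from ${source_table_job}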

For Incremental Load:
1. Check if the table already exists in Snowflake
For this, you need to use the Query Result to Scalar component. It checks whether the table to be loaded already exists in Snowflake; if it does not, a full load is performed for the first time. You can query this from the information schema. A sample query is given below:
SELECT EXISTS (
    SELECT * FROM information_schema.tables
    WHERE table_name = '${target_table_job}'
) e
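Note that unquoted identifiers are stored in upper case in Snowflake's INFORMATION_SCHEMA, so if the job variable holds a lower- or mixed-case name, a case-insensitive variant such as the sketch below may be safer (table_exists is just an illustrative alias):
select count(*) > 0 as table_exists
from information_schema.tables
where table_name = upper('${target_table_job}')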
Store the result of this query in one of the job variables created above. Then use an 'If' component to branch based on whether the table exists or not.
If table exists:
2. Get the incremental conditions
This step generates the where condition that filters only the new data from the source table. For this, you need to use the Query Result to Scalar component. It contains a SQL query that generates the where clause if there is data to filter on, and returns a blank value otherwise. Store the result of the query in a job variable. A sample query is given below:
select case when v = 'where ' then '' else v end v
from
(
  select
    case when (select count(*) from ${target_table_job}) >= 1 then
      concat('where ',
             listagg(
               concat(INCREMENTAL_COLUMN, '>',
                      case when regexp_like(LAST_LOADED_VALUE, '[0-9]*') = 'FALSE'
                           then concat('\'', LAST_LOADED_VALUE, '\'')
                           else LAST_LOADED_VALUE end),
               ' and '))
    else '' end v
  from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
  where SOURCE_TABLE_NAME = '${source_table_job}'
    and TARGET_TABLE_NAME = '${target_table_job}'
    and LOAD_TYPE = 'INCREMENTAL'
    and IS_ACTIVE = 'TRUE'
) e
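As an illustration only, continuing the hypothetical [dbo].[Division] example with INCREMENTAL_COLUMN = 'DIVISION_ID' and LAST_LOADED_VALUE = '105', the query above returns:
where DIVISION_ID>105
If LAST_LOADED_VALUE is still NULL (or no active incremental row matches), it returns a blank value instead, so the next step loads the table without a filter.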
3. Perform Incremental Load
This step loads data from the source into the target incrementally (when the previous step produced a where clause). For this, you need to use the Database Query (for legacy servers) component. It holds the connection details of your source system and the configuration of your target, and it contains a SQL query that performs the incremental load. A sample query is given below:
select *, CURRENT_TIMESTAMP CREATION_DATE, 'ETL_USER' CREATED_BY, 'NULL' UPDATED_AT, 'NULL' UPDATED_BY
from ${source_table_job} ${incremental_query_job}
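With the same hypothetical values as above, the query Matillion sends to SQL Server would resolve to:
select *, CURRENT_TIMESTAMP CREATION_DATE, 'ETL_USER' CREATED_BY, 'NULL' UPDATED_AT, 'NULL' UPDATED_BY
from [dbo].[Division] where DIVISION_ID>105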
Now, after you have performed the incremental load, you need to update the last loaded value in the Master table.
4. Fetch Incremental Columns
For this, you need to use the Query Result to Grid component. It fetches all the incremental columns for the given source and target from the Master table and stores them in the grid variable. In the same component, map the incremental column to the grid variable column. A sample query to get the incremental columns is given below:
select incremental_column from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
where SOURCE_TABLE_NAME='${source_table_job}'
and TARGET_TABLE_NAME='${target_table_job}'
and LOAD_TYPE='INCREMENTAL'
and IS_ACTIVE='TRUE'
5. Update the Master table
For this, you need to use the Grid Iterator and SQL query components. The Grid Iterator loops over the grid variable and passes each value to the SQL query (which contains an update statement) that updates the record in the Master table. In the Grid Iterator, map the grid column value to a job variable, which is updated with the incremental column name on each iteration.
A sample update query is mentioned below:
update "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
set LAST_LOADED_VALUE=(select max("${current_value_job}") from ${target_table_job})
where TARGET_TABLE_NAME='${target_table_job}'
and INCREMENTAL_COLUMN='${current_value_job}'
and SOURCE_TABLE_NAME='${source_table_job}'
and LOAD_TYPE='INCREMENTAL'
and IS_ACTIVE='TRUE'
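With the hypothetical example used earlier (target table DIVISION, incremental column DIVISION_ID), one iteration of the Grid Iterator would issue:
update "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
set LAST_LOADED_VALUE=(select max("DIVISION_ID") from DIVISION)
where TARGET_TABLE_NAME='DIVISION'
and INCREMENTAL_COLUMN='DIVISION_ID'
and SOURCE_TABLE_NAME='[dbo].[Division]'
and LOAD_TYPE='INCREMENTAL'
and IS_ACTIVE='TRUE'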
If the table does not exist:
Perform a full load:
Use the Database Query component to load the data into Snowflake, similar to what is done when the load type is full. After loading the data into Snowflake, update the last loaded value in the Master table (as in steps 4 and 5 above) so that the same data is not loaded again next time.
Import the Child Orchestration job into the Parent:
Use the Run Orchestration component and pass the values of the scalar variables after setting up the orchestration job.
Run the Parent Orchestration Job:
Run the parent orchestration job to load the tables into the Snowflake environment. You will also be able to see the changes to LAST_LOADED_VALUE in the Master table.

Benefits:
Scalability: This is a data-driven strategy that enables the reuse of existing workflows, making future development efforts that require the same functionality more efficient.
Flexibility: A reusable workflow can be updated, tested, and deployed when the design or patterns change or when errors are found.
Maintainability: For complex workflows, a reusable workflow helps in identifying the exact point of error when a pipeline fails. It also produces a standardized workflow that is easier to review and maintain.