Matillion - Meta-Data Driven Load

Updated: Aug 1

Authors: Ganesh Goel & Navanil Roy


Overview:

There are multiple ways of implementing a metadata-driven load; this article highlights one such method.


This article focuses on FULL (TRUNCATE) LOADS and INCREMENTAL LOADS. The source used here is SQL Server and the destination is Snowflake; however, the same approach can be replicated with other systems with minimal changes.


Use Case:

The upstream system is SQL Server with over 100 tables, and you need to schedule a pipeline that moves the data to Snowflake. A metadata-driven approach lets you move all the tables based on the type of load you want for each.


This implementation requires two orchestration jobs (parent and child) to be created. Both orchestration jobs for the metadata-driven load are shown below.


Parent Orchestration job

Child Orchestration Job
Prerequisites:
  • Environment setup is done with the correct configuration

  • Roles and warehouse are set up in Snowflake for Matillion to use

  • A Master Load Table exists, containing the details of the source and destination tables


Implementation:

Follow the steps below to achieve the target result.


Snowflake:

Create a table in Snowflake that contains the source description along with the target description. For example, to load the table [dbo].[Division] from SQL Server to Snowflake, the entry should look similar to the first row.


Significance of columns:

SOURCE_TABLE_NAME: Name of the table in the upstream system.

TARGET_TABLE_NAME: Name of the table that you want Matillion to create and load in Snowflake.

LOAD_TYPE: Type of load (full or incremental).

INCREMENTAL_COLUMN (NULL for full loads): Name of the column on which the incremental load is keyed (for multiple incremental columns, add a new record for the same source and target table).

LAST_LOADED_VALUE (used only for incremental loads): Initially NULL; it is automatically updated as soon as the table is loaded by Matillion and keeps updating each time data is loaded into the table.

IS_ACTIVE: Indicates whether the table is currently in use.


Note: This can change based on requirements. This example demonstrates the bare minimum columns needed.
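As a sketch, the Master table used throughout this article could be created and seeded as follows. The data types, the 'FULL' load-type label, and the target name DIVISION are assumptions for illustration (the queries later in the article only ever test for LOAD_TYPE = 'INCREMENTAL'); adjust them to your environment:

create table "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING" (
  SOURCE_TABLE_NAME  varchar,
  TARGET_TABLE_NAME  varchar,
  LOAD_TYPE          varchar,  -- e.g. 'FULL' or 'INCREMENTAL'
  INCREMENTAL_COLUMN varchar,  -- NULL for full loads
  LAST_LOADED_VALUE  varchar,  -- NULL initially; maintained by the pipeline
  IS_ACTIVE          varchar   -- 'TRUE' or 'FALSE'
);

insert into "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
values ('[dbo].[Division]', 'DIVISION', 'FULL', NULL, NULL, 'TRUE');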

Follow the steps below to set up the pipeline in Matillion:

1. Create Job Variables


Create three job variables to store the source table name, the target table name, and the type of load.



2. Use Table iterator component


This allows you to fetch data from the Master table (source table name, target table name, and load type) and store it in the job variables. A sample query to fetch data from the Master table is given below:


select distinct SOURCE_TABLE_NAME, TARGET_TABLE_NAME, LOAD_TYPE
from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
where IS_ACTIVE = 'TRUE'


After selecting the columns, map each column to its corresponding job variable.


Create a Child Orchestration job:

1. Create Job and Grid variables


Create several job variables to be used later in the pipeline.



Create one grid variable with one column:



2. Use If Component


This component checks the type of load (full or incremental) and, based on that, splits the pipeline into two branches.


For Full Load:

1. Use Database Query Component

  • Write a SQL query with a job variable as the source (as shown in the screenshot below), so that the source table value is updated with every run.

  • Choose the target table in the form of a job variable (as shown in the screenshot below), so that the target table value is updated with every run.



For Incremental Load:

1. Check if table already exists in Snowflake


For this, use the Query Result to Scalar component. It checks whether the table to be loaded already exists in Snowflake. If it does not exist, a full load is performed for the first time. You can query this from the information schema. A sample query is given below:


SELECT EXISTS (
  SELECT * FROM information_schema.tables
  WHERE table_name = '${target_table_job}'
) e


Store the result of the query in one of the job variables created above. Then use the ‘If’ component to branch based on whether the table exists or not.
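Note that some Snowflake versions accept EXISTS only inside a WHERE clause. If the sample above is rejected by your warehouse, a count-based check (an alternative sketch, not part of the original pipeline) returns the same boolean:

select count(*) > 0 as table_exists
from information_schema.tables
where table_name = '${target_table_job}'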

If table exists:

2. Get the incremental conditions


This step generates the WHERE condition that filters only the new data from the source table. For this, use the Query Result to Scalar component with a SQL query that produces the condition when there is new data and returns a blank string otherwise. Store the result of the query in a job variable. A sample query is given below:


select case when v = 'where ' then '' else v end v
from (
  select case
    when (select count(*) from ${target_table_job}) >= 1 then
      concat('where ', listagg(
        concat(INCREMENTAL_COLUMN, '>',
          case when regexp_like(LAST_LOADED_VALUE, '[0-9]*') = 'FALSE'
            then concat('\'', LAST_LOADED_VALUE, '\'')
            else LAST_LOADED_VALUE end), ' and '))
    else '' end v
  from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
  where SOURCE_TABLE_NAME = '${source_table_job}'
    and TARGET_TABLE_NAME = '${target_table_job}'
    and LOAD_TYPE = 'INCREMENTAL'
    and IS_ACTIVE = 'TRUE'
) e


3. Perform Incremental Load


This step loads data from the source into the target incrementally (when the previous step produced a WHERE clause). For this, use the Database Query (for legacy servers) component. It contains the connection details of your source system, the configuration of your target, and a SQL query that performs the incremental load. A sample query is given below:


select *, CURRENT_TIMESTAMP CREATION_DATE, 'ETL_USER' CREATED_BY, 'NULL' UPDATED_AT, 'NULL' UPDATED_BY
from ${source_table_job} ${incremental_query_job}
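For illustration, assume a hypothetical incremental column DivisionId with a last loaded value of 100 for the source table [dbo].[Division] (both the column name and the value are made up for this example). After variable substitution, the query sent to the source would resolve to:

select *, CURRENT_TIMESTAMP CREATION_DATE, 'ETL_USER' CREATED_BY, 'NULL' UPDATED_AT, 'NULL' UPDATED_BY
from [dbo].[Division] where DivisionId>100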


After the incremental load completes, you need to update the last loaded value in the Master table.


4. Fetch incremental Columns


For this, use the Query Result to Grid component. It fetches all the incremental columns for the source and target from the Master table and stores them in a grid variable. In the same component, map the incremental column to the grid variable column. A sample query to get the incremental columns is given below:


select INCREMENTAL_COLUMN
from "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
where SOURCE_TABLE_NAME = '${source_table_job}'
  and TARGET_TABLE_NAME = '${target_table_job}'
  and LOAD_TYPE = 'INCREMENTAL'
  and IS_ACTIVE = 'TRUE'


5. Update the Master table


For this, use the Grid Iterator and SQL query components. The Grid Iterator loops over the grid variable and passes each value to the SQL query (which contains an update statement) that updates the record in the Master table. In the Grid Iterator, map the grid column value to a job variable that is updated with the incremental column name on each iteration.

A sample update query is mentioned below:


update "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
set LAST_LOADED_VALUE = (select max("${current_value_job}") from ${target_table_job})
where TARGET_TABLE_NAME = '${target_table_job}'
  and INCREMENTAL_COLUMN = '${current_value_job}'
  and SOURCE_TABLE_NAME = '${source_table_job}'
  and LOAD_TYPE = 'INCREMENTAL'
  and IS_ACTIVE = 'TRUE'


If table does not exist:

Perform full load:


Use the Database Query component to load the data into Snowflake, just as in the full-load branch. After loading the data into Snowflake, update the last loaded value in the Master table so that the same data is not loaded again next time.
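The update in this branch can mirror the one shown in step 5 of the incremental path; a hedged sketch, assuming the same job variables, is:

update "METADATA_DEV"."LOAD_METADATA"."SOURCE_TO_TARGET_MAPPING"
set LAST_LOADED_VALUE = (select max("${current_value_job}") from ${target_table_job})
where SOURCE_TABLE_NAME = '${source_table_job}'
  and TARGET_TABLE_NAME = '${target_table_job}'
  and LOAD_TYPE = 'INCREMENTAL'
  and IS_ACTIVE = 'TRUE'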


Import the Child Orchestration job in Parent:


Use the Run Orchestration component and pass the values of the scalar variables after setting up the orchestration job.


Run the Parent Orchestration Job:


Run the orchestration job to load the tables into the Snowflake environment. You will also see the LAST_LOADED_VALUE column change in the Master table.



Benefits:
  • Scalability: A data-driven strategy enables the reuse of existing workflows, making future development efforts that require the same functionality more efficient.

  • Flexibility: A reusable workflow can be updated, tested, and redeployed when the design or patterns change, or when errors are found.

  • Maintainability: For complex workflows, a reusable workflow helps identify the exact point of error when a pipeline fails. It also produces a standardized workflow that is easier to review and maintain.
