Author: Sandeep Dontiwar
I. Data Loading Using Metadata-driven Approach:
While loading data from any source (such as RDS) into a Snowflake landing table, we can create a data pipeline job. But if we plan to load data into multiple tables from the same source, it becomes difficult to create and manage a separate job for each table, and the work is repetitive.
Instead, if data coming from the same source (such as RDS) needs to be loaded into multiple Snowflake tables, we can follow a metadata-driven approach.
In this approach, we create one mapping table in Snowflake consisting of entries for the Source Table (from a source such as RDS) and the Target Table (in an EDW such as Snowflake).
Once the mapping table is created, data ingestion from source to target can be done with a single ETL job that uses an iterator to go through each entry in the mapping table.
Advantages:
Scalability: It is a data-driven method that allows the reuse of existing workflows, which streamlines any future development initiatives that call for the same functionality.
Flexibility: When the design or patterns change, or a fault is found, a single reusable workflow can be modified, tested, and deployed.
Maintainability: A reusable workflow helps pinpoint the precise source of an error when a pipeline breaks in a complex workflow. It also creates a uniform workflow that is simpler to review and maintain.
Below you can see a single Matillion job used for ingesting data from AWS RDS into Snowflake tables:
Step 1: Create a mapping table. This mapping table will contain the source-to-target mapping. In this example, the load is one-to-one, from an RDS database source to the Snowflake raw layer. Table names are case-sensitive in RDS, so this needs to be kept in mind while populating the table. The table needs to be updated whenever a new source-to-target mapping becomes available.
Query:
create or replace TABLE MAPPING (
    RDS_SOURCE VARCHAR(50),
    LANDING_TARGET VARCHAR(50)
);
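For illustration, the mapping table could be populated with rows like the following; the table names here are placeholders rather than real source objects:

-- Hypothetical sample mappings; replace with actual RDS tables and Snowflake landing tables.
-- RDS table names are case-sensitive, so they must be entered exactly as they appear in RDS.
insert into MAPPING (RDS_SOURCE, LANDING_TARGET) values
    ('customer', 'LANDING_CUSTOMER'),
    ('orders', 'LANDING_ORDERS');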

Step 2: Create a Matillion job for ingesting data into the Snowflake tables. This job uses an iterator to loop through all entries in the mapping table.

Note: In the above job, the Truncate and Load method is used. Each target table is truncated first and then loaded with data from its source table.
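Conceptually, each iteration performs a truncate-and-load like the sketch below. The ${rds_source} and ${landing_target} placeholders stand for job variables filled from the current mapping-table row, and RDS_STAGE is a hypothetical schema where the extracted RDS data is assumed to be staged; in the actual job the extract and load are handled by Matillion components rather than hand-written SQL.

-- Sketch of one iteration (placeholders are job variables, RDS_STAGE is an assumed staging schema)
truncate table ${landing_target};
insert into ${landing_target}
select * from RDS_STAGE.${rds_source};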
(i) Create a grid variable consisting of source and target columns.

(ii) Configure a Query Result to Grid component with the columns of the mapping table (a sample query is shown after this list).

(iii) Configure a Grid Iterator component to loop through all the entries in the mapping table.
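As a sketch, the Query Result to Grid component can run a query like the one below against the MAPPING table created in Step 1 to populate the grid variable, which the Grid Iterator then loops over:

select RDS_SOURCE, LANDING_TARGET
from MAPPING;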

II. Automated Data Load Testing using Metadata-driven Matillion Job:
Once data is loaded, it is important to verify that all the records from the source have been loaded into the target table.
This can be done in an automated way using a metadata-driven approach. For this, we create one mapping table holding information about the source table and target table, along with the query used to validate the data load from the source table to the target table.
For any Source A mapped to Target B, the job checks whether any record present in A has not been loaded into B. Such a scenario is expected only when A contains an invalid record.
Step 1: Create a mapping table in Snowflake. This table is loaded once with the SOURCE_SCHEMA name, SOURCE_TABLE name, TARGET_SCHEMA name, TARGET_TABLE name, INVALID_RECORD_VIEW name (a view loaded by the master pipelines in Matillion that holds the source records which did not pass the data quality checks), and TEST_QUERY, which checks whether all the records from the source have been loaded into the target.
Query:
create or replace TABLE AUTOMATED_TESTING_MAPPING (
    SOURCE_SCHEMA VARCHAR(50),
    SOURCE_TABLE VARCHAR(100),
    TARGET_SCHEMA VARCHAR(50),
    TARGET_TABLE VARCHAR(100),
    INVALID_RECORD_VIEW VARCHAR(100),
    TEST_QUERY VARCHAR(16777216),
    JOB_CODE NUMBER(4,0)
);
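For illustration only, an entry in this mapping table could look like the following; the schema, table, and view names and the JOB_CODE value are placeholders:

-- Hypothetical sample entry; all names and the job code are placeholders
insert into AUTOMATED_TESTING_MAPPING
    (SOURCE_SCHEMA, SOURCE_TABLE, TARGET_SCHEMA, TARGET_TABLE, INVALID_RECORD_VIEW, TEST_QUERY, JOB_CODE)
values
    ('RDS_STAGE', 'CUSTOMER', 'RAW', 'LANDING_CUSTOMER', 'VW_CUSTOMER_INVALID_RECORDS',
     'select col1, col2, col3 from RDS_STAGE.CUSTOMER minus select col1, col2, col3 from RAW.LANDING_CUSTOMER',
     1001);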

Sample Test_Query:
select col1, col2, col3 from source_table
minus
select col1, col2, col3 from target_table;
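Since the job needs a count of missing rows rather than the rows themselves, the test query can be wrapped in a count when its result is captured, along these lines (a sketch, not the exact component configuration):

select count(*) as NO_OF_ROWS_NOT_LOADED
from (
    select col1, col2, col3 from source_table
    minus
    select col1, col2, col3 from target_table
) t;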
Step 2: Create a Matillion job using an iterator to loop through all entries in the mapping table.

Step 3: Store the testing results in the Results table in Snowflake.
Query:
create or replace TABLE AUTOMATED_TESTING_RESULT (
    SOURCE_SCHEMA VARCHAR(50),
    SOURCE_TABLE VARCHAR(100),
    TARGET_SCHEMA VARCHAR(50),
    TARGET_TABLE VARCHAR(100),
    NO_OF_ROWS_NOT_LOADED NUMBER(38,0),
    INVALID_RECORD_COUNT NUMBER(38,0),
    LOAD_DATE TIMESTAMP_NTZ(9)
);
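As a sketch of what each iteration records, the job could insert one row per tested mapping along these lines. The ${...} placeholders are assumed job variables filled by the iterator from the mapping table, and the invalid-record count is assumed to come from the INVALID_RECORD_VIEW:

-- Sketch only: ${...} are job variables supplied by the iterator; not the exact Matillion configuration
insert into AUTOMATED_TESTING_RESULT
select
    '${source_schema}', '${source_table}',
    '${target_schema}', '${target_table}',
    (select count(*) from (${test_query}) t),          -- rows present in source but missing from target
    (select count(*) from ${invalid_record_view}),     -- source records that failed data quality checks
    current_timestamp();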
Testing Result:

After testing is finished, results are stored in the Results table, which contains NO_OF_ROWS_NOT_LOADED and INVALID_RECORD_COUNT. With the help of these columns, we can validate whether all records were loaded into the target table. We also get the count of invalid records, which can be analyzed further to identify the invalid records.
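For example, a quick check against the Results table can flag any load where rows are missing from the target for reasons other than failed data quality checks (a suggested follow-up query, not part of the job itself):

select SOURCE_TABLE, TARGET_TABLE, NO_OF_ROWS_NOT_LOADED, INVALID_RECORD_COUNT, LOAD_DATE
from AUTOMATED_TESTING_RESULT
where NO_OF_ROWS_NOT_LOADED > INVALID_RECORD_COUNT;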