
Metadata Driven Framework with Snowpark for Python

Introduction

Snowpark is a developer framework that now brings Python support to Snowflake for an enhanced developer experience. Snowpark allows developers to write custom transformation logic and deploy the code in a serverless manner to Snowflake’s virtual warehouses.


Snowpark provides support for data ingestion through APIs and custom Python code to move data from stages to Snowflake tables. The kipi.bi team has leveraged Snowpark for our in-house Migration Accelerator to automate the process for creating data pipelines through a Python-based approach.


Designed to make development easier within a client’s Snowflake environment, Snowpark has seen adoption skyrocket, with use cases ranging from core data integration pipelines to data applications such as machine learning and advanced analytics.


Below, we will take a closer look at a Snowpark data ingestion use case.


 
Use Case

There are multiple ways to ingest data into Snowflake. In this blog, we will explore how to ingest data into Snowflake from AWS S3 through a metadata file-driven approach leveraging Snowpark for Python for the orchestration framework.

Metadata-driven framework leveraging Snowpark for Python


The metadata file is placed on S3 with all required information, including the load schedule, S3 bucket location, table name, etc. All files that need to be ingested into Snowflake are also in S3, as data files in Parquet format.
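For illustration, a single entity’s entry in such a metadata file might look like the snippet below. The field names and values are hypothetical, not a prescribed format:

```json
{
  "entity_name": "customer",
  "s3_bucket_path": "s3://my-data-bucket/customer/",
  "file_format": "parquet",
  "target_table": "CUSTOMER",
  "primary_key": ["CUSTOMER_ID"],
  "schedule_cron": "0 */4 * * * UTC",
  "load_types": ["type0", "type1", "type2"]
}
```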


In this use case, data for a particular entity needs to be loaded into several tables as Type 0, Type 1, and Type 2, with logging enabled to capture structural changes on the tables. The basic functionality of each table is as follows:


  • Type 0 (Append) Table - a straight copy of data coming in from S3

  • Type 1 (Regular) Table - has logic to identify the latest values on a primary key from the appended table

  • Type 2 (History) Table - has logic to maintain the history of changes on a primary key from the appended table

  • Schema_audit tables - capture any changes in the schema of a table’s data coming from the source Parquet files


This use case also involves handling Schema Drift (the addition of new columns or a change in data type) and Schema Evolution (taking action on that drift in Snowflake) dynamically, to address changes occurring in the source files.


All operations must be driven by a metadata file that can be modified at any time on S3, for example to change the schedule_cron or the S3 bucket location for a particular table.


 
Logical representation of the Framework

Let's take a look at the framework needed to execute the use case:


To begin, we need to check whether anything has changed in the metadata JSON file stored in S3, such as the schedule cron or a primary key. We create a view over the JSON file to see whether any records have been updated, then compare the new metadata to a previous snapshot. If there are changes, we replace the old metadata with the new metadata and keep a record of what changed. If the JSON file contains multiple entities, the same process runs separately for each entity whose metadata has changed.
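A rough sketch of how this check could be expressed with Snowpark for Python follows; the stage, file format, view, and table names are illustrative assumptions rather than the framework’s actual objects:

```python
from snowflake.snowpark import Session

# Connection details would normally come from a config file or secrets manager.
connection_parameters = {
    "account": "<account>", "user": "<user>", "password": "<password>",
    "warehouse": "<wh>", "database": "<db>", "schema": "<schema>",
}
session = Session.builder.configs(connection_parameters).create()

# View over the metadata JSON file sitting in the S3 stage.
session.sql("""
    CREATE OR REPLACE VIEW METADATA_CURRENT_V AS
    SELECT $1:entity_name::STRING    AS entity_name,
           $1:s3_bucket_path::STRING AS s3_bucket_path,
           $1:schedule_cron::STRING  AS schedule_cron
    FROM @S3_METADATA_STAGE/metadata.json (FILE_FORMAT => 'JSON_FF')
""").collect()

# Rows that differ from the previous snapshot indicate a metadata change.
changed = session.sql("""
    SELECT entity_name, s3_bucket_path, schedule_cron FROM METADATA_CURRENT_V
    MINUS
    SELECT entity_name, s3_bucket_path, schedule_cron FROM METADATA_SNAPSHOT
""").collect()

if changed:
    # Record the delta, then refresh the snapshot so it reflects the latest metadata.
    session.sql("CREATE OR REPLACE TABLE METADATA_SNAPSHOT AS "
                "SELECT * FROM METADATA_CURRENT_V").collect()
```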


After checking for changes in the metadata, we can automatically detect any changes to the schema and update the target schema accordingly. This is achieved using the "infer_schema" function, which derives the schema of the incoming files so it can be compared against the current table definitions. We use log tables to keep track of the changes and then merge them into the Type 1 and Type 2 tables with the updated table definition. We add four new columns (surrogate key, row hash, load time, and update time) to the Type 1 and Type 2 tables, building the column definitions with the "array_agg" function. To ensure that we have a backup of the old data, we use Snowflake’s "clone" functionality.
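The snippet below is a simplified sketch of that flow, not the framework’s actual code: a zero-copy clone as the backup, INFER_SCHEMA with ARRAY_AGG(OBJECT_CONSTRUCT(*)) to derive column definitions, and an audit table to log any drift. All object names (stage, file format, tables) are placeholders:

```python
# `session` is the Snowpark session created in the earlier sketch; object names are placeholders.

# Zero-copy clone keeps a backup of the current Type 1 table before any schema change.
session.sql("CREATE OR REPLACE TABLE CUSTOMER_T1_BKP CLONE CUSTOMER_T1").collect()

# Create the append (Type 0) table from the Parquet files themselves, using
# INFER_SCHEMA + ARRAY_AGG(OBJECT_CONSTRUCT(*)) as the column template.
session.sql("""
    CREATE TABLE IF NOT EXISTS CUSTOMER_T0
    USING TEMPLATE (
        SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
        FROM TABLE(INFER_SCHEMA(LOCATION => '@S3_DATA_STAGE/customer/',
                                FILE_FORMAT => 'PARQUET_FF'))
    )
""").collect()

# Detect drift by diffing the inferred columns against the current Type 1 definition.
inferred = session.sql("""
    SELECT COLUMN_NAME, TYPE
    FROM TABLE(INFER_SCHEMA(LOCATION => '@S3_DATA_STAGE/customer/',
                            FILE_FORMAT => 'PARQUET_FF'))
""").collect()

existing = {r["COLUMN_NAME"].upper() for r in session.sql(
    "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'CUSTOMER_T1'"
).collect()}

for row in inferred:
    col, col_type = row["COLUMN_NAME"], row["TYPE"]
    if col.upper() not in existing:
        # Schema drift: evolve the Type 1 table and log the change in the audit table.
        session.sql(f'ALTER TABLE CUSTOMER_T1 ADD COLUMN "{col}" {col_type}').collect()
        session.sql("INSERT INTO SCHEMA_AUDIT (TABLE_NAME, COLUMN_NAME, DATA_TYPE, CHANGE_TS) "
                    f"VALUES ('CUSTOMER_T1', '{col}', '{col_type}', CURRENT_TIMESTAMP())").collect()
```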


Finally, we create a stream for each Type 0 table listed in the metadata file. The stream captures the incoming changes and lands them in a temporary table that includes a unique identifier generated by a Snowflake sequence, a hash value for each row, and load and update timestamps. With the "merge" command, we use the primary key and hash column to populate the Type 1 and Type 2 tables from the stream.
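A condensed sketch of this last step follows, again with illustrative object and column names; in the real framework the column list and primary key would be derived from the metadata file rather than hard-coded:

```python
# `session` is the Snowpark session from the earlier sketches; names and columns are illustrative.

# Change data capture on the append (Type 0) table, plus a sequence for surrogate keys.
session.sql("CREATE OR REPLACE STREAM CUSTOMER_T0_STRM ON TABLE CUSTOMER_T0").collect()
session.sql("CREATE SEQUENCE IF NOT EXISTS CUSTOMER_SK_SEQ").collect()

# Land the stream contents in a temporary table with surrogate key, row hash, and timestamps.
session.sql("""
    CREATE OR REPLACE TEMPORARY TABLE CUSTOMER_DELTA AS
    SELECT s.*,
           CUSTOMER_SK_SEQ.NEXTVAL                    AS SURROGATE_KEY,
           HASH(s.CUSTOMER_ID, s.FIRST_NAME, s.EMAIL) AS ROW_HASH,   -- illustrative column list
           CURRENT_TIMESTAMP()                        AS LOAD_TS,
           CURRENT_TIMESTAMP()                        AS UPDATE_TS
    FROM CUSTOMER_T0_STRM s
""").collect()

# Type 1 merge: keep the latest values per primary key, using the hash to detect real changes.
# The Type 2 merge follows the same pattern but closes out the prior row instead of updating it.
session.sql("""
    MERGE INTO CUSTOMER_T1 tgt
    USING CUSTOMER_DELTA src
      ON tgt.CUSTOMER_ID = src.CUSTOMER_ID
    WHEN MATCHED AND tgt.ROW_HASH <> src.ROW_HASH THEN UPDATE SET
         tgt.FIRST_NAME = src.FIRST_NAME,
         tgt.EMAIL      = src.EMAIL,
         tgt.ROW_HASH   = src.ROW_HASH,
         tgt.UPDATE_TS  = src.UPDATE_TS
    WHEN NOT MATCHED THEN
         INSERT (SURROGATE_KEY, CUSTOMER_ID, FIRST_NAME, EMAIL, ROW_HASH, LOAD_TS, UPDATE_TS)
         VALUES (src.SURROGATE_KEY, src.CUSTOMER_ID, src.FIRST_NAME, src.EMAIL,
                 src.ROW_HASH, src.LOAD_TS, src.UPDATE_TS)
""").collect()
```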


 
Snowflake Features Used In Implementation

  1. Step 1 - Detect any changes in Metadata
    • View over the file in the stage
    • Stored procedures to check for changes
    • Copy data from S3 using tasks

  2. Step 2 - Detect any changes in Schema
    • Table schema evolution - automatically update the table schema to accommodate changes
    • Use infer_schema to detect schema changes in the source files
    • Clone tables to back up old data

  3. Step 3 - Merge data into Type 1 & Type 2 tables
    • Use streams for change data capture (CDC)
    • Generate surrogate keys using the sequence generator
    • Compare rows using the hash function
    • Use merge commands to populate the Type 1 and Type 2 tables


 
Conclusion

Snowpark over Conventional Methods

Snowpark’s Python-based stored procedures offer several advantages over those written in JavaScript (a brief registration sketch follows the list):


  1. Code length is significantly reduced, as existing Python functions and libraries can be leveraged

  2. Ease of Debugging with Python

  3. With Python, code can be separated from the stored procedures and stored in S3, making it easier to version-control using S3 object versioning.

  4. Snowpark for Python brings features like intelligent code completion and type checking, which simplify the development process.
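As referenced above, here is a minimal sketch of registering a hypothetical orchestration entry point as a permanent Python stored procedure with Snowpark; the procedure name, stage, and metadata table are assumptions made for the example, and `session` is an existing Snowpark session:

```python
from snowflake.snowpark import Session

# Hypothetical entry point: orchestrates one entity end to end inside Snowflake.
def load_entity(session: Session, entity_name: str) -> str:
    rows = (
        session.table("METADATA_SNAPSHOT")
        .filter(f"ENTITY_NAME = '{entity_name}'")
        .collect()
    )
    # ... call the metadata check, schema evolution, and merge steps here ...
    return f"Processed {entity_name}: {len(rows)} metadata row(s)"

# Register the Python function as a permanent Snowflake stored procedure.
session.sproc.register(
    func=load_entity,
    name="LOAD_ENTITY_SP",
    packages=["snowflake-snowpark-python"],
    is_permanent=True,
    stage_location="@SPROC_STAGE",
    replace=True,
)

# Invoke it like any other stored procedure, e.g. from a task on the metadata-driven schedule.
session.call("LOAD_ENTITY_SP", "customer")
```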


With Snowpark for Python, we can optimize processes such as debugging, version control, and unit testing that were once difficult to achieve. In addition, our code is now portable and compatible with many CI/CD applications.


Interested in learning more about Snowpark for Python? Connect with our data experts today.


 