Snowpark is a developer framework that now brings Python support to Snowflake for an enhanced developer experience. With Snowpark, developers can write custom transformation logic and deploy the code in a serverless manner to Snowflake’s virtual warehouses.
Snowpark provides support for data ingestion through APIs and custom Python code to move data from stages to Snowflake tables. The kipi.bi team has leveraged Snowpark for our in-house Migration Accelerator to automate the process for creating data pipelines through a Python-based approach.
Designed to make development easier within a client’s Snowflake environment, Snowpark has seen adoption skyrocket, with use cases ranging from core data integration pipelines to building data applications, including machine learning and advanced analytics.
Below, we will take a closer look at a data ingestion use case for Snowpark.
There are multiple ways to ingest data into Snowflake. In this blog, we will explore how to ingest data into Snowflake from AWS S3 through a metadata file-driven approach, using Snowpark for Python as the orchestration framework.
Metadata driven framework leveraging Snowpark for Python
The metadata file is placed on S3 with all required information including load schedule, S3 bucket location, table name, etc. All files that need to be ingested into Snowflake are also in S3 as data files in Parquet format.
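As a sketch, a single entity’s entry in such a metadata file might look like the following. The field names here are illustrative, not the exact schema used by the accelerator:

```python
import json

# Illustrative metadata entry for one entity; all field names are hypothetical.
metadata_entry = {
    "entity_name": "customer",
    "table_name": "CUSTOMER_T0",
    "s3_bucket_location": "s3://my-bucket/raw/customer/",  # hypothetical bucket
    "file_format": "parquet",
    "primary_key": ["customer_id"],
    "schedule_cron": "0 */2 * * *",  # load every two hours
}

# The file on S3 would simply be a JSON array of such entries.
metadata_file = json.dumps([metadata_entry], indent=2)
print(metadata_file)
```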
Data for a particular entity needs to be loaded to various tables in this use case as Type 0, Type 1, and Type 2, with logging enabled to capture structure changes on the tables. The basic functionality of each table is as follows:
Type 0 (Append) Table - a straight copy of data coming in from S3
Type 1 (Regular) Table - has logic to identify the latest values on a primary key from the appended table
Type 2 (History) Table - has logic to maintain the history of changes on a primary key from the appended table
Schema Audit Table - captures any changes in the schema of a table’s data coming from the source Parquet files
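The relationship between the three table types can be sketched in plain Python. The rows below are hypothetical, with `customer_id` as the primary key:

```python
# Hypothetical incoming batches for one entity, keyed on customer_id.
batch_1 = [{"customer_id": 1, "city": "Pune"}, {"customer_id": 2, "city": "Delhi"}]
batch_2 = [{"customer_id": 1, "city": "Mumbai"}]  # customer 1 moved

# Type 0 (Append): straight copy of every incoming row.
type0 = batch_1 + batch_2

# Type 1 (Regular): latest values per primary key.
type1 = {}
for row in type0:
    type1[row["customer_id"]] = row

# Type 2 (History): every version per primary key, in arrival order.
type2 = {}
for row in type0:
    type2.setdefault(row["customer_id"], []).append(row)

print(type1[1]["city"])  # latest city for customer 1
print(len(type2[1]))     # number of versions kept for customer 1
```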
This use case also involves dynamically handling Schema Drift (the addition of new columns and changes in data types) and Schema Evolution (acting on that drift in Snowflake) to address changes occurring in the source files.
All operations must be driven by a metadata file that can be modified at any time on S3, for example, changes to the schedule_cron or S3 bucket location for a particular table.
Logical representation of the Framework
Let's take a look at the framework needed to execute the use case:
To begin, we need to check whether anything has changed in the metadata JSON file stored in S3, such as a schedule cron or a primary key. We create a view over the JSON file and compare the new metadata against a previous snapshot. If anything has changed, we replace the old metadata with the new metadata and keep a record of the changes. If the JSON file contains multiple entities, the same process runs separately for each entity whose metadata has changed.
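A minimal sketch of the comparison step, assuming each snapshot is a dict mapping entity name to its metadata record (names hypothetical):

```python
def changed_entities(previous, current):
    """Return the entity names whose metadata differs between snapshots."""
    changed = []
    for entity, meta in current.items():
        if previous.get(entity) != meta:  # new entity, or any field changed
            changed.append(entity)
    return changed

previous = {"customer": {"schedule_cron": "0 * * * *", "primary_key": ["customer_id"]}}
current = {
    "customer": {"schedule_cron": "0 */2 * * *", "primary_key": ["customer_id"]},
    "orders": {"schedule_cron": "0 0 * * *", "primary_key": ["order_id"]},
}

# "customer" changed its cron; "orders" is newly added.
print(changed_entities(previous, current))
```

In the actual framework the snapshots would come from the view over the staged JSON file and the stored copy of the last run; only the entities returned here need their tasks or schemas updated.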
After checking for changes in the metadata, we can automatically detect any changes to the source schema and update the target schema accordingly. This is achieved using Snowflake's "infer_schema" function, which derives column definitions from the staged files; combined with "array_agg", its output drives the table definition. We use log tables to keep track of the changes and then merge them into the Type 1 and Type 2 tables with the updated table definition, adding four audit columns (surrogate key, row hash, load time, and update time) to those tables. To ensure that we have a backup of the old data, we use Snowflake's zero-copy "clone" functionality.
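The schema-evolution step can be sketched as follows: given the existing table columns and the columns reported for the latest Parquet files (as `INFER_SCHEMA` would return them), emit the `ALTER TABLE` statements to apply. Table and column names are illustrative, and note that Snowflake only permits certain in-place data type changes:

```python
def evolve_schema(table, existing, inferred):
    """Generate ALTER TABLE statements for new or retyped columns.

    existing / inferred: dict of column name -> Snowflake data type.
    """
    statements = []
    for col, col_type in inferred.items():
        if col not in existing:  # schema drift: new column added at source
            statements.append(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}")
        elif existing[col] != col_type:  # schema drift: data type changed
            statements.append(
                f"ALTER TABLE {table} ALTER COLUMN {col} SET DATA TYPE {col_type}"
            )
    return statements

existing = {"CUSTOMER_ID": "NUMBER", "CITY": "VARCHAR"}
inferred = {"CUSTOMER_ID": "NUMBER", "CITY": "VARCHAR", "EMAIL": "VARCHAR"}
print(evolve_schema("CUSTOMER_T1", existing, inferred))
```

Before executing the statements, the framework would clone the table and write the detected drift to the Schema Audit table.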
Finally, we create a stream for each Type 0 table listed in the metadata file. The stream captures incoming data into a temporary table that adds a unique identifier generated by a Snowflake sequence, a hash value for each row, and timestamps for load and update times. Using the "merge" command with the primary key and hash column, we populate the Type 1 and Type 2 tables from the stream.
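The hash-based merge can be sketched in plain Python. In the actual pipeline this is a Snowflake MERGE driven by the stream; here a dict stands in for the Type 1 table, and all names are hypothetical:

```python
import hashlib
from datetime import datetime, timezone

def row_hash(row, columns):
    """Stable hash over the business columns, mirroring an MD5()/HASH() in SQL."""
    payload = "|".join(str(row[c]) for c in columns)
    return hashlib.md5(payload.encode()).hexdigest()

def merge_type1(target, stream_rows, pk, columns):
    """Upsert stream rows into the Type 1 table on (primary key, row hash)."""
    now = datetime.now(timezone.utc)
    for row in stream_rows:
        h = row_hash(row, columns)
        current = target.get(row[pk])
        if current is None:  # not matched on primary key: insert
            target[row[pk]] = {**row, "row_hash": h,
                               "load_time": now, "update_time": now}
        elif current["row_hash"] != h:  # matched, hash differs: update
            target[row[pk]] = {**row, "row_hash": h,
                               "load_time": current["load_time"], "update_time": now}
        # matched with an identical hash: no change, skip the row

target = {}
merge_type1(target, [{"customer_id": 1, "city": "Pune"}], "customer_id", ["city"])
merge_type1(target, [{"customer_id": 1, "city": "Mumbai"}], "customer_id", ["city"])
print(target[1]["city"])
```

A Type 2 merge follows the same matching logic but closes out the old version (stamping its update time) and inserts the new one instead of overwriting.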
Snowflake Features Used In Implementation
Step 1 - Detect any changes in Metadata
View over file in the stage
Stored procedures to check for changes
Copy data from S3 using tasks
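The task-driven copy above can be sketched as generated SQL. A Snowflake task runs a COPY on the cron schedule read from the metadata; the object names here are hypothetical:

```python
def build_copy_task(entity, table, stage_path, cron, warehouse="LOAD_WH"):
    """Build a CREATE TASK statement that copies an entity's Parquet files
    from the S3 stage into its Type 0 table on the metadata-driven schedule."""
    return f"""
CREATE OR REPLACE TASK {entity}_load_task
  WAREHOUSE = {warehouse}
  SCHEDULE = 'USING CRON {cron} UTC'
AS
  COPY INTO {table}
  FROM {stage_path}
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
""".strip()

sql = build_copy_task("customer", "CUSTOMER_T0", "@raw_stage/customer/", "0 */2 * * *")
print(sql)
```

When the metadata comparison reports a changed schedule_cron, the framework simply re-runs this statement with the new values, since CREATE OR REPLACE redefines the task in place.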
Step 2 - Detect any changes in Schema
Table schema evolution - Automatically update table schema to accommodate changes
Use infer_schema to detect schema changes in the source files
Clone tables to backup old data
Step 3 - Merge data into type 1 & type 2 tables
Use streams for change data capture (CDC)
Generate surrogate keys using sequence generator
Compare rows using hash function
Use merge commands to populate Type 1 and Type 2 tables
Snowpark over Conventional Methods
Snowpark's Python-based stored procedures offer several advantages over those written in
Code length is significantly reduced, as existing Python functions and libraries can be leveraged
Ease of Debugging with Python
With Python, code can be kept separate from the stored procedures and stored in S3, making it easier to version control using S3's object versioning.
Snowpark for Python brings features like intelligent code completion and type checking, which simplify the development process.
With Snowpark for Python, we can optimize processes such as debugging, version control, and unit testing that were once difficult to achieve. In addition, our code is now portable and compatible with many CI/CD applications.
Interested in learning more about Snowpark for Python? Connect with our data experts today.
Writing Stored Procedures in Python: https://docs.snowflake.com/en/LIMITEDACCESS/stored-procedures-python.html
Table Schema Evolution: https://docs.snowflake.com/en/LIMITEDACCESS/table-schema-evolution.html