
Publish Data From Snowflake To Kafka

Authors: Sahil Adlakha & Ayushi Rawat


Introduction

This blog will guide you through writing data from Snowflake to Kafka topics; yes, you read that right, not consuming data from Kafka but writing data to it.


We will make use of a Docker image for the implementation.


If you are instead looking to consume data from Kafka, you can navigate to this blog for a detailed explanation and walkthrough.


Let’s first understand what you can expect from this blog.


What will be covered in this Blog?

  • What is Docker Desktop?

  • Setup the Snowflake Environment

  • Configure Docker Environment

  • Install Docker Image

  • Configure Confluent Kafka Connect JDBC Source Connector

  • Configure Snowflake JDBC Driver

  • Kafka topic consuming data from Snowflake

Let’s get started!


Setup the Snowflake Environment


Let us start by configuring the Snowflake environment; all you need to do is execute the below set of commands in your Snowflake account.


--Setup database and schema

use database demo;

use schema public;


--Create a table called 'snowflake2kafka'

create or replace table "snowflake2kafka"

(

"id" integer not null,

"start_date" timestamp not null,

"end_date" timestamp not null,

"name" varchar(255),

"city" varchar(255)

);
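Before moving on to Docker, it can help to confirm that the table is visible to the role you plan to use with the connector. Below is a minimal, hypothetical check using the SnowSQL CLI; it assumes SnowSQL is installed, and the placeholders must be replaced with your own account values.

# Hypothetical check: confirm the table created above is visible to your role.
# Replace the placeholders with your own account, user, warehouse and role.
snowsql -a <account_identifier> -u <username> -w <warehouse_name> -r <role> -d demo -s public -q "select count(*) from information_schema.tables where table_name = 'snowflake2kafka'"

A count of 1 confirms the table exists and is readable with that role, which is what the connector will need later.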


What is Docker Desktop?


Docker Desktop lets you develop, distribute, and execute containerized applications and microservices effortlessly. It offers an intuitive GUI for direct management of containers, apps, and images on your local system.


Configure Docker


Let us start by installing Docker Desktop on our local system. Use the link below for installation: https://docs.docker.com/desktop/

Once the setup is complete, your system will prompt for a restart. When you open Docker Desktop, you will see a screen similar to the one below.

Now, open your command prompt and execute the following command to pull the Docker image we will use to run Kafka.


docker pull landoop/fast-data-dev


If you need more information about the docker image, you can read it at: https://hub.docker.com/r/landoop/fast-data-dev


Once you execute the above command, the download will start; when it finishes, the final status line reads Downloaded newer image for landoop/fast-data-dev:latest.


As shown in the screenshots below:

After the successful download of the image, you will be able to see it inside your Docker environment.
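You can also confirm the image is present from the command prompt; a quick, optional check:

docker images landoop/fast-data-dev
# The image should be listed with the latest tag and its size.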

Once the Docker image is available, you need to download the following:

  1. The Snowflake JDBC driver. You can download the latest version from this link.

  2. The Confluent Kafka Connect JDBC Source connector. You can download it from this link.


Once the download is complete, extract the zip file and look for the kafka-connect-jdbc-<version>.jar file.


Next, we need to place these two jar files in the appropriate path. Search for \\wsl.localhost in File Explorer.

Once there, search for the kafka-connect-jdbc* directory and place both jar files in it.
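If you prefer the command line over File Explorer, the same jars can be copied with docker cp once the container created in the next section is running. This is a hedged sketch; it assumes the container is named fast-data-dev, and the target directory should be whatever the find command returns on your setup.

# 1. Locate the JDBC connector directory inside the running container.
docker exec fast-data-dev find / -type d -name "kafka-connect-jdbc*"
# 2. Copy both jars into the directory returned above.
docker cp snowflake-jdbc-<version>.jar fast-data-dev:<directory_from_step_1>/
docker cp kafka-connect-jdbc-<version>.jar fast-data-dev:<directory_from_step_1>/
# 3. Restart the container so Kafka Connect picks up the new jars.
docker restart fast-data-dev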


Once you are inside Docker Desktop and have opened the landoop/fast-data-dev image, you can create a new container with the below configuration. Start off by adding the port numbers.


Refer to the below screenshots for a better understanding.

Once you have added the port numbers, you must add the Host Path. To do so, create a folder on the C drive and give it a name of your choice.


For example, we created a folder at:


"C:\Users\SahilAdlakha\Documents\Kafka_volume".


Note: Put the last folder name, preceded by a forward slash, in the Container Path field, and the remaining path before it in the Host Path field.


Once done, you can hit the Run button.


Refer to the below screenshots for a better understanding.
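If you prefer the command prompt over the Docker Desktop UI, an equivalent container can be started with docker run. This is a hedged sketch based on the image's documentation rather than the exact configuration in the screenshots; adjust the container name, ports, and volume mapping to match what you set above.

# Hypothetical one-line equivalent of the container configured in the UI above.
docker run -d --name fast-data-dev -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -e ADV_HOST=127.0.0.1 -v C:\Users\<your_user>\Documents\Kafka_volume:/Kafka_volume landoop/fast-data-dev

In this image, port 3030 serves the web UI used in the next step, 8081 the Schema Registry, 8082 the REST proxy, 8083 Kafka Connect, and 9092 the Kafka broker.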

Once you have created the container and started it, you will see a similar screen. You now need to navigate to port 3030 (http://localhost:3030).

Once you are at port 3030, you will land on the Kafka development environment, where the Coyote health checks will start; wait a moment for them to complete. Once they have, open the Connectors tile (KAFKA CONNECT UI) by hitting Enter.

A new screen for creating connectors will appear, and you can create a new connector from here. To connect Snowflake with Kafka, we will make use of the JDBC connector. Select JDBC as the source connector to proceed.


Refer to the below screenshots for a better understanding.
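If the JDBC tile does not appear, it usually means the connector jar was not picked up. You can verify which plugins Kafka Connect loaded through its REST API; a small check, assuming port 8083 is mapped as in the container setup above:

curl -s http://localhost:8083/connector-plugins
# The response should include io.confluent.connect.jdbc.JdbcSourceConnector.
# If it does not, re-check the jar locations and restart the container.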

When you click on Create, you will be redirected to fill in the configuration on the Properties tab. Use the below properties and pass in your Snowflake credentials and account-specific values.


NOTE: Do not change the name and connector.class properties.


name=snowflake.to.kafka.connector

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

timestamp.column.name=updated_at

incrementing.column.name=id

transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

connection.password=<password>

query=select * from "snowflake2kafka"

connection.attempts=100

transforms=AddNamespace,createKey,AddKeyNamespace

connection.backoff.ms=300000

transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value

timestamp.delay.interval.ms=3000

table.types=table

mode=incrementing

topic.prefix=test

transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key

connection.user=<username>

transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate

transforms.createKey.fields=id

poll.interval.ms=1500

transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key

numeric.mapping=best_fit

connection.url=jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?warehouse=<warehouse_name>&db=<database>&schema=<schema>&role=<role>


Refer to the below screenshots for a better understanding.
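If you prefer working outside the Connect UI, the same connector can also be created through the Kafka Connect REST API. The sketch below is illustrative: it assumes port 8083 is mapped as in the container setup, shows only a subset of the properties (add the remaining keys from the listing above under "config"), and uses a hypothetical file name.

# Save the connector configuration as snowflake-to-kafka.json (only a subset of
# the properties from the listing above is shown; add the rest under "config").
{
  "name": "snowflake.to.kafka.connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?warehouse=<warehouse_name>&db=<database>&schema=<schema>&role=<role>",
    "connection.user": "<username>",
    "connection.password": "<password>",
    "query": "select * from \"snowflake2kafka\"",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test",
    "poll.interval.ms": "1500"
  }
}

# Then register it with Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @snowflake-to-kafka.json http://localhost:8083/connectors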

Click on the Create button. You should see something like this.


In the highlighted area, "1" shows that the connector is running, and "topics - test" shows the topic name, which comes from the topic.prefix set in the properties.
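The same information is available from the Kafka Connect REST API, which is handy when troubleshooting; for example:

curl -s http://localhost:8083/connectors/snowflake.to.kafka.connector/status
# A healthy connector reports "state": "RUNNING" for both the connector and its task.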

Now that the connector is running, we are ready to see the captured changes. Let us insert some records into the Snowflake table.


INSERT INTO "snowflake2kafka" VALUES (15, '2023-01-29 10:00:00.400121', '2023-01-29 11:00:19.400121', 'Ayushi', 'Pune');


In the background, the connector keeps polling the table for changes. In our case, it watches the id column in incrementing mode, so a new record is picked up whenever its id is greater than the highest id already seen.


Navigate to the home screen, hit Enter on the Topics tile, and look for the test topic. In our case, it had 3 records before the new record was inserted.


Therefore, after inserting the above record, the topic data looks like this.
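Besides the Topics UI, you can also tail the topic from the command prompt with the console consumer bundled in the image. This is a hedged sketch; it assumes the container is named fast-data-dev and that the Connect worker in this image uses its default Avro converters (flags may vary slightly between image versions).

# Stream the contents of the test topic from the beginning.
docker exec -it fast-data-dev kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --property schema.registry.url=http://localhost:8081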

Let us insert one more record.

INSERT INTO "snowflake2kafka" ("id", "start_date", "end_date", "name", "city") VALUES (16, '2023-01-29 10:00:00.400121', '2023-01-29 11:00:19.400121', 'Sahil', 'Bangalore');

As you can see in the above image, Kafka is reading data from the Snowflake table.


Conclusion

In conclusion, integrating Snowflake tables with Kafka opens up new avenues for efficient data consumption. With this setup in place, we are all set to stream data out of Snowflake tables into Kafka.


