
Getting Started With Kafka Connector For Snowflake

Updated: Mar 16

Author: Sudhendu Pandey



Objective

This short tutorial aims to get the reader up and running with the Snowflake Connector for Kafka. The Snowflake documentation does a good job of explaining the concepts and inner workings of this connector, so this article can be used alongside it.


Steps

All these steps are detailed and assume you have little to no background on how to set up either Kafka or Snowflake (to be fair, no real setup is required on Snowflake, just sign-up and some configuration). Wherever I think a better reference article is available, I will point you to it so that you can complete the steps there and then resume here.


Below is the agenda. The focus is on a quick win: having a working flow ready. You can skip any sections that do not apply to your setup.

  1. Setting up Snowflake Account

  2. Setting up Kafka (on Ubuntu WSL2)
       • Installing Kafka
       • Running Kafka
       • Verifying the Kafka setup
       • Installing and configuring the Kafka Connect cluster

  3. Setting up Snowflake Connector for Kafka
       • Downloading the required artifacts
       • Creating the public/private key pair for authentication
       • Configuring the connector
       • Configuring the Snowflake database objects
       • Starting the Kafka Snowflake connector
       • Sending messages to Snowflake via the Kafka connector
       • Verifying the Kafka message flow into the Snowflake table

  4. Possible setup issues and remedies


Setting up Snowflake Account

You will need an account on Snowflake. You can sign up for one here: https://signup.snowflake.com/ (no credit card is required, but always keep tabs on how much Snowflake credit you are spending). The trial account works.



Setting up Kafka

Setting up Apache Kafka is straightforward. I am using a Windows machine and have installed Windows Subsystem for Linux WSL2.


You can follow the article below on Confluent.io on how to set up Kafka on WSL2. The steps are detailed and easy to understand. Below I will summarize the high-level steps and commands for Kafka setup.


Set Up and Run Apache Kafka on Windows


Installing Kafka

Steps to install Kafka (once you are ready with WSL2):

Please verify the Kafka Connect vs. Kafka version compatibility before proceeding with the installation.



sudo apt-get update && sudo apt-get upgrade -y

#get your java

sudo apt install openjdk-8-jdk -y

wget https://ftp.wayne.edu/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz

#NOTE: this is an older version of Kafka. Please check the compatibility matrix for the version of Kafka vs. the Snowflake Kafka connector

tar -xzf kafka_2.13-2.6.0.tgz

cd kafka_2.13-2.6.0


Running Kafka

Follow these steps to run Kafka:

  • start ZooKeeper (recent versions of Apache Kafka can run without ZooKeeper);

  • start Kafka;

  • create a Topic;

  • create a producer.

Please note that you have to do all these steps inside the Ubuntu distribution you have just installed on WSL2.

bin/zookeeper-server-start.sh config/zookeeper.properties


#Once ZooKeeper has started, you can run the below command in a new Ubuntu console

bin/kafka-server-start.sh config/server.properties


Tip: Install Windows Terminal. It is the best way to work with multiple command windows.


Verifying Kafka

You can verify from the logs if your Kafka instance is up and running. If there are any errors, you might want to debug and fix that before moving forward.
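Besides reading the logs, a quick sanity check is to see whether anything is listening on the broker port. The snippet below is a minimal sketch in Python, assuming the broker uses the default listener on localhost:9092; the helper name port_is_open is made up for illustration and is not part of Kafka.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumption: Kafka listens on the default port 9092 on this machine.
    print("Kafka broker reachable:", port_is_open("localhost", 9092))
```

An open port only tells you that some process is listening there, so the broker logs remain the real source of truth.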


To test our Kafka instance, we will use the command-line producer and consumer and check that we can send and receive messages on the Topic we create.


#Create a Kafka Topic called TEST_KAFKA

bin/kafka-topics.sh --create --topic TEST_KAFKA --bootstrap-server localhost:9092

#Start a Kafka producer for the Topic TEST_KAFKA

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_KAFKA

#Open a new Ubuntu window and start a Kafka consumer for the Topic TEST_KAFKA

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TEST_KAFKA


Setting up Snowflake Connector for Kafka

Snowflake provides very good documentation on how to set up Kafka Connector for Snowflake. I managed to complete the entire setup by just following the documentation, which means it is good enough 🙂


For this tutorial, we are just going to create a working example with the least possible customization. You can refer to the documentation for details on individual parameters, steps, etc.


1. Downloading the required artifacts

You can use either the Confluent distribution of Kafka or the Open-Source Software (OSS) Apache Kafka package. For this setup, I am using the OSS package.


Download the below (3 jars):

Kafka Snowflake connector (check for version): https://mvnrepository.com/artifact/com.snowflake

BouncyCastle Cryptography library:

Place all the downloaded jars in the libs/ folder of your Kafka setup.



2. Creating the public/private key pair for authentication

The Snowflake connector accepts only key pair authentication, not basic (username and password) authentication.


You can easily generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the configuration file.


#Create a private key in some temporary location. Remember the password that you type. We are going to use this value in our config file below.


openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out C:\tmp\cert\new_rsa_key_v1.p8


#Create a public key using the above private key. We are going to update our Snowflake user to have this public key.


openssl rsa -in C:\tmp\cert\new_rsa_key_v1.p8 -pubout -out C:\tmp\cert\new_rsa_key_v1.pub





3. Configuring the connector.

For our example, we will configure the Kafka connector in standalone mode (as opposed to distributed mode).


a) Create a file named SF_connect.properties and save it in your Kafka config/ folder.

b) Edit the file and copy-paste the following configuration parameters.



https://gist.github.com/SudhenduP/c846fae265b195676898a86f06a03078#file-sf_connect-properties


Note: The following properties are specific to your instance and hence need to be updated in accordance with your setup.


#your Snowflake instance ID. Please see the first part of the URL.

snowflake.url.name=acb1234.snowflakecomputing.com:443

#your account username

snowflake.user.name=thisismyusername

#private key you generated in the last step; the whole key must be on one line, with no line feeds

snowflake.private.key= #place-your-private-key-here-on-one-line

#passphrase you used to generate the private key

snowflake.private.key.passphrase= #the-password-you-entered-while-creating-private-key
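Getting the private key onto a single line by hand is error-prone. The sketch below shows one way to do it in Python, on the assumption that the connector expects only the base64 body of the key, without the BEGIN/END lines. The function name pem_to_single_line and the sample key text are placeholders invented for this example, not real key material.

```python
def pem_to_single_line(pem_text: str) -> str:
    """Drop the PEM header/footer lines and join the base64 body into one line."""
    body = []
    for line in pem_text.splitlines():
        line = line.strip()
        # Skip blank lines and the "-----BEGIN ...-----" / "-----END ...-----" markers
        if not line or line.startswith("-----"):
            continue
        body.append(line)
    return "".join(body)


if __name__ == "__main__":
    # Placeholder text standing in for the contents of new_rsa_key_v1.p8
    sample = (
        "-----BEGIN ENCRYPTED PRIVATE KEY-----\n"
        "AAAAexamplekeyline1\n"
        "BBBBexamplekeyline2\n"
        "-----END ENCRYPTED PRIVATE KEY-----\n"
    )
    print("snowflake.private.key=" + pem_to_single_line(sample))
```

The same helper can be pointed at the .pub file to get the public key body that is assigned to the Snowflake user.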


4. Configuring the Snowflake Database Objects.



You need to do a couple of steps to ensure the right database objects are available, along with the appropriate access for Kafka Connect to work.


You can use the following database script as-is* and run it in a Snowflake worksheet.

*Please note the comments (!!!!!!) where you need to make changes.



https://gist.github.com/SudhenduP/97c1b86014a35e4ef4b144dace5ab0b0#file-db-object-snowflake-ddl


5. Starting the Kafka Snowflake Connector

The next step is to start the connector. This is probably where you will face most of the issues. Hopefully you can debug and resolve them, as the error logs are quite descriptive. I have also listed the errors I encountered, and the fixes I applied, in the last section.
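A small pre-flight check of the properties file can catch missing entries before the connector is started. The following is a rough sketch in Python, not a full Java properties parser (it ignores line continuations and the ':' separator). The REQUIRED_KEYS list reflects my reading of the Snowflake documentation and is an assumption, so adjust it to your setup (for example, if you use topics.regex instead of topics). The function names are made up for illustration.

```python
# Assumed set of mandatory connector properties; verify against the Snowflake docs.
REQUIRED_KEYS = [
    "name",
    "connector.class",
    "topics",
    "snowflake.url.name",
    "snowflake.user.name",
    "snowflake.private.key",
    "snowflake.database.name",
    "snowflake.schema.name",
]


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and comment lines."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or line.startswith("!"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(text: str, required=REQUIRED_KEYS) -> list:
    """Return the required keys that are absent or have an empty value."""
    props = parse_properties(text)
    return [key for key in required if not props.get(key)]
```

To use it, read config/SF_connect.properties into a string and pass it to missing_keys; an empty list means every key in the list has a value.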


Command to start Kafka Standalone connect for Snowflake:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties


#Example:

bin/connect-standalone.sh /home/sudh/kafka_2.13-2.6.0/config/connect-standalone.properties /home/sudh/kafka_2.13-2.6.0/config/SF_connect.properties


On successful startup, the connector prints a series of INFO log lines. Scroll through the logs and make sure that there is no ERROR entry.
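Scrolling through a long startup log is tedious, so a small filter can help. This is a minimal sketch in Python, assuming the log lines start with a bracketed timestamp followed by the level, as in "[2021-09-05 20:02:11,563] INFO". The function name find_error_lines is invented for this example.

```python
import re

# Matches lines such as "[2021-09-05 20:02:11,563] INFO ..." and captures the level
LOG_LINE = re.compile(r"^\[(?P<ts>[^\]]+)\]\s+(?P<level>[A-Z]+)\b")


def find_error_lines(log_text: str) -> list:
    """Return the log header lines whose level is ERROR or FATAL."""
    hits = []
    for line in log_text.splitlines():
        match = LOG_LINE.match(line)
        if match and match.group("level") in ("ERROR", "FATAL"):
            hits.append(line)
    return hits
```

One way to use it is to save the console output to a file (for example with tee) and pass the file contents to find_error_lines. It returns only the header line of each entry, so open the log at that timestamp for the full message.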



6. Sending messages to Snowflake via the Kafka Connector



Made it this far? Amazing! We can now start sending messages on the Kafka Topic, and if everything is set up right, we should see our messages landing in the Snowflake table we have created (one Topic is linked to one table).


One important thing to know: every Snowflake table loaded by the Kafka connector has a schema consisting of at least two VARIANT columns:

  • RECORD_CONTENT. This contains the Kafka message.

  • RECORD_METADATA. This contains metadata about the message, for example, the Topic from which the message was read.

Let us start by creating a new Topic with the same name as the one given in our connector config file (SF_connect.properties) above. Once the Topic is created, we can start a producer on it.


Below are the commands for creating the Topic and starting the producer (use as is):

#Create the Topic

bin/kafka-topics.sh --create --topic snowflake_in_topic --bootstrap-server localhost:9092

#Start the producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic snowflake_in_topic

At the producer prompt, you can send the sample JSON message given below (or anything else you have):

{"order_id": {"int": 1212}, "customer_id": {"string": "abcd"}, "order_ts": {"int": 333333}, "order_total_usd": {"double": 3.8900000000000001}, "item": {"string": "Wainwright"}}
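Because the console producer sends each line exactly as typed, it can help to confirm that a message parses as JSON before pasting it. Curly quotes copied from a web page, or an unquoted string value, are easy to miss. The sketch below uses only the Python standard library; the function name check_json_line is made up for illustration.

```python
import json


def check_json_line(line: str):
    """Return (True, parsed_value) if line is valid JSON, else (False, error_text)."""
    try:
        return True, json.loads(line)
    except json.JSONDecodeError as exc:
        return False, str(exc)


if __name__ == "__main__":
    message = '{"order_id": {"int": 1212}, "customer_id": {"string": "abcd"}}'
    ok, result = check_json_line(message)
    print("valid JSON" if ok else "invalid JSON: " + result)
```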


At this point, go and check your Kafka Connector console window. It should show logs like the below:


[SF_KAFKA_CONNECTOR] init pipe: SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0 (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

[2021-09-05 20:02:11,563] INFO

[SF_KAFKA_CONNECTOR] Using existing table KAFKA_TABLE_IN. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

[2021-09-05 20:02:12,590] INFO

[SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN retrieved 0 file names (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:79)

[2021-09-05 20:02:12,590] INFO

[SF_KAFKA_CONNECTOR] Using existing stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

[2021-09-05 20:02:14,342] INFO

[SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN retrieved 0 file names (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:79)

[2021-09-05 20:02:14,344] INFO

[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0, recovered from existing pipe (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

[2021-09-05 20:02:14,345] INFO

[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0: cleaner started (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

[2021-09-05 20:02:14,346] INFO

[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0: flusher started (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)


7. Verifying the Kafka Message flow into the Snowflake table.

Log in to your Snowflake Console. The URL should look something like the below:

abcd1234.snowflakecomputing.com/console#/internal/worksheet

Navigate to your Database->Schema->Table. Right-click and select Query Table. You should see the message you sent.




If you made it this far, amazing! The Snowflake documentation makes it a lot easier!


Possible issues you might face and fixes

Error message: unable to decode base64 string: invalid characters encountered in base64 data

Version mismatch between Kafka Connect and Kafka: I had an older version of Kafka on my laptop (more than a year old), whereas I had downloaded the latest version of Kafka Connect. The mismatch caused Java deserialization errors.


Error message: Invalid JWT token


There is an issue with the key pair you generated. Ensure that you have followed the step "Creating the public/private key pair for authentication" and that the correct public key is set on the Snowflake user (RSA_PUBLIC_KEY).
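One way to narrow this down is to compare the fingerprint of your local public key with the RSA_PUBLIC_KEY_FP value that Snowflake shows for the user (for example via DESC USER). The sketch below computes that fingerprint in Python, on the assumption that it is the base64-encoded SHA-256 digest of the DER-encoded public key prefixed with "SHA256:". The function name public_key_fingerprint is invented for this example.

```python
import base64
import hashlib


def public_key_fingerprint(public_pem: str) -> str:
    """Compute 'SHA256:<base64 digest>' over the DER bytes of a PEM public key."""
    body = "".join(
        line.strip()
        for line in public_pem.splitlines()
        if line.strip() and not line.strip().startswith("-----")
    )
    der = base64.b64decode(body)           # PEM body is base64-encoded DER
    digest = hashlib.sha256(der).digest()  # raw 32-byte SHA-256 digest
    return "SHA256:" + base64.b64encode(digest).decode("ascii")
```

Pass it the contents of new_rsa_key_v1.pub. If the result differs from the value stored on the Snowflake user, the user has a different public key than the one matching your private key.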


Error message: Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.


Ensure that you have executed the following statement (it changes the user's default role):

ALTER USER YOURUSERNAME SET DEFAULT_ROLE = KAFKA_CONNECT_ROLE;

Credit and Ref:

  1. https://pandeysudhendu.medium.com/getting-started-with-kafka-connector-for-snowflake-3bf596e550bb

  2. https://docs.snowflake.com/

If you have any questions, please comment or head over to Snowflake Community.


