
Data Ingestion In Snowflake Using Kafka

Updated: Apr 3

Author: Prashant Sharma


1. OVERVIEW AND ARCHITECTURE

➢ APACHE KAFKA:


o Apache Kafka software uses a publish and subscribe model to write and read streams of records, similar to a message queue or enterprise messaging system.

o An application publishes messages to a topic, and an application subscribes to a topic to receive those messages.

o Topics can be divided into partitions to increase scalability.


➢ KAFKA CONNECT:


o Kafka Connect is a framework for connecting Kafka with external systems, including databases.

o A Kafka Connect cluster is a separate cluster from the Kafka cluster.

o The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).


➢ SNOWFLAKE CONNECTORS:


o Snowflake provides two versions of the connector:

▪ A version for the Confluent package of Kafka.

▪ A version of the open-source software (OSS) Apache Kafka package.

o From the perspective of Snowflake, a Kafka topic produces a stream of rows to be inserted into a Snowflake table. In general, each Kafka message contains one row.

o In Snowflake, one topic supplies messages (rows) for one Snowflake table.

o The current version of the Kafka connector is limited to loading data into Snowflake.


➢ TARGET TABLES FOR KAFKA TOPICS:


o Kafka topics can be mapped to existing Snowflake tables in the Kafka configuration.

o If the topics are not mapped, the Kafka connector creates a new table for each topic using the topic name.

o Table name rules when no table name is explicitly mapped:

▪ Lowercase characters in the topic name are converted to uppercase in the table name.

▪ If the topic name does not start with a letter (A-Z, a-z) or an underscore (_), the connector prepends an underscore to the table name.

▪ Any illegal character in the topic name is replaced with an underscore (_).

o When the connector adjusts the table name in this way, it appends a suffix to keep the name unique: an underscore followed by a generated hash code.

o Table Structure for Kafka Topics:

▪ RECORD_CONTENT: Contains the Kafka message.

▪ RECORD_METADATA: Contains metadata about the message as JSON, including topic, partition, offset, CreateTime/LogAppendTime, key, schema_id, and headers (an illustrative value is shown after this list).
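For illustration only (the values below are hypothetical, and the exact fields depend on the record and the converter used), a RECORD_METADATA value for a message on a topic named car might look roughly like this:

{
  "topic": "car",
  "partition": 0,
  "offset": 42,
  "CreateTime": 1712345678901,
  "key": null,
  "headers": {}
}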


➢ ARCHITECTURE AND WORKFLOW:


o The Kafka connector subscribes to one or more Kafka topics based on the configuration information provided via the Kafka configuration file or command line (or the Confluent Control Center; Confluent only).

o The connector creates the following objects for each topic:

▪ One internal stage to temporarily store data files for each topic

▪ One pipe to ingest the data files for each topic partition

▪ One table for each topic

o Architecture diagram:



o Workflow diagram:

▪ One or more applications publish JSON or Avro records to a Kafka cluster. The records are split into one or more topic partitions.

▪ The Kafka connector buffers messages from the Kafka topics. When a threshold (time or memory or a number of messages) is reached, the connector writes the messages to a temporary file in the internal stage. The connector triggers Snowpipe to ingest the temporary file. Snowpipe copies a pointer to the data file into a queue.

▪ A Snowflake-provided virtual warehouse loads data from the staged file into the target table (i.e., the table specified in the configuration file for the topic) via the pipe created for the Kafka topic partition.

▪ The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file data was loaded into the table.

o If a failure prevents the data from loading, the connector moves the file into the table stage and produces an error message.


➢ FAULT TOLERANCE:


o Both Kafka and the Kafka Connector are fault-tolerant.

o Messages are neither duplicated nor silently dropped. Data deduplication logic in the Snowpipe workflow in the data loading chain eliminates duplicate copies of repeating data except in rare cases.

o If an error is detected while Snowpipe loads a record, then the record is not loaded; instead, the record is moved to a table stage.

o Limitations of Fault Tolerance with Connector:

▪ If messages in the Kafka topic are deleted or updated, these changes might not be reflected in the Snowflake table.

▪ The default retention time is seven days. If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.

o Performance tuning (an illustrative settings sketch follows this list):

▪ Tuning the number of nodes in the Connect cluster

▪ Tuning the number of tasks allocated to the connector

▪ Understanding the impact of the network bandwidth between the connector and the Snowflake deployment
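As a rough illustration of these levers, the snippet below uses documented connector and Kafka Connect property names, but the values are examples to tune against your own workload, not recommendations:

tasks.max=4                  # parallelism across topic partitions
buffer.count.records=10000   # flush to the internal stage after this many buffered records
buffer.flush.time=120        # ...or after this many seconds
buffer.size.bytes=20000000   # ...or after this many buffered bytes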


2. ENVIRONMENT SETUP: ON-PREMISE

a. UBUNTU SETUP (FOR WINDOWS ONLY):


i) Go to Start -> Turn Windows features on or off


ii) In the Windows Features box, scroll down to “Windows Subsystem for Linux” and enable it by checking the box.



iii) Go to the Microsoft Store -> Download and install Ubuntu LTS (any version)



iv) Restart the PC.


b. JAVA SETUP:


i) Install Java-8:

$ sudo apt-get install openjdk-8-jdk (might require running apt-get update and apt-get upgrade first)

Note: Complete the Kafka Service Setup (Section 3) before Steps ii and iii.


ii) Download the Bouncy Castle libraries (OPTIONAL: required only for an encrypted private key):


a) Get the latest version of the JAR file for bc-fips


b) Use wget with the link to the JAR file from the step above.


$ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.2.3/bc-fips-1.0.2.3.jar


c) Get the latest version of the JAR file for bcpkix-fips


d) Use wget with the link to the JAR file from the step above.

$ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar


iii) Move the .jar files to the kafka/libs folder (OPTIONAL: required only for an encrypted private key):


a) Use the mv command to move the downloaded bc-fips file


$ mv bc-fips-1.0.2.3.jar kafka/libs


b) Use the mv command to move the downloaded bcpkix-fips file

$ mv bcpkix-fips-1.0.3.jar kafka/libs


c. RSA KEY SETUP:


i) Encrypted Private Key

$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
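Note: If you prefer an unencrypted private key (which also lets you skip the optional Bouncy Castle steps above and the passphrase setting later), openssl can write the PKCS#8 key without a passphrase. This is an alternative to the step above, not part of the original walkthrough:

$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt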



ii) Public Key (generated from the private key)

$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub



iii) RSA File Check

$ ls -> Check that rsa_key.p8 and rsa_key.pub are listed



iv) [IMPORTANT] Copy the content of rsa_key.pub as a single line (no line breaks; needed for the next step)

$ vim rsa_key.pub
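One way to print the key body as a single line, assuming the standard PEM BEGIN/END header and footer lines (this helper command is not part of the original steps):

$ grep -v '^-----' rsa_key.pub | tr -d '\n'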



d. SNOWFLAKE ACCOUNT SETUP:


a) Database, Schema, Warehouse, and Role Setup


i) Use the below Script:
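The original script is not reproduced here, so the statements below are a minimal sketch; KAFKA_DB, KAFKA_SCHEMA, KAFKA_WH, and KAFKA_CONNECTOR_ROLE are placeholder names to adjust for your environment:

-- Run with a role that is allowed to create these objects (e.g., SYSADMIN or ACCOUNTADMIN)
CREATE DATABASE IF NOT EXISTS KAFKA_DB;
CREATE SCHEMA IF NOT EXISTS KAFKA_DB.KAFKA_SCHEMA;
CREATE WAREHOUSE IF NOT EXISTS KAFKA_WH
  WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
CREATE ROLE IF NOT EXISTS KAFKA_CONNECTOR_ROLE;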




ii) Grants and Privileges on created objects
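Again as a sketch using the placeholder names from the previous step, the connector's role needs at least usage on the database, schema, and warehouse, plus the ability to create tables, stages, and pipes in the schema:

GRANT USAGE ON DATABASE KAFKA_DB TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT USAGE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT USAGE ON WAREHOUSE KAFKA_WH TO ROLE KAFKA_CONNECTOR_ROLE;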



iii) Create a User and Assign a Role
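A sketch of the user setup (KAFKA_CONNECTOR_USER is a placeholder; <public_key_content> is the single-line body of rsa_key.pub copied in the RSA key setup, without the BEGIN/END lines):

CREATE USER IF NOT EXISTS KAFKA_CONNECTOR_USER
  DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE
  DEFAULT_WAREHOUSE = KAFKA_WH
  RSA_PUBLIC_KEY = '<public_key_content>';
GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_CONNECTOR_USER;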



3. KAFKA SERVER SETUP (LINUX AND WINDOWS)

1. Kafka Service Setup:

a. Download Kafka Service from the Apache Kafka site:

i. Get the latest TGZ for Kafka from the Apache Kafka site.

Note: Kafka Version 2.8.1 and above doesn’t require Zookeeper Service.



ii. Navigate to the preferred Kafka Service version



iii. Right-click any of the Kafka files ending with .tgz (extension) and Copy the Link Address



iv. In Windows, open Ubuntu (from the Start Menu) -> mkdir -p Kafka-Server -> cd Kafka-Server -> wget <Copied_link_address> -> Enter


Or


In Linux, open a terminal -> mkdir -p Kafka-Server -> cd Kafka-Server -> wget <Copied_link_address> -> Enter



b. Extract Kafka Tarball(tgz):


i. In the Kafka-Server folder, run the list command to get the Kafka tarball filename

$ ls



ii. Extract the tarball from the folder


$ tar -xvzf <kafka_file_name(ending with .tgz)>



iii. Rename the Extracted Kafka Directory


$ mv <kafka_directory> kafka


Note: The extracted Kafka directory name is the Kafka filename without the .tgz suffix.



2. Zookeeper Service Setup (OPTIONAL: FOR KAFKA VERSION BELOW 2.8.1):


a. Download Zookeeper Service from Apache Zookeeper Site:


i. Get the latest TGZ for Zookeeper from Apache Zookeeper site.


Note: Kafka Version 2.8.1 and above doesn’t require Zookeeper Service.

ii. Navigate to the preferred Zookeeper Service version



iii. Right-click any of the Zookeeper files ending with .tgz (extension) and Copy the Link Address



iv. In Windows, Open ubuntu(From Start Menu) -> cd Kafka-Server -> wget <Copied_link_address> -> Enter


Or


In Linux, Terminal -> cd Kafka-Server -> wget <Copied_link_address> -> Enter


b. Extract Zookeeper Tarball(tgz):


i. In the Kafka-Server folder, run the list command to get the Zookeeper tarball filename

$ ls



ii. Extract the tarball from the folder


$ tar -xvzf <zookeeper_file_name(ending with .tgz)>



c. Rename Extracted Zookeeper Directory:


$ mv <zookeeper_directory> zookeeper


d. Zookeeper Configuration (Use Default):


$ mv zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg


4. SNOWFLAKE KAFKA CONNECTOR:

1. Download Snowflake Kafka Connector Jar:


$ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.0/snowflake-kafka-connector-1.5.0.jar


2. Move the Jar to the kafka/libs directory:


$ mv snowflake-kafka-connector-1.5.0.jar kafka/libs


3. Configuring Kafka Snowflake Connector:

a) Create the Configuration file

$ vi kafka/config/snowflake-kafka-connector.properties

b) Add the following configuration with changes as required
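The configuration screenshot from the original post is not reproduced here; the sketch below uses the documented Snowflake connector properties with placeholder values (account URL, user, private key, database, schema, and the topic-to-table mapping must match your own setup, and the passphrase line is needed only if the private key is encrypted):

name=snowflake-kafka-connector
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=car
snowflake.topic2table.map=car:CAR_TABLE
snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
snowflake.user.name=KAFKA_CONNECTOR_USER
snowflake.private.key=<single_line_body_of_rsa_key.p8_without_BEGIN_END_lines>
snowflake.private.key.passphrase=<passphrase_if_the_key_is_encrypted>
snowflake.database.name=KAFKA_DB
snowflake.schema.name=KAFKA_SCHEMA
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter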




c) Save the File. For vi editor, esc -> Shift + : -> wq -> Enter


5. OPERATION CHECK: KAFKA PRODUCER AND CONSUMER

Note: For this operation, we’ll need multiple terminal windows on both Linux and Windows.


a) Window 1: Start Zookeeper Service

$ kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties


b) Window 2: Start Kafka Service

$ kafka/bin/kafka-server-start.sh kafka/config/server.properties


c) Window 3: Sample File


i. Create a sample JSON file

$ vi car.json

ii. Paste the below data in the file
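The sample data from the original post is not shown; any JSON records will do, one per line, since the console producer treats each line as one Kafka message (and therefore one row in Snowflake). A hypothetical example:

{"make": "Toyota", "model": "Corolla", "year": 2018, "price": 14500}
{"make": "Honda", "model": "Civic", "year": 2020, "price": 19800}
{"make": "Tesla", "model": "Model 3", "year": 2022, "price": 42000}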



iii. Save and Exit. For vi editor, esc -> Shift + : -> wq -> Enter


d) Window 3: Kafka Topic


i. Create Kafka Topic with 1 Partition:

$ kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car (For Kafka below 2.8.1)

Or


$ kafka/bin/kafka-topics.sh --create --topic car --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092 (For Kafka 2.8.1 and above)

ii. Verify Topic Creation

$ kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 (For Kafka below 2.8.1)


Or


$ kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 (For Kafka 2.8.1 and above)


e) Window 4: Kafka Console Producer

$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car < car.json


f) Window 5: Kafka Console Consumer

$ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic car --from-beginning


g) Clean-Up


i. Press Ctrl + C to stop the Kafka consumer.

ii. Close Windows 4 and 5.


6. OPERATION EXECUTION: KAFKA TO SNOWFLAKE INGESTION

a) Run jps to check that the following services are running:

i. QuorumPeerMain (Zookeeper)

ii. Kafka

iii. ConnectStandalone (Kafka-Connect Task)


b) Run Kafka Connect Standalone

$ kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/snowflake-kafka-connector.properties


c) Log in to Snowflake and verify that the data has been inserted into the mapped table.
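For example, assuming the placeholder names used earlier (Snowpipe ingestion may take a minute or two before rows appear):

USE ROLE KAFKA_CONNECTOR_ROLE;
USE WAREHOUSE KAFKA_WH;
SELECT RECORD_METADATA, RECORD_CONTENT
FROM KAFKA_DB.KAFKA_SCHEMA.CAR_TABLE
LIMIT 10;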


7. OPERATION PROBLEMS

a) Producer Error: Request Error


i. Resolution 1: Update kafka/config/producer.properties by adding the line below

max.request.size=101214400

Then run the below command:

$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car --producer.config kafka/config/producer.properties < car.json


ii. Resolution 2: Use the below command to upload the file

$ kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic car --producer-property max.request.size=101214400 < car.json


b) Topic Error: Max Message Size


i. Resolution: Run the Below command on the created topic

$ kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name car --add-config max.message.bytes=101214400


c) Kafka Error: On restart, the Kafka service or Zookeeper service throws an error


i. Resolution: Run the commands below. Afterwards, the Kafka topic must be created again and all properties on the topic must be altered again.

$ rm -rf /tmp/kafka-logs/

$ rm -rf /tmp/zookeeper/


8. CONCLUSION

In this article, we have seen:

  1. A quick overview of what Kafka is and what the Snowflake Kafka Connector is.

  2. The architecture and workflow for JSON data ingestion into Snowflake using Apache Kafka on a local machine running either Windows or Linux.

  3. Fault tolerance and performance tuning for Apache Kafka.

  4. First, how to set up the environment on our operating system, based on the Kafka version we're using, and the Snowflake environment.

  5. Next, how to configure the Snowflake Kafka Connector properties file to be used with Snowflake.

  6. Next, how to use the producer console to send messages to the consumer console once the complete environment and all the services are up and running.

  7. Later, how we published a JSON file using the standalone producer console with the properties file configured in step 5 and confirmed that the JSON data landed in the Snowflake table mapped in that file.

  8. At last, the problems that may occur during these operations and their resolutions.

The scope of this article was limited to the no-code Apache Kafka console tools. In most real-time data ingestion use cases, however, we write our own producer and consumer in a programming language such as Python or Java, which opens up far more possibilities for ingesting data. TEXT and Avro files can also be ingested with this no-code approach, and with a workaround CSV files can be ingested as well (reference link attached).


9. REFERENCES:

CSV Data Ingestion:

https://medium.com/streamthoughts/streaming-data-into-kafka-s01-e01-loading-csv-file-8ea053b232cb

Apache Kafka Documentation:

https://kafka.apache.org/documentation/

Configuration to improve performance:

https://towardsdatascience.com/10-configs-to-make-your-kafka-producer-more-resilient-ec6903c63e3f

