Data Ingestion From Kafka To Snowflake

Author: Bharath Kumar Reddy


This blog gives a brief overview of Apache Kafka and shows how to set up a Kafka connector to ingest data into Snowflake.


What is Apache Kafka?
  • Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day.

  • Kafka, like many message publish/subscribe platforms, allows a many-to-many relationship between publishers and subscribers.

  • An application publishes messages to a topic (similar to a table in a database), and an application subscribes to a topic to receive those messages. Kafka can process, as well as transmit, messages.

  • Topics can be divided into partitions to increase scalability.

  • A single application can publish to many topics, and a single application can subscribe to multiple topics. With Snowflake, the typical pattern is that one topic supplies messages (rows) for one Snowflake table.

  • Kafka Connect is a framework for connecting Kafka with external systems, including databases. A Kafka Connect cluster is a separate cluster from the Kafka cluster. The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).

  • The Snowflake Connector for Kafka (“Kafka connector”) is designed to run in a Kafka Connect cluster to read data from one or more Apache Kafka topics and load the data into a Snowflake table.

  • From the perspective of Snowflake, a Kafka topic produces a stream of rows to be inserted into a Snowflake table. In general, each Kafka message contains one row.
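The publish/subscribe and partitioning ideas above can be sketched with a toy in-memory model. This is only an illustration, not how Kafka is implemented: the `Topic` class is hypothetical, and it uses a CRC32 hash to pick a partition, whereas real Kafka clients use their own partitioner.

```python
import zlib


class Topic:
    """A toy topic: an append-only log split into partitions (hypothetical model)."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        # Messages with the same key always land in the same partition,
        # which preserves their relative order.
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append((key, value))
        return index

    def read(self, partition, offset=0):
        # Subscribers read from an offset; reading does not remove messages,
        # so many subscribers can consume the same topic independently.
        return self.partitions[partition][offset:]
```

Because messages stay in the log after being read, any number of subscribers can consume the same topic, which is what makes the many-to-many relationship possible.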


Target Tables
  • Kafka topics can be mapped to existing Snowflake tables in the Kafka configuration.

  • If the topics are not mapped, then the Kafka connector creates a new table for each topic using the topic name.

  • The connector supports only the JSON and Avro data formats.
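In the Snowflake connector configuration, the topic-to-table mapping is expressed through the `snowflake.topic2table.map` property as a comma-separated list of `topic:table` pairs. The helper below is a small sketch that builds such a value; the function name and the table names in the usage note are hypothetical.

```python
def build_topic2table_map(mapping):
    """Build a 'topic1:table1,topic2:table2' string from a dict.

    The mapping keys are Kafka topic names and the values are the
    Snowflake table names they should load into.
    """
    for topic, table in mapping.items():
        for name in (topic, table):
            # ':' and ',' are the separators, so they cannot appear in names.
            if ":" in name or "," in name:
                raise ValueError(f"invalid character in name: {name!r}")
    return ",".join(f"{topic}:{table}" for topic, table in mapping.items())
```

For example, `build_topic2table_map({"Customer": "CUSTOMER_RAW", "Bank": "BANK_RAW"})` yields `Customer:CUSTOMER_RAW,Bank:BANK_RAW`. Topics left out of the mapping get a table named after the topic.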


Schema of Tables for Kafka Topics

Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns:

  • RECORD_CONTENT. This contains the Kafka message.

  • RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.
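Because both columns are VARIANT, individual fields are usually pulled out with Snowflake's `column:key::TYPE` path syntax. The sketch below generates such a query as a string; the function name, the table name, and the chosen field types are assumptions for illustration, and the JSON keys must match the case used in the messages.

```python
def build_flatten_query(table, fields):
    """Return a SELECT that extracts typed columns from RECORD_CONTENT.

    fields maps a JSON key in the Kafka message to a Snowflake type.
    """
    columns = [
        f"RECORD_CONTENT:{name}::{sql_type} AS {name}"
        for name, sql_type in fields.items()
    ]
    # RECORD_METADATA carries, among other things, the source topic.
    columns.append("RECORD_METADATA:topic::STRING AS SOURCE_TOPIC")
    return "SELECT\n    " + ",\n    ".join(columns) + f"\nFROM {table};"


if __name__ == "__main__":
    print(build_flatten_query(
        "demo_db.kafka_demo.CUSTOMER",
        {"CUSTOMER_ID": "NUMBER", "CUSTOMER_NAME": "STRING"},
    ))
```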

Snowflake provides two versions of the connector:

  1. A version for the Confluent Package version of Kafka.

  2. A version for the open source software (OSS) Apache Kafka package.

In this demonstration, we used the cloud-based Confluent package version of Kafka.

To follow along, create a trial account on the Confluent Cloud sign-up page and start using Kafka with Snowflake.

Why Confluent Kafka?
  • Cloud Native - Completely re-architected Kafka for the cloud to be elastically scalable and globally available - providing a serverless, cost-effective, and fully managed service ready to deploy, operate, and scale in a matter of minutes.

  • Confluent completes Kafka with 120+ connectors, stream processing, enterprise security & governance, global resilience, and more, eliminating the burden and risk of building and maintaining these in-house.

  • Whether in the cloud or across multiple clouds, Confluent has you covered - best of all, you can seamlessly link it all together in real-time to create a consistent data layer across your business.

The Snowflake-Kafka connector relies on key pair authentication rather than basic authentication (i.e. username and password). This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the configuration file.

Refer to the Snowflake documentation on key pair authentication for more information on how to generate public-private key pairs using OpenSSL.
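Once the key pair exists, Snowflake expects the key value without the PEM delimiters and without line breaks, both when the public key is assigned to the user and when the private key is placed in the connector configuration. The sketch below strips a PEM file down to that single-line form; the function names are hypothetical, and the key generation itself is still done with OpenSSL.

```python
def pem_body(pem_text):
    """Return the base64 body of a PEM block as one line.

    Drops the '-----BEGIN ...-----' and '-----END ...-----' delimiters
    and joins the remaining lines.
    """
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    return "".join(
        line for line in lines if line and not line.startswith("-----")
    )


def alter_user_statement(user, public_key_pem):
    """Build the SQL that assigns the public key to a Snowflake user."""
    return f"alter user {user} set rsa_public_key='{pem_body(public_key_pem)}';"
```

The same `pem_body` helper can be applied to the private key file before pasting its contents into the connector's private key field.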


Workflow for the Kafka Connector

The Kafka connector completes the following process to subscribe to Kafka topics and create Snowflake objects:

  1. The Kafka connector subscribes to one or more Kafka topics based on the configuration.

  2. The connector creates the following objects for each topic:

  • One internal stage to temporarily store data files for each topic.

  • One pipe to ingest the data files for each topic partition.

  • One table for each topic. If the table specified for each topic does not exist, the connector creates it; otherwise, the connector creates the RECORD_CONTENT and RECORD_METADATA columns in the existing table.
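To make the object counts concrete, the following sketch tallies what the connector would create for a set of topics, following the rules listed above (one stage and one table per topic, one pipe per topic partition). The function name and the partition counts in the example are hypothetical.

```python
def expected_objects(topic_partitions):
    """Count the Snowflake objects created per topic.

    topic_partitions maps a topic name to its number of partitions.
    """
    return {
        topic: {"tables": 1, "stages": 1, "pipes": partitions}
        for topic, partitions in topic_partitions.items()
    }


if __name__ == "__main__":
    # Hypothetical partition counts for two of the topics used later.
    for topic, counts in expected_objects({"Customer": 3, "Bank": 1}).items():
        print(topic, counts)
```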



The ingest flow for Kafka with the Kafka connector is as follows:



  1. One or more applications publish JSON or Avro records to a Kafka cluster. The records are split into one or more topic partitions.

  2. The Kafka connector buffers messages from the Kafka topics. When a threshold (time or memory or number of messages) is reached, the connector writes the messages to a temporary file in the internal stage. The connector triggers Snowpipe to ingest the temporary file. Snowpipe copies a pointer to the data file into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the staged file into the target table (i.e. the table specified in the configuration file for the topic) via the pipe created for the Kafka topic partition.

  4. The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file data was loaded into the table. If a failure prevents the data from loading, the connector moves the file into the table stage and produces an error message.

  5. The connector repeats steps 2-4.
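The buffering in step 2 can be illustrated with a small sketch. It is not the connector's actual code: the `MessageBuffer` class and its default values are assumptions, loosely modeled on the connector's `buffer.count.records`, `buffer.size.bytes`, and `buffer.flush.time` settings.

```python
import time


class MessageBuffer:
    """Collect messages and release them as a batch when a threshold is hit."""

    def __init__(self, max_records=10000, max_bytes=5_000_000,
                 max_seconds=120, clock=time.monotonic):
        self.max_records = max_records
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self._clock = clock
        self._records = []
        self._bytes = 0
        self._started = clock()

    def add(self, message):
        """Buffer one message (bytes); return a flushed batch or None."""
        self._records.append(message)
        self._bytes += len(message)
        if self._should_flush():
            return self.flush()
        return None

    def _should_flush(self):
        # Any one of the three thresholds triggers a flush. In this sketch
        # the time threshold is only checked when a message arrives.
        return (
            len(self._records) >= self.max_records
            or self._bytes >= self.max_bytes
            or self._clock() - self._started >= self.max_seconds
        )

    def flush(self):
        """Hand over the current batch and start a new one."""
        batch = self._records
        self._records = []
        self._bytes = 0
        self._started = self._clock()
        return batch
```

In the real connector, each flushed batch becomes a temporary file in the internal stage, which Snowpipe then loads into the target table.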


1. Setting Up Snowflake

Snowflake recommends that you create a separate user and role for each Kafka instance so that the access privileges can be individually revoked if needed. The role should be assigned as the default role for the user.

The following script creates a separate role for the Kafka instance to use and grants the required privileges.

use role securityadmin;

create role kafka_connector_role_1;

-- Grant privileges on the database.
grant usage on database demo_db to role kafka_connector_role_1;

-- Grant privileges on the schema.
grant usage on schema demo_db.kafka_demo to role kafka_connector_role_1;
grant create table on schema demo_db.kafka_demo to role kafka_connector_role_1;
grant create stage on schema demo_db.kafka_demo to role kafka_connector_role_1;
grant create pipe on schema demo_db.kafka_demo to role kafka_connector_role_1;

-- Grant the role to the user and make it the user's default role.
grant role kafka_connector_role_1 to user bharath;
alter user bharath set default_role = kafka_connector_role_1;
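Since a separate user and role is recommended for each Kafka instance, it can be convenient to generate this script from parameters. The following is a minimal sketch; the function name is hypothetical, and the final statement that sets the default role is an addition based on the recommendation above.

```python
def connector_grants(role, database, schema, user):
    """Return the SQL statements that prepare a role for one Kafka instance."""
    qualified = f"{database}.{schema}"
    return [
        "use role securityadmin;",
        f"create role {role};",
        f"grant usage on database {database} to role {role};",
        f"grant usage on schema {qualified} to role {role};",
        f"grant create table on schema {qualified} to role {role};",
        f"grant create stage on schema {qualified} to role {role};",
        f"grant create pipe on schema {qualified} to role {role};",
        f"grant role {role} to user {user};",
        # Make the role the user's default role.
        f"alter user {user} set default_role = {role};",
    ]


if __name__ == "__main__":
    print("\n".join(
        connector_grants("kafka_connector_role_1", "demo_db", "kafka_demo", "bharath")
    ))
```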

2. Set Up Confluent Kafka Cluster
  • It is recommended to choose the same cloud provider that hosts your Snowflake account, for faster data transmission and to avoid extra costs.




3. Connect Kafka to Snowflake

Connect the Kafka cluster to Snowflake by using the Kafka Connect Snowflake connector.

  • Create a topic through which data will be ingested.

  • Configure the Snowflake sink connector.
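As a rough guide to what the sink connector needs, the sketch below assembles a configuration using the property names documented for the Snowflake Connector for Kafka in a self-managed Kafka Connect cluster. Confluent Cloud collects the same information through its own connector setup screens, where field names may differ; the function name and all values in the example are placeholders.

```python
import json


def snowflake_sink_config(name, topics, account_url, user, private_key,
                          database, schema):
    """Build a Kafka Connect configuration for the Snowflake sink connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": "1",
            "topics": ",".join(topics),
            "snowflake.url.name": account_url,
            "snowflake.user.name": user,
            # Private key without PEM delimiters or line breaks.
            "snowflake.private.key": private_key,
            "snowflake.database.name": database,
            "snowflake.schema.name": schema,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
        },
    }


if __name__ == "__main__":
    config = snowflake_sink_config(
        name="snowflake_sink_demo",                      # placeholder
        topics=["Customer", "Location", "Bank", "Card", "Transaction"],
        account_url="<account>.snowflakecomputing.com:443",  # placeholder
        user="bharath",
        private_key="<private key>",                     # placeholder
        database="demo_db",
        schema="kafka_demo",
    )
    print(json.dumps(config, indent=2))
```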



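Code Snippet:

The following Flask application validates incoming JSON payloads against sample records and publishes valid payloads to the matching Kafka topic.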

import json
import socket

from confluent_kafka import Producer
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from json_schema import json_schema
from werkzeug.exceptions import HTTPException

app = Flask(__name__)
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'

@app.errorhandler(404)
def resource_not_found(error):
    """
    Resource not found error handler. Returns JSON instead of an HTML page.
    """
    # Return the 404 status code along with the JSON body.
    return jsonify(message=str(error)), 404


@app.errorhandler(500)
def server_error(error):
    """
    Server error 500 handler. Returns JSON instead of an HTML page.
    """
    # Return the 500 status code along with the JSON body.
    return jsonify(message=str(error)), 500

# Sample records; the expected schema of each topic is derived from them.
customer_sample = '''{
    "CUSTOMER_ID":102,
    "CUSTOMER_NAME":"Tom",
    "DATE_OF_BIRTH":"2000-11-11",
    "GENDER":"male",
    "INCOME":120000,
    "PROFESSION":"Doctor",
    "PHONE_NUMBER":998888888,
    "EMAIL_ID":"Tom@gmail.com",
    "LOCATION_ID":"78",
    "CREATED_DATE":"2019-03-01",
    "LAST_MODIFIED":"2013-09-20"
}'''
customer_schema_str = json_schema.dumps(customer_sample)
customer_schema = json_schema.loads(customer_schema_str)

location_sample = '''{
    "LOCATION_ID": 1,
    "CITY": "Achalpur",
    "STATE": "Maharashtra",
    "PINCODE": 100000
}'''
location_schema_str = json_schema.dumps(location_sample)
location_schema = json_schema.loads(location_schema_str)

Bank_sample = '''{
    "BANK_ID":1000,
    "CUSTOMER_ID":100,
    "BANK_NAME":"Axis",
    "BANK_ACCOUNT_NUMBER":728310006296,
    "BRANCH":"Hospet",
    "IFSC_CODE":"HDFC7863765",
    "CREATED_DATE":"2019-12-20",
    "LAST_MODIFIED":"2013-09-20"
}'''
Bank_schema_str = json_schema.dumps(Bank_sample)
Bank_schema = json_schema.loads(Bank_schema_str)

Card_sample = '''{
    "CARD_ID":2000,
    "CUSTOMER_ID":100,
    "CARD_NUMBER":5612200011065932,
    "CARD_TYPE":"CREDIT",
    "CARD_VENDOR":"MASTERCARD",
    "VALIDITY":"8/29/2023",
    "BANK_NAME":"BoB",
    "CREATED_DATE":"2019-03-29",
    "LAST_MODIFIED":"2020-04-30"
}'''
Card_schema_str = json_schema.dumps(Card_sample)
Card_schema = json_schema.loads(Card_schema_str)

Transaction_sample = '''{
    "TRANSACTION_ID":100,
    "CUSTOMER_ID":1000,
    "TRANSACTION_SOURCE":"Card",
    "SOURCE_NUMBER":2218,
    "VENDOR_NAME":"Sporer-Stroman",
    "CATEGORY":"Investment",
    "TRANSACTION_AMOUNT":152.04,
    "TRANSACTION_TYPE":"Debit",
    "TRANSACTION_TIMESTAMP":"2022-01-12",
    "PROCESSED_TIMESTAMP":"2022-01-14",
    "TAG_ID":45
}'''
Transaction_schema_str = json_schema.dumps(Transaction_sample)
Transaction_schema = json_schema.loads(Transaction_schema_str)

def Kafka_Producer(topic_name, message):
    """
    Kafka producer sends data to the respective topic.
    """
    # Replace the placeholders with the bootstrap server and the
    # API key and secret of your Confluent Cloud cluster.
    conf = {'bootstrap.servers': "<Enter Text Here>",
            'security.protocol': "SASL_SSL",
            'sasl.mechanisms': "PLAIN",
            'sasl.username': "<Enter Text Here>",
            'sasl.password': "<Enter Text Here>",
            'client.id': socket.gethostname()}
    producer = Producer(conf)
    producer.produce(topic_name, key="key", value=message)
    # Wait until all queued messages have been delivered.
    producer.flush()
    print('Payload sent...')
    return 'done'

@app.errorhandler(HTTPException)
def handle_exception(e):
    """Return JSON instead of HTML for HTTP errors."""
    # start with the correct headers and status code from the error
    response = e.get_response()
    # replace the body with JSON
    response.data = json.dumps({
        "code": e.code,
        "name": e.name,
        "description": e.description,
    })
    response.content_type = "application/json"
    return response

@app.route('/customer/', methods=['GET', 'POST'])
@cross_origin()
def customer():
    """
    Customer Topic API for inserting data in customer schema
    """
    request_data = request.get_json(force=True, silent=True)
    json_data = json.dumps(request_data)
    # Publish only payloads that match the customer schema.
    if json_schema.match(json_data, customer_schema_str):
        topic_name = 'Customer'
        Kafka_Producer(topic_name, json_data)
        return jsonify({"message": "Processed Successfully"})
    # Report a schema mismatch with a 400 status code.
    return jsonify({"message": "Bad Request"}), 400

@app.route('/location/', methods=['GET', 'POST'])
@cross_origin()
def location():
    """
    Location Topic API for inserting data in location schema
    """
    request_data = request.get_json(force=True, silent=True)
    json_data = json.dumps(request_data)
    # Publish only payloads that match the location schema.
    if json_schema.match(json_data, location_schema_str):
        topic_name = 'Location'
        Kafka_Producer(topic_name, json_data)
        return jsonify({"message": "Processed Successfully"})
    # Report a schema mismatch with a 400 status code.
    return jsonify({"message": "Bad Request"}), 400

@app.route('/Bank/', methods=['GET', 'POST'])
@cross_origin()
def Bank():
    """
    Bank Topic API for inserting data in bank schema
    """
    request_data = request.get_json(force=True, silent=True)
    json_data = json.dumps(request_data)
    # Publish only payloads that match the bank schema.
    if json_schema.match(json_data, Bank_schema_str):
        topic_name = 'Bank'
        Kafka_Producer(topic_name, json_data)
        return jsonify({"message": "Processed Successfully"})
    # Report a schema mismatch with a 400 status code.
    return jsonify({"message": "Bad Request"}), 400

@app.route('/Card/', methods=['GET', 'POST'])
@cross_origin()
def Card():
    """
    Card Topic API for inserting data in card schema
    """
    request_data = request.get_json(force=True, silent=True)
    json_data = json.dumps(request_data)
    # Publish only payloads that match the card schema.
    if json_schema.match(json_data, Card_schema_str):
        topic_name = 'Card'
        Kafka_Producer(topic_name, json_data)
        return jsonify({"message": "Processed Successfully"})
    # Report a schema mismatch with a 400 status code.
    return jsonify({"message": "Bad Request"}), 400

@app.route('/Transaction/', methods=['GET', 'POST'])
@cross_origin()
def Transaction():
    """
    Transaction Topic API for inserting data in transaction schema
    """
    request_data = request.get_json(force=True, silent=True)
    json_data = json.dumps(request_data)
    # Publish only payloads that match the transaction schema.
    if json_schema.match(json_data, Transaction_schema_str):
        topic_name = 'Transaction'
        Kafka_Producer(topic_name, json_data)
        return jsonify({"message": "Processed Successfully"})
    # Report a schema mismatch with a 400 status code.
    return jsonify({"message": "Bad Request"}), 400

if __name__ == "__main__":
    app.run(debug=True, port=8080)
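To try the API, a client can POST a JSON record to one of the endpoints. The sketch below only builds the request with the standard library so that it can be inspected before sending; the helper name is hypothetical, and the base URL assumes the application above is running locally on port 8080.

```python
import json
import urllib.request


def build_customer_request(payload, base_url="http://localhost:8080"):
    """Build a POST request for the /customer/ endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/customer/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_customer_request({
        "CUSTOMER_ID": 102,
        "CUSTOMER_NAME": "Tom",
        "DATE_OF_BIRTH": "2000-11-11",
        "GENDER": "male",
        "INCOME": 120000,
        "PROFESSION": "Doctor",
        "PHONE_NUMBER": 998888888,
        "EMAIL_ID": "Tom@gmail.com",
        "LOCATION_ID": "78",
        "CREATED_DATE": "2019-03-01",
        "LAST_MODIFIED": "2013-09-20",
    })
    # Sending requires the Flask application to be running:
    # with urllib.request.urlopen(request) as response:
    #     print(response.read().decode("utf-8"))
    print(request.get_method(), request.full_url)
```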

Conclusion:

After completing the above steps, you can set up clients that produce data to your cluster in the programming language of your choice (Python in our case), or configure the providers available in Confluent Cloud for your cluster.


