Airflow Integration With Snowflake

Author: Tushar Jadhav


What is Airflow?

Airflow is an open-source workflow automation and scheduling platform that programmatically authors, schedules, and monitors workflows—widely used for orchestrating complex computational workflows, data processing pipelines, and ETL processes. You can easily visualize your data pipeline’s dependencies, progress, logs, code, trigger tasks, and success status.

Why should we use Airflow?

The tool is extendable and has a large community, so it can be easily customized to meet our company’s individual needs.

Airflow offers a compelling and well-equipped UI, which makes tracking jobs, re-running them, and configuring the platform very easy.


Apache Airflow has many features; six important ones are listed below.

1. DAGs: DAG stands for Directed Acyclic Graph, and it represents a workflow in Airflow. Each node in a DAG represents a task that needs to be run. DAGs allow us to model the natural dependencies in data processing and force us to architect our workflow with a sense of completion.

2. User-friendly monitoring interface: Airflow provides a monitoring and managing interface where we can get a quick overview of the status of the different tasks and can trigger and clear tasks or DAG runs.

3. Easy to interact with logs: Airflow provides easy access to the logs of each of the different tasks run through its web-UI, making it easy to debug tasks in production.

4. Dynamic Pipeline Generation: Airflow pipelines are configuration-as-code (Python), allowing for dynamic pipeline generation. This allows for writing code that creates pipeline instances dynamically. The data processing we do is not linear and static.

5. Notification system: Airflow provides a default alerting system through email, but you can also set it up through the Slack operator to get alert notifications on Slack.

6. Automate our Queries or Python Code: Airflow has a lot of operators set up to run code. Airflow has an operator for most databases and is set up in Python. It has a PythonOperator that allows for quickly porting Python code to production.
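To make the DAG idea from feature 1 more concrete, here is a minimal sketch in plain Python. It does not use Airflow itself, and it is not how Airflow schedules tasks internally; the task names and the execution_order helper are hypothetical and chosen only for illustration. The function orders tasks so that each one comes after the tasks it depends on, and it rejects graphs that contain a cycle.

```python
from collections import deque


def execution_order(dependencies):
    """Return tasks in an order that respects their upstream dependencies.

    `dependencies` maps each task name to the set of tasks it depends on.
    Raises ValueError if the graph has a cycle (i.e. it is not a DAG).
    """
    # Collect every task, including ones that only appear as a dependency.
    tasks = set(dependencies)
    for upstream in dependencies.values():
        tasks.update(upstream)

    remaining = {task: set(dependencies.get(task, ())) for task in tasks}
    downstream = {task: set() for task in tasks}
    for task, upstream in remaining.items():
        for parent in upstream:
            downstream[parent].add(task)

    # Start with tasks that have no upstream dependencies (sorted for determinism).
    ready = deque(sorted(t for t, deps in remaining.items() if not deps))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child].discard(task)
            if not remaining[child]:
                ready.append(child)

    # If some tasks were never released, they are stuck in a cycle.
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not a valid DAG")
    return order


# Hypothetical pipeline: extract -> transform -> load -> report.
pipeline = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}
print(execution_order(pipeline))
```

The "acyclic" part matters because a cycle would leave some tasks waiting on each other forever; the sketch surfaces that as an error instead of an endless wait.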

Snowflake Data Warehouse

Snowflake Data Warehouse is a fully managed cloud data warehouse available to customers in the form of Software-as-a-Service (SaaS) or Database-as-a-Service (DaaS). The phrase ‘fully managed’ means users do not need to be concerned about back-end work like server installation, maintenance, etc. Its unique architecture is a hybrid of traditional shared-disk database architectures and shared-nothing database architectures, which allows complete relational database support on both structured and semi-structured data (CSV, JSON, ORC, Avro, Parquet, XML). It now also supports unstructured data such as images and videos.

Prerequisites for the integration

There are three basic things we need before getting started, which are listed below:

  • We should have a working knowledge of Python and have a recent Python 3 version installed (this article uses Python 3.8.10).

  • We should have a Snowflake account with access to perform read and write operations.

  • We need to have access to the latest Apache-Airflow version with dependencies installed.
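As a rough way to check the first and third prerequisites, the sketch below reports whether the running Python version is recent enough and whether an airflow package can be found. The check_prerequisites helper and the (3, 8) minimum are assumptions made for illustration; the Snowflake account access still has to be verified separately.

```python
import importlib.util
import sys


def check_prerequisites(min_python=(3, 8)):
    """Report whether the local environment looks ready for the walkthrough."""
    return {
        # The article was written against Python 3.8.x.
        "python_ok": sys.version_info >= min_python,
        # find_spec returns None when a top-level package is not installed.
        "airflow_installed": importlib.util.find_spec("airflow") is not None,
    }


if __name__ == "__main__":
    for name, ok in check_prerequisites().items():
        print(f"{name}: {'yes' if ok else 'no'}")
```

Run it inside the virtual environment that will host Airflow, since a package installed there is not visible to the system Python.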

Steps for installing Apache-Airflow with Dependencies:

1. Download Ubuntu from the Microsoft Store and install it.

2. After installation, open the Ubuntu terminal and set the username and password. This is only needed once, when we first start Ubuntu.

Note: We now need to open two separate Ubuntu terminals: one to run the web server and another to run the scheduler. The steps below are for the first Ubuntu terminal, where we will run the Apache-Airflow webserver.

3. After setting up Ubuntu, enter the sudo environment with the command sudo su.

4. Update the package lists with the command apt-get update.

5. Upgrade the installed packages with the command apt-get upgrade.

6. Install the Python virtual environment tool with the command pip install virtualenv.

7. Create a virtual environment with the command virtualenv -p python3 airflow_venv.

8. Activate the virtual environment with the command source airflow_venv/bin/activate.

9. Install Apache Airflow with the command pip install apache-airflow.

10. Set the environment variable with the command export AIRFLOW_HOME=.

11. Initialize the Airflow database with the command airflow db init.

12. Run the Airflow webserver with the command airflow webserver -p 8080. We will now be at the login page.

13. Create a login ID and password in Apache-Airflow with the command below.

airflow users create -e EMAIL -f FIRSTNAME -l LASTNAME [-p PASSWORD] -r ROLE [--use-random-password] -u USERNAME

After a successful login, we will see the screen shown in the image below on the UI.


As seen in the above image, the Airflow web server is active, but we also need to activate the scheduler. Below are the steps to activate the Airflow scheduler:

  • Open another Ubuntu terminal and enter the sudo environment with the command sudo su.

  • Activate the virtual environment with the command source airflow_venv/bin/activate.

  • Initialize the Airflow database with the command airflow db init.

  • Now run the scheduler by command airflow scheduler.

  • Open in the browser localhost:8080.

If the above steps were executed successfully, with both the webserver and the scheduler running, you will see the screen below.
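Besides opening the UI, one way to confirm that both processes are up is the webserver's /health endpoint, which in Airflow 2 reports the status of the metadata database and the scheduler as JSON. The sketch below is only an illustration: the summarize_health and fetch_health helpers are hypothetical names, the base URL assumes the webserver from the steps above on port 8080, and the sample payload uses a made-up timestamp.

```python
import json
from urllib.request import urlopen


def summarize_health(payload):
    """Map each component in a /health response to its reported status."""
    return {
        component: details.get("status", "unknown")
        for component, details in payload.items()
        if isinstance(details, dict)
    }


def fetch_health(base_url="http://localhost:8080"):
    """Fetch and summarize the webserver's health endpoint."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return summarize_health(json.load(response))


# Sample payload in the shape Airflow 2 returns; the timestamp is made up.
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2021-01-01T00:00:00+00:00",
    },
}
print(summarize_health(sample))
```

Calling fetch_health() against a running webserver should return a similar dictionary; a scheduler status other than "healthy" usually means the second terminal is not running airflow scheduler.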


Integration of Apache-Airflow with Snowflake:

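Configure Apache-Airflow with a Snowflake connection. Open localhost:8080 in the browser and go to Admin -> Connections. Click on the + symbol and add a new record. Choose the connection type as Snowflake and fill in the other details as shown in the screenshot. If Snowflake does not appear in the list of connection types, the Snowflake provider package (apache-airflow-providers-snowflake) may need to be installed in the virtual environment first.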
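As an alternative to the UI form, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the connection ID in upper case, with the value given as a URI. The sketch below builds such a URI for the snowflake_conn ID used later in this article. It rests on several assumptions: the snowflake_conn_uri helper is hypothetical, every credential is a placeholder, TUSHAR is assumed to be the schema, and the extra field names (account, warehouse, database, role) can differ between versions of the Snowflake provider, so check them against the provider documentation for your installation.

```python
from urllib.parse import quote, urlencode


def snowflake_conn_uri(login, password, schema, **extras):
    """Build an Airflow-style connection URI for a Snowflake connection."""
    # Percent-encode credentials so characters such as '@' or '/' stay valid.
    credentials = f"{quote(login, safe='')}:{quote(password, safe='')}"
    query = urlencode(extras)
    return f"snowflake://{credentials}@/{quote(schema, safe='')}?{query}"


# All values below are placeholders, not real credentials.
uri = snowflake_conn_uri(
    "my_user",
    "p@ss/word",
    "TUSHAR",
    account="my_account",
    warehouse="my_warehouse",
    database="my_database",
    role="my_role",
)
# The connection ID "snowflake_conn" becomes AIRFLOW_CONN_SNOWFLAKE_CONN.
print(f"export AIRFLOW_CONN_SNOWFLAKE_CONN='{uri}'")
```

Connections defined this way do not show up under Admin -> Connections, so the UI route described above is easier to inspect while learning.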


Create a DAG file

Go to the folder you’ve designated as your AIRFLOW_HOME and find the dags subfolder (the Python file needs to be placed inside this dags folder). Create a Python file with the name snowflake_airflow.py that will contain your DAG. Your workflow will automatically be picked up and scheduled to run.

Note: If we cannot find the file directory, open the View menu and enable the option to show hidden files.
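If the folder is hard to locate, the sketch below prints where the DAG file would go under Airflow's defaults: AIRFLOW_HOME falls back to ~/airflow when it is not set, and the DAGs folder defaults to a dags directory inside it. The dags_folder helper is a hypothetical name, and the result will be wrong if dags_folder has been overridden in airflow.cfg.

```python
import os
from pathlib import Path


def dags_folder(env=None):
    """Return the default DAGs folder implied by AIRFLOW_HOME."""
    env = os.environ if env is None else env
    # Airflow falls back to ~/airflow when AIRFLOW_HOME is not set.
    home = Path(env.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    return home / "dags"


target = dags_folder() / "snowflake_airflow.py"
print(f"Place the DAG file at: {target}")
```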

Below is the complete example of the DAG for the Airflow Snowflake Integration:

import logging

from airflow import DAG
from airflow.utils.dates import days_ago

# These import paths follow the original article. On Airflow 2 with the
# apache-airflow-providers-snowflake package installed, the corresponding
# modules live under airflow.operators.python and airflow.providers.snowflake.
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

args = {"owner": "Airflow", "start_date": days_ago(2)}

# schedule_interval=None means the DAG only runs when triggered manually.
dag = DAG(
    dag_id="snowflake_automation", default_args=args, schedule_interval=None
)

# SQL statements executed in order by the SnowflakeOperator.
snowflake_query = [
    """create table TUSHAR.Employee (id number, first_name string, last_name string, company string, email string, cellphone string, streetaddress string, city string, postalcode number);""",
    """insert into TUSHAR.Employee values(1, 'Rakesh', 'Singh', 'TCS', 'rakesh123', '12345', 'NH4', 'Pune', 24521 ),(2, 'Jack', 'Sparrow', 'Apisero', 'jack123', '5678', 'NH8', 'Mumbai', 998877 );""",
]


def get_row_count(**context):
    """Log the number of rows in TUSHAR.Employee."""
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from TUSHAR.Employee")
    logger.info("Number of rows in `TUSHAR.Employee` - %s", result[0])


with dag:
    create_insert = SnowflakeOperator(
        task_id="snowfalke_create",
        sql=snowflake_query,
        snowflake_conn_id="snowflake_conn",
    )
    get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)

    # Run the row count only after the create/insert statements succeed.
    create_insert >> get_count

Expected Output:

The above DAG contains the scheduling information and the sequence in which the tasks will execute. DAG execution can be tracked in the webserver UI and will look something like the image below.


Now you can verify the queries fired by Airflow under the History section in the Snowflake UI. The same queries can be found in the attached DAG file.


Issues found during Integration:

1. While restarting Apache-Airflow, we may face an error saying that the port is already in use. To overcome this, kill the process that is holding the port by its PID. The command is kill [PID number], kill -KILL [PID number], or kill -9 [PID number].

2. Always define DAG file names carefully. If two DAG files have the same name, the two files will overlap.

3. The Airflow Snowflake objects are built for AWS and are not compatible with GCP, so you can’t make a connection successfully if you have a Snowflake GCP instance. To work around this drawback, you need to find GCP versions or create GCP-compatible versions.
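For the first issue, it can help to check whether the port is still occupied before restarting the webserver. The following is a minimal sketch using only the Python standard library; the is_port_in_use helper is a hypothetical name, and 8080 is the port used earlier in this article. It only detects that something is listening; finding and killing the PID is still done with the commands above.

```python
import socket


def is_port_in_use(port, host="127.0.0.1"):
    """Return True if something is already accepting connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    if is_port_in_use(8080):
        print("Port 8080 is busy; stop the old webserver before restarting.")
    else:
        print("Port 8080 is free.")
```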

Conclusion:

In this article, we have learned about Airflow, Snowflake, and how to use the two together for efficient ETL and data pipelines. Not only can we check the heartbeat of our pipelines, but we can also view graphical representations in the UI of the same code we write. Airflow comes with numerous powerful integrations that serve almost any need when it comes to output data.
