Snowflake Task Manager Using Streamlit

Author: Sudhendu Pandey


Prelude

One of the persistent obstacles to adopting the Snowflake Task feature is manageability and maintainability.


Creating a task is not hard (we ❤️ Snowflake). The difficult part is usually maintenance, monitoring, and so on.


A few nice-to-have features for Snowflake Tasks would be:

  1. A single view of all the tasks in an account.

  2. A task-specific dashboard showing upcoming tasks, recently failed tasks, etc.

  3. An account-wide centralized dashboard with task statistics.


Note 01: Snowflake is continuously innovating. A rule of thumb I follow: if something could be done better in Snowflake, someone on the Snowflake engineering team is already working on exactly that, whether in ideation, development, QA, private preview, or public preview. They can read your mind.


This means that, in due time, Snowflake will come up with something native to address the limitations mentioned above (in fact, there is already a preview feature for item 1).



This exercise, then, is more about learning Snowflake and Streamlit.


Note 02: This is a project I built over a few weekends, so the code is not very clean! For example, I have used ACCOUNTADMIN, I have not structured the Python code into proper methods, and there is no pagination. I will eventually clean it up! I recommend using this as a tutorial or a fun project rather than a product. No guarantees!


Snowflake Task

Below is an example of creating a task:

CREATE TASK THIS_IS_SO_EASY WAREHOUSE = mywh SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' AS SELECT CURRENT_TIMESTAMP;


That is as simple as it can get. We are going to use the following SQL commands in Snowflake.


-- Command 01: Show all tasks in your Snowflake account.

SHOW TASKS IN ACCOUNT;


-- Command 02: Show the task history.
-- This is similar to the query Snowflake uses to create the TASK_HISTORY view.

SELECT NAME, DATABASE_NAME, SCHEMA_NAME,
       date_trunc('second', CONVERT_TIMEZONE('Asia/Dubai', SCHEDULED_TIME)) as SCHEDULED_TIME,
       STATE,
       date_trunc('second', CONVERT_TIMEZONE('Asia/Dubai', QUERY_START_TIME)) as START_TIME,
       date_trunc('second', CONVERT_TIMEZONE('Asia/Dubai', COMPLETED_TIME)) as END_TIME,
       TIMESTAMPDIFF('millisecond', QUERY_START_TIME, COMPLETED_TIME) as DURATION,
       ERROR_CODE, ERROR_MESSAGE, QUERY_ID, NEXT_SCHEDULED_TIME, SCHEDULED_FROM
FROM TABLE(SNOWFLAKE.INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC;
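The Python version of this query (in load_data_task_hist) builds the time zone into the SQL by string concatenation. Below is a minimal sketch of a safer way to do that. It assumes an IANA time zone name such as 'Asia/Dubai' is available, for example from the tzlocal package the app already imports. The helper names sql_string_literal and build_task_history_sql are hypothetical. The sketch quotes the zone name as a proper SQL string literal and also passes TASK_HISTORY's RESULT_LIMIT argument, which defaults to 100 rows when omitted.

```python
def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_task_history_sql(tz_name: str, result_limit: int = 100) -> str:
    """Build the Command 02 query for a time zone name such as 'Asia/Dubai'.

    result_limit is passed to TASK_HISTORY's RESULT_LIMIT argument (1 to 10000).
    """
    if not 1 <= result_limit <= 10000:
        raise ValueError("result_limit must be between 1 and 10000")
    tz = sql_string_literal(tz_name)
    return (
        "SELECT NAME, DATABASE_NAME, SCHEMA_NAME, "
        f"date_trunc('second', CONVERT_TIMEZONE({tz}, SCHEDULED_TIME)) AS SCHEDULED_TIME, "
        "STATE, "
        f"date_trunc('second', CONVERT_TIMEZONE({tz}, QUERY_START_TIME)) AS START_TIME, "
        f"date_trunc('second', CONVERT_TIMEZONE({tz}, COMPLETED_TIME)) AS END_TIME, "
        "TIMESTAMPDIFF('millisecond', QUERY_START_TIME, COMPLETED_TIME) AS DURATION, "
        "ERROR_CODE, ERROR_MESSAGE, QUERY_ID, NEXT_SCHEDULED_TIME, SCHEDULED_FROM "
        "FROM TABLE(SNOWFLAKE.INFORMATION_SCHEMA.TASK_HISTORY("
        f"RESULT_LIMIT => {int(result_limit)})) "
        "ORDER BY SCHEDULED_TIME DESC"
    )


# Example: the same query for Dubai time, returning up to 1000 rows.
query = build_task_history_sql("Asia/Dubai", result_limit=1000)
```

The string returned by this function could be passed to run_query unchanged.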



Streamlit

I have been a fan of Streamlit since it launched back in 2019! My first few Streamlit applications were just experiments with traffic data.


I did a serious Streamlit project for a local cat 🐈 community, building a data app for them (we named it CatALog). Nobody uses it now 😿, but I still love it 🐱. Here is the link, and below is a peek at how it looks :)



If you are new to Streamlit, it is worth checking out the Streamlit discussion forum, the Streamlit website, and the Streamlit quickstart available from Snowflake.


The Streamlit App flow

At a high level, the code works as follows:

  1. Create a Snowflake connection (how).

  2. Execute Snowflake SQL Statements.

  3. Clean up the response, load it into a DataFrame, and save it as a CSV file (this avoids calling Snowflake every time someone interacts with the application, which saves credits).

  4. Read the CSVs and then use various Streamlit features to display stats, tables, take actions, etc.

  5. Whenever you want the latest data, use the ‘Refresh Data’ button to pull a fresh snapshot from Snowflake.
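The snapshot idea in steps 3 to 5 can be sketched with the standard library alone. This is a minimal illustration, not the app's actual code. The names load_snapshot and fetch_rows are hypothetical: fetch_rows stands in for a Snowflake query, and refresh=True plays the role of the ‘Refresh Data’ button.

```python
import csv
import os
from typing import Callable, Dict, List, Sequence, Tuple


def load_snapshot(path: str, columns: Sequence[str],
                  fetch_rows: Callable[[], List[tuple]],
                  refresh: bool = False) -> Tuple[List[Dict[str, str]], str]:
    """Return (rows, source) where source is 'snowflake' or 'cache'.

    fetch_rows is only called when the CSV snapshot is missing or when
    refresh is True; otherwise the rows are read from the existing file.
    """
    source = "cache"
    if refresh or not os.path.exists(path):
        rows = fetch_rows()  # the expensive call, e.g. a Snowflake query
        with open(path, "w", newline="") as fh:
            writer = csv.writer(fh)
            writer.writerow(columns)
            writer.writerows(rows)
        source = "snowflake"
    # Every interaction reads the local file, so no credits are spent here.
    with open(path, newline="") as fh:
        data = list(csv.DictReader(fh))
    return data, source
```

Note that values read back from a CSV file are strings. This is also why the app's pd.read_csv step re-parses the column types.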


Snowflake Task Manager with Streamlit: Features

Once connected to your Snowflake environment, the Streamlit application can:


❄ View Task Statistics.

❄ See Task List.

❄ See Execution History.

❄ Execute a Task Ad hoc.
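For ad hoc execution, Snowflake provides the EXECUTE TASK command, which starts a single run of a task immediately. The names returned by SHOW TASKS IN ACCOUNT are the stored values (unquoted identifiers are stored in upper case). Double-quoting each part of the fully qualified name therefore keeps mixed-case or special-character names intact. Below is a minimal sketch using hypothetical helper names. The role running the statement still needs the appropriate privileges on the task.

```python
def quote_identifier(name: str) -> str:
    """Double-quote a Snowflake identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_execute_task_sql(database: str, schema: str, task: str) -> str:
    """Return an EXECUTE TASK statement for one row of SHOW TASKS IN ACCOUNT.

    database, schema and task correspond to the DATABASE_NAME, SCHEMA_NAME
    and NAME columns of that output.
    """
    qualified = ".".join(quote_identifier(part) for part in (database, schema, task))
    return f"EXECUTE TASK {qualified}"


# Example: the task from the CREATE TASK example, in a hypothetical DEMO_DB.PUBLIC schema.
print(build_execute_task_sql("DEMO_DB", "PUBLIC", "THIS_IS_SO_EASY"))
# prints: EXECUTE TASK "DEMO_DB"."PUBLIC"."THIS_IS_SO_EASY"
```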


Code Summary

The entire code base is available on git here. Below is just a gist of what we do and how you can use the code.


Imports (the usuals)

# import statement START
import time
from datetime import datetime
import tzlocal
import pandas as pd
import plotly.graph_objects as go
import snowflake.connector
import streamlit as st
from plotly.subplots import make_subplots
from st_aggrid import AgGrid, GridOptionsBuilder
from st_aggrid.shared import GridUpdateMode
import os
# import statement END

Connecting to Snowflake and fetching data

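# Creating Snowflake Connection Object START
# Note: newer Streamlit releases replace st.experimental_singleton and
# st.experimental_memo with st.cache_resource and st.cache_data.
@st.experimental_singleton  # one shared connection object for the app
def init_connection():
    return snowflake.connector.connect(
        **st.secrets["snowflake"], client_session_keep_alive=True
    )

conn = init_connection()

# Query results are cached for 600 seconds. Use a single caching decorator:
# stacking memo on top of singleton caches the result twice, and clearing the
# outer cache would still return the inner cached value.
@st.experimental_memo(ttl=600)
def run_query(query):
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()


# Creating Snowflake Connection Object END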

# Loading Data from Snowflake START
# task_list_file, task_hist_file, current_dt() and current_tz are defined
# elsewhere in the full script.
@st.experimental_memo(ttl=600)
def load_data_task_list():
    ls_all = run_query('SHOW TASKS IN ACCOUNT;')
    task_columns = ['CREATED_ON', 'NAME', 'ID', 'DATABASE_NAME', 'SCHEMA_NAME', 'OWNER', 'COMMENT',
                    'WAREHOUSE', 'SCHEDULE', 'PREDECESSORS', 'STATE', 'DEFINITION', 'CONDITION',
                    'ALLOW_OVERLAPPING_EXECUTION', 'ERROR_INTEGRATION', 'LAST_COMMITTED_ON',
                    'LAST_SUSPENDED_ON']
    # Newer Snowflake releases may append columns to the SHOW TASKS output;
    # keep only the leading columns named above so the DataFrame shape matches.
    df_task = pd.DataFrame([row[:len(task_columns)] for row in ls_all], columns=task_columns)

    df_task['CREATED_ON'] = df_task['CREATED_ON'].dt.strftime('%Y-%m-%d %H:%M:%S')

    # Save a CSV snapshot so later interactions read the file instead of querying Snowflake.
    df_task.to_csv(task_list_file, index=False)
    final_df_task_list = pd.read_csv(task_list_file)
    loadtime_list = current_dt()
    print('Task List Data is loaded from Snowflake at: ', loadtime_list)
    return final_df_task_list, loadtime_list

# The query below uses the TASK_HISTORY() table function, which returns
# task activity within the last 7 days or the next scheduled execution
# within the next 8 days. For a longer history, use the
# SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY view (up to 365 days, with some latency).

@st.experimental_memo
def load_data_task_hist():
    # current_tz must already contain a quoted SQL string literal (e.g. "'Asia/Dubai'")
    # because it is concatenated directly into CONVERT_TIMEZONE(...).
    ls_all = run_query(
        "SELECT NAME, DATABASE_NAME, SCHEMA_NAME, date_trunc('second', CONVERT_TIMEZONE(" + current_tz +
        ", SCHEDULED_TIME)) as SCHEDULED_TIME, STATE, date_trunc('second', CONVERT_TIMEZONE(" + current_tz +
        ", QUERY_START_TIME)) as START_TIME, date_trunc('second', CONVERT_TIMEZONE(" + current_tz +
        ", COMPLETED_TIME)) as END_TIME, TIMESTAMPDIFF('millisecond', "
        "QUERY_START_TIME, COMPLETED_TIME) as DURATION, ERROR_CODE, ERROR_MESSAGE, QUERY_ID, "
        "NEXT_SCHEDULED_TIME, SCHEDULED_FROM FROM TABLE(SNOWFLAKE.INFORMATION_SCHEMA.TASK_HISTORY()) "
        "ORDER BY SCHEDULED_TIME DESC;")

    df_hist = pd.DataFrame(ls_all, columns=['NAME', 'DATABASE_NAME', 'SCHEMA_NAME', 'SCHEDULED_TIME',
                                            'STATE', 'START_TIME', 'END_TIME',
                                            'DURATION', 'ERROR_CODE', 'ERROR_MESSAGE', 'QUERY_ID',
                                            'NEXT_SCHEDULED_TIME', 'SCHEDULED_FROM'])

    df_hist.to_csv(task_hist_file, index=False)
    final_df_task_hist = pd.read_csv(task_hist_file)
    loadtime_hist = current_dt()
    print('Task History Data is loaded from Snowflake at: ', loadtime_hist)
    return final_df_task_hist, loadtime_hist


# Loading Data from Snowflake END
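Hard-coding the SHOW TASKS column list ties load_data_task_list to one version of the SHOW TASKS output. One alternative is to read the column names from the DB-API cursor.description attribute. The sketch below assumes you can access the cursor inside run_query. SHOW commands report column names in lower case, so the sketch upper-cases them to match the names the app uses. rows_with_columns and _FakeCursor are hypothetical names; the fake cursor exists only so that the sketch runs without a Snowflake account.

```python
from typing import Any, List, Sequence, Tuple


def rows_with_columns(cursor: Any, query: str) -> Tuple[List[str], List[Sequence[Any]]]:
    """Run a query and return (column_names, rows) from a DB-API cursor.

    cursor.description holds one entry per result column; the first item of
    each entry is the column name. SHOW commands return lower-case names, so
    they are upper-cased to match the names used elsewhere in the app.
    """
    cursor.execute(query)
    columns = [str(desc[0]).upper() for desc in cursor.description]
    return columns, cursor.fetchall()


class _FakeCursor:
    """Stand-in for a Snowflake cursor so the sketch runs without an account."""

    description = [("created_on",), ("name",), ("state",)]

    def execute(self, query: str) -> None:
        self.last_query = query

    def fetchall(self) -> List[tuple]:
        return [("2022-10-01 09:00:00", "THIS_IS_SO_EASY", "started")]


columns, rows = rows_with_columns(_FakeCursor(), "SHOW TASKS IN ACCOUNT")
```

With a real connection, pd.DataFrame(rows, columns=columns) would then build the task list without a fixed column list.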

Defining the tabs

Each tab is a container in the Streamlit UI. Anything rendered inside a with tab1: block appears in that tab.

# TAB definition START
tab1, tab2, tab3, tab4 = st.tabs(["📈 TASK SUMMARY", "📜 TASK LIST", "🗃 TASK HISTORY", "🏃 EXECUTE TASK"])
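As an example of what the TASK SUMMARY tab could compute from the cached history CSV, here is a stdlib-only sketch that counts runs per STATE and lists the most recent failures. summarize_history is a hypothetical name, the column names follow the history query above, and the sample rows are made up for illustration. Because the query orders by SCHEDULED_TIME DESC, the first failures in the file are the most recent ones.

```python
import csv
import io
from collections import Counter
from typing import Dict, List, TextIO, Tuple


def summarize_history(csv_file: TextIO, max_failures: int = 5) -> Tuple[Dict[str, int], List[str]]:
    """Count runs per STATE and collect up to max_failures failed task names.

    Assumes the CSV is ordered newest first (ORDER BY SCHEDULED_TIME DESC),
    so the failures collected first are the most recent ones.
    """
    counts: Counter = Counter()
    failures: List[str] = []
    for row in csv.DictReader(csv_file):
        counts[row["STATE"]] += 1
        if row["STATE"] == "FAILED" and len(failures) < max_failures:
            failures.append(row["NAME"])
    return dict(counts), failures


# Made-up sample rows using a few of the history snapshot's columns.
sample = io.StringIO(
    "NAME,STATE,ERROR_MESSAGE\n"
    "LOAD_ORDERS,SUCCEEDED,\n"
    "LOAD_ORDERS,FAILED,example error\n"
    "CLEANUP,SCHEDULED,\n"
)
counts, failures = summarize_history(sample)
```

Inside a with tab1: block, these values could feed Streamlit widgets such as st.metric.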

The Final Product


Hold it! Wait for it! Here we go!



Running this in your Snowflake Environment?

Please note that this code is not production-ready.


You can easily run this against your own Snowflake environment. Below are the steps:

  1. Set up Python and install the modules used in the imports (streamlit, snowflake-connector-python, pandas, plotly, streamlit-aggrid, tzlocal).

  2. In the .streamlit directory, edit the connection details in secrets.toml. Here is how. Note: never commit your secrets.toml file to a public Git repository, to avoid exposing your credentials.

  3. Run the command: streamlit run .\SnowTaskManager.py

  4. If all is well, the Streamlit app should start at http://localhost:8501/
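Before step 3, it can help to sanity-check the [snowflake] section of secrets.toml. The app passes st.secrets["snowflake"] straight into snowflake.connector.connect as keyword arguments, so the section needs at least the keys that connect requires for your authentication method. The sketch below assumes plain user/password authentication (account, user, password). The helper name missing_connection_keys and the key list are assumptions, and the TOML layout in the comment uses placeholder values.

```python
from typing import Iterable, List, Mapping

# Assumed .streamlit/secrets.toml layout (placeholder values):
# [snowflake]
# account = "<account_identifier>"
# user = "<user>"
# password = "<password>"
# warehouse = "<warehouse>"   # optional
# role = "<role>"             # optional
REQUIRED_KEYS = ("account", "user", "password")  # assumption: password authentication


def missing_connection_keys(cfg: Mapping[str, object],
                            required: Iterable[str] = REQUIRED_KEYS) -> List[str]:
    """Return the required keys that are missing or empty in the config mapping."""
    return [key for key in required if not cfg.get(key)]


# In the app, this check could run on st.secrets["snowflake"] before connecting.
print(missing_connection_keys({"account": "xy12345", "user": "me"}))
# prints: ['password']
```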


The Next Steps

As mentioned, this might not be the cleanest code, so first things first: I will clean up the repo and the code flow a bit. I plan to add folders and move items around. Pagination is also on the list.
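Pagination does not need much code. Below is a minimal sketch that assumes the rows are already in memory (for example, loaded from the CSV snapshot). The hypothetical paginate helper returns one 1-based page plus the total page count, and it clamps out-of-range page numbers. In the app, the page number could come from a widget such as st.number_input, and the same start/end arithmetic applies to DataFrame.iloc.

```python
import math
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def paginate(rows: Sequence[T], page: int, page_size: int = 25) -> Tuple[List[T], int]:
    """Return the rows for a 1-based page number and the total page count."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    total_pages = max(1, math.ceil(len(rows) / page_size))
    page = min(max(page, 1), total_pages)  # clamp out-of-range page numbers
    start = (page - 1) * page_size
    return list(rows[start:start + page_size]), total_pages


# Example: 10 rows, 4 per page -> page 2 holds rows 4..7 and there are 3 pages.
page_rows, total = paginate(list(range(10)), page=2, page_size=4)
```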


A few new features I would like to add: an option to create a task directly from Streamlit, a tab/section dedicated to failed tasks, and the ability to re-trigger them. I wish there were an easy way to visualize a DAG in Python or Streamlit. If there were, we could create DAGs as well!


I would like to thank my colleagues at kipi.bi for helping me! Sumit and Abhijeet!
