Author: Mahesh Gupta Bandaru
If you are looking for a simple, inexpensive data pipeline that pulls small amounts of data from a stable API and stores it in cloud storage, a serverless function is a good choice.
We will use AWS Lambda for this simple project. AWS Lambda lets us run code without provisioning or maintaining a server.
To do this, we write a Python script and deploy it to AWS Lambda. We then configure the function to run on an external trigger, and it loads the data into an S3 bucket.
AWS Lambda Function:
Since AWS has become the most popular choice for cloud services, there has been a steady increase in the number of organizations deploying their entire stack on AWS. As a result, the features AWS offers have been explored widely and have opened up many opportunities for end users to further strengthen their cloud deployments.
AWS Lambda is a serverless, event-driven compute service that lets you run code for virtually any type of application or backend service without provisioning or managing servers.
Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Customers of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes, cloud-native applications, and mobile applications. With cost-effective storage classes and easy-to-use management features, you can optimize costs, organize data, and configure fine-tuned access controls to meet specific business, organizational, and compliance requirements.
Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale, using events generated from your own applications and integrated Software-as-a-Service (SaaS) applications.
Snowpipe enables loading data from files as soon as they’re available in a stage.
Snowflake delivers data storage, processing, and analytics solutions that are faster, easier to use, and far more flexible than traditional offerings. The Snowflake data platform is not built on any existing database technology or "big data" software platform such as Hadoop.
For dynamic data, we first create a Lambda function that fetches the data from the API using Python code. We also add an EventBridge rule to trigger the Lambda function every 24 hours.
We have added CloudWatch to monitor the process; if any error occurs while loading the data into Snowflake, it is written to the error logs, and we receive an error notification through SNS.
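The notification step can be sketched as a small helper. The topic name and message format below are assumptions (not from the original setup), and the SNS client is passed in as a parameter so the logic can be shown without live AWS credentials:

```python
# Hypothetical helper for the SNS error notification described above.
# topic_arn would point at an SNS topic created for pipeline errors.
def notify_error(sns_client, topic_arn, error):
    """Publish a short error message to the given SNS topic."""
    message = f'COVID data load failed: {error}'
    sns_client.publish(
        TopicArn=topic_arn,
        Subject='Data pipeline error',
        Message=message,
    )
    return message
```

In the real pipeline, `sns_client` would be `boto3.client('sns')` and the call would run inside the Lambda's error-handling path.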
Then, with the help of Snowpipe, we load the data from the S3 bucket into the Snowflake database tables.
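A Snowpipe is defined in SQL. A minimal sketch of such a definition, assuming hypothetical object names (`COVID_DB`, `covid_stage`, `covid_data`, `covid_pipe` are illustrative, not taken from the original setup):

```python
# SQL sketch for a Snowpipe that copies new S3 files into Snowflake.
# All object names here are hypothetical; the statement would be run in a
# Snowflake worksheet or through a Snowflake connector.
create_pipe_sql = """
CREATE OR REPLACE PIPE COVID_DB.PUBLIC.covid_pipe
  AUTO_INGEST = TRUE
AS
  COPY INTO COVID_DB.PUBLIC.covid_data
  FROM @COVID_DB.PUBLIC.covid_stage
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);
"""
print(create_pipe_sql)
```

`AUTO_INGEST = TRUE` is what makes the load automatic: Snowpipe listens for S3 event notifications and ingests each new file as it lands in the stage.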
Getting Covid API:
API endpoint: GET https://api.covid19tracker.in/data/static/timeseries.min.json returns daily-updated COVID data.
The GET endpoint returns a nested JSON document.
Because the response is nested JSON, we have to write a script to convert it from nested JSON to CSV.
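A minimal sketch of that conversion, assuming the API nests per-state data under date keys (the sample structure and field names below are illustrative, not copied from the actual response):

```python
import csv
import io

# Illustrative nested structure resembling the API response:
# state -> "dates" -> date -> metrics.
nested = {
    'AP': {'dates': {'2021-06-01': {'confirmed': 100, 'deceased': 2}}},
    'KA': {'dates': {'2021-06-01': {'confirmed': 250, 'deceased': 5}}},
}

def nested_json_to_csv(data):
    """Flatten the nested JSON into CSV text with one row per state/date."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['state', 'date', 'confirmed', 'deceased'])
    for state, state_data in data.items():
        for date, metrics in state_data.get('dates', {}).items():
            writer.writerow([state, date,
                             metrics.get('confirmed', 0),
                             metrics.get('deceased', 0)])
    return buf.getvalue()

print(nested_json_to_csv(nested))
```

The same walk-the-dict-and-emit-rows pattern is what the Lambda function below applies to the live API response.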
1. Go to IAM and create a role.
2. Attach the required policies to the role we created.
3. Create an S3 bucket.
4. Create a Lambda function.
5. Attach the role we created to the Lambda function.
6. Python code for the Lambda function that pulls data from the API into the S3 bucket:
import csv
import io
import json
import os
import urllib.request
from datetime import datetime

import boto3

s3 = boto3.client('s3')
API_URL = 'https://api.covid19tracker.in/data/static/timeseries.min.json'

def lambda_handler(event, context):
    # Fetch the nested JSON time series (urllib replaces the deprecated
    # botocore.vendored.requests module).
    with urllib.request.urlopen(API_URL) as resp:
        response = json.loads(resp.read())

    # Flatten the nested JSON: one row per state for the most recent date
    # (the 'dates'/'total' keys follow the API's state -> dates -> totals layout).
    rows = []
    for state in response:
        dates_data = response[state].get('dates', {})
        if not dates_data:
            continue
        last_date = list(dates_data.keys())[-1]
        totals = dates_data[last_date].get('total', {})
        rows.append([state, last_date,
                     totals.get('confirmed', 0),
                     totals.get('recovered', 0),
                     totals.get('deceased', 0)])

    # Write the rows to an in-memory CSV buffer.
    csvio = io.StringIO()
    writer = csv.writer(csvio)
    writer.writerow(['state', 'date', 'confirmed', 'recovered', 'deceased'])
    writer.writerows(rows)

    # Upload the CSV to S3 with a timestamped file name.
    date_time = datetime.now().strftime("%m%d%Y%H%M%S")
    fileName = 'test3' + date_time + '.csv'
    bucket = os.environ['BUCKET_NAME']  # set the bucket name in the Lambda env
    s3.put_object(Bucket=bucket, ContentType='text/csv',
                  Key=fileName, Body=csvio.getvalue())
7. Deploy the code to the Lambda function and then test it.
8. Create an Amazon EventBridge (CloudWatch Events) rule to trigger the Lambda function every 24 hours.
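With boto3, this schedule can be sketched as follows. The rule and target names are hypothetical, and the events client is passed in so the parameters can be inspected without AWS credentials:

```python
# Parameters for an EventBridge rule that fires every 24 hours.
rule_params = {
    'Name': 'covid-daily-trigger',          # hypothetical rule name
    'ScheduleExpression': 'rate(24 hours)',
    'State': 'ENABLED',
}

def create_daily_trigger(events_client, lambda_arn):
    """Create the schedule rule and attach the Lambda function as its target."""
    events_client.put_rule(**rule_params)
    events_client.put_targets(
        Rule=rule_params['Name'],
        Targets=[{'Id': 'covid-lambda-target', 'Arn': lambda_arn}],
    )
```

In practice `events_client` would be `boto3.client('events')`. Note that when the rule is created through the API rather than the console, the Lambda function also needs a resource-based permission (`lambda add-permission`) allowing EventBridge to invoke it; the console adds this automatically.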
9. Check the S3 bucket to confirm that the API data is present.
10. Using Snowpipe, we load the latest data into a Snowflake table.
For that, we have created the following policies and roles:
Created the snowpipe dynamic policy.
11. Add the policy to the user.
Created the snowpipedynamic role.
12. By running DESC INTEGRATION, we get the AWS IAM user ARN and the external ID; update these in the role's trust policy and click Update.
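The trust-policy update in this step can be sketched in Python. The ARN and external ID below are placeholders for the values DESC INTEGRATION actually returns (as STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID):

```python
import json

# Placeholder values standing in for the output of DESC INTEGRATION:
storage_aws_iam_user_arn = 'arn:aws:iam::123456789012:user/example'  # placeholder
storage_aws_external_id = 'ABC12345_SFCRole=EXAMPLE'                 # placeholder

# Trust policy allowing the Snowflake IAM user to assume the role,
# scoped by the external ID.
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'AWS': storage_aws_iam_user_arn},
        'Action': 'sts:AssumeRole',
        'Condition': {
            'StringEquals': {'sts:ExternalId': storage_aws_external_id}
        },
    }],
}
print(json.dumps(trust_policy, indent=2))
```

This JSON is what gets pasted into the role's trust relationship in the IAM console; the external-ID condition is what prevents any other Snowflake account from assuming the role.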
13. By running a SELECT statement, we can view the COVID data.
In short, the Lambda function pulls the required data from the API and makes it available in cloud storage, so other consumers can use the data and assess its quality.