Author: Rupesh Neve
Introduction:
Airflow is a popular open-source platform for orchestrating complex data workflows. One of its key features is the ability to send email alerts to notify users of workflow successes or failures. With Airflow, it's also possible to attach files to these email alerts, making it easy to include relevant information for debugging or analysis purposes.
Sending email alerts with attachments in Airflow can be a powerful way to keep your team informed and on top of any issues that arise in your data pipelines. Whether you're monitoring for errors, tracking performance, or simply keeping stakeholders up-to-date, this feature can help streamline your workflow and boost productivity. In this article, we'll explore how to send email alerts with attachments in Airflow.
Implementation steps:
Here we will use a Microsoft Outlook account and the Office 365 SMTP server to send emails to the target recipients.
Step 1:
Get the app password for your Microsoft Outlook email account. The steps are:
Login to your account and go to My Microsoft Account.
Choose Update under the Security section.
Choose Advanced security options.
Choose Create App Password. (Note: Two-step verification must be enabled for this option to be available.)
Step 2:
Once we have the app password, we can configure the airflow.cfg file with the parameters listed below.
Go to the [smtp] section and set the following parameters. Note that airflow.cfg uses lowercase keys in key = value form.
smtp_host = smtp.office365.com
smtp_port = 587
smtp_user = <YourEmail>@outlook.com
smtp_password = <This is your app password>
smtp_starttls = True
smtp_ssl = False
smtp_mail_from = <YourEmail>@outlook.com
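Before restarting Airflow, it can help to sanity-check the SMTP settings. The sketch below is only an illustration: check_smtp_section and REQUIRED_KEYS are hypothetical names, not part of Airflow, and the rules it applies (for example, that STARTTLS and SSL should not both be on) are assumptions based on the values used in this article.

```python
import configparser

# Keys this article sets in the [smtp] section (assumed to be the minimum needed here).
REQUIRED_KEYS = ("smtp_host", "smtp_port", "smtp_user", "smtp_password", "smtp_mail_from")


def check_smtp_section(cfg_text):
    """Return a list of problems found in the [smtp] section of cfg_text."""
    # interpolation=None so a '%' inside a password is not treated specially.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return ["missing [smtp] section"]

    smtp = parser["smtp"]
    problems = [
        f"missing or empty key: {key}"
        for key in REQUIRED_KEYS
        if not smtp.get(key, "").strip()
    ]

    starttls = smtp.getboolean("smtp_starttls", fallback=False)
    use_ssl = smtp.getboolean("smtp_ssl", fallback=False)
    if starttls and use_ssl:
        problems.append("smtp_starttls and smtp_ssl should not both be True")
    return problems


# Placeholder values only; replace them with your own settings.
SAMPLE = """
[smtp]
smtp_host = smtp.office365.com
smtp_port = 587
smtp_user = user@outlook.com
smtp_password = app-password-placeholder
smtp_starttls = True
smtp_ssl = False
smtp_mail_from = user@outlook.com
"""

if __name__ == "__main__":
    print(check_smtp_section(SAMPLE))
```

To check a real file, read its contents and pass the text to check_smtp_section; an empty list means none of these particular checks failed.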
Step 3:
Once you have made the changes to your airflow.cfg file, save it and restart your Airflow instance.
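If emails do not arrive after the restart, one way to narrow down the cause is to test the credentials outside Airflow. The following is a minimal sketch using Python's standard smtplib; check_smtp_login is a hypothetical helper, and it assumes the server accepts STARTTLS on the configured port, as in the settings above.

```python
import smtplib


def check_smtp_login(host, port, user, password, timeout=10):
    """Open a STARTTLS session and log in.

    Raises an smtplib.SMTPException (or OSError on network problems)
    if any step fails; returns True otherwise.
    """
    with smtplib.SMTP(host, port, timeout=timeout) as server:
        server.ehlo()
        server.starttls()  # upgrade the connection to TLS before sending credentials
        server.ehlo()
        server.login(user, password)
    return True
```

Calling check_smtp_login with the host, port, user, and app password from your configuration should return True when the account is set up correctly. An authentication error at this stage points to the app password or account settings rather than to Airflow.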
Step 4:
Now that everything is in place, let's move to the DAG and write a function that sends an email on failure with a log file as an attachment.
Code -
import traceback

from airflow.operators.email import EmailOperator  # Airflow 2.x import path


def send_email_on_failure(context):
    # Path of the log file to attach
    log_path = <Your log path>

    email_subject = f"Airflow alert: Task {context.get('task_instance').task_id} failed"
    email_exception = context.get('exception')
    email_traceback_str = str(traceback.format_exc())

    html_content = f"""
    <h2>Airflow alert</h2>
    <h3>Task in the below table has failed.</h3>
    <table border="1" cellpadding="5">
    <tr><th>Task</th><td>{context.get('task_instance').task_id}</td></tr>
    <tr><th>DAG</th><td>{context.get('dag').dag_id}</td></tr>
    <tr><th>Execution Time</th><td>{context.get('execution_date')}</td></tr>
    <tr><th>Exception</th><td>{email_exception}</td></tr>
    <tr><th>Traceback</th><td><pre>{email_traceback_str}</pre></td></tr>
    </table>
    """

    # Build the email task and run it immediately inside the callback
    email_failure_task = EmailOperator(
        task_id='email_failure',
        to=<targeted User emails>,
        subject=email_subject,  # was an undefined name (Subject) in the original
        html_content=html_content,
        files=[log_path],
    )
    email_failure_task.execute(context)
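Two details of a callback like this are worth guarding against: exception text may contain characters such as < or & that break the HTML table, and the email step can fail if the attached file does not exist. The sketch below shows one possible way to handle both with the standard library only. build_failure_html and existing_attachments are hypothetical helper names, not Airflow functions.

```python
import html
import os


def build_failure_html(task_id, dag_id, execution_date, exception):
    """Build the alert table, escaping every value so it renders as plain text."""
    rows = [
        ("Task", task_id),
        ("DAG", dag_id),
        ("Execution Time", execution_date),
        ("Exception", exception),
    ]
    cells = "".join(
        f"<tr><th>{html.escape(str(label))}</th><td>{html.escape(str(value))}</td></tr>"
        for label, value in rows
    )
    return f'<h2>Airflow alert</h2><table border="1" cellpadding="5">{cells}</table>'


def existing_attachments(paths):
    """Keep only the paths that point to an existing regular file."""
    return [path for path in paths if os.path.isfile(path)]
```

Inside the callback, the result of build_failure_html could be passed as html_content and existing_attachments([log_path]) as files, so that a missing log file results in an email without an attachment instead of a failed callback.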
Step 5:
Once this function is added to your DAG file, add one more key-value pair to your args dictionary. The value must be the function object itself, not its name as a string.
Code -
args = { 'on_failure_callback': <Function name (send_email_on_failure in our case)> }
Step 6:
Run this DAG. When a task in it fails, you will receive the email with the log file attached.
Conclusion:
To sum up, sending email alerts with attachments in Airflow is an effective way to keep your team informed when a pipeline fails. By following the steps in this article (creating an app password, configuring the SMTP settings in airflow.cfg, and registering a failure callback that sends the email), you can deliver the relevant log file directly to the people who need it. Whether you're monitoring for errors or keeping stakeholders up to date, Airflow's email alerts with attachments are a useful tool for any data team.
References:
https://docs.astronomer.io/learn/error-notifications-in-airflow#custom-email-notifications
https://medium.com/@cndro/airflow-email-notification-f4b32269f339
https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html