Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dagger/alerts/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from abc import ABC, abstractmethod
from typing import List

from airflow.utils.types import DagRunType
from slack.web.client import WebClient
from dagger import conf
from dagger.utilities.config_validator import Attribute, ConfigValidator
Expand Down Expand Up @@ -98,9 +99,9 @@ def get_task_run_time(task_instance):
def airflow_task_fail_alerts(alerts: List[AlertBase], context):
if conf.ENV == "datatst":
return
if context["dag_run"].external_trigger is True:
if context["dag_run"].run_type == DagRunType.MANUAL:
return
if context["dag"].is_paused is True:
if getattr(context["dag"], "is_paused", False):
return

task_instance = context["task_instance"]
Expand Down
32 changes: 15 additions & 17 deletions dagger/dag_creator/airflow/utils/slack_alerts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.hooks.base import BaseHook
from airflow.utils.types import DagRunType

SLACK_CONN_ID = "slack"
ENV = os.environ["ENV"].lower()
Expand All @@ -11,6 +11,16 @@ def get_task_run_time(task_instance):
return (task_instance.end_date - task_instance.start_date).total_seconds()


def _should_skip_alert(context):
if ENV == "datatst":
return True
if context["dag_run"].run_type == DagRunType.MANUAL:
return True
if getattr(context["dag"], "is_paused", False):
return True
return False


def task_success_slack_alert(context):
"""
Callback task that can be used in DAG to alert of successful task completion
Expand All @@ -19,14 +29,9 @@ def task_success_slack_alert(context):
Returns:
None: Calls the SlackWebhookOperator execute method internally
"""
if ENV == "datatst":
return
if context["dag_run"].external_trigger is True:
return
if context["dag"].is_paused is True:
if _should_skip_alert(context):
return

slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:large_blue_circle: Task Succeeded!
*Task*: {task}
Expand All @@ -45,8 +50,7 @@ def task_success_slack_alert(context):

success_alert = SlackWebhookOperator(
task_id="slack_test",
http_conn_id="slack",
webhook_token=slack_webhook_token,
slack_webhook_conn_id=SLACK_CONN_ID,
message=slack_msg,
username="airflow",
)
Expand All @@ -62,14 +66,9 @@ def task_fail_slack_alert(context):
Returns:
None: Calls the SlackWebhookOperator execute method internally
"""
if ENV == "datatst":
return
if context["dag_run"].external_trigger is True:
return
if context["dag"].is_paused is True:
if _should_skip_alert(context):
return

slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
Expand All @@ -88,8 +87,7 @@ def task_fail_slack_alert(context):

failed_alert = SlackWebhookOperator(
task_id=context["task_instance"].task_id,
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
slack_webhook_conn_id=SLACK_CONN_ID,
message=slack_msg,
username="airflow",
)
Expand Down
Loading