From 5c80948c09d6b481b94c858dcd299e4e31867caa Mon Sep 17 00:00:00 2001 From: Mohamad Hallak <16711801+mrhallak@users.noreply.github.com> Date: Mon, 18 May 2026 10:43:00 +0200 Subject: [PATCH] Fix Slack failure callbacks under Airflow 3 / providers-slack 9.x Two callback paths were broken on the live Airflow 3.2.1 deploy: 1. SlackWebhookOperator signature changed in providers-slack 9.x. - `webhook_token` and `http_conn_id` kwargs are gone. - Replaced with `slack_webhook_conn_id`. - The webhook URL now lives in the connection's password field; the operator's hook reads it. The explicit `BaseHook.get_connection(...).password` lookup is removed. 2. DagRun.external_trigger was removed in Airflow 3. - Replaced with `dag_run.run_type == DagRunType.MANUAL`, which preserves the original intent ("don't alert for manually triggered runs"). Also: `context["dag"].is_paused` is not exposed on the SDK DAG object in Airflow 3 callback context. Guarded with `getattr` so the check no-ops cleanly if the attribute is missing. Why this was invisible until now: imports still succeed (the operator class still exists, just its `__init__` signature differs), so DAG parsing is fine. The callback only runs on task failure, at which point it raises TypeError, the original task is already failed, and the only sign is a callback error in scheduler/dag-processor logs. --- dagger/alerts/alert.py | 5 +-- .../dag_creator/airflow/utils/slack_alerts.py | 32 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/dagger/alerts/alert.py b/dagger/alerts/alert.py index 8cc2b08..893b150 100644 --- a/dagger/alerts/alert.py +++ b/dagger/alerts/alert.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from typing import List +from airflow.utils.types import DagRunType from slack.web.client import WebClient from dagger import conf from dagger.utilities.config_validator import Attribute, ConfigValidator @@ -98,9 +99,9 @@ def get_task_run_time(task_instance): def airflow_task_fail_alerts(alerts: List[AlertBase], context): if conf.ENV == "datatst": return - if context["dag_run"].external_trigger is True: + if context["dag_run"].run_type == DagRunType.MANUAL: return - if context["dag"].is_paused is True: + if getattr(context["dag"], "is_paused", False): return task_instance = context["task_instance"] diff --git a/dagger/dag_creator/airflow/utils/slack_alerts.py b/dagger/dag_creator/airflow/utils/slack_alerts.py index 86f4338..3da62f3 100644 --- a/dagger/dag_creator/airflow/utils/slack_alerts.py +++ b/dagger/dag_creator/airflow/utils/slack_alerts.py @@ -1,7 +1,7 @@ import os from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator -from airflow.hooks.base import BaseHook +from airflow.utils.types import DagRunType SLACK_CONN_ID = "slack" ENV = os.environ["ENV"].lower() @@ -11,6 +11,16 @@ def get_task_run_time(task_instance): return (task_instance.end_date - task_instance.start_date).total_seconds() +def _should_skip_alert(context): + if ENV == "datatst": + return True + if context["dag_run"].run_type == DagRunType.MANUAL: + return True + if getattr(context["dag"], "is_paused", False): + return True + return False + + def task_success_slack_alert(context): """ Callback task that can be used in DAG to alert of successful task completion @@ -19,14 +29,9 @@ def task_success_slack_alert(context): Returns: None: Calls the SlackWebhookOperator execute method internally """ - if ENV == "datatst": - return - if context["dag_run"].external_trigger is True: - return - if context["dag"].is_paused is True: + if _should_skip_alert(context): return - slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password slack_msg = """ :large_blue_circle: Task Succeeded! *Task*: {task} @@ -45,8 +50,7 @@ def task_success_slack_alert(context): success_alert = SlackWebhookOperator( task_id="slack_test", - http_conn_id="slack", - webhook_token=slack_webhook_token, + slack_webhook_conn_id=SLACK_CONN_ID, message=slack_msg, username="airflow", ) @@ -62,14 +66,9 @@ def task_fail_slack_alert(context): Returns: None: Calls the SlackWebhookOperator execute method internally """ - if ENV == "datatst": - return - if context["dag_run"].external_trigger is True: - return - if context["dag"].is_paused is True: + if _should_skip_alert(context): return - slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password slack_msg = """ :red_circle: Task Failed. *Task*: {task} @@ -88,8 +87,7 @@ def task_fail_slack_alert(context): failed_alert = SlackWebhookOperator( task_id=context["task_instance"].task_id, - http_conn_id=SLACK_CONN_ID, - webhook_token=slack_webhook_token, + slack_webhook_conn_id=SLACK_CONN_ID, message=slack_msg, username="airflow", )