Airflow 3.2: fix Slack failure callbacks (companion to dataeng-airflow #250) #70

Draft
mrhallak wants to merge 1 commit into master from feature/airflow-3-slack-callback-fix

Summary

Companion to dataeng-airflow#250 (Airflow 3.2.1 retry). Fixes the runtime breakages in dagger's Slack callback paths that surfaced after the upgrade.

The earlier Airflow 3.x prep (PR #69: imports migration + schedule_interval → schedule) is already on dagger master and pinned in dataeng-airflow. This PR closes the remaining gaps.

What broke

1. SlackWebhookOperator signature change (apache-airflow-providers-slack==9.10.0, live container verified):

SlackWebhookOperator.__init__(*, slack_webhook_conn_id, message='', attachments=None,
                              blocks=None, channel=None, username=None, ...)
  • webhook_token and http_conn_id kwargs are gone.
  • slack_webhook_conn_id is now required.
  • The hook reads the webhook URL from the connection's password field directly, so the explicit BaseHook.get_connection(...).password lookup is removed.

2. DagRun.external_trigger removed in Airflow 3. The live container reports only ['run_type', 'triggered_by', 'triggering_user_name']. Use run_type == DagRunType.MANUAL to preserve the original intent.

3. context["dag"].is_paused is not exposed on the SDK DAG class in the Airflow 3 callback context. The lookup is guarded with getattr(..., False) so the check no-ops cleanly when the attribute is missing.
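For item 1, the new call shape can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `build_slack_operator_kwargs`, `send_slack_alert`, the `slack_alert` task id and the `slack` connection id default are hypothetical names. Only `slack_webhook_conn_id`, `message` and `channel` come from the signature quoted above.

```python
def build_slack_operator_kwargs(message, conn_id="slack", channel=None):
    """Build keyword arguments matching the providers-slack 9.x signature.

    Hypothetical helper: there is no webhook_token or http_conn_id here,
    because the hook resolves the webhook URL from the connection itself.
    """
    kwargs = {
        "task_id": "slack_alert",          # assumed task id
        "slack_webhook_conn_id": conn_id,  # required in 9.x
        "message": message,
    }
    if channel is not None:
        kwargs["channel"] = channel
    return kwargs


def send_slack_alert(context, message):
    """Send one message from a callback (assumes the Slack provider is installed)."""
    # Imported lazily so this module still imports where Airflow is absent.
    from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

    operator = SlackWebhookOperator(**build_slack_operator_kwargs(message))
    return operator.execute(context=context)
```

Keeping the kwargs in a plain function means the argument shape can be asserted in a unit test without a running Airflow.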

Why this was invisible until now

Imports still succeed (SlackWebhookOperator class exists, just its __init__ differs), so DAG parsing passes. The callback only runs on task failure, at which point it raises TypeError; the original task is already in failed state, and the only signal is a callback error buried in scheduler/dag-processor logs. We hit it during the 3.2.1 redeploy validation on datatst.
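One way to surface this kind of drift before a task actually fails is to compare the callback's kwargs against the operator signature in a unit test. The sketch below is an assumption about how such a check could look, not something this PR adds. `missing_required_kwargs` is a hypothetical helper, and `FakeWebhookOperator` is a stand-in that copies only the part of the signature quoted above.

```python
import inspect


def missing_required_kwargs(target, kwargs):
    """Return the required parameters of `target` that `kwargs` does not supply."""
    skipped = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    return [
        name
        for name, param in inspect.signature(target).parameters.items()
        if param.kind not in skipped
        and param.default is inspect.Parameter.empty
        and name not in kwargs
    ]


class FakeWebhookOperator:
    """Stand-in with the keyword-only shape of the 9.x operator."""

    def __init__(self, *, slack_webhook_conn_id, message="", channel=None,
                 username=None, **kwargs):
        self.slack_webhook_conn_id = slack_webhook_conn_id
        self.message = message


# Pre-upgrade call style versus the style the new signature requires.
old_style = {"http_conn_id": "slack", "webhook_token": "secret", "message": "task failed"}
new_style = {"slack_webhook_conn_id": "slack", "message": "task failed"}

print(missing_required_kwargs(FakeWebhookOperator, old_style))
print(missing_required_kwargs(FakeWebhookOperator, new_style))
```

In a real test the stand-in would be replaced by the imported operator class, assuming `inspect.signature` resolves its `__init__` as expected.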

Files

  • dagger/dag_creator/airflow/utils/slack_alerts.py: both task_success_slack_alert and task_fail_slack_alert. Extracted the three skip conditions into _should_skip_alert to deduplicate.
  • dagger/alerts/alert.py: airflow_task_fail_alerts had the same external_trigger / is_paused checks; fixed identically.

Connection setup (operational note)

For the operator to send messages, the slack connection in Airflow must have the webhook URL stored in the password field (no other fields needed). The existing dataeng-airflow-slack_token secret loads into the SLACK_TOKEN env var, which docker-entrypoint converts into a connection — verify that conversion still puts the value in password.
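As a rough illustration of what that conversion needs to produce, the sketch below builds an `AIRFLOW_CONN_<ID>` environment variable in Airflow's JSON connection format with the webhook URL in `password`. It is not the actual docker-entrypoint logic, which this PR does not show. `slack_connection_env` is a hypothetical helper, and the `slack` connection id and `slackwebhook` connection type are assumptions to check against the deployed provider.

```python
import json


def slack_connection_env(webhook_url, conn_id="slack"):
    """Return (env var name, JSON value) for a connection whose password is the webhook URL."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "slackwebhook",  # assumed connection type
            "password": webhook_url,      # the only field the hook needs here
        }
    )
    return name, value


# Placeholder URL for illustration only.
name, value = slack_connection_env("https://hooks.slack.com/services/T000/B000/XXXX")
print(name)
```

Running `airflow connections get slack` inside the container is one way to confirm where the value ended up.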

Test plan

  • Branched from origin/master (which already has the Airflow 3 import migration merged)
  • Unit tests pass locally (pytest)
  • CI green
  • Once merged, bump dagger pin in dataeng-airflow#250's docker/requirements.txt and redeploy datatst
  • Trigger a real DAG failure on datatst (after temporarily removing the ENV == "datatst" early-return, or testing on datastg) and confirm a Slack message lands in the alerts channel

Two callback paths were broken on the live Airflow 3.2.1 deploy:

1. SlackWebhookOperator signature changed in providers-slack 9.x.
   - `webhook_token` and `http_conn_id` kwargs are gone.
   - Replaced with `slack_webhook_conn_id`.
   - The webhook URL now lives in the connection's password field;
     the operator's hook reads it. The explicit
     `BaseHook.get_connection(...).password` lookup is removed.

2. DagRun.external_trigger was removed in Airflow 3.
   - Replaced with `dag_run.run_type == DagRunType.MANUAL`, which
     preserves the original intent ("don't alert for manually
     triggered runs").

Also: `context["dag"].is_paused` is not exposed on the SDK DAG object
in Airflow 3 callback context. Guarded with `getattr` so the check
no-ops cleanly if the attribute is missing.

Why this was invisible until now: imports still succeed (the operator
class still exists, just its `__init__` signature differs), so DAG
parsing is fine. The callback only runs on task failure, at which
point it raises TypeError, the original task is already failed, and
the only sign is a callback error in scheduler/dag-processor logs.