Eng 3477/sqs queues #7995

Draft
rayharnett wants to merge 28 commits into main from ENG-3477/sqs-queues
@rayharnett rayharnett commented Apr 22, 2026

Ticket ENG-3477

Description Of Changes

Migrate Fides' Celery task queue broker from Redis to Amazon SQS, eliminating Redis as a hard dependency for task queuing. Redis continues to serve as the application cache and Celery result backend; only the Celery broker role moves to SQS. The migration is controlled by the feature flag FIDES__QUEUE__USE_SQS_QUEUE (default false) so it can be toggled at runtime without a redeploy.
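
As a rough illustration of the flag-driven broker choice described above, the sketch below returns a Celery broker URL and transport options for either mode. It is not the PR's `BrokerURLFactory`: `QueueConfig`, its field names, the `fides-` prefix, and the ElasticMQ-style queue URL layout are all assumptions made for the example. The `region` and `predefined_queues` keys are the ones Celery's SQS transport documents.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class QueueConfig:
    """Hypothetical stand-in for QueueSettings; field names are assumed."""

    use_sqs_queue: bool = False
    sqs_url: str = "http://localhost:9324"  # local ElasticMQ endpoint
    aws_region: str = "us-east-1"
    queue_name_prefix: str = "fides-"


def broker_config(
    cfg: QueueConfig, redis_url: str, queue_names: List[str]
) -> Tuple[str, Dict[str, Any]]:
    """Return (broker_url, broker_transport_options) for the active mode."""
    if not cfg.use_sqs_queue:
        # Default path: Redis stays the broker and needs no extra options.
        return redis_url, {}
    # With predefined_queues, Celery uses the listed queue URLs as given
    # instead of looking queues up (or creating them) by name.
    predefined = {
        name: {"url": f"{cfg.sqs_url}/queue/{cfg.queue_name_prefix}{name}"}
        for name in queue_names
    }
    return "sqs://", {"region": cfg.aws_region, "predefined_queues": predefined}
```

The result backend is deliberately absent from this function: in the design above it stays on Redis regardless of which broker is selected.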

A zero-loss startup migration drains pending tasks from Redis queues and re-enqueues them to SQS during cutover, using a distributed SET NX EX lock to prevent duplicate processing.
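
A minimal sketch of the lock-then-drain pattern, for reviewers who want the shape of it without opening `queue_migration.py`. It is not the PR's `migrate_redis_queues_to_sqs`: the lock key, the TTL, the `enqueue` callback, and the return convention are assumptions. `redis_client` is expected to behave like a redis-py client (`set(..., nx=True, ex=...)`, `rpop`, `rpush`, `get`, `delete`). Re-encoding the payload for the SQS transport is left to the callback.

```python
import uuid
from typing import Any, Callable, Iterable


def migrate_queues(
    redis_client: Any,
    queue_names: Iterable[str],
    enqueue: Callable[[str, bytes], None],
    lock_key: str = "fides:sqs-migration-lock",  # hypothetical key name
    lock_ttl_seconds: int = 300,  # assumed TTL
) -> int:
    """Drain Redis lists into `enqueue` while holding a SET NX EX lock.

    Returns the number of messages moved, or -1 if another process holds
    the lock.
    """
    token = uuid.uuid4().hex
    # SET key token NX EX ttl succeeds only when the key is absent, and the
    # key expires on its own if this process dies mid-migration.
    if not redis_client.set(lock_key, token, nx=True, ex=lock_ttl_seconds):
        return -1
    moved = 0
    try:
        for name in queue_names:
            while True:
                # Assumes the oldest message sits at the tail of the list,
                # so popping from the right preserves order.
                raw = redis_client.rpop(name)
                if raw is None:
                    break
                try:
                    enqueue(name, raw)
                except Exception:
                    # Put the message back where it came from so it is not lost.
                    redis_client.rpush(name, raw)
                    raise
                moved += 1
    finally:
        # Release only our own lock. GET followed by DEL is not atomic; a
        # Lua script would close that gap.
        if redis_client.get(lock_key) in (token, token.encode()):
            redis_client.delete(lock_key)
    return moved
```

Note that this sketch re-raises on an enqueue failure, whereas the tests in this PR indicate the real function swallows per-queue errors.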

Additionally adds a Queue Monitor page to the admin UI (/monitor/queues) for real-time visibility into SQS queue depths.
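
For context on where the queue depths come from, here is a small sketch of reading counts through `GetQueueAttributes`. It is not the PR's `SQSQueueStatsProvider`: the function name, the `queue_urls` mapping, and the output shape are assumptions. `sqs_client` is expected to behave like a boto3 SQS client. SQS returns these attributes as strings and documents them as approximate.

```python
from typing import Any, Dict


def queue_depths(
    sqs_client: Any, queue_urls: Dict[str, str]
) -> Dict[str, Dict[str, int]]:
    """Map each queue name to its approximate visible and in-flight counts."""
    wanted = [
        "ApproximateNumberOfMessages",  # waiting to be received
        "ApproximateNumberOfMessagesNotVisible",  # received, not yet deleted
    ]
    depths: Dict[str, Dict[str, int]] = {}
    for name, url in queue_urls.items():
        response = sqs_client.get_queue_attributes(
            QueueUrl=url, AttributeNames=wanted
        )
        attrs = response["Attributes"]
        depths[name] = {
            "visible": int(attrs.get("ApproximateNumberOfMessages", 0)),
            "in_flight": int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
        }
    return depths
```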

Code Changes

 * `src/fides/config/queue_settings.py` (new) — QueueSettings config model with feature flag and SQS connection
   parameters (URL, region, credentials, queue name prefix)
 * `src/fides/api/tasks/broker.py` (new) — BrokerURLFactory that produces the correct Celery broker URL and transport
   options (including predefined_queues mapping) based on active config
 * `src/fides/api/tasks/queue_migration.py` (new) — migrate_redis_queues_to_sqs function that drains pending Redis
   tasks and re-enqueues them to SQS with distributed lock guarantee
 * `src/fides/api/util/queue_stats.py` (new) — SQSQueueStatsProvider that fetches queue message counts from SQS via
   GetQueueAttributes, replacing the Redis llen path
 * `src/fides/api/service/privacy_request/sqs_heartbeat.py` (new) — @sqs_heartbeat decorator and background thread
   that periodically calls ChangeMessageVisibility to prevent premature message expiration during long-running DSR
   tasks
 * `src/fides/api/tasks/__init__.py` (modified) — _create_celery updated to decouple result backend from broker
   choice, keeping Redis as result backend by default
 * `src/fides/api/app_setup.py` (modified) — Lifespan hook calls migrate_redis_queues_to_sqs on startup when SQS is
   enabled
 * `src/fides/api/main.py` (modified) — Registers the queue monitor router
 * `src/fides/api/v1/api.py` (modified) — Adds queue monitor endpoint group to v1 API
 * `src/fides/api/v1/endpoints/queue_monitor_endpoints.py` (new) — FastAPI endpoints for queue stats (GET 
   /api/v1/worker-stats extended)
 * `src/fides/api/schemas/queue_monitor.py` (new) — Pydantic schemas for queue monitor responses
 * `src/fides/config/__init__.py` (modified) — Registers queue: QueueSettings on FidesConfig
 * `src/fides/api/util/cache.py` (modified) — get_queue_counts() delegates to the new stats provider
 * `src/fides/api/service/privacy_request/request_service.py` (modified) — Early-exit guard in
   initiate_interrupted_task_requeue_poll to skip requeue poll when SQS is enabled (SQS handles heartbeating)
 * `src/fides/api/worker/__init__.py` (modified) — Worker initialization updated for SQS transport
 * `src/fides/cli/__main__.py` (modified) — CLI updates for SQS configuration
 * `src/fides/api/task/execute_request_tasks.py` (modified) — DSR task functions decorated with @sqs_heartbeat
 * Admin UI — New QueueMonitorTable component, queue monitor slice, routes, and types (/monitor/queues)
 * `docker-compose.yml` — Added elasticmq service (port 9324 API) with pre-created queues
 * `docker/elasticmq/elasticmq.conf` (new) — ElasticMQ config with all 10 Celery queues pre-created 
 * `tests/task/test_broker.py` (new) — Tests for BrokerURLFactory and TestSQSQueueStatsProvider
 * `tests/task/test_queue_migration.py` (new) — Tests for migrate_redis_queues_to_sqs
 * `tests/task/test_sqs_heartbeat.py` (new) — Tests for @sqs_heartbeat decorator and heartbeat thread
 * `tests/task/test_lifespan_migration.py` (new) — Integration tests for startup migration hook
 * `tests/ctl/core/test_queue_settings.py` (new) — Tests for QueueSettings field defaults and env var overrides
 * `tests/ops/service/privacy_request/test_request_service.py` (modified) — Tests for SQS early-exit guard
 * `tests/ops/api/v1/endpoints/test_queue_monitor_endpoints.py` (new) — Tests for queue monitor endpoints
 * `clients/admin-ui/...` — Queue monitor UI components and tests

Steps to Confirm

 1. Redis mode (default): Start the app with default config (no env vars). Verify Celery workers connect to Redis and
    DSR tasks execute normally. Confirm GET /api/v1/worker-stats returns queue counts from Redis.
 2. SQS mode: Set FIDES__QUEUE__USE_SQS_QUEUE=true along with SQS connection env vars (or
    FIDES__QUEUE__SQS_URL=http://localhost:9324 for local ElasticMQ). Verify startup migration completes (or skips if
    no pending Redis tasks), workers connect to SQS, and tasks are consumed from SQS queues.
 3. Queue Monitor UI: Navigate to /monitor/queues in the admin UI. Verify queue depths display correctly for active
    queues.
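 4. Heartbeat: Trigger a long-running DSR task in SQS mode. Confirm the SQS heartbeat background thread keeps extending the message's visibility timeout, so the in-flight message stays hidden from other consumers and is not redelivered before the task completes.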
 5. Graceful degradation: Stop ElasticMQ (or disable SQS) while the app is running. Verify the app continues to operate
     and logs appropriate warnings.
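
To make the heartbeat check in step 4 easier to reason about, the sketch below shows one way a background thread can keep extending a message's visibility timeout. It uses a context manager rather than the PR's `@sqs_heartbeat` decorator, and the function name, interval, and extension values are assumptions. `sqs_client` is expected to behave like a boto3 SQS client.

```python
import threading
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def visibility_heartbeat(
    sqs_client: Any,
    queue_url: str,
    receipt_handle: str,
    interval_seconds: float = 30.0,  # assumed beat interval
    extend_by_seconds: int = 120,  # assumed new timeout on each beat
) -> Iterator[None]:
    """Extend the message's visibility timeout until the block exits."""
    stop = threading.Event()

    def beat() -> None:
        # Event.wait returns False on timeout and True once stop is set,
        # so the loop ends promptly when the task finishes.
        while not stop.wait(interval_seconds):
            sqs_client.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=extend_by_seconds,
            )

    thread = threading.Thread(target=beat, daemon=True)
    thread.start()
    try:
        yield
    finally:
        stop.set()
        thread.join()
```

The extension should be comfortably longer than the interval. Otherwise a single delayed beat lets the timeout lapse, and SQS can redeliver the message while the task is still running.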

Pre-Merge Checklist

  • Issue requirements met
  • All CI pipelines succeeded
  • CHANGELOG.md updated
    • Add a `db-migration` label to the entry if your change includes a DB migration
    • Add a `high-risk` label to the entry if your change includes a high-risk change (i.e. potential for performance impact or unexpected regression) that should be flagged
    • Updates unreleased work already in Changelog, no new entry necessary
  • UX feedback:
    • All UX related changes have been reviewed by a designer
    • No UX review needed
  • Followup issues:
    • Followup issues created
    • No followup issues
  • Database migrations:
    • Ensure that your downrev is up to date with the latest revision on main
    • Ensure that your downgrade() migration is correct and works
      • If a downgrade migration is not possible for this change, please call this out in the PR description!
    • No migrations
  • Documentation:
    • Documentation complete, PR opened in fidesdocs
    • Documentation issue created in fidesdocs
    • If there are any new client scopes created as part of the pull request, remember to update public-facing documentation that references our scope registry
    • No documentation updates required


vercel Bot commented Apr 22, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Actions | Updated (UTC) |
| --- | --- | --- | --- |
| fides-plus-nightly | Ready | Preview, Comment | Apr 23, 2026 0:57am |

1 Skipped Deployment

| Project | Deployment | Actions | Updated (UTC) |
| --- | --- | --- | --- |
| fides-privacy-center | Ignored | | Apr 23, 2026 0:57am |



github-actions Bot commented Apr 22, 2026

Dependency Review

The following issues were found:
  • ✅ 0 vulnerable package(s)
See the Details below.

Snapshot Warnings

⚠️: No snapshots were found for the head SHA 7c6f073.
Ensure that dependencies are being submitted on PR branches and consider enabling retry-on-snapshot-warnings. See the documentation for more information and troubleshooting advice.

Scanned Files

  • clients/admin-ui/package.json
  • clients/package-lock.json
  • uv.lock

Comment thread src/fides/api/tasks/broker.py Fixed
Comment thread src/fides/api/util/queue_stats.py Fixed
Comment thread tests/task/test_broker.py Fixed
Comment thread tests/ctl/core/test_dataset.py Fixed
Comment thread tests/task/test_queue_migration.py Fixed
Comment thread tests/task/test_queue_migration.py Fixed
Comment thread tests/ops/api/v1/endpoints/test_queue_monitor_endpoints.py Fixed
Comment thread tests/ops/api/v1/endpoints/test_queue_monitor_endpoints.py Fixed
Comment thread tests/util/test_queue_stats.py Fixed
Comment thread tests/ctl/core/test_dataset.py Fixed
Comment thread tests/task/test_queue_migration.py Fixed
Comment thread tests/ops/api/v1/endpoints/test_queue_monitor_endpoints.py Fixed
Comment thread tests/ops/api/v1/endpoints/test_queue_monitor_endpoints.py Fixed
Comment thread src/fides/api/tasks/broker.py Fixed
Comment thread src/fides/api/util/queue_stats.py Dismissed

codecov Bot commented Apr 23, 2026

Codecov Report

❌ Patch coverage is 41.57783% with 274 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.53%. Comparing base (4d36209) to head (7c6f073).
⚠️ Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/fides/api/tasks/queue_migration.py | 0.00% | 76 Missing ⚠️ |
| ...fides/api/service/privacy_request/sqs_heartbeat.py | 19.51% | 65 Missing and 1 partial ⚠️ |
| src/fides/api/tasks/broker.py | 45.00% | 59 Missing and 7 partials ⚠️ |
| src/fides/api/util/queue_stats.py | 57.33% | 31 Missing and 1 partial ⚠️ |
| src/fides/api/app_setup.py | 11.11% | 15 Missing and 1 partial ⚠️ |
| src/fides/api/tasks/__init__.py | 48.27% | 8 Missing and 7 partials ⚠️ |
| src/fides/cli/__main__.py | 0.00% | 2 Missing ⚠️ |
| src/fides/api/util/cache.py | 75.00% | 1 Missing ⚠️ |

❌ Your patch check has failed because the patch coverage (41.57%) is below the target coverage (100.00%). You can increase the patch coverage or adjust the target coverage.
❌ Your project check has failed because the head coverage (82.53%) is below the target coverage (85.00%). You can increase the head coverage or adjust the target coverage.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7995      +/-   ##
==========================================
- Coverage   84.97%   82.53%   -2.44%     
==========================================
  Files         631      639       +8     
  Lines       41239    41680     +441     
  Branches     4787     4844      +57     
==========================================
- Hits        35041    34402     -639     
- Misses       5113     6158    +1045     
- Partials     1085     1120      +35     


@github-actions

| Title | Lines | Statements | Branches | Functions |
| --- | --- | --- | --- | --- |
| admin-ui | Coverage: 8% | 6.38% (2815/44117) | 5.62% (1411/25078) | 4.47% (584/13048) |
| fides-js | Coverage: 78% | 78.98% (1962/2484) | 65.55% (1214/1852) | 72.57% (336/463) |
| privacy-center | Coverage: 88% | 85.97% (331/385) | 81.36% (179/220) | 78.87% (56/71) |
