dj_queue is a database-backed task queue backend for the django.tasks framework.
It keeps the queue, live execution state, runtime metadata, and task results in your database.
- no Redis, RabbitMQ, or separate result store
- PostgreSQL is the first-class production backend
- MySQL 8+, MariaDB 10.6+, and SQLite are supported
- immediate, scheduled, recurring, and concurrency-limited work
dj_queue is inspired by Rails' Solid Queue,
shaped to fit Django's task backend API.
Django applications already depend on the database as the durable system of
record. dj_queue lets background work follow the same model.
It has a narrow, explicit shape:
- application code uses Django's `@task` API
- `DjQueueBackend` stores jobs and results in Django-managed tables
- workers, dispatchers, and schedulers all share one operations layer
- PostgreSQL can use `LISTEN/NOTIFY` and `SKIP LOCKED` as optimizations
- polling remains the correctness path on every supported database
For detailed comparisons with Celery, RQ, Procrastinate, and other alternatives, see COMPARISONS.md.
dj_queue requires Python 3.12+ and Django 6.0+.
Install the package:

```shell
pip install dj-queue
```

Optional extras:

```shell
pip install "dj-queue[postgres]"    # psycopg for PostgreSQL + LISTEN/NOTIFY
pip install "dj-queue[prometheus]"  # prometheus_client for /dj_queue/metrics
```

Add `dj_queue` to `INSTALLED_APPS`, register the router, and point Django's task
backend at `DjQueueBackend`:
```python
# settings.py
INSTALLED_APPS = [
    # ...
    "dj_queue",
]

DATABASE_ROUTERS = ["dj_queue.routers.DjQueueRouter"]

TASKS = {
    "default": {
        "BACKEND": "dj_queue.backend.DjQueueBackend",
        "QUEUES": [],
        "OPTIONS": {},
    },
}
```

The router is optional when using the default database, but it is harmless to include and required for multi-database setups.
dj_queue can coexist with other Django task backends in the same `TASKS`
setting. It only manages aliases whose `BACKEND` is
`"dj_queue.backend.DjQueueBackend"`. If a `TASKS` alias points at some other
backend, dj_queue ignores that alias for runtime commands, admin/dashboard
selection, and observability.
Run migrations:

```shell
python manage.py migrate
```

Define a task with Django's `@task` decorator:

```python
# myapp/tasks.py
from django.tasks import task

@task
def add(a, b):
    return a + b
```

Start the dj_queue runtime in one terminal:
```shell
python manage.py dj_queue
```

Then enqueue work from another terminal or from your application code:
```python
from myapp.tasks import add

task_result = add.enqueue(3, 7)
print(task_result.id)
```

Read the result back through Django's task backend API:
```python
from myapp.tasks import add

fresh_result = add.get_backend().get_result(task_result.id)
print(fresh_result.status)
print(fresh_result.return_value)
```

Once the worker has executed the job, `fresh_result.return_value` will be `10`.
If Django admin is installed, dj_queue adds an operator dashboard at
/admin/dj_queue/dashboard/.
- queue, process, recurring-task, and semaphore overview
- backend-aware dashboard and raw changelists
- queue controls: pause, resume, clear ready
- job actions: enqueue a fresh copy of any stored job
- failed jobs: retry and discard from list and detail views
- unschedule dynamic recurring tasks
- queue drill-down pages for state-specific inspection
(Screenshots in the repository show the dashboard overview and queue drill-down pages.)
Use `run_after` to keep work out of the ready queue until a future time:

```python
from datetime import timedelta
from django.utils import timezone
from myapp.tasks import send_digest

future = timezone.now() + timedelta(hours=1)
send_digest.using(run_after=future).enqueue("daily")
```

Use `priority` and `queue_name` on the task call itself:
```python
from myapp.tasks import deliver_email

deliver_email.using(queue_name="email", priority=10).enqueue("welcome")
deliver_email.using(queue_name="email", priority=-5).enqueue("digest")
```

Use `enqueue_all()` when you need one backend call to submit many jobs:
```python
from myapp.tasks import process_item

results = process_item.get_backend().enqueue_all(
    [(process_item, [item_id], {}) for item_id in range(5)]
)
```

`enqueue()` writes immediately and returns a real persisted task result ID. If a
task depends on rows that are still inside the current transaction, use
enqueue_on_commit():
```python
from django.db import transaction
from dj_queue.api import enqueue_on_commit
from myapp.tasks import send_receipt

with transaction.atomic():
    order = create_order()
    enqueue_on_commit(send_receipt, order.id)
```

dj_queue does not defer inserts implicitly or return placeholder result IDs for
uncommitted work. Use the helper above or transaction.on_commit() directly
when the job must not exist before commit.
The repository ships real runnable examples in examples/.
Recommended entry points:
- examples/ex01_basic_enqueue.py
- examples/ex07_basic_enqueue_on_commit.py
- examples/ex08_basic_recurring.py
- examples/ex20_advanced_concurrency.py
- examples/ex21_advanced_queue_control.py
- examples/ex24_advanced_multi_db.py
- examples/ex25_advanced_asgi.py
The examples index lists the full progression.
`python manage.py dj_queue` starts a supervisor for one backend alias.
Job lifecycle:

```
enqueue -> ready | scheduled | blocked -> claimed -> successful | failed
```
The runtime has four moving parts:
- supervisor: boots and stops the runtime
- workers: claim ready jobs and execute them
- dispatchers: promote due scheduled jobs and run concurrency maintenance
- scheduler: enqueues recurring tasks and runs finished-job cleanup when configured
Useful command variants:

```shell
python manage.py dj_queue
python manage.py dj_queue --mode async
python manage.py dj_queue --backend <alias>
python manage.py dj_queue --only-work
python manage.py dj_queue --only-dispatch
python manage.py dj_queue --skip-recurring
```

Notes:
- `fork` is the default standalone mode
- `async` is also supported as a standalone mode and runs supervised actors in threads inside one process
- `--backend` targets a non-default backend alias
- `--only-work` starts workers without dispatchers or scheduler
- `--only-dispatch` starts dispatchers without workers or scheduler
- `--skip-recurring` starts without the scheduler
`fork` runs each worker, dispatcher, and scheduler as a separate OS process.
`async` runs them as threads in one process: lower memory use, less isolation.
The default is `fork`. Use standalone `async` when you want one-process
supervision with lower memory use and less isolation, or embedded `async` when
dj_queue should live inside an ASGI or Gunicorn server process.
In `async` mode, a worker `processes` setting greater than 1 is ignored and
normalized to 1.
- within one selected queue, higher numeric `priority` is claimed first
- across multiple queue selectors, selector order wins
- `"*"` matches all queues
- selectors ending in `*` match queue prefixes such as `email*`
For example, a worker configured with `queues: ["email", "default"]` will
prefer ready work from `email` before `default`, even if `default` contains
higher-priority rows.
If you combine queue order with priorities, queue selector order still wins across queues. Prefer one primary scheduling mechanism per worker when you can.
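The selector rules above can be illustrated with a small sketch. This is only an illustration of the documented matching rules, not dj_queue's actual implementation:

```python
def selector_matches(selector: str, queue_name: str) -> bool:
    """Illustrates the documented selector rules; not dj_queue internals."""
    if selector == "*":
        # "*" matches every queue
        return True
    if selector.endswith("*"):
        # a trailing "*" matches a queue-name prefix, e.g. "email*"
        return queue_name.startswith(selector[:-1])
    # otherwise the selector must equal the queue name exactly
    return queue_name == selector


def first_matching_selector(selectors, queue_name):
    """Selector order wins across queues: return the first selector that matches."""
    for index, selector in enumerate(selectors):
        if selector_matches(selector, queue_name):
            return index
    return None
```

With `["email", "default"]`, ready work in `email` wins over `default` because `email` appears first in the selector list, regardless of per-row priority.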
In standalone mode, the `python manage.py dj_queue` supervisor owns runtime signal handling in both `fork` and `async` modes:
- `SIGTERM` and `SIGINT` request graceful shutdown
- `SIGQUIT` takes the immediate hard-exit path
- `shutdown_timeout` controls how long the runtime waits for in-flight work to drain
- `supervisor_pidfile` can prevent duplicate standalone supervisors on one host
Runners heartbeat into the queue database. If claimed work is left behind,
dj_queue preserves it as failed work that operators can inspect and retry:
- `ProcessExitError`: a supervised runner exited unexpectedly
- `ProcessPrunedError`: a runner heartbeat expired and the process was pruned
- `ProcessMissingError`: claimed work was found without its registered process
Use `python manage.py dj_queue_health` to check whether any fresh runtime
process rows exist for a backend.
Job payloads and persisted return values are stored in JSON columns, so they must be JSON round-trippable.
- enqueueing args or kwargs that cannot round-trip through JSON fails immediately
- returning a non-JSON-serializable value marks the job failed instead of leaving it claimed forever
If you need to pass model instances, files, or custom objects, store them elsewhere and pass identifiers or serialized data instead.
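Because payloads must survive a JSON round trip, a quick pre-flight check can catch bad arguments before they reach `enqueue()`. This helper is a sketch, not part of dj_queue's API:

```python
import json


def assert_json_roundtrippable(args, kwargs):
    """Raise TypeError early if args/kwargs would fail JSON storage."""
    payload = {"args": list(args), "kwargs": kwargs}
    # json.dumps raises TypeError for non-serializable values
    # (model instances, files, sets, custom objects)
    encoded = json.dumps(payload)
    # round-trip so you see exactly what the worker would receive
    return json.loads(encoded)


# pass identifiers, not model instances
assert_json_roundtrippable([42], {"action": "refresh"})
```

Note the round trip also normalizes types (tuples become lists, dict keys become strings), which is what the worker will actually see.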
| Backend | Support level | Notes |
|---|---|---|
| PostgreSQL | first-class | polling, SKIP LOCKED, and optional LISTEN/NOTIFY |
| MySQL 8+ | supported | polling plus SKIP LOCKED |
| MariaDB 10.6+ | supported | polling plus SKIP LOCKED |
| SQLite | supported with limits | polling only, serialized writes, no SKIP LOCKED, no LISTEN/NOTIFY; practical for development, CI, and smaller deployments |
For MySQL or MariaDB, install and configure a Django-compatible driver following Django's database docs.
Polling is the portability path everywhere. Backend-specific features improve latency and throughput but are not correctness requirements.
For production PostgreSQL operational guidance, see Postgres Queue Health.
dj_queue supports both static recurring tasks from settings and dynamic
recurring tasks managed at runtime.
Define recurring tasks in `TASKS[...]["OPTIONS"]["recurring"]`:

```python
TASKS = {
    "default": {
        "BACKEND": "dj_queue.backend.DjQueueBackend",
        "QUEUES": [],
        "OPTIONS": {
            "recurring": {
                "nightly_cleanup": {
                    "task_path": "myapp.tasks.cleanup",
                    "schedule": "0 3 * * *",
                    "queue_name": "maintenance",
                    "priority": -5,
                    "description": "nightly cleanup",
                },
            },
        },
    },
}
```

Create, update, and remove recurring tasks at runtime:
```python
from dj_queue.api import schedule_recurring_task, unschedule_recurring_task

schedule_recurring_task(
    key="tenant_42_report",
    task_path="myapp.tasks.send_report",
    schedule="0 * * * *",
    queue_name="reports",
    priority=5,
)

unschedule_recurring_task("tenant_42_report")
```

Dynamic recurring tasks require
`TASKS[backend_alias]["OPTIONS"]["scheduler"]["dynamic_tasks_enabled"] = True`
or the equivalent `scheduler.dynamic_tasks_enabled: true` in the optional YAML
config.
The scheduler is part of the normal dj_queue runtime. You do not run a
separate recurring service.
Notes:
- schedules are cron expressions
- recurring task keys are scoped per backend alias
- only dynamic tasks can be unscheduled at runtime; unscheduling a static task returns `0`
- Django admin exposes the same unschedule operation on recurring-task list and detail views
- multiple schedulers sharing the same recurring config dedupe firing in the database
- finished-job cleanup runs as internal scheduler maintenance when `preserve_finished_jobs=True` and `clear_finished_jobs_after` is set
- failed-job cleanup can run as internal scheduler maintenance when `clear_failed_jobs_after` is set
- recurring execution reservation cleanup can run as internal scheduler maintenance when `clear_recurring_executions_after` is set
Tasks can opt into database-backed concurrency limits.
`django.tasks` has no standard way to pass backend-specific options through the
`@task` decorator, so dj_queue reads them as attributes on the wrapped function:
```python
from django.tasks import task

@task
def sync_account(account_id, action):
    return f"{account_id}:{action}"

sync_account.func.concurrency_key = "account:{account_id}"
sync_account.func.concurrency_limit = 1
sync_account.func.concurrency_duration = 60
sync_account.func.on_conflict = "block"
```

With this configuration:
- the first matching job can run immediately
- later jobs for the same key can block until capacity is released
- `on_conflict = "discard"` turns the same pattern into singleton-style work
Semaphore rows remain shared on the queue database. If you want per-backend
isolation for a limit, express that in the concurrency_key itself rather than
expecting one semaphore namespace per backend alias.
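The `concurrency_key` template resolves against the enqueue arguments. As an illustration of that idea (not dj_queue's actual resolution code), a key like `"account:{account_id}"` can be filled in by binding the call arguments to parameter names and formatting:

```python
import inspect


def resolve_concurrency_key(template, func, args, kwargs):
    """Illustrative sketch: bind enqueue args to parameter names, then format."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    key = template.format(**bound.arguments)
    # dj_queue rejects keys that are empty or longer than 255 chars at enqueue time
    if not key or len(key) > 255:
        raise ValueError(f"bad concurrency key: {key!r}")
    return key


def sync_account(account_id, action):
    return f"{account_id}:{action}"
```

Whether the job is enqueued with positional or keyword arguments, the same key comes out, so jobs for the same account share one semaphore slot.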
`QueueInfo` exposes operational queue controls without bypassing the queue
tables:
```python
from dj_queue.api import QueueInfo

orders = QueueInfo("orders")

print(orders.size)
print(orders.latency)
print(orders.paused)

orders.pause()
orders.resume()
orders.clear()
```

Notes:
- pausing a queue stops future claims, not enqueueing or already-claimed work
- pause rows are scoped per backend alias
- `clear()` discards ready jobs only
- pass `backend_alias=` when you want to target a non-default `TASKS` alias
Operational commands:

```shell
python manage.py dj_queue_health
python manage.py dj_queue_health --max-age 120
python manage.py dj_queue_prune --older-than 86400
python manage.py dj_queue_prune --failed-older-than 604800
python manage.py dj_queue_prune --recurring-older-than 2592000
python manage.py dj_queue_prune --task-path myapp.tasks.cleanup
python manage.py dj_queue_prune --task-key nightly_cleanup
```

The health and prune commands both accept `--backend` to target a non-default backend alias.
For `dj_queue_prune`, `--task-path` filters finished and failed job cleanup by
task import path, while `--task-key` filters recurring execution cleanup by
recurring task key.
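If you run prune from cron rather than relying on scheduler maintenance, an entry might look like this (the schedule, project path, and virtualenv location are illustrative):

```
# illustrative crontab entry: nightly prune of finished jobs older than one day
0 4 * * * cd /srv/app && .venv/bin/python manage.py dj_queue_prune --older-than 86400
```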
When a task raises, dj_queue keeps the job and its failed execution row in the
queue database, including the exception class, message, and traceback.
You can retry and discard failed jobs through Django admin, and any raw job detail page can enqueue a fresh copy of that stored job. The failed-job actions also stay available directly through the operations layer:
```python
from dj_queue.operations.jobs import discard_failed_job, retry_failed_job

retry_failed_job(job_id)
discard_failed_job(job_id)
```

Model helpers are available too:
```python
from dj_queue.exceptions import UndiscardableError
from dj_queue.models import ClaimedExecution, FailedExecution

failed = FailedExecution.objects.get(job_id=job_id)
failed.retry()
failed.discard()

FailedExecution.retry_all(FailedExecution.objects.order_by("job_id"))
FailedExecution.discard_all_in_batches()

try:
    ClaimedExecution.discard_all_in_batches()
except UndiscardableError:
    pass
```

Failures stay inspectable until you act on them.
`DjQueueBackend.enqueue()` raises `dj_queue.exceptions.EnqueueError` for
backend-side validation failures instead of silently dropping work.
Common reasons include:
- args or kwargs are not JSON round-trippable
- `concurrency_key` is set without `concurrency_limit`
- `concurrency_key` cannot be resolved from the enqueue arguments
- `concurrency_key` does not resolve to a non-empty string of up to 255 chars
- `on_conflict` is not `"block"` or `"discard"`
```python
from dj_queue.exceptions import EnqueueError

try:
    sync_account.enqueue(account_id, "refresh")
except EnqueueError as exc:
    handle_enqueue_error(exc)
```

Task execution errors are different: they become failed jobs and stay inspectable in the queue database.
Register hooks before starting the runtime, typically during Django startup. Each callback receives the live supervisor or runner instance.
```python
from dj_queue.hooks import on_start, on_worker_start, register_hook

@on_start
def supervisor_started(process):
    print(process.name)

@on_worker_start
def worker_started(process):
    print(process.metadata)

@register_hook("scheduler.exit")
def scheduler_exited(process):
    print(process.name)
```

Available hook helpers:
- supervisor: `on_start`, `on_stop`, `on_exit`
- worker: `on_worker_start`, `on_worker_stop`, `on_worker_exit`
- dispatcher: `on_dispatcher_start`, `on_dispatcher_stop`, `on_dispatcher_exit`
- scheduler: `on_scheduler_start`, `on_scheduler_stop`, `on_scheduler_exit`
- generic events: `register_hook("worker.start")`, `register_hook("dispatcher.stop")`, and so on
Notes:
- hooks fire in registration order
- hook failures do not block later hooks
- hook failures are isolated and routed through `on_thread_error`
dj_queue can keep queue tables on a dedicated database alias:
```python
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "app",
    },
    "queue": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "queue",
    },
}

DATABASE_ROUTERS = ["dj_queue.routers.DjQueueRouter"]

TASKS = {
    "default": {
        "BACKEND": "dj_queue.backend.DjQueueBackend",
        "QUEUES": [],
        "OPTIONS": {
            "database_alias": "queue",
        },
    },
}
```

Run your normal application migrations on `default`, then migrate dj_queue
onto the `queue` database:
```shell
python manage.py migrate
python manage.py migrate dj_queue --database queue
```

With this setup, dj_queue's ORM queries and raw SQL helpers stay on the `queue`
database.
Projects can mix dj_queue with other Django task backends in the same `TASKS` mapping:
```python
TASKS = {
    "default": {
        "BACKEND": "dj_queue.backend.DjQueueBackend",
        "QUEUES": [],
        "OPTIONS": {
            "database_alias": "queue",
        },
    },
    "external": {
        "BACKEND": "some_other_backend.Backend",
        "QUEUES": [],
        "OPTIONS": {},
    },
}
```

In that setup:
- jobs with `backend="default"` are dj_queue's responsibility
- jobs with `backend="external"` are the other backend's responsibility
- dj_queue admin, dashboard, `/dj_queue/stats.json`, `/dj_queue/metrics`, and `manage.py dj_queue --backend ...` only operate on dj_queue aliases
Notes:
- if `TASKS` is empty or unset, dj_queue still exposes one implicit `default` alias using built-in defaults
- if `TASKS` is non-empty, dj_queue only manages aliases whose `BACKEND` is explicitly `"dj_queue.backend.DjQueueBackend"`
Operational and configuration guidance for scaling with dj_queue in
production PostgreSQL deployments, covering dedicated database setup, retention
policy, and autovacuum tuning.
- Use a dedicated queue database via `database_alias`. Keep reporting and long-running transactions off the queue database.
- Keep retention short. Set `preserve_finished_jobs = False` if you do not need successful results. Otherwise use bounded `clear_finished_jobs_after`, `clear_failed_jobs_after`, and `clear_recurring_executions_after` values.
- Run `python manage.py dj_queue_prune` regularly for stricter cleanup.
- Keep `use_skip_locked = True` and `listen_notify = True` unless you have a specific reason not to.
- Tune autovacuum for `dj_queue_jobs` and the high-churn `dj_queue_*_executions` tables; default OLTP settings are often too conservative for queue workloads.
- Keep transactions short across workers and the rest of your app. Long-lived transactions pin dead tuples and delay vacuum.
- Monitor dead tuples, autovacuum frequency, and long-running queries before reaching for partitioning or bulk-ingest paths.
dj_queue can run inside an existing server process via embedded async
supervision.
Wrap your ASGI application with `DjQueueLifespan`:

```python
from django.core.asgi import get_asgi_application
from dj_queue.contrib.asgi import DjQueueLifespan

django_application = get_asgi_application()
application = DjQueueLifespan(django_application)
```

Import the provided hooks in your Gunicorn config:
```python
# gunicorn.conf.py
from dj_queue.contrib.gunicorn import post_fork, worker_exit
```

Both embedded integrations use `AsyncSupervisor(standalone=False)` and leave
signal handling to the host server.
dj_queue has three separate routing concepts. Keep them distinct:
- `queue_name`: what kind of work this job is. Use it to route lanes inside one backend, such as `email`, `webhooks`, or `search-index`.
- `backend_alias`: which logical queue system owns the work. Use it when you want separate runtime config, recurring tasks, pause and process visibility, retention, or admin scoping.
- `database_alias`: where that backend's queue tables and runtime activity live. Use it when you want a dedicated database connection path or stronger storage isolation.
Common setup choices:
- one backend, one database: simplest and usually enough
- one backend, separate queue database: good when you want dedicated queue connections
- multiple backends, same database: good for logical and operational separation without another database
- multiple backends, multiple databases: use when you need stronger isolation and accept more migration and deployment complexity
Once migrations are in place, start processing jobs with `python manage.py dj_queue`
on the machine that should do the work. With the default configuration, this
starts the supervisor, workers, dispatcher, and scheduler for the `default`
backend alias and processes all queues.
For most deployments, start with a standalone dj_queue process. Reach for a
dedicated queue database before you reach for embedded mode.
- single database, standalone process: easiest way to start. Use the app database and run `python manage.py dj_queue`
- dedicated queue database: recommended production default. Keep queue tables and runtime traffic on a `database_alias`. See Multi-Database Setup
- embedded server mode: run dj_queue inside ASGI or Gunicorn when you want queue execution colocated with the server process. See Embedded Server Mode
For small deployments, running dj_queue on the same machine as the web server
is often enough. When you need more capacity, multiple machines can point at
the same queue database. Full `python manage.py dj_queue` instances coordinate
through database locking, so workers and dispatchers share load safely and
recurring firing stays deduplicated across schedulers.
In practice, keep recurring settings identical on every full node and prefer one
full instance plus additional `python manage.py dj_queue --only-work` nodes.
Add `--only-dispatch` nodes only when you need more scheduled-job promotion or
concurrency-maintenance throughput.
The main configuration lives in `TASKS[backend_alias]["OPTIONS"]`.
Start with these options:
- `mode`: `"fork"` or `"async"`
- `workers`: queue selectors, thread counts, and process counts
- `dispatchers`: scheduled promotion and concurrency maintenance settings
- `scheduler`: dynamic recurring polling settings
- `database_alias`: database alias for queue tables and runtime activity
- `preserve_finished_jobs` and `clear_finished_jobs_after`: successful result retention and cleanup
- `clear_failed_jobs_after`: optional failed-job retention window
- `clear_recurring_executions_after`: optional recurring reservation retention window
Additional operational tuning is available when needed:
- `use_skip_locked`: use `SKIP LOCKED` when the active backend supports it
- `listen_notify`: PostgreSQL-only worker wakeup optimization layered on top of polling
- `silence_polling`: suppress dj_queue's own poll-cycle noise without mutating Django's global SQL logger
- `process_heartbeat_interval` and `process_alive_threshold`: process liveness reporting and stale-runner detection
- `shutdown_timeout`: graceful drain window before standalone shutdown gives up on waiting
- `supervisor_pidfile`: optional pidfile guard for standalone supervisors
- `on_thread_error`: dotted callback path for runtime infrastructure exceptions
On PostgreSQL, `listen_notify` uses the same Django PostgreSQL driver
configuration as the main database connection. Install a compatible driver in
your project, or use `dj-queue[postgres]` to pull in psycopg.
Configuration precedence is explicit:
- CLI overrides
- environment variables
- the YAML file pointed to by `DJ_QUEUE_CONFIG`
- Django `TASKS` settings
```shell
# via CLI
python manage.py dj_queue --config /etc/dj_queue.yml

# or via environment variable
DJ_QUEUE_CONFIG=/etc/dj_queue.yml python manage.py dj_queue
```

The YAML file is an overlay on `TASKS[backend_alias]["OPTIONS"]`. It supports
two shapes:
- a flat mapping of option values for the selected backend alias
- a `backends` mapping keyed by backend alias, where only the selected alias is applied
Flat mapping example:
```yaml
mode: async
database_alias: queue
preserve_finished_jobs: true
clear_finished_jobs_after: 86400
clear_failed_jobs_after: null
clear_recurring_executions_after: null
listen_notify: true
silence_polling: true
workers:
  - queues: ["default", "email*"]
    threads: 8
    processes: 1
    polling_interval: 0.1
dispatchers:
  - batch_size: 500
    polling_interval: 1
    concurrency_maintenance: true
    concurrency_maintenance_interval: 600
scheduler:
  dynamic_tasks_enabled: true
  polling_interval: 5
recurring:
  nightly_cleanup:
    task_path: myapp.tasks.cleanup
    schedule: "0 3 * * *"
    queue_name: maintenance
    priority: -5
    description: nightly cleanup
```

Multi-backend overlay example:
```yaml
backends:
  default:
    mode: async
    database_alias: default
    workers:
      - queues: ["default", "email*"]
        threads: 8
        processes: 1
        polling_interval: 0.1
  critical:
    mode: fork
    database_alias: queue
    workers:
      - queues: ["alerts", "critical-review"]
        threads: 2
        processes: 1
        polling_interval: 0.05
```

Environment overrides currently supported by dj_queue itself:
- `DJ_QUEUE_CONFIG`
- `DJ_QUEUE_MODE`
- `DJ_QUEUE_SKIP_RECURRING`
Set `on_thread_error` to a dotted callable path when you want custom handling
for queue-runtime exceptions:
```python
TASKS = {
    "default": {
        "BACKEND": "dj_queue.backend.DjQueueBackend",
        "QUEUES": [],
        "OPTIONS": {
            "on_thread_error": "myapp.queue.report_runtime_error",
        },
    },
}
```

The callback receives the raised exception object for background runtime issues such as hook failures, heartbeat failures, notify-watcher failures, and managed runner crashes. It is not used for exceptions raised by your task code; those become failed jobs instead.
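A minimal callback can just log the exception and forward it to whatever error tracker you use. This is a sketch: only the single-exception argument is part of the documented contract, and the module path and tracker integration are assumptions about your project:

```python
# myapp/queue.py (illustrative)
import logging

logger = logging.getLogger("myapp.queue")


def report_runtime_error(exc):
    """Receives the raised exception object from dj_queue's runtime."""
    # log locally; swap in e.g. sentry_sdk.capture_exception(exc) if you use one
    logger.error("dj_queue runtime error: %s", exc, exc_info=exc)
```

Keep the callback fast and exception-safe; it runs inside the queue runtime, not inside your task code.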
Queue statistics are available in JSON via `/dj_queue/stats.json` and in
Prometheus text format via `/dj_queue/metrics`.
These observability endpoints only report dj_queue-managed backend aliases and
ignore aliases configured for some other task backend.
Include `dj_queue.urls` to expose them:

```python
urlpatterns += [path("dj_queue/", include("dj_queue.urls"))]
```

The `/dj_queue/metrics` endpoint requires the prometheus extra:

```shell
pip install "dj-queue[prometheus]"
```

Exported metric families:
- `dj_queue_queue_jobs{backend,queue,state}`
- `dj_queue_queue_paused{backend,queue}`
- `dj_queue_queue_latency_seconds{backend,queue}`
- `dj_queue_queue_live_workers{backend,queue}`
- `dj_queue_runner_processes{backend,status}`
- `dj_queue_runner_processes_by_kind{backend,kind,status}`
- `dj_queue_recurring_tasks{backend}`
- `dj_queue_semaphores{queue_database}`
- `dj_queue_process_rows{backend}`
Both endpoints support bearer token authentication. Set
`DJ_QUEUE_OBSERVABILITY_TOKEN` in `settings.py` and include it as
`Authorization: Bearer <token>`. Leave it unset if you protect these URLs at
the network or proxy layer.
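For example, a monitoring script can attach the token like this (the token value and host are placeholders; only the `Authorization: Bearer <token>` header shape comes from the docs above):

```python
from urllib.request import Request

# placeholder: use the value of DJ_QUEUE_OBSERVABILITY_TOKEN from settings.py
token = "change-me"

req = Request(
    "http://localhost:8000/dj_queue/stats.json",
    headers={"Authorization": f"Bearer {token}"},
)
# urllib.request.urlopen(req) would perform the request;
# the header construction is the part being illustrated here
```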
MIT

