OmniQ is a Redis + Lua, language-agnostic job queue.
This package is the Python client for OmniQ v1.
Core project / docs: https://github.com/not-empty/omniq
- Hybrid lanes
  - Ungrouped jobs by default
  - Optional grouped jobs (FIFO per group + per-group concurrency)
- Lease-based execution
  - Workers reserve a job with a time-limited lease
- Token-gated ACK / heartbeat
  - `reserve()` returns a `lease_token`
  - `heartbeat()` and `ack_*()` must include the same token
- Pause / resume (flag-only)
  - Pausing prevents new reserves
  - Running jobs are not interrupted
  - Jobs are not moved
- Admin-safe operations
  - Strict `retry`, `retry_batch`, `remove`, `remove_batch`
- Handler-driven execution layer
  - `ctx.exec` exposes internal OmniQ operations safely inside handlers
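To make the lease, token, and pause rules above concrete, here is a minimal in-memory sketch. It is not OmniQ's implementation (which runs as Lua scripts in Redis); `ToyLeaseQueue` and all of its methods are hypothetical names used only for illustration, and the clock is passed in explicitly so the behavior is easy to follow.

```python
import secrets


class ToyLeaseQueue:
    """In-memory toy model of lease + token semantics (assumption, not OmniQ code)."""

    def __init__(self):
        self.waiting = []    # job ids ready to be reserved (FIFO)
        self.active = {}     # job_id -> (lease_token, lock_until_ms)
        self.paused = False  # flag-only pause: jobs are never moved

    def publish(self, job_id):
        self.waiting.append(job_id)

    def reserve(self, timeout_ms, now_ms):
        # A paused queue hands out no new leases; active jobs are untouched.
        if self.paused or not self.waiting:
            return None
        job_id = self.waiting.pop(0)
        token = secrets.token_hex(8)
        self.active[job_id] = (token, now_ms + timeout_ms)
        return job_id, token

    def _token_ok(self, job_id, token):
        lease = self.active.get(job_id)
        return lease is not None and secrets.compare_digest(lease[0], token)

    def heartbeat(self, job_id, token, timeout_ms, now_ms):
        # Only the holder of the matching token may extend the lease.
        if not self._token_ok(job_id, token):
            return False
        self.active[job_id] = (token, now_ms + timeout_ms)
        return True

    def ack_success(self, job_id, token):
        # Only the holder of the matching token may finish the job.
        if not self._token_ok(job_id, token):
            return False
        del self.active[job_id]
        return True


q = ToyLeaseQueue()
q.publish("a")
job_id, token = q.reserve(timeout_ms=1000, now_ms=0)
print(q.heartbeat(job_id, "wrong-token", 1000, now_ms=10))  # rejected
print(q.heartbeat(job_id, token, 1000, now_ms=10))          # accepted
```

The point of the token is that a worker whose lease was lost cannot acknowledge or extend a job it no longer owns.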
pip install omniq

# importing the lib
from omniq.client import OmniqClient

# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# publishing the job
job_id = omniq.publish(
    queue="demo",
    payload={"hello": "world"},
    timeout_ms=30_000,
)

print("OK", job_id)

from dataclasses import dataclass
from typing import List, Optional

# importing the lib
from omniq.client import OmniqClient


# Nested structure
@dataclass
class Customer:
    id: str
    email: str
    vip: bool


# Main payload
@dataclass
class OrderCreated:
    order_id: str
    customer: Customer
    amount: int
    currency: str
    items: List[str]
    processed: bool
    retry_count: int
    tags: Optional[List[str]] = None


# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# creating structured payload
payload = OrderCreated(
    order_id="ORD-2026-0001",
    customer=Customer(
        id="CUST-99",
        email="leo@example.com",
        vip=True,
    ),
    amount=1500,
    currency="USD",
    items=["keyboard", "mouse"],
    processed=False,
    retry_count=0,
    tags=["priority", "online"],
)

# publish using publish_json
job_id = omniq.publish_json(
    queue="demo",
    payload=payload,
    max_attempts=5,
    timeout_ms=60_000,
)

print("OK", job_id)

import time
# importing the lib
from omniq.client import OmniqClient


# creating your handler (ctx will have all the job information and actions)
def my_actions(ctx):
    print("Waiting 2 seconds")
    time.sleep(2)
    print("Done")


# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
    queue="demo",
    handler=my_actions,
    verbose=True,
    drain=False,
)

Inside handler(ctx):
- queue
- job_id
- payload_raw
- payload
- attempt
- lock_until_ms
- lease_token
- gid
- exec → execution layer (ctx.exec)
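Because a handler only reads attributes from `ctx`, it can be exercised without Redis by passing a stand-in object. The sketch below is an assumption-laden illustration: `make_stub_ctx` and `greet` are hypothetical names, the default field values are placeholders, and the real context object may carry more than the fields listed above.

```python
from types import SimpleNamespace


def make_stub_ctx(payload, queue="demo", job_id="job-1", attempt=1):
    """Build a stand-in for ctx with the fields listed above (placeholder values)."""
    return SimpleNamespace(
        queue=queue,
        job_id=job_id,
        payload_raw=None,          # raw form is not modeled in this stub
        payload=payload,
        attempt=attempt,
        lock_until_ms=0,
        lease_token="stub-token",
        gid=None,
        exec=None,                 # swap in a fake if the handler uses ctx.exec
    )


def greet(ctx):
    # Example handler that only reads ctx fields.
    return f"{ctx.queue}/{ctx.job_id}: hello {ctx.payload['name']} (attempt {ctx.attempt})"


print(greet(make_stub_ctx({"name": "world"})))
```

This keeps handler logic testable in isolation; anything that touches `ctx.exec` still needs a fake or a live queue.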
All admin operations are Lua-backed and atomic.
omniq.retry_failed(queue="demo", job_id="01ABC...")

- Works only if job state is failed
- Resets attempt counter
- Respects grouping rules
results = omniq.retry_failed_batch(
    queue="demo",
    job_ids=["01A...", "01B...", "01C..."],
)

for job_id, status, reason in results:
    print(job_id, status, reason)

- Max 100 jobs per call
- Atomic batch
- Per-job result returned
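Since each call accepts at most 100 jobs, a longer list of IDs has to be split by the caller. The following is a minimal sketch of that splitting. `in_chunks` and `retry_all` are hypothetical helpers (not part of OmniQ), the batch function is passed in as a parameter so the example runs without Redis, and the `"OK"` status string in the fake is a placeholder.

```python
from typing import Callable, Iterator, List, Sequence

MAX_BATCH = 100  # per-call limit stated above


def in_chunks(job_ids: Sequence[str], size: int = MAX_BATCH) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` job ids."""
    for start in range(0, len(job_ids), size):
        yield job_ids[start:start + size]


def retry_all(retry_batch: Callable[..., list], queue: str, job_ids: Sequence[str]) -> List[tuple]:
    """Call `retry_batch` once per chunk and collect the per-job results."""
    results: List[tuple] = []
    for chunk in in_chunks(job_ids):
        results.extend(retry_batch(queue=queue, job_ids=list(chunk)))
    return results


def fake_retry_batch(queue, job_ids):
    # Stand-in for a real batch call; returns (job_id, status, reason) tuples.
    return [(job_id, "OK", "") for job_id in job_ids]


ids = [f"job-{i}" for i in range(250)]
results = retry_all(fake_retry_batch, "demo", ids)
print(len(results))
```

With a real client one would pass `omniq.retry_failed_batch` as `retry_batch`. Note that atomicity applies to each call, so separate chunks succeed or fail independently.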
omniq.remove_job(
    queue="demo",
    job_id="01ABC...",
    lane="failed",  # wait | delayed | failed | completed | gwait
)

Rules:
- Cannot remove active jobs
- Lane must match job state
- Group safety preserved
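The first two rules can be pictured as a pre-check like the one below. This is only a toy model: the real validation happens atomically inside OmniQ's Lua scripts, `check_removal` is a hypothetical function, and it assumes that a job's state name equals the name of the lane it sits in. Group safety is not modeled.

```python
REMOVABLE_LANES = {"wait", "delayed", "failed", "completed", "gwait"}


def check_removal(job_state: str, lane: str):
    """Return (ok, reason) for a removal request in this toy model."""
    if job_state == "active":
        return False, "job is active"
    if lane not in REMOVABLE_LANES:
        return False, f"unknown lane: {lane}"
    if lane != job_state:
        return False, f"lane mismatch: job is in {job_state}"
    return True, ""


print(check_removal("failed", "failed"))
print(check_removal("active", "failed"))
print(check_removal("wait", "failed"))
```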
results = omniq.remove_jobs_batch(
    queue="demo",
    lane="failed",
    job_ids=["01A...", "01B...", "01C..."],
)

- Max 100 per call
- Strict lane validation
- Atomic per batch
pause_result = omniq.pause(
    queue="demo",
)

resume_result = omniq.resume(
    queue="demo",
)

is_paused = omniq.is_paused(
    queue="demo",
)
A handler-driven primitive for fan-out workflows.
There is no TTL: cleanup happens only when the counter reaches zero.

The first queue receives a document with 5 pages.
# importing the lib
from omniq.client import OmniqClient

# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# publishing the job
job_id = omniq.publish(
    queue="documents",
    payload={
        "document_id": "doc-123",  # unique key used to initialize the children and track them until completion
        "pages": 5,  # every page must be completed before anything else happens
    },
)
print("OK", job_id)

The first consumer will publish a job for each page, passing the unique key for child tracking.

The second consumer will process each page and ack each child (signaling when the last page has been processed).
import time

# importing the lib
from omniq.client import OmniqClient


# creating your handler (ctx will have all the job information and actions)
def page_worker(ctx):
    page = ctx.payload["page"]

    # getting the unique key used to track the children
    completion_key = ctx.payload["completion_key"]

    print(f"[page_worker] Processing page {page} (job_id={ctx.job_id})")
    time.sleep(1.5)

    # ack this job as a child; the number of remaining jobs is returned,
    # so we can tell when the last job has been executed
    remaining = ctx.exec.child_ack(completion_key)

    print(f"[page_worker] Page {page} done. Remaining={remaining}")

    # remaining is 0 ONLY when this is the last job,
    # > 0 while there are still jobs to process,
    # and -1 if something went wrong with the counter
    if remaining == 0:
        print("[page_worker] Last page finished.")


# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
    queue="pages",
    handler=page_worker,
    verbose=True,
    drain=False,
)

Properties:
- Idempotent decrement
- Safe under retries
- Cross-queue safe
- Fully business-logic driven
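The fan-out flow and the counter properties above can be modeled in memory as follows. This is a sketch under stated assumptions, not OmniQ's code: `page_payloads`, `ToyCompletionCounter`, and the `document:<id>` key format are hypothetical, and the toy tracks an explicit `child_id` to get idempotency, whereas the handler shown earlier passes only the completion key to `ctx.exec.child_ack`. The payload keys `page` and `completion_key` are the ones the page worker reads.

```python
def page_payloads(document_id: str, pages: int):
    """Build one child payload per page, all sharing a completion key."""
    key = f"document:{document_id}"  # hypothetical key format
    return key, [{"page": n, "completion_key": key} for n in range(1, pages + 1)]


class ToyCompletionCounter:
    """In-memory model: idempotent decrement, cleanup only at zero, no TTL."""

    def __init__(self):
        self.remaining = {}  # key -> children still pending
        self.acked = {}      # key -> child ids already counted

    def init(self, key: str, total: int) -> None:
        self.remaining[key] = total
        self.acked[key] = set()

    def child_ack(self, key: str, child_id: str) -> int:
        if key not in self.remaining:
            return -1  # unknown or already cleaned-up counter
        if child_id not in self.acked[key]:
            # A retried child is counted once only.
            self.acked[key].add(child_id)
            self.remaining[key] -= 1
        left = self.remaining[key]
        if left == 0:
            # Cleanup happens only when the counter reaches zero.
            del self.remaining[key]
            del self.acked[key]
        return left


key, jobs = page_payloads("doc-123", 5)
counter = ToyCompletionCounter()
counter.init(key, len(jobs))
for job in jobs:
    left = counter.child_ack(key, f"page-{job['page']}")
print(left)
```

In this toy, acking the same child twice leaves the count unchanged, which is the property that makes retries safe.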
# if you provide a gid (group_id) you can limit the parallel execution of jobs in the same group
omniq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=1)

# you can also publish ungrouped jobs, which will be executed as well (fairness by round-robin algorithm)
omniq.publish(queue="demo", payload={"i": 2})

- FIFO inside group
- Groups execute in parallel
- Concurrency limited per group
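The three grouping properties can be illustrated with a small in-memory scheduler. Everything here is hypothetical (`ToyGroupScheduler` and its methods are not OmniQ APIs), and the fairness policy is simplified: this toy rotates across groups and falls back to the ungrouped lane only when no group can run, which is not necessarily how OmniQ interleaves the two.

```python
from collections import deque


class ToyGroupScheduler:
    """Toy model: FIFO per group, per-group concurrency cap, round-robin across groups."""

    def __init__(self):
        self.ungrouped = deque()
        self.groups = {}      # gid -> deque of waiting jobs (FIFO)
        self.limits = {}      # gid -> max concurrent jobs
        self.running = {}     # gid -> jobs currently running
        self.order = deque()  # round-robin order of gids

    def publish(self, job, gid=None, group_limit=1):
        if gid is None:
            self.ungrouped.append(job)
            return
        if gid not in self.groups:
            self.groups[gid] = deque()
            self.limits[gid] = group_limit
            self.running[gid] = 0
            self.order.append(gid)
        self.groups[gid].append(job)

    def reserve(self):
        # Visit each group once, rotating so no group is always first.
        for _ in range(len(self.order)):
            gid = self.order[0]
            self.order.rotate(-1)
            if self.groups[gid] and self.running[gid] < self.limits[gid]:
                self.running[gid] += 1
                return gid, self.groups[gid].popleft()
        if self.ungrouped:
            return None, self.ungrouped.popleft()
        return None

    def ack(self, gid):
        # Finishing a grouped job frees one slot in its group.
        if gid is not None:
            self.running[gid] -= 1


s = ToyGroupScheduler()
s.publish("a1", gid="company:acme", group_limit=1)
s.publish("a2", gid="company:acme", group_limit=1)
s.publish("u1")
print(s.reserve())  # first job of the group
print(s.reserve())  # group is at its limit, so the ungrouped job runs
```

With `group_limit=1`, the second job of a group stays queued until the first one is acknowledged, while other groups and ungrouped jobs keep flowing.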
You publish your job as usual
# importing the lib
from omniq.client import OmniqClient

# creating OmniQ passing redis information
uq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# publishing the job
job_id = uq.publish(
    queue="test",
    payload={"hello": "world"},
    timeout_ms=30_000,
)

print("OK", job_id)

Inside your consumer you can pause/resume your queue (or another one).
import time

# importing the lib
from omniq.client import OmniqClient


# creating your handler (ctx will have all the job information and actions)
def pause_unpause_example(ctx):
    print("Waiting 2 seconds")

    # checking whether this queue is paused (spoiler: it's not)
    is_paused = ctx.exec.is_paused(
        queue="test"
    )
    print("Is paused", is_paused)
    time.sleep(2)

    print("Pausing")

    # pausing this queue (this job and other active jobs are not affected,
    # but no new job will start until the queue is resumed)
    ctx.exec.pause(
        queue="test"
    )

    # checking again; now it is supposed to be paused
    is_paused = ctx.exec.is_paused(
        queue="test"
    )
    print("Is paused", is_paused)
    time.sleep(2)

    print("Resuming")

    # resuming this queue (all other workers can process jobs again)
    ctx.exec.resume(
        queue="test"
    )

    # checking again; now it is supposed to be resumed
    is_paused = ctx.exec.is_paused(
        queue="test"
    )
    print("Is paused", is_paused)
    time.sleep(2)

    print("Done")


# creating OmniQ passing redis information
omniq = OmniqClient(
    host="omniq-redis",
    port=6379,
)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
    queue="test",
    handler=pause_unpause_example,
    verbose=True,
    drain=False,
)

All examples can be found in the ./examples folder.
See the repository license.