26 changes: 25 additions & 1 deletion README.md
@@ -25,6 +25,7 @@ Requires Python 3.7+.
- [x] Password authentication
- [x] TLS support
- [x] Graceful worker shutdown (ctrl-c will allow 15s for pending jobs to finish)
- [x] High-performance batch pushing (via PUSHB) for bulk job creation

#### Todo

@@ -72,9 +73,32 @@ w.run() # runs until control-c or worker shutdown from Faktory web UI

The default mode of concurrency is to use a [ProcessPoolExecutor](https://devdocs.io/python~3.11/library/concurrent.futures#concurrent.futures.ProcessPoolExecutor). Multiple processes are started, their number controlled by the `concurrency` keyword argument of the `Worker` class. Processes are started only once and stay up, processing jobs from the queue. You can use threads instead of processes as the concurrency mechanism by passing `use_threads=True` when creating the Worker; a sketch of both modes follows below. As with processes, threads are started once and reused for each job. When using threads, be mindful of the consequences for your code, such as concurrent access to global variables, or the fact that initialization code run outside of the registered functions executes only once at worker startup, not once per thread.

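A minimal sketch of both modes, assuming the `register`/`run` worker API shown in [examples/worker.py](examples/worker.py); the `add` task and the `queues` value here are illustrative:

```python
from faktory import Worker

def add(x, y):
    return x + y

# Process-based concurrency (the default): four worker processes,
# started once at startup and reused for every job.
w = Worker(queues=['default'], concurrency=4)
w.register('add', add)
# w.run()

# Thread-based concurrency: same API, but all jobs share one process,
# so guard any mutable global state your tasks touch.
wt = Worker(queues=['default'], concurrency=4, use_threads=True)
wt.register('add', add)
# wt.run()
```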

#### Batch Pushing

For high-throughput scenarios, you can use `push_bulk` to send multiple jobs in a single network request. This utilizes the Faktory `PUSHB` command, significantly reducing network overhead.

```python
import faktory

with faktory.connection() as client:
    # 1. Prepare a list of job payloads
    jobs = [
        client._build_job_payload('test', args=(i, i + 1))
        for i in range(100)
    ]

    # 2. Push in bulk (default chunk_size is 1000)
    success = client.push_bulk(jobs)
```
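
Continuing with the `client` from the snippet above, `_build_job_payload` also accepts the standard Faktory job fields as keyword arguments (`queue`, `priority`, `retry`, and so on), so a sketch for routing a bulk push to a dedicated queue could look like the following; the `resize_image` jobtype, queue name, and field values are illustrative only:

```python
# Sketch: bulk-push with non-default job options (names are illustrative).
jobs = [
    client._build_job_payload(
        'resize_image',             # jobtype handled by your worker
        args=(f'photo_{i}.png',),   # positional args passed to the task
        queue='images',             # defaults to "default"
        priority=8,                 # Faktory priority, 1-9
        retry=3,                    # retry attempts before the job is marked dead
    )
    for i in range(100)
]
client.push_bulk(jobs, chunk_size=50)  # 100 jobs -> two PUSHB commands of 50 each
```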

#### Samples

There is very basic [example worker](examples/worker.py) and an [example producer](examples/producer.py) that you can use as a basis for your project.
Here are several examples you can use as a basis for your project:
- [Example Worker](examples/worker.py): A basic job consumer.
- [Example Producer](examples/producer.py): A simple job creator using single queuing.
- [Example Batch Producer](examples/producer_batch.py): A high-performance producer demonstrating bulk job creation using `push_bulk`.


#### Connection to Faktory

125 changes: 125 additions & 0 deletions examples/producer_batch.py
@@ -0,0 +1,125 @@
import argparse
import logging
import os
import sys
import time

from faktory import Client


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("faktory_producer")


def run_batch_producer(
    faktory_url: str = "tcp://localhost:7419",
    task_name: str = "add",
    batch_size: int = 10,
    interval: int = 5,
) -> None:
    """Queues Faktory jobs in batches at regular intervals.

    This function maintains a persistent connection to the Faktory server and
    dispatches jobs using the high-performance PUSHB (batch) command.

    Args:
        faktory_url: The connection string for the Faktory server.
            Defaults to "tcp://localhost:7419".
        task_name: The worker task name to enqueue. Defaults to "add".
        batch_size: The number of jobs to include in each batch. Defaults to 10.
        interval: The number of seconds to sleep between batch dispatches.
            Defaults to 5.

    Raises:
        ConnectionError: If the producer cannot establish a connection to Faktory.
    """

    logger.info("Initializing producer. Target: %s | Task: %s", faktory_url, task_name)

    try:
        with Client(faktory=faktory_url) as client:
            while True:
                jobs = [
                    client._build_job_payload(task_name, args=(i, i + 1))
                    for i in range(batch_size)
                ]

                logger.info("Pushing batch of %d jobs...", len(jobs))

                if client.push_bulk(jobs):
                    logger.info("✅ Batch successfully accepted by Faktory.")
                else:
                    logger.error("❌ Batch rejected by Faktory server.")

                time.sleep(interval)

    except (ConnectionRefusedError, OSError) as e:
        logger.error("Failed to connect to Faktory at %s: %s", faktory_url, e)
        raise


def parse_arguments() -> argparse.Namespace:
    """Parses command-line arguments for the batch producer CLI."""

    parser = argparse.ArgumentParser(
        prog="faktory-batch-producer",
        description="🚀 High-performance utility to dispatch Faktory jobs using PUSHB.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        epilog="Usage example: python producer_batch.py --task 'my_job' --size 50 --interval 2",
    )

    parser.add_argument(
        "--url",
        type=str,
        default=os.getenv("FAKTORY_URL", "tcp://localhost:7419"),
        help="The connection string for the Faktory server.",
    )

    parser.add_argument(
        "--task",
        type=str,
        default="add",
        help="The name of the worker task (jobtype) to be executed by the workers.",
    )

    parser.add_argument(
        "--size",
        type=int,
        default=10,
        help="The number of jobs to package into a single PUSHB batch.",
    )

    parser.add_argument(
        "--interval",
        type=int,
        default=5,
        help="The waiting time (in seconds) between each batch submission.",
    )

    return parser.parse_args()


def main() -> None:
    """Entry point for the Faktory Batch Producer CLI."""
    args = parse_arguments()

    try:
        run_batch_producer(
            faktory_url=args.url,
            task_name=args.task,
            batch_size=args.size,
            interval=args.interval,
        )
    except KeyboardInterrupt:
        logger.info("Shutdown signal received. Exiting gracefully...")
    except Exception:
        logger.exception("A fatal error occurred during production")
        sys.exit(1)


if __name__ == "__main__":
    main()
78 changes: 77 additions & 1 deletion faktory/client.py
@@ -1,7 +1,11 @@
import typing
import uuid

import json
from ._proto import Connection
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class Client:
@@ -89,3 +93,75 @@ def queue(

    def random_job_id(self) -> str:
        return uuid.uuid4().hex

    def _build_job_payload(
        self, task: str, args: Optional[typing.Iterable] = None, **kwargs: typing.Any
    ) -> typing.Dict[str, typing.Any]:
        """Constructs a standardized Faktory job dictionary.

        Args:
            task: The name of the function/worker to execute.
            args: An iterable of arguments for the task. Defaults to None.
            **kwargs: Optional job parameters including 'jid', 'queue',
                'priority', 'reserve_for', 'at', 'retry', 'backtrace', 'custom'.

        Returns:
            A dictionary containing the formatted job payload ready for Faktory.
        """
        return {
            "jid": kwargs.get("jid") or self.random_job_id(),
            "queue": kwargs.get("queue", "default"),
            "jobtype": task,
            "priority": kwargs.get("priority", 5),
            "args": list(args) if args is not None else [],
            "reserve_for": kwargs.get("reserve_for"),
            "at": kwargs.get("at"),
            "retry": kwargs.get("retry", 5),
            "backtrace": kwargs.get("backtrace", 0),
            "custom": kwargs.get("custom"),
        }

    def push_bulk(
        self, jobs: typing.List[typing.Dict[str, typing.Any]], chunk_size: int = 1000
    ) -> bool:
        """Pushes multiple job payloads to Faktory using the PUSHB command.

        This method handles connection management and splits the job list into
        manageable chunks to optimize network performance and memory usage.

        Args:
            jobs: A list of job dictionaries (created via _build_job_payload).
            chunk_size: The number of jobs to send per PUSHB command.
                Defaults to 1000.

        Returns:
            True if all chunks were successfully accepted by the server ('{}'),
            False otherwise.

        Raises:
            socket.error: If a network communication error occurs.
        """
        if not jobs:
            return True

        was_connected = self.is_connected
        if not self.is_connected:
            self.connect()

        try:
            for i in range(0, len(jobs), chunk_size):
                chunk = jobs[i : i + chunk_size]

                json_payload = json.dumps(chunk)

                self.faktory.reply("PUSHB", json_payload)

                # Faktory PUSHB success response is an empty JSON object '{}'
                response = next(self.faktory.get_message())
                if response != "{}":
                    logger.error("Faktory PUSHB rejected: %s", response)
                    return False
            return True
        finally:
            if not was_connected:
                self.disconnect()
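
A brief usage note on the connection handling above (a sketch, not part of the diff): because `push_bulk` records `is_connected` and only disconnects when it opened the connection itself, it works both on a disconnected client and inside an already-open connection.

```python
from faktory import Client

client = Client(faktory="tcp://localhost:7419")

# Disconnected client: push_bulk connects, sends PUSHB, then disconnects.
client.push_bulk([client._build_job_payload('add', args=(1, 2))])

# Already-connected client: the existing connection is reused and left open.
client.connect()
try:
    client.push_bulk([client._build_job_payload('add', args=(3, 4))])
finally:
    client.disconnect()
```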