diff --git a/README.md b/README.md index 9c1789c..fe73ac6 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Requires Python 3.7+. - [x] Password authentication - [x] TLS support - [x] Graceful worker shutdown (ctrl-c will allow 15s for pending jobs to finish) +- [x] High-performance batch pushing (via PUSHB) for bulk job creation #### Todo @@ -72,9 +73,32 @@ w.run() # runs until control-c or worker shutdown from Faktory web UI The default mode of concurrency is to use a [ProcessPoolExecutor](https://devdocs.io/python~3.11/library/concurrent.futures#concurrent.futures.ProcessPoolExecutor). Multiple processes are started, the number being controlled by the `concurrency` keyword argument of the `Worker` class. New processes are started only once, and stay up, processing jobs from the queue. There is the possibility to use threads instead of processes as a concurency mechanism. This is done by using `use_threads=True` at Worker creation. As with processes, threads are started once and reused for each job. When doing so, be mindful of the consequences of using threads in your code, like global variables concurrent access, or the fact that initialization code that is run outside of the registered functions will be run only once at worker startup, not once for each thread. + +## Batch Pushing + +For high-throughput scenarios, you can use `push_bulk` to send multiple jobs in a single network request. This utilizes the Faktory `PUSHB` command, significantly reducing network overhead. + +```python +import faktory + +with faktory.connection() as client: + # 1. Prepare a list of job payloads + jobs = [ + client._build_job_payload('test', args=(i, i + 1)) + for i in range(100) + ] + + # 2. Push in bulk (default chunk_size is 1000) + success = client.push_bulk(jobs) +``` + #### Samples -There is very basic [example worker](examples/worker.py) and an [example producer](examples/producer.py) that you can use as a basis for your project. 
+Here are several examples you can use as a basis for your project: +- [Example Worker](examples/worker.py): A basic job consumer. +- [Example Producer](examples/producer.py): A simple job creator using single queuing. +- [Example Batch Producer](examples/producer_batch.py): A high-performance producer demonstrating bulk job creation using `push_bulk`. + #### Connection to Faktory diff --git a/examples/producer_batch.py b/examples/producer_batch.py new file mode 100644 index 0000000..e9ed2b8 --- /dev/null +++ b/examples/producer_batch.py @@ -0,0 +1,125 @@ +import argparse +import logging +import os +import sys +import time + +from faktory import Client + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("faktory_producer") + + +def run_batch_producer( + faktory_url: str = "tcp://localhost:7419", + task_name: str = "add", + batch_size: int = 10, + interval: int = 5, +) -> None: + """Queues Faktory jobs in batches at regular intervals. + + This function maintains a persistent connection to the Faktory server and + dispatches jobs using the high-performance PUSHB (batch) command. + + Args: + faktory_url: The connection string for the Faktory server. + Defaults to "tcp://localhost:7419". + task_name: The worker task name to enqueue. Defaults to "add". + batch_size: The number of jobs to include in each batch. Defaults to 10. + interval: The number of seconds to sleep between batch dispatches. + Defaults to 5. + + Raises: + ConnectionError: If the producer cannot establish a link to Faktory. + """ + + logger.info("Initializing producer. 
Target: %s | Task: %s", faktory_url, task_name) + + try: + with Client(faktory=faktory_url) as client: + while True: + jobs = [ + client._build_job_payload(task_name, args=(i, i + 1)) + for i in range(batch_size) + ] + + logger.info("Pushing batch of %d jobs...", len(jobs)) + + if client.push_bulk(jobs): + logger.info("✅ Batch successfully accepted by Faktory.") + else: + logger.error("❌ Batch rejected by Faktory server.") + + time.sleep(interval) + + except (ConnectionRefusedError, OSError) as e: + logger.error("Failed to connect to Faktory at %s: %s", faktory_url, e) + raise + + +def parse_arguments() -> argparse.Namespace: + """Parses command-line arguments and displays professional help descriptions.""" + + parser = argparse.ArgumentParser( + prog="faktory-batch-producer", + description="🚀 High-performance utility to dispatch Faktory jobs using PUSHB.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + epilog="Usage example: python producer_batch.py --task 'my_job' --size 50 --interval 2", + ) + + parser.add_argument( + "--url", + type=str, + default=os.getenv("FAKTORY_URL", "tcp://localhost:7419"), + help="The connection string for the Faktory server.", + ) + + parser.add_argument( + "--task", + type=str, + default="add", + help="The name of the worker task (jobtype) to be executed by the workers.", + ) + + parser.add_argument( + "--size", + type=int, + default=10, + help="The number of jobs to package into a single PUSHB batch.", + ) + + parser.add_argument( + "--interval", + type=int, + default=5, + help="The waiting time (in seconds) between each batch submission.", + ) + + return parser.parse_args() + + +def main() -> None: + """Entry point for the Faktory Batch Producer CLI.""" + args = parse_arguments() + + try: + run_batch_producer( + faktory_url=args.url, + task_name=args.task, + batch_size=args.size, + interval=args.interval, + ) + except KeyboardInterrupt: + logger.info("Shutdown signal received. 
Exiting gracefully...") + except Exception: + logger.exception("A fatal error occurred during production") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/faktory/client.py b/faktory/client.py index 426c15d..494e785 100644 --- a/faktory/client.py +++ b/faktory/client.py @@ -1,7 +1,11 @@ import typing import uuid - +import json from ._proto import Connection +from typing import Optional +import logging + +logger = logging.getLogger(__name__) class Client: @@ -89,3 +93,75 @@ def queue( def random_job_id(self) -> str: return uuid.uuid4().hex + + def _build_job_payload( + self, task: str, args: Optional[typing.Iterable] = None, **kwargs: typing.Any + ) -> typing.Dict[str, typing.Any]: + """Constructs a standardized Faktory job dictionary. + + Args: + task: The name of the function/worker to execute. + args: An iterable of arguments for the task. Defaults to None. + **kwargs: Optional job parameters including 'jid', 'queue', + 'priority', 'reserve_for', 'at', 'retry', 'backtrace', 'custom'. + + Returns: + A dictionary containing the formatted job payload ready for Faktory. + """ + return { + "jid": kwargs.get("jid") or self.random_job_id(), + "queue": kwargs.get("queue", "default"), + "jobtype": task, + "priority": kwargs.get("priority", 5), + "args": list(args) if args is not None else [], + "reserve_for": kwargs.get("reserve_for"), + "at": kwargs.get("at"), + "retry": kwargs.get("retry", 5), + "backtrace": kwargs.get("backtrace", 0), + "custom": kwargs.get("custom"), + } + + def push_bulk( + self, jobs: typing.List[typing.Dict[str, typing.Any]], chunk_size: int = 1000 + ) -> bool: + """Pushes multiple job payloads to Faktory using the PUSHB command. + + This method handles connection management and splits the job list into + manageable chunks to optimize network performance and memory usage. + + Args: + jobs: A list of job dictionaries (created via _build_job_payload). + chunk_size: The number of jobs to send per PUSHB command. + Defaults to 1000. 
+ + Returns: + True if all chunks were successfully accepted by the server ('{}'), + False otherwise. + + Raises: + socket.error: If a network communication error occurs. + """ + if not jobs: + return True + + was_connected = self.is_connected + if not self.is_connected: + self.connect() + + try: + for i in range(0, len(jobs), chunk_size): + chunk = jobs[i : i + chunk_size] + + json_payload = json.dumps(chunk) + + self.faktory.reply("PUSHB", json_payload) + + # Faktory PUSHB success response is an empty JSON object '{}' + response = next(self.faktory.get_message()) + if response != "{}": + logger.error("Faktory PUSHB rejected: %s", response) + return False + return True + finally: + if not was_connected: + self.disconnect() diff --git a/tests/test_batch.py b/tests/test_batch.py new file mode 100644 index 0000000..e4f4ae5 --- /dev/null +++ b/tests/test_batch.py @@ -0,0 +1,163 @@ +from unittest.mock import MagicMock +from faktory.client import Client +from faktory._proto import Connection +import json +import typing +import pytest + + +@pytest.fixture +def conn() -> Connection: + """Mocked Connection object for protocol testing.""" + mock_conn = MagicMock(spec=Connection) + mock_conn.connect = MagicMock(return_value=True) + mock_conn.disconnect = MagicMock(return_value=None) + mock_conn.reply = MagicMock(return_value=None) + mock_conn.get_message = MagicMock(return_value=iter(["OK"])) + return mock_conn + + +@pytest.fixture +def client(conn) -> Client: + """Faktory Client fixture initialized with a mocked connection.""" + return Client(connection=conn) + + +class TestClientPushBulk: + """Tests the high-performance PUSHB (batch) functionality. + + This suite validates the implementation of the Faktory PUSHB protocol, + ensuring that the client correctly handles JSON serialization, network + chunking, connection lifecycle, and server-side errors. 
+ """ + + def test_can_push_bulk_jobs(self, client: Client): + """Verifies that multiple jobs are correctly serialized and sent. + + Ensures the command is 'PUSHB' and the payload is a valid JSON string + containing all job dictionaries. + """ + client.faktory.get_message = MagicMock(return_value=iter(["{}"])) + + mock_reply = typing.cast(MagicMock, client.faktory.reply) + + jobs = [ + client._build_job_payload("test_task", args=(1,)), + client._build_job_payload("test_task", args=(2,)), + ] + + was_successful = client.push_bulk(jobs) + + assert was_successful is True + + mock_reply.assert_called_once() + + args, _ = mock_reply.call_args + assert args[0] == "PUSHB" + + parsed_payload = json.loads(args[1]) + assert len(parsed_payload) == 2 + + def test_push_bulk_handles_chunking(self, client: Client): + """Verifies that large job lists are split into multiple network calls. + + Validates that the chunk_size parameter is respected and that each + batch starts at the correct index. + """ + client.faktory.get_message = MagicMock(return_value=iter(["{}", "{}", "{}"])) + + mock_reply = typing.cast(MagicMock, client.faktory.reply) + + jobs = [client._build_job_payload("task", args=(i,)) for i in range(5)] + + was_successful = client.push_bulk(jobs, chunk_size=2) + + assert was_successful is True + assert mock_reply.call_count == 3 + + args, _ = mock_reply.call_args_list[1] + + assert args[0] == "PUSHB" + second_payload = json.loads(args[1]) + + assert second_payload[0]["args"] == [2] + + def test_push_bulk_fails_on_server_error(self, client: Client): + """Ensures the method returns False when Faktory rejects a batch. + + Tests the scenario where the server returns an error string instead + of the expected empty JSON object '{}'. 
+ """ + client.faktory.get_message = MagicMock( + return_value=iter(["ERR invalid format"]) + ) + + jobs = [client._build_job_payload("task")] + was_successful = client.push_bulk(jobs) + + assert was_successful is False + + def test_push_bulk_manages_state_correctly(self, client: Client): + """Validates automatic connection and disconnection. + + Ensures that push_bulk opens a connection if needed and always + restores the client to a disconnected state. + """ + client.faktory.get_message = MagicMock(return_value=iter(["{}"])) + mock_connect = typing.cast(MagicMock, client.faktory.connect) + mock_disconnect = typing.cast(MagicMock, client.faktory.disconnect) + + assert client.is_connected is False + + client.push_bulk([{"jid": "1"}]) + + mock_connect.assert_called_once() + mock_disconnect.assert_called_once() + assert client.is_connected is False + + def test_push_bulk_stops_on_first_error(self, client: Client): + """Ensures that the process halts immediately after a batch failure. + + If a chunk in the middle of the list fails, subsequent chunks should + not be sent to avoid inconsistent states. 
+ """ + client.faktory.get_message = MagicMock(return_value=iter(["{}", "ERR"])) + mock_reply = typing.cast(MagicMock, client.faktory.reply) + + jobs = [client._build_job_payload("task") for _ in range(4)] + + was_successful = client.push_bulk(jobs, chunk_size=2) + + assert was_successful is False + assert mock_reply.call_count == 2 + + def test_push_bulk_short_circuits_on_empty_list(self, client: Client): + """Verifies that no network overhead occurs for empty job lists.""" + + mock_connect = typing.cast(MagicMock, client.faktory.connect) + mock_reply = typing.cast(MagicMock, client.faktory.reply) + + was_successful = client.push_bulk([]) + + assert was_successful is True + mock_connect.assert_not_called() + mock_reply.assert_not_called() + + def test_push_bulk_closes_connection_on_exception(self, client: Client): + """Ensures resource cleanup even during critical network failures. + + Guarantees that disconnect is called via a 'finally' block when + a socket error occurs mid-dispatch. + """ + mock_reply = typing.cast(MagicMock, client.faktory.reply) + mock_reply.side_effect = OSError("Connection lost") + + mock_disconnect = typing.cast(MagicMock, client.faktory.disconnect) + + jobs = [client._build_job_payload("task")] + + with pytest.raises(OSError): + client.push_bulk(jobs) + + mock_disconnect.assert_called_once() + assert client.is_connected is False