A lightweight, file-based distributed queue system implemented in Python. This project demonstrates various distributed queue implementations, starting with a simple JSON-based persistence model.
- Single JSON Queue: Full Publisher-Consumer loop implemented.
  - Publishers write tasks with ACK.
  - Consumers register, receive tasks, process them, and update status.
  - The Broker splits its logic into separate Writer (Publisher) and Reader (Consumer) processes.
- Other Implementations: Planned.
This implementation uses a local JSON file as the persistent queue storage, managed by a dedicated broker process.
- File-Based Persistence: Uses `Queue.json` as the single source of truth.
- Dual-Broker Architecture:
  - PublisherBroker: Dedicated process for handling high-throughput write operations.
  - ConsumerBroker: Dedicated process for task scheduling and assignment to idle consumers.
- Process Isolation: Brokers run in separate `multiprocessing.Process` instances to decouple file I/O and logic from the main application.
- Event-Driven Architecture: Communication uses strict `Event` objects passed through `multiprocessing.Queue`.
- Consumer Lifecycle Management:
  - Consumers explicitly register with the Broker.
  - Idle consumers are tracked in a `WaitingQueue`.
  - Tasks are pushed to consumers via dedicated `Pipe` connections (push model).
- Reliable Acknowledgements: Both Publishers and Consumers receive explicit acknowledgements for their operations.
- Concurrency Control: Uses `filelock` to ensure safe access to the JSON file across multiple processes.
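As a concrete illustration of the event-driven design, here is a minimal sketch of the `Event` envelope and a broker-style dispatch step. The enum members, field names, and the `dispatch` helper are simplified assumptions for illustration, not the project's actual models:

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any

class EventType(Enum):      # simplified stand-in for enum/EventType.py
    WRITE = auto()
    READ = auto()
    SHUTDOWN = auto()

class EventOwner(Enum):     # simplified stand-in for enum/EventOwner.py
    PUBLISHER = auto()
    CONSUMER = auto()

@dataclass
class Event:                # the communication envelope
    type: EventType
    owner: EventOwner
    payload: Any = None

def dispatch(event: Event) -> str:
    """What a broker loop would do with each Event it pulls off its
    multiprocessing.Queue (the queue itself is omitted here for brevity)."""
    if event.type is EventType.SHUTDOWN:
        return "stopping"
    return f"handling {event.type.name} from {event.owner.name}"

if __name__ == "__main__":
    evt = Event(EventType.WRITE, EventOwner.PUBLISHER, {"job": "demo"})
    print(dispatch(evt))  # handling WRITE from PUBLISHER
```

Keeping every message inside one typed envelope lets the brokers branch on `EventType` alone, regardless of which component sent it.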
The system consists of three main components:
- Publisher (`Publisher.py`):
  - Generates a `TaskIn` object containing data.
  - Sends a `WRITE` event to the `PublisherBroker`.
  - Waits for synchronous acknowledgement via a dedicated pipe connection.
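The publish-and-wait handshake can be sketched like this. The names `publish` and `broker_step` are hypothetical, and the broker runs in a thread here purely so the demo is self-contained (the project runs it in a separate `multiprocessing.Process`):

```python
import multiprocessing as mp
import threading

def publish(task: dict, events, ack_conn) -> dict:
    """Publisher side: enqueue a WRITE event, then block for the broker's ACK."""
    events.put({"type": "WRITE", "payload": task})
    return ack_conn.recv()          # synchronous wait on the dedicated pipe

def broker_step(events, ack_conn) -> None:
    """Broker side: pull one event, persist it (omitted), then acknowledge."""
    event = events.get()
    # ... here the real broker would persist event["payload"] to Queue.json ...
    ack_conn.send({"status": "ACK", "written": event["payload"]})

if __name__ == "__main__":
    events: mp.Queue = mp.Queue()
    publisher_end, broker_end = mp.Pipe()
    # Thread instead of Process, for demonstration only.
    broker = threading.Thread(target=broker_step, args=(events, broker_end))
    broker.start()
    reply = publish({"job": "resize-image"}, events, publisher_end)
    broker.join()
    print(reply["status"])  # ACK
```

Because the acknowledgement travels back over a pipe dedicated to that publisher, a slow ACK never blocks other publishers' writes on the shared event queue.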
- Consumer (`Consumer.py`):
  - Registers with the `BrokerManager` to receive a dedicated Task Pipe.
  - Enters a wait loop listening for assigned tasks.
  - Processes tasks (simulates work) and sends updates back to the system.
  - Automatically re-queues itself as "Available" after task completion.
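The consumer's push-model wait loop can be sketched as follows. The `None` shutdown sentinel and the status-message shapes are assumptions for illustration, and the consumer runs in a thread here only to keep the demo self-contained (the project uses processes):

```python
import multiprocessing as mp
import threading
import time

def consumer_loop(task_conn, status_queue) -> None:
    """Consumer side: wait for tasks pushed over the dedicated Pipe."""
    while True:
        task = task_conn.recv()            # blocks until the broker pushes work
        if task is None:                   # shutdown sentinel (an assumption here)
            return
        time.sleep(0.01)                   # simulate doing the work
        status_queue.put({"task_id": task["id"], "status": "DONE"})
        status_queue.put({"status": "AVAILABLE"})  # re-queue self as idle

if __name__ == "__main__":
    broker_end, consumer_end = mp.Pipe()
    status: mp.Queue = mp.Queue()
    worker = threading.Thread(target=consumer_loop, args=(consumer_end, status))
    worker.start()
    broker_end.send({"id": 1, "data": "hello"})  # the broker's push
    print(status.get()["status"])                # DONE
    print(status.get()["status"])                # AVAILABLE
    broker_end.send(None)
    worker.join()
```

The key property is that the consumer never polls `Queue.json` itself: work arrives over its pipe, and availability is advertised back through the shared status channel.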
- Broker Manager (`BrokerManager.py`):
  - Orchestrator: Manages the lifecycle of the `PublisherBroker` and `ConsumerBroker` processes.
  - Registry: Maintains maps of all active Publishers and Consumers.
  - Router: Listens for completion events and routes acknowledgements/tasks to the correct process via Pipes.
- Broker Processes:
  - PublisherBroker: Batches write operations to `Queue.json` (flushing every 2s).
  - ConsumerBroker: Monitors the queue for pending tasks and assigns them to available consumers from the `WaitingQueue`.
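The write-batching behaviour can be sketched as a buffer that only touches the file on flush. `BatchingWriter` is a hypothetical simplification (the real broker flushes on a timer inside its own process loop, under the `filelock`):

```python
import json
import time
from pathlib import Path

class BatchingWriter:
    """Hypothetical sketch of the PublisherBroker's write batching."""

    def __init__(self, path: Path, interval: float = 2.0) -> None:
        self.path = path
        self.interval = interval            # flush period, matching the 2s above
        self.buffer: list[dict] = []
        self.last_flush = time.monotonic()

    def write(self, task: dict) -> None:
        """Buffer a task; only hit the disk once the interval has elapsed."""
        self.buffer.append(task)
        if time.monotonic() - self.last_flush >= self.interval:
            self.flush()

    def flush(self) -> None:
        """Append all buffered tasks to the JSON file in a single I/O pass."""
        if not self.buffer:
            return
        existing = json.loads(self.path.read_text()) if self.path.exists() else []
        self.path.write_text(json.dumps(existing + self.buffer, indent=2))
        self.buffer.clear()
        self.last_flush = time.monotonic()

if __name__ == "__main__":
    writer = BatchingWriter(Path("Queue.json"))
    writer.write({"id": 1, "data": "demo"})   # buffered, not yet on disk
    writer.flush()                            # forced flush for the demo
```

Batching trades a bounded amount of durability (up to one interval of buffered tasks) for far fewer lock acquisitions and file rewrites.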
- `Event`: The envelope for communication. Contains an `EventType`, an `EventOwner`, and a payload.
- `TaskIn`: DTO sent by Publishers.
- `Task`: Full task object stored in `Queue.json` and processed by Consumers.
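The `TaskIn`-to-`Task` relationship can be sketched with dataclasses. The exact fields and the `promote` helper are assumptions for illustration, not the project's real schema:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, auto

class TaskStatus(Enum):     # hypothetical status values
    PENDING = auto()
    RUNNING = auto()
    DONE = auto()

@dataclass
class TaskIn:
    """What a Publisher submits: just the payload."""
    data: dict

@dataclass
class Task:
    """What the broker stores in Queue.json: payload plus bookkeeping."""
    id: int
    data: dict
    status: TaskStatus = TaskStatus.PENDING
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

def promote(task_in: TaskIn, task_id: int) -> Task:
    """Hypothetical broker step: turn an incoming DTO into a stored Task."""
    return Task(id=task_id, data=task_in.data)
```

Keeping the DTO and the stored record separate means Publishers never have to know about IDs, statuses, or timestamps; the broker owns all bookkeeping fields.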
- Python >= 3.12
- `aiofiles`: Asynchronous file I/O.
- `filelock`: Platform-independent file locking.
- `orjson`: Fast JSON serialization/deserialization.
- `uv`: (Mandatory) For project management and running.
This project relies on `uv` for dependency management and execution.
- Sync Dependencies:

  ```bash
  uv sync
  ```

- Run the Application: Run the module directly using `uv run`. This command handles the environment activation automatically.

  ```bash
  uv run python -m singleJsonDistributedQueue.main
  ```
```
src/singleJsonDistributedQueue/
├── broker/
│   ├── BrokerManager.py     # Orchestrator, connection registry, routing logic
│   ├── PublisherBroker.py   # Handles writes & batching
│   └── ConsumerBroker.py    # Handles task assignment & reads
├── enum/
│   ├── EventOwner.py        # Enums for system components
│   └── EventType.py         # WRITE, READ, SHUTDOWN
├── model/
│   ├── Event.py             # Communication envelope
│   └── Task.py              # Data models
├── queue/
│   └── Queue.json           # Data storage
├── Publisher.py             # Client interface for writing tasks
├── Consumer.py              # Client interface for processing tasks
└── main.py                  # Demonstration entry point
```