Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new “Unit” abstraction (operator + listeners + publishers) and adds YAML-driven configuration plus a Typer-based CLI to run/validate/list configured units. It also updates the dev workflow with Pixi, adds CI coverage gates, includes a container build, and ships a runnable ZMQ demo with accompanying docs.
Changes:
- Add
Unitruntime container, YAML config loader, andarroyo-runCLI. - Update packaging/CI for Pixi + coverage enforcement; add container build support.
- Add demos, example configs, and documentation for configuration-based usage.
Reviewed changes
Copilot reviewed 30 out of 32 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
src/arroyopy/unit.py |
New Unit container + lifecycle management for operator/listeners/publishers |
src/arroyopy/config.py |
YAML/config loader + dynamic import/instantiation of components |
src/arroyopy/cli.py |
New Typer CLI for running/validating/listing units |
src/arroyopy/schemas.py |
Pydantic v2 config updates for arbitrary types |
src/arroyopy/__init__.py |
Export Unit + config helpers at package root |
src/arroyopy/app/redis_file_watcher.py |
Removes old watcher CLI/app implementation |
src/_test/test_unit.py |
Tests for Unit + config loading helpers |
src/_test/test_cli.py |
Tests for CLI business logic + command wiring |
pyproject.toml |
Adds deps (pyyaml), coverage config, pytest config, new script entrypoint |
pixi.toml |
Adds Pixi workspace, environments, and common tasks |
examples/unit_config_example.py |
Programmatic usage examples for YAML/unit loading |
examples/config/simple_pipeline.yaml |
Template YAML config example |
examples/config/README.md |
Documentation for example configs |
examples/simple_demo/__init__.py |
Simple demo package marker |
examples/simple_demo/pipeline.yaml |
Runnable demo pipeline configuration |
examples/simple_demo/simple_operator.py |
Demo operator implementation |
examples/simple_demo/zmq_components.py |
Address-configurable ZMQ listener/publisher wrappers for demo |
examples/simple_demo/source.py |
Demo ZMQ source generator |
examples/simple_demo/sink.py |
Demo ZMQ sink/console subscriber |
examples/simple_demo/quick_test.sh |
Convenience script to run demo end-to-end |
examples/simple_demo/README.md |
Demo walkthrough and setup instructions |
docs/index.md |
New docs index/landing page |
docs/configuration.md |
New configuration guide for Units/YAML/CLI |
docs/pixi-guide.md |
Pixi development guide |
docs/PIXI_QUICK_REF.md |
Pixi quick reference card |
docs/PIXI_MIGRATION.md |
Pixi migration notes |
Containerfile |
Multi-stage build for producing a CLI-capable image |
.containerignore |
Container build ignore rules |
.github/workflows/ci.yaml |
CI split into pixi + pip jobs with coverage enforcement |
README.md |
Expanded docs for Units, config-based deployment, Pixi workflow |
Comments suppressed due to low confidence (1)
README.md:380
- The 'Project Structure...' heading is garbled and uses triple quotes (''') instead of markdown code fences (
), so the remainder of the README renders incorrectly. Clean up the heading text and replace the ''' blocks with properfences.
## Project Structureuse `pixi` for CI in github action. It's great for that but can't get our favorite developr tools to use the python environments that `pixi` creaetes in the `.pixi` folder. If you want to play with `pixi`, here are some tips:
To setup a development environment:
* Git clone this repo and CD into the directory
* Install [pixi](https://pixi.sh/v0.33.0/#installation)
* Install dependencies with
'''
pixi install
'''
* run pre-commit on the files
'''
pixi r pre-commit
'''
* Run pytest with
'''
pixi r test
'''
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Instantiate publishers | ||
| publishers = [] | ||
| for i, publisher_config in enumerate(config.get("publishers", [])): | ||
| try: | ||
| publisher = _instantiate_component(publisher_config) | ||
| if not isinstance(publisher, Publisher): | ||
| raise ConfigurationError( | ||
| f"Publisher must be an instance of Publisher, got {type(publisher)}" | ||
| ) | ||
| publishers.append(publisher) |
There was a problem hiding this comment.
The strict isinstance(publisher, Publisher) check will reject existing publishers in this repo (e.g., RedisPublisher in src/arroyopy/redis.py does not subclass Publisher). As written, configs that reference arroyopy.redis.RedisPublisher (also shown in the docstring) will fail to load. Either update built-in publishers to inherit from Publisher, or relax validation to accept any object with an async publish method.
examples/config/simple_pipeline.yaml
Outdated
| - class: arroyopy.zmq.ZMQListener | ||
| kwargs: | ||
| # operator is automatically injected | ||
| address: 'tcp://127.0.0.1:5555' | ||
| socket_type: 'SUB' | ||
|
|
||
| publishers: | ||
| - class: arroyopy.zmq.ZMQPublisher |
There was a problem hiding this comment.
This example config references arroyopy.zmq.ZMQListener with address/socket_type kwargs and arroyopy.zmq.ZMQPublisher, but the codebase currently has no ZMQPublisher and ZMQListener.__init__ expects a pre-created zmq_socket. As-is, load_units_from_yaml will fail to instantiate these components. Update the example to use a configurable wrapper (like the demo's SimpleZMQListener/SimpleZMQPublisher) or extend the built-in ZMQ components to support address-based construction.
| - class: arroyopy.zmq.ZMQListener | |
| kwargs: | |
| # operator is automatically injected | |
| address: 'tcp://127.0.0.1:5555' | |
| socket_type: 'SUB' | |
| publishers: | |
| - class: arroyopy.zmq.ZMQPublisher | |
| - class: examples.simple_demo.zmq.SimpleZMQListener | |
| kwargs: | |
| # operator is automatically injected | |
| address: 'tcp://127.0.0.1:5555' | |
| socket_type: 'SUB' | |
| publishers: | |
| - class: examples.simple_demo.zmq.SimpleZMQPublisher |
| ## Pre-commit Hooks | ||
| ## Pre-commit Hooks | ||
|
|
||
| We use `pre-commit` for code quality checks. It's included in the dev dependencies. | ||
|
|
||
| ### Setup (Pixi) | ||
|
|
||
| ```bash | ||
| If pre-commit makes changes (e.g., with `black`), review them and add to your commit: | ||
|
|
||
| ```bash | ||
| git add . | ||
| # Then run pre-commit again | ||
| pixi run pre-commit # or: pre-commit run --all-files | ||
| ``` |
There was a problem hiding this comment.
This section has duplicated headers and broken markdown fencing (a bash block contains plain text and then starts another bash block). It renders incorrectly and makes the setup instructions hard to follow; please fix the code fences and remove the duplicate 'Pre-commit Hooks' header.
examples/unit_config_example.py
Outdated
| unit = load_unit_from_yaml( | ||
| "examples/config/multi_unit.yaml", unit_name="data_ingestion" | ||
| ) | ||
|
|
||
| logger.info(f"Loaded unit: {unit.name}") | ||
| # await unit.start() # Uncomment to actually run | ||
|
|
||
|
|
||
| # Example 3: Load all units | ||
| async def run_all_units(): | ||
| """Load all units from a config file.""" | ||
| logger.info("Loading all units...") | ||
|
|
||
| units = load_units_from_yaml("examples/config/multi_unit.yaml") | ||
|
|
||
| for unit in units: | ||
| logger.info(f"Found unit: {unit.name}") | ||
| logger.info(f" - Operator: {unit.operator.__class__.__name__}") | ||
| logger.info(f" - Listeners: {len(unit.listeners)}") | ||
| logger.info(f" - Publishers: {len(unit.publishers)}") | ||
|
|
||
| # To run all units concurrently: | ||
| # tasks = [unit.start() for unit in units] | ||
| # await asyncio.gather(*tasks) | ||
|
|
||
|
|
||
| # Example 4: Programmatic configuration | ||
| async def programmatic_example(): | ||
| """Create a unit programmatically without YAML.""" | ||
|
|
||
| # This would require actual operator/listener/publisher implementations | ||
| # operator = MyOperator() | ||
| # listener = MyListener(operator) | ||
| # publisher = MyPublisher() | ||
| # unit = Unit( | ||
| # name='programmatic_unit', | ||
| # operator=operator, | ||
| # listeners=[listener], | ||
| # publishers=[publisher] | ||
| # ) | ||
| # await unit.start() | ||
|
|
||
| logger.info("See the code for programmatic unit creation example") | ||
|
|
||
|
|
||
| def main(): | ||
| """Run the examples.""" | ||
| print("\n=== Arroyo Configuration Examples ===\n") | ||
|
|
||
| print("1. Load and inspect unit from config") | ||
| # asyncio.run(run_from_config()) | ||
|
|
||
| print("\n2. Load specific unit from multi-unit config") | ||
| # asyncio.run(run_specific_unit()) | ||
|
|
||
| print("\n3. Load all units") | ||
| # asyncio.run(run_all_units()) | ||
|
|
||
| print("\n4. Programmatic configuration") | ||
| # asyncio.run(programmatic_example()) | ||
|
|
||
| print("\n=== CLI Usage ===\n") | ||
| print("To run a unit from the command line:") | ||
| print(" arroyo-run examples/config/simple_pipeline.yaml") | ||
| print("\nTo validate a configuration:") | ||
| print(" arroyo-run validate examples/config/simple_pipeline.yaml") | ||
| print("\nTo list units in a config file:") | ||
| print(" arroyo-run list-units examples/config/multi_unit.yaml") | ||
| print("\nTo run a specific unit:") | ||
| print(" arroyo-run examples/config/multi_unit.yaml --unit data_ingestion") | ||
| print() |
There was a problem hiding this comment.
This example references examples/config/multi_unit.yaml in multiple places, but that file is not present in this PR, and the printed CLI usage omits the required run subcommand. Either add the missing YAML or adjust the example paths/commands to ones that exist (e.g., the simple demo).
src/arroyopy/unit.py
Outdated
| self._listeners.append(listener) | ||
| await self.operator.add_listener(listener) | ||
| logger.info(f"Unit '{self.name}': Added listener {listener.__class__.__name__}") |
There was a problem hiding this comment.
Unit.add_listener() calls await self.operator.add_listener(listener), but the current Operator.add_listener implementation starts the listener and passes a queue argument that Listener.start() implementations in this repo don't accept. Since Unit.start() also starts listeners, this call will either raise at runtime or double-start listeners. Consider removing the call to operator.add_listener here (or refactoring Operator.add_listener/Listener.start to a consistent contract).
examples/simple_demo/pipeline.yaml
Outdated
| # | ||
| # To run this demo: | ||
| # 1. In terminal 1: python source.py (publishes test data) | ||
| # 2. In terminal 2: arroyo-run pipeline.yaml (runs the pipeline) |
There was a problem hiding this comment.
The demo instructions say arroyo-run pipeline.yaml, but the CLI defines run as a subcommand, so the correct invocation is arroyo-run run pipeline.yaml. Update these comments to match the actual CLI interface.
| # 2. In terminal 2: arroyo-run pipeline.yaml (runs the pipeline) | |
| # 2. In terminal 2: arroyo-run run pipeline.yaml (runs the pipeline) |
| # Run async shutdown | ||
| loop = asyncio.get_event_loop() | ||
| if running_units: | ||
| loop.create_task(shutdown_units(running_units)) | ||
|
|
There was a problem hiding this comment.
signal_handler schedules shutdown_units(...) and then immediately calls sys.exit(0), so the shutdown coroutine will typically never get a chance to run/await (especially under asyncio.run). Also, asyncio.get_event_loop() can raise in newer Python versions when no loop is set. Consider wiring signal handling into the running asyncio loop (e.g., create the loop manually and run_until_complete, or use asyncio.Runner/loop.add_signal_handler on Unix) and only exit after shutdown completes.
| # Run async shutdown | |
| loop = asyncio.get_event_loop() | |
| if running_units: | |
| loop.create_task(shutdown_units(running_units)) | |
| # Attempt to attach shutdown to a running event loop, if any. | |
| try: | |
| loop = asyncio.get_running_loop() | |
| except RuntimeError: | |
| loop = None | |
| if running_units: | |
| if loop is not None and loop.is_running(): | |
| # Schedule shutdown on the running loop and let the main | |
| # asyncio execution flow complete after units stop. | |
| loop.create_task(shutdown_units(running_units)) | |
| return | |
| # No running loop: run shutdown synchronously and exit afterwards. | |
| asyncio.run(shutdown_units(running_units)) | |
| sys.exit(0) | |
| # No running units; exit immediately. |
| RUN if [ -z "$EXTRAS" ]; then \ | ||
| pip install --no-cache-dir /tmp/*.whl; \ | ||
| else \ | ||
| pip install --no-cache-dir "/tmp/*.whl[$EXTRAS]"; \ |
There was a problem hiding this comment.
Installing extras via pip install "/tmp/*.whl[$EXTRAS]" is not valid pip syntax for local wheel paths; extras need to be specified on the distribution name (e.g., arroyopy[zmq] @ file:///.../arroyopy-*.whl). As written, the image build will fail whenever EXTRAS is set. Consider using a direct-reference requirement string with the wheel filename, or switch to installing from source (pip install .[$EXTRAS]) in the runtime stage.
| pip install --no-cache-dir "/tmp/*.whl[$EXTRAS]"; \ | |
| WHEEL_PATH=$(echo /tmp/*.whl) && \ | |
| pip install --no-cache-dir "arroyopy[$EXTRAS] @ file://${WHEEL_PATH}"; \ |
| self._running = False | ||
| self.operator.stop_requested = True | ||
|
|
||
| # Stop all listeners | ||
| for listener in self._listeners: | ||
| await listener.stop() | ||
|
|
There was a problem hiding this comment.
Unit.stop() sets operator.stop_requested and calls listener.stop(), but it never cancels/awaits the listener tasks created in start(). For listeners whose stop() is a no-op (e.g., FileWatcherListener.stop), Unit.start() will continue awaiting gather() forever. Track an operator task and listener tasks, cancel them on shutdown as needed, and await their completion (ideally with return_exceptions=True).
Summary
This PR improves CLI testability, fixes async bugs in Unit/config loading, and adds a complete runnable demo.
Key Changes:
unitwhich is a collection one operator and multiple listeners and publishers