Skip to content

Add units#13

Open
dylanmcreynolds wants to merge 8 commits intomainfrom
units
Open

Add units#13
dylanmcreynolds wants to merge 8 commits intomainfrom
units

Conversation

@dylanmcreynolds
Copy link
Collaborator

Summary

This PR improves CLI testability, fixes async bugs in Unit/config loading, and adds a complete runnable demo.

Key Changes:

  • Adds the concept of a unit which is a collection one operator and multiple listeners and publishers
  • Units can be run be a single cli command, and decoratively configured in a yaml file
  • Changes to use pixi for builds
  • Adds test coverage checking to CI
  • Adds a Containerimage that can be used to build containers with various combinations of dependencies.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new “Unit” abstraction (operator + listeners + publishers) and adds YAML-driven configuration plus a Typer-based CLI to run/validate/list configured units. It also updates the dev workflow with Pixi, adds CI coverage gates, includes a container build, and ships a runnable ZMQ demo with accompanying docs.

Changes:

  • Add Unit runtime container, YAML config loader, and arroyo-run CLI.
  • Update packaging/CI for Pixi + coverage enforcement; add container build support.
  • Add demos, example configs, and documentation for configuration-based usage.

Reviewed changes

Copilot reviewed 30 out of 32 changed files in this pull request and generated 20 comments.

Show a summary per file
File Description
src/arroyopy/unit.py New Unit container + lifecycle management for operator/listeners/publishers
src/arroyopy/config.py YAML/config loader + dynamic import/instantiation of components
src/arroyopy/cli.py New Typer CLI for running/validating/listing units
src/arroyopy/schemas.py Pydantic v2 config updates for arbitrary types
src/arroyopy/__init__.py Export Unit + config helpers at package root
src/arroyopy/app/redis_file_watcher.py Removes old watcher CLI/app implementation
src/_test/test_unit.py Tests for Unit + config loading helpers
src/_test/test_cli.py Tests for CLI business logic + command wiring
pyproject.toml Adds deps (pyyaml), coverage config, pytest config, new script entrypoint
pixi.toml Adds Pixi workspace, environments, and common tasks
examples/unit_config_example.py Programmatic usage examples for YAML/unit loading
examples/config/simple_pipeline.yaml Template YAML config example
examples/config/README.md Documentation for example configs
examples/simple_demo/__init__.py Simple demo package marker
examples/simple_demo/pipeline.yaml Runnable demo pipeline configuration
examples/simple_demo/simple_operator.py Demo operator implementation
examples/simple_demo/zmq_components.py Address-configurable ZMQ listener/publisher wrappers for demo
examples/simple_demo/source.py Demo ZMQ source generator
examples/simple_demo/sink.py Demo ZMQ sink/console subscriber
examples/simple_demo/quick_test.sh Convenience script to run demo end-to-end
examples/simple_demo/README.md Demo walkthrough and setup instructions
docs/index.md New docs index/landing page
docs/configuration.md New configuration guide for Units/YAML/CLI
docs/pixi-guide.md Pixi development guide
docs/PIXI_QUICK_REF.md Pixi quick reference card
docs/PIXI_MIGRATION.md Pixi migration notes
Containerfile Multi-stage build for producing a CLI-capable image
.containerignore Container build ignore rules
.github/workflows/ci.yaml CI split into pixi + pip jobs with coverage enforcement
README.md Expanded docs for Units, config-based deployment, Pixi workflow
Comments suppressed due to low confidence (1)

README.md:380

  • The 'Project Structure...' heading is garbled and uses triple quotes (''') instead of markdown code fences (), so the remainder of the README renders incorrectly. Clean up the heading text and replace the ''' blocks with proper fences.
## Project Structureuse `pixi` for CI in github action. It's great for that but can't get our favorite developr tools to use the python environments that `pixi` creaetes in the `.pixi` folder. If you want to play with `pixi`, here are some tips:

To setup a development environment:

* Git clone this repo and CD into the directory
* Install [pixi](https://pixi.sh/v0.33.0/#installation)
* Install dependencies with
'''
pixi install
'''
* run pre-commit on the files
'''
pixi r pre-commit
'''


* Run pytest with
'''
pixi r test
'''

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +167 to +176
# Instantiate publishers
publishers = []
for i, publisher_config in enumerate(config.get("publishers", [])):
try:
publisher = _instantiate_component(publisher_config)
if not isinstance(publisher, Publisher):
raise ConfigurationError(
f"Publisher must be an instance of Publisher, got {type(publisher)}"
)
publishers.append(publisher)
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The strict isinstance(publisher, Publisher) check will reject existing publishers in this repo (e.g., RedisPublisher in src/arroyopy/redis.py does not subclass Publisher). As written, configs that reference arroyopy.redis.RedisPublisher (also shown in the docstring) will fail to load. Either update built-in publishers to inherit from Publisher, or relax validation to accept any object with an async publish method.

Copilot uses AI. Check for mistakes.
Comment on lines +28 to +35
- class: arroyopy.zmq.ZMQListener
kwargs:
# operator is automatically injected
address: 'tcp://127.0.0.1:5555'
socket_type: 'SUB'

publishers:
- class: arroyopy.zmq.ZMQPublisher
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example config references arroyopy.zmq.ZMQListener with address/socket_type kwargs and arroyopy.zmq.ZMQPublisher, but the codebase currently has no ZMQPublisher and ZMQListener.__init__ expects a pre-created zmq_socket. As-is, load_units_from_yaml will fail to instantiate these components. Update the example to use a configurable wrapper (like the demo's SimpleZMQListener/SimpleZMQPublisher) or extend the built-in ZMQ components to support address-based construction.

Suggested change
- class: arroyopy.zmq.ZMQListener
kwargs:
# operator is automatically injected
address: 'tcp://127.0.0.1:5555'
socket_type: 'SUB'
publishers:
- class: arroyopy.zmq.ZMQPublisher
- class: examples.simple_demo.zmq.SimpleZMQListener
kwargs:
# operator is automatically injected
address: 'tcp://127.0.0.1:5555'
socket_type: 'SUB'
publishers:
- class: examples.simple_demo.zmq.SimpleZMQPublisher

Copilot uses AI. Check for mistakes.
Comment on lines +320 to 334
## Pre-commit Hooks
## Pre-commit Hooks

We use `pre-commit` for code quality checks. It's included in the dev dependencies.

### Setup (Pixi)

```bash
If pre-commit makes changes (e.g., with `black`), review them and add to your commit:

```bash
git add .
# Then run pre-commit again
pixi run pre-commit # or: pre-commit run --all-files
```
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section has duplicated headers and broken markdown fencing (a bash block contains plain text and then starts another bash block). It renders incorrectly and makes the setup instructions hard to follow; please fix the code fences and remove the duplicate 'Pre-commit Hooks' header.

Copilot uses AI. Check for mistakes.
Comment on lines +39 to +109
unit = load_unit_from_yaml(
"examples/config/multi_unit.yaml", unit_name="data_ingestion"
)

logger.info(f"Loaded unit: {unit.name}")
# await unit.start() # Uncomment to actually run


# Example 3: Load all units
async def run_all_units():
"""Load all units from a config file."""
logger.info("Loading all units...")

units = load_units_from_yaml("examples/config/multi_unit.yaml")

for unit in units:
logger.info(f"Found unit: {unit.name}")
logger.info(f" - Operator: {unit.operator.__class__.__name__}")
logger.info(f" - Listeners: {len(unit.listeners)}")
logger.info(f" - Publishers: {len(unit.publishers)}")

# To run all units concurrently:
# tasks = [unit.start() for unit in units]
# await asyncio.gather(*tasks)


# Example 4: Programmatic configuration
async def programmatic_example():
"""Create a unit programmatically without YAML."""

# This would require actual operator/listener/publisher implementations
# operator = MyOperator()
# listener = MyListener(operator)
# publisher = MyPublisher()
# unit = Unit(
# name='programmatic_unit',
# operator=operator,
# listeners=[listener],
# publishers=[publisher]
# )
# await unit.start()

logger.info("See the code for programmatic unit creation example")


def main():
"""Run the examples."""
print("\n=== Arroyo Configuration Examples ===\n")

print("1. Load and inspect unit from config")
# asyncio.run(run_from_config())

print("\n2. Load specific unit from multi-unit config")
# asyncio.run(run_specific_unit())

print("\n3. Load all units")
# asyncio.run(run_all_units())

print("\n4. Programmatic configuration")
# asyncio.run(programmatic_example())

print("\n=== CLI Usage ===\n")
print("To run a unit from the command line:")
print(" arroyo-run examples/config/simple_pipeline.yaml")
print("\nTo validate a configuration:")
print(" arroyo-run validate examples/config/simple_pipeline.yaml")
print("\nTo list units in a config file:")
print(" arroyo-run list-units examples/config/multi_unit.yaml")
print("\nTo run a specific unit:")
print(" arroyo-run examples/config/multi_unit.yaml --unit data_ingestion")
print()
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example references examples/config/multi_unit.yaml in multiple places, but that file is not present in this PR, and the printed CLI usage omits the required run subcommand. Either add the missing YAML or adjust the example paths/commands to ones that exist (e.g., the simple demo).

Copilot uses AI. Check for mistakes.
Comment on lines +104 to +106
self._listeners.append(listener)
await self.operator.add_listener(listener)
logger.info(f"Unit '{self.name}': Added listener {listener.__class__.__name__}")
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit.add_listener() calls await self.operator.add_listener(listener), but the current Operator.add_listener implementation starts the listener and passes a queue argument that Listener.start() implementations in this repo don't accept. Since Unit.start() also starts listeners, this call will either raise at runtime or double-start listeners. Consider removing the call to operator.add_listener here (or refactoring Operator.add_listener/Listener.start to a consistent contract).

Copilot uses AI. Check for mistakes.
#
# To run this demo:
# 1. In terminal 1: python source.py (publishes test data)
# 2. In terminal 2: arroyo-run pipeline.yaml (runs the pipeline)
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The demo instructions say arroyo-run pipeline.yaml, but the CLI defines run as a subcommand, so the correct invocation is arroyo-run run pipeline.yaml. Update these comments to match the actual CLI interface.

Suggested change
# 2. In terminal 2: arroyo-run pipeline.yaml (runs the pipeline)
# 2. In terminal 2: arroyo-run run pipeline.yaml (runs the pipeline)

Copilot uses AI. Check for mistakes.
Comment on lines +118 to +122
# Run async shutdown
loop = asyncio.get_event_loop()
if running_units:
loop.create_task(shutdown_units(running_units))

Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

signal_handler schedules shutdown_units(...) and then immediately calls sys.exit(0), so the shutdown coroutine will typically never get a chance to run/await (especially under asyncio.run). Also, asyncio.get_event_loop() can raise in newer Python versions when no loop is set. Consider wiring signal handling into the running asyncio loop (e.g., create the loop manually and run_until_complete, or use asyncio.Runner/loop.add_signal_handler on Unix) and only exit after shutdown completes.

Suggested change
# Run async shutdown
loop = asyncio.get_event_loop()
if running_units:
loop.create_task(shutdown_units(running_units))
# Attempt to attach shutdown to a running event loop, if any.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if running_units:
if loop is not None and loop.is_running():
# Schedule shutdown on the running loop and let the main
# asyncio execution flow complete after units stop.
loop.create_task(shutdown_units(running_units))
return
# No running loop: run shutdown synchronously and exit afterwards.
asyncio.run(shutdown_units(running_units))
sys.exit(0)
# No running units; exit immediately.

Copilot uses AI. Check for mistakes.
RUN if [ -z "$EXTRAS" ]; then \
pip install --no-cache-dir /tmp/*.whl; \
else \
pip install --no-cache-dir "/tmp/*.whl[$EXTRAS]"; \
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Installing extras via pip install "/tmp/*.whl[$EXTRAS]" is not valid pip syntax for local wheel paths; extras need to be specified on the distribution name (e.g., arroyopy[zmq] @ file:///.../arroyopy-*.whl). As written, the image build will fail whenever EXTRAS is set. Consider using a direct-reference requirement string with the wheel filename, or switch to installing from source (pip install .[$EXTRAS]) in the runtime stage.

Suggested change
pip install --no-cache-dir "/tmp/*.whl[$EXTRAS]"; \
WHEEL_PATH=$(echo /tmp/*.whl) && \
pip install --no-cache-dir "arroyopy[$EXTRAS] @ file://${WHEEL_PATH}"; \

Copilot uses AI. Check for mistakes.
Comment on lines +165 to +171
self._running = False
self.operator.stop_requested = True

# Stop all listeners
for listener in self._listeners:
await listener.stop()

Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit.stop() sets operator.stop_requested and calls listener.stop(), but it never cancels/awaits the listener tasks created in start(). For listeners whose stop() is a no-op (e.g., FileWatcherListener.stop), Unit.start() will continue awaiting gather() forever. Track an operator task and listener tasks, cancel them on shutdown as needed, and await their completion (ideally with return_exceptions=True).

Copilot uses AI. Check for mistakes.
@dylanmcreynolds dylanmcreynolds changed the title Add units Units Add units Mar 10, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants