Run a list of Python callables that depend on each other — in parallel when possible, with per-task log files and optional HTML email notification on failure. Zero dependencies. Pure Python 3.10+.
- 🔗 Declare what depends on what: write your tasks in any order; the runtime sorts them so every dependency runs first.
- ⚡ Run in parallel when you can: independent tasks run together on a thread pool; parallel mode switches on automatically for workflows with 10 or more tasks.
- 🛡️ One failure doesn't stop the rest: a failed task causes only the tasks that depend on it to be skipped, and every other part of the workflow keeps running.
- 📝 One log file per task: share a single log across the whole run, or keep them separate for easier debugging.
- 📧 Email alerts when something breaks: pass an `SMTPConfig` to a task and get a styled HTML email (with traceback, task context, and the list of tasks that were skipped) the instant it raises.
- 🧰 Modern, strictly-typed Python 3.10+: `from __future__ import annotations`, fully `mypy --strict` clean, `dict[str, TaskResult]`, `set[str]`, `|` unions.
A `Process` holds a list of `Task`s. At construction it validates names, types, and dependency references and detects cycles, raising before anything runs.
When you call `process.run()`, tasks are topologically sorted and scheduled: dependencies first, independent tasks in parallel.
A `TaskDependency` can forward an upstream result directly into a downstream function, as a positional or keyword argument. `run()` returns a `ProcessResult` with `passed_tasks_results` and `failed_tasks` for inspection.
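To make the scheduling idea concrete, here is a minimal sketch of dependency ordering and cycle detection using only the standard library's `graphlib`. It is an illustration, not the library's own implementation; the `dependencies` mapping and the `execution_batches` helper are hypothetical names.

```python
from __future__ import annotations

from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: task name -> names of the tasks it depends on.
dependencies = {
    "load": set(),
    "enrich": {"load"},
    "report": {"enrich"},
    "audit": {"load"},
}


def execution_batches(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into batches; tasks within one batch do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    batches: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = execution_batches(dependencies)
print(batches)  # [['load'], ['audit', 'enrich'], ['report']]

# A cycle is rejected before any task would run.
try:
    execution_batches({"a": {"b"}, "b": {"a"}})
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)  # True
```

Each inner list is a set of tasks that could safely run side by side, which is the property a parallel scheduler relies on.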
A 15-line "hello pipeline" — one upstream task feeding a downstream one, run in parallel.
```python
from processes import Process, Task, TaskDependency


def load_users() -> list[dict]:
    return [{"id": 1}, {"id": 2}, {"id": 3}]


def enrich(users: list[dict]) -> list[dict]:
    return [{**u, "name": f"user-{u['id']}"} for u in users]


tasks = [
    Task("load", "run.log", load_users),
    Task(
        "enrich",
        "run.log",
        enrich,
        dependencies=[TaskDependency("load", use_result_as_additional_args=True)],
    ),
]

with Process(tasks) as p:
    result = p.run(parallel=True)

print(result.passed_tasks_results["enrich"].result)
# [{'id': 1, 'name': 'user-1'}, {'id': 2, 'name': 'user-2'}, {'id': 3, 'name': 'user-3'}]
```

A realistic mini-pipeline: fetch two sources in parallel, transform them, aggregate, and notify, with per-task log files, result piping, and one task deliberately failing to show fault isolation.
Show the full end-to-end example
```python
import logging
from pathlib import Path

from processes import HTMLEmailStyle, Process, SMTPConfig, Task, TaskDependency

LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)


# --- 1. Two independent "fetch" tasks that run in parallel -----------------
def fetch_orders() -> list[dict]:
    logging.info("querying orders API")
    return [{"order_id": 1, "amount": 42.0}, {"order_id": 2, "amount": 17.5}]


def fetch_inventory() -> list[dict]:
    logging.info("querying inventory API")
    return [{"sku": "A-1", "qty": 12}, {"sku": "B-2", "qty": 3}]


# --- 2. Two transforms that consume the upstream results -------------------
def total_revenue(orders: list[dict]) -> float:
    total = sum(o["amount"] for o in orders)
    logging.info("revenue computed: %s", total)
    return total


def stock_value(inventory: list[dict], *, price_per_unit: float = 10.0) -> float:
    value = sum(i["qty"] for i in inventory) * price_per_unit
    logging.info("stock value: %s", value)
    return value


# --- 3. An aggregator that joins the two branches --------------------------
def build_report(*, revenue: float, stock: float) -> str:
    return f"daily-report | revenue={revenue:.2f} stock={stock:.2f}"


# --- 4. A flaky notifier that ALWAYS fails, to show fault isolation --------
def notify_slack(report: str) -> None:
    raise RuntimeError("slack webhook returned 503")


# --- 5. A sibling task that does NOT depend on notify and still runs -------
def archive_report(report: str) -> str:
    out = LOG_DIR / "report.txt"
    out.write_text(report)
    return str(out)


# --- 6. Optional: SMTP config so failures page on-call ---------------------
smtp = SMTPConfig(
    mailhost=("smtp.example.com", 587),
    fromaddr="alerts@example.com",
    toaddrs=["oncall@example.com"],
    credentials=("user", "pass"),
    secure=(),
)

tasks = [
    Task("fetch_orders", LOG_DIR / "fetch_orders.log", fetch_orders),
    Task("fetch_inventory", LOG_DIR / "fetch_inventory.log", fetch_inventory),
    Task(
        "compute_revenue",
        LOG_DIR / "compute_revenue.log",
        total_revenue,
        dependencies=[TaskDependency("fetch_orders", use_result_as_additional_args=True)],
    ),
    Task(
        "compute_stock",
        LOG_DIR / "compute_stock.log",
        stock_value,
        kwargs={"price_per_unit": 7.25},
        dependencies=[
            TaskDependency(
                "fetch_inventory",
                use_result_as_additional_kwargs=True,
                additional_kwarg_name="inventory",
            )
        ],
    ),
    Task(
        "build_report",
        LOG_DIR / "build_report.log",
        build_report,
        dependencies=[
            TaskDependency("compute_revenue", use_result_as_additional_kwargs=True,
                           additional_kwarg_name="revenue"),
            TaskDependency("compute_stock", use_result_as_additional_kwargs=True,
                           additional_kwarg_name="stock"),
        ],
    ),
    # notify_slack fails on purpose. archive_report is a *sibling*
    # of notify_slack (both depend on build_report), so it has no
    # dependency on the failed task and runs normally: the rest of
    # the workflow is not blackholed by one broken step.
    Task(
        "notify_slack",
        LOG_DIR / "notify_slack.log",
        notify_slack,
        dependencies=[TaskDependency("build_report", use_result_as_additional_args=True)],
        smtp_config=smtp,
    ),
    Task(
        "archive_report",
        LOG_DIR / "archive_report.log",
        archive_report,
        dependencies=[TaskDependency("build_report", use_result_as_additional_args=True)],
    ),
]

with Process(tasks) as process:
    result = process.run(parallel=True)

print("passed:", sorted(result.passed_tasks_results))
# archive_report, build_report, compute_revenue, compute_stock, fetch_inventory, fetch_orders
print("failed:", sorted(result.failed_tasks))
# notify_slack
print("report:", result.passed_tasks_results["build_report"].result)
# daily-report | revenue=59.50 stock=108.75
```

The failing `notify_slack` task does not abort the run. `archive_report` is a sibling of the failed task (both depend on the successful `build_report`), so it runs unaffected: the rest of the workflow is not blackholed by one broken step. The HTML email handler also fires on the `notify_slack` task, paging on-call with the full traceback and the list of downstream tasks that were skipped because of it (empty here, since nothing depends on `notify_slack`).
Show API reference
```python
Task(
    name: str,
    log_path: str | os.PathLike,
    func: Callable[..., Any],
    args: tuple = (),
    kwargs: dict | None = None,
    dependencies: list[TaskDependency] | None = None,
    smtp_config: SMTPConfig | None = None,
    email_style: HTMLEmailStyle | None = None,
    timeout: float | None = None,
    retries: int | None = 0,
    retry_on: tuple[type[Exception], ...] | None = None,
)
```

- `name`: unique within the `Process`; no spaces.
- `log_path`: the file this task logs to (INFO level, format `%(asctime)s - %(name)s - %(levelname)s - %(message)s`).
- `func`: the callable; invoked as `func(*args, **kwargs)` after result injection.
- `smtp_config`: when set, fires an HTML email on `logging.ERROR`; the body includes `task_name`, `function`, `args`, `kwargs`, and `downstream_impact`.
- `email_style`: optional presentation override; defaults to `HTMLEmailStyle()` (modern, neutral, English) when `smtp_config` is set.
- `timeout`: seconds allowed per attempt; `None` means no limit. When the timeout fires, the underlying thread is detached rather than killed (a Python threading limitation).
- `retries`: additional attempts after the first failure; `0` or `None` means a single attempt. Defaults to `0`.
- `retry_on`: tuple of exception types that trigger a retry. When `retries >= 1` and `retry_on` is `None`, it defaults to `(ConnectionError, TimeoutError)` at call time.
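As a rough picture of what a per-task log file with the format above looks like, the sketch below wires a standard `logging.FileHandler` by hand. It only assumes the documented level and format string; the `make_task_logger` helper and the `sketch.` logger prefix are hypothetical and are not part of the library's API.

```python
from __future__ import annotations

import logging
import tempfile
from pathlib import Path

# Format string quoted from the log_path description above.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def make_task_logger(task_name: str, log_path: Path) -> tuple[logging.Logger, logging.FileHandler]:
    """Hypothetical helper: a logger that writes INFO and above to one file."""
    handler = logging.FileHandler(log_path, encoding="utf-8")
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger = logging.getLogger(f"sketch.{task_name}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the message out of the root logger
    logger.addHandler(handler)
    return logger, handler


with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "fetch_orders.log"
    logger, handler = make_task_logger("fetch_orders", log_file)
    logger.debug("not written: below INFO")
    logger.info("querying orders API")
    # Detach and close the handler so the file is flushed and released.
    logger.removeHandler(handler)
    handler.close()
    lines = log_file.read_text(encoding="utf-8").splitlines()

print(lines)
```

Only the INFO record reaches the file, and it ends with `- sketch.fetch_orders - INFO - querying orders API`.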
```python
TaskDependency(
    task_name: str,
    use_result_as_additional_args: bool = False,
    use_result_as_additional_kwargs: bool = False,
    additional_kwarg_name: str = "",
)
```

- `use_result_as_additional_args=True`: upstream result appended as the next positional arg.
- `use_result_as_additional_kwargs=True` with a non-empty `additional_kwarg_name`: upstream result injected as a keyword arg.
- Both flags can be combined (positional first, then kwarg).
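The two injection modes can be pictured with a small stand-alone sketch. `call_with_upstream` is a hypothetical helper written for this illustration, assuming the behaviour described in the bullets above; it is not the library's code.

```python
from __future__ import annotations

from typing import Any, Callable


def call_with_upstream(
    func: Callable[..., Any],
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    upstream_result: Any,
    *,
    as_arg: bool = False,
    as_kwarg: bool = False,
    kwarg_name: str = "",
) -> Any:
    """Hypothetical helper mirroring the two TaskDependency flags."""
    # Positional mode: append the upstream result after the task's own args.
    call_args = (args + (upstream_result,)) if as_arg else args
    # Keyword mode: add the upstream result under the configured name.
    call_kwargs = dict(kwargs)
    if as_kwarg and kwarg_name:
        call_kwargs[kwarg_name] = upstream_result
    return func(*call_args, **call_kwargs)


def stock_value(inventory: list[dict], *, price_per_unit: float = 10.0) -> float:
    return sum(i["qty"] for i in inventory) * price_per_unit


inventory = [{"sku": "A-1", "qty": 12}, {"sku": "B-2", "qty": 3}]

positional = call_with_upstream(stock_value, (), {}, inventory, as_arg=True)
keyword = call_with_upstream(
    stock_value, (), {"price_per_unit": 7.25}, inventory,
    as_kwarg=True, kwarg_name="inventory",
)
print(positional, keyword)  # 150.0 108.75
```

Keyword injection is the safer choice when the downstream function has keyword-only parameters or when several upstream results feed one task.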
```python
Process(tasks: list[Task])  # validates types, names, deps, cycles
process.run(parallel: bool | None = None, max_workers: int = 4) -> ProcessResult
```

- Raises `DependencyNotFoundError`, `CircularDependencyError`, `TypeError`, `ValueError` on construction if the workflow is malformed.
- `parallel=None` auto-parallelises when `len(tasks) >= 10`; `max_workers=1` is always sequential.
- Use as a context manager: it cleans up `FileHandler`s on exit.
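The mode-selection rule above fits in a few lines. The function below is a hypothetical restatement of that rule for illustration, not the library's actual code; the names `should_run_parallel` and `AUTO_PARALLEL_THRESHOLD` are assumptions.

```python
from __future__ import annotations

AUTO_PARALLEL_THRESHOLD = 10  # from the documented rule: len(tasks) >= 10


def should_run_parallel(n_tasks: int, parallel: bool | None = None, max_workers: int = 4) -> bool:
    """Hypothetical restatement of the documented parallel/sequential choice."""
    if max_workers == 1:
        return False  # a single worker is always sequential
    if parallel is None:
        return n_tasks >= AUTO_PARALLEL_THRESHOLD  # auto mode
    return parallel  # explicit True/False wins


print(should_run_parallel(3))                  # False
print(should_run_parallel(12))                 # True
print(should_run_parallel(3, parallel=True))   # True
print(should_run_parallel(12, max_workers=1))  # False
```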
```python
result.passed_tasks_results  # dict[str, TaskResult]: name → TaskResult for every task that succeeded
result.failed_tasks          # set[str]: all tasks that did not produce a result (errored + skipped)
result.errored_tasks         # set[str]: tasks whose function actually raised
result.skipped_tasks         # set[str]: tasks skipped because an upstream dependency failed
```

```python
TaskResult(worked: bool, result: Any, exception: Exception | None)
```

```python
SMTPConfig(
    mailhost,          # (host, port)
    fromaddr,
    toaddrs,           # list[str]
    credentials=None,  # (username, password) | None
    secure=None,       # () = STARTTLS; omit for no encryption
    timeout=5,
)
```

```python
HTMLEmailStyle(
    style="modern",                 # classic | modern | compact
    palette="neutral",              # neutral | catppuccin | neobones | slate
    language="en",                  # en | es | pt | fr | de | it
    traced_vars_frame_filter=None,  # substring to pick the traced frame | None
)
```

All fields are optional; omit `HTMLEmailStyle` entirely to use the defaults.
On failure, the email body includes the local variables of the outermost
user frame in the traceback — i.e. the last frame that is not inside
site-packages or your virtualenv. A file:line reference next to the
section shows exactly where those values were captured.
traced_vars_frame_filter lets you point this at a different frame: set it
to a path substring (e.g. one of your own package or module names) to
capture locals from the outermost frame whose filename contains that
substring instead. This is useful for deep-debugging code that runs through
several layers of internal libraries or wrappers, where the default
outermost-user-frame would land too high up the call stack.
Show fault-tolerance rules in detail
When a task raises:
- The exception is caught and stored in `TaskResult.exception`; the task name goes into `failed_tasks` and `errored_tasks`.
- Every task that depends on it (directly or indirectly) is skipped: added to `failed_tasks` and `skipped_tasks` without running.
- Every other independent part of the workflow keeps running. With `parallel=True` they keep running concurrently on the worker pool.
- After `run()` returns, `ProcessResult.errored_tasks` and `ProcessResult.skipped_tasks` let you distinguish root failures from cascade skips for triage or alerting.
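The rules above can be sketched as a small sequential runner. This is an illustrative model under simplifying assumptions (tasks take no arguments and are listed in dependency order), not the library's implementation; `run_isolated` and `Workflow` are hypothetical names.

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical shape: task name -> (function, names of tasks it depends on).
Workflow = dict[str, tuple[Callable[[], Any], list[str]]]


def run_isolated(workflow: Workflow) -> tuple[dict[str, Any], set[str], set[str]]:
    """Run tasks listed in dependency order; return (passed, errored, skipped)."""
    passed: dict[str, Any] = {}
    errored: set[str] = set()
    skipped: set[str] = set()
    for name, (func, deps) in workflow.items():
        if any(dep in errored or dep in skipped for dep in deps):
            skipped.add(name)  # cascade skip: an upstream task failed, never run
            continue
        try:
            passed[name] = func()
        except Exception:
            errored.add(name)  # root failure: the function itself raised
    return passed, errored, skipped


def broken() -> None:
    raise RuntimeError("boom")


workflow: Workflow = {
    "fetch": (lambda: [1, 2, 3], []),
    "parse": (broken, ["fetch"]),
    "store": (lambda: "stored", ["parse"]),       # skipped: depends on parse
    "publish": (lambda: "published", ["store"]),  # skipped: indirect dependant
    "metrics": (lambda: "ok", ["fetch"]),         # unaffected sibling branch
}

passed, errored, skipped = run_isolated(workflow)
failed = errored | skipped
print(sorted(passed), sorted(errored), sorted(skipped))
# ['fetch', 'metrics'] ['parse'] ['publish', 'store']
```

Keeping `errored` and `skipped` as separate sets is what allows alerting on the single root cause instead of on every task it took down with it.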
When a task has retries >= 1, a failure matching retry_on triggers another attempt before the task is declared failed and its dependants are skipped. This gives transient errors (network blips, connection resets) a chance to resolve without aborting downstream work.
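A retry policy of this kind can be sketched as follows. The default exception tuple is taken from the `retry_on` description in the API reference; the `run_with_retries` helper itself is a hypothetical illustration and not the library's code.

```python
from __future__ import annotations

from typing import Any, Callable

DEFAULT_RETRY_ON: tuple[type[Exception], ...] = (ConnectionError, TimeoutError)


def run_with_retries(
    func: Callable[[], Any],
    retries: int | None = 0,
    retry_on: tuple[type[Exception], ...] | None = None,
) -> Any:
    """Call func; on a matching exception, try again up to `retries` more times."""
    attempts = 1 + (retries or 0)
    exceptions = retry_on if retry_on is not None else DEFAULT_RETRY_ON
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions:
            if attempt == attempts:
                raise  # out of attempts: the failure becomes final
    raise AssertionError("unreachable: the loop always returns or raises")


calls = {"flaky": 0, "bad_input": 0}


def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("connection reset")
    return "ok"


def bad_input() -> None:
    calls["bad_input"] += 1
    raise ValueError("not a transient error")


outcome = run_with_retries(flaky, retries=2)
print(outcome, calls["flaky"])  # ok 3

try:
    run_with_retries(bad_input, retries=5)
except ValueError:
    pass
print(calls["bad_input"])  # 1: ValueError is not in retry_on, so no retry
```

Exceptions outside `retry_on` propagate on the first attempt, so genuine bugs are not masked by repeated calls.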
This makes the library a good fit for fan-out / fan-in pipelines, "best-effort" notifications, and any workflow where one broken step should not blackhole the rest.
Show comparison with other libraries
| | Processes | Airflow | Celery | Luigi |
|---|---|---|---|---|
| External dependencies | None | many | broker (Redis/RabbitMQ) | few |
| Setup cost | `pip install` | cluster | broker + workers | task + config |
| Parallelism | built-in | via executors | via workers | via workers |
| Per-task file logs | yes (built-in) | via handlers | via signals | partial |
| HTML email on failure | yes (built-in) | via callbacks | via signals | manual |
| DAG validation at construction | yes | yes (DAG file) | n/a | partial |
| Strict typing (`mypy --strict`) | yes | partial | partial | no |
Processes is not a distributed scheduler — there are no workers on remote machines, no SLA monitoring, no web UI. If you need any of those, you need Airflow or a similar orchestrator. If you want a small, fast, dependency-aware pipeline that just runs in a single process, this is it.
Show advanced configuration
- Shared log file: pass the same `log_path` to every `Task` for a single combined run.log; pass distinct paths for per-task isolation.
- Auto-parallel: `Process.run()` with no argument runs sequentially for small workflows and switches to parallel for `len(tasks) >= 10`. Pass `parallel=True` or `parallel=False` to force the mode.
- Result inspection: iterate `result.passed_tasks_results.items()` to log or post-process every successful task; iterate `result.failed_tasks` for triage.
- Re-raising: wrap `process.run()` in `try`/`except` if you need a non-zero exit code on any failure; the library itself does not raise on partial failure.
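Since `run()` is described as not raising on partial failure, one way to get a non-zero exit code is to inspect the result sets after the run. The sketch below uses a `SimpleNamespace` stand-in with the documented attribute names so that it runs without the library; `summarize` is a hypothetical helper.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def summarize(result: Any) -> tuple[int, str]:
    """Map a result object to (exit code, one-line triage message)."""
    if not result.failed_tasks:
        return 0, "all tasks passed"
    message = (
        f"errored: {sorted(result.errored_tasks)}; "
        f"skipped: {sorted(result.skipped_tasks)}"
    )
    return 1, message


# Stand-in for a ProcessResult, using the attribute names from the API reference.
fake_result = SimpleNamespace(
    passed_tasks_results={"fetch": "..."},
    failed_tasks={"parse", "store"},
    errored_tasks={"parse"},
    skipped_tasks={"store"},
)

code, message = summarize(fake_result)
print(code, message)  # 1 errored: ['parse']; skipped: ['store']
# In a real script, finish with: sys.exit(code)
```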
From PyPI:
```shell
pip install processes
```

Or straight from the repository (pure Python, no build step):

```shell
pip install git+https://github.com/oliverm91/processes.git
```

Requires Python 3.10+.
Released under the MIT License — see docs for full API details.
Contributions welcome — see CONTRIBUTING.md for the workflow, style, and commit-message conventions used by this project.