A STAMP-based safety framework for data pipeline reliability. Interlock prevents pipelines from executing when preconditions aren't safe — like a physical interlock mechanism.
The framework applies Leveson's Systems-Theoretic Accident Model and Processes (STAMP) to data engineering: pipelines have control structures with traits (feedback), readiness predicates (process models), and conditional execution (safe control actions).
```bash
go get github.com/dwsmith1983/interlock
make build
```

```bash
# Initialize a project (starts a local Valkey container)
./interlock init my-project
cd my-project

# Evaluate a pipeline's readiness
interlock evaluate example

# Run a pipeline (evaluate + trigger)
interlock run example

# Check status
interlock status

# Start the HTTP API server
interlock serve
```

Interlock uses a three-level check system:
- Archetypes define *what* to check — reusable templates of safety traits (e.g., `batch-ingestion` requires `source-freshness`, `upstream-dependency`, `resource-availability`).
- Pipeline configs specialize *how* — override thresholds, point to specific evaluators, set TTLs.
- Evaluators perform the actual checks — subprocess (any language, JSON stdin/stdout) or HTTP.
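As a sketch of how the levels fit together, an archetype and a pipeline that specializes it might look like this (the file layout and the `required` / default-config fields are illustrative assumptions, not the documented schema):

```yaml
# archetypes/batch-ingestion.yaml : illustrative archetype sketch.
# Trait names follow the batch-ingestion example above; the `required`
# flag and default thresholds are assumptions.
name: batch-ingestion
traits:
  source-freshness:
    required: true
    config:
      maxLagSeconds: 300   # archetype default; pipelines may override
  upstream-dependency:
    required: true
  resource-availability:
    required: true
```

A pipeline config then names the archetype and overrides only what differs, as in the pipeline definition shown later in this README.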
```
interlock evaluate my-pipeline
  -> load pipeline config
  -> resolve archetype (merge trait definitions)
  -> for each required trait, IN PARALLEL:
       spawn evaluator subprocess -> pipe config JSON to stdin -> read result from stdout
  -> apply readiness rule (all-required-pass)
  -> READY or NOT_READY (with blocking trait list)
```
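As a sketch of this flow — the real engine is Go; the thread-pool fan-out, evaluator paths, and dict shapes here are illustrative:

```python
import concurrent.futures
import json
import subprocess

def run_evaluator(path, config, timeout=15):
    """Spawn an evaluator, pipe config JSON to stdin, parse JSON from stdout."""
    try:
        proc = subprocess.run(
            [path], input=json.dumps(config).encode(),
            capture_output=True, timeout=timeout,
        )
        if proc.returncode != 0:
            return {"status": "FAIL"}   # non-zero exit = automatic FAIL
        return json.loads(proc.stdout)
    except (subprocess.TimeoutExpired, json.JSONDecodeError):
        return {"status": "FAIL"}       # timeout or bad output = automatic FAIL

def evaluate(traits):
    """All-required-pass readiness rule over parallel evaluator runs."""
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(run_evaluator, spec["evaluator"], spec["config"])
                   for name, spec in traits.items()}
    blocking = [n for n, f in futures.items() if f.result().get("status") != "PASS"]
    return ("READY", []) if not blocking else ("NOT_READY", blocking)
```

The all-required-pass rule is deliberately simple: any single failing required trait blocks the trigger, and the blocking trait list is reported back.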
Evaluators are executable files. Interlock pipes config as JSON to stdin and reads the result from stdout.
Input (stdin):

```json
{"maxLagSeconds": 300, "source": "sales_events"}
```

Output (stdout):

```json
{"status": "PASS", "value": {"lagSeconds": 45, "threshold": 300}}
```

Status must be `"PASS"` or `"FAIL"`. A non-zero exit code or a timeout counts as an automatic FAIL.
Write evaluators in any language:
```bash
#!/bin/bash
# evaluators/check-disk-space
echo '{"status":"PASS","value":{"disk_pct":42}}'
```

```python
#!/usr/bin/env python3
# evaluators/check-source-freshness
import json, sys

config = json.load(sys.stdin)
lag = 45  # query your source system here
result = {"status": "PASS" if lag <= config["maxLagSeconds"] else "FAIL",
          "value": {"lagSeconds": lag}}
json.dump(result, sys.stdout)
```

Interlock runs in two modes: local (Redis + subprocess evaluators) and AWS (DynamoDB + Lambda + Step Functions).
```
┌────────────────────────────────────────────────────┐
│  interlock serve                                   │
│  (HTTP API + watcher loop + status)                │
├────────────────────────────────────────────────────┤
│  InterlockEngine                                   │
│  (pure STAMP logic — readiness, lifecycle, UCA)    │
├──────────────┬─────────────────────────────────────┤
│  Provider    │  Redis/Valkey  [implemented]        │
│  Interface   │  DynamoDB      [implemented]        │
│              │  Postgres      [archival only]      │
├──────────────┴─────────────────────────────────────┤
│  Evaluator Runner                                  │
│  (subprocess: JSON stdin → JSON stdout)            │
│  (any language: Python, Bash, Go, JS, etc.)        │
└────────────────────────────────────────────────────┘
```
```
┌──────────────────┐   DynamoDB Stream   ┌─────────────────────────────┐
│     DynamoDB     │ ──────────────────▶ │        stream-router        │
│  (single table)  │                     │  MARKER# → SFN              │
└────────┬─────────┘                     │  RUNLOG# → lifecycle events │
         │                               └───────┬─────────────┬──────┘
         │                                       │             │
         │                            ┌──────────▼────┐  ┌─────▼───────┐
         │                            │ Step Function │  │     SNS     │
         │                            │   (47-state   │  │ (lifecycle) │
         │                            │   lifecycle)  │  └─────────────┘
         │                            └──┬────┬───┬───┘
         │                               │    │   │
    ┌────▼─────┐ ┌──────────┐ ┌──────────┴┐ ┌─┴───────┐ ┌──────────┐ ┌──────────┐
    │orchestr- │ │evaluator │ │  trigger  │ │  run-   │ │   SNS    │ │ watchdog │
    │   ator   │ │(per-trait│ │ (launch)  │ │ checker │ │ (alerts) │ │(EventBr) │
    │(14 acts) │ │  eval)   │ │           │ │ (poll)  │ │          │ │stuck+miss│
    └──────────┘ └──────────┘ └───────────┘ └─────────┘ └──────────┘ └──────────┘
```
| Cloud | Status |
|---|---|
| AWS (DynamoDB + Lambda + Step Functions) | Implemented |
| GCP (Firestore + Cloud Run + Workflows) | Planned |
| Azure (Cosmos DB + Functions + Durable Functions) | Planned |
| Type | SDK/Protocol | Use Case |
|---|---|---|
| `command` | Subprocess | Local scripts, CLI tools |
| `http` | HTTP POST | Generic REST APIs, webhooks |
| `airflow` | HTTP (Airflow API) | Apache Airflow DAG runs |
| `glue` | AWS SDK | AWS Glue ETL jobs |
| `emr` | AWS SDK | Amazon EMR step execution |
| `emr-serverless` | AWS SDK | EMR Serverless job runs |
| `step-function` | AWS SDK | AWS Step Functions executions |
| `databricks` | HTTP (REST 2.1) | Databricks job runs |
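Each type is configured in the pipeline's `trigger` block, as the `command` example later in this README shows. A hypothetical `http` trigger might look like this — every field except `type` is an illustrative assumption, not the documented schema:

```yaml
# Hypothetical http trigger sketch; url/method/body are invented
# field names for illustration.
trigger:
  type: http
  url: https://orchestrator.internal/jobs/start
  method: POST
  body: '{"job": "daily-sales-rollup"}'
```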
Failed triggers are automatically retried with configurable exponential backoff:

```yaml
retry:
  maxAttempts: 3
  backoffSeconds: 30
  backoffMultiplier: 2.0
  retryableFailures: [TRANSIENT, TIMEOUT]
```

When a pipeline completes, Interlock notifies downstream pipelines that depend on it via cascade markers, triggering their evaluation cycles.
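To make the retry schedule concrete, here is a hypothetical sketch of the delay sequence implied by `maxAttempts: 3`, `backoffSeconds: 30`, and `backoffMultiplier: 2.0` — the actual Go implementation may round or add jitter:

```python
def backoff_delays(max_attempts: int, backoff_seconds: float, multiplier: float):
    """Delay before each retry: backoff * multiplier**(retry_index).

    With max_attempts total attempts there are max_attempts - 1 retries,
    so the list has max_attempts - 1 entries.
    """
    return [backoff_seconds * multiplier ** i for i in range(max_attempts - 1)]

delays = backoff_delays(3, 30, 2.0)  # waits 30s before retry 1, 60s before retry 2
```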
After completion, Interlock can monitor traits for drift — detecting when conditions that were true at trigger time have since degraded. Drift triggers alerts and rerun records.

```yaml
watch:
  interval: 30s
monitoring:
  enabled: true
  duration: 2h
```

Pipelines define evaluation and completion deadlines; breaches fire alerts:

```yaml
sla:
  evaluationDeadline: "09:00"
  completionDeadline: "12:00"
  timezone: America/New_York
```

The watchdog runs independently to detect two classes of silent failures:
- Missed schedules — upstream ingestion failed silently, no MARKER arrived, and no evaluation started by the deadline
- Stuck runs — a run started but has been in PENDING/TRIGGERING/RUNNING longer than the threshold (default: 30 minutes)

```yaml
watchdog:
  enabled: true
  interval: 5m
  stuckRunThreshold: 30m
```

On AWS, the watchdog runs as a separate Lambda on an EventBridge schedule. It is intentionally outside the Step Function — its job is to detect when the Step Function didn't start.
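The stuck-run half of that check can be sketched as follows — the record fields (`id`, `state`, `started_at`) are hypothetical; Interlock's actual watchdog is a Go Lambda reading the DynamoDB single table:

```python
from datetime import datetime, timedelta

# States the watchdog treats as non-terminal (from the list above).
NON_TERMINAL = {"PENDING", "TRIGGERING", "RUNNING"}

def find_stuck_runs(runs, now, threshold=timedelta(minutes=30)):
    """Return IDs of runs sitting in a non-terminal state past the threshold.

    `runs` is a list of dicts with hypothetical id/state/started_at fields;
    the real record shape is defined by the provider.
    """
    return [r["id"] for r in runs
            if r["state"] in NON_TERMINAL and now - r["started_at"] > threshold]
```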
When a pipeline run reaches a terminal status (COMPLETED or FAILED), the stream-router publishes an SNS event to a lifecycle topic. Downstream consumers can subscribe for active recovery, observability, or notification workflows.
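A downstream consumer might look like this hypothetical SNS-subscribed Lambda. The outer envelope (`Records[].Sns.Message`) is the standard SNS-to-Lambda event shape; the inner payload fields (`pipeline`, `status`) are assumptions about Interlock's lifecycle message, not a documented contract:

```python
import json

def handler(event, context):
    """Hypothetical lifecycle consumer: react to terminal-status events."""
    failed = []
    for record in event["Records"]:
        msg = json.loads(record["Sns"]["Message"])  # inner payload shape assumed
        if msg.get("status") == "FAILED":
            failed.append(msg.get("pipeline"))      # e.g. kick off recovery here
    return {"processed": len(event["Records"]), "failed": failed}
```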
All alerts carry a machine-readable `alertType` field for filtering and routing:

| Category | Source | Meaning |
|---|---|---|
| `schedule_missed` | Watchdog | No evaluation started by deadline |
| `stuck_run` | Watchdog | Run in non-terminal state beyond threshold |
| `evaluation_sla_breach` | Orchestrator | Evaluation deadline exceeded |
| `completion_sla_breach` | Orchestrator | Completion deadline exceeded |
| `validation_timeout` | Orchestrator | Hard validation timeout hit |
| `trait_drift` | Orchestrator | Post-completion trait regression |
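Because every alert carries `alertType`, consumers can route on it; a hypothetical router (the destinations are invented for illustration, only the category values come from the table above):

```python
# Hypothetical routing table keyed on alertType.
ROUTES = {
    "schedule_missed": "pager",
    "stuck_run": "pager",
    "evaluation_sla_breach": "slack",
    "completion_sla_breach": "slack",
    "validation_timeout": "slack",
    "trait_drift": "email",
}

def route(alert: dict) -> str:
    """Pick a destination for an alert; unknown types fall back to logs."""
    return ROUTES.get(alert.get("alertType", ""), "log")
```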
Pipelines can define multiple evaluation schedules, each with independent timing, deadlines, and SLA tracking:
```yaml
name: multi-window-pipeline
archetype: batch-ingestion
schedules:
  - name: morning
    after: "06:00"
    deadline: "09:00"
    timezone: America/New_York
  - name: evening
    after: "18:00"
    deadline: "21:00"
    timezone: America/New_York
```

Pipelines without explicit schedules default to a single daily schedule.
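Window selection against those schedules can be sketched as follows — the helper name and field access are illustrative, and the real engine works with zoned timestamps rather than bare clock strings:

```python
def active_schedule(schedules, now_hhmm):
    """Return the name of the first window containing `now_hhmm`, else None.

    Compares zero-padded "HH:MM" strings, which order correctly
    lexicographically within a single day.
    """
    for s in schedules:
        if s["after"] <= now_hhmm < s["deadline"]:
            return s["name"]
    return None
```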
Skip pipeline evaluation on specific days or dates:

```yaml
exclusions:
  days: [Saturday, Sunday]
  dates: ["2025-12-25", "2026-01-01"]
  calendar: us-holidays  # reference a named calendar file
```

A local-mode server configuration:

```yaml
provider: redis

redis:
  addr: localhost:6379
  keyPrefix: "interlock:"

server:
  addr: ":3000"

archetypeDirs:
  - ./archetypes
evaluatorDirs:
  - ./evaluators
pipelineDirs:
  - ./pipelines

alerts:
  - type: console
  - type: webhook
    url: http://localhost:8080/alerts
```

A pipeline definition:

```yaml
name: daily-sales-rollup
archetype: batch-ingestion
tier: 2

traits:
  source-freshness:
    evaluator: ./evaluators/check-sales-freshness
    config:
      source: sales_events
      maxLagSeconds: 60
    ttl: 120
    timeout: 15

trigger:
  type: command
  command: "python scripts/run_pipeline.py"
```

Repository layout:

```
interlock/
├── cmd/
│   ├── interlock/            # CLI binary
│   └── lambda/               # AWS Lambda handlers
│       ├── stream-router/    # DynamoDB Stream -> SFN
│       ├── orchestrator/     # Multi-action workflow logic
│       ├── evaluator/        # Single trait evaluation
│       ├── trigger/          # Job execution + state machine
│       ├── run-checker/      # External job status polling
│       └── watchdog/         # Missed schedule + stuck-run detection
├── pkg/types/                # Public domain types
├── internal/
│   ├── engine/               # Readiness evaluation engine
│   ├── provider/
│   │   ├── redis/            # Redis/Valkey provider
│   │   ├── dynamodb/         # DynamoDB provider (single-table)
│   │   └── postgres/         # Postgres archival store
│   ├── watcher/              # Reactive evaluation loop
│   ├── schedule/             # Schedule, SLA, retry utilities
│   ├── evaluator/            # Subprocess + HTTP evaluator runners
│   ├── trigger/              # Trigger execution (8 types)
│   ├── alert/                # Alert dispatching (console, webhook, file, SNS)
│   ├── archetype/            # Archetype loading + resolution
│   ├── calendar/             # Calendar exclusion registry
│   └── config/               # YAML config loading
├── deploy/
│   ├── terraform/            # Terraform deployment (AWS infrastructure)
│   ├── build.sh              # Lambda build script
│   └── statemachine.asl.json # Step Function definition
└── demo/
    ├── local/                # Local demo (Redis + Docker Compose)
    └── aws/                  # AWS E2E test suite
```
```bash
make build            # Build binary
make test             # Run all tests
make test-unit        # Unit tests (no Redis needed)
make test-integration # Integration tests (requires Redis)
make lint             # gofmt + go vet + golangci-lint
make build-lambda     # Build Lambda handlers
make local-e2e-test   # Run local E2E test suite (Docker)
```

- Go 1.24+
- Docker (for `interlock init` — starts a Valkey container)
- Redis/Valkey on `localhost:6379` (for integration tests)
- AWS CLI v2 + Terraform >= 1.5 (for AWS deployment)