# EventFlow

EventFlow is a production-grade event-driven backend system built with Python, providing real-time visibility into user actions and system events. It demonstrates modern event streaming architecture with support for high-throughput message processing, reliable delivery, and comprehensive observability.
## 🏗️ Architecture Overview
```
┌──────────────────────────────────────────────────────────────────────────┐
│                             EventFlow System                             │
└──────────────────────────────────────────────────────────────────────────┘
┌──────────────────────┐    ┌──────────────────────┐    ┌──────────────────────┐
│   Event Producers    │    │    Message Broker    │    │   Event Consumers    │
│ ──────────────────── │    │ ──────────────────── │    │ ──────────────────── │
│ • REST API           │───▶│ • Redis Streams      │───▶│ • Event Workers      │
│ • Batch Processing   │    │ • Consumer Groups    │    │ • Retry Handler      │
│ • Event Factory      │    │ • Message Ordering   │    │ • DLQ Handler        │
│ • CloudEvents Spec   │    │ • Persistence        │    │ • Webhook Delivery   │
└──────────────────────┘    └──────────────────────┘    └──────────────────────┘
           │                           │                           │
           └─────────────┬─────────────┴─────────────┬─────────────┘
                         ▼                           ▼
             ┌──────────────────────┐    ┌──────────────────────┐
             │  Persistence Layer   │    │    Observability     │
             │ ──────────────────── │    │ ──────────────────── │
             │ • PostgreSQL         │    │ • Structured Logs    │
             │ • Event Store        │    │ • Prometheus Metrics │
             │ • Query Patterns     │    │ • Health Checks      │
             │ • Data Retention     │    │ • Grafana Dashboards │
             └──────────────────────┘    └──────────────────────┘
```
## ✨ Key Features

### Event Production
- CloudEvents-compliant schema design for interoperability
- Multiple event types: User, System, Business, and Analytics events
- Event batching with automatic flushing for optimized throughput
- Producer pool for high-concurrency scenarios

### Message Broker
- Redis Streams for durable, ordered message delivery
- Consumer groups for horizontal scaling
- Message acknowledgment with automatic retry
- Dead Letter Queue (DLQ) for failed message handling

### Reliability
- Exponential backoff retry mechanism with jitter
- Circuit breaker pattern for fault tolerance
- Graceful shutdown with in-flight message completion
- Idempotent processing via unique event IDs

### Observability
- Structured logging with JSON output (structlog)
- Prometheus metrics for real-time monitoring
- Health check endpoints for container orchestration
- Request tracing with correlation IDs

### API
- RESTful endpoints built with FastAPI
- Auto-generated OpenAPI documentation
- Event replay capabilities
- Subscription management for webhooks
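Idempotent processing by event ID is the pattern that makes at-least-once delivery safe. The sketch below is illustrative only: it uses an in-memory set, whereas a real deployment would back the seen-ID check with shared storage such as Redis (`SET NX` with a TTL); the class and field names here are not from the EventFlow codebase.

```python
# Illustrative sketch of idempotent event handling via unique event IDs.
# An in-memory set stands in for a shared dedup store (e.g. Redis SET NX).

class IdempotentProcessor:
    def __init__(self) -> None:
        self.seen_ids: set[str] = set()
        self.processed: list[dict] = []

    def handle(self, event: dict) -> bool:
        """Apply side effects exactly once; return False for duplicates."""
        event_id = event["id"]
        if event_id in self.seen_ids:
            return False  # redelivery of an already-processed event: skip
        self.seen_ids.add(event_id)
        self.processed.append(event)  # stand-in for the real side effect
        return True

proc = IdempotentProcessor()
first = proc.handle({"id": "evt_1", "type": "user.created"})
dup = proc.handle({"id": "evt_1", "type": "user.created"})
print(first, dup, len(proc.processed))  # True False 1
```

The same check is what lets the broker redeliver unacknowledged messages freely: a duplicate delivery is detected and dropped before any side effect runs.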
## 📁 Project Structure

```
EventFlow/
├── src/eventflow/
│   ├── __init__.py              # Package initialization
│   ├── app.py                   # FastAPI application entry
│   ├── cli.py                   # Command-line interface
│   ├── config.py                # Configuration management
│   │
│   ├── api/                     # REST API layer
│   │   ├── dependencies.py      # FastAPI dependencies
│   │   ├── schemas.py           # Request/response models
│   │   └── routes/
│   │       ├── events.py        # Event CRUD endpoints
│   │       ├── health.py        # Health check endpoints
│   │       ├── dlq.py           # Dead letter queue management
│   │       ├── subscriptions.py # Webhook subscriptions
│   │       └── metrics.py       # Prometheus metrics endpoint
│   │
│   ├── core/                    # Core business logic
│   │   ├── broker.py            # Redis Streams broker
│   │   └── producer.py          # Event producer with batching
│   │
│   ├── consumers/               # Event processing
│   │   ├── consumer.py          # Event consumer with retry
│   │   └── worker.py            # Background workers
│   │
│   ├── models/                  # Database models
│   │   └── database.py          # SQLAlchemy models
│   │
│   ├── persistence/             # Data access layer
│   │   ├── database.py          # Database connection
│   │   └── repository.py        # Repository pattern
│   │
│   ├── schemas/                 # Domain schemas
│   │   └── events.py            # CloudEvents schemas
│   │
│   └── observability/           # Monitoring & logging
│       ├── logging.py           # Structured logging
│       ├── metrics.py           # Prometheus metrics
│       └── retry.py             # Retry & circuit breaker
│
├── tests/                       # Test suite
│   ├── conftest.py              # Pytest fixtures
│   ├── test_api.py              # API integration tests
│   ├── test_schemas.py          # Schema validation tests
│   └── test_retry.py            # Retry mechanism tests
│
├── scripts/
│   └── generate_demo_data.py    # Demo data generator
│
├── docker/                      # Docker configuration
│   ├── prometheus.yml           # Prometheus config
│   └── grafana/                 # Grafana provisioning
│
├── Dockerfile                   # API server image
├── Dockerfile.worker            # Worker image
├── docker-compose.yml           # Full stack orchestration
├── pyproject.toml               # Project configuration
├── .env.example                 # Environment template
└── README.md                    # This file
```
## 🚀 Getting Started

### Prerequisites

- Python 3.11+
- Docker & Docker Compose
- Redis 7.0+
- PostgreSQL 15+
### 1. Clone the Repository

```bash
git clone https://github.com/yourusername/eventflow.git
cd eventflow

# Create environment file
cp .env.example .env
```
### 2. Start with Docker Compose

```bash
# Start all services (API, Worker, PostgreSQL, Redis, Monitoring)
docker-compose up -d

# View logs
docker-compose logs -f eventflow-api

# Check health
curl http://localhost:8000/health
```
### 3. Send Your First Event

```bash
# Create a user event
curl -X POST http://localhost:8000/api/v1/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "user.created",
    "source": "api",
    "data": {
      "user_id": "usr_12345",
      "email": "user@example.com",
      "username": "johndoe"
    }
  }'
```
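The same request can be issued from Python. This sketch uses only the standard library `urllib`; the endpoint and payload mirror the curl example, and the actual network call is left commented out because it requires the stack to be running.

```python
# Build the same POST request as the curl example using only the stdlib.
import json
import urllib.request

payload = {
    "type": "user.created",
    "source": "api",
    "data": {
        "user_id": "usr_12345",
        "email": "user@example.com",
        "username": "johndoe",
    },
}

req = urllib.request.Request(
    "http://localhost:8000/api/v1/events",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# response = urllib.request.urlopen(req)  # uncomment with the API running
print(req.get_method(), req.full_url)
```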
## 🛠️ Local Development

### Setup Virtual Environment

```bash
# Create and activate virtual environment
python -m venv .venv
.venv\Scripts\activate     # Windows
source .venv/bin/activate  # Linux/Mac

# Install dependencies
pip install -e ".[dev]"
```
### Run Locally

```bash
# Start dependencies
docker-compose up -d postgres redis

# Run database migrations
python -m eventflow.cli db upgrade

# Start API server
uvicorn eventflow.app:create_app --factory --reload

# In another terminal - start the worker
python -m eventflow.cli worker start

# Generate 500 realistic events
python scripts/generate_demo_data.py
```
## 📡 API Reference

### Publish an Event

```http
POST /api/v1/events
Content-Type: application/json

{
  "type": "user.login",
  "source": "web-app",
  "data": {
    "user_id": "usr_123",
    "ip_address": "192.168.1.1"
  },
  "metadata": {
    "priority": "high",
    "partition_key": "usr_123"
  }
}
```
### Query Events

```http
GET /api/v1/events?event_type=user.login&limit=50&offset=0
GET /api/v1/events/{event_id}
```
### Replay Events

```http
POST /api/v1/events/replay
Content-Type: application/json

{
  "stream": "events:user",
  "start_time": "2024-01-01T00:00:00Z",
  "end_time": "2024-01-02T00:00:00Z"
}
```
### Manage the Dead Letter Queue

```http
GET    /api/v1/dlq?limit=100
POST   /api/v1/dlq/{message_id}/retry
DELETE /api/v1/dlq/{message_id}
```
### Create a Webhook Subscription

```http
POST /api/v1/subscriptions
Content-Type: application/json

{
  "name": "order-notifications",
  "event_types": ["order.created", "order.completed"],
  "webhook_url": "https://your-service.com/webhook",
  "secret": "your-webhook-secret"
}
```
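On the receiving side, the subscription `secret` lets a subscriber authenticate deliveries. The exact signing scheme is not documented here, so the sketch below assumes a common convention (HMAC-SHA256 over the raw request body, hex-encoded in a signature header); check the delivery code for the scheme EventFlow actually uses.

```python
# Assumed webhook signing scheme: HMAC-SHA256 of the raw body with the
# subscription secret, hex digest carried in a signature header.
import hashlib
import hmac

def sign(secret: str, body: bytes) -> str:
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

def verify(secret: str, body: bytes, signature: str) -> bool:
    # compare_digest is constant-time, avoiding timing side channels
    return hmac.compare_digest(sign(secret, body), signature)

body = b'{"type": "order.created", "data": {"order_id": "ord_1"}}'
sig = sign("your-webhook-secret", body)
print(verify("your-webhook-secret", body, sig))  # True
print(verify("wrong-secret", body, sig))         # False
```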
### Health & Metrics

```http
GET /health        # Basic health
GET /health/ready  # Readiness probe
GET /health/live   # Liveness probe
GET /metrics       # Prometheus metrics
```
## 📋 Event Types

### User Events

| Event Type | Description |
|---|---|
| `user.created` | New user registration |
| `user.updated` | User profile update |
| `user.deleted` | User account deletion |
| `user.login` | User login |
| `user.logout` | User logout |

### System Events

| Event Type | Description |
|---|---|
| `system.health_check` | Service health report |
| `system.error` | System error occurred |
| `system.config_changed` | Configuration update |

### Business Events

| Event Type | Description |
|---|---|
| `order.created` | New order placed |
| `order.updated` | Order status change |
| `payment.completed` | Payment processed |
| `notification.sent` | Notification delivered |

### Analytics Events

| Event Type | Description |
|---|---|
| `page.view` | Page view tracked |
| `feature.used` | Feature usage logged |
| `conversion.tracked` | Conversion recorded |
## ⚙️ Configuration

### Core Settings

| Variable | Default | Description |
|---|---|---|
| `APP_NAME` | `eventflow` | Application name |
| `APP_ENV` | `development` | Environment (development/staging/production) |
| `DEBUG` | `false` | Enable debug mode |
| `API_HOST` | `0.0.0.0` | API server host |
| `API_PORT` | `8000` | API server port |
| `DATABASE_URL` | `postgresql+asyncpg://...` | Database connection URL |
| `REDIS_URL` | `redis://localhost:6379` | Redis connection URL |
| `LOG_LEVEL` | `INFO` | Logging level |
| `LOG_FORMAT` | `json` | Log format (json/console) |
### Event Processing Settings

| Variable | Default | Description |
|---|---|---|
| `BATCH_SIZE` | `100` | Events per batch |
| `BATCH_TIMEOUT_MS` | `1000` | Batch flush timeout (ms) |
| `CONSUMER_GROUP` | `eventflow-workers` | Redis consumer group |
| `MAX_RETRIES` | `3` | Max retry attempts |
| `RETRY_BASE_DELAY` | `1.0` | Base retry delay (seconds) |
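A minimal sketch of how these variables could be read with the standard library. The real project centralizes configuration in `config.py` (likely via a settings library), so treat the function name and return shape here as illustrative; only the variable names and defaults come from the table above.

```python
# Read the event-processing variables with stdlib os.environ, falling back
# to the documented defaults. Illustrative only; see config.py for the
# project's actual configuration loading.
import os

def load_processing_config() -> dict:
    return {
        "batch_size": int(os.environ.get("BATCH_SIZE", "100")),
        "batch_timeout_ms": int(os.environ.get("BATCH_TIMEOUT_MS", "1000")),
        "consumer_group": os.environ.get("CONSUMER_GROUP", "eventflow-workers"),
        "max_retries": int(os.environ.get("MAX_RETRIES", "3")),
        "retry_base_delay": float(os.environ.get("RETRY_BASE_DELAY", "1.0")),
    }

cfg = load_processing_config()
print(cfg["batch_size"], cfg["consumer_group"])
```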
## 🧪 Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=eventflow --cov-report=html

# Run specific test file
pytest tests/test_api.py -v

# Run tests matching pattern
pytest -k "test_event" -v
```
- **Unit Tests**: Schema validation, utility functions
- **Integration Tests**: API endpoints, database operations
- **Reliability Tests**: Retry mechanisms, circuit breaker
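To show the shape of a reliability test, here is a self-contained sketch of the core assertion such a test makes: a flaky operation succeeds after a bounded number of retries. The helper names are illustrative, not taken from `tests/test_retry.py`.

```python
# Sketch of a reliability test: a coroutine that fails twice succeeds on
# the third attempt under a bounded retry loop. Names are illustrative.
import asyncio

async def retry_until_success(fn, max_retries: int = 3):
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except RuntimeError:
            if attempt == max_retries:
                raise  # retries exhausted, surface the last error

calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(retry_until_success(flaky))
print(result, calls["n"])  # ok 3
```

A real test would add a sleep between attempts (the exponential backoff under test) and assert on the recorded delays as well.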
## 🐳 Deployment

```bash
# Build images
docker-compose build

# Start in production mode
docker-compose -f docker-compose.yml up -d

# Scale workers
docker-compose up -d --scale eventflow-worker=3
```
The containers include health checks for orchestration:
```yaml
healthcheck:
  test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
  interval: 30s
  timeout: 10s
  retries: 3
```
```bash
# View resource usage
docker stats

# View logs
docker-compose logs -f --tail=100

# Restart services
docker-compose restart eventflow-api eventflow-worker
```
## 📊 Monitoring

### Prometheus Metrics

Key metrics exported:
| Metric | Type | Description |
|---|---|---|
| `eventflow_events_published_total` | Counter | Total events published |
| `eventflow_events_consumed_total` | Counter | Total events consumed |
| `eventflow_events_failed_total` | Counter | Failed event processing |
| `eventflow_event_processing_seconds` | Histogram | Processing latency |
| `eventflow_dlq_size` | Gauge | Dead letter queue size |
| `eventflow_circuit_breaker_state` | Gauge | Circuit breaker status |
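These metrics are served in the Prometheus text exposition format at `/metrics`. The pure-stdlib sketch below shows what a scraped counter line looks like; the real implementation presumably uses the `prometheus_client` library rather than hand-rolled rendering.

```python
# Illustrative sketch of the exposition format behind /metrics: one TYPE
# comment plus one sample line per counter. Not the real implementation.
counters = {"eventflow_events_published_total": 0}

def inc(name: str, amount: int = 1) -> None:
    counters[name] += amount

def render() -> str:
    lines = []
    for name, value in counters.items():
        lines.append(f"# TYPE {name} counter")
        lines.append(f"{name} {value}")
    return "\n".join(lines)

inc("eventflow_events_published_total", 5)
print(render())
```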
### Grafana Dashboards

Pre-configured dashboards for:

- Event throughput and latency
- Error rates and DLQ growth
- Consumer lag and processing time
- System health metrics
## 🔧 Reliability Patterns
### Retry with Exponential Backoff

```python
from eventflow.observability import retry_async, RetryConfig

config = RetryConfig(
    max_retries=3,
    base_delay=1.0,
    max_delay=60.0,
    exponential_base=2.0,
    jitter=True,
)

@retry_async(config)
async def process_event(event):
    # Processing logic
    pass
```
### Circuit Breaker

```python
from eventflow.observability import CircuitBreaker

breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30.0,
    half_open_max_calls=3,
)

async with breaker:
    await call_external_service()
```
## 🤝 Contributing

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
### Development Guidelines

- Follow the PEP 8 style guide
- Write tests for new features
- Update documentation as needed
- Use conventional commit messages
## 📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

---

Built with ❤️ for scalable event-driven systems