A production-ready, real-time machine learning system for cryptocurrency price prediction built with microservices architecture, event-driven design, and ML Ops best practices.
Note: This is a learning-by-doing repository heavily inspired by Real-World-ML's real-time-ml-system-cohort-4. Huge credit goes to Pau and Marius, the instructors of the course!
- System Overview
- Architecture
- Core Components
- Data Flow
- Technology Stack
- Prerequisites
- Quick Start
- Development Setup
- Running Services Locally
- Deployment
- Working on the Project
- Architecture Documentation
The Real-Time Crypto Price Predictor is a sophisticated platform that:
- Ingests real-time trade data from cryptocurrency exchanges (Kraken API)
- Processes raw trades into OHLC candles and technical indicators
- Extracts market sentiment from financial news (planned)
- Trains machine learning models using historical features
- Generates real-time price predictions (planned)
- Monitors prediction accuracy and model performance (planned)
- Serves predictions via REST API (planned)
- ⚡ Real-time Processing: Stream processing with Kafka and Quixstreams
- 🤖 ML Pipeline: Automated model training, evaluation, and deployment
- 📊 Feature Store: RisingWave for real-time feature serving
- 🔄 Event-Driven: Microservices communicate via Kafka events
- 📈 Observability: Comprehensive monitoring with Grafana dashboards
- 🚀 Scalable: Kubernetes-based deployment for horizontal scaling
```mermaid
flowchart TD
    subgraph External["External Data Sources"]
        Kraken[("Trades API<br/>(Kraken)")]
        News[("News API<br/>(Planned)")]
    end

    subgraph System["Crypto Price Prediction System"]
        subgraph Ingestion["Data Ingestion & Processing"]
            Trades["Trades Service"]
            Candles["Candles Service"]
            TI["Technical Indicators<br/>Service"]
        end

        subgraph Storage["Feature Store & Storage"]
            Kafka{{"Kafka<br/>(Message Broker)"}}
            RW[("RisingWave<br/>(Feature Store)")]
        end

        subgraph ML["Machine Learning"]
            Trainer["Model Trainer<br/>(CronJob)"]
            Registry[("MLflow<br/>(Model Registry)")]
        end

        subgraph Future["Planned / Future Work"]
            Predictor["Price Predictor<br/>(Inference)"]
            ES[("Elasticsearch<br/>(Predictions Store)")]
            API["Predictions API"]
            Monitor["Model Error<br/>Monitor"]
        end
    end

    subgraph Users["Users"]
        Trader[("Trader")]
    end

    %% Data Flow
    Kraken -->|WebSocket/REST| Trades
    Trades -->|Raw Trades| Kafka
    Kafka -->|Trades| Candles
    Candles -->|OHLC Candles| Kafka
    Kafka -->|Candles| TI
    TI -->|Indicators| Kafka
    Kafka -->|Indicators| RW

    %% ML Flow
    RW -->|Historical Features| Trainer
    Trainer -->|Register Model| Registry

    %% Planned Flow (dotted lines)
    RW -.->|Real-time Features| Predictor
    Registry -.->|Load Model| Predictor
    Predictor -.->|Predictions| Kafka
    Kafka -.->|Predictions| ES
    ES -.-> API
    ES -.-> Monitor
    API -.-> Trader

    %% Styling
    classDef service fill:#f9f,stroke:#333,stroke-width:2px;
    classDef infra fill:#dfd,stroke:#333,stroke-width:2px;
    classDef external fill:#ddf,stroke:#333,stroke-width:2px;
    classDef user fill:#ffd,stroke:#333,stroke-width:2px;

    class Trades,Candles,TI,Trainer,Predictor,API,Monitor service;
    class Kafka,RW,ES,Registry infra;
    class Kraken,News external;
    class Trader user;
```
- Purpose: Ingests real-time cryptocurrency trade data
- Features:
- WebSocket streaming for live trades
- REST API backfill for historical data
- Progressive streaming to Kafka
- Health checks and auto-recovery
- Technology: Python, Quixstreams, Kraken API
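For illustration, this is roughly what normalizing an incoming trade message looks like before it is produced to Kafka. The payload field names (`channel`, `data`, `symbol`, `price`, `qty`, `timestamp`) are assumptions modeled on Kraken's WebSocket v2 trade schema, not taken from the service's actual code:

```python
from dataclasses import dataclass


@dataclass
class Trade:
    """Normalized trade event, as produced to the Kafka trades topic."""
    symbol: str
    price: float
    volume: float
    timestamp: str


def parse_trade_message(message: dict) -> list[Trade]:
    """Extract trades from a Kraken-style WebSocket payload.

    The payload shape here is an assumption based on Kraken's v2
    WebSocket API; the real service may map fields differently.
    """
    if message.get("channel") != "trade":
        return []  # ignore heartbeats, subscription acks, etc.
    return [
        Trade(
            symbol=t["symbol"],
            price=float(t["price"]),
            volume=float(t["qty"]),
            timestamp=t["timestamp"],
        )
        for t in message.get("data", [])
    ]
```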
- Purpose: Converts raw trades into OHLC (Open-High-Low-Close) candles
- Features:
- Tumbling window aggregation
- Multiple timeframe support (configurable)
- Handles late-arriving messages
- Technology: Python, Quixstreams
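A tumbling window assigns each trade to exactly one fixed-size, non-overlapping time bucket. The Candles service does this with Quixstreams; the plain-Python sketch below shows only the aggregation logic, as an illustration rather than the service's actual code:

```python
def aggregate_candles(trades, window_sec=60):
    """Group trades into tumbling windows and emit one OHLC candle per window.

    `trades` is an iterable of (timestamp_sec, price, volume) tuples,
    assumed sorted by timestamp.
    """
    candles = {}
    for ts, price, volume in trades:
        # Tumbling window: each trade belongs to exactly one bucket.
        window_start = int(ts // window_sec) * window_sec
        c = candles.get(window_start)
        if c is None:
            # First trade in the window opens the candle.
            candles[window_start] = {
                "window_start": window_start,
                "open": price, "high": price, "low": price,
                "close": price, "volume": volume,
            }
        else:
            c["high"] = max(c["high"], price)
            c["low"] = min(c["low"], price)
            c["close"] = price  # last trade seen closes the candle
            c["volume"] += volume
    return [candles[k] for k in sorted(candles)]
```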
- Purpose: Calculates technical analysis indicators
- Features:
- Moving averages (SMA, EMA)
- RSI (Relative Strength Index)
- MACD (Moving Average Convergence Divergence)
- OBV (On-Balance Volume)
- Real-time updates as candles arrive
- Technology: Python, Quixstreams, TA-Lib
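As a reference for what the service computes (the real implementation relies on TA-Lib), here are minimal pure-Python versions of two of the indicators:

```python
def sma(prices, period):
    """Simple moving average over the last `period` prices.

    Returns None until enough data points have arrived, mirroring how
    indicators warm up as candles stream in.
    """
    if len(prices) < period:
        return None
    return sum(prices[-period:]) / period


def obv(closes, volumes):
    """On-Balance Volume: cumulative volume signed by price direction."""
    value = 0.0
    for i in range(1, len(closes)):
        if closes[i] > closes[i - 1]:
            value += volumes[i]
        elif closes[i] < closes[i - 1]:
            value -= volumes[i]
        # unchanged close contributes nothing
    return value
```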
- Purpose: Trains and evaluates price prediction models (Training Only)
- Features:
- Data validation with Great Expectations
- Data profiling with ydata-profiling
- Multiple model algorithms (LazyPredict)
- Hyperparameter tuning (Optuna)
- Model tracking (MLflow)
- Drift detection (Evidently)
- Technology: Python, Scikit-learn, MLflow, Optuna
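One detail worth calling out: because the features are time series, the trainer must split train/test data chronologically rather than randomly, or future information leaks into training. A minimal sketch of such a split (illustrative, not the service's actual code):

```python
def time_ordered_split(rows, train_frac=0.8):
    """Chronological train/test split for time-series features.

    `rows` are assumed sorted by timestamp; the earliest `train_frac`
    of the data trains the model, the rest evaluates it.
    """
    cut = int(len(rows) * train_frac)
    return rows[:cut], rows[cut:]
```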
- Purpose: Removes duplicate messages from Kafka streams
- Features:
- In-memory caching for deduplication
- Supports trades, candles, and technical indicators
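A bounded in-memory dedup cache can be sketched as follows; the key construction and eviction policy here are assumptions for illustration, not the service's actual implementation:

```python
from collections import OrderedDict


class DedupCache:
    """Bounded in-memory cache of recently seen message keys.

    Oldest keys are evicted first, so memory stays constant on an
    unbounded stream at the cost of possibly re-admitting very old
    duplicates.
    """

    def __init__(self, max_size=100_000):
        self.max_size = max_size
        self._seen = OrderedDict()

    def is_duplicate(self, key: str) -> bool:
        if key in self._seen:
            self._seen.move_to_end(key)  # refresh recency
            return True
        self._seen[key] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict oldest key
        return False
```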
- Kafka: Message broker (Strimzi operator)
- RisingWave: Feature store and stream processing
- MLflow: Model registry and experiment tracking
- Grafana: Monitoring and visualization dashboards
- Structurizr: Architecture documentation (C4 model)
```mermaid
flowchart LR
    subgraph Ingestion["1. Data Ingestion"]
        API[Trades API] --> S1(Trades Service)
        S1 --> K1{"Kafka: trades"}
    end

    subgraph Transformation["2. Transformation"]
        K1 --> S2(Candles Service)
        S2 --> K2{"Kafka: candles"}
        K2 --> S3(Technical Indicators)
        S3 --> K3{"Kafka: indicators"}
    end

    subgraph FeatureStore["3. Feature Store"]
        K3 --> RW[(RisingWave)]
    end

    subgraph Training["4. ML Training"]
        RW --> Trainer(Model Trainer)
        Trainer --> MLflow[(MLflow Registry)]
    end

    subgraph Future["5. Prediction & Serving (Planned)"]
        Predictor(Price Predictor) -.-> ES[(Elasticsearch)] -.-> PredAPI(Predictions API)
    end
```
```mermaid
flowchart LR
    Trades[Raw Trades] -->|1m window| Candles[OHLC Candles]
    Candles -->|Calculation| Indicators["Technical Indicators<br/>SMA, RSI, MACD"]
    Indicators -->|Stream| Store[("RisingWave<br/>Feature Store")]
```
```mermaid
flowchart LR
    Store[(RisingWave)] -->|Historical Features| Validation["Data Validation<br/>Great Expectations"]
    Validation --> Training["Model Training<br/>LazyPredict"]
    Training --> Eval[Model Evaluation]
    Eval --> Registry[("MLflow Registry<br/>Versioned Models")]
```
```mermaid
flowchart LR
    Store[(RisingWave)] -.->|Latest Features| Predictor[Price Predictor]
    Registry[(MLflow)] -.->|Load Model| Predictor
    Predictor -.->|Generate| Prediction[Price Prediction]
    Prediction -.->|Store| ES[(Elasticsearch)]
    ES -.->|Serve| API[Predictions API]
```
| Category | Technology | Purpose |
|---|---|---|
| Orchestration | Kubernetes | Container orchestration |
| Messaging | Apache Kafka (Strimzi) | Event streaming |
| Feature Store | RisingWave | Real-time feature storage |
| Model Registry | MLflow | Model versioning & tracking |
| Visualization | Grafana | Dashboards & monitoring |
| Language | Python 3.12 | Service implementation |
| Stream Processing | Quixstreams | Kafka stream processing |
| ML Framework | Scikit-learn, PyTorch | Model training |
| Architecture Docs | Structurizr | C4 model diagrams |
- uv: Python package manager
- Docker: Containerization
- kind: Local Kubernetes cluster
- kubectl: Kubernetes CLI
- k9s: Kubernetes TUI
- helm: Kubernetes package manager
- make: Build automation
- Docker Desktop (Install)
- Python 3.12+ (via `uv`)
- uv - Python package manager (Install)
- make - Build automation (Install)
- kubectl - Kubernetes CLI (Install)
- kind - Local Kubernetes (Install)
- helm - Kubernetes package manager (Install)
- k9s - Kubernetes TUI (optional, Install)
```bash
docker --version          # Docker 20.10+
python3 --version         # Python 3.12+
uv --version              # uv 0.1+
make --version            # GNU Make
kubectl version --client  # kubectl 1.28+
kind --version            # kind 0.20+
helm version              # Helm 3.12+
```

Note: For Windows users, we recommend using the devcontainer option (see Development Setup).
```bash
git clone https://github.com/mdshihabullah/real-time-crypto-price-predictor.git
cd real-time-crypto-price-predictor
```

See the Development Setup section for detailed instructions.
```bash
cd deployments/dev/kind
chmod +x create_cluster.sh
./create_cluster.sh
```

```bash
# Install Kafka, RisingWave, MLflow, Grafana
cd deployments/dev/kind
./install_kafka.sh
./install_risingwaves.sh
./install_mlflow.sh
./install_grafana.sh
```

```bash
# Run trades service locally (non-dockerized)
make dev service=trades

# Or build and deploy to the Kind cluster
make deploy-for-dev service=trades
```

Follow the Development Environment Guide to install all required tools.
- Install Docker Desktop
- Install VS Code or Cursor with Dev Containers extension
- Open the project in VS Code/Cursor
- Press `Ctrl+Shift+P` (or `Cmd+Shift+P` on Mac)
- Select "Dev Containers: Reopen in Container"
The devcontainer includes all tools pre-installed.
```bash
# Check Docker
docker run hello-world

# Check Kubernetes cluster (after creating)
kubectl get nodes

# Check k9s (if installed)
k9s
```

```bash
# Run trades service
make dev service=trades

# Run candles service
make dev service=candles

# Run technical indicators service
make dev service=technical_indicators

# Run predictor service
make dev service=predictor
```

Note: Ensure Kafka and the other infrastructure services are running first.
```bash
# Build Docker image
make build-for-dev service=trades

# Deploy to Kind cluster
make deploy-for-dev service=trades

# Check deployment status
kubectl get pods -n services
```

```bash
# Local service logs appear in the terminal

# Kubernetes pod logs
kubectl logs -n services -l app.kubernetes.io/name=trades -f
```

```bash
# 1. Create Kind cluster
cd deployments/dev/kind
./create_cluster.sh

# 2. Install infrastructure
./install_kafka.sh
./install_risingwaves.sh
./install_mlflow.sh
./install_grafana.sh

# 3. Deploy services
make deploy-for-dev service=trades
make deploy-for-dev service=candles
make deploy-for-dev service=technical_indicators
make deploy-for-dev service=predictor  # Model training CronJob
```

See the comprehensive Production Deployment Guide.
Quick Production Commands:
```bash
# Check prerequisites
make prod-check-cluster

# Deploy infrastructure
make prod-deploy infra=true

# Deploy all services
make prod-deploy services=true

# Deploy a specific service
make prod-deploy service=trades
make prod-deploy service=candles
make prod-deploy service=technical-indicators
make prod-deploy service=predictor-training  # Model training CronJob
make prod-deploy service=structurizr

# Get service endpoints
make prod-get-endpoints

# View status
make prod-status
```

Development (Kind):
```bash
# Port-forward services
kubectl port-forward -n kafka svc/kafka-ui 8080:80
kubectl port-forward -n grafana svc/grafana 3000:80
kubectl port-forward -n mlflow svc/mlflow-tracking 5000:5000
kubectl port-forward -n risingwave svc/risingwave-frontend 4567:4567  # SQL interface
```

Production:
```bash
# Get LoadBalancer IPs
make prod-get-endpoints

# Or use port-forwarding
make prod-port-forward-grafana
make prod-port-forward-kafka
make prod-port-forward-mlflow
```

```
real-time-crypto-price-predictor/
├── services/                  # Microservices
│   ├── trades/                # Trade data ingestion
│   ├── candles/               # OHLC candle generation
│   ├── technical_indicators/  # Technical analysis
│   ├── predictor/             # Model training
│   └── deduplication/         # Message deduplication
├── deployments/               # Kubernetes manifests
│   ├── dev/                   # Development (Kind)
│   └── prod/                  # Production (DigitalOcean)
├── docker/                    # Dockerfiles
├── docs/                      # Documentation
│   └── c4model/               # Architecture documentation
├── scripts/                   # Utility scripts
├── Makefile                   # Build automation
└── pyproject.toml             # Python workspace config
```
1. Create the service directory:

   ```bash
   mkdir -p services/my-service/src/my_service
   ```

2. Create `pyproject.toml`:

   ```toml
   [project]
   name = "my-service"
   version = "0.1.0"
   dependencies = [...]
   ```

3. Add the service to the workspace (root `pyproject.toml`):

   ```toml
   [tool.uv.workspace]
   members = [..., "services/my-service"]
   ```

4. Create a Dockerfile (`docker/my-service.Dockerfile`)
5. Create Kubernetes manifests (`deployments/dev/my-service/`)
6. Update the Makefile (if needed)
Current Services:
- `trades` - Trade data ingestion (WebSocket + backfill)
- `candles` - OHLC candle generation
- `technical_indicators` - Technical analysis calculations
- `predictor` (dev) / `predictor-training` (prod) - Model training CronJob
- `deduplication` - Message deduplication (prod only)
- `structurizr` - Architecture documentation
```bash
# Run linting
make lint

# Auto-fix issues
ruff check . --fix
```

```bash
# Run service tests (when available)
cd services/trades
pytest

# Test Kubernetes deployment
kubectl get pods -n services
kubectl logs -n services <pod-name>
```

```bash
# View all Makefile commands
make help

# Run service locally
make dev service=trades

# Build Docker image
make build-for-dev service=trades

# Deploy to Kind
make deploy-for-dev service=trades

# View logs
kubectl logs -n services -l app.kubernetes.io/name=trades -f

# Access Kafka UI
kubectl port-forward -n kafka svc/kafka-ui 8080:80
# Then open http://localhost:8080

# Access Grafana
kubectl port-forward -n grafana svc/grafana 3000:80
# Then open http://localhost:3000 (admin/admin)

# Access RisingWave SQL interface
kubectl port-forward -n risingwave svc/risingwave-frontend 4567:4567
# Connect with: psql -h localhost -p 4567 -d dev -U root

# Access MLflow
kubectl port-forward -n mlflow svc/mlflow-tracking 5000:5000
# Then open http://localhost:5000
```

The project uses Structurizr for architecture documentation following the C4 model:
- System Context: High-level system overview
- Container Diagram: Microservices and infrastructure
- Component Diagrams: Internal service components
- Dynamic Views: Data flow and interaction diagrams
Development:
```bash
# Build and deploy Structurizr
make c4model

# Port-forward to access
kubectl port-forward -n structurizr svc/structurizr-lite-svc 8089:8080
# Open http://localhost:8089
```

Production:

```bash
make prod-deploy service=structurizr
make prod-port-forward-structurizr
# Open http://localhost:8081
```

Located in `docs/c4model/adrs/`:
- ADR 0001: Microservices Architecture
- ADR 0002: Feature Store Selection (RisingWave)
- ADR 0003: APIs Implementation
- ADR 0004: Model Registry Tracking (MLflow)
- ADR 0005: Metric Monitoring & Visualization
- ADR 0006: Real-Time Data Processing
Pre-configured dashboards:
- Crypto Currency Price: Real-time candlestick charts
- Technical Indicators: SMA, EMA, RSI, MACD, OBV visualizations
Services expose health endpoints where applicable:
- `/health` - Liveness probe
- `/ready` - Readiness probe
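A stdlib-only sketch of that probe contract (the actual services may use a different HTTP framework and richer readiness checks):

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class ProbeHandler(BaseHTTPRequestHandler):
    """Serves the /health (liveness) and /ready (readiness) probes."""

    def do_GET(self):
        if self.path in ("/health", "/ready"):
            body = json.dumps({"status": "ok"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, *args):
        pass  # keep probe traffic out of the logs


def serve_probes(port: int = 0) -> HTTPServer:
    """Start the probe server on a daemon thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

In Kubernetes, these endpoints back `livenessProbe` and `readinessProbe` HTTP checks in the pod spec.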
- Services use `loguru` for structured logging
- Logs are available via `kubectl logs`
- Production: consider integrating with Loki
- ✅ Data ingestion (Trades, Candles, Technical Indicators)
- ✅ Feature store (RisingWave)
- ✅ Model training pipeline (MLflow)
- ✅ Monitoring dashboards (Grafana)
- 🔄 Real-time inference service
- 🔄 Predictions API
- 🔄 Model error monitoring
- 🔄 Prediction storage and serving
1. Docker not running

   ```bash
   docker run hello-world
   # If it errors: start Docker Desktop
   ```

2. Kind cluster not accessible

   ```bash
   kubectl get nodes
   # If it errors: recreate the cluster
   cd deployments/dev/kind
   ./create_cluster.sh
   ```

3. Service not starting

   ```bash
   # Check pod status
   kubectl get pods -n services

   # View logs
   kubectl logs -n services <pod-name>

   # Check events
   kubectl describe pod -n services <pod-name>
   ```

4. Kafka connection issues

   ```bash
   # Check Kafka cluster
   kubectl get kafka -n kafka

   # Check Kafka pods
   kubectl get pods -n kafka
   ```

5. RisingWave connection issues

   ```bash
   # Check RisingWave pods
   kubectl get pods -n risingwave

   # Check RisingWave logs
   kubectl logs -n risingwave deployment/risingwave-frontend
   ```

- Development Environment Setup
- Local Kubernetes Cluster Setup
- Production Deployment Guide
- Trades Service README
- Predictor Service README
- Create a feature branch
- Make your changes
- Run linting: `make lint`
- Test your changes
- Submit a pull request
[Add your license here]
- Pau and Marius - Instructors of the Real-World-ML course
- Real-World-ML - For the excellent course material and inspiration
- Open Source Community - For the amazing tools and libraries