mdshihabullah/crypto-price-prediction-pipeline

Real-Time Crypto Price Predictor

A production-ready, real-time machine learning system for cryptocurrency price prediction built with microservices architecture, event-driven design, and ML Ops best practices.

Note: This is a learning-by-doing repository, heavily inspired by Real-World-ML's real-time-ml-system-cohort-4. Huge credit goes to Pau and Marius, the instructors of the course!

🎯 System Overview

The Real-Time Crypto Price Predictor is a sophisticated platform that:

  1. Ingests real-time trade data from cryptocurrency exchanges (Kraken API)
  2. Processes raw trades into OHLC candles and technical indicators
  3. Extracts market sentiment from financial news (planned)
  4. Trains machine learning models using historical features
  5. Generates real-time price predictions (planned)
  6. Monitors prediction accuracy and model performance (planned)
  7. Serves predictions via REST API (planned)

Key Features

  • ⚡ Real-time Processing: Stream processing with Kafka and Quixstreams
  • 🤖 ML Pipeline: Automated model training, evaluation, and deployment
  • 📊 Feature Store: RisingWave for real-time feature serving
  • 🔄 Event-Driven: Microservices communicate via Kafka events
  • 📈 Observability: Comprehensive monitoring with Grafana dashboards
  • 🚀 Scalable: Kubernetes-based deployment for horizontal scaling

🏗️ Architecture

System Architecture

flowchart TD
    subgraph External["External Data Sources"]
        Kraken[("Trades API\n(Kraken)")]
        News[("News API\n(Planned)")]
    end

    subgraph System["Crypto Price Prediction System"]
        subgraph Ingestion["Data Ingestion & Processing"]
            Trades["Trades Service"]
            Candles["Candles Service"]
            TI["Technical Indicators\nService"]
        end

        subgraph Storage["Feature Store & Storage"]
            Kafka{{"Kafka\n(Message Broker)"}}
            RW[("RisingWave\n(Feature Store)")]
        end

        subgraph ML["Machine Learning"]
            Trainer["Model Trainer\n(CronJob)"]
            Registry[("MLflow\n(Model Registry)")]
        end

        subgraph Future["Planned / Future Work"]
            Predictor["Price Predictor\n(Inference)"]
            ES[("Elasticsearch\n(Predictions Store)")]
            API["Predictions API"]
            Monitor["Model Error\nMonitor"]
        end
    end

    subgraph Users["Users"]
        Trader[("Trader")]
    end

    %% Data Flow
    Kraken -->|WebSocket/REST| Trades
    Trades -->|Raw Trades| Kafka
    Kafka -->|Trades| Candles
    Candles -->|OHLC Candles| Kafka
    Kafka -->|Candles| TI
    TI -->|Indicators| Kafka
    Kafka -->|Indicators| RW
    
    %% ML Flow
    RW -->|Historical Features| Trainer
    Trainer -->|Register Model| Registry
    
    %% Planned Flow (Dotted lines)
    RW -.->|Real-time Features| Predictor
    Registry -.->|Load Model| Predictor
    Predictor -.->|Predictions| Kafka
    Kafka -.->|Predictions| ES
    ES -.-> API
    ES -.-> Monitor
    
    API -.-> Trader
    
    %% Styling
    classDef service fill:#f9f,stroke:#333,stroke-width:2px;
    classDef infra fill:#dfd,stroke:#333,stroke-width:2px;
    classDef external fill:#ddf,stroke:#333,stroke-width:2px;
    classDef user fill:#ffd,stroke:#333,stroke-width:2px;
    
    class Trades,Candles,TI,Trainer,Predictor,API,Monitor service;
    class Kafka,RW,ES,Registry,Grafana infra;
    class Kraken,News external;
    class Trader user;

🔧 Core Components

1. Trades Service (services/trades/)

  • Purpose: Ingests real-time cryptocurrency trade data
  • Features:
    • WebSocket streaming for live trades
    • REST API backfill for historical data
    • Progressive streaming to Kafka
    • Health checks and auto-recovery
  • Technology: Python, Quixstreams, Kraken API
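
To make the ingestion step concrete, here is a minimal sketch of normalizing an incoming trade message. It assumes the Kraken WebSocket v2 `trade` payload shape; the output field names (`product_id`, `quantity`) are illustrative, not necessarily the service's actual schema:

```python
import json

def parse_trade_message(raw: str) -> list[dict]:
    """Normalize a Kraken WebSocket v2 `trade` message into flat trade dicts.

    The message shape assumed here follows Kraken's v2 API docs; the real
    service may key and normalize fields differently.
    """
    msg = json.loads(raw)
    if msg.get("channel") != "trade":
        return []  # ignore heartbeats, subscription acks, etc.
    return [
        {
            "product_id": t["symbol"],
            "price": float(t["price"]),
            "quantity": float(t["qty"]),
            "timestamp": t["timestamp"],
        }
        for t in msg.get("data", [])
    ]

sample = json.dumps({
    "channel": "trade",
    "data": [{"symbol": "BTC/USD", "price": "64250.1",
              "qty": "0.005", "timestamp": "2024-05-01T12:00:00.000000Z"}],
})
trades = parse_trade_message(sample)
```

Each normalized trade would then be produced to the Kafka `trades` topic.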

2. Candles Service (services/candles/)

  • Purpose: Converts raw trades into OHLC (Open-High-Low-Close) candles
  • Features:
    • Tumbling window aggregation
    • Multiple timeframe support (configurable)
    • Handles late-arriving messages
  • Technology: Python, Quixstreams

3. Technical Indicators Service (services/technical_indicators/)

  • Purpose: Calculates technical analysis indicators
  • Features:
    • Moving averages (SMA, EMA)
    • RSI (Relative Strength Index)
    • MACD (Moving Average Convergence Divergence)
    • OBV (On-Balance Volume)
    • Real-time updates as candles arrive
  • Technology: Python, Quixstreams, TA-Lib
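
For intuition, here is a plain-Python sketch of two of these indicators. The service itself uses TA-Lib; note that this simple-average RSI variant will differ slightly from TA-Lib's Wilder-smoothed version:

```python
def sma(prices: list[float], period: int) -> float:
    """Simple moving average of the last `period` closes."""
    window = prices[-period:]
    return sum(window) / len(window)

def rsi(prices: list[float], period: int = 14) -> float:
    """RSI over the last `period` price changes (simple-average variant)."""
    deltas = [b - a for a, b in zip(prices[:-1], prices[1:])][-period:]
    gains = sum(d for d in deltas if d > 0)
    losses = sum(-d for d in deltas if d < 0)
    if losses == 0:
        return 100.0  # no down-moves in the window
    rs = gains / losses
    return 100.0 - 100.0 / (1.0 + rs)

closes = [44.0, 44.5, 43.75, 44.25, 45.0]
# sma(closes, 5) → 44.3
```

These values are recomputed every time a new candle arrives, so the feature store always holds the latest indicator row per symbol and timeframe.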

4. Predictor Service (services/predictor/)

  • Purpose: Trains and evaluates price prediction models (Training Only)
  • Features:
    • Data validation with Great Expectations
    • Data profiling with ydata-profiling
    • Multiple model algorithms (LazyPredict)
    • Hyperparameter tuning (Optuna)
    • Model tracking (MLflow)
    • Drift detection (Evidently)
  • Technology: Python, Scikit-learn, MLflow, Optuna
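
One detail worth calling out in any such training step: price features are a time series, so the train/test split must be chronological, never shuffled, or the model leaks future information into training. A minimal sketch (the field name `window_start_ms` is illustrative, not necessarily the service's column name):

```python
def chronological_split(rows: list[dict], test_fraction: float = 0.2):
    """Split time-ordered feature rows into train/test without shuffling,
    so the model is never evaluated on data older than its training set."""
    rows = sorted(rows, key=lambda r: r["window_start_ms"])
    cut = int(len(rows) * (1 - test_fraction))
    return rows[:cut], rows[cut:]

rows = [{"window_start_ms": t, "close": 100.0 + t} for t in range(10)]
train, test = chronological_split(rows)
# len(train) == 8, len(test) == 2; every test row is newer than every train row
```

The surrounding pipeline (Great Expectations validation, LazyPredict model screening, Optuna tuning, MLflow logging) wraps around a split like this.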

5. Deduplication Service (services/deduplication/)

  • Purpose: Removes duplicate messages from Kafka streams
  • Features:
    • In-memory caching for deduplication
    • Supports trades, candles, and technical indicators
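
A minimal sketch of such a bounded in-memory seen-key cache (the real service's key format and eviction policy may differ):

```python
from collections import OrderedDict

class DedupCache:
    """Bounded seen-key set: flags duplicate message keys while evicting
    the oldest entries so memory usage stays constant."""

    def __init__(self, max_size: int = 100_000):
        self.seen: OrderedDict[str, None] = OrderedDict()
        self.max_size = max_size

    def is_duplicate(self, key: str) -> bool:
        if key in self.seen:
            return True
        self.seen[key] = None
        if len(self.seen) > self.max_size:
            self.seen.popitem(last=False)  # evict the oldest key
        return False

cache = DedupCache(max_size=2)
```

Each consumed message's key (e.g. trade ID or candle window key) is checked against the cache; duplicates are dropped instead of being forwarded downstream.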

Infrastructure Components

  • Kafka: Message broker (Strimzi operator)
  • RisingWave: Feature store and stream processing
  • MLflow: Model registry and experiment tracking
  • Grafana: Monitoring and visualization dashboards
  • Structurizr: Architecture documentation (C4 model)

📊 Data Flow

Complete Data Pipeline

flowchart LR
    subgraph Ingestion["1. Data Ingestion"]
        API[Trades API] --> S1(Trades Service)
        S1 --> K1{Kafka: trades}
    end

    subgraph Transformation["2. Transformation"]
        K1 --> S2(Candles Service)
        S2 --> K2{Kafka: candles}
        K2 --> S3(Technical Indicators)
        S3 --> K3{Kafka: indicators}
    end

    subgraph FeatureStore["3. Feature Store"]
        K3 --> RW[(RisingWave)]
    end

    subgraph Training["4. ML Training"]
        RW --> Trainer(Model Trainer)
        Trainer --> MLflow[(MLflow Registry)]
    end
    
    subgraph Future["5. Prediction & Serving (Planned)"]
        Predictor(Price Predictor) -.-> ES[(Elasticsearch)]
        ES -.-> PredAPI(Predictions API)
    end

Technical Indicators Pipeline

flowchart LR
    Trades[Raw Trades] -->|1m window| Candles[OHLC Candles]
    Candles -->|Calculation| Indicators[Technical Indicators\nSMA, RSI, MACD]
    Indicators -->|Stream| Store[(RisingWave\nFeature Store)]

Model Training Pipeline

flowchart LR
    Store[(RisingWave)] -->|Historical Features| Validation[Data Validation\nGreat Expectations]
    Validation --> Training[Model Training\nLazyPredict]
    Training --> Eval[Model Evaluation]
    Eval --> Registry[(MLflow Registry\nVersioned Models)]

Inference Pipeline (Planned)

flowchart LR
    Store[(RisingWave)] -.->|Latest Features| Predictor[Price Predictor]
    Registry[(MLflow)] -.->|Load Model| Predictor
    Predictor -.->|Generate| Prediction[Price Prediction]
    Prediction -.->|Store| ES[(Elasticsearch)]
    ES -.->|Serve| API[Predictions API]

🛠️ Technology Stack

Core Technologies

| Category | Technology | Purpose |
|----------|------------|---------|
| Orchestration | Kubernetes | Container orchestration |
| Messaging | Apache Kafka (Strimzi) | Event streaming |
| Feature Store | RisingWave | Real-time feature storage |
| Model Registry | MLflow | Model versioning & tracking |
| Visualization | Grafana | Dashboards & monitoring |
| Language | Python 3.12 | Service implementation |
| Stream Processing | Quixstreams | Kafka stream processing |
| ML Framework | Scikit-learn, PyTorch | Model training |
| Architecture Docs | Structurizr | C4 model diagrams |

Development Tools

  • uv: Python package manager
  • Docker: Containerization
  • kind: Local Kubernetes cluster
  • kubectl: Kubernetes CLI
  • k9s: Kubernetes TUI
  • helm: Kubernetes package manager
  • make: Build automation

📦 Prerequisites

Required Tools

  1. Docker Desktop (Install)
  2. Python 3.12+ (via uv)
  3. uv - Python package manager (Install)
  4. make - Build automation (Install)
  5. kubectl - Kubernetes CLI (Install)
  6. kind - Local Kubernetes (Install)
  7. helm - Kubernetes package manager (Install)
  8. k9s - Kubernetes TUI (optional, Install)

Quick Installation Check

docker --version          # Docker 20.10+
python3 --version         # Python 3.12+
uv --version              # uv 0.1+
make --version            # GNU Make
kubectl version --client  # kubectl 1.28+
kind --version            # kind 0.20+
helm version              # Helm 3.12+

Note: For Windows users, we recommend using the devcontainer option (see Development Setup).

🚀 Quick Start

1. Clone the Repository

git clone https://github.com/mdshihabullah/real-time-crypto-price-predictor.git
cd real-time-crypto-price-predictor

2. Set Up Development Environment

See Development Setup section for detailed instructions.

3. Create Local Kubernetes Cluster

cd deployments/dev/kind
chmod +x create_cluster.sh
./create_cluster.sh

4. Deploy Infrastructure

# Install Kafka, RisingWave, MLflow, Grafana
cd deployments/dev/kind
./install_kafka.sh
./install_risingwaves.sh
./install_mlflow.sh
./install_grafana.sh

5. Run a Service Locally

# Run trades service locally (non-dockerized)
make dev service=trades

# Or build and deploy to Kind cluster
make deploy-for-dev service=trades

💻 Development Setup

Option 1: Manual Installation

Follow the Development Environment Guide to install all required tools.

Option 2: DevContainer (Recommended for Windows)

  1. Install Docker Desktop
  2. Install VS Code or Cursor with Dev Containers extension
  3. Open the project in VS Code/Cursor
  4. Press Ctrl+Shift+P (or Cmd+Shift+P on Mac)
  5. Select "Dev Containers: Reopen in Container"

The devcontainer includes all tools pre-installed.

Verify Setup

# Check Docker
docker run hello-world

# Check Kubernetes cluster (after creating)
kubectl get nodes

# Check k9s (if installed)
k9s

🏃 Running Services Locally

Running Services as Standalone Python Apps

# Run trades service
make dev service=trades

# Run candles service
make dev service=candles

# Run technical indicators service
make dev service=technical_indicators

# Run predictor service
make dev service=predictor

Note: Ensure Kafka and other infrastructure services are running first.

Building and Deploying to Kind Cluster

# Build Docker image
make build-for-dev service=trades

# Deploy to Kind cluster
make deploy-for-dev service=trades

# Check deployment status
kubectl get pods -n services

Viewing Service Logs

# Local service logs
# (logs appear in terminal)

# Kubernetes pod logs
kubectl logs -n services -l app.kubernetes.io/name=trades -f

🚢 Deployment

Development Deployment (Kind)

# 1. Create Kind cluster
cd deployments/dev/kind
./create_cluster.sh

# 2. Install infrastructure
./install_kafka.sh
./install_risingwaves.sh
./install_mlflow.sh
./install_grafana.sh

# 3. Deploy services
make deploy-for-dev service=trades
make deploy-for-dev service=candles
make deploy-for-dev service=technical_indicators
make deploy-for-dev service=predictor  # Model training CronJob

Production Deployment (DigitalOcean Kubernetes)

See the comprehensive Production Deployment Guide.

Quick Production Commands:

# Check prerequisites
make prod-check-cluster

# Deploy infrastructure
make prod-deploy infra=true

# Deploy all services
make prod-deploy services=true

# Deploy specific service
make prod-deploy service=trades
make prod-deploy service=candles
make prod-deploy service=technical-indicators
make prod-deploy service=predictor-training  # Model training CronJob
make prod-deploy service=structurizr

# Get service endpoints
make prod-get-endpoints

# View status
make prod-status

Accessing Services

Development (Kind):

# Port-forward services
kubectl port-forward -n kafka svc/kafka-ui 8080:80
kubectl port-forward -n grafana svc/grafana 3000:80
kubectl port-forward -n mlflow svc/mlflow-tracking 5000:5000
kubectl port-forward -n risingwave svc/risingwave-frontend 4567:4567  # SQL interface

Production:

# Get LoadBalancer IPs
make prod-get-endpoints

# Or use port-forwarding
make prod-port-forward-grafana
make prod-port-forward-kafka
make prod-port-forward-mlflow

👨‍💻 Working on the Project

Project Structure

real-time-crypto-price-predictor/
├── services/              # Microservices
│   ├── trades/           # Trade data ingestion
│   ├── candles/          # OHLC candle generation
│   ├── technical_indicators/  # Technical analysis
│   ├── predictor/        # Model training
│   └── deduplication/    # Message deduplication
├── deployments/          # Kubernetes manifests
│   ├── dev/             # Development (Kind)
│   └── prod/            # Production (DigitalOcean)
├── docker/              # Dockerfiles
├── docs/               # Documentation
│   └── c4model/        # Architecture documentation
├── scripts/             # Utility scripts
├── Makefile            # Build automation
└── pyproject.toml      # Python workspace config

Adding a New Service

  1. Create service directory:

    mkdir -p services/my-service/src/my_service
  2. Create pyproject.toml:

    [project]
    name = "my-service"
    version = "0.1.0"
    dependencies = [...]
  3. Add to workspace (pyproject.toml root):

    [tool.uv.workspace]
    members = [..., "services/my-service"]
  4. Create Dockerfile (docker/my-service.Dockerfile)

  5. Create Kubernetes manifests (deployments/dev/my-service/)

  6. Update Makefile (if needed)

Available Services for Deployment

Current Services:

  • trades - Trade data ingestion (WebSocket + Backfill)
  • candles - OHLC candle generation
  • technical_indicators - Technical analysis calculations
  • predictor (dev) / predictor-training (prod) - Model training CronJob
  • deduplication - Message deduplication (prod only)
  • structurizr - Architecture documentation

Code Style

# Run linting
make lint

# Auto-fix issues
ruff check . --fix

Testing

# Run service tests (when available)
cd services/trades
pytest

# Test Kubernetes deployment
kubectl get pods -n services
kubectl logs -n services <pod-name>

Common Development Tasks

# View all Makefile commands
make help

# Run service locally
make dev service=trades

# Build Docker image
make build-for-dev service=trades

# Deploy to Kind
make deploy-for-dev service=trades

# View logs
kubectl logs -n services -l app.kubernetes.io/name=trades -f

# Access Kafka UI
kubectl port-forward -n kafka svc/kafka-ui 8080:80
# Then open http://localhost:8080

# Access Grafana
kubectl port-forward -n grafana svc/grafana 3000:80
# Then open http://localhost:3000 (admin/admin)

# Access RisingWave SQL interface
kubectl port-forward -n risingwave svc/risingwave-frontend 4567:4567
# Connect with: psql -h localhost -p 4567 -d dev -U root

# Access MLflow
kubectl port-forward -n mlflow svc/mlflow-tracking 5000:5000
# Then open http://localhost:5000

📚 Architecture Documentation

C4 Model Diagrams

The project uses Structurizr for architecture documentation following the C4 model:

  • System Context: High-level system overview
  • Container Diagram: Microservices and infrastructure
  • Component Diagrams: Internal service components
  • Dynamic Views: Data flow and interaction diagrams

Viewing Architecture Diagrams

Development:

# Build and deploy Structurizr
make c4model

# Port-forward to access
kubectl port-forward -n structurizr svc/structurizr-lite-svc 8089:8080
# Open http://localhost:8089

Production:

make prod-deploy service=structurizr
make prod-port-forward-structurizr
# Open http://localhost:8081

Architecture Decision Records (ADRs)

Located in docs/c4model/adrs/.

🔍 Monitoring & Observability

Grafana Dashboards

Pre-configured dashboards:

  • Crypto Currency Price: Real-time candlestick charts
  • Technical Indicators: SMA, EMA, RSI, MACD, OBV visualizations

Service Health Checks

Services expose health endpoints where applicable:

  • /health - Liveness probe
  • /ready - Readiness probe
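
The distinction matters for Kubernetes: liveness only says the process is alive, while readiness also requires downstream dependencies before the pod should receive traffic. A sketch of that mapping, assuming a hypothetical Kafka-connectivity check (the services' actual checks may differ):

```python
def probe_response(path: str, kafka_connected: bool) -> tuple[int, str]:
    """Map a probe path and dependency state to an HTTP status and body.

    /health (liveness): the process is up, regardless of dependencies.
    /ready (readiness): also requires the (hypothetical) Kafka connection.
    """
    if path == "/health":
        return 200, "ok"
    if path == "/ready":
        if kafka_connected:
            return 200, "ready"
        return 503, "kafka unavailable"  # pod stays out of the Service endpoints
    return 404, "not found"
```

With this split, a Kafka outage takes a pod out of rotation via readiness without triggering a liveness restart loop.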

Logging

  • Services use loguru for structured logging
  • Logs are available via kubectl logs
  • Production: Consider integrating with Loki

Current System Status

  • ✅ Data ingestion (Trades, Candles, Technical Indicators)
  • ✅ Feature store (RisingWave)
  • ✅ Model training pipeline (MLflow)
  • ✅ Monitoring dashboards (Grafana)

Planned for Future

  • 🔄 Real-time inference service
  • 🔄 Predictions API
  • 🔄 Model error monitoring
  • 🔄 Prediction storage and serving

🐛 Troubleshooting

Common Issues

1. Docker not running

docker run hello-world
# If error: Start Docker Desktop

2. Kind cluster not accessible

kubectl get nodes
# If error: Recreate cluster
cd deployments/dev/kind
./create_cluster.sh

3. Service not starting

# Check pod status
kubectl get pods -n services

# View logs
kubectl logs -n services <pod-name>

# Check events
kubectl describe pod -n services <pod-name>

4. Kafka connection issues

# Check Kafka cluster
kubectl get kafka -n kafka

# Check Kafka pods
kubectl get pods -n kafka

5. RisingWave connection issues

# Check RisingWave pods
kubectl get pods -n risingwave

# Check RisingWave logs
kubectl logs -n risingwave deployment/risingwave-frontend

🤝 Contributing

  1. Create a feature branch
  2. Make your changes
  3. Run linting: make lint
  4. Test your changes
  5. Submit a pull request

📄 License

[Add your license here]

🙏 Acknowledgments

  • Pau and Marius - Instructors of the Real-World-ML course
  • Real-World-ML - For the excellent course material and inspiration
  • Open Source Community - For the amazing tools and libraries
