
ravikant-pal/transport-analytics


🚆 Dutch Public Transport Analytics Platform

A data engineering pipeline with a production-style architecture that ingests live NS train departure events and RDW vehicle registration data into a cloud data warehouse, enabling real-time delay analytics and batch vehicle insights through a live Metabase dashboard.

This project implements the Lambda Architecture pattern: a speed layer for real-time stream processing and a batch layer for large-scale historical processing, unified in Snowflake.


Dashboard

Dutch Transport Analytics Dashboard

Real insights from the dashboard:

  • Amsterdam Centraal (ASD) has the highest average delay (~312 seconds) across all monitored stations
  • Intercity (IC) and Sprinter (SPR) account for over 90% of all departures
  • On-time rate sits at ~83%, measured as trains departing within 3 minutes of schedule
  • Volkswagen is the most registered car brand in the Netherlands (1,139 vehicles in sample)

Architecture

Architecture Diagram

The platform is built on the Lambda Architecture pattern — two parallel processing lanes that converge in Snowflake:

  • Speed layer — NS API events flow into Apache Kafka every 60 seconds, consumed by PySpark Structured Streaming in 30-second micro-batches, landing in Snowflake within ~90 seconds of occurring at the station.

  • Batch layer — RDW vehicle data is fetched daily at 2am, transformed by PySpark, and loaded into Snowflake as a full snapshot refresh.

Both layers feed the same warehouse, enabling queries that join real-time train delays with historical vehicle registration data.

Lambda Architecture Overview

SPEED LAYER (Real-time)          BATCH LAYER (Scheduled)
─────────────────────            ───────────────────────
NS API (live delays)             RDW Open Data API
      │                                │
      ▼                                ▼
Apache Kafka                    Python batch fetch
(event buffer)                  (Airflow 2am daily)
      │                                │
      ▼                                ▼
PySpark Structured              PySpark Batch Job
Streaming (30s batches)         (transform + dedupe)
      │                                │
      └──────────────┬─────────────────┘
                     ▼
               Snowflake DWH
          (raw + processed schemas)
                     │
         ┌───────────┼───────────┐
         ▼           ▼           ▼
      Airflow    Data Quality  Metabase
   Orchestration  (6 checks)  Dashboard

Why Lambda Architecture?

The Lambda Architecture solves a fundamental trade-off in data engineering:

| Concern | Speed Layer | Batch Layer |
| --- | --- | --- |
| Latency | Seconds (30s micro-batches) | Hours (2am daily) |
| Throughput | ~150 events/minute | 10,000+ records/run |
| Fault tolerance | Kafka checkpointing | Full re-run |
| Use case | Live delay alerts | Historical trends |

By running both in parallel and unifying in Snowflake, we get low-latency access to current data without sacrificing the accuracy and completeness of batch processing.


Tech Stack

| Layer | Tool | Why This Tool |
| --- | --- | --- |
| Message broker | Apache Kafka | Decouples producers from consumers; handles backpressure |
| Stream processing | PySpark Structured Streaming | Industry standard; fault-tolerant with checkpointing |
| Batch processing | PySpark + Python | Scalable ETL; same codebase as streaming |
| Orchestration | Apache Airflow | DAG-based scheduling with retry logic and observability |
| Cloud warehouse | Snowflake | Separation of storage and compute; serverless scaling |
| Data quality | Custom quality checks | 6 automated checks on every pipeline run |
| Visualisation | Metabase | Self-hosted BI; connects directly to Snowflake |
| Infrastructure | Docker Compose | Reproducible local environment; prod-ready services |

Data Sources

NS API — Live Train Departures

  • Endpoint: GET /reisinformatie-api/api/v2/departures?station={code}
  • Stations monitored: ASD (Amsterdam), RTD (Rotterdam), UT (Utrecht), EHV (Eindhoven)
  • Poll frequency: Every 60 seconds
  • Fields captured: train category, direction, planned/actual departure, delay in seconds, departure status, cancellation flag
  • Volume: ~150 events per poll cycle across 4 stations
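A minimal sketch of one poll cycle over these stations. The helper names, the gateway host, and the exact timestamp format are assumptions for illustration, not the repository's actual producer code (that lives in ingestion/kafka_producer.py):

```python
from datetime import datetime

STATIONS = ["ASD", "RTD", "UT", "EHV"]
NS_BASE = "https://gateway.apiportal.ns.nl/reisinformatie-api/api/v2"

def departures_url(station: str) -> str:
    # One GET per station, every 60 seconds.
    return f"{NS_BASE}/departures?station={station}"

def delay_seconds(planned: str, actual: str) -> int:
    # Assumes ISO-8601 timestamps with a numeric offset, e.g. 2024-05-01T12:03:00+0200.
    fmt = "%Y-%m-%dT%H:%M:%S%z"
    diff = datetime.strptime(actual, fmt) - datetime.strptime(planned, fmt)
    # Early departures count as on time (delay_seconds is 0 if on time).
    return max(0, int(diff.total_seconds()))
```

The floor at zero matches the `delay_seconds` column definition in `raw.ns_departures` below.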

RDW Open Data — Vehicle Registrations

  • Endpoint: GET https://opendata.rdw.nl/resource/m9d7-ebf2.json
  • No authentication required (fully open government dataset)
  • Fields captured: plate number, vehicle type, brand, model, colour, doors, seats, catalogue price, registration date, APK expiry
  • Volume: 10,000 records per batch run
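Because this is a Socrata-hosted dataset, paging with `$limit`/`$offset` query parameters is the standard way to pull the 10,000-record batch. A hedged sketch using only the standard library (the paging scheme is standard Socrata behaviour; the page size is an assumption):

```python
import json
import urllib.parse
import urllib.request

RDW_URL = "https://opendata.rdw.nl/resource/m9d7-ebf2.json"

def page_url(limit: int = 1000, offset: int = 0) -> str:
    # Socrata paging parameters; urlencode percent-encodes the leading "$".
    query = urllib.parse.urlencode({"$limit": limit, "$offset": offset})
    return f"{RDW_URL}?{query}"

def fetch_page(limit: int = 1000, offset: int = 0) -> list[dict]:
    with urllib.request.urlopen(page_url(limit, offset)) as resp:
        return json.load(resp)

if __name__ == "__main__":
    # Pull the full 10,000-record batch in ten pages.
    rows = [r for off in range(0, 10_000, 1_000) for r in fetch_page(1_000, off)]
```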

Data Model

raw.ns_departures

Append-only table of all ingested train departure events.

| Column | Type | Description |
| --- | --- | --- |
| station | VARCHAR | Station code (ASD, RTD, UT, EHV) |
| train_category | VARCHAR | SPR, IC, ICE, etc. |
| direction | VARCHAR | Final destination |
| planned_departure | TIMESTAMP_TZ | Scheduled departure time |
| actual_departure | TIMESTAMP_TZ | Actual departure time |
| delay_seconds | INTEGER | Calculated delay (0 if on time) |
| status | VARCHAR | ON_STATION, DEPARTED, CANCELLED, UNKNOWN |
| cancelled | BOOLEAN | Whether the service was cancelled |
| ingested_at | TIMESTAMP_TZ | When the event was captured |
| load_timestamp | TIMESTAMP_TZ | When the row landed in Snowflake |

processed.delay_summary

Daily-grain aggregation (one row per station, date, and train category), refreshed hourly by the Airflow DAG.

| Column | Type | Description |
| --- | --- | --- |
| station | VARCHAR | Station code |
| date | DATE | Date of departures |
| train_category | VARCHAR | Train type |
| avg_delay_seconds | FLOAT | Average delay across all departures |
| max_delay_seconds | INTEGER | Worst delay of the day |
| total_departures | INTEGER | Total departure count |
| on_time_pct | FLOAT | % of trains departing within 3 minutes |
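The summary measures can be reproduced from raw delay values in a few lines. This is a plain-Python sketch of the aggregation logic for one (station, date, train_category) group, not the DAG's actual SQL; the 180-second threshold matches the 3-minute on-time definition above:

```python
def summarise(delays: list[int]) -> dict:
    """Aggregate one group's delay_seconds values into the
    processed.delay_summary measures."""
    total = len(delays)
    on_time = sum(1 for d in delays if d <= 180)  # within 3 minutes
    return {
        "avg_delay_seconds": sum(delays) / total,
        "max_delay_seconds": max(delays),
        "total_departures": total,
        "on_time_pct": 100.0 * on_time / total,
    }
```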

raw.rdw_vehicles

Daily snapshot of vehicle registrations.

| Column | Type | Description |
| --- | --- | --- |
| kenteken | VARCHAR | Dutch licence plate (primary key) |
| voertuigsoort | VARCHAR | Vehicle type (Personenauto, Bedrijfsauto, etc.) |
| merk | VARCHAR | Brand (VOLKSWAGEN, TOYOTA, etc.) |
| handelsbenaming | VARCHAR | Model name |
| eerste_kleur | VARCHAR | Primary colour |
| catalogusprijs | INTEGER | Original catalogue price (EUR) |
| datum_eerste_toelating | DATE | First registration date |
| vervaldatum_apk | DATE | MOT (APK) expiry date |

Pipeline Details

Speed Layer — NS Streaming Pipeline

  1. Kafka Producer polls NS API every 60 seconds across 4 stations
  2. Each departure event is serialised to JSON and produced to the ns-departures Kafka topic
  3. PySpark Structured Streaming reads from Kafka in 30-second micro-batches
  4. Each batch is parsed, typed, and written to raw.ns_departures in Snowflake
  5. Airflow ns_streaming_monitor DAG runs hourly:
    • Checks data freshness (fails if no rows in last hour)
    • Triggers aggregation into processed.delay_summary
    • Runs data quality checks
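Step 4's parse-and-type stage amounts to mapping each Kafka JSON payload onto the `raw.ns_departures` columns. In the real job this is done with a Spark schema and `from_json`; the sketch below shows the equivalent logic in plain Python, with assumed payload field names:

```python
import json
from datetime import datetime, timezone

def parse_event(raw_value: bytes) -> dict:
    # Payload field names are assumptions; the streaming job applies the
    # same mapping via a Spark schema + from_json in each micro-batch.
    event = json.loads(raw_value)
    return {
        "station": event["station"],
        "train_category": event.get("train_category"),
        "direction": event.get("direction"),
        "planned_departure": event.get("planned_departure"),
        "actual_departure": event.get("actual_departure"),
        "delay_seconds": int(event.get("delay_seconds", 0)),
        "status": event.get("status", "UNKNOWN"),
        "cancelled": bool(event.get("cancelled", False)),
        "ingested_at": event.get("ingested_at"),
        # load_timestamp is stamped at write time.
        "load_timestamp": datetime.now(timezone.utc).isoformat(),
    }
```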

Batch Layer — RDW Pipeline

  1. Airflow rdw_batch_pipeline DAG triggers at 2am daily
  2. Fetches 10,000 vehicle records from RDW Open Data API
  3. Cleans data: deduplicates by plate number, parses dates, normalises types
  4. Overwrites raw.rdw_vehicles in Snowflake with the fresh snapshot
  5. Runs data quality checks
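Step 3's dedupe is essentially "one record wins per plate". A plain-Python sketch (the real job does this in PySpark, and its tie-break rule may differ from the last-wins choice assumed here):

```python
def dedupe_by_plate(records: list[dict]) -> list[dict]:
    """Keep one record per kenteken; later records overwrite earlier
    ones, which is one reasonable tie-break for a full snapshot."""
    by_plate: dict[str, dict] = {}
    for rec in records:
        by_plate[rec["kenteken"]] = rec
    return list(by_plate.values())
```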

Data Quality

Every pipeline run validates the following checks. Any failure raises an exception and marks the Airflow task as failed:

| Check | Description |
| --- | --- |
| No null stations | All rows must have a valid station code |
| Valid station codes | Station must be one of ASD, RTD, UT, EHV |
| No negative delays | Delay in seconds must be ≥ 0 |
| Sane delay range | Delay must be < 7,200 seconds (2 hours) |
| No null departures | planned_departure must not be null |
| Data freshness | Latest row must be within the last 2 hours |
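The checks follow a simple contract: each one is a boolean predicate over the batch, and any failure raises, which fails the Airflow task. A sketch of that pattern (check names mirror the table; the function shapes are assumptions, not the code in quality/run_checks.py):

```python
VALID_STATIONS = {"ASD", "RTD", "UT", "EHV"}

CHECKS = {
    "no_null_stations": lambda rows: all(r.get("station") for r in rows),
    "valid_station_codes": lambda rows: all(r["station"] in VALID_STATIONS for r in rows),
    "no_negative_delays": lambda rows: all(r["delay_seconds"] >= 0 for r in rows),
    "sane_delay_range": lambda rows: all(r["delay_seconds"] < 7200 for r in rows),
    "no_null_departures": lambda rows: all(r.get("planned_departure") for r in rows),
}

def run_checks(rows: list[dict]) -> None:
    # Freshness is checked separately against Snowflake; the other five
    # run over the in-memory batch. Raising marks the Airflow task failed.
    failed = [name for name, check in CHECKS.items() if not check(rows)]
    if failed:
        raise ValueError(f"Data quality checks failed: {failed}")
```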

Key Design Decisions

1. Kafka as decoupling layer Rather than writing directly from the producer to Snowflake, Kafka acts as a durable buffer. This means the NS API polling rate is completely decoupled from the Snowflake write speed — each can scale independently. It also provides replay capability: if the Snowflake write fails, Kafka retains the events.
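The decoupling means the producer only ever talks to Kafka. A hedged sketch using kafka-python (the library choice, broker address, and keying scheme are assumptions; the repository's actual producer may differ):

```python
import json

TOPIC = "ns-departures"

def serialise(event: dict) -> bytes:
    # Compact JSON bytes for the Kafka message value.
    return json.dumps(event, separators=(",", ":")).encode("utf-8")

def main() -> None:
    # Requires a running broker; kafka-python is imported lazily so the
    # pure helper above stays importable without it.
    from kafka import KafkaProducer  # pip install kafka-python
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    event = {"station": "ASD", "delay_seconds": 120}
    # Keying by station keeps one station's events ordered within a partition.
    producer.send(TOPIC, key=event["station"].encode(), value=serialise(event))
    producer.flush()  # block until the broker acknowledges

if __name__ == "__main__":
    main()
```

If the Snowflake write fails downstream, the events keyed into this topic remain in Kafka for replay.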

2. PySpark Structured Streaming over a simple Python consumer A Python Kafka consumer was first validated to prove end-to-end connectivity, then replaced with PySpark Structured Streaming. PySpark provides fault-tolerant checkpointing, exactly-once semantics, and horizontal scaling — critical for production workloads.

3. Snowflake Spark connector version pinning The Snowflake Spark connector 2.14.0 targets Spark 3.4, used here with Spark 3.5. Query pushdown is disabled as a result, but append writes function correctly. In production, Spark would be pinned to 3.4 for full connector compatibility.

4. Airflow SequentialExecutor for local development Production Airflow would run with PostgreSQL metadata DB and LocalExecutor. For local development, SQLite + SequentialExecutor reduces infrastructure complexity while preserving the full DAG authoring and scheduling experience.

5. Absolute paths in Airflow tasks Airflow task runners change working directory unpredictably. All file reads and writes use absolute paths (/tmp/rdw_raw.json) to ensure consistent behaviour regardless of where Airflow executes the task.


Results

After running the full pipeline:

| Metric | Value |
| --- | --- |
| NS departure events in Snowflake | 17,000+ rows |
| RDW vehicles loaded | 10,000 records |
| Stations monitored | 4 (ASD, RTD, UT, EHV) |
| Airflow DAGs | 2 (hourly + daily) |
| Data quality checks | 6 per run |
| Dashboard charts | 4 live charts |
| Pipeline uptime | Continuous (producer + streaming job) |

Repository Structure

transport-analytics/
├── dags/                        # Airflow DAG definitions
│   ├── ns_pipeline_dag.py       # Hourly streaming monitor
│   └── rdw_pipeline_dag.py      # Daily batch pipeline
├── ingestion/
│   ├── kafka_producer.py        # NS API → Kafka
│   ├── kafka_consumer.py        # Legacy Python consumer (replaced by Spark)
│   └── rdw_batch.py             # RDW API → local file
├── processing/
│   ├── streaming_job.py         # PySpark Structured Streaming job
│   ├── rdw_loader.py            # Snowflake batch loader
│   └── batch_job.py             # PySpark batch skeleton
├── quality/
│   └── run_checks.py            # Data quality validation
├── warehouse/
│   └── ddl/
│       └── setup.sql            # Snowflake table definitions
├── dashboard/                   # Metabase screenshots
├── docker-compose.yml           # Full local infrastructure
├── requirements.txt
├── .env.example                 # Credential template
├── README.md                    # This file
└── DEVELOPER.md                 # Setup and run guide

Local Setup

See DEVELOPER.md for step-by-step setup, service ports, and troubleshooting.
