A production-architecture data engineering pipeline that ingests live NS train departure events and RDW vehicle registration data into a cloud data warehouse — enabling real-time delay analytics and batch vehicle insights through a live Metabase dashboard.
This project implements the Lambda Architecture pattern: a speed layer for real-time stream processing and a batch layer for large-scale historical processing, unified in Snowflake.
Real insights from the dashboard:
- Amsterdam Centraal (ASD) has the highest average delay (~312 seconds) across all monitored stations
- Intercity (IC) and Sprinter (SPR) account for over 90% of all departures
- On-time rate sits at ~83%, measured as trains departing within 3 minutes of schedule
- Volkswagen is the most registered car brand in the Netherlands (1,139 vehicles in sample)
The platform is built on the Lambda Architecture pattern — two parallel processing lanes that converge in Snowflake:

- Speed layer — NS API events flow into Apache Kafka every 60 seconds, consumed by PySpark Structured Streaming in 30-second micro-batches, landing in Snowflake within ~90 seconds of occurring at the station.
- Batch layer — RDW vehicle data is fetched daily at 2am, transformed by PySpark, and loaded into Snowflake as a full snapshot refresh.

Both layers feed the same warehouse, enabling queries that join real-time train delays with historical vehicle registration data.
```
SPEED LAYER (Real-time)           BATCH LAYER (Scheduled)
─────────────────────             ───────────────────────
NS API (live delays)              RDW Open Data API
        │                                 │
        ▼                                 ▼
  Apache Kafka                    Python batch fetch
  (event buffer)                  (Airflow 2am daily)
        │                                 │
        ▼                                 ▼
PySpark Structured                PySpark Batch Job
Streaming (30s batches)           (transform + dedupe)
        │                                 │
        └──────────────┬──────────────────┘
                       ▼
                 Snowflake DWH
          (raw + processed schemas)
                       │
           ┌───────────┼───────────┐
           ▼           ▼           ▼
        Airflow    Data Quality  Metabase
     Orchestration  (6 checks)   Dashboard
```
The Lambda Architecture solves a fundamental trade-off in data engineering:
| Concern | Speed Layer | Batch Layer |
|---|---|---|
| Latency | Seconds (30s micro-batches) | Hours (2am daily) |
| Throughput | ~150 events/minute | 10,000+ records/run |
| Fault tolerance | Kafka checkpointing | Full re-run |
| Use case | Live delay alerts | Historical trends |
By running both in parallel and unifying in Snowflake, we get low-latency access to current data without sacrificing the accuracy and completeness of batch processing.
| Layer | Tool | Why This Tool |
|---|---|---|
| Message broker | Apache Kafka | Decouples producers from consumers; handles backpressure |
| Stream processing | PySpark Structured Streaming | Industry standard; fault-tolerant with checkpointing |
| Batch processing | PySpark + Python | Scalable ETL; same codebase as streaming |
| Orchestration | Apache Airflow | DAG-based scheduling with retry logic and observability |
| Cloud warehouse | Snowflake | Separation of storage and compute; serverless scaling |
| Data quality | Custom quality checks | 6 automated checks on every pipeline run |
| Visualisation | Metabase | Self-hosted BI; connects directly to Snowflake |
| Infrastructure | Docker Compose | Reproducible local environment; prod-ready services |
- Endpoint: `GET /reisinformatie-api/api/v2/departures?station={code}`
- Stations monitored: ASD (Amsterdam), RTD (Rotterdam), UT (Utrecht), EHV (Eindhoven)
- Poll frequency: Every 60 seconds
- Fields captured: train category, direction, planned/actual departure, delay in seconds, departure status, cancellation flag
- Volume: ~150 events per poll cycle across 4 stations
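Flattening one departure item from the NS payload into the event shape described above can be sketched as follows. This is an illustrative stand-in, not the project's `kafka_producer.py`; the payload field names (`plannedDateTime`, `actualDateTime`, `trainCategory`, `direction`, `cancelled`) are assumptions based on the NS v2 departures response and should be checked against a real response.

```python
from datetime import datetime

def parse_departure(item: dict, station: str) -> dict:
    """Flatten one NS departures item into the pipeline's event shape."""
    planned = datetime.fromisoformat(item["plannedDateTime"])
    # If no actual time is reported yet, fall back to the planned time.
    actual = datetime.fromisoformat(item.get("actualDateTime", item["plannedDateTime"]))
    return {
        "station": station,
        "train_category": item.get("trainCategory"),
        "direction": item.get("direction"),
        "planned_departure": planned.isoformat(),
        "actual_departure": actual.isoformat(),
        # Clamp at 0 so early departures don't produce negative delays.
        "delay_seconds": max(0, int((actual - planned).total_seconds())),
        "cancelled": item.get("cancelled", False),
    }

sample = {
    "trainCategory": "IC",
    "direction": "Rotterdam Centraal",
    "plannedDateTime": "2024-05-01T10:00:00+02:00",
    "actualDateTime": "2024-05-01T10:04:00+02:00",
    "cancelled": False,
}
event = parse_departure(sample, "ASD")
print(event["delay_seconds"])  # 240
```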
- Endpoint: `GET https://opendata.rdw.nl/resource/m9d7-ebf2.json`
- No authentication required (fully open government dataset)
- Fields captured: plate number, vehicle type, brand, model, colour, doors, seats, catalogue price, registration date, APK expiry
- Volume: 10,000 records per batch run
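A minimal sketch of the fetch-and-dedupe step, assuming the Socrata-style `$limit` parameter the RDW open data endpoint exposes; the function names are illustrative, not the project's `rdw_batch.py` API.

```python
import json
from urllib.request import urlopen

RDW_URL = "https://opendata.rdw.nl/resource/m9d7-ebf2.json"

def fetch_vehicles(limit: int = 10_000) -> list[dict]:
    # Socrata-style $limit parameter; no API key needed for this open dataset.
    with urlopen(f"{RDW_URL}?$limit={limit}") as resp:
        return json.load(resp)

def dedupe_by_plate(rows: list[dict]) -> list[dict]:
    # Keep the last occurrence of each kenteken; "later rows win" is an
    # assumption here — the real batch job's tie-breaking may differ.
    return list({row["kenteken"]: row for row in rows}.values())
```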
Append-only table of all ingested train departure events.
| Column | Type | Description |
|---|---|---|
| station | VARCHAR | Station code (ASD, RTD, UT, EHV) |
| train_category | VARCHAR | SPR, IC, ICE, etc. |
| direction | VARCHAR | Final destination |
| planned_departure | TIMESTAMP_TZ | Scheduled departure time |
| actual_departure | TIMESTAMP_TZ | Actual departure time |
| delay_seconds | INTEGER | Calculated delay (0 if on time) |
| status | VARCHAR | ON_STATION, DEPARTED, CANCELLED, UNKNOWN |
| cancelled | BOOLEAN | Whether the service was cancelled |
| ingested_at | TIMESTAMP_TZ | When the event was captured |
| load_timestamp | TIMESTAMP_TZ | When the row landed in Snowflake |
Hourly aggregation built by the Airflow DAG.
| Column | Type | Description |
|---|---|---|
| station | VARCHAR | Station code |
| date | DATE | Date of departures |
| train_category | VARCHAR | Train type |
| avg_delay_seconds | FLOAT | Average delay across all departures |
| max_delay_seconds | INTEGER | Worst delay of the day |
| total_departures | INTEGER | Total departure count |
| on_time_pct | FLOAT | % of trains departing within 3 minutes |
Daily snapshot of vehicle registrations.
| Column | Type | Description |
|---|---|---|
| kenteken | VARCHAR | Dutch licence plate (primary key) |
| voertuigsoort | VARCHAR | Vehicle type (Personenauto, Bedrijfsauto, etc.) |
| merk | VARCHAR | Brand (VOLKSWAGEN, TOYOTA, etc.) |
| handelsbenaming | VARCHAR | Model name |
| eerste_kleur | VARCHAR | Primary colour |
| catalogusprijs | INTEGER | Original catalogue price (EUR) |
| datum_eerste_toelating | DATE | First registration date |
| vervaldatum_apk | DATE | MOT expiry date |
- Kafka producer polls the NS API every 60 seconds across 4 stations
- Each departure event is serialised to JSON and produced to the `ns-departures` Kafka topic
- PySpark Structured Streaming reads from Kafka in 30-second micro-batches
- Each batch is parsed, typed, and written to `raw.ns_departures` in Snowflake
- Airflow `ns_streaming_monitor` DAG runs hourly:
  - Checks data freshness (fails if no rows in the last hour)
  - Triggers aggregation into `processed.delay_summary`
  - Runs data quality checks
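The "parsed, typed" step of each micro-batch can be illustrated in plain Python (the real pipeline does this with a PySpark schema via `from_json` and casts; this stand-in just makes the typing explicit):

```python
import json
from datetime import datetime, timezone

def parse_batch(messages: list[bytes]) -> list[dict]:
    """Decode raw Kafka message values and coerce fields to their target types."""
    rows = []
    for raw in messages:
        event = json.loads(raw)
        rows.append({
            "station": str(event["station"]),
            "delay_seconds": int(event.get("delay_seconds", 0)),
            "cancelled": bool(event.get("cancelled", False)),
            # ingested_at is stamped at parse time, in UTC.
            "ingested_at": datetime.now(timezone.utc).isoformat(),
        })
    return rows
```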
- Airflow `rdw_batch_pipeline` DAG triggers at 2am daily
- Fetches 10,000 vehicle records from the RDW Open Data API
- Cleans data: deduplicates by plate number, parses dates, normalises types
- Overwrites `raw.rdw_vehicles` in Snowflake with the fresh snapshot
- Runs data quality checks
Every pipeline run validates the following checks. Any failure raises an exception and marks the Airflow task as failed:
| Check | Description |
|---|---|
| No null stations | All rows must have a valid station code |
| Valid station codes | Station must be one of ASD, RTD, UT, EHV |
| No negative delays | Delay in seconds must be ≥ 0 |
| Sane delay range | Delay must be < 7,200 seconds (2 hours) |
| No null departures | planned_departure must not be null |
| Data freshness | Latest row must be within the last 2 hours |
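A condensed sketch of how these six checks might look in `quality/run_checks.py` (the function name and row shape are assumptions); each failed check raises, which is what marks the Airflow task as failed:

```python
from datetime import datetime, timedelta, timezone

VALID_STATIONS = {"ASD", "RTD", "UT", "EHV"}
MAX_DELAY_SECONDS = 7_200          # 2 hours
MAX_STALENESS = timedelta(hours=2)

def run_checks(rows: list[dict]) -> None:
    """Raise AssertionError if any of the six quality checks fails."""
    assert all(r.get("station") for r in rows), "null station"
    assert all(r["station"] in VALID_STATIONS for r in rows), "invalid station code"
    assert all(r["delay_seconds"] >= 0 for r in rows), "negative delay"
    assert all(r["delay_seconds"] < MAX_DELAY_SECONDS for r in rows), "delay out of range"
    assert all(r.get("planned_departure") is not None for r in rows), "null planned_departure"
    newest = max(datetime.fromisoformat(r["ingested_at"]) for r in rows)
    assert datetime.now(timezone.utc) - newest < MAX_STALENESS, "stale data"
```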
1. Kafka as decoupling layer: Rather than writing directly from the producer to Snowflake, Kafka acts as a durable buffer. The NS API polling rate is completely decoupled from the Snowflake write speed — each can scale independently. Kafka also provides replay capability: if the Snowflake write fails, the events are retained and can be re-consumed.
2. PySpark Structured Streaming over a simple Python consumer: A plain Python Kafka consumer was first used to prove end-to-end connectivity, then replaced with PySpark Structured Streaming, which provides fault-tolerant checkpointing, exactly-once semantics, and horizontal scaling — critical for production workloads.
3. Snowflake Spark connector version pinning: Snowflake Spark connector 2.14.0 targets Spark 3.4 but is used here with Spark 3.5. Query pushdown is disabled as a result, though append writes function correctly. In production, Spark would be pinned to 3.4 for full connector compatibility.
4. Airflow SequentialExecutor for local development: Production Airflow would run with a PostgreSQL metadata DB and LocalExecutor. For local development, SQLite + SequentialExecutor reduces infrastructure complexity while preserving the full DAG authoring and scheduling experience.
5. Absolute paths in Airflow tasks: Airflow task runners change working directory unpredictably, so all file reads and writes use absolute paths (`/tmp/rdw_raw.json`) to ensure consistent behaviour regardless of where Airflow executes the task.
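The serialisation side of decision 1 can be sketched as below. Keying messages by station is an assumption shown for illustration (it would keep per-station ordering within a partition); the actual `kafka_producer.py` may key differently, and the commented `KafkaProducer` lines assume the kafka-python client and a local broker.

```python
import json

def to_kafka_message(event: dict) -> tuple[bytes, bytes]:
    """Serialise one departure event to a (key, value) pair of JSON bytes."""
    key = event["station"].encode("utf-8")
    value = json.dumps(event, separators=(",", ":")).encode("utf-8")
    return key, value

# Sending requires a running broker, so the client calls are shown commented:
# from kafka import KafkaProducer          # kafka-python client (assumed)
# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# key, value = to_kafka_message(event)
# producer.send("ns-departures", key=key, value=value)
```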
After running the full pipeline:
| Metric | Value |
|---|---|
| NS departure events in Snowflake | 17,000+ rows |
| RDW vehicles loaded | 10,000 records |
| Stations monitored | 4 (ASD, RTD, UT, EHV) |
| Airflow DAGs | 2 (hourly + daily) |
| Data quality checks | 6 per run |
| Dashboard charts | 4 live charts |
| Pipeline uptime | Continuous (producer + streaming job) |
```
transport-analytics/
├── dags/                      # Airflow DAG definitions
│   ├── ns_pipeline_dag.py     # Hourly streaming monitor
│   └── rdw_pipeline_dag.py    # Daily batch pipeline
├── ingestion/
│   ├── kafka_producer.py      # NS API → Kafka
│   ├── kafka_consumer.py      # Legacy Python consumer (replaced by Spark)
│   └── rdw_batch.py           # RDW API → local file
├── processing/
│   ├── streaming_job.py       # PySpark Structured Streaming job
│   ├── rdw_loader.py          # Snowflake batch loader
│   └── batch_job.py           # PySpark batch skeleton
├── quality/
│   └── run_checks.py          # Data quality validation
├── warehouse/
│   └── ddl/
│       └── setup.sql          # Snowflake table definitions
├── dashboard/                 # Metabase screenshots
├── docker-compose.yml         # Full local infrastructure
├── requirements.txt
├── .env.example               # Credential template
├── README.md                  # This file
└── DEVELOPER.md               # Setup and run guide
```
See DEVELOPER.md for step-by-step setup, service ports, and troubleshooting.
