A demo project showing Tansu, a Kafka-compatible streaming broker, integrated with RustFS, an S3-compatible object store. Taxi trip records are ingested as Protobuf, JSON, and Avro, automatically landed as Parquet in a data lake, and queried in place with DuckDB.
flowchart LR
subgraph host["Host"]
trips["data/trips.json"]
schema["schema/\ntaxi.proto\ntaxi-json.json\ntaxi-avro.avsc"]
duckdb["DuckDB\n(query client)"]
end
subgraph docker["Docker Compose"]
broker["Tansu Broker\n:9092\n(Kafka-compatible)\nparquet mode"]
storage["RustFS\n:9000\n(S3-compatible)"]
end
trips -->|"tansu cat produce"| broker
schema -->|"file://schema\nschema registry"| broker
broker -->|"Protobuf / JSON / Avro\n→ Parquet"| storage
storage -->|"s3://lake/taxi*/*/*.parquet"| duckdb
Topics and serialization formats:
| Topic | Format | Schema file |
|---|---|---|
| taxi | Protobuf | schema/taxi.proto |
| taxi-json | JSON | schema/taxi-json.json |
| taxi-avro | Avro | schema/taxi-avro.avsc |
All topics write Parquet to s3://lake/<topic>/*/*.parquet on RustFS.
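The path convention above can be captured in a small helper, which makes it easier to build ad hoc queries per topic. This is a minimal sketch: `lake_glob` and `parquet_query` are hypothetical names that are not part of this repository, and it assumes every topic exposes the same `meta` and `value` columns as the `taxi` topic.

```shell
# Hypothetical helper: build the Parquet glob for a topic,
# following the s3://lake/<topic>/*/*.parquet layout.
lake_glob() {
  printf 's3://lake/%s/*/*.parquet\n' "$1"
}

# Hypothetical helper: build a DuckDB query over one topic's files.
# Assumes the meta/value column layout used by the taxi topic.
parquet_query() {
  printf "select meta.timestamp, value.* from read_parquet('%s')\n" "$(lake_glob "$1")"
}
```

The generated statement can then be passed to the existing recipe, for example `just duckdb "$(parquet_query taxi-json)"`.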
just up # start services, create buckets and topics
just taxi-proto-cat # produce taxi data as Protobuf
just taxi-select # query the Parquet lake with DuckDB

just up # Start everything: Docker services, buckets, Kafka topics
just ps # Show service status
just topic-list # List Kafka topics
# Produce data
just taxi-proto-cat # Produce taxi data as Protobuf
just taxi-json-cat # Produce taxi data as JSON
just taxi-avro-cat # Produce taxi data as Avro
# Query data lake (DuckDB)
just taxi-select # All taxi Protobuf records
just taxi-json-select # JSON topic records
just taxi-avro-select # Avro topic records

just duckdb "select meta.timestamp, value.* \
from read_parquet('s3://lake/taxi/*/*.parquet') \
where value.trip_id = 1000372"

Credentials are stored in .env and loaded automatically by just:
RUSTFS_ACCESS_KEY=rustfsadmin
RUSTFS_SECRET_KEY=rustfsadmin

The broker picks these up as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY. DuckDB reads them from duckdb-init.sql, which also configures the local S3 endpoint (localhost:9000).
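To illustrate the credential flow, the sketch below mirrors the RustFS variables into the AWS names and prints DuckDB S3 settings for the local endpoint. It is not the project's actual wiring: the function names are hypothetical, the real contents of duckdb-init.sql are not shown here, and the path-style URL and disabled SSL are assumptions that are typical for a local S3-compatible server.

```shell
# Hypothetical helper: copy the RustFS credentials from .env into the
# standard AWS variable names. Aborts if either source variable is unset.
map_rustfs_credentials() {
  : "${RUSTFS_ACCESS_KEY:?RUSTFS_ACCESS_KEY is not set}"
  : "${RUSTFS_SECRET_KEY:?RUSTFS_SECRET_KEY is not set}"
  export AWS_ACCESS_KEY_ID="$RUSTFS_ACCESS_KEY"
  export AWS_SECRET_ACCESS_KEY="$RUSTFS_SECRET_KEY"
}

# Hypothetical helper: print DuckDB httpfs settings for the local endpoint.
# s3_url_style and s3_use_ssl values are assumptions, not taken from the repo.
emit_duckdb_s3_settings() {
  cat <<EOF
SET s3_endpoint = 'localhost:9000';
SET s3_access_key_id = '${AWS_ACCESS_KEY_ID}';
SET s3_secret_access_key = '${AWS_SECRET_ACCESS_KEY}';
SET s3_url_style = 'path';
SET s3_use_ssl = false;
EOF
}
```

Keeping the credentials in one place (.env) and deriving both the broker environment and the DuckDB settings from it avoids the two drifting apart.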
schema/taxi.proto defines the canonical record shape used across all three formats:
vendor_id, trip_id, trip_distance, fare_amount, store_and_fwd
The broker's file-based schema registry (SCHEMA_REGISTRY: file://schema) resolves schemas by topic name.
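As a rough illustration of name-based resolution, the sketch below looks for a schema file named after the topic in the schema directory. It is not Tansu's implementation: `resolve_schema` is a hypothetical function, and the extension search order (proto, json, avsc) is an assumption derived from the file names in the table above.

```shell
# Hypothetical helper: find the schema file for a topic by name.
# Usage: resolve_schema <schema_dir> <topic>
# Prints the first matching path, or returns 1 if none exists.
resolve_schema() {
  schema_dir=$1
  topic=$2
  # Assumed search order: Protobuf, JSON schema, Avro schema.
  for ext in proto json avsc; do
    candidate="$schema_dir/$topic.$ext"
    if [ -f "$candidate" ]; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  return 1
}
```

Run from the repository root, `resolve_schema schema taxi-avro` would print `schema/taxi-avro.avsc` if that file exists.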
