feat: Large File Support — proxy, operator, SDKs, and demos#128
kamir wants to merge 34 commits into KafScale:main from …
Conversation
…ations

Multi-language LFS client SDKs and end-to-end demo applications for producing and consuming large files through the KafScale LFS proxy.

SDKs:
- Java SDK with retry/backoff, configurable timeouts, Maven build
- Python SDK with retry/backoff, envelope codec, pip-installable
- JavaScript SDK (Node.js) with streaming upload support
- Browser SDK (JS) for SPA-based LFS file management

Demos:
- E60: Medical imaging LFS demo (README/design)
- E61: Video streaming LFS demo (README/design)
- E62: Industrial IoT LFS demo (README/design)
- E70: Java SDK producer demo with video upload
- E71: Python SDK demo with video LFS upload
- E72: Browser-native LFS SDK single-page app

Infrastructure:
- Docker Compose stack for local LFS development
- Synology S3-compatible storage deployment
- Browser demo UI (HTML/CSS/JS)
- LFS task tracking documents

> This is PR 2 of 2. Depends on PR KafScale#128 (core LFS proxy).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
`result, err := p.doUpload(ctx, topic, key, body)`
The retry logic reuses the same io.Reader across attempts. Once the first attempt reads from it, the data is gone. Retries upload an empty or truncated payload to S3, a pointer record gets written to Kafka, and no error is returned. This silently corrupts the data.
Bug: Produce and ProducePartitioned passed the same io.Reader to every retry attempt. After the first attempt consumed the reader, retries uploaded empty/truncated data to S3, and the pointer record was written to Kafka with no error — silent data corruption.

Fix: Added replayable() and rewind() helpers:
- replayable(r) — if the reader already implements io.Seeker (e.g. *os.File, *bytes.Reader), returns it as-is. Otherwise buffers the contents into a *bytes.Reader so it can be replayed.
- rewind(r) — seeks back to offset 0 before each retry attempt.

Both Produce (line 137) and ProducePartitioned (line 199) now call replayable before the retry loop and rewind at the top of each retry iteration. This ensures every attempt uploads the full payload.
One foundational problem I see: the lfs-proxy reimplements the whole logic of the current proxy.

Reimplemented from the existing proxy:

Dropped from the existing proxy:

The consequence is that we have to maintain 2 parallel proxies doing largely the same thing. The lfs-proxy also reintroduces the following bugs:

I think the LFS proxy could be a feature inside the existing proxy, but I'll let you and @novatechflow decide that.
I can also recommend a review agent like this one. I am using opencode, but it's probably easily adaptable for use in Claude Code. Running it iteratively until the agent stops complaining helped me find quite a lot of issues in my prior PRs.

---
description: Performs thorough code review of current changes, looking for regressions, subtle bugs, and design issues like a staff engineer.
mode: primary
temperature: 0.1
tools:
  write: false
  edit: false
  bash: true
permission:
  bash:
    "*": deny
    "grep *": allow
    "go test*": allow
    "go build*": allow
    "go vet*": allow
    "go fmt*": allow
    "git *": allow
    "git push *": deny
    "git reset *": deny
---
You are a staff-level engineer performing a thorough review of the current changes in this repository.
Your job is to identify problems and simplification opportunities, not to make changes. Read the diff carefully, then explore the surrounding code to understand the full context before forming opinions.

Before diving into details, step back and question whether the approach itself makes sense — does it actually achieve its intended goal, and is there a fundamentally better way to solve the same problem?

Assume tests and the build already pass. Flag complexity issues in surrounding code when the change interacts with it, but do not suggest improvements completely unrelated to the changes under review.
Focus on:
- **Regressions**: Does this change break existing behavior? Look at callers, tests, and interfaces that depend on modified code.
- **Concurrency issues**: Race conditions, missing locks, unsafe shared state, goroutine leaks.
- **Error handling**: Swallowed errors, missing error propagation, inconsistent error behavior compared to neighboring code.
- **Edge cases**: Nil pointers, empty slices, integer overflow, off-by-one errors, zero-value traps.
- **Contract violations**: Does the change respect the implicit contracts of the interfaces and functions it touches? Are invariants preserved?
- **Resource leaks**: Unclosed connections, files, or channels. Missing deferred cleanup.
- **Behavioral inconsistencies**: Does the new code behave differently from similar patterns already in the codebase?
- **Architecture and complexity**: Does the change introduce or reveal tight coupling, layering violations, misplaced responsibilities, unnecessary indirection, redundant abstractions, or duplicated logic? Would the change be significantly simpler under a different structural arrangement? Could touched code paths be expressed more directly?
- **Test value**: Are added tests low-value (testing trivial behavior, duplicating existing coverage, or tightly coupled to implementation details)? Are there overlapping tests that could be consolidated? Are high-value tests missing — particularly for edge cases, error paths, concurrency, and integration boundaries that the change affects?
Present your findings in two sections:
## Issues
Numbered list sorted by impact. For each: location (file:line), what is wrong and how it manifests, severity (critical/high/medium/low), and a concrete recommendation.
## Simplification Opportunities
Numbered list sorted by impact. For each: what is unnecessarily complex, where, what a simpler version looks like, and what improves as a result.
If either section has no items, say so explicitly. Do not invent problems or fabricate opportunities.
@klaudworks we thought about that — but the LFS proxy needs an SDK on the producer side; the normal proxy does not know the incoming file size in advance. That means we'd lose default Kafka compatibility. So @kamir's idea was to have a separate proxy as an add-on, since not all installations need large file support. What's your take? We have a discussion open about this feature => #87
My main concern is maintaining 2 different versions of Kafka plumbing and reintroducing the already-fixed bugs, race conditions, etc. Going forward, each bug fix / feature needs to be implemented in both proxies.

According to my current understanding, the normal proxy could just have a feature flag for LFS. If that is enabled, the proxy rewrites messages with the LFS_BLOB header as it currently does (currently it also just rewrites messages with this header). The current proxy could then be horizontally scaled if needed, or if you want strict separation, you can run a separate proxy instance with LFS support enabled, where Kafka clients send their large messages.

I am assuming that @kamir would like to merge this soonish. So I'd recommend to
We should not merge while we might still find a better solution — that avoids technical debt. That's why I pointed to the discussion; that's the place to discuss this at an engineering level. The feature flag is quite charming — it can be set dynamically via the proxy YAML.
**Update: LFS Proxy Merged into Unified Proxy**

This commit merges the standalone LFS proxy into the existing proxy as a module.

**Architecture**

The integration inserts LFS record rewriting between parse and route in handleProduceRouting(). When LFS is enabled, the produce path detects LFS_BLOB headers, uploads payloads to S3, and replaces record values with JSON envelopes before the existing partition-aware fan-out runs.

**Changes to cmd/proxy/**
| File | Purpose |
|---|---|
| `lfs.go` | `lfsModule` struct, `initLFSModule()`, `rewriteProduceRequest()` adapter, health check, backend connectivity |
| `lfs_rewrite.go` | `rewriteProduceRecords()` — batch decode/encode, LFS_BLOB header detection, S3 upload, envelope replacement |
| `lfs_record.go` | Record encoding, varint helpers, `lfsBuildRecordBatch()` |
| `lfs_s3.go` | `s3Uploader` — PutObject, multipart upload, presign, stream upload |
| `lfs_http.go` | HTTP API (`/lfs/produce`, `/lfs/download`, `/lfs/uploads/*`) with chunked upload sessions |
| `lfs_metrics.go` | Prometheus metrics (upload duration, bytes, errors, orphans, runtime stats) |
| `lfs_tracker.go` + `lfs_tracker_types.go` | Async Kafka-based operations event log with circuit breaker |
| `lfs_sasl_encode.go` | SASL handshake encoding + produce request encoding for HTTP API path |
| `lfs_backend_auth.go` | Backend TLS wrapping + SASL PLAIN auth for HTTP API connections |
| `lfs_backend_tls.go` | Backend TLS config builder |
| `lfs_http_tls.go` | HTTP server TLS config builder |
| `lfs_swagger.go` | Swagger UI + OpenAPI spec handler |
| `lfs_uuid.go` | UUID generation |
| `openapi.yaml` | Embedded OpenAPI spec |
| `lfs_test.go` | 9 tests with in-memory fakeS3 backend |
**Review Issues Addressed**

All 7 issues identified during code review have been fixed:

1. `lfsDropHeader` input slice mutation — changed from `headers[:0]` (which shares the underlying array and could corrupt other references) to `make([]kmsg.Header, 0, len(headers))`, which allocates a fresh slice.
2. `connectBackend` reads env vars on every call — moved `KAFSCALE_LFS_PROXY_BACKEND_RETRIES` and `KAFSCALE_LFS_PROXY_BACKEND_BACKOFF_MS` parsing to `initLFSModule()`, stored as `backendRetries` and `backendBackoff` struct fields on `lfsModule`. No more `os.Getenv` + `strconv.Atoi` per connection attempt.
3. `forwardToBackend` unused `addr` parameter — removed the unused `addr string` parameter from the method signature. Updated both callers in `lfs_http.go` (lines 349, 859) to use `_` for the addr returned by `connectBackend`.
4. Orphan tracking gap in proxy path — `rewriteProduceRequest` now returns `(bool, []orphanInfo, error)` instead of `(bool, error)`. The caller in `handleProduceRouting()` holds the orphan list and only calls `p.lfs.trackOrphans(lfsOrphans)` if the downstream `forwardProduce()` returns an error. Previously, orphans were collected but silently discarded.
5. No deadline on `forwardToBackend` — added `conn.SetDeadline()` using the context deadline (if available) or `m.dialTimeout` as fallback. The deadline is cleared after the read/write completes via `defer`.
6. Unused `csErr` test variable — replaced `bytes.Contains([]byte(err.Error()), []byte("checksum"))` with a proper `errors.As(err, &csErr)` type assertion plus field validation (`csErr.Expected`). Removed the unused `bytes` import.
7. `lfsGetClientIP` bool named `err` — renamed the bool return from `strings.Cut` from `err` to `found` for clarity.
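The fix for issue 1 can be sketched as follows — `lfsDropHeader` is the name from the review notes, but `Header` here is a local stand-in for `kmsg.Header` so the example is self-contained:

```go
package main

import "fmt"

// Header is a stand-in for kmsg.Header in this sketch.
type Header struct {
	Key   string
	Value []byte
}

// lfsDropHeader returns headers without the entry named key.
// It allocates a fresh slice instead of reslicing headers[:0],
// so the caller's backing array is never mutated.
func lfsDropHeader(headers []Header, key string) []Header {
	out := make([]Header, 0, len(headers))
	for _, h := range headers {
		if h.Key != key {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	in := []Header{{Key: "LFS_BLOB"}, {Key: "trace-id"}}
	out := lfsDropHeader(in, "LFS_BLOB")
	fmt.Println(len(out), in[0].Key) // 1 LFS_BLOB — the input is untouched
}
```

With the original `headers[:0]` pattern, appending surviving headers would have overwritten the caller's elements in place, corrupting any other slice sharing that backing array.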
**What Stays Unchanged**
- `cmd/lfs-proxy/` — untouched except for deprecation warning log
- `pkg/lfs/` — untouched
- All demo scripts and Makefile targets — untouched (they validate the refactoring)
- Existing proxy tests — all 20 pass unchanged
- All `pkg/` tests pass
**Test Results**
- `cmd/proxy/` — 29 tests PASS (20 existing + 9 LFS)
- `cmd/lfs-proxy/` — PASS (untouched)
- `pkg/lfs/` — PASS (untouched)
- `go vet ./...` — clean
- `go test -race` — clean
…Scale#104) Dependabot update merged after green checks.
…rocessors

Add a complete LFS subsystem for streaming large binary payloads through KafScale via S3-backed object storage with Kafka metadata references.

Core components:
- LFS proxy server (cmd/lfs-proxy/) with HTTP upload/download, S3 backend, Kafka record publishing, Prometheus metrics, checksum verification, TLS/SASL support, and OpenAPI spec
- LFS package (pkg/lfs/) for shared types and client logic
- Operator integration: reconcile LFS proxy Deployment, Service, and ConfigMap as part of KafscaleCluster CR
- Console LFS admin handlers for object/topic/orphan browsing
- IDoc exploder (cmd/idoc-explode/) for SAP IDoc payloads
- Iceberg processor LFS-aware decoding and config extensions
- E2E test scaffolding and CI pipeline for LFS proxy image
- Helm chart templates for LFS proxy, metrics, and monitoring
- LFS proxy documentation and OpenAPI specification

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ches
The Makefile referenced scripts/lfs-demo-{medical,video,industrial}.sh
but the actual files are named {medical,video,industrial}-lfs-demo.sh.
Also adds all LFS scripts that were missing from the lfs-core branch:
lfs-demo.sh, e72-browser-demo.sh, idoc-explode-demo.sh, verify-lfs-urls.sh,
and stage-release-local.sh.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace minimal 1-item sample IDoc with a realistic ORDERS05 purchase order containing 3 line items, 3 partners, 3 dates, and 2 status records
- Improve demo script to show record counts per topic, a total summary, and a preview of the first 2 records from each topic file

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The IDoc demo now exercises the complete LFS data flow:
1. Upload IDoc XML to S3 (simulating LFS proxy blob upload)
2. Create an LFS envelope (pointer record)
3. Feed the envelope to idoc-explode, which resolves the blob from S3
4. Explode IDoc segments into topic-specific JSONL streams

Key improvements:
- Routed segments (items, partners, dates, status) now include a `fields` map aggregating all child element values, making each record self-contained and useful for downstream analytics
- Realistic ORDERS05 sample with 3 line items, 3 partners, 3 dates, and 2 status records
- Demo script uses MinIO for S3 storage and demonstrates envelope creation matching the kfs_lfs envelope format

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds the required license header to files flagged by the CI license-check job (hack/check_license_headers.py). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The medical, video, and industrial demo scripts searched for '*.meta'
files in MinIO, but the LFS proxy stores objects as 'obj-{uuid}' with
no extension. This caused 'S3 blobs found: 0' despite successful
uploads. Match on 'obj-*' and narrow search to the namespace prefix.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… gate

- Fix all golangci-lint issues (errcheck, staticcheck, ineffassign, unused) across 50 files: checked error returns, migrated deprecated AWS APIs to BaseEndpoint, fixed type switch patterns, suppressed unused test helpers
- Fix 9 CodeQL high-severity alerts: add bounds checks for integer conversions (strconv → int32/int16), prevent allocation overflow in record encoding, escape HTML in browser demo to prevent XSS, replace empty password defaults with null in Helm values
- Fix 2 CodeQL notices: remove unused Python imports (datetime, Optional)
- Fix coverage gate: restore 45% threshold to match main branch
- Add .gitignore patterns for Java/JS/Python/Go build artifacts
- Add deployment templates and documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Embed LFS (Large File Support) as a module inside the existing Kafka
proxy behind KAFSCALE_PROXY_LFS_ENABLED (default: off). When enabled,
the produce path detects LFS_BLOB headers, uploads payloads to S3, and
replaces record values with JSON envelopes — all before the existing
partition-aware fan-out runs.
Key design:
- Insert between parse and route in handleProduceRouting(); setting
payload=nil forces re-encode via the existing fanOut path
- Zero overhead when disabled (nil pointer check only)
- cmd/lfs-proxy/ stays as-is with deprecation warning
- All env vars (KAFSCALE_LFS_PROXY_*) reused unchanged
New files in cmd/proxy/:
lfs.go - lfsModule struct, initLFSModule(), integration adapter
lfs_rewrite.go - rewriteProduceRecords(), batch/record/header helpers
lfs_record.go - record encoding, varint helpers
lfs_s3.go - s3Uploader (PutObject, multipart, presign)
lfs_http.go - HTTP API (/lfs/produce, /lfs/download, /lfs/uploads)
lfs_metrics.go - Prometheus metrics
lfs_tracker.go - async Kafka-based event log
lfs_tracker_types.go - event type definitions
lfs_sasl_encode.go - SASL handshake + produce encoding for HTTP path
lfs_backend_auth.go - backend TLS wrapping + SASL auth
lfs_backend_tls.go - backend TLS config builder
lfs_http_tls.go - HTTP server TLS config builder
lfs_swagger.go - Swagger UI handler
lfs_uuid.go - UUID generation
openapi.yaml - embedded OpenAPI spec
lfs_test.go - 9 tests (blob detection, passthrough, checksum,
mixed records, CRC validation, nil module)
Review fixes applied:
- lfsDropHeader allocates new slice (no input mutation)
- connectBackend uses struct fields, not per-call os.Getenv
- forwardToBackend: removed unused addr param, added conn deadline
- rewriteProduceRequest returns orphans; caller tracks on produce failure
- Test uses errors.As for proper ChecksumError assertion
- lfsGetClientIP: bool from strings.Cut named 'found' not 'err'
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
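The "zero overhead when disabled (nil pointer check only)" design above could look roughly like this. This is a sketch under assumptions: `handleProduce`, the `proxy` struct layout, and the method signature are illustrative, not the actual code:

```go
package main

import "fmt"

// lfsModule is a stand-in for the real LFS module; a nil pointer
// means the KAFSCALE_PROXY_LFS_ENABLED feature flag is off.
type lfsModule struct{}

// rewriteProduceRequest stands in for the real adapter: detect
// LFS_BLOB headers, upload blobs to S3, swap in JSON envelopes.
func (m *lfsModule) rewriteProduceRequest(payload []byte) ([]byte, bool) {
	return payload, false // sketch: nothing to rewrite
}

type proxy struct {
	lfs *lfsModule // nil when LFS is disabled
}

// handleProduce sketches the insertion point between parse and
// route: when lfs is nil, the only cost is one pointer check.
func (p *proxy) handleProduce(payload []byte) []byte {
	if p.lfs != nil {
		if rewritten, changed := p.lfs.rewriteProduceRequest(payload); changed {
			payload = rewritten // forces re-encode via the existing fan-out
		}
	}
	// ... existing partition-aware fan-out runs here ...
	return payload
}

func main() {
	disabled := &proxy{}                 // flag off: no LFS work at all
	enabled := &proxy{lfs: &lfsModule{}} // flag on: rewrite runs first
	fmt.Println(len(disabled.handleProduce([]byte("rec"))),
		len(enabled.handleProduce([]byte("rec")))) // 3 3
}
```

Keeping the module behind a single nil check means the disabled path needs no configuration, no extra allocations, and no behavioral change for existing clients.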
Add 32 test files across all packages (acl, broker, cache, idoc, protocol, storage, lfs, console, mcpserver, ui) achieving broad coverage. Include new pkg/lfs/producer.go client, improved coverage gate script, and protobuf license headers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@klaudworks @kamir — can you please resolve the comments?
Align all LFS modules and test files with the kmsg type aliases introduced in upstream PR KafScale#132. Updates field names (Topic, Partition, Leader, Replicas, ISR, TimeoutMillis, TransactionID, Generation) and replaces removed protocol helpers in encoding/request tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Skip node_modules, build artifacts (target/, egg-info/), examples, deploy templates, and .claude config in the license header checker. Add Apache 2.0 header to deploy/DEPLOYMENT.md. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace removed protocol.ProduceTopicResponse, ProducePartitionResponse, ProduceResponse, and EncodeProduceResponse with kmsg equivalents and protocol.EncodeResponse. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use explicitly typed variables with bounds checks for int→int32/int16 conversions in tracker topic creation. Exclude node_modules, target, and egg-info directories from CodeQL analysis. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ndler errors

Previously, when a handler returned an error (e.g., unsupported Fetch version), the broker closed the TCP connection, causing clients to see EOF. Now the broker sends a proper Kafka error response and continues serving the connection.

Also removes the Fetch v11 minimum version check — kmsg handles all versions natively, so the broker can serve Fetch v0+ requests. This fixes "fetching message: EOF" errors seen with clients like segmentio/kafka-go that may negotiate older Fetch versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Superseded by #134 — a surgically clean branch.

What changed:

The original
Summary
Complete LFS (Large File Support) subsystem for streaming large binary payloads through KafScale via S3-backed object storage with Kafka metadata references.
Core LFS Proxy (119 files)
- LFS proxy server (cmd/lfs-proxy/) — HTTP upload/download, S3 backend, Kafka record publishing, Prometheus metrics, checksum verification, TLS/SASL support, OpenAPI spec
- LFS package (pkg/lfs/) — shared types, producer, consumer, resolver, envelope, and S3 client logic
- Operator integration — reconcile LFS proxy resources as part of the KafscaleCluster CR
- Console LFS admin (internal/console/) — object/topic/orphan browsing handlers
- IDoc exploder (cmd/idoc-explode/) — SAP IDoc payload processing with field aggregation into routed parent segments
Fixes included
- `time` import fix in lfs_handlers.go
- Makefile demo script names (lfs-demo-medical.sh → medical-lfs-demo.sh)
- Envelope format key (lfs_version → kfs_lfs)
- `Fields` map for parent segment field aggregation
- MinIO blob search pattern (*.meta → obj-*)

Test plan
- go vet ./... passes
- go test -race ./... passes
- go build ./cmd/lfs-proxy passes
- License header check passes (hack/check_license_headers.py)
- Helm lint passes (helm lint deploy/helm/kafscale)

🤖 Generated with Claude Code