
feat: Add KafkaRecord type and Protobuf deserialization for Kafka trigger binding (#868) #869

Open
TsuyoshiUshio wants to merge 2 commits into Azure:dev from TsuyoshiUshio:feature/issue-868-kafka-record

Conversation

@TsuyoshiUshio
Contributor

Summary

Add support for binding to raw Apache Kafka records (KafkaRecord type) in the Java worker, enabling users to access full Kafka message metadata via Protobuf deserialization.

Issue

Fixes #868
Relates to Azure/azure-functions-kafka-extension#612

Changes

New files

  • KafkaRecord.java — Main POJO with topic, partition, offset, key/value as raw byte[], timestamp, headers, leader epoch
  • KafkaHeader.java — Header with key (String) + value (byte[]) + getValueAsString() helper
  • KafkaTimestamp.java — Timestamp with getUnixTimestampMs(), getType(), getDateTimeOffset()
  • KafkaTimestampType.java — Enum: NotAvailable, CreateTime, LogAppendTime (with fromValue() safe mapping)
  • KafkaRecordProtoDeserializer.java — Protobuf → KafkaRecord POJO conversion
  • KafkaRecordProto.proto — Shared schema (synced with host extension Microsoft.Azure.WebJobs.Extensions.Kafka)
  • KafkaRecordDeserializerTest.java — 10 unit tests
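The "safe mapping" mentioned for KafkaTimestampType.fromValue() can be sketched as follows. This is a minimal, hypothetical version of the enum; the actual wire values and member names are defined by the PR and the host extension, so treat the integer constants here as illustrative assumptions.

```java
// Hypothetical sketch of KafkaTimestampType with a safe fromValue() mapping.
// Wire values (-1, 0, 1) mirror Kafka's TimestampType convention but are
// assumptions, not taken from the PR source.
public class KafkaTimestampTypeSketch {
    enum KafkaTimestampType {
        NotAvailable(-1), CreateTime(0), LogAppendTime(1);

        private final int value;

        KafkaTimestampType(int value) { this.value = value; }

        // Safe mapping: an unknown wire value falls back to NotAvailable
        // instead of throwing, so a newer host cannot break the worker.
        static KafkaTimestampType fromValue(int value) {
            for (KafkaTimestampType t : values()) {
                if (t.value == value) {
                    return t;
                }
            }
            return NotAvailable;
        }
    }

    public static void main(String[] args) {
        System.out.println(KafkaTimestampType.fromValue(0));   // CreateTime
        System.out.println(KafkaTimestampType.fromValue(99));  // NotAvailable
    }
}
```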

Modified files

  • RpcModelBindingDataSource.java — Adds content_type dispatch: application/json keeps the existing path, while application/x-protobuf with source AzureKafkaRecord takes the new KafkaRecord path. Also registers a KafkaRecord.class operation in DataOperations
  • pom.xml — Add second protobuf-maven-plugin execution for Kafka proto compilation from src/main/proto/kafka/
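The content_type dispatch described above can be sketched roughly like this. The method and constant names are illustrative only, not the actual signatures in RpcModelBindingDataSource; only the content-type strings and the AzureKafkaRecord source value come from the PR description.

```java
// Illustrative sketch of the content_type dispatch added in this PR.
// route() stands in for whatever RpcModelBindingDataSource actually does;
// it is not the real method.
public class ContentTypeDispatchSketch {
    static final String KAFKA_SOURCE = "AzureKafkaRecord";

    static String route(String contentType, String source) {
        if ("application/json".equals(contentType)) {
            return "json";           // existing JSON/POJO path, unchanged
        }
        if ("application/x-protobuf".equals(contentType)
                && KAFKA_SOURCE.equals(source)) {
            return "kafkaRecord";    // new Protobuf -> KafkaRecord path
        }
        throw new UnsupportedOperationException(
                "Unsupported content type: " + contentType);
    }

    public static void main(String[] args) {
        System.out.println(route("application/json", KAFKA_SOURCE));
        System.out.println(route("application/x-protobuf", KAFKA_SOURCE));
    }
}
```

Because the dispatch keys on both the content type and the binding source, non-Kafka Protobuf payloads would still fall through rather than being misrouted into the KafkaRecord path.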

Breaking Changes

None. All existing binding types (String, byte[], Map<String, String>, POJO) continue to work unchanged. Users opt in via KafkaRecord parameter type.

Testing

  • 10 unit tests covering: full record round-trip, null key/value, no leader epoch, unknown timestamp type fallback, multiple headers, null header value, timestamp OffsetDateTime, KafkaTimestampType.fromValue()
  • E2E tests: deferred (requires Docker Kafka environment)
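The null-header-value case from the test list can be illustrated with a minimal stand-in for KafkaHeader. This is a hypothetical class mirroring the described API (String key, byte[] value, getValueAsString()), not the library's actual implementation; the UTF-8 decoding and null handling are assumptions.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical minimal KafkaHeader matching the test cases listed above
// (multiple headers, null header value). Not the real library class.
public class KafkaHeaderSketch {
    static class KafkaHeader {
        private final String key;
        private final byte[] value;

        KafkaHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        String getKey() { return key; }

        byte[] getValue() { return value; }

        // A null (tombstone-style) value decodes to null instead of throwing.
        String getValueAsString() {
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        KafkaHeader h = new KafkaHeader("traceId",
                "abc123".getBytes(StandardCharsets.UTF_8));
        System.out.println(h.getKey() + " = " + h.getValueAsString());

        KafkaHeader tombstone = new KafkaHeader("empty", null);
        System.out.println(tombstone.getValueAsString()); // null
    }
}
```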

User Experience

// Existing (continues to work)
@FunctionName("ExistingTrigger")
public void run(@KafkaTrigger(...) String message) { }

// NEW: Full record access
@FunctionName("KafkaRecordTrigger")
public void run(
    @KafkaTrigger(name = "record", topic = "my-topic",
                  brokerList = "%BrokerList%", consumerGroup = "$Default")
    KafkaRecord record,
    final ExecutionContext context) {
    context.getLogger().info("Topic: " + record.getTopic());
    context.getLogger().info("Key: " + record.getKeyAsString());
    for (KafkaHeader h : record.getHeaders()) {
        context.getLogger().info("Header: " + h.getKey() + " = " + h.getValueAsString());
    }
}
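The KafkaTimestamp accessor getDateTimeOffset() presumably converts the record's epoch-millisecond timestamp into a java.time value. A minimal sketch under that assumption (field and method names follow the PR's description; the UTC offset choice is an assumption):

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch of KafkaTimestamp's epoch-ms to OffsetDateTime
// conversion; not the library's actual class.
public class KafkaTimestampSketch {
    static class KafkaTimestamp {
        private final long unixTimestampMs;

        KafkaTimestamp(long unixTimestampMs) {
            this.unixTimestampMs = unixTimestampMs;
        }

        long getUnixTimestampMs() { return unixTimestampMs; }

        // Interpret the broker-supplied epoch milliseconds as UTC.
        OffsetDateTime getDateTimeOffset() {
            return Instant.ofEpochMilli(unixTimestampMs).atOffset(ZoneOffset.UTC);
        }
    }

    public static void main(String[] args) {
        KafkaTimestamp ts = new KafkaTimestamp(0L);
        System.out.println(ts.getDateTimeOffset()); // 1970-01-01T00:00Z
    }
}
```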

Note

The host-side counterpart (Kafka Extension 4.3.1) is already released on NuGet. The .NET Isolated Worker implementation was completed in Azure/azure-functions-dotnet-worker#3356.

feat: Add KafkaRecord type and Protobuf deserialization for Kafka trigger binding

Add support for binding to raw Apache Kafka records (KafkaRecord type) in the
Java worker, enabling users to access full Kafka message metadata.

New files:
- KafkaRecord.java — Main POJO with topic, partition, offset, key/value as
  raw bytes, timestamp, headers, leader epoch
- KafkaHeader.java — Header with key + byte[] value + getValueAsString()
- KafkaTimestamp.java — Timestamp with UnixTimestampMs + type + getDateTimeOffset()
- KafkaTimestampType.java — Enum: NotAvailable, CreateTime, LogAppendTime
- KafkaRecordProtoDeserializer.java — Protobuf to POJO conversion
- KafkaRecordProto.proto — Shared schema (synced with host extension)
- KafkaRecordDeserializerTest.java — 10 unit tests

Modified files:
- RpcModelBindingDataSource.java — Add content_type dispatch: application/json
  (existing path) vs application/x-protobuf (new KafkaRecord path)
- pom.xml — Add second proto source root for Kafka proto compilation

Non-breaking: All existing binding types (String, byte[], Map) unchanged.
Users opt in via KafkaRecord parameter type.

Relates to Azure/azure-functions-kafka-extension#612
Fixes Azure#868

Co-authored-by: Dobby <dobby@microsoft.com>
Move KafkaRecord, KafkaHeader, KafkaTimestamp, KafkaTimestampType from
worker package (com.microsoft.azure.functions.worker.binding.kafka) to
library package (com.microsoft.azure.functions) so user functions can
import these types in their Maven projects.

Worker keeps only KafkaRecordProtoDeserializer (runtime-only code).

Co-authored-by: Dobby <dobby@microsoft.com>
@TsuyoshiUshio
Contributor Author

Dependency note: This PR depends on Azure/azure-functions-java-library#236 which adds the KafkaRecord, KafkaHeader, KafkaTimestamp, KafkaTimestampType POJO types to the library.

The POJO types were initially in this PR but have been moved to the library so that user function code can import them. This PR now contains only the runtime-side changes:

  • KafkaRecordProtoDeserializer — Protobuf → KafkaRecord conversion
  • RpcModelBindingDataSource — content_type dispatch (JSON vs Protobuf)
  • Proto schema and Maven plugin config

Release order: java-library must be released first → update worker's java-library dependency version → merge this PR.


Development

Successfully merging this pull request may close these issues.

[Feature] Support raw Kafka record (KafkaRecord) binding with Protobuf deserialization
