Skip to content

Fix Kinesis offset handling for inclusive start sequence numbers and empty batches#18231

Open
utafrali wants to merge 1 commit intoapache:masterfrom
utafrali:fix/issue-18220-revisit-kinesis-offset-handling-for-incl
Open

Fix Kinesis offset handling for inclusive start sequence numbers and empty batches#18231
utafrali wants to merge 1 commit intoapache:masterfrom
utafrali:fix/issue-18220-revisit-kinesis-offset-handling-for-incl

Conversation

@utafrali
Copy link
Copy Markdown
Contributor

Fixes #18220

Addressed the two issues from the TODOs. The start sequence number for a new partition is inclusive, but the code was using AFTER_SEQUENCE_NUMBER which lost the first message. Fixed by checking startOffset.isInclusive() and using AT_SEQUENCE_NUMBER when appropriate.

Also fixed empty batch handling. Kinesis can return empty results temporarily even when records are available. We now check if there's a non-null next shard iterator and continue using it on the next cycle instead of treating it as fully caught up.

Added tests for both fixes.

@noob-se7en noob-se7en added bug Something is not working as expected ingestion Related to data ingestion pipeline kinesis Related to AWS Kinesis stream connector real-time Related to realtime table ingestion and serving labels Apr 16, 2026
Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal compatibility risk; see inline comment.

return JsonUtils.newObjectNode().put(_shardId, _sequenceNumber).toString();
ObjectNode node = JsonUtils.newObjectNode().put(_shardId, _sequenceNumber);
if (_inclusive) {
node.put("inclusive", true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding inclusive to the serialized offset changes the persisted wire format from {"<shard>":"<seq>"} to a two-field JSON object. Older Pinot binaries only accept the one-field form in new KinesisPartitionGroupOffset(String), so a rolling upgrade or rollback can fail as soon as a new controller/server writes one of these offsets into shared metadata. Because stream offsets cross process and version boundaries, this needs a backward-compatible encoding strategy or a rollout plan that keeps the on-disk format stable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something is not working as expected ingestion Related to data ingestion pipeline kinesis Related to AWS Kinesis stream connector real-time Related to realtime table ingestion and serving

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Revisit Kinesis offset handling for inclusive start sequence numbers and empty batches

3 participants