Skip to content

[MSE] Add Apache Arrow as native columnar block format#18207

Draft
praveenc7 wants to merge 6 commits intoapache:masterfrom
praveenc7:pchaganl/arrow-integration-v2
Draft

[MSE] Add Apache Arrow as native columnar block format#18207
praveenc7 wants to merge 6 commits intoapache:masterfrom
praveenc7:pchaganl/arrow-integration-v2

Conversation

@praveenc7
Copy link
Copy Markdown
Contributor

Summary

#18205

Testing Done

TODO

@gortiz gortiz requested review from gortiz and yashmayya and removed request for yashmayya April 15, 2026 10:44
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 33.02836% with 732 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.26%. Comparing base (9ff14da) to head (be4bb9f).
⚠️ Report is 9 commits behind head on master.

Files with missing lines Patch % Lines
.../apache/pinot/common/datablock/ArrowDataBlock.java 17.39% 126 Missing and 7 partials ⚠️
...inot/query/runtime/blocks/ArrowBlockConverter.java 29.62% 86 Missing and 9 partials ⚠️
...not/query/mailbox/flight/FlightSendingMailbox.java 0.00% 77 Missing ⚠️
...inot/query/mailbox/flight/FlightMailboxServer.java 0.00% 73 Missing ⚠️
...ot/query/runtime/operator/join/ArrowJoinProbe.java 61.58% 55 Missing and 13 partials ⚠️
.../planner/partitioning/SingleColumnKeySelector.java 21.42% 59 Missing and 7 partials ⚠️
.../query/runtime/operator/join/ArrowLookupTable.java 54.54% 49 Missing and 11 partials ⚠️
.../apache/pinot/segment/spi/memory/ArrowBuffers.java 0.00% 32 Missing ⚠️
.../apache/pinot/query/runtime/blocks/ArrowBlock.java 50.98% 22 Missing and 3 partials ⚠️
...pinot/query/runtime/operator/HashJoinOperator.java 63.76% 15 Missing and 10 partials ⚠️
... and 14 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18207      +/-   ##
============================================
- Coverage     63.34%   63.26%   -0.09%     
+ Complexity     1627     1624       -3     
============================================
  Files          3238     3252      +14     
  Lines        197003   198121    +1118     
  Branches      30464    30644     +180     
============================================
+ Hits         124801   125349     +548     
- Misses        62202    62670     +468     
- Partials      10000    10102     +102     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.24% <33.02%> (+7.99%) ⬆️
java-21 34.74% <0.00%> (-28.58%) ⬇️
temurin 63.26% <33.02%> (-0.09%) ⬇️
unittests 63.26% <33.02%> (-0.09%) ⬇️
unittests1 55.14% <33.02%> (-0.15%) ⬇️
unittests2 34.84% <0.00%> (-0.12%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yashmayya
Copy link
Copy Markdown
Contributor

Code review

Found 10 issues:

  1. SENTINEL collision causes silent data loss for Integer.MIN_VALUE join keys. IntKeyHasher.computeHash() returns the raw int value, so keys equal to Integer.MIN_VALUE produce a hash identical to the empty-slot sentinel. The probe loop in ArrowJoinProbe.collectMatchesWithCollisions() breaks on sentinel, so these rows are silently dropped in inner joins or treated as unmatched in outer joins.

/** Sentinel value in the hash table marking an empty slot. */
public static final int SENTINEL = Integer.MIN_VALUE;

while (true) {
int rightHash = _hashes[slot];
if (rightHash == ArrowLookupTable.SENTINEL) {
break;
}

  1. NPE in ArrowLookupTable.finalize() when the right side is empty. mergeBlocks() returns null for an empty batch list, then _keySelector.getArrowHasher(null) is called before the null guard in buildHashTable().

*/
public void finalize() {
_mergedBlock = mergeBlocks(_rightBatches);
_keyHasher = _keySelector.getArrowHasher(_mergedBlock);

  1. RIGHT / FULL OUTER JOIN broken in Arrow mode. When needUnmatchedRightRows() is true, buildNonMatchRightRows() reads _rightTable which is never populated in the Arrow path (only _arrowRightTable is set). The assert on _rightTable != null will fire under -ea; without it, unmatched right rows are silently lost. The TODO at line 256 acknowledges this gap.

}
if (needUnmatchedRightRows()) {
// TODO: emit unmatched right rows as Arrow blocks
List<Object[]> rows = buildNonMatchRightRows();
if (!rows.isEmpty()) {

  1. Hardcoded dictionary ID 0 for all dictionary-encoded columns. dictionaryEncoded() and listOfDictionaryEncoded() both use new DictionaryEncoding(0, true, null). Schemas with multiple STRING/JSON columns will share a single dictionary ID, causing DictionaryProvider.lookup() to return the wrong dictionary for the second and subsequent columns -- silent data corruption.

private static Field dictionaryEncoded(String name) {
DictionaryEncoding encoding = new DictionaryEncoding(0, true, null);
FieldType indexFieldType = new FieldType(true, new ArrowType.Int(32, true), encoding, null);
return new Field(name, indexFieldType, null);

  1. arrowTypeToPinot() maps all ArrowType.List to STRING_ARRAY. The reverse conversion ignores the list element type, so INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, BOOLEAN_ARRAY, and BYTES_ARRAY are all lossily mapped to STRING_ARRAY on round-trip.

}
if (arrowType instanceof ArrowType.List) {
return ColumnDataType.STRING_ARRAY;
}

  1. Child allocator leaked per Flight batch in FlightMailboxService.processStream(). A new BufferAllocator is created per received block but never closed. Closing VectorSchemaRoot releases buffers but does not close the allocator itself, preventing the root allocator from shutting down cleanly.

try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
BufferAllocator blockAllocator = _arrowBuffers.newQueryAllocator("flight-stream-block");
VectorSchemaRoot ownedRoot = VectorSchemaRoot.create(root.getSchema(), blockAllocator);
VectorLoader loader = new VectorLoader(ownedRoot);

  1. ArrowBlock not released after Flight send in FlightSendingMailbox.sendDataBlock(). The ArrowBlock obtained via data.asArrow() is used for transfer but release() is never called on the normal path, leaking off-heap memory per sent block. (The early-termination path at line 93 does call release().)

private void sendDataBlock(MseBlock.Data data) {
ArrowBlock arrowBlock = data.asArrow();
VectorSchemaRoot root = arrowBlock.getDataBlock().getRoot();
DictionaryProvider dictionaryProvider = arrowBlock.getDataBlock().getDictionaryProvider();

  1. Race condition in ArrowBlock.retain(). The check-then-act on _refCount is not atomic: between _refCount.get() == 0 and _refCount.incrementAndGet(), another thread can release() to 0 and close the underlying buffers. A CAS loop is needed.

/** Increments the reference count. Must be paired with a subsequent {@link #release()}. */
public void retain() {
if (_refCount.get() == 0) {
throw new IllegalStateException("ArrowBlock has already been released");
}
_refCount.incrementAndGet();
}

  1. ArrowBlock.isSerialized() returns true incorrectly. ArrowBlock.isRowHeap() returns false, so the inherited default isSerialized() (!isRowHeap()) returns true. QueryDispatcher branches on isSerialized() and will route ArrowBlock through reduceSerialized(), triggering an expensive and unintended Arrow -> RowHeap -> Serialized double conversion. ArrowBlock should override isSerialized() to return false.

/// Returns whether the block is a [SerializedDataBlock].
default boolean isSerialized() {
return !isRowHeap();
}

@Override
public boolean isRowHeap() {
return false;
}

  1. New public inner classes in SingleColumnKeySelector lack class-level Javadoc. 9 new public static inner classes (IntKeyHasher, LongKeyHasher, DictionaryKeyHasher, GenericKeyHasher, IntKeyComparator, LongKeyComparator, BothDictionaryKeyComparator, LeftDictionaryKeyComparator, GenericKeyComparator) have no Javadoc. (CLAUDE.md says "Add class-level Javadoc for new classes; describe behavior and thread-safety")

public static class IntKeyHasher implements ArrowKeyHasher {
private final IntVector _vector;

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@praveenc7
Copy link
Copy Markdown
Contributor Author

praveenc7 commented Apr 16, 2026

@yashmayya Still working on this, will let you known by next week. When it is ready for full review. I am testing different stratergy to see the performance difference

…in Arrow path

Copy elimination:
- Flight receiver: replace unload/reload with TransferPair (0 copies vs 2)
- Flight sender: skip slice+transfer for single-batch blocks
- ArrowJoinProbe: bulk ArrowBuf gather for fixed-width vectors
- ArrowBlockConverter: direct buffer read with byte-swap for ColumnarDataBlock
- Dictionary merge: skip decode when all batches share the same dictionary
- Dictionary sharing: reference-counted SharedDictionaryProvider instead of deep copy

Correctness fixes from code review:
- Rename ArrowLookupTable.finalize() to build() to avoid Object.finalize() GC conflict
- Use CAS loop in retain() for thread-safe reference counting
- Guard bulk gather fast path with getNullCount()==0 to preserve nulls
- Make KeySelector Arrow methods default to avoid breaking external implementations
- Handle empty right-side in ArrowLookupTable.build() without NPE
Arrow Flight wiring:
- MailboxService creates FlightMailboxServer and FlightChannelManager when Arrow is enabled
- getSendingMailbox() returns FlightSendingMailbox for remote targets (zero-copy IPC)
- Flight port defaults to gRPC port + 1 (configurable via pinot.query.runner.flight.port)
- EOS/error signalling stays on gRPC for rolling-upgrade compatibility
- Flight runs plaintext only (TLS requires PEM certs — TODO for follow-up)

Correctness fixes:
- Close Arrow allocator in OpChain.cancel() (was only in close(), leaked on errors)
- Fall back to row path for RIGHT/FULL joins (buildNonMatchRightRows needs _rightTable)
- Rename ArrowLookupTable.finalize() to build() to avoid Object.finalize() GC conflict
- CAS loop in retain() for thread-safe reference counting
- Guard bulk gather with getNullCount()==0 to preserve nulls
- Make KeySelector Arrow methods default to avoid SPI break
- Fix FLOAT/DOUBLE ClassCastException in ArrowBlockConverter
- Close per-query Arrow BufferAllocator in OpChain.close()

LOC reduction:
- Remove unused ArrowSchemaConverter (inlined into ArrowDataBlock)
- Extract shared tryDirectCopy4/8 for endian-swap fast path
- Merge 4 probe methods into 2 unified implementations
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants