From 08da2600cd5f9b04aaa638021728ac009aad33e3 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Mon, 20 Apr 2026 13:41:01 +0400 Subject: [PATCH 1/4] skip LID blocks --- config/frac_version.go | 6 ++- frac/fraction_test.go | 50 ++++++++++++++++++++- frac/sealed/lids/iterator_asc.go | 3 +- frac/sealed/lids/iterator_desc.go | 3 +- frac/sealed/lids/table.go | 64 ++++++++++++++++++++++++--- frac/sealed/sealing/blocks_builder.go | 4 ++ frac/sealed/sealing/index.go | 15 +++---- frac/sealed_loader.go | 15 +++++-- 8 files changed, 137 insertions(+), 23 deletions(-) diff --git a/config/frac_version.go b/config/frac_version.go index d3ff1b14..07427f23 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -9,6 +9,10 @@ const ( BinaryDataV1 // BinaryDataV2 - MIDs stored in nanoseconds BinaryDataV2 + // BinaryDataV3 - bitpack for LIDs/MIDs + BinaryDataV3 + // BinaryDataV4 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting + BinaryDataV4 ) -const CurrentFracVersion = BinaryDataV2 +const CurrentFracVersion = BinaryDataV4 diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 3f0994e6..21050606 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -97,7 +97,7 @@ func (s *FractionTestSuite) SetupTestCommon() { DocsPositionsZstdLevel: 1, TokenTableZstdLevel: 1, DocBlocksZstdLevel: 1, - LIDBlockSize: 512, + LIDBlockSize: 256, DocBlockSize: 128 * int(units.KiB), } @@ -1326,6 +1326,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: toTime, }, + // block skipping scenarios + { + name: "service:gateway AND trace_id:trace-2026", + query: "service:gateway AND trace_id:trace-2026", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.traceId == "trace-2026" + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + filter: func(doc *testDoc) bool { + return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5", + query: "service:gateway AND pod:pod-5", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5 AND message:failed", + query: "service:gateway AND pod:pod-5 AND message:failed", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "service:gateway AND message:processing AND message:retry AND level:5", query: "service:gateway AND message:processing AND message:retry AND level:5", @@ -1337,6 +1374,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { toTime: toTime, }, // OR operator queries + { + name: "(service OR) AND (trace_id OR)", + query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)", + filter: func(doc *testDoc) bool { + return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" || + doc.traceId == "trace-1500" || + doc.traceId == "trace-2000") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "trace_id OR", query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000", diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index a2fd0c5a..16715ac1 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) } // fast path: smallest remaining > nextID => skip entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if it.lids[0] > nextID.Unpack() { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/iterator_desc.go b/frac/sealed/lids/iterator_desc.go index 5c4e08d9..6c0691e2 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count } // fast path: last LID < nextID => skip the entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if nextID.Unpack() > it.lids[len(it.lids)-1] { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/table.go b/frac/sealed/lids/table.go index 5a04acdd..cdb5548d 100644 --- a/frac/sealed/lids/table.go +++ b/frac/sealed/lids/table.go @@ -3,6 +3,7 @@ package lids import ( "sort" + "github.com/ozontech/seq-db/config" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" @@ -12,24 +13,35 @@ type Table struct { StartBlockIndex uint32 MaxTIDs []uint32 // defines last tid for each block MinTIDs []uint32 // defines first not continued tid for each block + FirstLIDs []uint32 + LastLIDs []uint32 - // TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID() - // TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1] - IsContinued []bool + FracVer config.BinaryDataVersion + IsContinued []bool // legacy field, only used in BinaryDataV0-BinaryDataV3 (inclusive) } -func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table { +func NewTable( + fracVer config.BinaryDataVersion, + startOfLIDsBlockIndex uint32, + minTIDs, maxTIDs []uint32, + firstLIDs, lastLIDs []uint32, + isContinued []bool) *Table { return &Table{ StartBlockIndex: startOfLIDsBlockIndex, MinTIDs: minTIDs, MaxTIDs: maxTIDs, + FirstLIDs: firstLIDs, + LastLIDs: lastLIDs, IsContinued: isContinued, + FracVer: fracVer, } } func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 { - if t.IsContinued[blockIndex] { - return t.MinTIDs[blockIndex] - 1 + if t.FracVer < config.BinaryDataV4 { + if t.IsContinued[blockIndex] { + return t.MinTIDs[blockIndex] - 1 + } } return t.MinTIDs[blockIndex] } @@ -75,6 +87,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 { return uint32(index) } +// SeekBlockGeq finds next block for provided TID which contains +// lid greater or equal to provided LID starting from provided index (inclusive). +// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. +func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV4 { + // not supported for old frac versions + return index + } + + res := index + for i := index + 1; i < uint32(len(t.MinTIDs)); i++ { + if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] { + res = i + continue + } + break + } + return res +} + +// SeekBlockLeq finds next block with lowest index for provided TID which contains LIDs +// less or equal to provided LID starting from provided index (inclusive). +// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. +func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV4 { + // not supported for old frac versions + return index + } + + res := index + for i := int(index) - 1; i >= 0; i-- { + if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] { + res = uint32(i) + continue + } + break + } + return res +} + func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool { if blockIndex == 0 { // it is no prev block return false diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 14a5cac7..2f75eca3 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -27,6 +27,8 @@ type tokensSealBlock struct { type lidsExt struct { minTID uint32 // First token ID in the LID block maxTID uint32 // Last token ID in the LID block + firstLID uint32 // First LID in the LID block + lastLID uint32 // Last LID in the LID block isContinued bool // Whether LID sequence continues in next block } @@ -188,6 +190,8 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field isContinued = !isEndOfToken + currentBlock.ext.firstLID = currentBlock.payload.LIDs[0] + currentBlock.ext.lastLID = currentBlock.payload.LIDs[len(currentBlock.payload.LIDs)-1] return yield(currentBlock) } diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 48cf8302..3b4a6b9e 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -8,6 +8,7 @@ import ( "time" "github.com/alecthomas/units" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" @@ -414,22 +415,18 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { - var ext1 uint64 - if block.ext.isContinued { // todo: Legacy continuation flag - ext1 = 1 - block.ext.minTID++ // Adjust for legacy format - } - // Update LIDs table for PreloadedData s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) - s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) + s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID) + s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID) + s.lidsTable.FracVer = config.CurrentFracVersion // Packing block s.buf1 = block.payload.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) - b.ext1 = ext1 // Legacy continuation flag - b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range + b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range + b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range return b } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index ae639862..419d334e 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexRea logger.Fatal("load ids error", zap.Error(err)) } - if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(); err != nil { + if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(info.BinaryDataVer); err != nil { logger.Fatal("load lids error", zap.Error(err)) } @@ -134,9 +134,11 @@ func (l *Loader) skipTokens() { } } -func (l *Loader) loadLIDsBlocksTable() (*lids.Table, error) { +func (l *Loader) loadLIDsBlocksTable(fracVer config.BinaryDataVersion) (*lids.Table, error) { maxTIDs := make([]uint32, 0) minTIDs := make([]uint32, 0) + firstLIDs := make([]uint32, 0) + lastLIDs := make([]uint32, 0) isContinued := make([]bool, 0) startIndex := l.blockIndex @@ -152,8 +154,13 @@ func (l *Loader) loadLIDsBlocksTable() (*lids.Table, error) { maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) - isContinued = append(isContinued, ext1 == 1) + if fracVer >= config.BinaryDataV4 { + lastLIDs = append(lastLIDs, uint32(ext1>>32)) + firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF)) + } else { + isContinued = append(isContinued, ext1 == 1) + } } - return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, startIndex, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil } From 87a6cfcdf8e5beb0e49cdd42aa251d960e0f312e Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 21 Apr 2026 15:16:08 +0400 Subject: [PATCH 2/4] remove isContinued from lid ext --- frac/sealed/sealing/blocks_builder.go | 12 ++---- frac/sealed/sealing/blocks_builder_test.go | 47 ++++++++++++---------- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 2f75eca3..7eee7b1f 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -25,11 +25,10 @@ type tokensSealBlock struct { // lidsExt represents the range and continuation status of LID blocks. type lidsExt struct { - minTID uint32 // First token ID in the LID block - maxTID uint32 // Last token ID in the LID block - firstLID uint32 // First LID in the LID block - lastLID uint32 // Last LID in the LID block - isContinued bool // Whether LID sequence continues in next block + minTID uint32 // First token ID in the LID block + maxTID uint32 // Last token ID in the LID block + firstLID uint32 // First LID in the LID block + lastLID uint32 // Last LID in the LID block } // lidsSealBlock represents a sealed block containing LID (Local ID) data. @@ -171,7 +170,6 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa currentTID uint32 // Current TID being processed currentBlock lidsSealBlock // Current block under construction isEndOfToken bool // Flag for end of current token's LIDs - isContinued bool // Flag for block continuation ) // Initialize first block @@ -188,8 +186,6 @@ func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapa currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs))) } currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field - currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field - isContinued = !isEndOfToken currentBlock.ext.firstLID = currentBlock.payload.LIDs[0] currentBlock.ext.lastLID = currentBlock.payload.LIDs[len(currentBlock.payload.LIDs)-1] return yield(currentBlock) diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go index 80892ca2..fd62ba32 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -319,9 +319,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { expected := []lidsSealBlock{{ ext: lidsExt{ - minTID: 1, - maxTID: 1, - isContinued: false, + minTID: 1, + maxTID: 1, + firstLID: 10, + lastLID: 30, }, payload: lids.Block{ LIDs: []uint32{10, 20, 30}, @@ -330,20 +331,21 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 1, - maxTID: 2, - isContinued: true, + minTID: 1, + maxTID: 2, + firstLID: 40, + lastLID: 21, }, payload: lids.Block{ - LIDs: []uint32{40, 11, 21}, - Offsets: []uint32{0, 1, 3}, - IsLastLID: false, + LIDs: []uint32{40, 11, 21}, + Offsets: []uint32{0, 1, 3}, }, }, { ext: lidsExt{ - minTID: 2, - maxTID: 3, - isContinued: true, + minTID: 2, + maxTID: 3, + firstLID: 31, + lastLID: 10, }, payload: lids.Block{ LIDs: []uint32{31, 41, 10}, @@ -352,9 +354,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 3, - maxTID: 3, - isContinued: true, + minTID: 3, + maxTID: 3, + firstLID: 11, + lastLID: 21, }, payload: lids.Block{ LIDs: []uint32{11, 20, 21}, @@ -363,9 +366,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: false, + minTID: 4, + maxTID: 4, + firstLID: 30, + lastLID: 50, }, payload: lids.Block{ LIDs: []uint32{30, 40, 50}, @@ -374,9 +378,10 @@ func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { }, }, { ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: true, + minTID: 4, + maxTID: 4, + firstLID: 60, + lastLID: 60, }, payload: lids.Block{ LIDs: []uint32{60}, From f9cb05d5f7a86cba8a62b45ab7db6b9713b390ae Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 12 May 2026 14:55:48 +0400 Subject: [PATCH 3/4] merge index split --- frac/sealed/sealing/blocks_builder.go | 5 +- frac/sealed/sealing/blocks_builder_test.go | 126 +-------------------- frac/sealed/sealing/index.go | 2 - frac/sealed_loader.go | 22 ++-- 4 files changed, 23 insertions(+), 132 deletions(-) diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 9203c276..c6d0e64c 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -283,8 +283,7 @@ func (a *lidAccumulator) finalizeBlock() lidsSealBlock { result := a.currentBlock result.payload.IsLastLID = a.isEndOfToken - result.ext.isContinued = a.isContinued - - a.isContinued = !a.isEndOfToken + result.ext.firstLID = a.currentBlock.payload.LIDs[0] + result.ext.lastLID = a.currentBlock.payload.LIDs[len(a.currentBlock.payload.LIDs)-1] return result } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go index 53dddf8f..eeddbefb 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -253,27 +253,27 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { expectedLIDBlocks := []lidsSealBlock{ { - ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, + ext: lidsExt{minTID: 1, maxTID: 1, firstLID: 10, lastLID: 30}, payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, + ext: lidsExt{minTID: 1, maxTID: 3, firstLID: 40, lastLID: 3}, payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, + ext: lidsExt{minTID: 4, maxTID: 6, firstLID: 4, lastLID: 6}, payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, + ext: lidsExt{minTID: 7, maxTID: 9, firstLID: 7, lastLID: 9}, payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, + ext: lidsExt{minTID: 10, maxTID: 12, firstLID: 10, lastLID: 12}, payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, + ext: lidsExt{minTID: 13, maxTID: 14, firstLID: 13, lastLID: 14}, payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } @@ -332,117 +332,3 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { assert.Equal(t, src.ids, ids) assert.Equal(t, src.pos, pos) } - -func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { - src := mockSource{ - tokenLIDs: [][]uint32{ - { - 10, // block 1, tid 1 - 20, // block 1, tid 1 - 30, // block 1, tid 1 - - 40, // block 2, tid 1 - }, { - 11, // block 2, tid 2 - 21, // block 2, tid 2 - - 31, // block 3, tid 2 - 41, // block 3, tid 2 - }, { - 10, // block 3, tid 3 - - 11, // block 4, tid 3 - 20, // block 4, tid 3 - 21, // block 4, tid 3 - - }, { - 30, // block 5, tid 4 - 40, // block 5, tid 4 - 50, // block 5, tid 4 - - 60, // block 6, tid 4 - }, - }, - } - - expected := []lidsSealBlock{{ - ext: lidsExt{ - minTID: 1, - maxTID: 1, - firstLID: 10, - lastLID: 30, - }, - payload: lids.Block{ - LIDs: []uint32{10, 20, 30}, - Offsets: []uint32{0, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 1, - maxTID: 2, - firstLID: 40, - lastLID: 21, - }, - payload: lids.Block{ - LIDs: []uint32{40, 11, 21}, - Offsets: []uint32{0, 1, 3}, - }, - }, { - ext: lidsExt{ - minTID: 2, - maxTID: 3, - firstLID: 31, - lastLID: 10, - }, - payload: lids.Block{ - LIDs: []uint32{31, 41, 10}, - Offsets: []uint32{0, 2, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 3, - maxTID: 3, - firstLID: 11, - lastLID: 21, - }, - payload: lids.Block{ - LIDs: []uint32{11, 20, 21}, - Offsets: []uint32{0, 3}, - IsLastLID: true, - }, - }, { - ext: lidsExt{ - minTID: 4, - maxTID: 4, - firstLID: 30, - lastLID: 50, - }, - payload: lids.Block{ - LIDs: []uint32{30, 40, 50}, - Offsets: []uint32{0, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 4, - maxTID: 4, - firstLID: 60, - lastLID: 60, - }, - payload: lids.Block{ - LIDs: []uint32{60}, - Offsets: []uint32{0, 1}, - IsLastLID: true, - }}, - } - bb := blocksBuilder{} - blocks := []lidsSealBlock{} - for block := range bb.BuildLIDsBlocks(src.TokenLIDs(), 3) { - block.payload.LIDs = slices.Clone(block.payload.LIDs) // copy lids - block.payload.Offsets = slices.Clone(block.payload.Offsets) // copy offsets - blocks = append(blocks, block) - } - assert.Equal(t, expected, blocks) -} diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index aeaec1b9..59afd7c6 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -3,10 +3,8 @@ package sealing import ( "io" - "github.com/alecthomas/units" "github.com/ozontech/seq-db/config" - "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index fb0e5eb8..9e8b1f92 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -127,12 +127,14 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } // loadLIDsTable scans LID block headers, recording the absolute start index for lids.Table. -func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { +func (l *LegacyLoader) loadLIDsTable(fracVer config.BinaryDataVersion) (*lids.Table, error) { startIndex := l.blockIndex // absolute index of first LID block in .index var ( maxTIDs []uint32 minTIDs []uint32 + firstLIDs []uint32 + lastLIDs []uint32 isContinued []bool ) @@ -150,10 +152,15 @@ func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { maxTIDs = append(maxTIDs, uint32(h.GetExt2()>>32)) minTIDs = append(minTIDs, uint32(h.GetExt2()&0xFFFFFFFF)) - isContinued = append(isContinued, h.GetExt1() == 1) + if fracVer >= config.BinaryDataV4 { + lastLIDs = append(lastLIDs, uint32(h.GetExt1()>>32)) + firstLIDs = append(firstLIDs, uint32(h.GetExt1()&0xFFFFFFFF)) + } else { + isContinued = append(isContinued, h.GetExt1() == 1) + } } - return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, startIndex, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil } // IndexReaders holds one IndexReader per split index file. @@ -188,7 +195,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers blocksData.BlocksOffsets = blockOffsets.Offsets blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) - blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) + blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID, info.BinaryDataVer) if err != nil { logger.Fatal("load lids error", zap.Error(err)) } @@ -270,8 +277,8 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataV var ( maxTIDs []uint32 minTIDs []uint32 - firstLIDs []uint32 - lastLIDs []uint32 + firstLIDs []uint32 + lastLIDs []uint32 isContinued []bool ) @@ -293,6 +300,7 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataV maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) + ext1 := header.GetExt1() if fracVer >= config.BinaryDataV4 { lastLIDs = append(lastLIDs, uint32(ext1>>32)) firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF)) @@ -301,5 +309,5 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataV } } - return lids.NewTable(0, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, 0, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil } From 7b9c1a8608052463692de6f559f6d8a799a59cb7 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Tue, 12 May 2026 16:44:12 +0400 Subject: [PATCH 4/4] use BinaryDataV5 --- frac/sealed/lids/table.go | 9 +++++---- frac/sealed_loader.go | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/frac/sealed/lids/table.go b/frac/sealed/lids/table.go index cdb5548d..c8fd864d 100644 --- a/frac/sealed/lids/table.go +++ b/frac/sealed/lids/table.go @@ -3,9 +3,10 @@ package lids import ( "sort" - "github.com/ozontech/seq-db/config" "go.uber.org/zap" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/logger" ) @@ -38,7 +39,7 @@ func NewTable( } func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 { - if t.FracVer < config.BinaryDataV4 { + if t.FracVer < config.BinaryDataV5 { if t.IsContinued[blockIndex] { return t.MinTIDs[blockIndex] - 1 } @@ -91,7 +92,7 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 { // lid greater or equal to provided LID starting from provided index (inclusive). // - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 { - if t.FracVer < config.BinaryDataV4 { + if t.FracVer < config.BinaryDataV5 { // not supported for old frac versions return index } @@ -111,7 +112,7 @@ func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 { // less or equal to provided LID starting from provided index (inclusive). // - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions. func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 { - if t.FracVer < config.BinaryDataV4 { + if t.FracVer < config.BinaryDataV5 { // not supported for old frac versions return index } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 9e8b1f92..18edbe7f 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -152,7 +152,7 @@ func (l *LegacyLoader) loadLIDsTable(fracVer config.BinaryDataVersion) (*lids.Ta maxTIDs = append(maxTIDs, uint32(h.GetExt2()>>32)) minTIDs = append(minTIDs, uint32(h.GetExt2()&0xFFFFFFFF)) - if fracVer >= config.BinaryDataV4 { + if fracVer >= config.BinaryDataV5 { lastLIDs = append(lastLIDs, uint32(h.GetExt1()>>32)) firstLIDs = append(firstLIDs, uint32(h.GetExt1()&0xFFFFFFFF)) } else { @@ -301,7 +301,7 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataV minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) ext1 := header.GetExt1() - if fracVer >= config.BinaryDataV4 { + if fracVer >= config.BinaryDataV5 { lastLIDs = append(lastLIDs, uint32(ext1>>32)) firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF)) } else {