diff --git a/config/frac_version.go b/config/frac_version.go index ff1283b0..5437e38b 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -18,6 +18,11 @@ const ( // Also in this version we've changed the binary layout of section storing // info block. As a result we store info as a plain JSON without additional registry. BinaryDataV3 + + // BinaryDataV4 - bitpack for LIDs/MIDs + BinaryDataV4 + // BinaryDataV5 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting + BinaryDataV5 ) -const CurrentFracVersion = BinaryDataV3 +const CurrentFracVersion = BinaryDataV5 diff --git a/frac/fraction_test.go b/frac/fraction_test.go index bdf24408..f8eaaad9 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -94,7 +94,7 @@ func (s *FractionTestSuite) SetupTestCommon() { DocsPositionsZstdLevel: 1, TokenTableZstdLevel: 1, DocBlocksZstdLevel: 1, - LIDBlockSize: 512, + LIDBlockSize: 256, DocBlockSize: 128 * int(units.KiB), } @@ -1323,6 +1323,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { fromTime: fromTime, toTime: toTime, }, + // block skipping scenarios + { + name: "service:gateway AND trace_id:trace-2026", + query: "service:gateway AND trace_id:trace-2026", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.traceId == "trace-2026" + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)", + filter: func(doc *testDoc) bool { + return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999") + }, + fromTime: fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5", + query: "service:gateway AND pod:pod-5", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" + }, + fromTime: 
fromTime, + toTime: toTime, + }, + { + name: "service:gateway AND pod:pod-5 AND message:failed", + query: "service:gateway AND pod:pod-5 AND message:failed", + filter: func(doc *testDoc) bool { + return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "service:gateway AND message:processing AND message:retry AND level:5", query: "service:gateway AND message:processing AND message:retry AND level:5", @@ -1334,6 +1371,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() { toTime: toTime, }, // OR operator queries + { + name: "(service OR) AND (trace_id OR)", + query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)", + filter: func(doc *testDoc) bool { + return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" || + doc.traceId == "trace-1500" || + doc.traceId == "trace-2000") + }, + fromTime: fromTime, + toTime: toTime, + }, { name: "trace_id OR", query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000", diff --git a/frac/sealed/lids/iterator_asc.go b/frac/sealed/lids/iterator_asc.go index a2fd0c5a..16715ac1 100644 --- a/frac/sealed/lids/iterator_asc.go +++ b/frac/sealed/lids/iterator_asc.go @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) } // fast path: smallest remaining > nextID => skip entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if it.lids[0] > nextID.Unpack() { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/iterator_desc.go 
b/frac/sealed/lids/iterator_desc.go index 5c4e08d9..6c0691e2 100644 --- a/frac/sealed/lids/iterator_desc.go +++ b/frac/sealed/lids/iterator_desc.go @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID { return node.NullLID() } + it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack()) + it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock) it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count } // fast path: last LID < nextID => skip the entire block - // TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header if nextID.Unpack() > it.lids[len(it.lids)-1] { it.lids = it.lids[:0] continue diff --git a/frac/sealed/lids/table.go b/frac/sealed/lids/table.go index 5a04acdd..c8fd864d 100644 --- a/frac/sealed/lids/table.go +++ b/frac/sealed/lids/table.go @@ -5,6 +5,8 @@ import ( "go.uber.org/zap" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/logger" ) @@ -12,24 +14,35 @@ type Table struct { StartBlockIndex uint32 MaxTIDs []uint32 // defines last tid for each block MinTIDs []uint32 // defines first not continued tid for each block + FirstLIDs []uint32 + LastLIDs []uint32 - // TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID() - // TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1] - IsContinued []bool + FracVer config.BinaryDataVersion + IsContinued []bool // legacy field, only used by fractions older than BinaryDataV5 } -func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table { +func NewTable( + fracVer config.BinaryDataVersion, + startOfLIDsBlockIndex uint32, + minTIDs, maxTIDs []uint32, + firstLIDs, lastLIDs []uint32, + isContinued []bool) *Table { return &Table{ 
StartBlockIndex: startOfLIDsBlockIndex, MinTIDs: minTIDs, MaxTIDs: maxTIDs, + FirstLIDs: firstLIDs, + LastLIDs: lastLIDs, IsContinued: isContinued, + FracVer: fracVer, } } func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 { - if t.IsContinued[blockIndex] { - return t.MinTIDs[blockIndex] - 1 + if t.FracVer < config.BinaryDataV5 { + if t.IsContinued[blockIndex] { + return t.MinTIDs[blockIndex] - 1 + } } return t.MinTIDs[blockIndex] } @@ -75,6 +88,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 { return uint32(index) } +// SeekBlockGeq returns the index of the block to read next for tid when looking for LIDs greater than or equal to nextLID. +// Starting at index (inclusive), it advances while the following block has MinTID equal to tid and a first LID not greater than nextLID. +// - index: index of a block that already suits and holds the next portion of LIDs; it is returned as is for fractions older than BinaryDataV5. +func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV5 { + // not supported for old frac versions + return index + } + + res := index + for i := index + 1; i < uint32(len(t.MinTIDs)); i++ { + if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] { + res = i + continue + } + break + } + return res +} + +// SeekBlockLeq returns the index of the block to read next for tid when looking for LIDs less than or equal to nextLID. +// Starting at index (inclusive), it moves back while the preceding block has MaxTID equal to tid and a last LID not less than nextLID. +// - index: index of a block that already suits and holds the next portion of LIDs; it is returned as is for fractions older than BinaryDataV5. 
+func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 { + if t.FracVer < config.BinaryDataV5 { + // not supported for old frac versions + return index + } + + res := index + for i := int(index) - 1; i >= 0; i-- { + if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] { + res = uint32(i) + continue + } + break + } + return res +} + func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool { if blockIndex == 0 { // it is no prev block return false diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 3c6ce1b0..c6d0e64c 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -30,9 +30,10 @@ type tokensSealBlock struct { // lidsExt represents the range and continuation status of LID blocks. type lidsExt struct { - minTID uint32 // First token ID in the LID block - maxTID uint32 // Last token ID in the LID block - isContinued bool // Whether LID sequence continues in next block + minTID uint32 // First token ID in the LID block + maxTID uint32 // Last token ID in the LID block + firstLID uint32 // First LID in the LID block + lastLID uint32 // Last LID in the LID block } // lidsSealBlock represents a sealed block containing LID (Local ID) data. 
@@ -282,8 +283,7 @@ func (a *lidAccumulator) finalizeBlock() lidsSealBlock { result := a.currentBlock result.payload.IsLastLID = a.isEndOfToken - result.ext.isContinued = a.isContinued - - a.isContinued = !a.isEndOfToken + result.ext.firstLID = a.currentBlock.payload.LIDs[0] + result.ext.lastLID = a.currentBlock.payload.LIDs[len(a.currentBlock.payload.LIDs)-1] return result } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go index d6bca144..eeddbefb 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -253,27 +253,27 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { expectedLIDBlocks := []lidsSealBlock{ { - ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, + ext: lidsExt{minTID: 1, maxTID: 1, firstLID: 10, lastLID: 30}, payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, + ext: lidsExt{minTID: 1, maxTID: 3, firstLID: 40, lastLID: 3}, payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, + ext: lidsExt{minTID: 4, maxTID: 6, firstLID: 4, lastLID: 6}, payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, + ext: lidsExt{minTID: 7, maxTID: 9, firstLID: 7, lastLID: 9}, payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, + ext: lidsExt{minTID: 10, maxTID: 12, firstLID: 10, lastLID: 12}, payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, + ext: lidsExt{minTID: 13, maxTID: 14, firstLID: 13, lastLID: 14}, payload: lids.Block{LIDs: 
[]uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index e7bf7348..59afd7c6 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -3,6 +3,8 @@ package sealing import ( "io" + "github.com/ozontech/seq-db/config" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" @@ -288,22 +290,17 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { - var ext1 uint64 - if block.ext.isContinued { // todo: Legacy continuation flag - ext1 = 1 - block.ext.minTID++ // Adjust for legacy format - } - // Update LIDs table for PreloadedData s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) - s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) + s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID) + s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID) + s.lidsTable.FracVer = config.CurrentFracVersion // Packing block s.buf1 = block.payload.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) - b.ext1 = ext1 // Legacy continuation flag - b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range - + b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range + b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range return b } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 893b75a4..18edbe7f 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -41,7 +41,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re 
logger.Fatal("legacy load ids error", zap.Error(err)) } - blocksData.LIDsTable, err = l.loadLIDsTable() + blocksData.LIDsTable, err = l.loadLIDsTable(info.BinaryDataVer) if err != nil { logger.Fatal("legacy load lids error", zap.Error(err)) } @@ -127,12 +127,14 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } // loadLIDsTable scans LID block headers, recording the absolute start index for lids.Table. -func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { +func (l *LegacyLoader) loadLIDsTable(fracVer config.BinaryDataVersion) (*lids.Table, error) { startIndex := l.blockIndex // absolute index of first LID block in .index var ( maxTIDs []uint32 minTIDs []uint32 + firstLIDs []uint32 + lastLIDs []uint32 isContinued []bool ) @@ -150,10 +152,15 @@ func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { maxTIDs = append(maxTIDs, uint32(h.GetExt2()>>32)) minTIDs = append(minTIDs, uint32(h.GetExt2()&0xFFFFFFFF)) - isContinued = append(isContinued, h.GetExt1() == 1) + if fracVer >= config.BinaryDataV5 { + lastLIDs = append(lastLIDs, uint32(h.GetExt1()>>32)) + firstLIDs = append(firstLIDs, uint32(h.GetExt1()&0xFFFFFFFF)) + } else { + isContinued = append(isContinued, h.GetExt1() == 1) + } } - return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, startIndex, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil } // IndexReaders holds one IndexReader per split index file. 
@@ -188,7 +195,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers blocksData.BlocksOffsets = blockOffsets.Offsets blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) - blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) + blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID, info.BinaryDataVer) if err != nil { logger.Fatal("load lids error", zap.Error(err)) } @@ -266,10 +273,12 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio } // loadLIDsTable scans block headers in the .lid file to build lids.Table. -func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { +func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataVersion) (*lids.Table, error) { var ( maxTIDs []uint32 minTIDs []uint32 + firstLIDs []uint32 + lastLIDs []uint32 isContinued []bool ) @@ -291,8 +300,14 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) - isContinued = append(isContinued, header.GetExt1() == 1) + ext1 := header.GetExt1() + if fracVer >= config.BinaryDataV5 { + lastLIDs = append(lastLIDs, uint32(ext1>>32)) + firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF)) + } else { + isContinued = append(isContinued, ext1 == 1) + } } - return lids.NewTable(0, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(fracVer, 0, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil }