Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ const (
// Also in this version we've changed the binary layout of section storing
// info block. As a result we store info as a plain JSON without additional registry.
BinaryDataV3

// BinaryDataV4 - bitpack for LIDs/MIDs
BinaryDataV4
// BinaryDataV5 - LID blocks have firstLID/lastLID encoded in ext1, isContinued is not used, no legacy TID adjusting
BinaryDataV5
)

const CurrentFracVersion = BinaryDataV3
const CurrentFracVersion = BinaryDataV5
50 changes: 49 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
DocsPositionsZstdLevel: 1,
TokenTableZstdLevel: 1,
DocBlocksZstdLevel: 1,
LIDBlockSize: 512,
LIDBlockSize: 256,
DocBlockSize: 128 * int(units.KiB),
}

Expand Down Expand Up @@ -1323,6 +1323,43 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
fromTime: fromTime,
toTime: toTime,
},
// block skipping scenarios
{
name: "service:gateway AND trace_id:trace-2026",
query: "service:gateway AND trace_id:trace-2026",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.traceId == "trace-2026"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
query: "service:gateway AND (trace_id:trace-0 OR trace_id:trace-2500 OR trace_id:trace-4999)",
filter: func(doc *testDoc) bool {
return doc.service == gateway && (doc.traceId == "trace-0" || doc.traceId == "trace-2500" || doc.traceId == "trace-4999")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5",
query: "service:gateway AND pod:pod-5",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5"
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND pod:pod-5 AND message:failed",
query: "service:gateway AND pod:pod-5 AND message:failed",
filter: func(doc *testDoc) bool {
return doc.service == gateway && doc.pod == "pod-5" && strings.Contains(doc.message, "failed")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:gateway AND message:processing AND message:retry AND level:5",
query: "service:gateway AND message:processing AND message:retry AND level:5",
Expand All @@ -1334,6 +1371,17 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: toTime,
},
// OR operator queries
{
name: "(service OR) AND (trace_id OR)",
query: "(service:bus OR service:kafka) AND (trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000)",
filter: func(doc *testDoc) bool {
return (doc.service == bus || doc.service == kafka) && (doc.traceId == "trace-1000" ||
doc.traceId == "trace-1500" ||
doc.traceId == "trace-2000")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "trace_id OR",
query: "trace_id:trace-1000 OR trace_id:trace-1500 OR trace_id:trace-2000 OR trace_id:trace-2500 OR trace_id:trace-3000",
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,14 @@ func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockLeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock()
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids))
}

// fast path: smallest remaining > nextID => skip entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if it.lids[0] > nextID.Unpack() {
it.lids = it.lids[:0]
continue
Expand Down
3 changes: 2 additions & 1 deletion frac/sealed/lids/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ func (it *IteratorDesc) NextGeq(nextID node.LID) node.LID {
return node.NullLID()
}

it.blockIndex = it.table.SeekBlockGeq(it.blockIndex, it.tid, nextID.Unpack())

it.loadNextLIDsBlock() // last chunk in block but not last for tid; need load next block
it.lids, it.tryNextBlock = it.narrowLIDsRange(it.lids, it.tryNextBlock)
it.counter.AddLIDsCount(len(it.lids)) // inc loaded LIDs count
}

// fast path: last LID < nextID => skip the entire block
// TODO(cheb0): We could also pass LID into narrowLIDsRange to perform block skipping once we add something like MinLID to LID block header
if nextID.Unpack() > it.lids[len(it.lids)-1] {
it.lids = it.lids[:0]
continue
Expand Down
65 changes: 59 additions & 6 deletions frac/sealed/lids/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,44 @@ import (

"go.uber.org/zap"

"github.com/ozontech/seq-db/config"

"github.com/ozontech/seq-db/logger"
)

type Table struct {
StartBlockIndex uint32
MaxTIDs []uint32 // defines last tid for each block
MinTIDs []uint32 // defines first not continued tid for each block
FirstLIDs []uint32
LastLIDs []uint32

// TODO: We need fix MinTID issue that we have to compensate with DiskBlock.getAdjustedMinTID()
// TODO: After that we do not need store IsContinued flag, and able calc it as MaxTIDs[i] == MinTIDs[i+1]
IsContinued []bool
FracVer config.BinaryDataVersion
IsContinued []bool // legacy field, only used in BinaryDataV0-BinaryDataV3 (inclusive)
}

func NewTable(startOfLIDsBlockIndex uint32, minTIDs, maxTIDs []uint32, isContinued []bool) *Table {
func NewTable(
fracVer config.BinaryDataVersion,
startOfLIDsBlockIndex uint32,
minTIDs, maxTIDs []uint32,
firstLIDs, lastLIDs []uint32,
isContinued []bool) *Table {
return &Table{
StartBlockIndex: startOfLIDsBlockIndex,
MinTIDs: minTIDs,
MaxTIDs: maxTIDs,
FirstLIDs: firstLIDs,
LastLIDs: lastLIDs,
IsContinued: isContinued,
FracVer: fracVer,
}
}

func (t *Table) GetAdjustedMinTID(blockIndex uint32) uint32 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
if t.FracVer < config.BinaryDataV5 {
if t.IsContinued[blockIndex] {
return t.MinTIDs[blockIndex] - 1
}
}
return t.MinTIDs[blockIndex]
}
Expand Down Expand Up @@ -75,6 +88,46 @@ func (t *Table) GetLastBlockIndexForTID(tid uint32) uint32 {
return uint32(index)
}

// SeekBlockGeq finds next block for provided TID which contains
// lid greater or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockGeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV5 {
// not supported for old frac versions
return index
}

res := index
for i := index + 1; i < uint32(len(t.MinTIDs)); i++ {
if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
res = i
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here i is int, but in SeekBlockLeq() it is uint32. let's make it consistent, for example:

for i := int(index) + 1; i <len(t.MinTIDs); i++ {
	if t.MinTIDs[i] == tid && nextLID >= t.FirstLIDs[i] {
		res = uint32(i)
		continue
	}
	break
}

continue
}
break
}
return res
}

// SeekBlockLeq finds next block with lowest index for provided TID which contains LIDs
// less or equal to provided LID starting from provided index (inclusive).
// - index: an index of block which is already suits and contains next portion of LIDs. Safe to return for old fractions.
func (t *Table) SeekBlockLeq(index uint32, tid uint32, nextLID uint32) uint32 {
if t.FracVer < config.BinaryDataV5 {
// not supported for old frac versions
return index
}

res := index
for i := int(index) - 1; i >= 0; i-- {
if t.MaxTIDs[i] == tid && nextLID <= t.LastLIDs[i] {
res = uint32(i)
continue
}
break
}
return res
}

func (t *Table) HasTIDInPrevBlock(blockIndex, tid uint32) bool {
if blockIndex == 0 { // it is no prev block
return false
Expand Down
12 changes: 6 additions & 6 deletions frac/sealed/sealing/blocks_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type tokensSealBlock struct {

// lidsExt represents the range and continuation status of LID blocks.
type lidsExt struct {
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
isContinued bool // Whether LID sequence continues in next block
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
firstLID uint32 // First LID in the LID block
lastLID uint32 // Last LID in the LID block
}

// lidsSealBlock represents a sealed block containing LID (Local ID) data.
Expand Down Expand Up @@ -282,8 +283,7 @@ func (a *lidAccumulator) finalizeBlock() lidsSealBlock {

result := a.currentBlock
result.payload.IsLastLID = a.isEndOfToken
result.ext.isContinued = a.isContinued

a.isContinued = !a.isEndOfToken
result.ext.firstLID = a.currentBlock.payload.LIDs[0]
result.ext.lastLID = a.currentBlock.payload.LIDs[len(a.currentBlock.payload.LIDs)-1]
return result
}
12 changes: 6 additions & 6 deletions frac/sealed/sealing/blocks_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,27 +253,27 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) {

expectedLIDBlocks := []lidsSealBlock{
{
ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false},
ext: lidsExt{minTID: 1, maxTID: 1, firstLID: 10, lastLID: 30},
payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false},
},
{
ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true},
ext: lidsExt{minTID: 1, maxTID: 3, firstLID: 40, lastLID: 3},
payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true},
},
{
ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false},
ext: lidsExt{minTID: 4, maxTID: 6, firstLID: 4, lastLID: 6},
payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true},
},
{
ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false},
ext: lidsExt{minTID: 7, maxTID: 9, firstLID: 7, lastLID: 9},
payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true},
},
{
ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false},
ext: lidsExt{minTID: 10, maxTID: 12, firstLID: 10, lastLID: 12},
payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true},
},
{
ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false},
ext: lidsExt{minTID: 13, maxTID: 14, firstLID: 13, lastLID: 14},
payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true},
},
}
Expand Down
17 changes: 7 additions & 10 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sealing
import (
"io"

"github.com/ozontech/seq-db/config"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
Expand Down Expand Up @@ -288,22 +290,17 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock {
// packLIDsBlock packs Local IDs (LIDs) into a compressed index block.
// Also updates LIDs table for preloaded data access.
func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock {
var ext1 uint64
if block.ext.isContinued { // todo: Legacy continuation flag
ext1 = 1
block.ext.minTID++ // Adjust for legacy format
}

// Update LIDs table for PreloadedData
s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID)
s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID)
s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued)
s.lidsTable.FirstLIDs = append(s.lidsTable.FirstLIDs, block.ext.firstLID)
s.lidsTable.LastLIDs = append(s.lidsTable.LastLIDs, block.ext.lastLID)
s.lidsTable.FracVer = config.CurrentFracVersion

// Packing block
s.buf1 = block.payload.Pack(s.buf1[:0])
b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel)
b.ext1 = ext1 // Legacy continuation flag
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range

b.ext1 = uint64(block.ext.lastLID)<<32 | uint64(block.ext.firstLID) // LID range
b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range
return b
}
31 changes: 23 additions & 8 deletions frac/sealed_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re
logger.Fatal("legacy load ids error", zap.Error(err))
}

blocksData.LIDsTable, err = l.loadLIDsTable()
blocksData.LIDsTable, err = l.loadLIDsTable(info.BinaryDataVer)
if err != nil {
logger.Fatal("legacy load lids error", zap.Error(err))
}
Expand Down Expand Up @@ -127,12 +127,14 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab
}

// loadLIDsTable scans LID block headers, recording the absolute start index for lids.Table.
func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) {
func (l *LegacyLoader) loadLIDsTable(fracVer config.BinaryDataVersion) (*lids.Table, error) {
startIndex := l.blockIndex // absolute index of first LID block in .index

var (
maxTIDs []uint32
minTIDs []uint32
firstLIDs []uint32
lastLIDs []uint32
isContinued []bool
)

Expand All @@ -150,10 +152,15 @@ func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) {
maxTIDs = append(maxTIDs, uint32(h.GetExt2()>>32))
minTIDs = append(minTIDs, uint32(h.GetExt2()&0xFFFFFFFF))

isContinued = append(isContinued, h.GetExt1() == 1)
if fracVer >= config.BinaryDataV5 {
lastLIDs = append(lastLIDs, uint32(h.GetExt1()>>32))
firstLIDs = append(firstLIDs, uint32(h.GetExt1()&0xFFFFFFFF))
} else {
isContinued = append(isContinued, h.GetExt1() == 1)
}
}

return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil
return lids.NewTable(fracVer, startIndex, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil
}

// IndexReaders holds one IndexReader per split index file.
Expand Down Expand Up @@ -188,7 +195,7 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers
blocksData.BlocksOffsets = blockOffsets.Offsets
blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer)

blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID)
blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID, info.BinaryDataVer)
if err != nil {
logger.Fatal("load lids error", zap.Error(err))
}
Expand Down Expand Up @@ -266,10 +273,12 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
}

// loadLIDsTable scans block headers in the .lid file to build lids.Table.
func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) {
func (l *Loader) loadLIDsTable(r storage.IndexReader, fracVer config.BinaryDataVersion) (*lids.Table, error) {
var (
maxTIDs []uint32
minTIDs []uint32
firstLIDs []uint32
lastLIDs []uint32
isContinued []bool
)

Expand All @@ -291,8 +300,14 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) {
maxTIDs = append(maxTIDs, uint32(ext2>>32))
minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF))

isContinued = append(isContinued, header.GetExt1() == 1)
ext1 := header.GetExt1()
if fracVer >= config.BinaryDataV5 {
lastLIDs = append(lastLIDs, uint32(ext1>>32))
firstLIDs = append(firstLIDs, uint32(ext1&0xFFFFFFFF))
} else {
isContinued = append(isContinued, ext1 == 1)
}
}

return lids.NewTable(0, minTIDs, maxTIDs, isContinued), nil
return lids.NewTable(fracVer, 0, minTIDs, maxTIDs, firstLIDs, lastLIDs, isContinued), nil
}