From fb5efec6936b1178986ad5f84ec3710b90a4e7ea Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 27 Apr 2026 10:25:09 +0300 Subject: [PATCH 1/7] refactor: introduce `blockbuilder` and `indexwriter` package --- blockbuilder/blocks_builder.go | 303 ++++++++++++++++++ .../blocks_builder_test.go | 74 ++--- frac/fraction_concurrency_test.go | 4 + frac/fraction_test.go | 4 + fracmanager/fraction_provider.go | 2 +- fracmanager/sealer_test.go | 2 +- {frac/sealed/sealing => indexwriter}/index.go | 117 ++++--- .../sealed/sealing => indexwriter}/writer.go | 2 +- {frac/sealed/sealing => sealing}/sealer.go | 137 ++++---- 9 files changed, 460 insertions(+), 185 deletions(-) create mode 100644 blockbuilder/blocks_builder.go rename {frac/sealed/sealing => blockbuilder}/blocks_builder_test.go (76%) rename {frac/sealed/sealing => indexwriter}/index.go (68%) rename {frac/sealed/sealing => indexwriter}/writer.go (99%) rename {frac/sealed/sealing => sealing}/sealer.go (58%) diff --git a/blockbuilder/blocks_builder.go b/blockbuilder/blocks_builder.go new file mode 100644 index 00000000..193b061e --- /dev/null +++ b/blockbuilder/blocks_builder.go @@ -0,0 +1,303 @@ +package blockbuilder + +import ( + "encoding/binary" + "iter" + "unsafe" + + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +type ( + DocLocation = util.Pair[seq.ID, seq.DocPos] + TokenPosting = util.Pair[[]byte, []uint32] + TokenBlock = util.Pair[TokensSealBlock, []token.FieldTable] +) + +// TokensExt represents the token ID range contained in a block. +type TokensExt struct { + MinTID uint32 // First token ID in the block + MaxTID uint32 // Last token ID in the block +} + +// TokensSealBlock represents a sealed block containing token data with metadata. +type TokensSealBlock struct { + Ext TokensExt // Tokens block metadata for registry marking + Payload token.Block // Actual token data payload +} + +// LidsExt represents the range and continuation status of LID blocks. +type LidsExt struct { + MinTID uint32 // First token ID in the LID block + MaxTID uint32 // Last token ID in the LID block + IsContinued bool // Whether LID sequence continues in next block +} + +// LidsSealBlock represents a sealed block containing LID (Local ID) data. +type LidsSealBlock struct { + Ext LidsExt // LIDs block metadata for registry marking + Payload lids.Block // LID data payload +} + +// IdsSealBlock represents a sealed block containing various identifier types. +type IdsSealBlock struct { + MIDs seqids.BlockMIDs + RIDs seqids.BlockRIDs + Params seqids.BlockParams +} + +// BlocksBuilder constructs sealed blocks from various data sources. +type BlocksBuilder struct{} + +func (bb *BlocksBuilder) BuildTokenBlocks( + it iter.Seq2[string, iter.Seq2[TokenPosting, error]], + accumulate func([]uint32) error, blockCapacity int, +) iter.Seq2[TokenBlock, error] { + return func(yield func(TokenBlock, error) bool) { + var ( + block TokensSealBlock + blockIdx uint32 + blockSize int + ) + + var ( + currentTID uint32 + pendingTable []token.FieldTable + fieldName string + fieldEntryStartTID uint32 + ) + + emitFieldEntry := func() { + // Handle case when field does not have tokens. + if fieldName == "" || fieldEntryStartTID > currentTID { + return + } + + entry := newTokenTableEntry(fieldEntryStartTID, currentTID, blockIdx, block) + pendingTable = append(pendingTable, token.FieldTable{ + Field: fieldName, + Entries: []*token.TableEntry{entry}, + }) + } + + flushBlock := func() bool { + emitFieldEntry() + block.Ext.MaxTID = currentTID + + pair := TokenBlock{First: block, Second: pendingTable} + if !yield(pair, nil) { + return false + } + + block.Payload.Payload = block.Payload.Payload[:0] + block.Payload.Offsets = block.Payload.Offsets[:0] + block.Ext.MinTID = currentTID + 1 + + blockIdx++ + blockSize = 0 + + pendingTable = pendingTable[:0] + fieldEntryStartTID = currentTID + 1 + + return true + } + + block.Ext.MinTID = 1 + for field, tokIt := range it { + emitFieldEntry() + + fieldName = field + fieldEntryStartTID = currentTID + 1 + + for pair, err := range tokIt { + if err != nil { + yield(TokenBlock{}, err) + return + } + + tok, tlids := pair.First, pair.Second + tokenSize := int(unsafe.Sizeof(uint32(0))) + len(tok) + + if blockSize > 0 && blockSize+tokenSize > blockCapacity { + if !flushBlock() { + return + } + } + + block.Payload.Offsets = append(block.Payload.Offsets, uint32(len(block.Payload.Payload))) + block.Payload.Payload = binary.LittleEndian.AppendUint32(block.Payload.Payload, uint32(len(tok))) + block.Payload.Payload = append(block.Payload.Payload, tok...) + + if err := accumulate(tlids); err != nil { + yield(TokenBlock{}, err) + return + } + + currentTID++ + blockSize += tokenSize + } + } + + if blockSize > 0 { + flushBlock() + } + } +} + +func newTokenTableEntry( + entryStartTID, entryEndTID uint32, + blockIndex uint32, block TokensSealBlock, +) *token.TableEntry { + // Convert global TIDs to block-local indices + firstIndex := entryStartTID - block.Ext.MinTID + lastIndex := entryEndTID - block.Ext.MinTID + + // Extract min and max token values for the entry range + minVal := string(block.Payload.GetToken(int(firstIndex))) + maxVal := string(block.Payload.GetToken(int(lastIndex))) + + return &token.TableEntry{ + StartIndex: firstIndex, // Starting index within the block + StartTID: entryStartTID, // Starting token ID (global) + BlockIndex: blockIndex, // Reference to containing block + ValCount: lastIndex - firstIndex + 1, // Number of tokens in this entry + MinVal: minVal, // Smallest token value in range + MaxVal: maxVal, // Largest token value in range + } +} + +// SeqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. +// A new block is yielded every `blockCapacity` IDs. +func SeqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[IdsSealBlock, error] { + return func(yield func(IdsSealBlock, error) bool) { + var block IdsSealBlock + + for pair, err := range ids { + if err != nil { + yield(IdsSealBlock{}, err) + return + } + + id, pos := pair.First, pair.Second + block.MIDs.Values = append(block.MIDs.Values, uint64(id.MID)) + block.RIDs.Values = append(block.RIDs.Values, uint64(id.RID)) + block.Params.Values = append(block.Params.Values, uint64(pos)) + + if len(block.MIDs.Values) == blockCapacity { + if !yield(block, nil) { + return + } + + block.MIDs.Values = block.MIDs.Values[:0] + block.RIDs.Values = block.RIDs.Values[:0] + block.Params.Values = block.Params.Values[:0] + } + } + + if len(block.MIDs.Values) > 0 { + yield(block, nil) + } + } +} + +// LidBlocksAcc accumulates LIDs into sealed LID blocks. +type LidBlocksAcc struct { + blockCapacity int + + currentTID uint32 + currentBlock LidsSealBlock + + isEndOfToken bool + isContinued bool +} + +func NewLIDBlocksAccumulator(blockCapacity int) *LidBlocksAcc { + a := &LidBlocksAcc{blockCapacity: blockCapacity} + + a.currentBlock.Ext.MinTID = 1 + a.currentBlock.Payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, + } + + return a +} + +// Add processes LIDs of one token (must be called in TID order). +// +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *LidBlocksAcc) Add(lidsbuf []uint32, onBlock func(LidsSealBlock) error) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.Payload.LIDs) == a.blockCapacity { + if err := onBlock(a.finalizeBlock()); err != nil { + return err + } + + a.currentBlock.Ext.MinTID = a.currentTID + a.currentBlock.Payload.LIDs = a.currentBlock.Payload.LIDs[:0] + a.currentBlock.Payload.Offsets = a.currentBlock.Payload.Offsets[:1] + } + + a.isEndOfToken = false + a.currentBlock.Ext.MaxTID = a.currentTID + a.currentBlock.Payload.LIDs = append(a.currentBlock.Payload.LIDs, lid) + } + + a.isEndOfToken = true + a.currentBlock.Payload.Offsets = append( + a.currentBlock.Payload.Offsets, + uint32(len(a.currentBlock.Payload.LIDs)), + ) + + return nil +} + +func (a *LidBlocksAcc) Flush() LidsSealBlock { + return a.finalizeBlock() +} + +func (a *LidBlocksAcc) finalizeBlock() LidsSealBlock { + if !a.isEndOfToken { + a.currentBlock.Payload.Offsets = append( + a.currentBlock.Payload.Offsets, + uint32(len(a.currentBlock.Payload.LIDs)), + ) + } + + result := a.currentBlock + result.Payload.IsLastLID = a.isEndOfToken + result.Ext.IsContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result +} + +// CollapseOrderedFieldsTables merges FieldTables with the same field name. +// Assumes input is sorted by Field. +func CollapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { + if len(src) == 0 { + return nil + } + + current := src[0] + var dst []token.FieldTable + for _, ft := range src[1:] { + if current.Field == ft.Field { + current.Entries = append(current.Entries, ft.Entries...) + continue + } + + dst = append(dst, current) + current = ft + } + + return append(dst, current) +} diff --git a/frac/sealed/sealing/blocks_builder_test.go b/blockbuilder/blocks_builder_test.go similarity index 76% rename from frac/sealed/sealing/blocks_builder_test.go rename to blockbuilder/blocks_builder_test.go index d6bca144..34295a91 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/blockbuilder/blocks_builder_test.go @@ -1,4 +1,4 @@ -package sealing +package blockbuilder import ( "iter" @@ -7,27 +7,20 @@ import ( "github.com/stretchr/testify/assert" - "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" ) -var _ Source = (*mockSource)(nil) - type mockSource struct { - info common.Info - tokens [][]byte - fields []string - fieldMaxTIDs []uint32 - ids []seq.ID - pos []seq.DocPos - tokenLIDs [][]uint32 - blocksOffsets []uint64 + tokens [][]byte + fields []string + fieldMaxTIDs []uint32 + ids []seq.ID + pos []seq.DocPos + tokenLIDs [][]uint32 } -func (m *mockSource) Info() *common.Info { return &m.info } - func (m *mockSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { start := 0 @@ -48,8 +41,7 @@ func (m *mockSource) tokensForField(start, end int) iter.Seq2[TokenPosting, erro if j < len(m.tokenLIDs) { lidsbuf = m.tokenLIDs[j] } - pair := TokenPosting{First: m.tokens[j], Second: lidsbuf} - if !yield(pair, nil) { + if !yield(TokenPosting{First: m.tokens[j], Second: lidsbuf}, nil) { return } } @@ -66,8 +58,6 @@ func (m *mockSource) ID() iter.Seq2[DocLocation, error] { } } -func (m *mockSource) BlockOffsets() []uint64 { return m.blocksOffsets } - func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src := mockSource{ tokens: [][]byte{ @@ -145,16 +135,16 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { for pair, err := range tokenBlocks { assert.NoError(t, err) block, fieldsTables := pair.First, pair.Second - assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) - for i := range block.payload.Len() { + assert.Equal(t, expectedSizes[blockIndex], block.Payload.Len()) + for i := range block.Payload.Len() { tid++ - assert.Equal(t, src.tokens[tid-1], block.payload.GetToken(i)) + assert.Equal(t, src.tokens[tid-1], block.Payload.GetToken(i)) } allFieldsTables = append(allFieldsTables, fieldsTables...) blockIndex++ } - actualTokenTable := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} + actualTokenTable := token.TableBlock{FieldsTables: CollapseOrderedFieldsTables(allFieldsTables)} assert.Equal(t, tid, len(src.tokens)) expectedTokenTable := token.TableBlock{ @@ -251,30 +241,30 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) assert.NoError(t, lidAccumulator.Finalize()) - expectedLIDBlocks := []lidsSealBlock{ + expectedLIDBlocks := []LidsSealBlock{ { - ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, - payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, + Ext: LidsExt{MinTID: 1, MaxTID: 1, IsContinued: false}, + Payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, - payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + Ext: LidsExt{MinTID: 1, MaxTID: 3, IsContinued: true}, + Payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, - payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + Ext: LidsExt{MinTID: 4, MaxTID: 6, IsContinued: false}, + Payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, - payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + Ext: LidsExt{MinTID: 7, MaxTID: 9, IsContinued: false}, + Payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, - payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + Ext: LidsExt{MinTID: 10, MaxTID: 12, IsContinued: false}, + Payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, - payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, + Ext: LidsExt{MinTID: 13, MaxTID: 14, IsContinued: false}, + Payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } assert.Equal(t, expectedLIDBlocks, lidBlocks) @@ -313,18 +303,18 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block, err := range seqBlockID(src.ID(), 3) { + for block, err := range SeqBlockID(src.ID(), 3) { assert.NoError(t, err) - assert.Equal(t, expectedSizes[i], len(block.mids.Values)) - assert.Equal(t, expectedSizes[i], len(block.rids.Values)) - assert.Equal(t, expectedSizes[i], len(block.params.Values)) + assert.Equal(t, expectedSizes[i], len(block.MIDs.Values)) + assert.Equal(t, expectedSizes[i], len(block.RIDs.Values)) + assert.Equal(t, expectedSizes[i], len(block.Params.Values)) i++ j := 0 - for _, mid := range block.mids.Values { - ids = append(ids, seq.ID{MID: seq.MID(mid), RID: seq.RID(block.rids.Values[j])}) - pos = append(pos, seq.DocPos(block.params.Values[j])) + for _, mid := range block.MIDs.Values { + ids = append(ids, seq.ID{MID: seq.MID(mid), RID: seq.RID(block.RIDs.Values[j])}) + pos = append(pos, seq.DocPos(block.Params.Values[j])) j++ } } diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 27f5d971..639a44c2 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -16,9 +16,13 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" testcommon "github.com/ozontech/seq-db/tests/common" diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8757c0db..dcc534d9 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -22,10 +22,14 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 66e6477b..db5feb33 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -12,8 +12,8 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/node" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) diff --git a/fracmanager/sealer_test.go b/fracmanager/sealer_test.go index f85c3f8f..51c16b6b 100644 --- a/fracmanager/sealer_test.go +++ b/fracmanager/sealer_test.go @@ -19,8 +19,8 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) diff --git a/frac/sealed/sealing/index.go b/indexwriter/index.go similarity index 68% rename from frac/sealed/sealing/index.go rename to indexwriter/index.go index 5c23842a..1060f76d 100644 --- a/frac/sealed/sealing/index.go +++ b/indexwriter/index.go @@ -1,8 +1,10 @@ -package sealing +package indexwriter import ( "io" + "iter" + "github.com/ozontech/seq-db/blockbuilder" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" @@ -14,6 +16,25 @@ import ( "github.com/ozontech/seq-db/zstd" ) +// Source defines the data required to write all index files for a fraction. +type Source interface { + // Info returns metadata describing this source. + Info() *common.Info + + // ID returns an iterator over stored document identifiers paired with + // their positions, in descending [seq.ID] order. + ID() iter.Seq2[blockbuilder.DocLocation, error] + + // BlockOffsets returns byte offsets to each document block + // within this source's `.docs` file. + BlockOffsets() []uint64 + + // TokenTriplet iterates over fields in lexicographic order. + // For each field, it yields tokens (lexicographically sorted) + // paired with the local document ID list for that token. + TokenTriplet() iter.Seq2[string, iter.Seq2[blockbuilder.TokenPosting, error]] +} + // indexBlock is one compressed (or not) block with its registry metadata. type indexBlock struct { codec storage.Codec @@ -27,7 +48,7 @@ func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { return storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec), i.payload } -type IndexSealer struct { +type IndexWriter struct { params common.SealParams buf1 []byte @@ -38,28 +59,28 @@ type IndexSealer struct { tokenTable token.Table } -func NewIndexSealer(params common.SealParams) *IndexSealer { - return &IndexSealer{ +func New(params common.SealParams) *IndexWriter { + return &IndexWriter{ params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), } } -func (s *IndexSealer) LIDsTable() lids.Table { +func (s *IndexWriter) LIDsTable() lids.Table { return s.lidsTable } -func (s *IndexSealer) TokenTable() token.Table { +func (s *IndexWriter) TokenTable() token.Table { return s.tokenTable } -func (s *IndexSealer) IDsTable() seqids.Table { +func (s *IndexWriter) IDsTable() seqids.Table { return s.idsTable } // WriteOffsetsFile writes the .offsets file containing a single BlockOffsets block. -func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err @@ -78,14 +99,14 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteIDFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err } defer w.release() - for block, err := range seqBlockID(src.ID(), consts.IDsPerBlock) { + for block, err := range blockbuilder.SeqBlockID(src.ID(), consts.IDsPerBlock) { if err != nil { return err } @@ -106,7 +127,7 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { tw, err := newWriter(tws) if err != nil { return err @@ -120,7 +141,7 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err defer lw.release() var ( - bb blocksBuilder + bb blockbuilder.BlocksBuilder allFieldsTables []token.FieldTable ) @@ -158,7 +179,7 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) return w.finalize() } -func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { +func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { // Emit section separator. if err := w.writeEmptyBlock(); err != nil { return err @@ -178,33 +199,11 @@ func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { return err } -// collapseOrderedFieldsTables merges FieldTables with the same field name. -// Assumes input is sorted by Field. -func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { - if len(src) == 0 { - return nil - } - - current := src[0] - var dst []token.FieldTable - for _, ft := range src[1:] { - if current.Field == ft.Field { - current.Entries = append(current.Entries, ft.Entries...) - continue - } - - dst = append(dst, current) - current = ft - } - - return append(dst, current) -} - func newIndexBlock(raw []byte) indexBlock { return indexBlock{codec: storage.CodecNo, rawLen: uint32(len(raw)), payload: raw} } -func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { +func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) if len(s.buf2) < len(raw) { return indexBlock{codec: storage.CodecZSTD, rawLen: uint32(len(raw)), payload: s.buf2} @@ -213,22 +212,22 @@ func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { } // packInfoBlock packs fraction information into an index block. -func (s *IndexSealer) packInfoBlock(block sealed.BlockInfo) indexBlock { +func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { s.buf1 = block.Pack(s.buf1[:0]) return newIndexBlock(s.buf1) // Info block is typically small, no compression } // packTokenBlock packs token data into a compressed index block. -func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { - s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data +func (s *IndexWriter) packTokenBlock(block blockbuilder.TokensSealBlock) indexBlock { + s.buf1 = block.Payload.Pack(s.buf1[:0]) // Pack token data b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) // Store TID range in extended metadata - b.ext1 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) + b.ext1 = uint64(block.Ext.MaxTID)<<32 | uint64(block.Ext.MinTID) return b } // packTokenTableBlock packs the token table into a compressed index block. -func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { +func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { s.tokenTable = token.TableFromBlocks([]token.TableBlock{tokenTableBlock}) // Store for PreloadedData // Packing block @@ -237,7 +236,7 @@ func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) inde } // packBlocksOffsetsBlock packs document block offsets into a compressed index block. -func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { +func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { // Update IDs table for PreloadedData s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks @@ -249,19 +248,19 @@ func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlo } // packMIDsBlock packs MIDs into a compressed index block. -func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packMIDsBlock(block blockbuilder.IdsSealBlock) indexBlock { // Get the last ID in the block (smallest due to descending order) - last := len(block.mids.Values) - 1 + last := len(block.MIDs.Values) - 1 minID := seq.ID{ - MID: seq.MID(block.mids.Values[last]), - RID: seq.RID(block.rids.Values[last]), + MID: seq.MID(block.MIDs.Values[last]), + RID: seq.RID(block.RIDs.Values[last]), } s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData // Packing block - s.buf1 = block.mids.Pack(s.buf1[:0]) + s.buf1 = block.MIDs.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) // Store min MID and RID in extended metadata @@ -272,38 +271,38 @@ func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { } // packRIDsBlock packs RIDs into a compressed index block. -func (s *IndexSealer) packRIDsBlock(block idsSealBlock) indexBlock { - s.buf1 = block.rids.Pack(s.buf1[:0]) +func (s *IndexWriter) packRIDsBlock(block blockbuilder.IdsSealBlock) indexBlock { + s.buf1 = block.RIDs.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packPosBlock packs document positions into a compressed index block. -func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { - s.buf1 = block.params.Pack(s.buf1[:0]) +func (s *IndexWriter) packPosBlock(block blockbuilder.IdsSealBlock) indexBlock { + s.buf1 = block.Params.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. -func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { +func (s *IndexWriter) packLIDsBlock(block blockbuilder.LidsSealBlock) indexBlock { var ext1 uint64 - if block.ext.isContinued { // todo: Legacy continuation flag + if block.Ext.IsContinued { // todo: Legacy continuation flag ext1 = 1 - block.ext.minTID++ // Adjust for legacy format + block.Ext.MinTID++ // Adjust for legacy format } // Update LIDs table for PreloadedData - s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) - s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) - s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) + s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.Ext.MinTID) + s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.Ext.MaxTID) + s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.Ext.IsContinued) // Packing block - s.buf1 = block.payload.Pack(s.buf1[:0]) + s.buf1 = block.Payload.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) b.ext1 = ext1 // Legacy continuation flag - b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range + b.ext2 = uint64(block.Ext.MaxTID)<<32 | uint64(block.Ext.MinTID) // TID range return b } diff --git a/frac/sealed/sealing/writer.go b/indexwriter/writer.go similarity index 99% rename from frac/sealed/sealing/writer.go rename to indexwriter/writer.go index 1a147e4e..1fb9909d 100644 --- a/frac/sealed/sealing/writer.go +++ b/indexwriter/writer.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "bytes" diff --git a/frac/sealed/sealing/sealer.go b/sealing/sealer.go similarity index 58% rename from frac/sealed/sealing/sealer.go rename to sealing/sealer.go index 57863d82..d3af4baf 100644 --- a/frac/sealed/sealing/sealer.go +++ b/sealing/sealer.go @@ -2,40 +2,67 @@ package sealing import ( "errors" - "iter" "os" "path/filepath" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/indexwriter" "github.com/ozontech/seq-db/util" ) -type ( - DocLocation = util.Pair[seq.ID, seq.DocPos] - TokenPosting = util.Pair[[]byte, []uint32] -) +// Source defines the contract for data sources that can be sealed. +// Provides access to all necessary data components for index creation. +type Source = indexwriter.Source + +func syncAndClose(f *os.File) error { + if err := f.Sync(); err != nil { + f.Close() + return err + } + return f.Close() +} + +func createAndWrite(tmpPath, finalPath string, write func(*os.File) error) error { + f, err := os.Create(tmpPath) + if err != nil { + return err + } + + if err := errors.Join(write(f), syncAndClose(f)); err != nil { + return err + } + + return os.Rename(tmpPath, finalPath) +} -// Source interface defines the contract for data sources that can be sealed. -// Provides access to all necessary data components for index creation -type Source interface { - // Info returns metadata describing this source. - Info() *common.Info +func createAndWriteBoth( + tmpPath1, finalPath1, + tmpPath2, finalPath2 string, + write func(*os.File, *os.File) error, +) error { + f1, err := os.Create(tmpPath1) + if err != nil { + return err + } + + f2, err := os.Create(tmpPath2) + if err != nil { + f1.Close() + return err + } - // ID returns an iterator over stored document identifiers paired with - // their positions, in descending [seq.ID] order. - ID() iter.Seq2[DocLocation, error] + writeErr := write(f1, f2) + if err := errors.Join(writeErr, syncAndClose(f1), syncAndClose(f2)); err != nil { + return err + } - // BlockOffsets returns byte offsets to each document block - // within this source's `.docs` file. - BlockOffsets() []uint64 + if err := os.Rename(tmpPath1, finalPath1); err != nil { + return err + } - // TokenTriplet iterates over fields in lexicographic order. - // For each field, it yields tokens (lexicographically sorted) - // paired with the local document ID list for that token. - TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] + return os.Rename(tmpPath2, finalPath2) } // Seal writes five index files (.info, .token, .offsets, .id, .lid) for the fraction @@ -47,12 +74,12 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return nil, errors.New("sealing of an empty active fraction is not supported") } - sealer := NewIndexSealer(params) + writer := indexwriter.New(params) if err := createAndWrite( info.Path+consts.OffsetsTmpFileSuffix, info.Path+consts.OffsetsFileSuffix, - func(f *os.File) error { return sealer.WriteOffsetsFile(f, src) }, + func(f *os.File) error { return writer.WriteOffsetsFile(f, src) }, ); err != nil { return nil, err } @@ -60,7 +87,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.IDTmpFileSuffix, info.Path+consts.IDFileSuffix, - func(f *os.File) error { return sealer.WriteIDFile(f, src) }, + func(f *os.File) error { return writer.WriteIDFile(f, src) }, ); err != nil { return nil, err } @@ -68,7 +95,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWriteBoth( info.Path+consts.TokenTmpFileSuffix, info.Path+consts.TokenFileSuffix, info.Path+consts.LIDTmpFileSuffix, info.Path+consts.LIDFileSuffix, - func(tokenF, lidF *os.File) error { return sealer.WriteTokenTriplet(tokenF, lidF, src) }, + func(tokenF, lidF *os.File) error { return writer.WriteTokenTriplet(tokenF, lidF, src) }, ); err != nil { return nil, err } @@ -76,7 +103,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.InfoTmpFileSuffix, info.Path+consts.InfoFileSuffix, - func(f *os.File) error { return sealer.WriteInfoFile(f, src) }, + func(f *os.File) error { return writer.WriteInfoFile(f, src) }, ); err != nil { return nil, err } @@ -100,13 +127,13 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { } info.IndexOnDisk = totalSize - lidsTable := sealer.LIDsTable() + lidsTable := writer.LIDsTable() preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: sealer.TokenTable(), + TokenTable: writer.TokenTable(), BlocksData: sealed.BlocksData{ - IDsTable: sealer.IDsTable(), + IDsTable: writer.IDsTable(), LIDsTable: &lidsTable, BlocksOffsets: src.BlockOffsets(), }, @@ -114,55 +141,3 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return preloaded, nil } - -func syncAndClose(f *os.File) error { - if err := f.Sync(); err != nil { - f.Close() - return err - } - return f.Close() -} - -func createAndWrite( - tmp, final string, - write func(*os.File) error, -) error { - f, err := os.Create(tmp) - if err != nil { - return err - } - - if err := errors.Join(write(f), syncAndClose(f)); err != nil { - return err - } - - return os.Rename(tmp, final) -} - -func createAndWriteBoth( - tmpa, finala, - tmpb, finalb string, - write func(*os.File, *os.File) error, -) error { - a, err := os.Create(tmpa) - if err != nil { - return err - } - - b, err := os.Create(tmpb) - if err != nil { - a.Close() - return err - } - - writeErr := write(a, b) - if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil { - return err - } - - if err := os.Rename(tmpa, finala); err != nil { - return err - } - - return os.Rename(tmpb, finalb) -} From f04d7e008b2a5ee3a6b83c2e7cb0f7190f31c0f0 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 27 Apr 2026 10:26:45 +0300 Subject: [PATCH 2/7] refactor: filename similar to package name --- blockbuilder/{blocks_builder.go => block_builder.go} | 0 blockbuilder/{blocks_builder_test.go => block_builder_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename blockbuilder/{blocks_builder.go => block_builder.go} (100%) rename blockbuilder/{blocks_builder_test.go => block_builder_test.go} (100%) diff --git a/blockbuilder/blocks_builder.go b/blockbuilder/block_builder.go similarity index 100% rename from blockbuilder/blocks_builder.go rename to blockbuilder/block_builder.go diff --git a/blockbuilder/blocks_builder_test.go b/blockbuilder/block_builder_test.go similarity index 100% rename from blockbuilder/blocks_builder_test.go rename to blockbuilder/block_builder_test.go From feae3718447be74dac957d7116ff3b6563d8153a Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 27 Apr 2026 10:30:58 +0300 Subject: [PATCH 3/7] refactor: remove `BlockBuilder` type --- blockbuilder/block_builder.go | 5 +---- indexwriter/index.go | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/blockbuilder/block_builder.go b/blockbuilder/block_builder.go index 193b061e..262a2c77 100644 --- a/blockbuilder/block_builder.go +++ b/blockbuilder/block_builder.go @@ -50,10 +50,7 @@ type IdsSealBlock struct { Params seqids.BlockParams } -// BlocksBuilder constructs sealed blocks from various data sources. -type BlocksBuilder struct{} - -func (bb *BlocksBuilder) BuildTokenBlocks( +func BuildTokenBlocks( it iter.Seq2[string, iter.Seq2[TokenPosting, error]], accumulate func([]uint32) error, blockCapacity int, ) iter.Seq2[TokenBlock, error] { diff --git a/indexwriter/index.go b/indexwriter/index.go index 1060f76d..286b620c 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -141,7 +141,6 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err defer lw.release() var ( - bb blockbuilder.BlocksBuilder allFieldsTables []token.FieldTable ) @@ -152,7 +151,7 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err }, ) - for pair, err := range bb.BuildTokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { + for pair, err := range blockbuilder.BuildTokenBlocks(src.TokenTriplet(), accumulate, consts.RegularBlockSize) { if err != nil { return err } From 3255f413277e7c55baf992f608623393efbce0e3 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 27 Apr 2026 10:33:20 +0300 Subject: [PATCH 4/7] refactor: move unexported functions --- sealing/sealer.go | 98 +++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/sealing/sealer.go b/sealing/sealer.go index d3af4baf..edd263fe 100644 --- a/sealing/sealer.go +++ b/sealing/sealer.go @@ -16,55 +16,6 @@ import ( // Provides access to all necessary data components for index creation. type Source = indexwriter.Source -func syncAndClose(f *os.File) error { - if err := f.Sync(); err != nil { - f.Close() - return err - } - return f.Close() -} - -func createAndWrite(tmpPath, finalPath string, write func(*os.File) error) error { - f, err := os.Create(tmpPath) - if err != nil { - return err - } - - if err := errors.Join(write(f), syncAndClose(f)); err != nil { - return err - } - - return os.Rename(tmpPath, finalPath) -} - -func createAndWriteBoth( - tmpPath1, finalPath1, - tmpPath2, finalPath2 string, - write func(*os.File, *os.File) error, -) error { - f1, err := os.Create(tmpPath1) - if err != nil { - return err - } - - f2, err := os.Create(tmpPath2) - if err != nil { - f1.Close() - return err - } - - writeErr := write(f1, f2) - if err := errors.Join(writeErr, syncAndClose(f1), syncAndClose(f2)); err != nil { - return err - } - - if err := os.Rename(tmpPath1, finalPath1); err != nil { - return err - } - - return os.Rename(tmpPath2, finalPath2) -} - // Seal writes five index files (.info, .token, .offsets, .id, .lid) for the fraction // and returns PreloadedData for fast initialization of the sealed fraction. func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { @@ -141,3 +92,52 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return preloaded, nil } + +func syncAndClose(f *os.File) error { + if err := f.Sync(); err != nil { + f.Close() + return err + } + return f.Close() +} + +func createAndWrite(tmp, final string, write func(*os.File) error) error { + f, err := os.Create(tmp) + if err != nil { + return err + } + + if err := errors.Join(write(f), syncAndClose(f)); err != nil { + return err + } + + return os.Rename(tmp, final) +} + +func createAndWriteBoth( + atmp, afinal, + btmp, bfinal string, + write func(*os.File, *os.File) error, +) error { + a, err := os.Create(atmp) + if err != nil { + return err + } + + b, err := os.Create(btmp) + if err != nil { + a.Close() + return err + } + + writeErr := write(a, b) + if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil { + return err + } + + if err := os.Rename(atmp, afinal); err != nil { + return err + } + + return os.Rename(btmp, bfinal) +} From 5117794fe560b5caca8411554db8cf3a18989786 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 7 May 2026 11:06:53 +0300 Subject: [PATCH 5/7] chore: fix rebase conflicts --- blockbuilder/block_builder.go | 82 +------- blockbuilder/block_builder_test.go | 19 +- blockbuilder/lid_accumulator.go | 85 ++++++++ frac/fraction_concurrency_test.go | 4 - frac/fraction_test.go | 4 - frac/sealed/sealing/blocks_builder.go | 285 -------------------------- indexwriter/index.go | 29 +-- sealing/sealer.go | 17 +- 8 files changed, 120 insertions(+), 405 deletions(-) create mode 100644 blockbuilder/lid_accumulator.go delete mode 100644 frac/sealed/sealing/blocks_builder.go diff --git a/blockbuilder/block_builder.go b/blockbuilder/block_builder.go index 262a2c77..8103dedf 100644 --- a/blockbuilder/block_builder.go +++ b/blockbuilder/block_builder.go @@ -50,7 +50,7 @@ type IdsSealBlock struct { Params seqids.BlockParams } -func BuildTokenBlocks( +func TokenBlocks( it iter.Seq2[string, iter.Seq2[TokenPosting, error]], accumulate func([]uint32) error, blockCapacity int, ) iter.Seq2[TokenBlock, error] { @@ -167,9 +167,9 @@ func newTokenTableEntry( } } -// SeqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. +// IDBlock accumulates scalar (ID, position) pairs into sealed ID blocks. // A new block is yielded every `blockCapacity` IDs. -func SeqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[IdsSealBlock, error] { +func IDBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[IdsSealBlock, error] { return func(yield func(IdsSealBlock, error) bool) { var block IdsSealBlock @@ -201,82 +201,6 @@ func SeqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[ } } -// LidBlocksAcc accumulates LIDs into sealed LID blocks. -type LidBlocksAcc struct { - blockCapacity int - - currentTID uint32 - currentBlock LidsSealBlock - - isEndOfToken bool - isContinued bool -} - -func NewLIDBlocksAccumulator(blockCapacity int) *LidBlocksAcc { - a := &LidBlocksAcc{blockCapacity: blockCapacity} - - a.currentBlock.Ext.MinTID = 1 - a.currentBlock.Payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), - Offsets: []uint32{0}, - } - - return a -} - -// Add processes LIDs of one token (must be called in TID order). -// -// For each block that fills up, `onBlock` is called immediately -// before the backing arrays are reset, so `onBlock` may read the -// block data but must not retain references to it. -func (a *LidBlocksAcc) Add(lidsbuf []uint32, onBlock func(LidsSealBlock) error) error { - a.currentTID++ - - for _, lid := range lidsbuf { - if len(a.currentBlock.Payload.LIDs) == a.blockCapacity { - if err := onBlock(a.finalizeBlock()); err != nil { - return err - } - - a.currentBlock.Ext.MinTID = a.currentTID - a.currentBlock.Payload.LIDs = a.currentBlock.Payload.LIDs[:0] - a.currentBlock.Payload.Offsets = a.currentBlock.Payload.Offsets[:1] - } - - a.isEndOfToken = false - a.currentBlock.Ext.MaxTID = a.currentTID - a.currentBlock.Payload.LIDs = append(a.currentBlock.Payload.LIDs, lid) - } - - a.isEndOfToken = true - a.currentBlock.Payload.Offsets = append( - a.currentBlock.Payload.Offsets, - uint32(len(a.currentBlock.Payload.LIDs)), - ) - - return nil -} - -func (a *LidBlocksAcc) Flush() LidsSealBlock { - return a.finalizeBlock() -} - -func (a *LidBlocksAcc) finalizeBlock() LidsSealBlock { - if !a.isEndOfToken { - a.currentBlock.Payload.Offsets = append( - a.currentBlock.Payload.Offsets, - uint32(len(a.currentBlock.Payload.LIDs)), - ) - } - - result := a.currentBlock - result.Payload.IsLastLID = a.isEndOfToken - result.Ext.IsContinued = a.isContinued - - a.isContinued = !a.isEndOfToken - return result -} - // CollapseOrderedFieldsTables merges FieldTables with the same field name. // Assumes input is sorted by Field. func CollapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { diff --git a/blockbuilder/block_builder_test.go b/blockbuilder/block_builder_test.go index 34295a91..95fe7698 100644 --- a/blockbuilder/block_builder_test.go +++ b/blockbuilder/block_builder_test.go @@ -104,23 +104,20 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { const blockSize = 24 const lidBlockCap = 3 - var lidBlocks []lidsSealBlock - lidAccumulator := newLIDAccumulator( + var lidBlocks []LidsSealBlock + lidAccumulator := NewLIDAccumulator( lidBlockCap, - func(block lidsSealBlock) error { - block.payload.LIDs = slices.Clone(block.payload.LIDs) - block.payload.Offsets = slices.Clone(block.payload.Offsets) + func(block LidsSealBlock) error { + block.Payload.LIDs = slices.Clone(block.Payload.LIDs) + block.Payload.Offsets = slices.Clone(block.Payload.Offsets) lidBlocks = append(lidBlocks, block) return nil }, ) - var bb blocksBuilder - tokenBlocks := bb.BuildTokenBlocks( + tokenBlocks := TokenBlocks( src.TokenTriplet(), - func(lids []uint32) error { - return lidAccumulator.Add(lids) - }, + lidAccumulator.Add, blockSize, ) @@ -303,7 +300,7 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block, err := range SeqBlockID(src.ID(), 3) { + for block, err := range IDBlock(src.ID(), 3) { assert.NoError(t, err) assert.Equal(t, expectedSizes[i], len(block.MIDs.Values)) diff --git a/blockbuilder/lid_accumulator.go b/blockbuilder/lid_accumulator.go new file mode 100644 index 00000000..ef81a970 --- /dev/null +++ b/blockbuilder/lid_accumulator.go @@ -0,0 +1,85 @@ +package blockbuilder + +import "github.com/ozontech/seq-db/frac/sealed/lids" + +type LIDAccumulator struct { + blockCapacity int + onBlock func(LidsSealBlock) error + + currentTID uint32 + currentBlock LidsSealBlock + + isEndOfToken bool + isContinued bool +} + +func NewLIDAccumulator( + blockCapacity int, + onBlock func(LidsSealBlock) error, +) *LIDAccumulator { + a := &LIDAccumulator{ + blockCapacity: blockCapacity, + onBlock: onBlock, + } + + a.currentBlock.Ext.MinTID = 1 + a.currentBlock.Payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, + } + + return a +} + +// Add processes LIDs of one token (must be called in TID order). +// +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *LIDAccumulator) Add(lidsbuf []uint32) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.Payload.LIDs) == a.blockCapacity { + if err := a.onBlock(a.finalizeBlock()); err != nil { + return err + } + + a.currentBlock.Ext.MinTID = a.currentTID + a.currentBlock.Payload.LIDs = a.currentBlock.Payload.LIDs[:0] + a.currentBlock.Payload.Offsets = a.currentBlock.Payload.Offsets[:1] + } + + a.isEndOfToken = false + a.currentBlock.Ext.MaxTID = a.currentTID + a.currentBlock.Payload.LIDs = append(a.currentBlock.Payload.LIDs, lid) + } + + a.isEndOfToken = true + a.currentBlock.Payload.Offsets = append( + a.currentBlock.Payload.Offsets, + uint32(len(a.currentBlock.Payload.LIDs)), + ) + + return nil +} + +func (a *LIDAccumulator) Finalize() error { + return a.onBlock(a.finalizeBlock()) +} + +func (a *LIDAccumulator) finalizeBlock() LidsSealBlock { + if !a.isEndOfToken { + a.currentBlock.Payload.Offsets = append( + a.currentBlock.Payload.Offsets, + uint32(len(a.currentBlock.Payload.LIDs)), + ) + } + + result := a.currentBlock + result.Payload.IsLastLID = a.isEndOfToken + result.Ext.IsContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result +} diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 639a44c2..b37cf2a3 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -16,10 +16,6 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/sealing" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/sealing" diff --git a/frac/fraction_test.go b/frac/fraction_test.go index dcc534d9..0fee4795 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -22,10 +22,6 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/sealing" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go deleted file mode 100644 index fc069cbf..00000000 --- a/frac/sealed/sealing/blocks_builder.go +++ /dev/null @@ -1,285 +0,0 @@ -package sealing - -import ( - "encoding/binary" - "iter" - "unsafe" - - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/util" -) - -type ( - TokenBlock = util.Pair[tokensSealBlock, []token.FieldTable] -) - -// tokensExt represents the token ID range contained in a block. -type tokensExt struct { - minTID uint32 // First token ID in the block - maxTID uint32 // Last token ID in the block -} - -// tokensSealBlock represents a sealed block containing token data with metadata. -type tokensSealBlock struct { - ext tokensExt // Tokens block metadata for registry marking - payload token.Block // Actual token data payload -} - -// lidsExt represents the range and continuation status of LID blocks. -type lidsExt struct { - minTID uint32 // First token ID in the LID block - maxTID uint32 // Last token ID in the LID block - isContinued bool // Whether LID sequence continues in next block -} - -// lidsSealBlock represents a sealed block containing LID (Local ID) data. -type lidsSealBlock struct { - ext lidsExt // LIDs block metadata for registry marking - payload lids.Block // LID data payload -} - -// idsSealBlock represents a sealed block containing various identifier types. -type idsSealBlock struct { - mids seqids.BlockMIDs - rids seqids.BlockRIDs - params seqids.BlockParams -} - -// blocksBuilder constructs sealed blocks from various data sources. -// Provides error tracking and consistency validation during block construction. -type blocksBuilder struct{} - -func (bb *blocksBuilder) BuildTokenBlocks( - it iter.Seq2[string, iter.Seq2[TokenPosting, error]], - accumulate func([]uint32) error, blockCapacity int, -) iter.Seq2[TokenBlock, error] { - return func(yield func(TokenBlock, error) bool) { - var ( - block tokensSealBlock - blockIdx uint32 - blockSize int - ) - - var ( - currentTID uint32 - pendingTable []token.FieldTable - fieldName string - fieldEntryStartTID uint32 - ) - - emitFieldEntry := func() { - // Handle case when field does not have tokens. - if fieldName == "" || fieldEntryStartTID > currentTID { - return - } - - entry := newTokenTableEntry(fieldEntryStartTID, currentTID, blockIdx, block) - pendingTable = append(pendingTable, token.FieldTable{ - Field: fieldName, - Entries: []*token.TableEntry{entry}, - }) - } - - flushBlock := func() bool { - emitFieldEntry() - block.ext.maxTID = currentTID - - pair := TokenBlock{First: block, Second: pendingTable} - if !yield(pair, nil) { - return false - } - - block.payload.Payload = block.payload.Payload[:0] - block.payload.Offsets = block.payload.Offsets[:0] - block.ext.minTID = currentTID + 1 - - blockIdx++ - blockSize = 0 - - pendingTable = pendingTable[:0] - fieldEntryStartTID = currentTID + 1 - - return true - } - - block.ext.minTID = 1 - for field, tokenIterator := range it { - emitFieldEntry() - - fieldName = field - fieldEntryStartTID = currentTID + 1 - - for pair, err := range tokenIterator { - if err != nil { - yield(TokenBlock{}, err) - return - } - - tok, tlids := pair.First, pair.Second - tokenSize := int(unsafe.Sizeof(uint32(0))) + len(tok) - - if blockSize > 0 && blockSize+tokenSize > blockCapacity { - if !flushBlock() { - return - } - } - - block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) - block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tok))) - block.payload.Payload = append(block.payload.Payload, tok...) - - if err := accumulate(tlids); err != nil { - yield(TokenBlock{}, err) - return - } - - currentTID++ - blockSize += tokenSize - } - } - - if blockSize > 0 { - flushBlock() - } - } -} - -func newTokenTableEntry( - entryStartTID, entryEndTID uint32, - blockIndex uint32, block tokensSealBlock, -) *token.TableEntry { - // Convert global TIDs to block-local indices - firstIndex := entryStartTID - block.ext.minTID - lastIndex := entryEndTID - block.ext.minTID - - // Extract min and max token values for the entry range - minVal := string(block.payload.GetToken(int(firstIndex))) - maxVal := string(block.payload.GetToken(int(lastIndex))) - - return &token.TableEntry{ - StartIndex: firstIndex, // Starting index within the block - StartTID: entryStartTID, // Starting token ID (global) - BlockIndex: blockIndex, // Reference to containing block - ValCount: lastIndex - firstIndex + 1, // Number of tokens in this entry - MinVal: minVal, // Smallest token value in range - MaxVal: maxVal, // Largest token value in range - } -} - -// seqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. -// A new block is yielded every `blockCapacity` IDs. -func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[idsSealBlock, error] { - return func(yield func(idsSealBlock, error) bool) { - var block idsSealBlock - - for pair, err := range ids { - if err != nil { - yield(idsSealBlock{}, err) - return - } - - id, pos := pair.First, pair.Second - block.mids.Values = append(block.mids.Values, uint64(id.MID)) - block.rids.Values = append(block.rids.Values, uint64(id.RID)) - block.params.Values = append(block.params.Values, uint64(pos)) - - if len(block.mids.Values) == blockCapacity { - if !yield(block, nil) { - return - } - - block.mids.Values = block.mids.Values[:0] - block.rids.Values = block.rids.Values[:0] - block.params.Values = block.params.Values[:0] - } - } - - if len(block.mids.Values) > 0 { - yield(block, nil) - } - } -} - -type lidAccumulator struct { - blockCapacity int - onBlock func(lidsSealBlock) error - - currentTID uint32 - currentBlock lidsSealBlock - - isEndOfToken bool - isContinued bool -} - -func newLIDAccumulator( - blockCapacity int, - onBlock func(lidsSealBlock) error, -) *lidAccumulator { - a := &lidAccumulator{ - blockCapacity: blockCapacity, - onBlock: onBlock, - } - - a.currentBlock.ext.minTID = 1 - a.currentBlock.payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), - Offsets: []uint32{0}, - } - - return a -} - -// Add processes LIDs of one token (must be called in TID order). -// -// For each block that fills up, `onBlock` is called immediately -// before the backing arrays are reset, so `onBlock` may read the -// block data but must not retain references to it. -func (a *lidAccumulator) Add(lidsbuf []uint32) error { - a.currentTID++ - - for _, lid := range lidsbuf { - if len(a.currentBlock.payload.LIDs) == a.blockCapacity { - if err := a.onBlock(a.finalizeBlock()); err != nil { - return err - } - - a.currentBlock.ext.minTID = a.currentTID - a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] - a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] - } - - a.isEndOfToken = false - a.currentBlock.ext.maxTID = a.currentTID - a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) - } - - a.isEndOfToken = true - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) - - return nil -} - -func (a *lidAccumulator) Finalize() error { - return a.onBlock(a.finalizeBlock()) -} - -func (a *lidAccumulator) finalizeBlock() lidsSealBlock { - if !a.isEndOfToken { - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) - } - - result := a.currentBlock - result.payload.IsLastLID = a.isEndOfToken - result.ext.isContinued = a.isContinued - - a.isContinued = !a.isEndOfToken - return result -} diff --git a/indexwriter/index.go b/indexwriter/index.go index 286b620c..87fec07a 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -13,9 +13,15 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" "github.com/ozontech/seq-db/zstd" ) +type ( + DocLocation = util.Pair[seq.ID, seq.DocPos] + TokenPosting = util.Pair[[]byte, []uint32] +) + // Source defines the data required to write all index files for a fraction. type Source interface { // Info returns metadata describing this source. @@ -23,7 +29,7 @@ type Source interface { // ID returns an iterator over stored document identifiers paired with // their positions, in descending [seq.ID] order. - ID() iter.Seq2[blockbuilder.DocLocation, error] + ID() iter.Seq2[DocLocation, error] // BlockOffsets returns byte offsets to each document block // within this source's `.docs` file. @@ -32,7 +38,7 @@ type Source interface { // TokenTriplet iterates over fields in lexicographic order. // For each field, it yields tokens (lexicographically sorted) // paired with the local document ID list for that token. - TokenTriplet() iter.Seq2[string, iter.Seq2[blockbuilder.TokenPosting, error]] + TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] } // indexBlock is one compressed (or not) block with its registry metadata. @@ -106,7 +112,7 @@ func (s *IndexWriter) WriteIDFile(ws io.WriteSeeker, src Source) error { } defer w.release() - for block, err := range blockbuilder.SeqBlockID(src.ID(), consts.IDsPerBlock) { + for block, err := range blockbuilder.IDBlock(src.ID(), consts.IDsPerBlock) { if err != nil { return err } @@ -140,18 +146,15 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err } defer lw.release() - var ( - allFieldsTables []token.FieldTable - ) - - lidAccumulator := newLIDAccumulator( + lidAccumulator := blockbuilder.NewLIDAccumulator( consts.LIDBlockCap, - func(block lidsSealBlock) error { + func(block blockbuilder.LidsSealBlock) error { return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block)) }, ) - for pair, err := range blockbuilder.BuildTokenBlocks(src.TokenTriplet(), accumulate, consts.RegularBlockSize) { + var allFieldsTables []token.FieldTable + for pair, err := range blockbuilder.TokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { if err != nil { return err } @@ -170,7 +173,7 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err return s.finalizeTokenFile(tw, allFieldsTables) } -func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { +func (s *IndexWriter) finalizeLIDFile(w *writer, lidAccumulator *blockbuilder.LIDAccumulator) error { if err := lidAccumulator.Finalize(); err != nil { return err } @@ -184,7 +187,7 @@ func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.Field return err } - tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} + tokenTableBlock := token.TableBlock{FieldsTables: blockbuilder.CollapseOrderedFieldsTables(allFieldsTables)} if err := w.writeBlock(blockTypeTokenTable, s.packTokenTableBlock(tokenTableBlock)); err != nil { return err } @@ -192,7 +195,7 @@ func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.Field return w.finalize() } -func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { +func (s *IndexWriter) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} _, err := ws.Write(s.packInfoBlock(block).payload) return err diff --git a/sealing/sealer.go b/sealing/sealer.go index edd263fe..0c21ffc4 100644 --- a/sealing/sealer.go +++ b/sealing/sealer.go @@ -25,12 +25,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return nil, errors.New("sealing of an empty active fraction is not supported") } - writer := indexwriter.New(params) - + w := indexwriter.New(params) if err := createAndWrite( info.Path+consts.OffsetsTmpFileSuffix, info.Path+consts.OffsetsFileSuffix, - func(f *os.File) error { return writer.WriteOffsetsFile(f, src) }, + func(f *os.File) error { return w.WriteOffsetsFile(f, src) }, ); err != nil { return nil, err } @@ -38,7 +37,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.IDTmpFileSuffix, info.Path+consts.IDFileSuffix, - func(f *os.File) error { return writer.WriteIDFile(f, src) }, + func(f *os.File) error { return w.WriteIDFile(f, src) }, ); err != nil { return nil, err } @@ -46,7 +45,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWriteBoth( info.Path+consts.TokenTmpFileSuffix, info.Path+consts.TokenFileSuffix, info.Path+consts.LIDTmpFileSuffix, info.Path+consts.LIDFileSuffix, - func(tokenF, lidF *os.File) error { return writer.WriteTokenTriplet(tokenF, lidF, src) }, + func(tokenF, lidF *os.File) error { return w.WriteTokenTriplet(tokenF, lidF, src) }, ); err != nil { return nil, err } @@ -54,7 +53,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.InfoTmpFileSuffix, info.Path+consts.InfoFileSuffix, - func(f *os.File) error { return writer.WriteInfoFile(f, src) }, + func(f *os.File) error { return w.WriteInfoFile(f, src) }, ); err != nil { return nil, err } @@ -78,13 +77,13 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { } info.IndexOnDisk = totalSize - lidsTable := writer.LIDsTable() + lidsTable := w.LIDsTable() preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: writer.TokenTable(), + TokenTable: w.TokenTable(), BlocksData: sealed.BlocksData{ - IDsTable: writer.IDsTable(), + IDsTable: w.IDsTable(), LIDsTable: &lidsTable, BlocksOffsets: src.BlockOffsets(), }, From 25539ea2b016645d21147d448c262265f4f23b7f Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 7 May 2026 11:35:18 +0300 Subject: [PATCH 6/7] refactor: merge `indexwriter` and `blockbuilder` --- blockbuilder/block_builder.go | 224 ------------------ blockbuilder/lid_accumulator.go | 85 ------- indexwriter/blocks.go | 219 +++++++++++++++++ .../blocks_test.go | 64 ++--- indexwriter/index.go | 53 ++--- indexwriter/lid_accumulator.go | 85 +++++++ 6 files changed, 362 insertions(+), 368 deletions(-) delete mode 100644 blockbuilder/block_builder.go delete mode 100644 blockbuilder/lid_accumulator.go create mode 100644 indexwriter/blocks.go rename blockbuilder/block_builder_test.go => indexwriter/blocks_test.go (77%) create mode 100644 indexwriter/lid_accumulator.go diff --git a/blockbuilder/block_builder.go b/blockbuilder/block_builder.go deleted file mode 100644 index 8103dedf..00000000 --- a/blockbuilder/block_builder.go +++ /dev/null @@ -1,224 +0,0 @@ -package blockbuilder - -import ( - "encoding/binary" - "iter" - "unsafe" - - "github.com/ozontech/seq-db/frac/sealed/lids" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/util" -) - -type ( - DocLocation = util.Pair[seq.ID, seq.DocPos] - TokenPosting = util.Pair[[]byte, []uint32] - TokenBlock = util.Pair[TokensSealBlock, []token.FieldTable] -) - -// TokensExt represents the token ID range contained in a block. -type TokensExt struct { - MinTID uint32 // First token ID in the block - MaxTID uint32 // Last token ID in the block -} - -// TokensSealBlock represents a sealed block containing token data with metadata. -type TokensSealBlock struct { - Ext TokensExt // Tokens block metadata for registry marking - Payload token.Block // Actual token data payload -} - -// LidsExt represents the range and continuation status of LID blocks. -type LidsExt struct { - MinTID uint32 // First token ID in the LID block - MaxTID uint32 // Last token ID in the LID block - IsContinued bool // Whether LID sequence continues in next block -} - -// LidsSealBlock represents a sealed block containing LID (Local ID) data. -type LidsSealBlock struct { - Ext LidsExt // LIDs block metadata for registry marking - Payload lids.Block // LID data payload -} - -// IdsSealBlock represents a sealed block containing various identifier types. -type IdsSealBlock struct { - MIDs seqids.BlockMIDs - RIDs seqids.BlockRIDs - Params seqids.BlockParams -} - -func TokenBlocks( - it iter.Seq2[string, iter.Seq2[TokenPosting, error]], - accumulate func([]uint32) error, blockCapacity int, -) iter.Seq2[TokenBlock, error] { - return func(yield func(TokenBlock, error) bool) { - var ( - block TokensSealBlock - blockIdx uint32 - blockSize int - ) - - var ( - currentTID uint32 - pendingTable []token.FieldTable - fieldName string - fieldEntryStartTID uint32 - ) - - emitFieldEntry := func() { - // Handle case when field does not have tokens. - if fieldName == "" || fieldEntryStartTID > currentTID { - return - } - - entry := newTokenTableEntry(fieldEntryStartTID, currentTID, blockIdx, block) - pendingTable = append(pendingTable, token.FieldTable{ - Field: fieldName, - Entries: []*token.TableEntry{entry}, - }) - } - - flushBlock := func() bool { - emitFieldEntry() - block.Ext.MaxTID = currentTID - - pair := TokenBlock{First: block, Second: pendingTable} - if !yield(pair, nil) { - return false - } - - block.Payload.Payload = block.Payload.Payload[:0] - block.Payload.Offsets = block.Payload.Offsets[:0] - block.Ext.MinTID = currentTID + 1 - - blockIdx++ - blockSize = 0 - - pendingTable = pendingTable[:0] - fieldEntryStartTID = currentTID + 1 - - return true - } - - block.Ext.MinTID = 1 - for field, tokIt := range it { - emitFieldEntry() - - fieldName = field - fieldEntryStartTID = currentTID + 1 - - for pair, err := range tokIt { - if err != nil { - yield(TokenBlock{}, err) - return - } - - tok, tlids := pair.First, pair.Second - tokenSize := int(unsafe.Sizeof(uint32(0))) + len(tok) - - if blockSize > 0 && blockSize+tokenSize > blockCapacity { - if !flushBlock() { - return - } - } - - block.Payload.Offsets = append(block.Payload.Offsets, uint32(len(block.Payload.Payload))) - block.Payload.Payload = binary.LittleEndian.AppendUint32(block.Payload.Payload, uint32(len(tok))) - block.Payload.Payload = append(block.Payload.Payload, tok...) - - if err := accumulate(tlids); err != nil { - yield(TokenBlock{}, err) - return - } - - currentTID++ - blockSize += tokenSize - } - } - - if blockSize > 0 { - flushBlock() - } - } -} - -func newTokenTableEntry( - entryStartTID, entryEndTID uint32, - blockIndex uint32, block TokensSealBlock, -) *token.TableEntry { - // Convert global TIDs to block-local indices - firstIndex := entryStartTID - block.Ext.MinTID - lastIndex := entryEndTID - block.Ext.MinTID - - // Extract min and max token values for the entry range - minVal := string(block.Payload.GetToken(int(firstIndex))) - maxVal := string(block.Payload.GetToken(int(lastIndex))) - - return &token.TableEntry{ - StartIndex: firstIndex, // Starting index within the block - StartTID: entryStartTID, // Starting token ID (global) - BlockIndex: blockIndex, // Reference to containing block - ValCount: lastIndex - firstIndex + 1, // Number of tokens in this entry - MinVal: minVal, // Smallest token value in range - MaxVal: maxVal, // Largest token value in range - } -} - -// IDBlock accumulates scalar (ID, position) pairs into sealed ID blocks. -// A new block is yielded every `blockCapacity` IDs. -func IDBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[IdsSealBlock, error] { - return func(yield func(IdsSealBlock, error) bool) { - var block IdsSealBlock - - for pair, err := range ids { - if err != nil { - yield(IdsSealBlock{}, err) - return - } - - id, pos := pair.First, pair.Second - block.MIDs.Values = append(block.MIDs.Values, uint64(id.MID)) - block.RIDs.Values = append(block.RIDs.Values, uint64(id.RID)) - block.Params.Values = append(block.Params.Values, uint64(pos)) - - if len(block.MIDs.Values) == blockCapacity { - if !yield(block, nil) { - return - } - - block.MIDs.Values = block.MIDs.Values[:0] - block.RIDs.Values = block.RIDs.Values[:0] - block.Params.Values = block.Params.Values[:0] - } - } - - if len(block.MIDs.Values) > 0 { - yield(block, nil) - } - } -} - -// CollapseOrderedFieldsTables merges FieldTables with the same field name. -// Assumes input is sorted by Field. -func CollapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { - if len(src) == 0 { - return nil - } - - current := src[0] - var dst []token.FieldTable - for _, ft := range src[1:] { - if current.Field == ft.Field { - current.Entries = append(current.Entries, ft.Entries...) - continue - } - - dst = append(dst, current) - current = ft - } - - return append(dst, current) -} diff --git a/blockbuilder/lid_accumulator.go b/blockbuilder/lid_accumulator.go deleted file mode 100644 index ef81a970..00000000 --- a/blockbuilder/lid_accumulator.go +++ /dev/null @@ -1,85 +0,0 @@ -package blockbuilder - -import "github.com/ozontech/seq-db/frac/sealed/lids" - -type LIDAccumulator struct { - blockCapacity int - onBlock func(LidsSealBlock) error - - currentTID uint32 - currentBlock LidsSealBlock - - isEndOfToken bool - isContinued bool -} - -func NewLIDAccumulator( - blockCapacity int, - onBlock func(LidsSealBlock) error, -) *LIDAccumulator { - a := &LIDAccumulator{ - blockCapacity: blockCapacity, - onBlock: onBlock, - } - - a.currentBlock.Ext.MinTID = 1 - a.currentBlock.Payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), - Offsets: []uint32{0}, - } - - return a -} - -// Add processes LIDs of one token (must be called in TID order). -// -// For each block that fills up, `onBlock` is called immediately -// before the backing arrays are reset, so `onBlock` may read the -// block data but must not retain references to it. -func (a *LIDAccumulator) Add(lidsbuf []uint32) error { - a.currentTID++ - - for _, lid := range lidsbuf { - if len(a.currentBlock.Payload.LIDs) == a.blockCapacity { - if err := a.onBlock(a.finalizeBlock()); err != nil { - return err - } - - a.currentBlock.Ext.MinTID = a.currentTID - a.currentBlock.Payload.LIDs = a.currentBlock.Payload.LIDs[:0] - a.currentBlock.Payload.Offsets = a.currentBlock.Payload.Offsets[:1] - } - - a.isEndOfToken = false - a.currentBlock.Ext.MaxTID = a.currentTID - a.currentBlock.Payload.LIDs = append(a.currentBlock.Payload.LIDs, lid) - } - - a.isEndOfToken = true - a.currentBlock.Payload.Offsets = append( - a.currentBlock.Payload.Offsets, - uint32(len(a.currentBlock.Payload.LIDs)), - ) - - return nil -} - -func (a *LIDAccumulator) Finalize() error { - return a.onBlock(a.finalizeBlock()) -} - -func (a *LIDAccumulator) finalizeBlock() LidsSealBlock { - if !a.isEndOfToken { - a.currentBlock.Payload.Offsets = append( - a.currentBlock.Payload.Offsets, - uint32(len(a.currentBlock.Payload.LIDs)), - ) - } - - result := a.currentBlock - result.Payload.IsLastLID = a.isEndOfToken - result.Ext.IsContinued = a.isContinued - - a.isContinued = !a.isEndOfToken - return result -} diff --git a/indexwriter/blocks.go b/indexwriter/blocks.go new file mode 100644 index 00000000..3064491b --- /dev/null +++ b/indexwriter/blocks.go @@ -0,0 +1,219 @@ +package indexwriter + +import ( + "encoding/binary" + "iter" + "unsafe" + + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" + "github.com/ozontech/seq-db/util" +) + +type tokenFieldBlock = util.Pair[unpackedTokenBlock, []token.FieldTable] + +// tokenExt represents the token ID range contained in a block. +type tokenExt struct { + minTID uint32 // First token ID in the block + maxTID uint32 // Last token ID in the block +} + +// unpackedTokenBlock represents a sealed block containing token data with metadata. +type unpackedTokenBlock struct { + ext tokenExt // Tokens block metadata for registry marking + payload token.Block // Actual token data payload +} + +// lidExt represents the range and continuation status of LID blocks. +type lidExt struct { + minTID uint32 // First token ID in the LID block + maxTID uint32 // Last token ID in the LID block + isContinued bool // Whether LID sequence continues in next block +} + +// unpackedLIDBlock represents a sealed block containing LID (Local ID) data. +type unpackedLIDBlock struct { + ext lidExt // LIDs block metadata for registry marking + payload lids.Block // LID data payload +} + +// unpackedIDBlock represents a sealed block containing various identifier types. +type unpackedIDBlock struct { + mids seqids.BlockMIDs + rids seqids.BlockRIDs + params seqids.BlockParams +} + +func tokenBlock( + it iter.Seq2[string, iter.Seq2[TokenPosting, error]], + accumulate func([]uint32) error, blockCapacity int, +) iter.Seq2[tokenFieldBlock, error] { + return func(yield func(tokenFieldBlock, error) bool) { + var ( + block unpackedTokenBlock + blockIdx uint32 + blockSize int + ) + + var ( + currentTID uint32 + pendingTable []token.FieldTable + fieldName string + fieldEntryStartTID uint32 + ) + + emitFieldEntry := func() { + // Handle case when field does not have tokens. + if fieldName == "" || fieldEntryStartTID > currentTID { + return + } + + entry := newTokenTableEntry(fieldEntryStartTID, currentTID, blockIdx, block) + pendingTable = append(pendingTable, token.FieldTable{ + Field: fieldName, + Entries: []*token.TableEntry{entry}, + }) + } + + flushBlock := func() bool { + emitFieldEntry() + block.ext.maxTID = currentTID + + pair := tokenFieldBlock{First: block, Second: pendingTable} + if !yield(pair, nil) { + return false + } + + block.payload.Payload = block.payload.Payload[:0] + block.payload.Offsets = block.payload.Offsets[:0] + block.ext.minTID = currentTID + 1 + + blockIdx++ + blockSize = 0 + + pendingTable = pendingTable[:0] + fieldEntryStartTID = currentTID + 1 + + return true + } + + block.ext.minTID = 1 + for field, tokIt := range it { + emitFieldEntry() + + fieldName = field + fieldEntryStartTID = currentTID + 1 + + for pair, err := range tokIt { + if err != nil { + yield(tokenFieldBlock{}, err) + return + } + + tok, tlids := pair.First, pair.Second + tokenSize := int(unsafe.Sizeof(uint32(0))) + len(tok) + + if blockSize > 0 && blockSize+tokenSize > blockCapacity { + if !flushBlock() { + return + } + } + + block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) + block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tok))) + block.payload.Payload = append(block.payload.Payload, tok...) + + if err := accumulate(tlids); err != nil { + yield(tokenFieldBlock{}, err) + return + } + + currentTID++ + blockSize += tokenSize + } + } + + if blockSize > 0 { + flushBlock() + } + } +} + +func newTokenTableEntry( + entryStartTID, entryEndTID uint32, + blockIndex uint32, block unpackedTokenBlock, +) *token.TableEntry { + // Convert global TIDs to block-local indices + firstIndex := entryStartTID - block.ext.minTID + lastIndex := entryEndTID - block.ext.minTID + + // Extract min and max token values for the entry range + minVal := string(block.payload.GetToken(int(firstIndex))) + maxVal := string(block.payload.GetToken(int(lastIndex))) + + return &token.TableEntry{ + StartIndex: firstIndex, // Starting index within the block + StartTID: entryStartTID, // Starting token ID (global) + BlockIndex: blockIndex, // Reference to containing block + ValCount: lastIndex - firstIndex + 1, // Number of tokens in this entry + MinVal: minVal, // Smallest token value in range + MaxVal: maxVal, // Largest token value in range + } +} + +// idBlock accumulates scalar (ID, position) pairs into sealed ID blocks. +// A new block is yielded every `blockCapacity` IDs. +func idBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[unpackedIDBlock, error] { + return func(yield func(unpackedIDBlock, error) bool) { + var block unpackedIDBlock + + for pair, err := range ids { + if err != nil { + yield(unpackedIDBlock{}, err) + return + } + + id, pos := pair.First, pair.Second + block.mids.Values = append(block.mids.Values, uint64(id.MID)) + block.rids.Values = append(block.rids.Values, uint64(id.RID)) + block.params.Values = append(block.params.Values, uint64(pos)) + + if len(block.mids.Values) == blockCapacity { + if !yield(block, nil) { + return + } + + block.mids.Values = block.mids.Values[:0] + block.rids.Values = block.rids.Values[:0] + block.params.Values = block.params.Values[:0] + } + } + + if len(block.mids.Values) > 0 { + yield(block, nil) + } + } +} + +// collapseOrderedFieldsTables merges FieldTables with the same field name. +// Assumes input is sorted by Field. +func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { + if len(src) == 0 { + return nil + } + + current := src[0] + var dst []token.FieldTable + for _, ft := range src[1:] { + if current.Field == ft.Field { + current.Entries = append(current.Entries, ft.Entries...) + continue + } + + dst = append(dst, current) + current = ft + } + + return append(dst, current) +} diff --git a/blockbuilder/block_builder_test.go b/indexwriter/blocks_test.go similarity index 77% rename from blockbuilder/block_builder_test.go rename to indexwriter/blocks_test.go index 95fe7698..8a3951d7 100644 --- a/blockbuilder/block_builder_test.go +++ b/indexwriter/blocks_test.go @@ -1,4 +1,4 @@ -package blockbuilder +package indexwriter import ( "iter" @@ -104,18 +104,18 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { const blockSize = 24 const lidBlockCap = 3 - var lidBlocks []LidsSealBlock - lidAccumulator := NewLIDAccumulator( + var lidBlocks []unpackedLIDBlock + lidAccumulator := newLIDAccumulator( lidBlockCap, - func(block LidsSealBlock) error { - block.Payload.LIDs = slices.Clone(block.Payload.LIDs) - block.Payload.Offsets = slices.Clone(block.Payload.Offsets) + func(block unpackedLIDBlock) error { + block.payload.LIDs = slices.Clone(block.payload.LIDs) + block.payload.Offsets = slices.Clone(block.payload.Offsets) lidBlocks = append(lidBlocks, block) return nil }, ) - tokenBlocks := TokenBlocks( + tokenBlocksIter := tokenBlock( src.TokenTriplet(), lidAccumulator.Add, blockSize, @@ -129,19 +129,19 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { blockIndex := 0 allFieldsTables := []token.FieldTable{} - for pair, err := range tokenBlocks { + for pair, err := range tokenBlocksIter { assert.NoError(t, err) block, fieldsTables := pair.First, pair.Second - assert.Equal(t, expectedSizes[blockIndex], block.Payload.Len()) - for i := range block.Payload.Len() { + assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) + for i := range block.payload.Len() { tid++ - assert.Equal(t, src.tokens[tid-1], block.Payload.GetToken(i)) + assert.Equal(t, src.tokens[tid-1], block.payload.GetToken(i)) } allFieldsTables = append(allFieldsTables, fieldsTables...) blockIndex++ } - actualTokenTable := token.TableBlock{FieldsTables: CollapseOrderedFieldsTables(allFieldsTables)} + actualTokenTable := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} assert.Equal(t, tid, len(src.tokens)) expectedTokenTable := token.TableBlock{ @@ -238,30 +238,30 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) assert.NoError(t, lidAccumulator.Finalize()) - expectedLIDBlocks := []LidsSealBlock{ + expectedLIDBlocks := []unpackedLIDBlock{ { - Ext: LidsExt{MinTID: 1, MaxTID: 1, IsContinued: false}, - Payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, + ext: lidExt{minTID: 1, maxTID: 1, isContinued: false}, + payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - Ext: LidsExt{MinTID: 1, MaxTID: 3, IsContinued: true}, - Payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + ext: lidExt{minTID: 1, maxTID: 3, isContinued: true}, + payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - Ext: LidsExt{MinTID: 4, MaxTID: 6, IsContinued: false}, - Payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + ext: lidExt{minTID: 4, maxTID: 6, isContinued: false}, + payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - Ext: LidsExt{MinTID: 7, MaxTID: 9, IsContinued: false}, - Payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + ext: lidExt{minTID: 7, maxTID: 9, isContinued: false}, + payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - Ext: LidsExt{MinTID: 10, MaxTID: 12, IsContinued: false}, - Payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + ext: lidExt{minTID: 10, maxTID: 12, isContinued: false}, + payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - Ext: LidsExt{MinTID: 13, MaxTID: 14, IsContinued: false}, - Payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, + ext: lidExt{minTID: 13, maxTID: 14, isContinued: false}, + payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } assert.Equal(t, expectedLIDBlocks, lidBlocks) @@ -300,18 +300,18 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block, err := range IDBlock(src.ID(), 3) { + for block, err := range idBlock(src.ID(), 3) { assert.NoError(t, err) - assert.Equal(t, expectedSizes[i], len(block.MIDs.Values)) - assert.Equal(t, expectedSizes[i], len(block.RIDs.Values)) - assert.Equal(t, expectedSizes[i], len(block.Params.Values)) + assert.Equal(t, expectedSizes[i], len(block.mids.Values)) + assert.Equal(t, expectedSizes[i], len(block.rids.Values)) + assert.Equal(t, expectedSizes[i], len(block.params.Values)) i++ j := 0 - for _, mid := range block.MIDs.Values { - ids = append(ids, seq.ID{MID: seq.MID(mid), RID: seq.RID(block.RIDs.Values[j])}) - pos = append(pos, seq.DocPos(block.Params.Values[j])) + for _, mid := range block.mids.Values { + ids = append(ids, seq.ID{MID: seq.MID(mid), RID: seq.RID(block.rids.Values[j])}) + pos = append(pos, seq.DocPos(block.params.Values[j])) j++ } } diff --git a/indexwriter/index.go b/indexwriter/index.go index 87fec07a..12fedafe 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -4,7 +4,6 @@ import ( "io" "iter" - "github.com/ozontech/seq-db/blockbuilder" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" @@ -112,7 +111,7 @@ func (s *IndexWriter) WriteIDFile(ws io.WriteSeeker, src Source) error { } defer w.release() - for block, err := range blockbuilder.IDBlock(src.ID(), consts.IDsPerBlock) { + for block, err := range idBlock(src.ID(), consts.IDsPerBlock) { if err != nil { return err } @@ -146,15 +145,15 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err } defer lw.release() - lidAccumulator := blockbuilder.NewLIDAccumulator( + lidAccumulator := newLIDAccumulator( consts.LIDBlockCap, - func(block blockbuilder.LidsSealBlock) error { + func(block unpackedLIDBlock) error { return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block)) }, ) var allFieldsTables []token.FieldTable - for pair, err := range blockbuilder.TokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { + for pair, err := range tokenBlock(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { if err != nil { return err } @@ -173,7 +172,7 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err return s.finalizeTokenFile(tw, allFieldsTables) } -func (s *IndexWriter) finalizeLIDFile(w *writer, lidAccumulator *blockbuilder.LIDAccumulator) error { +func (s *IndexWriter) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { if err := lidAccumulator.Finalize(); err != nil { return err } @@ -187,7 +186,7 @@ func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.Field return err } - tokenTableBlock := token.TableBlock{FieldsTables: blockbuilder.CollapseOrderedFieldsTables(allFieldsTables)} + tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} if err := w.writeBlock(blockTypeTokenTable, s.packTokenTableBlock(tokenTableBlock)); err != nil { return err } @@ -220,11 +219,11 @@ func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { } // packTokenBlock packs token data into a compressed index block. -func (s *IndexWriter) packTokenBlock(block blockbuilder.TokensSealBlock) indexBlock { - s.buf1 = block.Payload.Pack(s.buf1[:0]) // Pack token data +func (s *IndexWriter) packTokenBlock(block unpackedTokenBlock) indexBlock { + s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) // Store TID range in extended metadata - b.ext1 = uint64(block.Ext.MaxTID)<<32 | uint64(block.Ext.MinTID) + b.ext1 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) return b } @@ -250,19 +249,19 @@ func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlo } // packMIDsBlock packs MIDs into a compressed index block. -func (s *IndexWriter) packMIDsBlock(block blockbuilder.IdsSealBlock) indexBlock { +func (s *IndexWriter) packMIDsBlock(block unpackedIDBlock) indexBlock { // Get the last ID in the block (smallest due to descending order) - last := len(block.MIDs.Values) - 1 + last := len(block.mids.Values) - 1 minID := seq.ID{ - MID: seq.MID(block.MIDs.Values[last]), - RID: seq.RID(block.RIDs.Values[last]), + MID: seq.MID(block.mids.Values[last]), + RID: seq.RID(block.rids.Values[last]), } s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData // Packing block - s.buf1 = block.MIDs.Pack(s.buf1[:0]) + s.buf1 = block.mids.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) // Store min MID and RID in extended metadata @@ -273,38 +272,38 @@ func (s *IndexWriter) packMIDsBlock(block blockbuilder.IdsSealBlock) indexBlock } // packRIDsBlock packs RIDs into a compressed index block. -func (s *IndexWriter) packRIDsBlock(block blockbuilder.IdsSealBlock) indexBlock { - s.buf1 = block.RIDs.Pack(s.buf1[:0]) +func (s *IndexWriter) packRIDsBlock(block unpackedIDBlock) indexBlock { + s.buf1 = block.rids.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packPosBlock packs document positions into a compressed index block. -func (s *IndexWriter) packPosBlock(block blockbuilder.IdsSealBlock) indexBlock { - s.buf1 = block.Params.Pack(s.buf1[:0]) +func (s *IndexWriter) packPosBlock(block unpackedIDBlock) indexBlock { + s.buf1 = block.params.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. -func (s *IndexWriter) packLIDsBlock(block blockbuilder.LidsSealBlock) indexBlock { +func (s *IndexWriter) packLIDsBlock(block unpackedLIDBlock) indexBlock { var ext1 uint64 - if block.Ext.IsContinued { // todo: Legacy continuation flag + if block.ext.isContinued { // todo: Legacy continuation flag ext1 = 1 - block.Ext.MinTID++ // Adjust for legacy format + block.ext.minTID++ // Adjust for legacy format } // Update LIDs table for PreloadedData - s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.Ext.MinTID) - s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.Ext.MaxTID) - s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.Ext.IsContinued) + s.lidsTable.MinTIDs = append(s.lidsTable.MinTIDs, block.ext.minTID) + s.lidsTable.MaxTIDs = append(s.lidsTable.MaxTIDs, block.ext.maxTID) + s.lidsTable.IsContinued = append(s.lidsTable.IsContinued, block.ext.isContinued) // Packing block - s.buf1 = block.Payload.Pack(s.buf1[:0]) + s.buf1 = block.payload.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) b.ext1 = ext1 // Legacy continuation flag - b.ext2 = uint64(block.Ext.MaxTID)<<32 | uint64(block.Ext.MinTID) // TID range + b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range return b } diff --git a/indexwriter/lid_accumulator.go b/indexwriter/lid_accumulator.go new file mode 100644 index 00000000..f3b3740a --- /dev/null +++ b/indexwriter/lid_accumulator.go @@ -0,0 +1,85 @@ +package indexwriter + +import "github.com/ozontech/seq-db/frac/sealed/lids" + +type lidAccumulator struct { + blockCapacity int + onBlock func(unpackedLIDBlock) error + + currentTID uint32 + currentBlock unpackedLIDBlock + + isEndOfToken bool + isContinued bool +} + +func newLIDAccumulator( + blockCapacity int, + onBlock func(unpackedLIDBlock) error, +) *lidAccumulator { + a := &lidAccumulator{ + blockCapacity: blockCapacity, + onBlock: onBlock, + } + + a.currentBlock.ext.minTID = 1 + a.currentBlock.payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, + } + + return a +} + +// Add processes LIDs of one token (must be called in TID order). +// +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *lidAccumulator) Add(lidsbuf []uint32) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.payload.LIDs) == a.blockCapacity { + if err := a.onBlock(a.finalizeBlock()); err != nil { + return err + } + + a.currentBlock.ext.minTID = a.currentTID + a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] + a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] + } + + a.isEndOfToken = false + a.currentBlock.ext.maxTID = a.currentTID + a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) + } + + a.isEndOfToken = true + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + + return nil +} + +func (a *lidAccumulator) Finalize() error { + return a.onBlock(a.finalizeBlock()) +} + +func (a *lidAccumulator) finalizeBlock() unpackedLIDBlock { + if !a.isEndOfToken { + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + } + + result := a.currentBlock + result.payload.IsLastLID = a.isEndOfToken + result.ext.isContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result +} From aa9e94ed4acc7c21166c2341196364c1e866bfe8 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 27 Apr 2026 12:14:03 +0300 Subject: [PATCH 7/7] refactor: do not store ids count in offsets --- frac/sealed/block_offsets.go | 20 ++++++++++++++++---- frac/sealed/seqids/loader.go | 1 - frac/sealed_loader.go | 22 +++++++++------------- indexwriter/index.go | 11 ++--------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/frac/sealed/block_offsets.go b/frac/sealed/block_offsets.go index 2be59942..d644a0f7 100644 --- a/frac/sealed/block_offsets.go +++ b/frac/sealed/block_offsets.go @@ -6,13 +6,17 @@ import ( ) type BlockOffsets struct { - IDsTotal uint32 // todo: the best place for this field is Info block - Offsets []uint64 + Offsets []uint64 } func (b *BlockOffsets) Pack(buf []byte) []byte { buf = binary.LittleEndian.AppendUint32(buf, uint32(len(b.Offsets))) - buf = binary.LittleEndian.AppendUint32(buf, b.IDsTotal) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + // + // I've created a task which will require fraction binary version bumping + // to get rid of this: https://github.com/ozontech/seq-db/issues/409 + buf = binary.LittleEndian.AppendUint32(buf, 0) var prev uint64 for _, pos := range b.Offsets { @@ -26,13 +30,16 @@ func (b *BlockOffsets) Unpack(data []byte) error { if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing offsets count)") } + idsBlocksCount := binary.LittleEndian.Uint32(data) data = data[4:] if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing IDsTotal)") } - b.IDsTotal = binary.LittleEndian.Uint32(data) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + _ = binary.LittleEndian.Uint32(data) data = data[4:] offset := uint64(0) @@ -42,15 +49,20 @@ func (b *BlockOffsets) Unpack(data []byte) error { if n == 0 { return errors.New("blocks offset decoding error: varint returned 0") } + if n < 0 { return errors.New("blocks offset decoding error: varint overflow") } + data = data[n:] offset += uint64(delta) + b.Offsets = append(b.Offsets, offset) } + if uint32(len(b.Offsets)) != idsBlocksCount { return errors.New("blocks offset decoding error: offset count mismatch") } + return nil } diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index a4c9ecdb..1f0c05de 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -13,7 +13,6 @@ import ( type Table struct { MinBlockIDs []seq.ID // from max to min - IDBlocksTotal uint32 IDsTotal uint32 StartBlockIndex uint32 } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 893b75a4..10d95a2d 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re l.skipSection() // skip token table blocks var err error - blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer) + blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info) if err != nil { logger.Fatal("legacy load ids error", zap.Error(err)) } @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() { } // loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets. -func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) { +func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) { var buf []byte data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf) @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab l.blockIndex++ table := seqids.Table{ - StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index - IDsTotal: offsets.IDsTotal, - IDBlocksTotal: uint32(len(offsets.Offsets)), + StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } for { @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } mid := seq.MID(h.GetExt1()) - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(h.GetExt1()) } @@ -184,10 +183,9 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers if err != nil { logger.Fatal("load offsets error", zap.Error(err)) } - blocksData.BlocksOffsets = blockOffsets.Offsets - blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) + blocksData.IDsTable = l.loadIDsTable(readers.ID, info) blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) if err != nil { logger.Fatal("load lids error", zap.Error(err)) @@ -227,10 +225,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets, // loadIDsTable scans block headers in the .id file to build seqids.Table. // Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers. -func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table { +func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table { table := seqids.Table{ StartBlockIndex: 0, - IDsTotal: idsTotal, + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } blocksCount, err := r.BlocksCount() @@ -248,7 +246,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio } var mid seq.MID - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(header.GetExt1()) } else { mid = seq.MID(header.GetExt1()) @@ -258,8 +256,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio MID: mid, RID: seq.RID(header.GetExt2()), }) - - table.IDBlocksTotal++ } return table diff --git a/indexwriter/index.go b/indexwriter/index.go index 12fedafe..f317d2d2 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -92,11 +92,7 @@ func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { } defer w.release() - offsets := sealed.BlockOffsets{ - IDsTotal: src.Info().DocsTotal + 1, - Offsets: src.BlockOffsets(), - } - + offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()} if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil { return err } @@ -214,6 +210,7 @@ func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { // packInfoBlock packs fraction information into an index block. func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { + s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID] s.buf1 = block.Pack(s.buf1[:0]) return newIndexBlock(s.buf1) // Info block is typically small, no compression } @@ -238,10 +235,6 @@ func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) inde // packBlocksOffsetsBlock packs document block offsets into a compressed index block. func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { - // Update IDs table for PreloadedData - s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs - s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks - // Packing block s.buf1 = block.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel)