diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 27f5d971..b37cf2a3 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -16,9 +16,9 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" testcommon "github.com/ozontech/seq-db/tests/common" diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8757c0db..0fee4795 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -22,10 +22,10 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" diff --git a/frac/sealed/block_offsets.go b/frac/sealed/block_offsets.go index 2be59942..d644a0f7 100644 --- a/frac/sealed/block_offsets.go +++ b/frac/sealed/block_offsets.go @@ -6,13 +6,17 @@ import ( ) type BlockOffsets struct { - IDsTotal uint32 // todo: the best place for this field is Info block - Offsets []uint64 + Offsets []uint64 } func (b *BlockOffsets) Pack(buf []byte) []byte { buf = binary.LittleEndian.AppendUint32(buf, uint32(len(b.Offsets))) - buf = binary.LittleEndian.AppendUint32(buf, b.IDsTotal) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + // + // I've created a task which will require fraction binary version bumping + // to get rid of this: https://github.com/ozontech/seq-db/issues/409 + buf = binary.LittleEndian.AppendUint32(buf, 0) var prev uint64 for _, pos := range b.Offsets { @@ -26,13 +30,16 @@ func (b *BlockOffsets) Unpack(data []byte) error { if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing offsets count)") } + idsBlocksCount := binary.LittleEndian.Uint32(data) data = data[4:] if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing IDsTotal)") } - b.IDsTotal = binary.LittleEndian.Uint32(data) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + _ = binary.LittleEndian.Uint32(data) data = data[4:] offset := uint64(0) @@ -42,15 +49,20 @@ func (b *BlockOffsets) Unpack(data []byte) error { if n == 0 { return errors.New("blocks offset decoding error: varint returned 0") } + if n < 0 { return errors.New("blocks offset decoding error: varint overflow") } + data = data[n:] offset += uint64(delta) + b.Offsets = append(b.Offsets, offset) } + if uint32(len(b.Offsets)) != idsBlocksCount { return errors.New("blocks offset decoding error: offset count mismatch") } + return nil } diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index a4c9ecdb..1f0c05de 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -13,7 +13,6 @@ import ( type Table struct { MinBlockIDs []seq.ID // from max to min - IDBlocksTotal uint32 IDsTotal uint32 StartBlockIndex uint32 } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 893b75a4..10d95a2d 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re l.skipSection() // skip token table blocks var err error - blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer) + blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info) if err != nil { logger.Fatal("legacy load ids error", zap.Error(err)) } @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() { } // loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets. -func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) { +func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) { var buf []byte data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf) @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab l.blockIndex++ table := seqids.Table{ - StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index - IDsTotal: offsets.IDsTotal, - IDBlocksTotal: uint32(len(offsets.Offsets)), + StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } for { @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } mid := seq.MID(h.GetExt1()) - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(h.GetExt1()) } @@ -184,10 +183,9 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers if err != nil { logger.Fatal("load offsets error", zap.Error(err)) } - blocksData.BlocksOffsets = blockOffsets.Offsets - blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) + blocksData.IDsTable = l.loadIDsTable(readers.ID, info) blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) if err != nil { logger.Fatal("load lids error", zap.Error(err)) @@ -227,10 +225,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets, // loadIDsTable scans block headers in the .id file to build seqids.Table. // Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers. -func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table { +func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table { table := seqids.Table{ StartBlockIndex: 0, - IDsTotal: idsTotal, + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } blocksCount, err := r.BlocksCount() @@ -248,7 +246,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio } var mid seq.MID - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(header.GetExt1()) } else { mid = seq.MID(header.GetExt1()) @@ -258,8 +256,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio MID: mid, RID: seq.RID(header.GetExt2()), }) - - table.IDBlocksTotal++ } return table diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 66e6477b..db5feb33 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -12,8 +12,8 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/node" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) diff --git a/fracmanager/sealer_test.go b/fracmanager/sealer_test.go index f85c3f8f..51c16b6b 100644 --- a/fracmanager/sealer_test.go +++ b/fracmanager/sealer_test.go @@ -19,8 +19,8 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) diff --git a/frac/sealed/sealing/blocks_builder.go b/indexwriter/blocks.go similarity index 53% rename from frac/sealed/sealing/blocks_builder.go rename to indexwriter/blocks.go index fc069cbf..3064491b 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/indexwriter/blocks.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "encoding/binary" @@ -11,53 +11,47 @@ import ( "github.com/ozontech/seq-db/util" ) -type ( - TokenBlock = util.Pair[tokensSealBlock, []token.FieldTable] -) +type tokenFieldBlock = util.Pair[unpackedTokenBlock, []token.FieldTable] -// tokensExt represents the token ID range contained in a block. -type tokensExt struct { +// tokenExt represents the token ID range contained in a block. +type tokenExt struct { minTID uint32 // First token ID in the block maxTID uint32 // Last token ID in the block } -// tokensSealBlock represents a sealed block containing token data with metadata. -type tokensSealBlock struct { - ext tokensExt // Tokens block metadata for registry marking +// unpackedTokenBlock represents a sealed block containing token data with metadata. +type unpackedTokenBlock struct { + ext tokenExt // Tokens block metadata for registry marking payload token.Block // Actual token data payload } -// lidsExt represents the range and continuation status of LID blocks. -type lidsExt struct { +// lidExt represents the range and continuation status of LID blocks. +type lidExt struct { minTID uint32 // First token ID in the LID block maxTID uint32 // Last token ID in the LID block isContinued bool // Whether LID sequence continues in next block } -// lidsSealBlock represents a sealed block containing LID (Local ID) data. -type lidsSealBlock struct { - ext lidsExt // LIDs block metadata for registry marking +// unpackedLIDBlock represents a sealed block containing LID (Local ID) data. +type unpackedLIDBlock struct { + ext lidExt // LIDs block metadata for registry marking payload lids.Block // LID data payload } -// idsSealBlock represents a sealed block containing various identifier types. -type idsSealBlock struct { +// unpackedIDBlock represents a sealed block containing various identifier types. +type unpackedIDBlock struct { mids seqids.BlockMIDs rids seqids.BlockRIDs params seqids.BlockParams } -// blocksBuilder constructs sealed blocks from various data sources. -// Provides error tracking and consistency validation during block construction. -type blocksBuilder struct{} - -func (bb *blocksBuilder) BuildTokenBlocks( +func tokenBlock( it iter.Seq2[string, iter.Seq2[TokenPosting, error]], accumulate func([]uint32) error, blockCapacity int, -) iter.Seq2[TokenBlock, error] { - return func(yield func(TokenBlock, error) bool) { +) iter.Seq2[tokenFieldBlock, error] { + return func(yield func(tokenFieldBlock, error) bool) { var ( - block tokensSealBlock + block unpackedTokenBlock blockIdx uint32 blockSize int ) @@ -86,7 +80,7 @@ func (bb *blocksBuilder) BuildTokenBlocks( emitFieldEntry() block.ext.maxTID = currentTID - pair := TokenBlock{First: block, Second: pendingTable} + pair := tokenFieldBlock{First: block, Second: pendingTable} if !yield(pair, nil) { return false } @@ -105,15 +99,15 @@ func (bb *blocksBuilder) BuildTokenBlocks( } block.ext.minTID = 1 - for field, tokenIterator := range it { + for field, tokIt := range it { emitFieldEntry() fieldName = field fieldEntryStartTID = currentTID + 1 - for pair, err := range tokenIterator { + for pair, err := range tokIt { if err != nil { - yield(TokenBlock{}, err) + yield(tokenFieldBlock{}, err) return } @@ -131,7 +125,7 @@ func (bb *blocksBuilder) BuildTokenBlocks( block.payload.Payload = append(block.payload.Payload, tok...) if err := accumulate(tlids); err != nil { - yield(TokenBlock{}, err) + yield(tokenFieldBlock{}, err) return } @@ -148,7 +142,7 @@ func (bb *blocksBuilder) BuildTokenBlocks( func newTokenTableEntry( entryStartTID, entryEndTID uint32, - blockIndex uint32, block tokensSealBlock, + blockIndex uint32, block unpackedTokenBlock, ) *token.TableEntry { // Convert global TIDs to block-local indices firstIndex := entryStartTID - block.ext.minTID @@ -168,15 +162,15 @@ func newTokenTableEntry( } } -// seqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. +// idBlock accumulates scalar (ID, position) pairs into sealed ID blocks. // A new block is yielded every `blockCapacity` IDs. -func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[idsSealBlock, error] { - return func(yield func(idsSealBlock, error) bool) { - var block idsSealBlock +func idBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[unpackedIDBlock, error] { + return func(yield func(unpackedIDBlock, error) bool) { + var block unpackedIDBlock for pair, err := range ids { if err != nil { - yield(idsSealBlock{}, err) + yield(unpackedIDBlock{}, err) return } @@ -202,84 +196,24 @@ func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[ } } -type lidAccumulator struct { - blockCapacity int - onBlock func(lidsSealBlock) error - - currentTID uint32 - currentBlock lidsSealBlock - - isEndOfToken bool - isContinued bool -} - -func newLIDAccumulator( - blockCapacity int, - onBlock func(lidsSealBlock) error, -) *lidAccumulator { - a := &lidAccumulator{ - blockCapacity: blockCapacity, - onBlock: onBlock, +// collapseOrderedFieldsTables merges FieldTables with the same field name. +// Assumes input is sorted by Field. +func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { + if len(src) == 0 { + return nil } - a.currentBlock.ext.minTID = 1 - a.currentBlock.payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), - Offsets: []uint32{0}, - } - - return a -} - -// Add processes LIDs of one token (must be called in TID order). -// -// For each block that fills up, `onBlock` is called immediately -// before the backing arrays are reset, so `onBlock` may read the -// block data but must not retain references to it. -func (a *lidAccumulator) Add(lidsbuf []uint32) error { - a.currentTID++ - - for _, lid := range lidsbuf { - if len(a.currentBlock.payload.LIDs) == a.blockCapacity { - if err := a.onBlock(a.finalizeBlock()); err != nil { - return err - } - - a.currentBlock.ext.minTID = a.currentTID - a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] - a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] + current := src[0] + var dst []token.FieldTable + for _, ft := range src[1:] { + if current.Field == ft.Field { + current.Entries = append(current.Entries, ft.Entries...) + continue } - a.isEndOfToken = false - a.currentBlock.ext.maxTID = a.currentTID - a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) - } - - a.isEndOfToken = true - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) - - return nil -} - -func (a *lidAccumulator) Finalize() error { - return a.onBlock(a.finalizeBlock()) -} - -func (a *lidAccumulator) finalizeBlock() lidsSealBlock { - if !a.isEndOfToken { - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) + dst = append(dst, current) + current = ft } - result := a.currentBlock - result.payload.IsLastLID = a.isEndOfToken - result.ext.isContinued = a.isContinued - - a.isContinued = !a.isEndOfToken - return result + return append(dst, current) } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/indexwriter/blocks_test.go similarity index 84% rename from frac/sealed/sealing/blocks_builder_test.go rename to indexwriter/blocks_test.go index d6bca144..8a3951d7 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/indexwriter/blocks_test.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "iter" @@ -7,27 +7,20 @@ import ( "github.com/stretchr/testify/assert" - "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" ) -var _ Source = (*mockSource)(nil) - type mockSource struct { - info common.Info - tokens [][]byte - fields []string - fieldMaxTIDs []uint32 - ids []seq.ID - pos []seq.DocPos - tokenLIDs [][]uint32 - blocksOffsets []uint64 + tokens [][]byte + fields []string + fieldMaxTIDs []uint32 + ids []seq.ID + pos []seq.DocPos + tokenLIDs [][]uint32 } -func (m *mockSource) Info() *common.Info { return &m.info } - func (m *mockSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { start := 0 @@ -48,8 +41,7 @@ func (m *mockSource) tokensForField(start, end int) iter.Seq2[TokenPosting, erro if j < len(m.tokenLIDs) { lidsbuf = m.tokenLIDs[j] } - pair := TokenPosting{First: m.tokens[j], Second: lidsbuf} - if !yield(pair, nil) { + if !yield(TokenPosting{First: m.tokens[j], Second: lidsbuf}, nil) { return } } @@ -66,8 +58,6 @@ func (m *mockSource) ID() iter.Seq2[DocLocation, error] { } } -func (m *mockSource) BlockOffsets() []uint64 { return m.blocksOffsets } - func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src := mockSource{ tokens: [][]byte{ @@ -114,10 +104,10 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { const blockSize = 24 const lidBlockCap = 3 - var lidBlocks []lidsSealBlock + var lidBlocks []unpackedLIDBlock lidAccumulator := newLIDAccumulator( lidBlockCap, - func(block lidsSealBlock) error { + func(block unpackedLIDBlock) error { block.payload.LIDs = slices.Clone(block.payload.LIDs) block.payload.Offsets = slices.Clone(block.payload.Offsets) lidBlocks = append(lidBlocks, block) @@ -125,12 +115,9 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { }, ) - var bb blocksBuilder - tokenBlocks := bb.BuildTokenBlocks( + tokenBlocksIter := tokenBlock( src.TokenTriplet(), - func(lids []uint32) error { - return lidAccumulator.Add(lids) - }, + lidAccumulator.Add, blockSize, ) @@ -142,7 +129,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { blockIndex := 0 allFieldsTables := []token.FieldTable{} - for pair, err := range tokenBlocks { + for pair, err := range tokenBlocksIter { assert.NoError(t, err) block, fieldsTables := pair.First, pair.Second assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) @@ -251,29 +238,29 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) assert.NoError(t, lidAccumulator.Finalize()) - expectedLIDBlocks := []lidsSealBlock{ + expectedLIDBlocks := []unpackedLIDBlock{ { - ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, + ext: lidExt{minTID: 1, maxTID: 1, isContinued: false}, payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, + ext: lidExt{minTID: 1, maxTID: 3, isContinued: true}, payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, + ext: lidExt{minTID: 4, maxTID: 6, isContinued: false}, payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, + ext: lidExt{minTID: 7, maxTID: 9, isContinued: false}, payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, + ext: lidExt{minTID: 10, maxTID: 12, isContinued: false}, payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, + ext: lidExt{minTID: 13, maxTID: 14, isContinued: false}, payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } @@ -313,7 +300,7 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block, err := range seqBlockID(src.ID(), 3) { + for block, err := range idBlock(src.ID(), 3) { assert.NoError(t, err) assert.Equal(t, expectedSizes[i], len(block.mids.Values)) diff --git a/frac/sealed/sealing/index.go b/indexwriter/index.go similarity index 71% rename from frac/sealed/sealing/index.go rename to indexwriter/index.go index 5c23842a..f317d2d2 100644 --- a/frac/sealed/sealing/index.go +++ b/indexwriter/index.go @@ -1,7 +1,8 @@ -package sealing +package indexwriter import ( "io" + "iter" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" @@ -11,9 +12,34 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" "github.com/ozontech/seq-db/zstd" ) +type ( + DocLocation = util.Pair[seq.ID, seq.DocPos] + TokenPosting = util.Pair[[]byte, []uint32] +) + +// Source defines the data required to write all index files for a fraction. +type Source interface { + // Info returns metadata describing this source. + Info() *common.Info + + // ID returns an iterator over stored document identifiers paired with + // their positions, in descending [seq.ID] order. + ID() iter.Seq2[DocLocation, error] + + // BlockOffsets returns byte offsets to each document block + // within this source's `.docs` file. + BlockOffsets() []uint64 + + // TokenTriplet iterates over fields in lexicographic order. + // For each field, it yields tokens (lexicographically sorted) + // paired with the local document ID list for that token. + TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] +} + // indexBlock is one compressed (or not) block with its registry metadata. type indexBlock struct { codec storage.Codec @@ -27,7 +53,7 @@ func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { return storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec), i.payload } -type IndexSealer struct { +type IndexWriter struct { params common.SealParams buf1 []byte @@ -38,39 +64,35 @@ type IndexSealer struct { tokenTable token.Table } -func NewIndexSealer(params common.SealParams) *IndexSealer { - return &IndexSealer{ +func New(params common.SealParams) *IndexWriter { + return &IndexWriter{ params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), } } -func (s *IndexSealer) LIDsTable() lids.Table { +func (s *IndexWriter) LIDsTable() lids.Table { return s.lidsTable } -func (s *IndexSealer) TokenTable() token.Table { +func (s *IndexWriter) TokenTable() token.Table { return s.tokenTable } -func (s *IndexSealer) IDsTable() seqids.Table { +func (s *IndexWriter) IDsTable() seqids.Table { return s.idsTable } // WriteOffsetsFile writes the .offsets file containing a single BlockOffsets block. -func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err } defer w.release() - offsets := sealed.BlockOffsets{ - IDsTotal: src.Info().DocsTotal + 1, - Offsets: src.BlockOffsets(), - } - + offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()} if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil { return err } @@ -78,14 +100,14 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteIDFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err } defer w.release() - for block, err := range seqBlockID(src.ID(), consts.IDsPerBlock) { + for block, err := range idBlock(src.ID(), consts.IDsPerBlock) { if err != nil { return err } @@ -106,7 +128,7 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { tw, err := newWriter(tws) if err != nil { return err @@ -119,19 +141,15 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err } defer lw.release() - var ( - bb blocksBuilder - allFieldsTables []token.FieldTable - ) - lidAccumulator := newLIDAccumulator( consts.LIDBlockCap, - func(block lidsSealBlock) error { + func(block unpackedLIDBlock) error { return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block)) }, ) - for pair, err := range bb.BuildTokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { + var allFieldsTables []token.FieldTable + for pair, err := range tokenBlock(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { if err != nil { return err } @@ -150,7 +168,7 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err return s.finalizeTokenFile(tw, allFieldsTables) } -func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { +func (s *IndexWriter) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { if err := lidAccumulator.Finalize(); err != nil { return err } @@ -158,7 +176,7 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) return w.finalize() } -func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { +func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { // Emit section separator. if err := w.writeEmptyBlock(); err != nil { return err @@ -172,39 +190,17 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return w.finalize() } -func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { +func (s *IndexWriter) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} _, err := ws.Write(s.packInfoBlock(block).payload) return err } -// collapseOrderedFieldsTables merges FieldTables with the same field name. -// Assumes input is sorted by Field. -func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { - if len(src) == 0 { - return nil - } - - current := src[0] - var dst []token.FieldTable - for _, ft := range src[1:] { - if current.Field == ft.Field { - current.Entries = append(current.Entries, ft.Entries...) - continue - } - - dst = append(dst, current) - current = ft - } - - return append(dst, current) -} - func newIndexBlock(raw []byte) indexBlock { return indexBlock{codec: storage.CodecNo, rawLen: uint32(len(raw)), payload: raw} } -func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { +func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) if len(s.buf2) < len(raw) { return indexBlock{codec: storage.CodecZSTD, rawLen: uint32(len(raw)), payload: s.buf2} @@ -213,13 +209,14 @@ func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { } // packInfoBlock packs fraction information into an index block. -func (s *IndexSealer) packInfoBlock(block sealed.BlockInfo) indexBlock { +func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { + s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID] s.buf1 = block.Pack(s.buf1[:0]) return newIndexBlock(s.buf1) // Info block is typically small, no compression } // packTokenBlock packs token data into a compressed index block. -func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { +func (s *IndexWriter) packTokenBlock(block unpackedTokenBlock) indexBlock { s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) // Store TID range in extended metadata @@ -228,7 +225,7 @@ func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { } // packTokenTableBlock packs the token table into a compressed index block. -func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { +func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { s.tokenTable = token.TableFromBlocks([]token.TableBlock{tokenTableBlock}) // Store for PreloadedData // Packing block @@ -237,11 +234,7 @@ func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) inde } // packBlocksOffsetsBlock packs document block offsets into a compressed index block. -func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { - // Update IDs table for PreloadedData - s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs - s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks - +func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { // Packing block s.buf1 = block.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel) @@ -249,7 +242,7 @@ func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlo } // packMIDsBlock packs MIDs into a compressed index block. -func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packMIDsBlock(block unpackedIDBlock) indexBlock { // Get the last ID in the block (smallest due to descending order) last := len(block.mids.Values) - 1 @@ -272,14 +265,14 @@ func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { } // packRIDsBlock packs RIDs into a compressed index block. -func (s *IndexSealer) packRIDsBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packRIDsBlock(block unpackedIDBlock) indexBlock { s.buf1 = block.rids.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packPosBlock packs document positions into a compressed index block. -func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packPosBlock(block unpackedIDBlock) indexBlock { s.buf1 = block.params.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b @@ -287,7 +280,7 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. -func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { +func (s *IndexWriter) packLIDsBlock(block unpackedLIDBlock) indexBlock { var ext1 uint64 if block.ext.isContinued { // todo: Legacy continuation flag ext1 = 1 diff --git a/indexwriter/lid_accumulator.go b/indexwriter/lid_accumulator.go new file mode 100644 index 00000000..f3b3740a --- /dev/null +++ b/indexwriter/lid_accumulator.go @@ -0,0 +1,85 @@ +package indexwriter + +import "github.com/ozontech/seq-db/frac/sealed/lids" + +type lidAccumulator struct { + blockCapacity int + onBlock func(unpackedLIDBlock) error + + currentTID uint32 + currentBlock unpackedLIDBlock + + isEndOfToken bool + isContinued bool +} + +func newLIDAccumulator( + blockCapacity int, + onBlock func(unpackedLIDBlock) error, +) *lidAccumulator { + a := &lidAccumulator{ + blockCapacity: blockCapacity, + onBlock: onBlock, + } + + a.currentBlock.ext.minTID = 1 + a.currentBlock.payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, + } + + return a +} + +// Add processes LIDs of one token (must be called in TID order). +// +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *lidAccumulator) Add(lidsbuf []uint32) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.payload.LIDs) == a.blockCapacity { + if err := a.onBlock(a.finalizeBlock()); err != nil { + return err + } + + a.currentBlock.ext.minTID = a.currentTID + a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] + a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] + } + + a.isEndOfToken = false + a.currentBlock.ext.maxTID = a.currentTID + a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) + } + + a.isEndOfToken = true + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + + return nil +} + +func (a *lidAccumulator) Finalize() error { + return a.onBlock(a.finalizeBlock()) +} + +func (a *lidAccumulator) finalizeBlock() unpackedLIDBlock { + if !a.isEndOfToken { + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + } + + result := a.currentBlock + result.payload.IsLastLID = a.isEndOfToken + result.ext.isContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result +} diff --git a/frac/sealed/sealing/writer.go b/indexwriter/writer.go similarity index 99% rename from frac/sealed/sealing/writer.go rename to indexwriter/writer.go index 1a147e4e..1fb9909d 100644 --- a/frac/sealed/sealing/writer.go +++ b/indexwriter/writer.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "bytes" diff --git a/frac/sealed/sealing/sealer.go b/sealing/sealer.go similarity index 61% rename from frac/sealed/sealing/sealer.go rename to sealing/sealer.go index 57863d82..0c21ffc4 100644 --- a/frac/sealed/sealing/sealer.go +++ b/sealing/sealer.go @@ -2,41 +2,19 @@ package sealing import ( "errors" - "iter" "os" "path/filepath" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/indexwriter" "github.com/ozontech/seq-db/util" ) -type ( - DocLocation = util.Pair[seq.ID, seq.DocPos] - TokenPosting = util.Pair[[]byte, []uint32] -) - -// Source interface defines the contract for data sources that can be sealed. -// Provides access to all necessary data components for index creation -type Source interface { - // Info returns metadata describing this source. - Info() *common.Info - - // ID returns an iterator over stored document identifiers paired with - // their positions, in descending [seq.ID] order. - ID() iter.Seq2[DocLocation, error] - - // BlockOffsets returns byte offsets to each document block - // within this source's `.docs` file. - BlockOffsets() []uint64 - - // TokenTriplet iterates over fields in lexicographic order. - // For each field, it yields tokens (lexicographically sorted) - // paired with the local document ID list for that token. - TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] -} +// Source defines the contract for data sources that can be sealed. +// Provides access to all necessary data components for index creation. +type Source = indexwriter.Source // Seal writes five index files (.info, .token, .offsets, .id, .lid) for the fraction // and returns PreloadedData for fast initialization of the sealed fraction. @@ -47,12 +25,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return nil, errors.New("sealing of an empty active fraction is not supported") } - sealer := NewIndexSealer(params) - + w := indexwriter.New(params) if err := createAndWrite( info.Path+consts.OffsetsTmpFileSuffix, info.Path+consts.OffsetsFileSuffix, - func(f *os.File) error { return sealer.WriteOffsetsFile(f, src) }, + func(f *os.File) error { return w.WriteOffsetsFile(f, src) }, ); err != nil { return nil, err } @@ -60,7 +37,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.IDTmpFileSuffix, info.Path+consts.IDFileSuffix, - func(f *os.File) error { return sealer.WriteIDFile(f, src) }, + func(f *os.File) error { return w.WriteIDFile(f, src) }, ); err != nil { return nil, err } @@ -68,7 +45,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWriteBoth( info.Path+consts.TokenTmpFileSuffix, info.Path+consts.TokenFileSuffix, info.Path+consts.LIDTmpFileSuffix, info.Path+consts.LIDFileSuffix, - func(tokenF, lidF *os.File) error { return sealer.WriteTokenTriplet(tokenF, lidF, src) }, + func(tokenF, lidF *os.File) error { return w.WriteTokenTriplet(tokenF, lidF, src) }, ); err != nil { return nil, err } @@ -76,7 +53,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.InfoTmpFileSuffix, info.Path+consts.InfoFileSuffix, - func(f *os.File) error { return sealer.WriteInfoFile(f, src) }, + func(f *os.File) error { return w.WriteInfoFile(f, src) }, ); err != nil { return nil, err } @@ -100,13 +77,13 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { } info.IndexOnDisk = totalSize - lidsTable := sealer.LIDsTable() + lidsTable := w.LIDsTable() preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: sealer.TokenTable(), + TokenTable: w.TokenTable(), BlocksData: sealed.BlocksData{ - IDsTable: sealer.IDsTable(), + IDsTable: w.IDsTable(), LIDsTable: &lidsTable, BlocksOffsets: src.BlockOffsets(), }, @@ -123,10 +100,7 @@ func syncAndClose(f *os.File) error { return f.Close() } -func createAndWrite( - tmp, final string, - write func(*os.File) error, -) error { +func createAndWrite(tmp, final string, write func(*os.File) error) error { f, err := os.Create(tmp) if err != nil { return err @@ -140,16 +114,16 @@ func createAndWrite( } func createAndWriteBoth( - tmpa, finala, - tmpb, finalb string, + atmp, afinal, + btmp, bfinal string, write func(*os.File, *os.File) error, ) error { - a, err := os.Create(tmpa) + a, err := os.Create(atmp) if err != nil { return err } - b, err := os.Create(tmpb) + b, err := os.Create(btmp) if err != nil { a.Close() return err @@ -160,9 +134,9 @@ func createAndWriteBoth( return err } - if err := os.Rename(tmpa, finala); err != nil { + if err := os.Rename(atmp, afinal); err != nil { return err } - return os.Rename(tmpb, finalb) + return os.Rename(btmp, bfinal) }