Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/sealing"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/storage"
testcommon "github.com/ozontech/seq-db/tests/common"
Expand Down
2 changes: 1 addition & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/sealing"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fraction_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/sealing"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/storage/s3"
)
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/sealer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/sealing"
"github.com/ozontech/seq-db/seq"
testscommon "github.com/ozontech/seq-db/tests/common"
)
Expand Down
152 changes: 43 additions & 109 deletions frac/sealed/sealing/blocks_builder.go → indexwriter/blocks.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sealing
package indexwriter

import (
"encoding/binary"
Expand All @@ -11,53 +11,47 @@ import (
"github.com/ozontech/seq-db/util"
)

type (
TokenBlock = util.Pair[tokensSealBlock, []token.FieldTable]
)
type tokenFieldBlock = util.Pair[unpackedTokenBlock, []token.FieldTable]

// tokensExt represents the token ID range contained in a block.
type tokensExt struct {
// tokenExt represents the token ID range contained in a block.
type tokenExt struct {
minTID uint32 // First token ID in the block
maxTID uint32 // Last token ID in the block
}

// tokensSealBlock represents a sealed block containing token data with metadata.
type tokensSealBlock struct {
ext tokensExt // Tokens block metadata for registry marking
// unpackedTokenBlock represents a sealed block containing token data with metadata.
type unpackedTokenBlock struct {
ext tokenExt // Tokens block metadata for registry marking
payload token.Block // Actual token data payload
}

// lidsExt represents the range and continuation status of LID blocks.
type lidsExt struct {
// lidExt represents the range and continuation status of LID blocks.
type lidExt struct {
minTID uint32 // First token ID in the LID block
maxTID uint32 // Last token ID in the LID block
isContinued bool // Whether LID sequence continues in next block
}

// lidsSealBlock represents a sealed block containing LID (Local ID) data.
type lidsSealBlock struct {
ext lidsExt // LIDs block metadata for registry marking
// unpackedLIDBlock represents a sealed block containing LID (Local ID) data.
type unpackedLIDBlock struct {
ext lidExt // LIDs block metadata for registry marking
payload lids.Block // LID data payload
}

// idsSealBlock represents a sealed block containing various identifier types.
type idsSealBlock struct {
// unpackedIDBlock represents a sealed block containing various identifier types.
type unpackedIDBlock struct {
mids seqids.BlockMIDs
rids seqids.BlockRIDs
params seqids.BlockParams
}

// blocksBuilder constructs sealed blocks from various data sources.
// Provides error tracking and consistency validation during block construction.
type blocksBuilder struct{}

func (bb *blocksBuilder) BuildTokenBlocks(
func tokenBlock(
it iter.Seq2[string, iter.Seq2[TokenPosting, error]],
accumulate func([]uint32) error, blockCapacity int,
) iter.Seq2[TokenBlock, error] {
return func(yield func(TokenBlock, error) bool) {
) iter.Seq2[tokenFieldBlock, error] {
return func(yield func(tokenFieldBlock, error) bool) {
var (
block tokensSealBlock
block unpackedTokenBlock
blockIdx uint32
blockSize int
)
Expand Down Expand Up @@ -86,7 +80,7 @@ func (bb *blocksBuilder) BuildTokenBlocks(
emitFieldEntry()
block.ext.maxTID = currentTID

pair := TokenBlock{First: block, Second: pendingTable}
pair := tokenFieldBlock{First: block, Second: pendingTable}
if !yield(pair, nil) {
return false
}
Expand All @@ -105,15 +99,15 @@ func (bb *blocksBuilder) BuildTokenBlocks(
}

block.ext.minTID = 1
for field, tokenIterator := range it {
for field, tokIt := range it {
emitFieldEntry()

fieldName = field
fieldEntryStartTID = currentTID + 1

for pair, err := range tokenIterator {
for pair, err := range tokIt {
if err != nil {
yield(TokenBlock{}, err)
yield(tokenFieldBlock{}, err)
return
}

Expand All @@ -131,7 +125,7 @@ func (bb *blocksBuilder) BuildTokenBlocks(
block.payload.Payload = append(block.payload.Payload, tok...)

if err := accumulate(tlids); err != nil {
yield(TokenBlock{}, err)
yield(tokenFieldBlock{}, err)
return
}

Expand All @@ -148,7 +142,7 @@ func (bb *blocksBuilder) BuildTokenBlocks(

func newTokenTableEntry(
entryStartTID, entryEndTID uint32,
blockIndex uint32, block tokensSealBlock,
blockIndex uint32, block unpackedTokenBlock,
) *token.TableEntry {
// Convert global TIDs to block-local indices
firstIndex := entryStartTID - block.ext.minTID
Expand All @@ -168,15 +162,15 @@ func newTokenTableEntry(
}
}

// seqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks.
// idBlock accumulates scalar (ID, position) pairs into sealed ID blocks.
// A new block is yielded every `blockCapacity` IDs.
func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[idsSealBlock, error] {
return func(yield func(idsSealBlock, error) bool) {
var block idsSealBlock
func idBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[unpackedIDBlock, error] {
return func(yield func(unpackedIDBlock, error) bool) {
var block unpackedIDBlock

for pair, err := range ids {
if err != nil {
yield(idsSealBlock{}, err)
yield(unpackedIDBlock{}, err)
return
}

Expand All @@ -202,84 +196,24 @@ func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[
}
}

type lidAccumulator struct {
blockCapacity int
onBlock func(lidsSealBlock) error

currentTID uint32
currentBlock lidsSealBlock

isEndOfToken bool
isContinued bool
}

func newLIDAccumulator(
blockCapacity int,
onBlock func(lidsSealBlock) error,
) *lidAccumulator {
a := &lidAccumulator{
blockCapacity: blockCapacity,
onBlock: onBlock,
// collapseOrderedFieldsTables merges FieldTables with the same field name.
// Assumes input is sorted by Field.
func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable {
if len(src) == 0 {
return nil
}

a.currentBlock.ext.minTID = 1
a.currentBlock.payload = lids.Block{
LIDs: make([]uint32, 0, blockCapacity),
Offsets: []uint32{0},
}

return a
}

// Add processes LIDs of one token (must be called in TID order).
//
// For each block that fills up, `onBlock` is called immediately
// before the backing arrays are reset, so `onBlock` may read the
// block data but must not retain references to it.
func (a *lidAccumulator) Add(lidsbuf []uint32) error {
a.currentTID++

for _, lid := range lidsbuf {
if len(a.currentBlock.payload.LIDs) == a.blockCapacity {
if err := a.onBlock(a.finalizeBlock()); err != nil {
return err
}

a.currentBlock.ext.minTID = a.currentTID
a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0]
a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1]
current := src[0]
var dst []token.FieldTable
for _, ft := range src[1:] {
if current.Field == ft.Field {
current.Entries = append(current.Entries, ft.Entries...)
continue
}

a.isEndOfToken = false
a.currentBlock.ext.maxTID = a.currentTID
a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid)
}

a.isEndOfToken = true
a.currentBlock.payload.Offsets = append(
a.currentBlock.payload.Offsets,
uint32(len(a.currentBlock.payload.LIDs)),
)

return nil
}

func (a *lidAccumulator) Finalize() error {
return a.onBlock(a.finalizeBlock())
}

func (a *lidAccumulator) finalizeBlock() lidsSealBlock {
if !a.isEndOfToken {
a.currentBlock.payload.Offsets = append(
a.currentBlock.payload.Offsets,
uint32(len(a.currentBlock.payload.LIDs)),
)
dst = append(dst, current)
current = ft
}

result := a.currentBlock
result.payload.IsLastLID = a.isEndOfToken
result.ext.isContinued = a.isContinued

a.isContinued = !a.isEndOfToken
return result
return append(dst, current)
}
Loading
Loading