From af3e828f4f72f89889c12ce62f25d4a0950918b9 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 7 May 2026 15:01:10 +0300 Subject: [PATCH 1/2] feat: rotate fraction based on in-memory size --- Makefile | 2 +- frac/active.go | 9 ++++++ frac/active_docs_positions.go | 11 ++++++++ frac/active_ids.go | 7 +++++ frac/active_lids.go | 12 ++++++++ frac/active_lids_map.go | 11 ++++++++ ...ive_sealing_source.go => active_source.go} | 28 +++++++++---------- frac/active_token_list.go | 27 ++++++++++++++++++ fracmanager/fraction_registry.go | 2 +- 9 files changed, 93 insertions(+), 16 deletions(-) rename frac/{active_sealing_source.go => active_source.go} (91%) diff --git a/Makefile b/Makefile index 9a90289f..1d8c034d 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ test-deps: .PHONY: test test: test-deps - LOG_LEVEL=ERROR go test ./... -count 1 + LOG_LEVEL=ERROR go test ./... -count 1 -v .bin-deps: export GOBIN := $(LOCAL_BIN) .bin-deps: diff --git a/frac/active.go b/frac/active.go index 91e25c22..e825a2ca 100644 --- a/frac/active.go +++ b/frac/active.go @@ -410,6 +410,15 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { } } +func (f *Active) MemSize() int { + return f.MIDs.Size() + + f.RIDs.Size() + + f.DocBlocks.Size() + + f.DocsPositions.Size() + + f.IDsToLIDs.Size() + + f.TokenList.Size() +} + func (f *Active) Info() *common.Info { f.infoMu.RLock() defer f.infoMu.RUnlock() diff --git a/frac/active_docs_positions.go b/frac/active_docs_positions.go index f058091f..618ca910 100644 --- a/frac/active_docs_positions.go +++ b/frac/active_docs_positions.go @@ -2,6 +2,7 @@ package frac import ( "sync" + "unsafe" "github.com/ozontech/seq-db/seq" ) @@ -32,6 +33,16 @@ func (dp *DocsPositions) GetSync(id seq.ID) seq.DocPos { return dp.Get(id) } +func (dp *DocsPositions) Size() int { + dp.mu.RLock() + defer dp.mu.RUnlock() + + const entrySize = int(unsafe.Sizeof(seq.ID{})) + + int(unsafe.Sizeof(seq.DocPos(0))) + + return len(dp.idToPos) * entrySize +} + // SetMultiple returns a slice of added ids func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID { dp.mu.Lock() diff --git a/frac/active_ids.go b/frac/active_ids.go index 1195c8fa..cab87083 100644 --- a/frac/active_ids.go +++ b/frac/active_ids.go @@ -2,6 +2,7 @@ package frac import ( "sync" + "unsafe" ) type UInt64s struct { @@ -47,3 +48,9 @@ func (l *UInt64s) Append(val uint64) uint32 { return l.append(val) } + +func (l *UInt64s) Size() int { + l.mu.RLock() + defer l.mu.RUnlock() + return len(l.vals) * int(unsafe.Sizeof(int64(0))) +} diff --git a/frac/active_lids.go b/frac/active_lids.go index 236136ef..41970d45 100644 --- a/frac/active_lids.go +++ b/frac/active_lids.go @@ -135,6 +135,18 @@ func mergeSorted(right, left []uint32, mids, rids []uint64) []uint32 { return result } +func (tl *TokenLIDs) Size() int { + tl.sortedMu.Lock() + sortedLen := len(tl.sorted) + tl.sortedMu.Unlock() + + tl.queueMu.Lock() + queueLen := len(tl.queue) + tl.queueMu.Unlock() + + return (sortedLen + queueLen) * 4 +} + func (tl *TokenLIDs) PutLIDsInQueue(lids []uint32) int { tl.queueMu.Lock() defer tl.queueMu.Unlock() diff --git a/frac/active_lids_map.go b/frac/active_lids_map.go index bae64854..418fb82c 100644 --- a/frac/active_lids_map.go +++ b/frac/active_lids_map.go @@ -2,6 +2,7 @@ package frac import ( "sync" + "unsafe" "github.com/ozontech/seq-db/seq" ) @@ -27,6 +28,16 @@ func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) { return val, ok } +func (al *ActiveLIDs) Size() int { + al.mu.RLock() + defer al.mu.RUnlock() + + const entrySize = int(unsafe.Sizeof(seq.ID{})) + + int(unsafe.Sizeof(seq.LID(0))) + + return len(al.idToLid) * entrySize +} + func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) { al.mu.Lock() defer al.mu.Unlock() diff --git a/frac/active_sealing_source.go b/frac/active_source.go similarity index 91% rename from frac/active_sealing_source.go rename to frac/active_source.go index e7c451e2..af7084b0 100644 --- a/frac/active_sealing_source.go +++ b/frac/active_source.go @@ -30,7 +30,7 @@ type ( IndexedDocBlock = util.Pair[[]byte, []seq.DocPos] ) -type ActiveSealingSource struct { +type ActiveSource struct { params common.SealParams // Sealing parameters info *common.Info // fraction Info @@ -55,13 +55,13 @@ type ActiveSealingSource struct { docsReader *storage.DocsReader // Document storage reader } -func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) { +func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSource, error) { info := *active.info // copy sortedLIDs := active.GetAllDocuments() fields, fieldTIDs := sortFields(active.TokenList) - src := ActiveSealingSource{ + src := ActiveSource{ params: params, info: &info, @@ -116,7 +116,7 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) { return fields, fieldTIDs } -func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] { +func (src *ActiveSource) ID() iter.Seq2[DocLocation, error] { return func(yield func(DocLocation, error) bool) { mids := src.mids.vals rids := src.rids.vals @@ -155,11 +155,11 @@ func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] { } } -func (src *ActiveSealingSource) BlockOffsets() []uint64 { +func (src *ActiveSource) BlockOffsets() []uint64 { return src.blocksOffsets } -func (src *ActiveSealingSource) prepareInfo() { +func (src *ActiveSource) prepareInfo() { src.info.MetaOnDisk = 0 src.info.SealingTime = uint64(src.created.UnixMilli()) mids := src.mids.vals @@ -170,17 +170,17 @@ func (src *ActiveSealingSource) prepareInfo() { src.info.BuildDistribution(mids) } -func (src *ActiveSealingSource) prepareLids() { +func (src *ActiveSource) prepareLids() { for _, tl := range src.lids[1:] { tl.GetLIDs(src.mids, src.rids) } } -func (src *ActiveSealingSource) Info() *common.Info { +func (src *ActiveSource) Info() *common.Info { return src.info } -func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { +func (src *ActiveSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { for idx, field := range src.fields { if !yield(field, src.postingsForField(field, idx)) { @@ -190,7 +190,7 @@ func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[Token } } -func (src *ActiveSealingSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] { +func (src *ActiveSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] { var lidsbuf []uint32 return func(yield func(TokenPosting, error) bool) { for _, tid := range src.fieldTIDs[idx] { @@ -221,7 +221,7 @@ func makeInverser(sortedLIDs []uint32) []uint32 { // Docs returns an iterator for documents with their IDs. // Handles duplicate IDs (for nested indexes). -func (src *ActiveSealingSource) Docs() iter.Seq2[Document, error] { +func (src *ActiveSource) Docs() iter.Seq2[Document, error] { return func(yield func(Document, error) bool) { var ( curdoc []byte @@ -256,7 +256,7 @@ func (src *ActiveSealingSource) Docs() iter.Seq2[Document, error] { } // doc reads a document from storage by its position. -func (src *ActiveSealingSource) doc(pos seq.DocPos) ([]byte, error) { +func (src *ActiveSource) doc(pos seq.DocPos) ([]byte, error) { blockIndex, docOffset := pos.Unpack() blockOffset := src.blocksOffsets[blockIndex] @@ -277,7 +277,7 @@ func (src *ActiveSealingSource) doc(pos seq.DocPos) ([]byte, error) { // SortDocs sorts documents and writes them in compressed form to disk. // Creates a temporary file that is then renamed to the final one. -func (src *ActiveSealingSource) SortDocs() error { +func (src *ActiveSource) SortDocs() error { start := time.Now() logger.Info("sorting docs...") @@ -346,7 +346,7 @@ func (src *ActiveSealingSource) SortDocs() error { // writeDocs compresses and writes document blocks, calculating new offsets // and collecting document positions. -func (src *ActiveSealingSource) writeDocs(blocks iter.Seq2[IndexedDocBlock, error], w io.Writer) ([]uint64, []seq.DocPos, error) { +func (src *ActiveSource) writeDocs(blocks iter.Seq2[IndexedDocBlock, error], w io.Writer) ([]uint64, []seq.DocPos, error) { offset := 0 buf := make([]byte, 0) blocksOffsets := make([]uint64, 0) diff --git a/frac/active_token_list.go b/frac/active_token_list.go index adf94ffd..77c85d02 100644 --- a/frac/active_token_list.go +++ b/frac/active_token_list.go @@ -5,6 +5,7 @@ import ( "fmt" "hash/crc32" "sync" + "unsafe" "github.com/ozontech/seq-db/seq" @@ -91,6 +92,32 @@ func NewActiveTokenList(workers int) *TokenList { return tl } + +func (tl *TokenList) Size() int { + size := 0 + + tl.tidMu.RLock() + for _, val := range tl.tidToVal { + size += len(val) + } + + for _, lids := range tl.tidToLIDs { + if lids != nil { + size += lids.Size() + } + } + tl.tidMu.RUnlock() + + tl.fieldsMu.RLock() + for field, tids := range tl.FieldTIDs { + size += len(field) + + len(tids)*int(unsafe.Sizeof(uint32(0))) + } + tl.fieldsMu.RUnlock() + + return size +} + func (tl *TokenList) Stop() { for _, c := range tl.chList { close(c) diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index c8a12eef..fa24b7f3 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -122,7 +122,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active r.mu.Lock() defer r.mu.Unlock() - if r.active.instance.Info().DocsOnDisk <= maxSize { + if uint64(r.active.instance.MemSize()) <= maxSize { return nil, nil, nil } From 55a05b67b234574a7615ff0ec3b1daa9a39044f8 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 7 May 2026 16:00:25 +0300 Subject: [PATCH 2/2] chore: frac_size=1MiB --- .seqbench/comparison.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.seqbench/comparison.env b/.seqbench/comparison.env index ee12dfcb..a43f6254 100644 --- a/.seqbench/comparison.env +++ b/.seqbench/comparison.env @@ -1,6 +1,6 @@ GOGC=100 -SEQDB_STORAGE_FRAC_SIZE=16MiB +SEQDB_STORAGE_FRAC_SIZE=1MiB SEQDB_STORAGE_TOTAL_SIZE=10GiB SEQDB_LIMITS_QUERY_RATE=1024