Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .seqbench/comparison.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GOGC=100

SEQDB_STORAGE_FRAC_SIZE=16MiB
SEQDB_STORAGE_FRAC_SIZE=1MiB
SEQDB_STORAGE_TOTAL_SIZE=10GiB

SEQDB_LIMITS_QUERY_RATE=1024
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ test-deps:

.PHONY: test
test: test-deps
LOG_LEVEL=ERROR go test ./... -count 1
LOG_LEVEL=ERROR go test ./... -count 1 -v

.bin-deps: export GOBIN := $(LOCAL_BIN)
.bin-deps:
Expand Down
9 changes: 9 additions & 0 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,15 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider {
}
}

func (f *Active) MemSize() int {
return f.MIDs.Size() +
f.RIDs.Size() +
f.DocBlocks.Size() +
f.DocsPositions.Size() +
f.IDsToLIDs.Size() +
f.TokenList.Size()
}

func (f *Active) Info() *common.Info {
f.infoMu.RLock()
defer f.infoMu.RUnlock()
Expand Down
11 changes: 11 additions & 0 deletions frac/active_docs_positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frac

import (
"sync"
"unsafe"

"github.com/ozontech/seq-db/seq"
)
Expand Down Expand Up @@ -32,6 +33,16 @@ func (dp *DocsPositions) GetSync(id seq.ID) seq.DocPos {
return dp.Get(id)
}

func (dp *DocsPositions) Size() int {
dp.mu.RLock()
defer dp.mu.RUnlock()

const entrySize = int(unsafe.Sizeof(seq.ID{})) +
int(unsafe.Sizeof(seq.DocPos(0)))

return len(dp.idToPos) * entrySize
}

// SetMultiple returns a slice of added ids
func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID {
dp.mu.Lock()
Expand Down
7 changes: 7 additions & 0 deletions frac/active_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frac

import (
"sync"
"unsafe"
)

type UInt64s struct {
Expand Down Expand Up @@ -47,3 +48,9 @@ func (l *UInt64s) Append(val uint64) uint32 {

return l.append(val)
}

func (l *UInt64s) Size() int {
l.mu.RLock()
defer l.mu.RUnlock()
return len(l.vals) * int(unsafe.Sizeof(int64(0)))
}
12 changes: 12 additions & 0 deletions frac/active_lids.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func mergeSorted(right, left []uint32, mids, rids []uint64) []uint32 {
return result
}

func (tl *TokenLIDs) Size() int {
tl.sortedMu.Lock()
sortedLen := len(tl.sorted)
tl.sortedMu.Unlock()

tl.queueMu.Lock()
queueLen := len(tl.queue)
tl.queueMu.Unlock()

return (sortedLen + queueLen) * 4
}

func (tl *TokenLIDs) PutLIDsInQueue(lids []uint32) int {
tl.queueMu.Lock()
defer tl.queueMu.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions frac/active_lids_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frac

import (
"sync"
"unsafe"

"github.com/ozontech/seq-db/seq"
)
Expand All @@ -27,6 +28,16 @@ func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) {
return val, ok
}

func (al *ActiveLIDs) Size() int {
al.mu.RLock()
defer al.mu.RUnlock()

const entrySize = int(unsafe.Sizeof(seq.ID{})) +
int(unsafe.Sizeof(seq.LID(0)))

return len(al.idToLid) * entrySize
}

func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) {
al.mu.Lock()
defer al.mu.Unlock()
Expand Down
28 changes: 14 additions & 14 deletions frac/active_sealing_source.go → frac/active_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type (
IndexedDocBlock = util.Pair[[]byte, []seq.DocPos]
)

type ActiveSealingSource struct {
type ActiveSource struct {
params common.SealParams // Sealing parameters

info *common.Info // fraction Info
Expand All @@ -55,13 +55,13 @@ type ActiveSealingSource struct {
docsReader *storage.DocsReader // Document storage reader
}

func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) {
func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSource, error) {
info := *active.info // copy

sortedLIDs := active.GetAllDocuments()
fields, fieldTIDs := sortFields(active.TokenList)

src := ActiveSealingSource{
src := ActiveSource{
params: params,

info: &info,
Expand Down Expand Up @@ -116,7 +116,7 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) {
return fields, fieldTIDs
}

func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
func (src *ActiveSource) ID() iter.Seq2[DocLocation, error] {
return func(yield func(DocLocation, error) bool) {
mids := src.mids.vals
rids := src.rids.vals
Expand Down Expand Up @@ -155,11 +155,11 @@ func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
}
}

func (src *ActiveSealingSource) BlockOffsets() []uint64 {
func (src *ActiveSource) BlockOffsets() []uint64 {
return src.blocksOffsets
}

func (src *ActiveSealingSource) prepareInfo() {
func (src *ActiveSource) prepareInfo() {
src.info.MetaOnDisk = 0
src.info.SealingTime = uint64(src.created.UnixMilli())
mids := src.mids.vals
Expand All @@ -170,17 +170,17 @@ func (src *ActiveSealingSource) prepareInfo() {
src.info.BuildDistribution(mids)
}

func (src *ActiveSealingSource) prepareLids() {
func (src *ActiveSource) prepareLids() {
for _, tl := range src.lids[1:] {
tl.GetLIDs(src.mids, src.rids)
}
}

func (src *ActiveSealingSource) Info() *common.Info {
func (src *ActiveSource) Info() *common.Info {
return src.info
}

func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] {
func (src *ActiveSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] {
return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) {
for idx, field := range src.fields {
if !yield(field, src.postingsForField(field, idx)) {
Expand All @@ -190,7 +190,7 @@ func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[Token
}
}

func (src *ActiveSealingSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] {
func (src *ActiveSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] {
var lidsbuf []uint32
return func(yield func(TokenPosting, error) bool) {
for _, tid := range src.fieldTIDs[idx] {
Expand Down Expand Up @@ -221,7 +221,7 @@ func makeInverser(sortedLIDs []uint32) []uint32 {

// Docs returns an iterator for documents with their IDs.
// Handles duplicate IDs (for nested indexes).
func (src *ActiveSealingSource) Docs() iter.Seq2[Document, error] {
func (src *ActiveSource) Docs() iter.Seq2[Document, error] {
return func(yield func(Document, error) bool) {
var (
curdoc []byte
Expand Down Expand Up @@ -256,7 +256,7 @@ func (src *ActiveSealingSource) Docs() iter.Seq2[Document, error] {
}

// doc reads a document from storage by its position.
func (src *ActiveSealingSource) doc(pos seq.DocPos) ([]byte, error) {
func (src *ActiveSource) doc(pos seq.DocPos) ([]byte, error) {
blockIndex, docOffset := pos.Unpack()
blockOffset := src.blocksOffsets[blockIndex]

Expand All @@ -277,7 +277,7 @@ func (src *ActiveSealingSource) doc(pos seq.DocPos) ([]byte, error) {

// SortDocs sorts documents and writes them in compressed form to disk.
// Creates a temporary file that is then renamed to the final one.
func (src *ActiveSealingSource) SortDocs() error {
func (src *ActiveSource) SortDocs() error {
start := time.Now()
logger.Info("sorting docs...")

Expand Down Expand Up @@ -346,7 +346,7 @@ func (src *ActiveSealingSource) SortDocs() error {

// writeDocs compresses and writes document blocks, calculating new offsets
// and collecting document positions.
func (src *ActiveSealingSource) writeDocs(blocks iter.Seq2[IndexedDocBlock, error], w io.Writer) ([]uint64, []seq.DocPos, error) {
func (src *ActiveSource) writeDocs(blocks iter.Seq2[IndexedDocBlock, error], w io.Writer) ([]uint64, []seq.DocPos, error) {
offset := 0
buf := make([]byte, 0)
blocksOffsets := make([]uint64, 0)
Expand Down
27 changes: 27 additions & 0 deletions frac/active_token_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"hash/crc32"
"sync"
"unsafe"

"github.com/ozontech/seq-db/seq"

Expand Down Expand Up @@ -91,6 +92,32 @@ func NewActiveTokenList(workers int) *TokenList {

return tl
}

func (tl *TokenList) Size() int {
size := 0

tl.tidMu.RLock()
for _, val := range tl.tidToVal {
size += len(val)
}

for _, lids := range tl.tidToLIDs {
if lids != nil {
size += lids.Size()
}
}
tl.tidMu.RUnlock()

tl.fieldsMu.RLock()
for field, tids := range tl.FieldTIDs {
size += len(field) +
len(tids)*int(unsafe.Sizeof(uint32(0)))
}
tl.fieldsMu.RUnlock()

return size
}

func (tl *TokenList) Stop() {
for _, c := range tl.chList {
close(c)
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fraction_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active
r.mu.Lock()
defer r.mu.Unlock()

if r.active.instance.Info().DocsOnDisk <= maxSize {
if uint64(r.active.instance.MemSize()) <= maxSize {
return nil, nil, nil
}

Expand Down