From ab18105bf3d070ac69ae77bda0b47d8b5a78c95a Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Tue, 5 May 2026 16:00:00 +0500 Subject: [PATCH] perf(skip masks): skip masks as roaring bitmaps in fetch --- frac/active_index.go | 15 ++---- frac/fraction_test.go | 4 ++ frac/sealed_index.go | 17 ++---- fracmanager/fracmanager_test.go | 4 ++ fracmanager/fraction_provider.go | 2 + go.mod | 3 ++ go.sum | 7 +++ skipmaskmanager/encoding.go | 46 ++++++++-------- skipmaskmanager/iterator_asc.go | 9 ++-- skipmaskmanager/iterator_asc_test.go | 3 +- skipmaskmanager/iterator_desc.go | 9 ++-- skipmaskmanager/iterator_desc_test.go | 3 +- skipmaskmanager/loader.go | 72 ++++++++++++++++++------- skipmaskmanager/loader_test.go | 19 +++++-- skipmaskmanager/merged_iterator.go | 2 +- skipmaskmanager/merged_iterator_test.go | 6 +-- skipmaskmanager/skip_mask_manager.go | 45 ++++++++++++++-- 17 files changed, 178 insertions(+), 88 deletions(-) diff --git a/frac/active_index.go b/frac/active_index.go index 27e4e464..c21077c0 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -278,12 +278,12 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { } minLID, maxLID := uint32(0), uint32(math.MaxUint32) - skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false) + skipLIDsBitmap, err := di.skipMaskProvider.GetIDsBitmapByFrac(di.fracName, minLID, maxLID) if err != nil { return nil, err } - if !has { + if skipLIDsBitmap == nil { return docsPos, nil } @@ -294,17 +294,8 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { } } - skipLIDs := make(map[uint32]struct{}) - for { - lid := skipLIDsIterator.Next() - if lid.IsNull() { - break - } - skipLIDs[lid.Unpack()] = struct{}{} - } - for i, lid := range allLids { - if _, ok := skipLIDs[lid]; ok { + if skipLIDsBitmap.Contains(uint32(lid)) { docsPos[i] = seq.DocPosNotFound } } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 
8757c0db..449822c0 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/RoaringBitmap/roaring" "github.com/alecthomas/units" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" @@ -37,6 +38,9 @@ type testSkipMaskProvider struct{} func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { return node.NewStatic([]uint32{}, false), false, nil } +func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { + return nil, nil +} func (testSkipMaskProvider) RemoveFrac(_ string) {} type FractionTestSuite struct { diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 7c62713c..d0bbd0a4 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -5,6 +5,7 @@ import ( "fmt" "math" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/frac/common" @@ -24,6 +25,7 @@ import ( type skipMaskProvider interface { GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RemoveFrac(fracName string) } @@ -294,26 +296,17 @@ func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { minLID, maxLID = uint32(minVal), uint32(maxVal) } - skipLIDsIterator, has, err := fi.skipMaskProvider.GetIDsIteratorByFrac(fi.fracName, minLID, maxLID, false) + skipLIDsBitmap, err := fi.skipMaskProvider.GetIDsBitmapByFrac(fi.fracName, minLID, maxLID) if err != nil { return nil, err } - if !has { + if skipLIDsBitmap == nil { return fi.getDocPosByLIDs(allLids), nil } - skipLIDs := make(map[uint32]struct{}) - for { - lid := skipLIDsIterator.Next() - if lid.IsNull() { - break - } - skipLIDs[lid.Unpack()] = struct{}{} - } - for i, lid := range allLids { - if _, ok := skipLIDs[uint32(lid)]; ok { + if 
skipLIDsBitmap.Contains(uint32(lid)) { allLids[i] = 0 } } diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 663dedec..687329a2 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -3,6 +3,7 @@ package fracmanager import ( "testing" + "github.com/RoaringBitmap/roaring" "github.com/alecthomas/units" "github.com/stretchr/testify/assert" @@ -17,6 +18,9 @@ type testSkipMaskProvider struct{} func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { return node.NewStatic([]uint32{}, reverse), false, nil } +func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { + return nil, nil +} func (testSkipMaskProvider) RefreshFrac(_ frac.Fraction) {} func (testSkipMaskProvider) RemoveFrac(_ string) {} diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 66e6477b..5bcd92d9 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -7,6 +7,7 @@ import ( "path/filepath" "time" + "github.com/RoaringBitmap/roaring" "github.com/oklog/ulid/v2" "github.com/ozontech/seq-db/frac" @@ -22,6 +23,7 @@ const fileBasePattern = "seq-db-" type skipMaskProvider interface { GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RefreshFrac(frac frac.Fraction) RemoveFrac(fracName string) } diff --git a/go.mod b/go.mod index 7efc9e75..7ea44701 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25 require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 github.com/KimMachineGun/automemlimit v0.7.5 + github.com/RoaringBitmap/roaring v1.9.4 github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.31.13 @@ -61,11 +62,13 @@ require ( 
github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 // indirect github.com/aws/smithy-go v1.24.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/pprof v0.0.0-20250422154841-e1f9c1950416 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect diff --git a/go.sum b/go.sum index 07cca964..2820a54c 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk= github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= +github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= +github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0= @@ -71,6 +73,8 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= 
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee h1:BnPxIde0gjtTnc9Er7cxvBk8DHLWhEux0SxayC8dP6I= github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/cactus/go-statsd-client v3.1.1+incompatible/go.mod h1:cMRcwZDklk7hXp+Law83urTHUiHMzCev/r4JMYr/zU0= @@ -196,6 +200,8 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s= @@ -241,6 +247,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= diff --git a/skipmaskmanager/encoding.go b/skipmaskmanager/encoding.go index c0bd9c81..c096f8ab 100644 --- a/skipmaskmanager/encoding.go +++ b/skipmaskmanager/encoding.go @@ -207,7 +207,9 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) { return nil, fmt.Errorf("invalid skip mask binary version: %d", version) } - dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + src, err = unmarshalLIDsBlocks(src, func(lid uint32) { + dst.LIDs = append(dst.LIDs, lid) + }) if err != nil { return src, err } @@ -218,7 +220,7 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) { // unmarshalLIDsBlocks reads all LIDs blocks from the source data. // First reads the number of blocks, then parses each block header, // and finally decodes each block's data. 
-func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { +func unmarshalLIDsBlocks(src []byte, add func(uint32)) ([]byte, error) { numberOfBlocks := binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] @@ -229,34 +231,34 @@ func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { header := lidsBlockHeader{} src, err = header.unmarshal(src) if err != nil { - return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + return src, fmt.Errorf("can't unmarshal lids header: %s", err) } headers = append(headers, header) } for i := range numberOfBlocks { - dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + src, err = unmarshalLIDsBlock(src, headers[i], add) if err != nil { - return dst, src, err + return src, err } } if len(src) > 0 { - return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + return src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") } - return dst, src, nil + return src, nil } // unmarshalLIDsBlock decodes a single LIDs block based on its header. // Handles both compressed (zstd) and uncompressed codec types. 
-func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { +func unmarshalLIDsBlock(src []byte, header lidsBlockHeader, add func(uint32)) ([]byte, error) { if len(src) == 0 { - return dst, src, fmt.Errorf("empty LIDs block") + return src, fmt.Errorf("empty LIDs block") } if header.Size == 0 || int(header.Size) > len(src) { - return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + return src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) } block := src[:header.Size] @@ -270,39 +272,39 @@ func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uin defer lidsBlockBufPool.Put(b) b.B, err = zstd.Decompress(block, b.B) if err != nil { - return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + return src, fmt.Errorf("can't decompress ids block: %s", err) } - dst, err = unmarshalLIDsDelta(dst, b.B, header) + err = unmarshalLIDsDelta(b.B, header, add) if err != nil { - return dst, src, err + return src, err } - return dst, src, nil + return src, nil case lidsCodecDelta: - dst, err = unmarshalLIDsDelta(dst, block, header) + err = unmarshalLIDsDelta(block, header, add) if err != nil { - return dst, src, err + return src, err } - return dst, src, nil + return src, nil default: - return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + return src, fmt.Errorf("unknown ids codec: %d", header.Codec) } } -func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { +func unmarshalLIDsDelta(block []byte, header lidsBlockHeader, add func(uint32)) error { prevLID := uint32(0) for range header.Length { v, n := binary.Varint(block) block = block[n:] lid := prevLID + uint32(v) prevLID = lid - dst = append(dst, lid) + add(lid) } if len(block) > 0 { - return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + return fmt.Errorf("unexpected tail when unmarshaling LIDs block") } - return 
dst, nil + return nil } // getCompressLevel returns the appropriate zstd compression level based on data size. diff --git a/skipmaskmanager/iterator_asc.go b/skipmaskmanager/iterator_asc.go index a50b6dcb..94dbfeba 100644 --- a/skipmaskmanager/iterator_asc.go +++ b/skipmaskmanager/iterator_asc.go @@ -10,7 +10,7 @@ import ( type IteratorAsc Iterator func (it *IteratorAsc) String() string { - return "HIDE_FLAG_ITERATOR_ASC" + return "SKIP_MASK_ITERATOR_ASC" } func (it *IteratorAsc) Next() node.LID { @@ -57,12 +57,15 @@ func (it *IteratorAsc) loadNextLIDsBlock() { return } - block, err := it.loader.loadBlock(it.blockIndex) + lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length) + err := it.loader.loadBlock(it.blockIndex, func(lid uint32) { + lids = append(lids, lid) + }) if err != nil { logger.Panic("error loading LIDs block", zap.Error(err)) } - it.lids = block + it.lids = lids it.needTryNextBlock() } diff --git a/skipmaskmanager/iterator_asc_test.go b/skipmaskmanager/iterator_asc_test.go index 72ab47ff..3f77bb53 100644 --- a/skipmaskmanager/iterator_asc_test.go +++ b/skipmaskmanager/iterator_asc_test.go @@ -58,8 +58,7 @@ func TestIteratorAsc(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID)) resLIDs := make([]uint32, 0, len(tc.expected)) diff --git a/skipmaskmanager/iterator_desc.go b/skipmaskmanager/iterator_desc.go index c207d671..fc04ae03 100644 --- a/skipmaskmanager/iterator_desc.go +++ b/skipmaskmanager/iterator_desc.go @@ -10,7 +10,7 @@ import ( type IteratorDesc Iterator func (it *IteratorDesc) String() string { - return "HIDE_FLAG_ITERATOR_DESC" + return "SKIP_MASK_ITERATOR_DESC" } func (it *IteratorDesc) Next() node.LID { @@ -55,12 +55,15 @@ func (it 
*IteratorDesc) loadNextLIDsBlock() { return } - block, err := it.loader.loadBlock(it.blockIndex) + lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length) + err := it.loader.loadBlock(it.blockIndex, func(lid uint32) { + lids = append(lids, lid) + }) if err != nil { logger.Panic("error loading LIDs block", zap.Error(err)) } - it.lids = block + it.lids = lids it.needTryNextBlock() } diff --git a/skipmaskmanager/iterator_desc_test.go b/skipmaskmanager/iterator_desc_test.go index 749ced7d..063e1b0d 100644 --- a/skipmaskmanager/iterator_desc_test.go +++ b/skipmaskmanager/iterator_desc_test.go @@ -53,8 +53,7 @@ func TestIteratorDesc(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID)) resLIDs := make([]uint32, 0, len(tc.expected)) diff --git a/skipmaskmanager/loader.go b/skipmaskmanager/loader.go index 0a0717b3..90e02ada 100644 --- a/skipmaskmanager/loader.go +++ b/skipmaskmanager/loader.go @@ -5,7 +5,9 @@ import ( "fmt" "io" "os" + "sync" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" @@ -20,12 +22,12 @@ type loader struct { cashKey uint32 } -func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) (*loader, error) { +func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) *loader { return &loader{ filePath: filePath, headersCache: headersCache, cashKey: hashFilePath(filePath), - }, nil + } } func (l *loader) getFile() (*os.File, error) { @@ -99,22 +101,18 @@ func (l *loader) loadHeaders() ([]lidsBlockHeader, error) { return headers, nil } -func (l *loader) loadBlock(index int) ([]uint32, error) { - if l.headers == nil { - headers, err := l.getHeaders() - if err != nil { - return nil, 
err - } - l.headers = headers +func (l *loader) loadBlock(index int, add func(uint32)) error { + if err := l.ensureHeaders(); err != nil { + return err } if len(l.headers) < index+1 { - return nil, fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) + return fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) } file, err := l.getFile() if err != nil { - return nil, err + return err } header := l.headers[index] @@ -122,23 +120,61 @@ func (l *loader) loadBlock(index int) ([]uint32, error) { blockBuf := make([]byte, header.Size) n, err := file.ReadAt(blockBuf, int64(header.Offset)) if err != nil { - return nil, err + return err } if n != len(blockBuf) { - return nil, fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) + return fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) } - lids := make([]uint32, 0, header.Length) - lids, blockBuf, err = unmarshalLIDsBlock(lids, blockBuf, header) + blockBuf, err = unmarshalLIDsBlock(blockBuf, header, add) if err != nil { - return nil, err + return err } if len(blockBuf) > 0 { - return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + return fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return nil +} + +func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, mu *sync.Mutex, minLID, maxLID uint32) error { + if err := l.ensureHeaders(); err != nil { + return err + } + + for i, header := range l.headers { + if header.MaxLID < minLID || header.MinLID > maxLID { + continue + } + + err := l.loadBlock(i, func(lid uint32) { + mu.Lock() + bitmap.Add(lid) + mu.Unlock() + }) + if err != nil { + return err + } + } + + if err := l.release(); err != nil { + return err + } + + return nil +} + +func (l *loader) ensureHeaders() error { + if l.headers == nil { + headers, err := l.getHeaders() + if err != nil { + return err + } + l.headers = headers } - return lids, nil + return nil } func (l *loader) 
release() error { diff --git a/skipmaskmanager/loader_test.go b/skipmaskmanager/loader_test.go index eb49a472..3c383d0e 100644 --- a/skipmaskmanager/loader_test.go +++ b/skipmaskmanager/loader_test.go @@ -1,10 +1,13 @@ package skipmaskmanager import ( + "math" "os" "path/filepath" + "sync" "testing" + "github.com/RoaringBitmap/roaring" "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/cache" @@ -23,17 +26,25 @@ func TestLoader(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) + // test load to []uint32 resLIDs := make([]uint32, 0, len(multipleBlocksLIDs)) const numberOfBlocks = 4 for i := range numberOfBlocks { - block, err := loader.loadBlock(i) + err := loader.loadBlock(i, func(lid uint32) { + resLIDs = append(resLIDs, lid) + }) require.NoError(t, err) - resLIDs = append(resLIDs, block...) 
} require.Equal(t, lidsToUint32s(multipleBlocksLIDs), resLIDs) + // test load to bitmap + bitmap := roaring.New() + mu := &sync.Mutex{} + err = loader.loadToBitmap(bitmap, mu, 0, math.MaxUint32) + require.NoError(t, err) + require.Equal(t, lidsToUint32s(multipleBlocksLIDs), bitmap.ToArray()) + require.NoError(t, loader.release()) } diff --git a/skipmaskmanager/merged_iterator.go b/skipmaskmanager/merged_iterator.go index e949c12f..7f444fd0 100644 --- a/skipmaskmanager/merged_iterator.go +++ b/skipmaskmanager/merged_iterator.go @@ -22,7 +22,7 @@ func NewNMergedIterators(iterators []node.Node) node.Node { type EmptyIterator struct{} func (it *EmptyIterator) String() string { - return "EMPTY_HIDE_FLAG_ITERATOR" + return "EMPTY_SKIP_MASK_ITERATOR" } func (it *EmptyIterator) Next() node.LID { diff --git a/skipmaskmanager/merged_iterator_test.go b/skipmaskmanager/merged_iterator_test.go index fe0af2af..ff4eae4c 100644 --- a/skipmaskmanager/merged_iterator_test.go +++ b/skipmaskmanager/merged_iterator_test.go @@ -1,7 +1,6 @@ package skipmaskmanager import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -46,7 +45,6 @@ func TestMergedIteratorReverse(t *testing.T) { resLIDs = append(resLIDs, lid.Unpack()) } - fmt.Println(resLIDs) require.Equal(t, []uint32{45, 33, 22, 15, 9, 8, 7, 5, 3, 2, 1}, resLIDs) } @@ -55,7 +53,7 @@ type testIteratorDesc struct { } func (it *testIteratorDesc) String() string { - return "TEST_HIDE_FLAG_ITERATOR_DESC" + return "TEST_SKIP_MASK_ITERATOR_DESC" } func (it *testIteratorDesc) Next() node.LID { @@ -77,7 +75,7 @@ type testIteratorAsc struct { } func (it *testIteratorAsc) String() string { - return "TEST_HIDE_FLAG_ITERATOR_ASC" + return "TEST_SKIP_MASK_ITERATOR_ASC" } func (it *testIteratorAsc) Next() node.LID { diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index 055cca33..346a522f 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -13,6 +13,7 @@ 
import ( "sync" "time" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" @@ -208,11 +209,7 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( iterators := make([]node.Node, 0, len(fracFiles)) for _, f := range fracFiles { - loader, err := newLoader(f, smm.headersCache) - if err != nil { - logger.Error("can't open skip mask file", zap.String("path", f), zap.Error(err)) - return nil, has, err - } + loader := newLoader(f, smm.headersCache) if reverse { iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) } else { @@ -223,6 +220,44 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( return NewNMergedIterators(iterators), has, nil } +// GetIDsBitmapByFrac returns skip masks as a roaring bitmap. +// Currently used in fetch requests. +func (smm *SkipMaskManager) GetIDsBitmapByFrac( + fracName string, + minLID, maxLID uint32, +) (*roaring.Bitmap, error) { + smm.fracsMu.RLock() + defer smm.fracsMu.RUnlock() + + fracFiles, has := smm.fracs[fracName] + if !has { + return nil, nil + } + + bitmap := roaring.New() + mu := &sync.Mutex{} + wg := &sync.WaitGroup{} + var loaderErr error + + for _, f := range fracFiles { + wg.Go(func() { + loader := newLoader(f, smm.headersCache) + if err := loader.loadToBitmap(bitmap, mu, minLID, maxLID); err != nil { + logger.Error("can't load skip mask to bitmap", zap.String("path", f), zap.Error(err)) + loaderErr = err + } + }) + } + + wg.Wait() + + if loaderErr != nil { + return nil, loaderErr + } + + return bitmap, nil +} + // RefreshFrac recomputes skip mask files for a fraction after it has been sealed. // This is called when an active fraction becomes sealed. // The method: