Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,12 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false)
skipLIDsBitmap, err := di.skipMaskProvider.GetIDsBitmapByFrac(di.fracName, minLID, maxLID)
if err != nil {
return nil, err
}

if !has {
if skipLIDsBitmap == nil {
return docsPos, nil
}

Expand All @@ -294,17 +294,8 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
}
}

skipLIDs := make(map[uint32]struct{})
for {
lid := skipLIDsIterator.Next()
if lid.IsNull() {
break
}
skipLIDs[lid.Unpack()] = struct{}{}
}

for i, lid := range allLids {
if _, ok := skipLIDs[lid]; ok {
if skipLIDsBitmap.Contains(uint32(lid)) {
docsPos[i] = seq.DocPosNotFound
}
}
Expand Down
4 changes: 4 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/alecthomas/units"
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
Expand All @@ -37,6 +38,9 @@ type testSkipMaskProvider struct{}
func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) {
return node.NewStatic([]uint32{}, false), false, nil
}
func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) {
return nil, nil
}
func (testSkipMaskProvider) RemoveFrac(_ string) {}

type FractionTestSuite struct {
Expand Down
17 changes: 5 additions & 12 deletions frac/sealed_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"

"github.com/RoaringBitmap/roaring"
"go.uber.org/zap"

"github.com/ozontech/seq-db/frac/common"
Expand All @@ -24,6 +25,7 @@ import (

type skipMaskProvider interface {
GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error)
GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error)
RemoveFrac(fracName string)
}

Expand Down Expand Up @@ -294,26 +296,17 @@ func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
minLID, maxLID = uint32(minVal), uint32(maxVal)
}

skipLIDsIterator, has, err := fi.skipMaskProvider.GetIDsIteratorByFrac(fi.fracName, minLID, maxLID, false)
skipLIDsBitmap, err := fi.skipMaskProvider.GetIDsBitmapByFrac(fi.fracName, minLID, maxLID)
if err != nil {
return nil, err
}

if !has {
if skipLIDsBitmap == nil {
return fi.getDocPosByLIDs(allLids), nil
}

skipLIDs := make(map[uint32]struct{})
for {
lid := skipLIDsIterator.Next()
if lid.IsNull() {
break
}
skipLIDs[lid.Unpack()] = struct{}{}
}

for i, lid := range allLids {
if _, ok := skipLIDs[uint32(lid)]; ok {
if skipLIDsBitmap.Contains(uint32(lid)) {
allLids[i] = 0
}
}
Expand Down
4 changes: 4 additions & 0 deletions fracmanager/fracmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fracmanager
import (
"testing"

"github.com/RoaringBitmap/roaring"
"github.com/alecthomas/units"
"github.com/stretchr/testify/assert"

Expand All @@ -17,6 +18,9 @@ type testSkipMaskProvider struct{}
func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) {
return node.NewStatic([]uint32{}, reverse), false, nil
}
func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) {
return nil, nil
}
func (testSkipMaskProvider) RefreshFrac(_ frac.Fraction) {}
func (testSkipMaskProvider) RemoveFrac(_ string) {}

Expand Down
2 changes: 2 additions & 0 deletions fracmanager/fraction_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/oklog/ulid/v2"

"github.com/ozontech/seq-db/frac"
Expand All @@ -22,6 +23,7 @@ const fileBasePattern = "seq-db-"

type skipMaskProvider interface {
GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error)
GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error)
RefreshFrac(frac frac.Fraction)
RemoveFrac(fracName string)
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.25
require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1
github.com/KimMachineGun/automemlimit v0.7.5
github.com/RoaringBitmap/roaring v1.9.4
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b
github.com/aws/aws-sdk-go-v2 v1.41.1
github.com/aws/aws-sdk-go-v2/config v1.31.13
Expand Down Expand Up @@ -61,11 +62,13 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/pprof v0.0.0-20250422154841-e1f9c1950416 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk=
github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
Expand Down Expand Up @@ -71,6 +73,8 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee h1:BnPxIde0gjtTnc9Er7cxvBk8DHLWhEux0SxayC8dP6I=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cactus/go-statsd-client v3.1.1+incompatible/go.mod h1:cMRcwZDklk7hXp+Law83urTHUiHMzCev/r4JMYr/zU0=
Expand Down Expand Up @@ -196,6 +200,8 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
Expand Down Expand Up @@ -241,6 +247,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
Expand Down
46 changes: 24 additions & 22 deletions skipmaskmanager/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) {
return nil, fmt.Errorf("invalid skip mask binary version: %d", version)
}

dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src)
src, err = unmarshalLIDsBlocks(src, func(lid uint32) {
dst.LIDs = append(dst.LIDs, lid)
})
if err != nil {
return src, err
}
Expand All @@ -218,7 +220,7 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) {
// unmarshalLIDsBlocks reads all LIDs blocks from the source data.
// First reads the number of blocks, then parses each block header,
// and finally decodes each block's data.
func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) {
func unmarshalLIDsBlocks(src []byte, add func(uint32)) ([]byte, error) {
numberOfBlocks := binary.LittleEndian.Uint32(src)
src = src[sizeOfUint32:]

Expand All @@ -229,34 +231,34 @@ func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) {
header := lidsBlockHeader{}
src, err = header.unmarshal(src)
if err != nil {
return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err)
return src, fmt.Errorf("can't unmarshal lids header: %s", err)
}
headers = append(headers, header)
}

for i := range numberOfBlocks {
dst, src, err = unmarshalLIDsBlock(dst, src, headers[i])
src, err = unmarshalLIDsBlock(src, headers[i], add)
if err != nil {
return dst, src, err
return src, err
}
}

if len(src) > 0 {
return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks")
return src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks")
}

return dst, src, nil
return src, nil
}

// unmarshalLIDsBlock decodes a single LIDs block based on its header.
// Handles both compressed (zstd) and uncompressed codec types.
func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) {
func unmarshalLIDsBlock(src []byte, header lidsBlockHeader, add func(uint32)) ([]byte, error) {
if len(src) == 0 {
return dst, src, fmt.Errorf("empty LIDs block")
return src, fmt.Errorf("empty LIDs block")
}

if header.Size == 0 || int(header.Size) > len(src) {
return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size)
return src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size)
}

block := src[:header.Size]
Expand All @@ -270,39 +272,39 @@ func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uin
defer lidsBlockBufPool.Put(b)
b.B, err = zstd.Decompress(block, b.B)
if err != nil {
return dst, src, fmt.Errorf("can't decompress ids block: %s", err)
return src, fmt.Errorf("can't decompress ids block: %s", err)
}
dst, err = unmarshalLIDsDelta(dst, b.B, header)
err = unmarshalLIDsDelta(b.B, header, add)
if err != nil {
return dst, src, err
return src, err
}
return dst, src, nil
return src, nil
case lidsCodecDelta:
dst, err = unmarshalLIDsDelta(dst, block, header)
err = unmarshalLIDsDelta(block, header, add)
if err != nil {
return dst, src, err
return src, err
}
return dst, src, nil
return src, nil
default:
return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec)
return src, fmt.Errorf("unknown ids codec: %d", header.Codec)
}
}

func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) {
func unmarshalLIDsDelta(block []byte, header lidsBlockHeader, add func(uint32)) error {
prevLID := uint32(0)
for range header.Length {
v, n := binary.Varint(block)
block = block[n:]
lid := prevLID + uint32(v)
prevLID = lid
dst = append(dst, lid)
add(lid)
}

if len(block) > 0 {
return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block")
return fmt.Errorf("unexpected tail when unmarshaling LIDs block")
}

return dst, nil
return nil
}

// getCompressLevel returns the appropriate zstd compression level based on data size.
Expand Down
9 changes: 6 additions & 3 deletions skipmaskmanager/iterator_asc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type IteratorAsc Iterator

func (it *IteratorAsc) String() string {
return "HIDE_FLAG_ITERATOR_ASC"
return "SKIP_MASK_ITERATOR_ASC"
}

func (it *IteratorAsc) Next() node.LID {
Expand Down Expand Up @@ -57,12 +57,15 @@ func (it *IteratorAsc) loadNextLIDsBlock() {
return
}

block, err := it.loader.loadBlock(it.blockIndex)
lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length)
err := it.loader.loadBlock(it.blockIndex, func(lid uint32) {
lids = append(lids, lid)
})
if err != nil {
logger.Panic("error loading LIDs block", zap.Error(err))
}

it.lids = block
it.lids = lids
it.needTryNextBlock()
}

Expand Down
3 changes: 1 addition & 2 deletions skipmaskmanager/iterator_asc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func TestIteratorAsc(t *testing.T) {
err := os.WriteFile(filePath, rawSkipMask, 0o644)
require.NoError(t, err)

loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
require.NoError(t, err)
loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))

iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID))
resLIDs := make([]uint32, 0, len(tc.expected))
Expand Down
9 changes: 6 additions & 3 deletions skipmaskmanager/iterator_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type IteratorDesc Iterator

func (it *IteratorDesc) String() string {
return "HIDE_FLAG_ITERATOR_DESC"
return "SKIP_MASK_ITERATOR_DESC"
}

func (it *IteratorDesc) Next() node.LID {
Expand Down Expand Up @@ -55,12 +55,15 @@ func (it *IteratorDesc) loadNextLIDsBlock() {
return
}

block, err := it.loader.loadBlock(it.blockIndex)
lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length)
err := it.loader.loadBlock(it.blockIndex, func(lid uint32) {
lids = append(lids, lid)
})
if err != nil {
logger.Panic("error loading LIDs block", zap.Error(err))
}

it.lids = block
it.lids = lids
it.needTryNextBlock()
}

Expand Down
3 changes: 1 addition & 2 deletions skipmaskmanager/iterator_desc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func TestIteratorDesc(t *testing.T) {
err := os.WriteFile(filePath, rawSkipMask, 0o644)
require.NoError(t, err)

loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
require.NoError(t, err)
loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))

iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID))
resLIDs := make([]uint32, 0, len(tc.expected))
Expand Down
Loading
Loading