Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -117,13 +118,12 @@ func analyzeIndex(
defer tokenFile.Close()
defer lidFile.Close()

infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry)
tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)

// --- Info ---
var blockIndex uint32
infoData, _, err := infoReader.ReadIndexBlock(0, nil)
infoData, err := io.ReadAll(infoFile)
if err != nil {
logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err))
}
Expand Down
11 changes: 10 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@ type BinaryDataVersion uint16
// Known BinaryDataVersion values. They are numbered sequentially via iota,
// so new versions must be appended at the end to keep existing values stable.
const (
// BinaryDataV0 - initial version
BinaryDataV0 BinaryDataVersion = iota

// BinaryDataV1 - support RIDs encoded without varint
BinaryDataV1

// BinaryDataV2 - MIDs stored in nanoseconds
BinaryDataV2

// BinaryDataV3 - `.index` file is split across several files
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`.
//
// Also in this version we've changed the binary layout of the section storing
// the info block. As a result we store info as plain JSON without an additional registry.
BinaryDataV3
)

const CurrentFracVersion = BinaryDataV2
const CurrentFracVersion = BinaryDataV3
16 changes: 8 additions & 8 deletions frac/active_sealing_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ActiveSealingSource struct {
rids *UInt64s // RIDs

fields []string // Sorted field names
fieldTids [][]uint32 // Each field contains sorted TIDs based on token value
fieldTIDs [][]uint32 // Each field contains sorted TIDs based on token value

tokens [][]byte // Tokens (values) by TID
lids []*TokenLIDs // LID lists for each token
Expand All @@ -59,7 +59,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
info := *active.info // copy

sortedLIDs := active.GetAllDocuments()
fields, fieldTids := sortFields(active.TokenList)
fields, fieldTIDs := sortFields(active.TokenList)

src := ActiveSealingSource{
params: params,
Expand All @@ -74,7 +74,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
rids: active.RIDs,

fields: fields,
fieldTids: fieldTids,
fieldTIDs: fieldTIDs,
tokens: active.TokenList.tidToVal,
lids: active.TokenList.tidToLIDs,

Expand All @@ -100,7 +100,7 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) {
fields := slices.Collect(maps.Keys(tl.FieldTIDs))
slices.Sort(fields)

fieldTids := make([][]uint32, len(tl.FieldTIDs))
fieldTIDs := make([][]uint32, len(tl.FieldTIDs))
for i, field := range fields {
// Make a copy because this memory is shared
// with concurrent readers (user search queries).
Expand All @@ -110,10 +110,10 @@ func sortFields(tl *TokenList) ([]string, [][]uint32) {
return bytes.Compare(tl.tidToVal[i], tl.tidToVal[j])
})

fieldTids[i] = cp
fieldTIDs[i] = cp
}

return fields, fieldTids
return fields, fieldTIDs
}

func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
Expand All @@ -122,7 +122,7 @@ func (src *ActiveSealingSource) ID() iter.Seq2[DocLocation, error] {
rids := src.rids.vals

// System ID and DocPos are not stored in `src.sortedLIDs`.
// However we do have to yield them to preserve 1-baseed indexing for ids.
// However we do have to yield them to preserve 1-based indexing for ids.
dloc := DocLocation{First: seq.SystemID, Second: seq.SystemDocPos}
if !yield(dloc, nil) {
return
Expand Down Expand Up @@ -193,7 +193,7 @@ func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[Token
func (src *ActiveSealingSource) postingsForField(field string, idx int) iter.Seq2[TokenPosting, error] {
var lidsbuf []uint32
return func(yield func(TokenPosting, error) bool) {
for _, tid := range src.fieldTids[idx] {
for _, tid := range src.fieldTIDs[idx] {
token := src.tokens[tid]

lids := src.lids[tid].SortedLIDsUnsafe()
Expand Down
8 changes: 5 additions & 3 deletions frac/common/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/ozontech/seq-db/seq"
)

const DistributionMaxInterval = 24 * time.Hour
const DistributionBucket = time.Minute
const DistributionSpreadThreshold = 10 * time.Minute
const (
DistributionMaxInterval = 24 * time.Hour
DistributionBucket = time.Minute
DistributionSpreadThreshold = 10 * time.Minute
)

type Info struct {
Path string `json:"name"`
Expand Down
20 changes: 3 additions & 17 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
Expand Down Expand Up @@ -353,28 +350,17 @@ func seal(active *Active) (*Sealed, error) {
if err != nil {
return nil, err
}
indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
&Config{},
testSkipMaskProvider{},
)

active.Release()
return sealed, nil
}
55 changes: 5 additions & 50 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
Expand Down Expand Up @@ -1837,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down Expand Up @@ -2093,25 +2090,11 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {
preloaded, err := sealing.Seal(activeSealingSource, s.sealParams)
s.Require().NoError(err, "Sealing failed")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
s.config,
testSkipMaskProvider{},
Expand Down Expand Up @@ -2289,24 +2272,10 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
sealed := s.newSealed(bulks...)
sealed.Release()

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed = NewSealed(
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
nil,
s.config,
Expand Down Expand Up @@ -2363,25 +2332,11 @@ func (s *RemoteFractionTestSuite) SetupTest() {
s.Require().NoError(err, "offload failed")
s.Require().True(offloaded, "didn't offload frac")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

remoteFrac := NewRemote(
context.Background(),
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
sealed.info,
s.config,
Expand Down
41 changes: 34 additions & 7 deletions frac/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,60 @@ import (
"github.com/ozontech/seq-db/frac/sealed/token"
)

// newIndexCache returns an IndexCache in which each field assigned below holds
// its own freshly created cache. Every cache is constructed with (nil, nil)
// arguments, exactly as the individual call sites used to do by hand.
func newIndexCache() *IndexCache {
	// Several fields share the same element type; build them through one helper
	// so each still gets a distinct cache instance.
	rawCache := func() *cache.Cache[[]byte] {
		return cache.NewCache[[]byte](nil, nil)
	}

	ic := new(IndexCache)

	// Registry cache used for legacy sealed fractions.
	ic.LegacyRegistry = rawCache()

	// Per-file registry caches.
	ic.TokenRegistry = rawCache()
	ic.OffsetsRegistry = rawCache()
	ic.IDRegistry = rawCache()
	ic.LIDRegistry = rawCache()

	// ID-related block caches.
	ic.MIDs = rawCache()
	ic.RIDs = cache.NewCache[seqids.BlockRIDs](nil, nil)
	ic.Params = cache.NewCache[seqids.BlockParams](nil, nil)

	// Token and LID block caches.
	ic.Tokens = cache.NewCache[*token.Block](nil, nil)
	ic.TokenTable = cache.NewCache[token.Table](nil, nil)
	ic.LIDs = cache.NewCache[*lids.Block](nil, nil)

	return ic
}

type IndexCache struct {
// Registry cache for legacy sealed fractions.
LegacyRegistry *cache.Cache[[]byte]

// Per-file registry caches (each IndexReader needs its own).
InfoRegistry *cache.Cache[[]byte]
TokenRegistry *cache.Cache[[]byte]
OffsetsRegistry *cache.Cache[[]byte]
IDRegistry *cache.Cache[[]byte]
LIDRegistry *cache.Cache[[]byte]

// Block-level data caches shared across all readers.
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]

Tokens *cache.Cache[*token.Block]
TokenTable *cache.Cache[token.Table]
LIDs *cache.Cache[*lids.Block]

LIDs *cache.Cache[*lids.Block]
}

func (s *IndexCache) Release() {
s.InfoRegistry.Release()
s.LegacyRegistry.Release()

s.TokenRegistry.Release()
s.OffsetsRegistry.Release()
s.IDRegistry.Release()
s.LIDRegistry.Release()
s.LIDs.Release()

s.MIDs.Release()
s.RIDs.Release()
s.Params.Release()

s.Tokens.Release()
s.TokenTable.Release()

s.LIDs.Release()
}
Loading