Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -117,13 +118,12 @@ func analyzeIndex(
defer tokenFile.Close()
defer lidFile.Close()

infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry)
tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)

// --- Info ---
var blockIndex uint32
infoData, _, err := infoReader.ReadIndexBlock(0, nil)
infoData, err := io.ReadAll(infoFile)
if err != nil {
logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err))
}
Expand Down
5 changes: 4 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ const (
BinaryDataV2

// BinaryDataV3 - `.index` file is split across several files
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`.
//
// Also in this version we've changed the binary layout of the section storing
// the info block. As a result, we store info as plain JSON without an additional registry.
BinaryDataV3
)

Expand Down
8 changes: 5 additions & 3 deletions frac/common/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/ozontech/seq-db/seq"
)

const DistributionMaxInterval = 24 * time.Hour
const DistributionBucket = time.Minute
const DistributionSpreadThreshold = 10 * time.Minute
const (
DistributionMaxInterval = 24 * time.Hour
DistributionBucket = time.Minute
DistributionSpreadThreshold = 10 * time.Minute
)

type Info struct {
Path string `json:"name"`
Expand Down
20 changes: 3 additions & 17 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
Expand Down Expand Up @@ -353,28 +350,17 @@ func seal(active *Active) (*Sealed, error) {
if err != nil {
return nil, err
}
indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
&Config{},
testSkipMaskProvider{},
)

active.Release()
return sealed, nil
}
55 changes: 5 additions & 50 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import (
"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/sealing"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
"github.com/ozontech/seq-db/indexer"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
Expand Down Expand Up @@ -1837,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down Expand Up @@ -2093,25 +2090,11 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed {
preloaded, err := sealing.Seal(activeSealingSource, s.sealParams)
s.Require().NoError(err, "Sealing failed")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed := NewSealedPreloaded(
active.BaseFileName,
preloaded,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
s.config,
testSkipMaskProvider{},
Expand Down Expand Up @@ -2289,24 +2272,10 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
sealed := s.newSealed(bulks...)
sealed.Release()

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

sealed = NewSealed(
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
nil,
s.config,
Expand Down Expand Up @@ -2363,25 +2332,11 @@ func (s *RemoteFractionTestSuite) SetupTest() {
s.Require().NoError(err, "offload failed")
s.Require().True(offloaded, "didn't offload frac")

indexCache := &IndexCache{
MIDs: cache.NewCache[[]byte](nil, nil),
RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil),
Params: cache.NewCache[seqids.BlockParams](nil, nil),
LIDs: cache.NewCache[*lids.Block](nil, nil),
Tokens: cache.NewCache[*token.Block](nil, nil),
TokenTable: cache.NewCache[token.Table](nil, nil),
InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
LIDRegistry: cache.NewCache[[]byte](nil, nil),
}

remoteFrac := NewRemote(
context.Background(),
sealed.BaseFileName,
storage.NewReadLimiter(1, nil),
indexCache,
newIndexCache(),
cache.NewCache[[]byte](nil, nil),
sealed.info,
s.config,
Expand Down
41 changes: 34 additions & 7 deletions frac/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,60 @@ import (
"github.com/ozontech/seq-db/frac/sealed/token"
)

// newIndexCache returns an IndexCache in which the legacy registry cache,
// the per-file registry caches and the block-level data caches are each
// freshly created via cache.NewCache(nil, nil).
func newIndexCache() *IndexCache {
	ic := new(IndexCache)

	// Registry cache for legacy fractions.
	ic.LegacyRegistry = cache.NewCache[[]byte](nil, nil)

	// Per-file registry caches.
	ic.TokenRegistry = cache.NewCache[[]byte](nil, nil)
	ic.OffsetsRegistry = cache.NewCache[[]byte](nil, nil)
	ic.IDRegistry = cache.NewCache[[]byte](nil, nil)
	ic.LIDRegistry = cache.NewCache[[]byte](nil, nil)

	// ID-related block caches.
	ic.MIDs = cache.NewCache[[]byte](nil, nil)
	ic.RIDs = cache.NewCache[seqids.BlockRIDs](nil, nil)
	ic.Params = cache.NewCache[seqids.BlockParams](nil, nil)

	// Token and LID block caches.
	ic.Tokens = cache.NewCache[*token.Block](nil, nil)
	ic.TokenTable = cache.NewCache[token.Table](nil, nil)
	ic.LIDs = cache.NewCache[*lids.Block](nil, nil)

	return ic
}

// IndexCache groups the registry caches and the block-level data caches
// used by index readers.
type IndexCache struct {
// Registry cache for legacy sealed fractions.
LegacyRegistry *cache.Cache[[]byte]

// Per-file registry caches (each IndexReader needs its own).
InfoRegistry *cache.Cache[[]byte]
TokenRegistry *cache.Cache[[]byte]
OffsetsRegistry *cache.Cache[[]byte]
IDRegistry *cache.Cache[[]byte]
LIDRegistry *cache.Cache[[]byte]

// Block-level data caches shared across all readers.
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]
MIDs *cache.Cache[[]byte]
RIDs *cache.Cache[seqids.BlockRIDs]
Params *cache.Cache[seqids.BlockParams]

Tokens *cache.Cache[*token.Block]
TokenTable *cache.Cache[token.Table]
LIDs *cache.Cache[*lids.Block]

LIDs *cache.Cache[*lids.Block]
}

// Release calls Release on each cache held by s: the registry caches first,
// then the block-level data caches.
func (s *IndexCache) Release() {
s.InfoRegistry.Release()
s.LegacyRegistry.Release()

s.TokenRegistry.Release()
s.OffsetsRegistry.Release()
s.IDRegistry.Release()
s.LIDRegistry.Release()
s.LIDs.Release()

s.MIDs.Release()
s.RIDs.Release()
s.Params.Release()

s.Tokens.Release()
s.TokenTable.Release()

s.LIDs.Release()
}
Loading