Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -117,13 +118,12 @@ func analyzeIndex(
defer tokenFile.Close()
defer lidFile.Close()

infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry)
tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)

// --- Info ---
var blockIndex uint32
infoData, _, err := infoReader.ReadIndexBlock(0, nil)
infoData, err := io.ReadAll(infoFile)
if err != nil {
logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err))
}
Expand Down
5 changes: 4 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ const (
BinaryDataV2

// BinariDataV3 - `.index` file is split across several files
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`
// storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`.
//
// Also in this version we've changed the binary layout of section storing
// info block. As a result we store info as a plain JSON without additional registry.
BinaryDataV3
)

Expand Down
8 changes: 5 additions & 3 deletions frac/common/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/ozontech/seq-db/seq"
)

const DistributionMaxInterval = 24 * time.Hour
const DistributionBucket = time.Minute
const DistributionSpreadThreshold = 10 * time.Minute
const (
DistributionMaxInterval = 24 * time.Hour
DistributionBucket = time.Minute
DistributionSpreadThreshold = 10 * time.Minute
)

type Info struct {
Path string `json:"name"`
Expand Down
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down
3 changes: 0 additions & 3 deletions frac/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ func newIndexCache() *IndexCache {
return &IndexCache{
LegacyRegistry: cache.NewCache[[]byte](nil, nil),

InfoRegistry: cache.NewCache[[]byte](nil, nil),
TokenRegistry: cache.NewCache[[]byte](nil, nil),
OffsetsRegistry: cache.NewCache[[]byte](nil, nil),
IDRegistry: cache.NewCache[[]byte](nil, nil),
Expand All @@ -32,7 +31,6 @@ type IndexCache struct {
LegacyRegistry *cache.Cache[[]byte]

// Per-file registry caches (each IndexReader needs its own).
InfoRegistry *cache.Cache[[]byte]
TokenRegistry *cache.Cache[[]byte]
OffsetsRegistry *cache.Cache[[]byte]
IDRegistry *cache.Cache[[]byte]
Expand All @@ -52,7 +50,6 @@ type IndexCache struct {
func (s *IndexCache) Release() {
s.LegacyRegistry.Release()

s.InfoRegistry.Release()
s.TokenRegistry.Release()
s.OffsetsRegistry.Release()
s.IDRegistry.Release()
Expand Down
89 changes: 49 additions & 40 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type Remote struct {
idFile storage.ImmutableFile
lidFile storage.ImmutableFile

infoReader storage.IndexReader
tokenReader storage.IndexReader
offsetsReader storage.IndexReader
idReader storage.IndexReader
Expand Down Expand Up @@ -192,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
Expand Down Expand Up @@ -257,16 +256,16 @@ func (f *Remote) loadInfo() error {
if err := f.openInfoLegacy(); err != nil {
return err
}
f.info = loadInfo(f.legacyReader)

f.info = loadInfoLegacy(f.legacyReader)
return nil
}

if err := f.openInfo(); err != nil {
return err
}
f.info = loadInfo(f.infoReader)

f.info = loadInfo(f.infoFile)
return nil
}

Expand All @@ -293,7 +292,6 @@ func (f *Remote) init() error {
}

(&Loader{}).Load(&f.blocksData, f.info, IndexReaders{
Info: f.infoReader,
Token: f.tokenReader,
Offsets: f.offsetsReader,
ID: f.idReader,
Expand Down Expand Up @@ -323,13 +321,12 @@ func (f *Remote) openInfo() error {
return nil
}

return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) {
f.infoFile = file
f.infoReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.InfoRegistry,
)
})
return f.openRemoteFile(
consts.InfoFileSuffix,
func(file storage.ImmutableFile) {
f.infoFile = file
},
)
}

func (f *Remote) openIndex() error {
Expand All @@ -342,49 +339,61 @@ func (f *Remote) openIndex() error {
}

if f.tokenFile == nil {
if err := f.openRemoteFile(consts.TokenFileSuffix, func(file storage.ImmutableFile) {
f.tokenFile = file
f.tokenReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.TokenRegistry,
)
}); err != nil {
if err := f.openRemoteFile(
consts.TokenFileSuffix,
func(file storage.ImmutableFile) {
f.tokenFile = file
f.tokenReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.TokenRegistry,
)
},
); err != nil {
return err
}
}

if f.offsetsFile == nil {
if err := f.openRemoteFile(consts.OffsetsFileSuffix, func(file storage.ImmutableFile) {
f.offsetsFile = file
f.offsetsReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.OffsetsRegistry,
)
}); err != nil {
if err := f.openRemoteFile(
consts.OffsetsFileSuffix,
func(file storage.ImmutableFile) {
f.offsetsFile = file
f.offsetsReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.OffsetsRegistry,
)
},
); err != nil {
return err
}
}

if f.idFile == nil {
if err := f.openRemoteFile(consts.IDFileSuffix, func(file storage.ImmutableFile) {
f.idFile = file
f.idReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.IDRegistry,
)
}); err != nil {
if err := f.openRemoteFile(
consts.IDFileSuffix,
func(file storage.ImmutableFile) {
f.idFile = file
f.idReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.IDRegistry,
)
},
); err != nil {
return err
}
}

if f.lidFile == nil {
if err := f.openRemoteFile(consts.LIDFileSuffix, func(file storage.ImmutableFile) {
f.lidFile = file
f.lidReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.LIDRegistry,
)
}); err != nil {
if err := f.openRemoteFile(
consts.LIDFileSuffix,
func(file storage.ImmutableFile) {
f.lidFile = file
f.lidReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.LIDRegistry,
)
},
); err != nil {
return err
}
}
Expand Down
42 changes: 30 additions & 12 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frac
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -49,7 +50,6 @@ type Sealed struct {
idFile *os.File
lidFile *os.File

infoReader storage.IndexReader
tokenReader storage.IndexReader
offsetsReader storage.IndexReader
idReader storage.IndexReader
Expand Down Expand Up @@ -197,10 +197,6 @@ func (f *Sealed) openInfo() {
}

f.infoFile = file
f.infoReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.InfoRegistry,
)
}

func (f *Sealed) openIndex() {
Expand Down Expand Up @@ -285,12 +281,12 @@ func (f *Sealed) openDocs() {
func (f *Sealed) loadInfo() {
if f.IsLegacy {
f.openInfoLegacy()
f.info = loadInfo(f.legacyReader)
f.info = loadInfoLegacy(f.legacyReader)
return
}

f.openInfo()
f.info = loadInfo(f.infoReader)
f.info = loadInfo(f.infoFile)
}

func (f *Sealed) init(full bool) {
Expand All @@ -311,7 +307,6 @@ func (f *Sealed) init(full bool) {
}

(&Loader{}).Load(&f.blocksData, f.info, IndexReaders{
Info: f.infoReader,
Token: f.tokenReader,
Offsets: f.offsetsReader,
ID: f.idReader,
Expand Down Expand Up @@ -512,7 +507,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
Expand Down Expand Up @@ -540,15 +535,38 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool {
return f.info.IsIntersecting(from, to)
}

func loadInfo(infoReader storage.IndexReader) *common.Info {
func loadInfoLegacy(infoReader storage.IndexReader) *common.Info {
block, _, err := infoReader.ReadIndexBlock(0, nil)
if err != nil {
logger.Fatal("error reading info block", zap.Error(err))
logger.Fatal("cannot read info block", zap.Error(err))
}

var bi sealed.BlockInfo
if err := bi.Unpack(block); err != nil {
logger.Fatal("cannot unpack info block", zap.Error(err))
}

return bi.Info
}

func loadInfo(r interface {
io.ReaderAt
Stat() (os.FileInfo, error)
},
) *common.Info {
stat, err := r.Stat()
if err != nil {
logger.Fatal("cannot stat info file", zap.Error(err))
}

block := make([]byte, stat.Size())
if _, err := r.ReadAt(block, io.SeekStart); err != nil {
logger.Fatal("cannot read info block", zap.Error(err))
}

var bi sealed.BlockInfo
if err := bi.Unpack(block); err != nil {
logger.Fatal("error unpacking info block", zap.Error(err))
logger.Fatal("cannot unpack info block", zap.Error(err))
}

return bi.Info
Expand Down
Loading