From b958d7cfef0b1551783739c59b78f91db265d1af Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 4 May 2026 13:14:50 +0300 Subject: [PATCH 1/4] refactor: simplify opening sealed fraction --- frac/fraction_concurrency_test.go | 20 +--- frac/fraction_test.go | 51 +--------- frac/index_cache.go | 40 +++++++- frac/sealed.go | 150 +++++++++++++++--------------- fracmanager/cache_maintainer.go | 17 ++-- 5 files changed, 129 insertions(+), 149 deletions(-) diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 138586fd..27f5d971 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -16,10 +16,7 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/sealing" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" @@ -353,28 +350,17 @@ func seal(active *Active) (*Sealed, error) { if err != nil { return nil, err } - indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), - TokenRegistry: cache.NewCache[[]byte](nil, nil), - OffsetsRegistry: cache.NewCache[[]byte](nil, nil), - IDRegistry: cache.NewCache[[]byte](nil, nil), - LIDRegistry: cache.NewCache[[]byte](nil, nil), - } + sealed := NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - indexCache, + newIndexCache(), cache.NewCache[[]byte](nil, nil), &Config{}, testSkipMaskProvider{}, ) + active.Release() return sealed, nil } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 229216ef..bdf4e4e0 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -22,10 +22,7 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/sealing" - "github.com/ozontech/seq-db/frac/sealed/seqids" - "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" @@ -2093,25 +2090,11 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { preloaded, err := sealing.Seal(activeSealingSource, s.sealParams) s.Require().NoError(err, "Sealing failed") - indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), - TokenRegistry: cache.NewCache[[]byte](nil, nil), - OffsetsRegistry: cache.NewCache[[]byte](nil, nil), - IDRegistry: cache.NewCache[[]byte](nil, nil), - LIDRegistry: cache.NewCache[[]byte](nil, nil), - } - sealed := NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - indexCache, + newIndexCache(), cache.NewCache[[]byte](nil, nil), s.config, testSkipMaskProvider{}, @@ -2289,24 +2272,10 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal sealed := s.newSealed(bulks...) sealed.Release() - indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), - TokenRegistry: cache.NewCache[[]byte](nil, nil), - OffsetsRegistry: cache.NewCache[[]byte](nil, nil), - IDRegistry: cache.NewCache[[]byte](nil, nil), - LIDRegistry: cache.NewCache[[]byte](nil, nil), - } - sealed = NewSealed( sealed.BaseFileName, storage.NewReadLimiter(1, nil), - indexCache, + newIndexCache(), cache.NewCache[[]byte](nil, nil), nil, s.config, @@ -2363,25 +2332,11 @@ func (s *RemoteFractionTestSuite) SetupTest() { s.Require().NoError(err, "offload failed") s.Require().True(offloaded, "didn't offload frac") - indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), - TokenRegistry: cache.NewCache[[]byte](nil, nil), - OffsetsRegistry: cache.NewCache[[]byte](nil, nil), - IDRegistry: cache.NewCache[[]byte](nil, nil), - LIDRegistry: cache.NewCache[[]byte](nil, nil), - } - remoteFrac := NewRemote( context.Background(), sealed.BaseFileName, storage.NewReadLimiter(1, nil), - indexCache, + newIndexCache(), cache.NewCache[[]byte](nil, nil), sealed.info, s.config, diff --git a/frac/index_cache.go b/frac/index_cache.go index 852fe51f..8076495e 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -7,7 +7,30 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" ) +func newIndexCache() *IndexCache { + return &IndexCache{ + LegacyRegistry: cache.NewCache[[]byte](nil, nil), + + InfoRegistry: cache.NewCache[[]byte](nil, nil), + TokenRegistry: cache.NewCache[[]byte](nil, nil), + OffsetsRegistry: cache.NewCache[[]byte](nil, nil), + IDRegistry: cache.NewCache[[]byte](nil, nil), + LIDRegistry: cache.NewCache[[]byte](nil, nil), + + MIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), + Params: cache.NewCache[seqids.BlockParams](nil, nil), + + Tokens: cache.NewCache[*token.Block](nil, nil), + TokenTable: cache.NewCache[token.Table](nil, nil), + LIDs: cache.NewCache[*lids.Block](nil, nil), + } +} + type IndexCache struct { + // Registry cache for legacy sealed fractions. + LegacyRegistry *cache.Cache[[]byte] + // Per-file registry caches (each IndexReader needs its own). InfoRegistry *cache.Cache[[]byte] TokenRegistry *cache.Cache[[]byte] @@ -16,24 +39,31 @@ type IndexCache struct { LIDRegistry *cache.Cache[[]byte] // Block-level data caches shared across all readers. - MIDs *cache.Cache[[]byte] - RIDs *cache.Cache[seqids.BlockRIDs] - Params *cache.Cache[seqids.BlockParams] + MIDs *cache.Cache[[]byte] + RIDs *cache.Cache[seqids.BlockRIDs] + Params *cache.Cache[seqids.BlockParams] + Tokens *cache.Cache[*token.Block] TokenTable *cache.Cache[token.Table] - LIDs *cache.Cache[*lids.Block] + + LIDs *cache.Cache[*lids.Block] } func (s *IndexCache) Release() { + s.LegacyRegistry.Release() + s.InfoRegistry.Release() s.TokenRegistry.Release() s.OffsetsRegistry.Release() s.IDRegistry.Release() s.LIDRegistry.Release() - s.LIDs.Release() + s.MIDs.Release() s.RIDs.Release() s.Params.Release() + s.Tokens.Release() s.TokenTable.Release() + + s.LIDs.Release() } diff --git a/frac/sealed.go b/frac/sealed.go index c7c92023..df293b55 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -109,40 +109,89 @@ func NewSealed( return f } - f.openInfo() - f.info = loadInfo(f.infoReader) - f.info.IndexOnDisk = computeIndexOnDisk(f.BaseFileName, f.IsLegacy) + if !isLegacy { + f.openInfo() + f.info = loadInfo(f.infoReader) + } else { + f.openInfoLegacy() + f.info = loadInfo(f.legacyReader) + } + f.info.IndexOnDisk = computeIndexOnDisk(f.BaseFileName, f.IsLegacy) return f } -func (f *Sealed) openInfo() { - if f.IsLegacy { - if f.legacyFile != nil { - return - } +func NewSealedPreloaded( + baseFile string, + preloaded *sealed.PreloadedData, + rl *storage.ReadLimiter, + indexCache *IndexCache, + docsCache *cache.Cache[[]byte], + config *Config, + skipMaskProvider skipMaskProvider, +) *Sealed { + f := &Sealed{ + blocksData: preloaded.BlocksData, + docsCache: docsCache, + indexCache: indexCache, - name := f.BaseFileName + consts.IndexFileSuffix - file, err := os.Open(name) - if err != nil { - logger.Fatal( - "can't open legacy index file", - zap.String("file", name), - zap.Error(err), - ) - } + loadMu: &sync.RWMutex{}, + isLoaded: true, - f.legacyFile = file - f.legacyReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.InfoRegistry, - ) + readLimiter: rl, - // infoReader is used by [loadInfo] - f.infoReader = f.legacyReader + info: preloaded.Info, + BaseFileName: baseFile, + Config: config, + + skipMaskProvider: skipMaskProvider, + } + + // Put token table built during sealing into the cache. + indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) { + return preloaded.TokenTable, preloaded.TokenTable.Size() + }) + + f.openDocs() + f.openIndex() + + docsCountK := float64(f.info.DocsTotal) / 1000 + logger.Info("sealed fraction created from active", + zap.String("frac", f.info.Name()), + util.ZapMsTsAsESTimeStr("creation_time", f.info.CreationTime), + zap.String("from", f.info.From.String()), + zap.String("to", f.info.To.String()), + util.ZapFloat64WithPrec("docs_k", docsCountK, 1), + ) + + f.info.MetaOnDisk = 0 + + return f +} + +func (f *Sealed) openInfoLegacy() { + if f.legacyFile != nil { return } + name := f.BaseFileName + consts.IndexFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal( + "can't open legacy index file", + zap.String("file", name), + zap.Error(err), + ) + } + + f.legacyFile = file + f.legacyReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.LegacyRegistry, + ) +} + +func (f *Sealed) openInfo() { if f.infoFile != nil { return } @@ -165,11 +214,14 @@ func (f *Sealed) openInfo() { } func (f *Sealed) openIndex() { - f.openInfo() if f.IsLegacy { + // We have exactly one `.index` file for legacy sealed fractions. + // So opening only this file is sufficient. + f.openInfoLegacy() return } + f.openInfo() if f.tokenFile == nil { name := f.BaseFileName + consts.TokenFileSuffix file, err := os.Open(name) @@ -240,54 +292,6 @@ func (f *Sealed) openDocs() { f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache) } -func NewSealedPreloaded( - baseFile string, - preloaded *sealed.PreloadedData, - rl *storage.ReadLimiter, - indexCache *IndexCache, - docsCache *cache.Cache[[]byte], - config *Config, - skipMaskProvider skipMaskProvider, -) *Sealed { - f := &Sealed{ - blocksData: preloaded.BlocksData, - docsCache: docsCache, - indexCache: indexCache, - - loadMu: &sync.RWMutex{}, - isLoaded: true, - - readLimiter: rl, - - info: preloaded.Info, - BaseFileName: baseFile, - Config: config, - - skipMaskProvider: skipMaskProvider, - } - - // Put token table built during sealing into the cache. - indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) { - return preloaded.TokenTable, preloaded.TokenTable.Size() - }) - - f.openDocs() - f.openIndex() - - docsCountK := float64(f.info.DocsTotal) / 1000 - logger.Info("sealed fraction created from active", - zap.String("frac", f.info.Name()), - util.ZapMsTsAsESTimeStr("creation_time", f.info.CreationTime), - zap.String("from", f.info.From.String()), - zap.String("to", f.info.To.String()), - util.ZapFloat64WithPrec("docs_k", docsCountK, 1), - ) - - f.info.MetaOnDisk = 0 - - return f -} - func (f *Sealed) load() { f.loadMu.Lock() defer f.loadMu.Unlock() diff --git a/fracmanager/cache_maintainer.go b/fracmanager/cache_maintainer.go index 2a6ac6dd..d04aa439 100644 --- a/fracmanager/cache_maintainer.go +++ b/fracmanager/cache_maintainer.go @@ -143,18 +143,23 @@ func (cm *CacheMaintainer) CreateSortDocsCache() *cache.Cache[[]byte] { func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache { return &frac.IndexCache{ - MIDs: newCache[[]byte](cm, midsName), - RIDs: newCache[seqids.BlockRIDs](cm, ridsName), - Params: newCache[seqids.BlockParams](cm, paramsName), - LIDs: newCache[*lids.Block](cm, lidsName), - Tokens: newCache[*token.Block](cm, tokensName), - TokenTable: newCache[token.Table](cm, tokenTableName), + LegacyRegistry: newCache[[]byte](cm, indexName), + // Each index file gets its own registry cache (they all use key=1 internally). InfoRegistry: newCache[[]byte](cm, indexName), TokenRegistry: newCache[[]byte](cm, indexName), OffsetsRegistry: newCache[[]byte](cm, indexName), IDRegistry: newCache[[]byte](cm, indexName), LIDRegistry: newCache[[]byte](cm, indexName), + + MIDs: newCache[[]byte](cm, midsName), + RIDs: newCache[seqids.BlockRIDs](cm, ridsName), + Params: newCache[seqids.BlockParams](cm, paramsName), + + Tokens: newCache[*token.Block](cm, tokensName), + TokenTable: newCache[token.Table](cm, tokenTableName), + + LIDs: newCache[*lids.Block](cm, lidsName), } } From e790404d832738fbda7be80eff2a7099b969036d Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 4 May 2026 15:39:03 +0300 Subject: [PATCH 2/4] refactor: make loading logic simpler --- frac/sealed.go | 61 +++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/frac/sealed.go b/frac/sealed.go index df293b55..2c7fd1fc 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -55,11 +55,11 @@ type Sealed struct { idReader storage.IndexReader lidReader storage.IndexReader + blocksData sealed.BlocksData indexCache *IndexCache - loadMu *sync.RWMutex - isLoaded bool - blocksData sealed.BlocksData + initMu *sync.RWMutex + isInited bool readLimiter *storage.ReadLimiter @@ -88,7 +88,7 @@ func NewSealed( isLegacy bool, ) *Sealed { f := &Sealed{ - loadMu: &sync.RWMutex{}, + initMu: &sync.RWMutex{}, readLimiter: readLimiter, docsCache: docsCache, @@ -109,15 +109,9 @@ func NewSealed( return f } - if !isLegacy { - f.openInfo() - f.info = loadInfo(f.infoReader) - } else { - f.openInfoLegacy() - f.info = loadInfo(f.legacyReader) - } - + f.loadInfo() f.info.IndexOnDisk = computeIndexOnDisk(f.BaseFileName, f.IsLegacy) + return f } @@ -135,8 +129,8 @@ func NewSealedPreloaded( docsCache: docsCache, indexCache: indexCache, - loadMu: &sync.RWMutex{}, - isLoaded: true, + initMu: &sync.RWMutex{}, + isInited: true, readLimiter: rl, @@ -152,9 +146,6 @@ func NewSealedPreloaded( return preloaded.TokenTable, preloaded.TokenTable.Size() }) - f.openDocs() - f.openIndex() - docsCountK := float64(f.info.DocsTotal) / 1000 logger.Info("sealed fraction created from active", zap.String("frac", f.info.Name()), @@ -165,7 +156,6 @@ func NewSealedPreloaded( ) f.info.MetaOnDisk = 0 - return f } @@ -292,20 +282,31 @@ func (f *Sealed) openDocs() { f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache) } -func (f *Sealed) load() { - f.loadMu.Lock() - defer f.loadMu.Unlock() - - if f.isLoaded { +func (f *Sealed) loadInfo() { + if f.IsLegacy { + f.openInfoLegacy() + f.info = loadInfo(f.legacyReader) return } + f.openInfo() + f.info = loadInfo(f.infoReader) +} + +func (f *Sealed) init(full bool) { + f.initMu.Lock() + defer f.initMu.Unlock() + f.openDocs() f.openIndex() + if f.isInited || !full { + return + } + if f.IsLegacy { (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) - f.isLoaded = true + f.isInited = true return } @@ -317,15 +318,12 @@ func (f *Sealed) load() { LID: f.lidReader, }) - f.isLoaded = true + f.isInited = true } // Offload saves all index files and docs to remote storage. func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) { - f.loadMu.Lock() - f.openDocs() - f.openIndex() - f.loadMu.Unlock() + f.init(false) g, gctx := errgroup.WithContext(ctx) g.Go(func() error { return u.Upload(gctx, f.docsFile) }) @@ -356,6 +354,8 @@ func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) } func (f *Sealed) Release() { + f.init(false) + indexFiles := []*os.File{ f.docsFile, f.infoFile, @@ -390,7 +390,6 @@ func (f *Sealed) Release() { func (f *Sealed) Suicide() { f.Release() - // Rename docs atomically first — this commits the intent to delete. oldPath := f.BaseFileName + consts.DocsFileSuffix newPath := f.BaseFileName + consts.DocsDelFileSuffix @@ -490,7 +489,7 @@ func (f *Sealed) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) } func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { - f.load() + f.init(true) tokenReader := &f.tokenReader lidReader := &f.lidReader From 810f3486d4b308581a5352de56abd0fcc5cf7e94 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 4 May 2026 15:57:17 +0300 Subject: [PATCH 3/4] refactor: make remote logic consistent --- frac/remote.go | 69 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/frac/remote.go b/frac/remote.go index 2d8506af..716a6f71 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -63,8 +63,8 @@ type Remote struct { indexCache *IndexCache - loadMu *sync.RWMutex - isLoaded bool + initMu *sync.RWMutex + isInited bool blocksData sealed.BlocksData s3cli *s3.Client @@ -88,7 +88,7 @@ func NewRemote( f := &Remote{ ctx: ctx, - loadMu: &sync.RWMutex{}, + initMu: &sync.RWMutex{}, readLimiter: readLimiter, docsCache: docsCache, @@ -116,7 +116,7 @@ func NewRemote( // I wrote a small proposal on how we can reduce impact of such events. // https://github.com/ozontech/seq-db/issues/92 - if err := f.openInfo(); err != nil { + if err := f.loadInfo(); err != nil { logger.Error( "cannot open info file: any subsequent operation will fail", zap.String("fraction", filepath.Base(f.BaseFileName)), @@ -124,7 +124,6 @@ func NewRemote( ) } - f.info = loadInfo(f.infoReader) return f } @@ -163,7 +162,7 @@ func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) } func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) { - if err := f.load(); err != nil { + if err := f.init(); err != nil { logger.Error( "will create empty data provider: cannot load remote fraction", zap.String("fraction", f.Info().Name()), @@ -217,6 +216,8 @@ func (f *Remote) IsIntersecting(from, to seq.MID) bool { } func (f *Remote) Suicide() { + // FIXME(dkharms): We need to rename `.remote` file to `._remote` to commit deletion intent. + // Now, we might have fraction leaks in S3 storage since [Suicide] is not atomic. util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionSuffix) f.docsCache.Release() @@ -251,14 +252,28 @@ func (f *Remote) String() string { return fracToString(f, "remote") } -func (f *Remote) load() error { - f.loadMu.Lock() - defer f.loadMu.Unlock() +func (f *Remote) loadInfo() error { + if f.IsLegacy { + if err := f.openInfoLegacy(); err != nil { + return err + } + f.info = loadInfo(f.legacyReader) - if f.isLoaded { return nil } + if err := f.openInfo(); err != nil { + return err + } + f.info = loadInfo(f.infoReader) + + return nil +} + +func (f *Remote) init() error { + f.initMu.Lock() + defer f.initMu.Unlock() + if err := f.openDocs(); err != nil { return err } @@ -267,9 +282,13 @@ func (f *Remote) load() error { return err } + if f.isInited { + return nil + } + if f.IsLegacy { (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) - f.isLoaded = true + f.isInited = true return nil } @@ -281,29 +300,25 @@ func (f *Remote) load() error { LID: f.lidReader, }) - f.isLoaded = true + f.isInited = true return nil } -func (f *Remote) openInfo() error { - if f.IsLegacy { - if f.legacyFile != nil { - return nil - } - - indexName := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix - f.legacyFile = s3.NewReader(f.ctx, f.s3cli, indexName) +func (f *Remote) openInfoLegacy() error { + if f.legacyFile != nil { + return nil + } + return f.openRemoteFile(consts.IndexFileSuffix, func(file storage.ImmutableFile) { + f.legacyFile = file f.legacyReader = storage.NewIndexReader( - f.readLimiter, indexName, - f.legacyFile, f.indexCache.InfoRegistry, + f.readLimiter, file.Name(), + file, f.indexCache.LegacyRegistry, ) + }) +} - // infoReader is used by [loadInfo] - f.infoReader = f.legacyReader - return nil - } - +func (f *Remote) openInfo() error { if f.infoFile != nil { return nil } From 03c83e6d9ac4a920a4bcd5d02e8ffb6fae0befaa Mon Sep 17 00:00:00 2001 From: Daniil Date: Tue, 5 May 2026 11:38:30 +0300 Subject: [PATCH 4/4] refactor: store info as a plain json (#418) --- cmd/index_analyzer/main.go | 4 +- config/frac_version.go | 5 +- frac/common/info.go | 8 +-- frac/fraction_test.go | 4 +- frac/index_cache.go | 3 -- frac/remote.go | 89 +++++++++++++++++-------------- frac/sealed.go | 42 ++++++++++----- frac/sealed/sealing/index.go | 40 ++------------ frac/sealed/token/table_loader.go | 69 ++++++++++++++++++++---- frac/sealed_loader.go | 33 +++++++----- fracmanager/cache_maintainer.go | 1 - go.mod | 2 +- go.sum | 4 +- storage/index_reader.go | 12 +++++ storage/s3/reader.go | 11 ++-- 15 files changed, 193 insertions(+), 134 deletions(-) diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index 4ea8dd44..e2c1c349 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "hash/fnv" + "io" "os" "strings" "sync" @@ -117,13 +118,12 @@ func analyzeIndex( defer tokenFile.Close() defer lidFile.Close() - infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry) tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry) lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry) // --- Info --- var blockIndex uint32 - infoData, _, err := infoReader.ReadIndexBlock(0, nil) + infoData, err := io.ReadAll(infoFile) if err != nil { logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err)) } diff --git a/config/frac_version.go b/config/frac_version.go index 7bdd84d0..ff1283b0 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -13,7 +13,10 @@ const ( BinaryDataV2 // BinariDataV3 - `.index` file is split across several files - // storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids` + // storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`. + // + // Also in this version we've changed the binary layout of section storing + // info block. As a result we store info as a plain JSON without additional registry. BinaryDataV3 ) diff --git a/frac/common/info.go b/frac/common/info.go index 69121408..20e7f7c2 100644 --- a/frac/common/info.go +++ b/frac/common/info.go @@ -15,9 +15,11 @@ import ( "github.com/ozontech/seq-db/seq" ) -const DistributionMaxInterval = 24 * time.Hour -const DistributionBucket = time.Minute -const DistributionSpreadThreshold = 10 * time.Minute +const ( + DistributionMaxInterval = 24 * time.Hour + DistributionBucket = time.Minute + DistributionSpreadThreshold = 10 * time.Minute +) type Info struct { Path string `json:"name"` diff --git a/frac/fraction_test.go b/frac/fraction_test.go index bdf4e4e0..8757c0db 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1834,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") diff --git a/frac/index_cache.go b/frac/index_cache.go index 8076495e..043e8c5c 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -11,7 +11,6 @@ func newIndexCache() *IndexCache { return &IndexCache{ LegacyRegistry: cache.NewCache[[]byte](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), TokenRegistry: cache.NewCache[[]byte](nil, nil), OffsetsRegistry: cache.NewCache[[]byte](nil, nil), IDRegistry: cache.NewCache[[]byte](nil, nil), @@ -32,7 +31,6 @@ type IndexCache struct { LegacyRegistry *cache.Cache[[]byte] // Per-file registry caches (each IndexReader needs its own). - InfoRegistry *cache.Cache[[]byte] TokenRegistry *cache.Cache[[]byte] OffsetsRegistry *cache.Cache[[]byte] IDRegistry *cache.Cache[[]byte] @@ -52,7 +50,6 @@ type IndexCache struct { func (s *IndexCache) Release() { s.LegacyRegistry.Release() - s.InfoRegistry.Release() s.TokenRegistry.Release() s.OffsetsRegistry.Release() s.IDRegistry.Release() diff --git a/frac/remote.go b/frac/remote.go index 716a6f71..f68e6986 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -55,7 +55,6 @@ type Remote struct { idFile storage.ImmutableFile lidFile storage.ImmutableFile - infoReader storage.IndexReader tokenReader storage.IndexReader offsetsReader storage.IndexReader idReader storage.IndexReader @@ -192,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( @@ -257,16 +256,16 @@ func (f *Remote) loadInfo() error { if err := f.openInfoLegacy(); err != nil { return err } - f.info = loadInfo(f.legacyReader) + f.info = loadInfoLegacy(f.legacyReader) return nil } if err := f.openInfo(); err != nil { return err } - f.info = loadInfo(f.infoReader) + f.info = loadInfo(f.infoFile) return nil } @@ -293,7 +292,6 @@ func (f *Remote) init() error { } (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ - Info: f.infoReader, Token: f.tokenReader, Offsets: f.offsetsReader, ID: f.idReader, @@ -323,13 +321,12 @@ func (f *Remote) openInfo() error { return nil } - return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) { - f.infoFile = file - f.infoReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.InfoRegistry, - ) - }) + return f.openRemoteFile( + consts.InfoFileSuffix, + func(file storage.ImmutableFile) { + f.infoFile = file + }, + ) } func (f *Remote) openIndex() error { @@ -342,49 +339,61 @@ func (f *Remote) openIndex() error { } if f.tokenFile == nil { - if err := f.openRemoteFile(consts.TokenFileSuffix, func(file storage.ImmutableFile) { - f.tokenFile = file - f.tokenReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.TokenRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.TokenFileSuffix, + func(file storage.ImmutableFile) { + f.tokenFile = file + f.tokenReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.TokenRegistry, + ) + }, + ); err != nil { return err } } if f.offsetsFile == nil { - if err := f.openRemoteFile(consts.OffsetsFileSuffix, func(file storage.ImmutableFile) { - f.offsetsFile = file - f.offsetsReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.OffsetsRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.OffsetsFileSuffix, + func(file storage.ImmutableFile) { + f.offsetsFile = file + f.offsetsReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.OffsetsRegistry, + ) + }, + ); err != nil { return err } } if f.idFile == nil { - if err := f.openRemoteFile(consts.IDFileSuffix, func(file storage.ImmutableFile) { - f.idFile = file - f.idReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.IDRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.IDFileSuffix, + func(file storage.ImmutableFile) { + f.idFile = file + f.idReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.IDRegistry, + ) + }, + ); err != nil { return err } } if f.lidFile == nil { - if err := f.openRemoteFile(consts.LIDFileSuffix, func(file storage.ImmutableFile) { - f.lidFile = file - f.lidReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.LIDRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.LIDFileSuffix, + func(file storage.ImmutableFile) { + f.lidFile = file + f.lidReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.LIDRegistry, + ) + }, + ); err != nil { return err } } diff --git a/frac/sealed.go b/frac/sealed.go index 2c7fd1fc..5cf7bfa3 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -3,6 +3,7 @@ package frac import ( "context" "errors" + "io" "os" "path/filepath" "sync" @@ -49,7 +50,6 @@ type Sealed struct { idFile *os.File lidFile *os.File - infoReader storage.IndexReader tokenReader storage.IndexReader offsetsReader storage.IndexReader idReader storage.IndexReader @@ -197,10 +197,6 @@ func (f *Sealed) openInfo() { } f.infoFile = file - f.infoReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.InfoRegistry, - ) } func (f *Sealed) openIndex() { @@ -285,12 +281,12 @@ func (f *Sealed) openDocs() { func (f *Sealed) loadInfo() { if f.IsLegacy { f.openInfoLegacy() - f.info = loadInfo(f.legacyReader) + f.info = loadInfoLegacy(f.legacyReader) return } f.openInfo() - f.info = loadInfo(f.infoReader) + f.info = loadInfo(f.infoFile) } func (f *Sealed) init(full bool) { @@ -311,7 +307,6 @@ func (f *Sealed) init(full bool) { } (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ - Info: f.infoReader, Token: f.tokenReader, Offsets: f.offsetsReader, ID: f.idReader, @@ -512,7 +507,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( @@ -540,15 +535,38 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool { return f.info.IsIntersecting(from, to) } -func loadInfo(infoReader storage.IndexReader) *common.Info { +func loadInfoLegacy(infoReader storage.IndexReader) *common.Info { block, _, err := infoReader.ReadIndexBlock(0, nil) if err != nil { - logger.Fatal("error reading info block", zap.Error(err)) + logger.Fatal("cannot read info block", zap.Error(err)) + } + + var bi sealed.BlockInfo + if err := bi.Unpack(block); err != nil { + logger.Fatal("cannot unpack info block", zap.Error(err)) + } + + return bi.Info +} + +func loadInfo(r interface { + io.ReaderAt + Stat() (os.FileInfo, error) +}, +) *common.Info { + stat, err := r.Stat() + if err != nil { + logger.Fatal("cannot stat info file", zap.Error(err)) + } + + block := make([]byte, stat.Size()) + if _, err := r.ReadAt(block, io.SeekStart); err != nil { + logger.Fatal("cannot read info block", zap.Error(err)) } var bi sealed.BlockInfo if err := bi.Unpack(block); err != nil { - logger.Fatal("error unpacking info block", zap.Error(err)) + logger.Fatal("cannot unpack info block", zap.Error(err)) } return bi.Info diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 576ff294..5c23842a 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -75,11 +75,6 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -108,11 +103,6 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { } } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -165,11 +155,6 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -184,32 +169,13 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } -func (s *IndexSealer) WriteInfoFile(ws io.WriteSeeker, src Source) error { - w, err := newWriter(ws) - if err != nil { - return err - } - defer w.release() - +func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} - if err := w.writeBlock(blockTypeInfo, s.packInfoBlock(block)); err != nil { - return err - } - - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - - return w.finalize() + _, err := ws.Write(s.packInfoBlock(block).payload) + return err } // collapseOrderedFieldsTables merges FieldTables with the same field name. diff --git a/frac/sealed/token/table_loader.go b/frac/sealed/token/table_loader.go index 0750de62..cd04830b 100644 --- a/frac/sealed/token/table_loader.go +++ b/frac/sealed/token/table_loader.go @@ -17,15 +17,24 @@ const CacheKeyTable = 1 type TableLoader struct { fracName string - reader *storage.IndexReader - cache *cache.Cache[Table] - i uint32 - buf []byte + isLegacy bool + + reader *storage.IndexReader + cache *cache.Cache[Table] + + i uint32 + buf []byte } -func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[Table]) *TableLoader { +func NewTableLoader( + fracName string, + isLegacy bool, + reader *storage.IndexReader, + c *cache.Cache[Table], +) *TableLoader { return &TableLoader{ fracName: fracName, + isLegacy: isLegacy, reader: reader, cache: c, } @@ -33,10 +42,21 @@ func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache func (l *TableLoader) Load() Table { table, err := l.cache.GetWithError(CacheKeyTable, func() (Table, int, error) { - blocks, err := l.loadBlocks() + var ( + blocks []TableBlock + err error + ) + + if l.isLegacy { + blocks, err = l.loadBlocksLegacy() + } else { + blocks, err = l.loadBlocks() + } + if err != nil { return nil, 0, err } + table := TableFromBlocks(blocks) return table, table.Size(), nil }) @@ -45,6 +65,7 @@ func (l *TableLoader) Load() Table { zap.String("frac", l.fracName), zap.Error(err)) } + return table } @@ -92,9 +113,11 @@ func (l *TableLoader) readBlock() ([]byte, error) { return block, err } -func (l *TableLoader) loadBlocks() ([]TableBlock, error) { - l.i = 0 - for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip token blocks, go for token table +func (l *TableLoader) loadBlocksLegacy() ([]TableBlock, error) { + l.i = 1 // Skip info block immediately. + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. } blocks := make([]TableBlock, 0) @@ -110,6 +133,34 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) { return blocks, nil } +func (l *TableLoader) loadBlocks() ([]TableBlock, error) { + l.i = 0 + + blocksCount, err := l.reader.BlocksCount() + if err != nil { + return nil, err + } + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. + } + + var blocks []TableBlock + for l.i < uint32(blocksCount) { + data, err := l.readBlock() + if err != nil { + return nil, err + } + + var tb TableBlock + tb.Unpack(data) + + blocks = append(blocks, tb) + } + + return blocks, nil +} + // TableBlock represents how token.Table is stored on disk type TableBlock struct { FieldsTables []FieldTable diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 8f529139..893b75a4 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -158,7 +158,6 @@ func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { // IndexReaders holds one IndexReader per split index file. type IndexReaders struct { - Info storage.IndexReader Token storage.IndexReader Offsets storage.IndexReader ID storage.IndexReader @@ -234,14 +233,19 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio IDsTotal: idsTotal, } - for blockIdx := uint32(0); ; { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx += 3 { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { logger.Fatal("error reading id block header", zap.Error(err)) } - if header.Len() == 0 { // separator - break - } var mid seq.MID if fracVersion < config.BinaryDataV2 { @@ -256,7 +260,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio }) table.IDBlocksTotal++ - blockIdx += 3 // skip RIDs and Pos blocks } return table @@ -270,16 +273,20 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { isContinued []bool ) - for blockIdx := uint32(0); ; blockIdx++ { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx++ { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { return nil, err } - if header.Len() == 0 { - break - } - ext2 := header.GetExt2() maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) diff --git a/fracmanager/cache_maintainer.go b/fracmanager/cache_maintainer.go index d04aa439..0229a06b 100644 --- a/fracmanager/cache_maintainer.go +++ b/fracmanager/cache_maintainer.go @@ -146,7 +146,6 @@ func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache { LegacyRegistry: newCache[[]byte](cm, indexName), // Each index file gets its own registry cache (they all use key=1 internally). - InfoRegistry: newCache[[]byte](cm, indexName), TokenRegistry: newCache[[]byte](cm, indexName), OffsetsRegistry: newCache[[]byte](cm, indexName), IDRegistry: newCache[[]byte](cm, indexName), diff --git a/go.mod b/go.mod index 7d259333..7efc9e75 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 - github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3 + github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 github.com/kkyr/fig v0.5.0 github.com/klauspost/compress v1.18.2 github.com/oklog/ulid/v2 v2.1.1 diff --git a/go.sum b/go.sum index 92b59e95..07cca964 100644 --- a/go.sum +++ b/go.sum @@ -173,8 +173,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= -github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3 h1:2713fQZ560HxoNVgfJH41GKzjMjIG+DW4hH6nYXfXW8= -github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3/go.mod h1:S4S9jGBVlLri0OeqrSSbCGG5vsI6he06UJyuz1WT1EE= +github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 h1:0xkWp+RMC2ImuKacheMHEAtrbOTMOa0kYkxyzM1Z/II= +github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73/go.mod h1:S4S9jGBVlLri0OeqrSSbCGG5vsI6he06UJyuz1WT1EE= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= diff --git a/storage/index_reader.go b/storage/index_reader.go index 60cf3641..c7dee18c 100644 --- a/storage/index_reader.go +++ b/storage/index_reader.go @@ -107,3 +107,15 @@ func (r *IndexReader) ReadIndexBlock(blockIndex uint32, dst []byte) ([]byte, uin return dst, uint64(n), err } + +func (r *IndexReader) BlocksCount() (int, error) { + registry, err := r.cache.GetWithError(1, func() ([]byte, int, error) { + data, err := r.readRegistry() + return data, cap(data), err + }) + if err != nil { + return 0, err + } + + return len(registry) / IndexBlockHeaderSize, nil +} diff --git a/storage/s3/reader.go b/storage/s3/reader.go index 76ab4f0f..4e161d34 100644 --- a/storage/s3/reader.go +++ b/storage/s3/reader.go @@ -15,9 +15,7 @@ import ( "github.com/ozontech/seq-db/storage" ) -var ( - _ storage.ImmutableFile = (*reader)(nil) -) +var _ storage.ImmutableFile = (*reader)(nil) // reader is a wrapper around S3 client that provides basic IO functions. // Be aware that [reader] is not thread-safe. @@ -64,7 +62,7 @@ func (r *reader) Read(p []byte) (int, error) { if b != expected { return 0, fmt.Errorf( - "s3: short copy occurred: written=%d but expected=%d", + "s3: short copy occurred: read=%d but expected=%d", b, expected, ) } @@ -159,7 +157,6 @@ func (r *reader) Stat() (os.FileInfo, error) { Bucket: aws.String(r.c.bucket), Key: aws.String(r.filename), }) - if err != nil { return nil, fmt.Errorf( "s3: cannot stat file=%q: %w", @@ -199,9 +196,7 @@ func (r *reader) getSize() (int64, error) { return size, nil } -var ( - _ os.FileInfo = (*fileStat)(nil) -) +var _ os.FileInfo = (*fileStat)(nil) type fileStat struct { name string