From 764830f23c09fa30b14375a68bed4ca81c5218b0 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 4 May 2026 19:31:53 +0300 Subject: [PATCH 1/3] refactor: store info as plain json --- cmd/index_analyzer/main.go | 4 +- config/frac_version.go | 5 +- frac/common/info.go | 8 +-- frac/index_cache.go | 3 -- frac/remote.go | 87 ++++++++++++++++++--------------- frac/sealed.go | 26 +++++++--- frac/sealed/sealing/index.go | 22 ++------- frac/sealed_loader.go | 1 - fracmanager/cache_maintainer.go | 1 - storage/s3/reader.go | 11 ++--- 10 files changed, 85 insertions(+), 83 deletions(-) diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index 4ea8dd44..e2c1c349 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "hash/fnv" + "io" "os" "strings" "sync" @@ -117,13 +118,12 @@ func analyzeIndex( defer tokenFile.Close() defer lidFile.Close() - infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry) tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry) lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry) // --- Info --- var blockIndex uint32 - infoData, _, err := infoReader.ReadIndexBlock(0, nil) + infoData, err := io.ReadAll(infoFile) if err != nil { logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err)) } diff --git a/config/frac_version.go b/config/frac_version.go index 7bdd84d0..ff1283b0 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -13,7 +13,10 @@ const ( BinaryDataV2 // BinariDataV3 - `.index` file is split across several files - // storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids` + // storing specific sections: `.info`, `.offsets`, `.tokens`, `.ids`, `.lids`. + // + // Also in this version we've changed the binary layout of section storing + // info block. As a result we store info as a plain JSON without additional registry. BinaryDataV3 ) diff --git a/frac/common/info.go b/frac/common/info.go index 69121408..20e7f7c2 100644 --- a/frac/common/info.go +++ b/frac/common/info.go @@ -15,9 +15,11 @@ import ( "github.com/ozontech/seq-db/seq" ) -const DistributionMaxInterval = 24 * time.Hour -const DistributionBucket = time.Minute -const DistributionSpreadThreshold = 10 * time.Minute +const ( + DistributionMaxInterval = 24 * time.Hour + DistributionBucket = time.Minute + DistributionSpreadThreshold = 10 * time.Minute +) type Info struct { Path string `json:"name"` diff --git a/frac/index_cache.go b/frac/index_cache.go index 8076495e..043e8c5c 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -11,7 +11,6 @@ func newIndexCache() *IndexCache { return &IndexCache{ LegacyRegistry: cache.NewCache[[]byte](nil, nil), - InfoRegistry: cache.NewCache[[]byte](nil, nil), TokenRegistry: cache.NewCache[[]byte](nil, nil), OffsetsRegistry: cache.NewCache[[]byte](nil, nil), IDRegistry: cache.NewCache[[]byte](nil, nil), @@ -32,7 +31,6 @@ type IndexCache struct { LegacyRegistry *cache.Cache[[]byte] // Per-file registry caches (each IndexReader needs its own). - InfoRegistry *cache.Cache[[]byte] TokenRegistry *cache.Cache[[]byte] OffsetsRegistry *cache.Cache[[]byte] IDRegistry *cache.Cache[[]byte] @@ -52,7 +50,6 @@ type IndexCache struct { func (s *IndexCache) Release() { s.LegacyRegistry.Release() - s.InfoRegistry.Release() s.TokenRegistry.Release() s.OffsetsRegistry.Release() s.IDRegistry.Release() diff --git a/frac/remote.go b/frac/remote.go index 716a6f71..f5abde6e 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -55,7 +55,6 @@ type Remote struct { idFile storage.ImmutableFile lidFile storage.ImmutableFile - infoReader storage.IndexReader tokenReader storage.IndexReader offsetsReader storage.IndexReader idReader storage.IndexReader @@ -257,16 +256,16 @@ func (f *Remote) loadInfo() error { if err := f.openInfoLegacy(); err != nil { return err } - f.info = loadInfo(f.legacyReader) + f.info = loadInfoLegacy(f.legacyReader) return nil } if err := f.openInfo(); err != nil { return err } - f.info = loadInfo(f.infoReader) + f.info = loadInfo(f.infoFile) return nil } @@ -293,7 +292,6 @@ func (f *Remote) init() error { } (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ - Info: f.infoReader, Token: f.tokenReader, Offsets: f.offsetsReader, ID: f.idReader, @@ -323,13 +321,12 @@ func (f *Remote) openInfo() error { return nil } - return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) { - f.infoFile = file - f.infoReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.InfoRegistry, - ) - }) + return f.openRemoteFile( + consts.InfoFileSuffix, + func(file storage.ImmutableFile) { + f.infoFile = file + }, + ) } func (f *Remote) openIndex() error { @@ -342,49 +339,61 @@ func (f *Remote) openIndex() error { } if f.tokenFile == nil { - if err := f.openRemoteFile(consts.TokenFileSuffix, func(file storage.ImmutableFile) { - f.tokenFile = file - f.tokenReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.TokenRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.TokenFileSuffix, + func(file storage.ImmutableFile) { + f.tokenFile = file + f.tokenReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.TokenRegistry, + ) + }, + ); err != nil { return err } } if f.offsetsFile == nil { - if err := f.openRemoteFile(consts.OffsetsFileSuffix, func(file storage.ImmutableFile) { - f.offsetsFile = file - f.offsetsReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.OffsetsRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.OffsetsFileSuffix, + func(file storage.ImmutableFile) { + f.offsetsFile = file + f.offsetsReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.OffsetsRegistry, + ) + }, + ); err != nil { return err } } if f.idFile == nil { - if err := f.openRemoteFile(consts.IDFileSuffix, func(file storage.ImmutableFile) { - f.idFile = file - f.idReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.IDRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.IDFileSuffix, + func(file storage.ImmutableFile) { + f.idFile = file + f.idReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.IDRegistry, + ) + }, + ); err != nil { return err } } if f.lidFile == nil { - if err := f.openRemoteFile(consts.LIDFileSuffix, func(file storage.ImmutableFile) { - f.lidFile = file - f.lidReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.LIDRegistry, - ) - }); err != nil { + if err := f.openRemoteFile( + consts.LIDFileSuffix, + func(file storage.ImmutableFile) { + f.lidFile = file + f.lidReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.LIDRegistry, + ) + }, + ); err != nil { return err } } diff --git a/frac/sealed.go b/frac/sealed.go index 2c7fd1fc..9564b96d 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -3,6 +3,7 @@ package frac import ( "context" "errors" + "io" "os" "path/filepath" "sync" @@ -197,10 +198,6 @@ func (f *Sealed) openInfo() { } f.infoFile = file - f.infoReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.InfoRegistry, - ) } func (f *Sealed) openIndex() { @@ -285,12 +282,12 @@ func (f *Sealed) openDocs() { func (f *Sealed) loadInfo() { if f.IsLegacy { f.openInfoLegacy() - f.info = loadInfo(f.legacyReader) + f.info = loadInfoLegacy(f.legacyReader) return } f.openInfo() - f.info = loadInfo(f.infoReader) + f.info = loadInfo(f.infoFile) } func (f *Sealed) init(full bool) { @@ -311,7 +308,6 @@ func (f *Sealed) init(full bool) { } (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ - Info: f.infoReader, Token: f.tokenReader, Offsets: f.offsetsReader, ID: f.idReader, @@ -540,7 +536,7 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool { return f.info.IsIntersecting(from, to) } -func loadInfo(infoReader storage.IndexReader) *common.Info { +func loadInfoLegacy(infoReader storage.IndexReader) *common.Info { block, _, err := infoReader.ReadIndexBlock(0, nil) if err != nil { logger.Fatal("error reading info block", zap.Error(err)) @@ -554,6 +550,20 @@ func loadInfo(infoReader storage.IndexReader) *common.Info { return bi.Info } +func loadInfo(r io.Reader) *common.Info { + block, err := io.ReadAll(r) + if err != nil { + logger.Fatal("error reading info block", zap.Error(err)) + } + + var bi sealed.BlockInfo + if err := bi.Unpack(block); err != nil { + logger.Fatal("error unpacking info block", zap.Error(err)) + } + + return bi.Info +} + // computeIndexOnDisk returns the total on-disk size of index files for a local fraction. func computeIndexOnDisk(basePath string, isLegacy bool) uint64 { suffixes := []string{ diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 576ff294..48a00a13 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -1,6 +1,7 @@ package sealing import ( + "fmt" "io" "github.com/ozontech/seq-db/consts" @@ -192,24 +193,11 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return w.finalize() } -func (s *IndexSealer) WriteInfoFile(ws io.WriteSeeker, src Source) error { - w, err := newWriter(ws) - if err != nil { - return err - } - defer w.release() - +func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} - if err := w.writeBlock(blockTypeInfo, s.packInfoBlock(block)); err != nil { - return err - } - - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - - return w.finalize() + fmt.Printf("s.packInfoBlock(block).payload: %s\n", s.packInfoBlock(block).payload) + _, err := ws.Write(s.packInfoBlock(block).payload) + return err } // collapseOrderedFieldsTables merges FieldTables with the same field name. diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 8f529139..e5c99424 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -158,7 +158,6 @@ func (l *LegacyLoader) loadLIDsTable() (*lids.Table, error) { // IndexReaders holds one IndexReader per split index file. type IndexReaders struct { - Info storage.IndexReader Token storage.IndexReader Offsets storage.IndexReader ID storage.IndexReader diff --git a/fracmanager/cache_maintainer.go b/fracmanager/cache_maintainer.go index d04aa439..0229a06b 100644 --- a/fracmanager/cache_maintainer.go +++ b/fracmanager/cache_maintainer.go @@ -146,7 +146,6 @@ func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache { LegacyRegistry: newCache[[]byte](cm, indexName), // Each index file gets its own registry cache (they all use key=1 internally). - InfoRegistry: newCache[[]byte](cm, indexName), TokenRegistry: newCache[[]byte](cm, indexName), OffsetsRegistry: newCache[[]byte](cm, indexName), IDRegistry: newCache[[]byte](cm, indexName), diff --git a/storage/s3/reader.go b/storage/s3/reader.go index 76ab4f0f..4e161d34 100644 --- a/storage/s3/reader.go +++ b/storage/s3/reader.go @@ -15,9 +15,7 @@ import ( "github.com/ozontech/seq-db/storage" ) -var ( - _ storage.ImmutableFile = (*reader)(nil) -) +var _ storage.ImmutableFile = (*reader)(nil) // reader is a wrapper around S3 client that provides basic IO functions. // Be aware that [reader] is not thread-safe. @@ -64,7 +62,7 @@ func (r *reader) Read(p []byte) (int, error) { if b != expected { return 0, fmt.Errorf( - "s3: short copy occurred: written=%d but expected=%d", + "s3: short copy occurred: read=%d but expected=%d", b, expected, ) } @@ -159,7 +157,6 @@ func (r *reader) Stat() (os.FileInfo, error) { Bucket: aws.String(r.c.bucket), Key: aws.String(r.filename), }) - if err != nil { return nil, fmt.Errorf( "s3: cannot stat file=%q: %w", @@ -199,9 +196,7 @@ func (r *reader) getSize() (int64, error) { return size, nil } -var ( - _ os.FileInfo = (*fileStat)(nil) -) +var _ os.FileInfo = (*fileStat)(nil) type fileStat struct { name string From 8a6e34a9bb4164212e6c131767bc8bbfe3be43d9 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 4 May 2026 21:32:00 +0300 Subject: [PATCH 2/3] refactor: store info as a plain json --- frac/sealed.go | 22 +++++++++++++++------- frac/sealed/sealing/index.go | 2 -- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/frac/sealed.go b/frac/sealed.go index 9564b96d..ac5565c0 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -50,7 +50,6 @@ type Sealed struct { idFile *os.File lidFile *os.File - infoReader storage.IndexReader tokenReader storage.IndexReader offsetsReader storage.IndexReader idReader storage.IndexReader @@ -539,26 +538,35 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool { func loadInfoLegacy(infoReader storage.IndexReader) *common.Info { block, _, err := infoReader.ReadIndexBlock(0, nil) if err != nil { - logger.Fatal("error reading info block", zap.Error(err)) + logger.Fatal("cannot read info block", zap.Error(err)) } var bi sealed.BlockInfo if err := bi.Unpack(block); err != nil { - logger.Fatal("error unpacking info block", zap.Error(err)) + logger.Fatal("cannot unpack info block", zap.Error(err)) } return bi.Info } -func loadInfo(r io.Reader) *common.Info { - block, err := io.ReadAll(r) +func loadInfo(r interface { + io.ReaderAt + Stat() (os.FileInfo, error) +}, +) *common.Info { + stat, err := r.Stat() if err != nil { - logger.Fatal("error reading info block", zap.Error(err)) + logger.Fatal("cannot stat info file", zap.Error(err)) + } + + block := make([]byte, stat.Size()) + if _, err := r.ReadAt(block, io.SeekStart); err != nil { + logger.Fatal("cannot read info block", zap.Error(err)) } var bi sealed.BlockInfo if err := bi.Unpack(block); err != nil { - logger.Fatal("error unpacking info block", zap.Error(err)) + logger.Fatal("cannot unpack info block", zap.Error(err)) } return bi.Info diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 48a00a13..19e330cc 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -1,7 +1,6 @@ package sealing import ( - "fmt" "io" "github.com/ozontech/seq-db/consts" @@ -195,7 +194,6 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} - fmt.Printf("s.packInfoBlock(block).payload: %s\n", s.packInfoBlock(block).payload) _, err := ws.Write(s.packInfoBlock(block).payload) return err } diff --git a/go.mod b/go.mod index 7d259333..7efc9e75 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 - github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3 + github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 github.com/kkyr/fig v0.5.0 github.com/klauspost/compress v1.18.2 github.com/oklog/ulid/v2 v2.1.1 diff --git a/go.sum b/go.sum index 92b59e95..07cca964 100644 --- a/go.sum +++ b/go.sum @@ -173,8 +173,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= -github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3 h1:2713fQZ560HxoNVgfJH41GKzjMjIG+DW4hH6nYXfXW8= -github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3/go.mod h1:S4S9jGBVlLri0OeqrSSbCGG5vsI6he06UJyuz1WT1EE= +github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 h1:0xkWp+RMC2ImuKacheMHEAtrbOTMOa0kYkxyzM1Z/II= +github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73/go.mod h1:S4S9jGBVlLri0OeqrSSbCGG5vsI6he06UJyuz1WT1EE= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= From d2416115c6a74c0a4f717e322369b21936b21cf6 Mon Sep 17 00:00:00 2001 From: Daniil Date: Tue, 5 May 2026 11:38:00 +0300 Subject: [PATCH 3/3] feat: do not store trailing separators (#419) --- frac/fraction_test.go | 4 +- frac/remote.go | 2 +- frac/sealed.go | 2 +- frac/sealed/sealing/index.go | 20 --------- frac/sealed/token/table_loader.go | 69 +++++++++++++++++++++++++++---- frac/sealed_loader.go | 32 ++++++++------ storage/index_reader.go | 12 ++++++ 7 files changed, 96 insertions(+), 45 deletions(-) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index bdf4e4e0..8757c0db 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1834,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") diff --git a/frac/remote.go b/frac/remote.go index f5abde6e..f68e6986 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( diff --git a/frac/sealed.go b/frac/sealed.go index ac5565c0..5cf7bfa3 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -507,7 +507,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 19e330cc..5c23842a 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -75,11 +75,6 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -108,11 +103,6 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { } } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -165,11 +155,6 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -184,11 +169,6 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } diff --git a/frac/sealed/token/table_loader.go b/frac/sealed/token/table_loader.go index 0750de62..cd04830b 100644 --- a/frac/sealed/token/table_loader.go +++ b/frac/sealed/token/table_loader.go @@ -17,15 +17,24 @@ const CacheKeyTable = 1 type TableLoader struct { fracName string - reader *storage.IndexReader - cache *cache.Cache[Table] - i uint32 - buf []byte + isLegacy bool + + reader *storage.IndexReader + cache *cache.Cache[Table] + + i uint32 + buf []byte } -func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[Table]) *TableLoader { +func NewTableLoader( + fracName string, + isLegacy bool, + reader *storage.IndexReader, + c *cache.Cache[Table], +) *TableLoader { return &TableLoader{ fracName: fracName, + isLegacy: isLegacy, reader: reader, cache: c, } @@ -33,10 +42,21 @@ func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache func (l *TableLoader) Load() Table { table, err := l.cache.GetWithError(CacheKeyTable, func() (Table, int, error) { - blocks, err := l.loadBlocks() + var ( + blocks []TableBlock + err error + ) + + if l.isLegacy { + blocks, err = l.loadBlocksLegacy() + } else { + blocks, err = l.loadBlocks() + } + if err != nil { return nil, 0, err } + table := TableFromBlocks(blocks) return table, table.Size(), nil }) @@ -45,6 +65,7 @@ func (l *TableLoader) Load() Table { zap.String("frac", l.fracName), zap.Error(err)) } + return table } @@ -92,9 +113,11 @@ func (l *TableLoader) readBlock() ([]byte, error) { return block, err } -func (l *TableLoader) loadBlocks() ([]TableBlock, error) { - l.i = 0 - for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip token blocks, go for token table +func (l *TableLoader) loadBlocksLegacy() ([]TableBlock, error) { + l.i = 1 // Skip info block immediately. + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. } blocks := make([]TableBlock, 0) @@ -110,6 +133,34 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) { return blocks, nil } +func (l *TableLoader) loadBlocks() ([]TableBlock, error) { + l.i = 0 + + blocksCount, err := l.reader.BlocksCount() + if err != nil { + return nil, err + } + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. + } + + var blocks []TableBlock + for l.i < uint32(blocksCount) { + data, err := l.readBlock() + if err != nil { + return nil, err + } + + var tb TableBlock + tb.Unpack(data) + + blocks = append(blocks, tb) + } + + return blocks, nil +} + // TableBlock represents how token.Table is stored on disk type TableBlock struct { FieldsTables []FieldTable diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index e5c99424..893b75a4 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -233,14 +233,19 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio IDsTotal: idsTotal, } - for blockIdx := uint32(0); ; { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx += 3 { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { logger.Fatal("error reading id block header", zap.Error(err)) } - if header.Len() == 0 { // separator - break - } var mid seq.MID if fracVersion < config.BinaryDataV2 { @@ -255,7 +260,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio }) table.IDBlocksTotal++ - blockIdx += 3 // skip RIDs and Pos blocks } return table @@ -269,16 +273,20 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { isContinued []bool ) - for blockIdx := uint32(0); ; blockIdx++ { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx++ { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { return nil, err } - if header.Len() == 0 { - break - } - ext2 := header.GetExt2() maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) diff --git a/storage/index_reader.go b/storage/index_reader.go index 60cf3641..c7dee18c 100644 --- a/storage/index_reader.go +++ b/storage/index_reader.go @@ -107,3 +107,15 @@ func (r *IndexReader) ReadIndexBlock(blockIndex uint32, dst []byte) ([]byte, uin return dst, uint64(n), err } + +func (r *IndexReader) BlocksCount() (int, error) { + registry, err := r.cache.GetWithError(1, func() ([]byte, int, error) { + data, err := r.readRegistry() + return data, cap(data), err + }) + if err != nil { + return 0, err + } + + return len(registry) / IndexBlockHeaderSize, nil +}