diff --git a/frac/fraction_test.go b/frac/fraction_test.go index bdf4e4e0..8757c0db 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1834,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") diff --git a/frac/remote.go b/frac/remote.go index f5abde6e..f68e6986 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( diff --git a/frac/sealed.go b/frac/sealed.go index ac5565c0..5cf7bfa3 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -507,7 +507,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 19e330cc..5c23842a 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -75,11 +75,6 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -108,11 +103,6 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { } } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -165,11 +155,6 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } @@ -184,11 +169,6 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return err } - // Emit trailing separator. - if err := w.writeEmptyBlock(); err != nil { - return err - } - return w.finalize() } diff --git a/frac/sealed/token/table_loader.go b/frac/sealed/token/table_loader.go index 0750de62..cd04830b 100644 --- a/frac/sealed/token/table_loader.go +++ b/frac/sealed/token/table_loader.go @@ -17,15 +17,24 @@ const CacheKeyTable = 1 type TableLoader struct { fracName string - reader *storage.IndexReader - cache *cache.Cache[Table] - i uint32 - buf []byte + isLegacy bool + + reader *storage.IndexReader + cache *cache.Cache[Table] + + i uint32 + buf []byte } -func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[Table]) *TableLoader { +func NewTableLoader( + fracName string, + isLegacy bool, + reader *storage.IndexReader, + c *cache.Cache[Table], +) *TableLoader { return &TableLoader{ fracName: fracName, + isLegacy: isLegacy, reader: reader, cache: c, } @@ -33,10 +42,21 @@ func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache func (l *TableLoader) Load() Table { table, err := l.cache.GetWithError(CacheKeyTable, func() (Table, int, error) { - blocks, err := l.loadBlocks() + var ( + blocks []TableBlock + err error + ) + + if l.isLegacy { + blocks, err = l.loadBlocksLegacy() + } else { + blocks, err = l.loadBlocks() + } + if err != nil { return nil, 0, err } + table := TableFromBlocks(blocks) return table, table.Size(), nil }) @@ -45,6 +65,7 @@ func (l *TableLoader) Load() Table { zap.String("frac", l.fracName), zap.Error(err)) } + return table } @@ -92,9 +113,11 @@ func (l *TableLoader) readBlock() ([]byte, error) { return block, err } -func (l *TableLoader) loadBlocks() ([]TableBlock, error) { - l.i = 0 - for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip token blocks, go for token table +func (l *TableLoader) loadBlocksLegacy() ([]TableBlock, error) { + l.i = 1 // Skip info block immediately. + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. } blocks := make([]TableBlock, 0) @@ -110,6 +133,34 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) { return blocks, nil } +func (l *TableLoader) loadBlocks() ([]TableBlock, error) { + l.i = 0 + + blocksCount, err := l.reader.BlocksCount() + if err != nil { + return nil, err + } + + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { + // Skip token blocks, go for token table. + } + + var blocks []TableBlock + for l.i < uint32(blocksCount) { + data, err := l.readBlock() + if err != nil { + return nil, err + } + + var tb TableBlock + tb.Unpack(data) + + blocks = append(blocks, tb) + } + + return blocks, nil +} + // TableBlock represents how token.Table is stored on disk type TableBlock struct { FieldsTables []FieldTable diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index e5c99424..893b75a4 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -233,14 +233,19 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio IDsTotal: idsTotal, } - for blockIdx := uint32(0); ; { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx += 3 { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { logger.Fatal("error reading id block header", zap.Error(err)) } - if header.Len() == 0 { // separator - break - } var mid seq.MID if fracVersion < config.BinaryDataV2 { @@ -255,7 +260,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio }) table.IDBlocksTotal++ - blockIdx += 3 // skip RIDs and Pos blocks } return table @@ -269,16 +273,20 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { isContinued []bool ) - for blockIdx := uint32(0); ; blockIdx++ { - header, err := r.GetBlockHeader(blockIdx) + blocksCount, err := r.BlocksCount() + if err != nil { + logger.Fatal( + "cannot get block count", + zap.Error(err), + ) + } + + for blockIdx := 0; blockIdx < blocksCount; blockIdx++ { + header, err := r.GetBlockHeader(uint32(blockIdx)) if err != nil { return nil, err } - if header.Len() == 0 { - break - } - ext2 := header.GetExt2() maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) diff --git a/storage/index_reader.go b/storage/index_reader.go index 60cf3641..c7dee18c 100644 --- a/storage/index_reader.go +++ b/storage/index_reader.go @@ -107,3 +107,15 @@ func (r *IndexReader) ReadIndexBlock(blockIndex uint32, dst []byte) ([]byte, uin return dst, uint64(n), err } + +func (r *IndexReader) BlocksCount() (int, error) { + registry, err := r.cache.GetWithError(1, func() ([]byte, int, error) { + data, err := r.readRegistry() + return data, cap(data), err + }) + if err != nil { + return 0, err + } + + return len(registry) / IndexBlockHeaderSize, nil +}