Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,11 +1834,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1700),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down
2 changes: 1 addition & 1 deletion frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
Expand Down
2 changes: 1 addition & 1 deletion frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
Expand Down
20 changes: 0 additions & 20 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error {
return err
}

// Emit trailing separator.
if err := w.writeEmptyBlock(); err != nil {
return err
}

return w.finalize()
}

Expand Down Expand Up @@ -108,11 +103,6 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error {
}
}

// Emit trailing separator.
if err := w.writeEmptyBlock(); err != nil {
return err
}

return w.finalize()
}

Expand Down Expand Up @@ -165,11 +155,6 @@ func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator)
return err
}

// Emit trailing separator.
if err := w.writeEmptyBlock(); err != nil {
return err
}

return w.finalize()
}

Expand All @@ -184,11 +169,6 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field
return err
}

// Emit trailing separator.
if err := w.writeEmptyBlock(); err != nil {
return err
}

return w.finalize()
}

Expand Down
69 changes: 60 additions & 9 deletions frac/sealed/token/table_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,46 @@ const CacheKeyTable = 1

type TableLoader struct {
fracName string
reader *storage.IndexReader
cache *cache.Cache[Table]
i uint32
buf []byte
isLegacy bool

reader *storage.IndexReader
cache *cache.Cache[Table]

i uint32
buf []byte
}

func NewTableLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[Table]) *TableLoader {
func NewTableLoader(
fracName string,
isLegacy bool,
reader *storage.IndexReader,
c *cache.Cache[Table],
) *TableLoader {
return &TableLoader{
fracName: fracName,
isLegacy: isLegacy,
reader: reader,
cache: c,
}
}

func (l *TableLoader) Load() Table {
table, err := l.cache.GetWithError(CacheKeyTable, func() (Table, int, error) {
blocks, err := l.loadBlocks()
var (
blocks []TableBlock
err error
)

if l.isLegacy {
blocks, err = l.loadBlocksLegacy()
} else {
blocks, err = l.loadBlocks()
}

if err != nil {
return nil, 0, err
}

table := TableFromBlocks(blocks)
return table, table.Size(), nil
})
Expand All @@ -45,6 +65,7 @@ func (l *TableLoader) Load() Table {
zap.String("frac", l.fracName),
zap.Error(err))
}

return table
}

Expand Down Expand Up @@ -92,9 +113,11 @@ func (l *TableLoader) readBlock() ([]byte, error) {
return block, err
}

func (l *TableLoader) loadBlocks() ([]TableBlock, error) {
l.i = 0
for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip token blocks, go for token table
func (l *TableLoader) loadBlocksLegacy() ([]TableBlock, error) {
l.i = 1 // Skip info block immediately.

for h := l.readHeader(); h.Len() > 0; h = l.readHeader() {
// Skip token blocks, go for token table.
}

blocks := make([]TableBlock, 0)
Expand All @@ -110,6 +133,34 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) {
return blocks, nil
}

func (l *TableLoader) loadBlocks() ([]TableBlock, error) {
l.i = 0

blocksCount, err := l.reader.BlocksCount()
if err != nil {
return nil, err
}

for h := l.readHeader(); h.Len() > 0; h = l.readHeader() {
// Skip token blocks, go for token table.
}

var blocks []TableBlock
for l.i < uint32(blocksCount) {
data, err := l.readBlock()
if err != nil {
return nil, err
}

var tb TableBlock
tb.Unpack(data)

blocks = append(blocks, tb)
}

return blocks, nil
}

// TableBlock represents how token.Table is stored on disk
type TableBlock struct {
FieldsTables []FieldTable
Expand Down
32 changes: 20 additions & 12 deletions frac/sealed_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,19 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
IDsTotal: idsTotal,
}

for blockIdx := uint32(0); ; {
header, err := r.GetBlockHeader(blockIdx)
blocksCount, err := r.BlocksCount()
if err != nil {
logger.Fatal(
"cannot get block count",
zap.Error(err),
)
}

for blockIdx := 0; blockIdx < blocksCount; blockIdx += 3 {
header, err := r.GetBlockHeader(uint32(blockIdx))
if err != nil {
logger.Fatal("error reading id block header", zap.Error(err))
}
if header.Len() == 0 { // separator
break
}

var mid seq.MID
if fracVersion < config.BinaryDataV2 {
Expand All @@ -255,7 +260,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
})

table.IDBlocksTotal++
blockIdx += 3 // skip RIDs and Pos blocks
}

return table
Expand All @@ -269,16 +273,20 @@ func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) {
isContinued []bool
)

for blockIdx := uint32(0); ; blockIdx++ {
header, err := r.GetBlockHeader(blockIdx)
blocksCount, err := r.BlocksCount()
if err != nil {
logger.Fatal(
"cannot get block count",
zap.Error(err),
)
}

for blockIdx := 0; blockIdx < blocksCount; blockIdx++ {
header, err := r.GetBlockHeader(uint32(blockIdx))
if err != nil {
return nil, err
}

if header.Len() == 0 {
break
}

ext2 := header.GetExt2()
maxTIDs = append(maxTIDs, uint32(ext2>>32))
minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF))
Expand Down
12 changes: 12 additions & 0 deletions storage/index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,15 @@ func (r *IndexReader) ReadIndexBlock(blockIndex uint32, dst []byte) ([]byte, uin

return dst, uint64(n), err
}

func (r *IndexReader) BlocksCount() (int, error) {
registry, err := r.cache.GetWithError(1, func() ([]byte, int, error) {
data, err := r.readRegistry()
return data, cap(data), err
})
if err != nil {
return 0, err
}

return len(registry) / IndexBlockHeaderSize, nil
}