Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions common/batch/batch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,13 +752,11 @@ func (bc *BatchCache) SealBatch(sequencerSets []byte, blockTimestamp uint64, rep
copy(batchHeaderCopy, batchHeader)
bc.sealedBatchHeaders[batchIndex] = &batchHeaderCopy

err = bc.batchStorage.StoreSealedBatch(batchIndex, sealedBatch)
// Persist batch data, header and indices in one atomic write so the stored
// snapshot can never be partially updated.
err = bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch", "err", err)
}
err = bc.batchStorage.StoreSealedBatchHeader(batchIndex, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch header", "err", err)
log.Error("failed to store sealed batch and header", "batch_index", batchIndex, "err", err)
}
Comment on lines +755 to 760

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Propagate sealed-batch persistence failures instead of logging and continuing.

SealBatch currently returns success even when StoreSealedBatchAndHeader fails. That leaves callers believing the batch is durable while persistence has failed.

Proposed fix
-	err = bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy)
-	if err != nil {
-		log.Error("failed to store sealed batch and header", "batch_index", batchIndex, "err", err)
-	}
+	if err := bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy); err != nil {
+		delete(bc.sealedBatches, batchIndex)
+		delete(bc.sealedBatchHeaders, batchIndex)
+		return 0, BatchHeaderBytes{}, false, fmt.Errorf("failed to store sealed batch and header for batch %d: %w", batchIndex, err)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Persist batch data, header and indices in one atomic write so the stored
// snapshot can never be partially updated.
err = bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch", "err", err)
}
err = bc.batchStorage.StoreSealedBatchHeader(batchIndex, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch header", "err", err)
log.Error("failed to store sealed batch and header", "batch_index", batchIndex, "err", err)
}
// Persist batch data, header and indices in one atomic write so the stored
// snapshot can never be partially updated.
if err := bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy); err != nil {
delete(bc.sealedBatches, batchIndex)
delete(bc.sealedBatchHeaders, batchIndex)
return 0, BatchHeaderBytes{}, false, fmt.Errorf("failed to store sealed batch and header for batch %d: %w", batchIndex, err)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@common/batch/batch_cache.go` around lines 755 - 760, SealBatch currently
swallows errors from bc.batchStorage.StoreSealedBatchAndHeader and logs them
instead of returning failure; change SealBatch so that after calling
batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch,
&batchHeaderCopy) it returns the encountered error (or a wrapped error with
context) instead of only logging, ensuring the SealBatch function signature and
all callers propagate/handle the error; reference
bc.batchStorage.StoreSealedBatchAndHeader and the SealBatch function when making
the change so persistence failures are propagated to callers.

// Update parent batch information for next batch
bc.parentBatchHeader = &batchHeaderCopy
Expand Down Expand Up @@ -1258,6 +1256,28 @@ func (bc *BatchCache) Delete(batchIndex uint64) error {
return nil
}

// DeleteUntil removes every sealed batch and header with index <= maxIndex from
// both the in-memory maps and persistent storage. Finalize cleanup must use this
// range-based form: the finalize target can jump (multiple submitters, several
// batches finalized at once), so deleting a single index leaves stale lower
// indices behind and punches holes into the persisted indices snapshot, which
// breaks the contiguity assumption of the startup load path.
func (bc *BatchCache) DeleteUntil(maxIndex uint64) error {
bc.mu.Lock()
defer bc.mu.Unlock()
for idx := range bc.sealedBatches {
if idx <= maxIndex {
delete(bc.sealedBatches, idx)
}
}
for idx := range bc.sealedBatchHeaders {
if idx <= maxIndex {
delete(bc.sealedBatchHeaders, idx)
}
}
return bc.batchStorage.DeleteSealedBatchesUpTo(maxIndex)
}

// logSealedBatch logs the details of the sealed batch for debugging purposes.
func (bc *BatchCache) logSealedBatch(batchHeader BatchHeaderBytes, batchHash common.Hash, blockCount uint16, blobCount int) {
version, err := batchHeader.Version()
Expand Down
216 changes: 147 additions & 69 deletions common/batch/batch_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"sync"

"github.com/morph-l2/go-ethereum/eth"
Expand Down Expand Up @@ -34,7 +35,9 @@ func NewBatchStorage(db SealedBatchKV) *BatchStorage {
}

// StoreSealedBatch stores a single sealed batch to LevelDB
// Uses JSON encoding for serialization
// Uses JSON encoding for serialization.
// Batch data and the indices snapshot are written in one atomic batch so they
// can never get out of sync; indices update failures are no longer swallowed.
func (s *BatchStorage) StoreSealedBatch(batchIndex uint64, batch *eth.RPCRollupBatch) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -45,18 +48,45 @@ func (s *BatchStorage) StoreSealedBatch(batchIndex uint64, batch *eth.RPCRollupB
return fmt.Errorf("failed to marshal sealed batch %d: %w", batchIndex, err)
}

// Store batch data
key := encodeBatchKey(batchIndex)
if err := s.db.PutBytes(key, encoded); err != nil {
encodedIndices, err := s.indicesSnapshotWith(batchIndex)
if err != nil {
return fmt.Errorf("failed to update batch indices for batch %d: %w", batchIndex, err)
}

puts := []KVPair{
{Key: encodeBatchKey(batchIndex), Value: encoded},
{Key: []byte(SealedBatchIndicesKey), Value: encodedIndices},
}
if err := s.db.WriteBatch(puts, nil); err != nil {
return fmt.Errorf("failed to store sealed batch %d: %w", batchIndex, err)
}
return nil
}

// Update indices list
if err = s.updateBatchIndices(batchIndex, true); err != nil {
log.Warn("Failed to update batch indices", "batch_index", batchIndex, "error", err)
// Don't fail the operation if indices update fails
// StoreSealedBatchAndHeader stores the sealed batch, its header and the updated
// indices snapshot in a single atomic write.
func (s *BatchStorage) StoreSealedBatchAndHeader(batchIndex uint64, batch *eth.RPCRollupBatch, header *BatchHeaderBytes) error {
s.mu.Lock()
defer s.mu.Unlock()

encoded, err := json.Marshal(batch)
if err != nil {
return fmt.Errorf("failed to marshal sealed batch %d: %w", batchIndex, err)
}

encodedIndices, err := s.indicesSnapshotWith(batchIndex)
if err != nil {
return fmt.Errorf("failed to update batch indices for batch %d: %w", batchIndex, err)
}

puts := []KVPair{
{Key: encodeBatchKey(batchIndex), Value: encoded},
{Key: encodeBatchHeaderKey(batchIndex), Value: header.Bytes()},
{Key: []byte(SealedBatchIndicesKey), Value: encodedIndices},
}
if err := s.db.WriteBatch(puts, nil); err != nil {
return fmt.Errorf("failed to store sealed batch and header %d: %w", batchIndex, err)
}
return nil
}

Expand Down Expand Up @@ -132,12 +162,26 @@ func (s *BatchStorage) LoadAllSealedBatchesAndHeader() (map[uint64]*eth.RPCRollu
for i, idx := range indices {
batch, err := s.LoadSealedBatch(idx)
if err != nil {
log.Warn("Failed to load sealed batch, skipping",
log.Warn("Failed to load sealed batch, aborting",
"batch_index", idx, "error", err)
return nil, nil, nil, fmt.Errorf("failed to load batch: %w", err)
}
if i > 0 {
parentBatch := batches[idx-1]
// indices is sorted ascending; a hole means some middle batch was
// deleted (e.g. by a finalize confirmed while other submitters
// advanced the finalize index). Return an error so the caller can
// self-heal instead of dereferencing a missing parent batch.
prevIdx := indices[i-1]
if idx != prevIdx+1 {
log.Error("Sealed batch indices are not contiguous",
"prev_index", prevIdx, "batch_index", idx)
return nil, nil, nil, fmt.Errorf("sealed batch indices not contiguous: %d -> %d", prevIdx, idx)
}
parentBatch := batches[prevIdx]
if parentBatch == nil {
log.Error("Parent batch missing", "parent_index", prevIdx, "batch_index", idx)
return nil, nil, nil, fmt.Errorf("parent batch %d missing for batch %d", prevIdx, idx)
}
parentBatchHash, err := BatchHeaderBytes(batch.ParentBatchHeader).Hash()
if err != nil {
log.Error("Failed to load parent batch header", "batch_index", idx, "error", err)
Expand Down Expand Up @@ -179,31 +223,36 @@ func (s *BatchStorage) LoadAllSealedBatchesAndHeader() (map[uint64]*eth.RPCRollu
return batches, headers, indices, nil
}

// DeleteSealedBatch removes a sealed batch from LevelDB
// DeleteSealedBatch removes a sealed batch (data + header) from LevelDB.
// Data, header and the indices snapshot are removed in one atomic write;
// indices update failures are no longer swallowed.
func (s *BatchStorage) DeleteSealedBatch(batchIndex uint64) error {
s.mu.Lock()
defer s.mu.Unlock()

key := encodeBatchKey(batchIndex)
if err := s.db.Delete(key); err != nil {
return fmt.Errorf("failed to delete sealed batch %d: %w", batchIndex, err)
encodedIndices, err := s.indicesSnapshotWithout(batchIndex)
if err != nil {
return fmt.Errorf("failed to update batch indices for batch %d: %w", batchIndex, err)
}

// Update indices list
if err := s.updateBatchIndices(batchIndex, false); err != nil {
log.Warn("Failed to update batch indices after deletion",
"batch_index", batchIndex, "error", err)
// Don't fail the operation if indices update fails
puts := []KVPair{{Key: []byte(SealedBatchIndicesKey), Value: encodedIndices}}
deletes := [][]byte{encodeBatchKey(batchIndex), encodeBatchHeaderKey(batchIndex)}
if err := s.db.WriteBatch(puts, deletes); err != nil {
return fmt.Errorf("failed to delete sealed batch %d: %w", batchIndex, err)
}

return nil
}

func (s *BatchStorage) DeleteAllSealedBatches() error {
s.mu.RLock()
// Load batch indices
// DeleteSealedBatchesUpTo removes every sealed batch (data + header) with
// index <= maxIndex in a single atomic write. Range-based cleanup keeps the
// surviving indices a contiguous window: single-index deletes punch holes when
// the finalize target jumps (multiple submitters, finalizing several batches at
// once), and such holes crash the startup load path.
func (s *BatchStorage) DeleteSealedBatchesUpTo(maxIndex uint64) error {
s.mu.Lock()
defer s.mu.Unlock()

indices, err := s.loadBatchIndices()
s.mu.RUnlock()
if err != nil {
if isKVNotFound(err) {
// No batches stored yet
Expand All @@ -212,21 +261,51 @@ func (s *BatchStorage) DeleteAllSealedBatches() error {
return fmt.Errorf("failed to load batch indices: %w", err)
}

kept := make([]uint64, 0, len(indices))
var deletes [][]byte
for _, idx := range indices {
err = s.DeleteSealedBatch(idx)
if err != nil {
log.Error("Failed to delete sealed batch",
"batch_index", idx, "error", err)
return err
if idx <= maxIndex {
deletes = append(deletes, encodeBatchKey(idx), encodeBatchHeaderKey(idx))
} else {
kept = append(kept, idx)
}
err = s.DeleteSealedBatchHeader(idx)
if err != nil {
log.Error("Failed to delete sealed batch header",
"batch_index", idx, "error", err)
return err
}
if len(deletes) == 0 {
return nil
}

encodedIndices, err := json.Marshal(kept)
if err != nil {
return fmt.Errorf("failed to marshal batch indices: %w", err)
}
puts := []KVPair{{Key: []byte(SealedBatchIndicesKey), Value: encodedIndices}}
if err := s.db.WriteBatch(puts, deletes); err != nil {
return fmt.Errorf("failed to delete sealed batches up to %d: %w", maxIndex, err)
}
return nil
}

func (s *BatchStorage) DeleteAllSealedBatches() error {
s.mu.Lock()
defer s.mu.Unlock()

indices, err := s.loadBatchIndices()
if err != nil {
if isKVNotFound(err) {
// No batches stored yet
return nil
}
return fmt.Errorf("failed to load batch indices: %w", err)
}

deletes := make([][]byte, 0, len(indices)*2+1)
for _, idx := range indices {
deletes = append(deletes, encodeBatchKey(idx), encodeBatchHeaderKey(idx))
}
deletes = append(deletes, []byte(SealedBatchIndicesKey))
if err := s.db.WriteBatch(nil, deletes); err != nil {
return fmt.Errorf("failed to delete all sealed batches: %w", err)
}
return nil
}

Expand All @@ -238,45 +317,52 @@ func encodeBatchKey(batchIndex uint64) []byte {
return key
}

// updateBatchIndices updates the list of stored batch indices
// add: true to add index, false to remove
func (s *BatchStorage) updateBatchIndices(batchIndex uint64, add bool) error {
// indicesSnapshotWith returns the marshaled indices snapshot with batchIndex added.
func (s *BatchStorage) indicesSnapshotWith(batchIndex uint64) ([]byte, error) {
indices, err := s.loadBatchIndices()
if err != nil {
if isKVNotFound(err) {
indices = []uint64{}
} else {
return err
return nil, err
}
}

if add {
// Add index if not already present
found := false
for _, idx := range indices {
if idx == batchIndex {
found = true
break
}
}
if !found {
indices = append(indices, batchIndex)
found := false
for _, idx := range indices {
if idx == batchIndex {
found = true
break
}
} else {
// Remove index
newIndices := make([]uint64, 0, len(indices))
for _, idx := range indices {
if idx != batchIndex {
newIndices = append(newIndices, idx)
}
}
if !found {
indices = append(indices, batchIndex)
sort.Slice(indices, func(i, j int) bool { return indices[i] < indices[j] })
}
return json.Marshal(indices)
}

// indicesSnapshotWithout returns the marshaled indices snapshot with batchIndex removed.
func (s *BatchStorage) indicesSnapshotWithout(batchIndex uint64) ([]byte, error) {
indices, err := s.loadBatchIndices()
if err != nil {
if isKVNotFound(err) {
indices = []uint64{}
} else {
return nil, err
}
indices = newIndices
}

return s.saveBatchIndices(indices)
newIndices := make([]uint64, 0, len(indices))
for _, idx := range indices {
if idx != batchIndex {
newIndices = append(newIndices, idx)
}
}
return json.Marshal(newIndices)
}

// loadBatchIndices loads the list of stored batch indices
// loadBatchIndices loads the list of stored batch indices, sorted ascending.
func (s *BatchStorage) loadBatchIndices() ([]uint64, error) {
encoded, err := s.db.GetBytes([]byte(SealedBatchIndicesKey))
if err != nil {
Expand All @@ -288,19 +374,11 @@ func (s *BatchStorage) loadBatchIndices() ([]uint64, error) {
return nil, fmt.Errorf("failed to unmarshal batch indices: %w", err)
}

// Keep ordering deterministic regardless of how the snapshot was produced.
sort.Slice(indices, func(i, j int) bool { return indices[i] < indices[j] })
return indices, nil
}

// saveBatchIndices saves the list of batch indices
func (s *BatchStorage) saveBatchIndices(indices []uint64) error {
encoded, err := json.Marshal(indices)
if err != nil {
return fmt.Errorf("failed to marshal batch indices: %w", err)
}

return s.db.PutBytes([]byte(SealedBatchIndicesKey), encoded)
}

// StoreSealedBatchHeader stores a single sealed batch header to LevelDB
func (s *BatchStorage) StoreSealedBatchHeader(batchIndex uint64, header *BatchHeaderBytes) error {
s.mu.Lock()
Expand Down
Loading
Loading