2022-10-28 11:31:15 -07:00
|
|
|
//go:build rocksdb
|
|
|
|
|
|
|
|
package blockstore
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sort"
|
|
|
|
|
|
|
|
"github.com/linxGnu/grocksdb"
|
2022-10-29 04:50:04 -07:00
|
|
|
"go.firedancer.io/radiance/pkg/shred"
|
2022-10-28 11:31:15 -07:00
|
|
|
"k8s.io/klog/v2"
|
|
|
|
)
|
|
|
|
|
|
|
|
// BlockWalk walks blocks in ascending order over multiple RocksDB databases.
|
|
|
|
type BlockWalk struct {
|
|
|
|
handles []WalkHandle // sorted
|
|
|
|
|
|
|
|
root *grocksdb.Iterator
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewBlockWalk(handles []WalkHandle) (*BlockWalk, error) {
|
|
|
|
if err := sortWalkHandles(handles); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &BlockWalk{handles: handles}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Seek skips ahead to a specific slot.
|
|
|
|
// The caller must call BlockWalk.Next after Seek.
|
|
|
|
func (m *BlockWalk) Seek(slot uint64) bool {
|
|
|
|
for len(m.handles) > 0 {
|
|
|
|
h := m.handles[0]
|
|
|
|
if slot < h.Start {
|
|
|
|
// trying to Seek to slot below lowest available
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if slot <= h.Stop {
|
|
|
|
h.Start = slot
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
m.pop()
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
// SlotsAvailable returns the number of contiguous slots that lay ahead.
|
|
|
|
func (m *BlockWalk) SlotsAvailable() (total uint64) {
|
|
|
|
if len(m.handles) == 0 {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
start := m.handles[0].Start
|
|
|
|
for _, h := range m.handles {
|
|
|
|
if h.Start > start {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
stop := h.Stop + 1
|
|
|
|
total += stop - start
|
|
|
|
start = stop
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next seeks to the next slot.
|
|
|
|
func (m *BlockWalk) Next() (meta *SlotMeta, ok bool) {
|
|
|
|
if len(m.handles) == 0 {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
h := m.handles[0]
|
|
|
|
if m.root == nil {
|
|
|
|
// Open Next database
|
|
|
|
m.root = h.DB.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), h.DB.CfRoot)
|
|
|
|
key := MakeSlotKey(h.Start)
|
|
|
|
m.root.Seek(key[:])
|
|
|
|
}
|
|
|
|
if !m.root.Valid() {
|
|
|
|
// Close current DB and go to Next
|
|
|
|
m.pop()
|
|
|
|
return m.Next() // TODO tail recursion optimization?
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get key at current position.
|
|
|
|
slot, ok := ParseSlotKey(m.root.Key().Data())
|
|
|
|
if !ok {
|
|
|
|
klog.Exitf("Invalid slot key: %x", m.root.Key().Data())
|
|
|
|
}
|
|
|
|
if slot > h.Stop {
|
|
|
|
m.pop()
|
|
|
|
return m.Next()
|
|
|
|
}
|
|
|
|
h.Start = slot
|
|
|
|
|
|
|
|
// Get value at current position.
|
|
|
|
var err error
|
|
|
|
meta, err = h.DB.GetSlotMeta(slot)
|
|
|
|
if err != nil {
|
|
|
|
// Invalid slot metas are irrecoverable.
|
|
|
|
// The CAR generation process must stop here.
|
|
|
|
klog.Errorf("FATAL: invalid slot meta at slot %d, aborting CAR generation: %s", slot, err)
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
|
|
|
// Seek iterator to Next entry.
|
|
|
|
m.root.Next()
|
|
|
|
|
|
|
|
return meta, true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Entries returns the entries at the current cursor.
|
|
|
|
// Caller must have made an ok call to BlockWalk.Next before calling this.
|
2022-10-29 04:50:04 -07:00
|
|
|
func (m *BlockWalk) Entries(meta *SlotMeta) ([][]shred.Entry, error) {
|
2022-10-28 11:31:15 -07:00
|
|
|
h := m.handles[0]
|
2022-10-29 04:50:04 -07:00
|
|
|
mapping, err := h.DB.GetEntries(meta)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
batches := make([][]shred.Entry, len(mapping))
|
|
|
|
for i, batch := range mapping {
|
|
|
|
batches[i] = batch.Entries
|
|
|
|
}
|
|
|
|
return batches, nil
|
2022-10-28 11:31:15 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// pop closes the current open DB.
|
|
|
|
func (m *BlockWalk) pop() {
|
|
|
|
m.root.Close()
|
|
|
|
m.root = nil
|
|
|
|
m.handles[0].DB.Close()
|
|
|
|
m.handles = m.handles[1:]
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BlockWalk) Close() {
|
|
|
|
if m.root != nil {
|
|
|
|
m.root.Close()
|
|
|
|
m.root = nil
|
|
|
|
}
|
|
|
|
for _, h := range m.handles {
|
|
|
|
h.DB.Close()
|
|
|
|
}
|
|
|
|
m.handles = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type WalkHandle struct {
|
|
|
|
DB *DB
|
|
|
|
Start uint64
|
|
|
|
Stop uint64 // inclusive
|
|
|
|
}
|
|
|
|
|
|
|
|
// sortWalkHandles detects bounds of each DB and sorts handles.
|
|
|
|
func sortWalkHandles(h []WalkHandle) error {
|
|
|
|
for i, db := range h {
|
|
|
|
// Find lowest and highest available slot in DB.
|
|
|
|
start, err := getLowestCompletedSlot(db.DB)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
stop, err := db.DB.MaxRoot()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
h[i] = WalkHandle{
|
|
|
|
Start: start,
|
|
|
|
Stop: stop,
|
|
|
|
DB: db.DB,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sort.Slice(h, func(i, j int) bool {
|
|
|
|
return h[i].Start < h[j].Start
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// getLowestCompleteSlot finds the lowest slot in a RocksDB from which slots are complete onwards.
|
|
|
|
func getLowestCompletedSlot(d *DB) (uint64, error) {
|
|
|
|
iter := d.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), d.CfMeta)
|
|
|
|
defer iter.Close()
|
|
|
|
iter.SeekToFirst()
|
|
|
|
|
|
|
|
// The Solana validator periodically prunes old slots to keep database space bounded.
|
|
|
|
// Therefore, the first (few) slots might have valid meta entries but missing data shreds.
|
|
|
|
// To work around this, we simply start at the lowest meta and iterate until we find a complete entry.
|
|
|
|
|
|
|
|
const maxTries = 32
|
|
|
|
for i := 0; iter.Valid() && i < maxTries; i++ {
|
|
|
|
slot, ok := ParseSlotKey(iter.Key().Data())
|
|
|
|
if !ok {
|
|
|
|
return 0, fmt.Errorf(
|
|
|
|
"getLowestCompletedSlot(%s): choked on invalid slot key: %x", d.DB.Name(), iter.Key().Data())
|
|
|
|
}
|
|
|
|
|
|
|
|
// RocksDB row writes are atomic, therefore meta should never be broken.
|
|
|
|
// If we fail to decode meta, bail as early as possible, as we cannot guarantee compatibility.
|
|
|
|
meta, err := ParseBincode[SlotMeta](iter.Value().Data())
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf(
|
|
|
|
"getLowestCompletedSlot(%s): choked on invalid meta for slot %d", d.DB.Name(), slot)
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err = d.GetEntries(meta); err == nil {
|
|
|
|
// Success!
|
|
|
|
return slot, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
iter.Next()
|
|
|
|
}
|
|
|
|
|
|
|
|
return 0, fmt.Errorf("failed to find a valid complete slot in DB: %s", d.DB.Name())
|
|
|
|
}
|