blockstore: use new shred API

Change-Id: I4dcdfe94e3d72342698828880656161ab12d60c8
Richard Patel 2022-12-17 04:26:02 +00:00
parent ea1e6cd07b
commit ec8deb7ac2
8 changed files with 59 additions and 55 deletions

View File

@ -97,7 +97,7 @@ func dumpShreds(
defer iter.Close()
prefix := blockstore.MakeShredKey(slot, 0)
iter.Seek(prefix[:])
for {
for iter.ValidForPrefix(prefix[:8]) {
curSlot, curIndex, ok := blockstore.ParseShredKey(iter.Key().Data())
if !ok || curSlot != slot {
break
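
The hunk above replaces an unconditional for {} scan with grocksdb's prefix-bounded iteration. A minimal sketch of the resulting pattern, assuming a blockstore.DB whose exported DB and CfDataShred fields are used as elsewhere in this commit (the helper name and the Printf output are illustrative, not part of the change):

package example

import (
	"fmt"

	"github.com/linxGnu/grocksdb"

	"go.firedancer.io/radiance/pkg/blockstore"
)

// dumpSlotShreds mirrors the loop above: seek to the (slot, 0) shred key and
// let ValidForPrefix end the scan once the 8-byte slot prefix stops matching,
// with ParseShredKey as a second guard against stray keys.
func dumpSlotShreds(db *blockstore.DB, slot uint64) {
	iter := db.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), db.CfDataShred)
	defer iter.Close()

	prefix := blockstore.MakeShredKey(slot, 0)
	iter.Seek(prefix[:])
	for iter.ValidForPrefix(prefix[:8]) {
		curSlot, curIndex, ok := blockstore.ParseShredKey(iter.Key().Data())
		if !ok || curSlot != slot {
			break
		}
		fmt.Printf("shred %d/%d: %d bytes\n", curSlot, curIndex, len(iter.Value().Data()))
		iter.Next()
	}
}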

View File

@ -11,7 +11,6 @@ import (
"github.com/linxGnu/grocksdb"
"github.com/vbauerster/mpb/v8"
"go.firedancer.io/radiance/pkg/blockstore"
"go.firedancer.io/radiance/pkg/shred"
"k8s.io/klog/v2"
)
@ -167,17 +166,12 @@ func (w *worker) readSlot() (shouldContinue bool) {
}
// Read data shreds.
shreds, err := blockstore.GetDataShredsFromIter(w.shred, metaSlot, 0, uint32(meta.Received))
shreds, err := blockstore.GetDataShredsFromIter(w.shred, metaSlot, 0, uint32(meta.Received), 2)
if err != nil {
klog.Warningf("slot %d: invalid data shreds: %s", metaSlot, err)
return
}
for _, s := range shreds {
data, _ := s.Data()
numBytes += shred.LegacyDataHeaderSize + uint64(len(data))
}
// TODO Sigverify data shreds
// Deshred and parse entries.
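
The worker now passes the shred revision (hard-coded to 2 in this commit) into GetDataShredsFromIter instead of letting the blockstore derive the wire format from the slot, and the byte accounting based on shred.LegacyDataHeaderSize is dropped with it. A hedged sketch of the new call shape, with the worker's state reduced to plain parameters (helper name illustrative):

package example

import (
	"github.com/linxGnu/grocksdb"

	"go.firedancer.io/radiance/pkg/blockstore"
	"go.firedancer.io/radiance/pkg/shred"
)

// readSlotDataShreds is a standalone version of the call above. The iterator
// is assumed to already be positioned at the slot's first data shred, as the
// worker guarantees before calling GetDataShredsFromIter.
func readSlotDataShreds(iter *grocksdb.Iterator, slot uint64, received uint32, revision int) ([]shred.Shred, error) {
	return blockstore.GetDataShredsFromIter(iter, slot, 0, received, revision)
}

The dump tool below exposes the same value as a --shred-revision flag instead of hard-coding 2.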

View File

@ -22,7 +22,7 @@ func makeEntryBatch(b *blockstore.Entries, withTxs bool) entryBatch {
}
shreds := make([]uint32, len(b.Shreds))
for i, s := range b.Shreds {
shreds[i] = s.CommonHeader().Index
shreds[i] = s.Index
}
return entryBatch{
Entries: es,

View File

@ -25,11 +25,12 @@ var Cmd = cobra.Command{
var flags = Cmd.Flags()
var (
flagSlots = flags.String("slots", "", "Slots to dump")
flagEntries = flags.Bool("entries", false, "Also dump slot entries")
flagShreds = flags.Bool("shreds", false, "Also dump shreds")
flagTxns = flags.Bool("txs", false, "Also dump transactions")
flagRoots = flags.Bool("roots", false, "Dump roots table")
flagSlots = flags.String("slots", "", "Slots to dump")
flagEntries = flags.Bool("entries", false, "Also dump slot entries")
flagShreds = flags.Bool("shreds", false, "Also dump shreds")
flagTxns = flags.Bool("txs", false, "Also dump transactions")
flagRoots = flags.Bool("roots", false, "Dump roots table")
flagShredRevision = flags.Int("shred-revision", 2, "Shred revision (1, 2)")
)
func init() {
@ -40,7 +41,7 @@ func run(c *cobra.Command, args []string) {
go func() {
// No need for clean shutdown, exit quickly
<-c.Context().Done()
os.Exit(0)
// os.Exit(0)
}()
rocksDB := args[0]
@ -153,6 +154,7 @@ func dumpAllSlots(db *blockstore.DB) {
}
slot, ok := blockstore.ParseSlotKey(iter.Key().Data())
if !ok {
klog.Errorf("Invalid slot key: %x", iter.Key().Data())
continue
}
dumpSlot(db, slot)
@ -194,7 +196,7 @@ func printSlotMeta(slotMeta *blockstore.SlotMeta) {
}
func dumpDataShreds(db *blockstore.DB, slot uint64) {
shreds, err := db.GetAllDataShreds(slot)
shreds, err := db.GetAllDataShreds(slot, *flagShredRevision)
if err != nil {
klog.Errorf("Failed to get data shreds of slot %d: %s", slot, err)
return
@ -210,7 +212,7 @@ func dumpDataShreds(db *blockstore.DB, slot uint64) {
}
func dumpDataEntries(db *blockstore.DB, meta *blockstore.SlotMeta) {
entries, err := db.GetEntries(meta)
entries, err := db.GetEntries(meta, *flagShredRevision)
if err != nil {
klog.Errorf("Failed to recover entries of slot %d: %s", meta.Slot, err)
return
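
The dump tool gains a --shred-revision flag (default 2) that is forwarded to every shred- and entry-reading call. A sketch of the resulting flow for a single slot, assuming the SlotMeta has already been loaded by the surrounding code (helper name illustrative):

package example

import (
	"k8s.io/klog/v2"

	"go.firedancer.io/radiance/pkg/blockstore"
)

// dumpOneSlot shows how the revision value threads through both lookups above.
func dumpOneSlot(db *blockstore.DB, meta *blockstore.SlotMeta, revision int) {
	shreds, err := db.GetAllDataShreds(meta.Slot, revision)
	if err != nil {
		klog.Errorf("Failed to get data shreds of slot %d: %s", meta.Slot, err)
		return
	}
	klog.Infof("slot %d: %d data shreds", meta.Slot, len(shreds))

	entries, err := db.GetEntries(meta, revision)
	if err != nil {
		klog.Errorf("Failed to recover entries of slot %d: %s", meta.Slot, err)
		return
	}
	klog.Infof("slot %d: %d entry batches", meta.Slot, len(entries))
}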

View File

@ -13,16 +13,20 @@ import (
// BlockWalk walks blocks in ascending order over multiple RocksDB databases.
type BlockWalk struct {
handles []WalkHandle // sorted
handles []WalkHandle // sorted
shredRevision int
root *grocksdb.Iterator
}
func NewBlockWalk(handles []WalkHandle) (*BlockWalk, error) {
if err := sortWalkHandles(handles); err != nil {
func NewBlockWalk(handles []WalkHandle, shredRevision int) (*BlockWalk, error) {
if err := sortWalkHandles(handles, shredRevision); err != nil {
return nil, err
}
return &BlockWalk{handles: handles}, nil
return &BlockWalk{
handles: handles,
shredRevision: shredRevision,
}, nil
}
// Seek skips ahead to a specific slot.
@ -109,7 +113,7 @@ func (m *BlockWalk) Next() (meta *SlotMeta, ok bool) {
// Caller must have made an ok call to BlockWalk.Next before calling this.
func (m *BlockWalk) Entries(meta *SlotMeta) ([][]shred.Entry, error) {
h := m.handles[0]
mapping, err := h.DB.GetEntries(meta)
mapping, err := h.DB.GetEntries(meta, m.shredRevision)
if err != nil {
return nil, err
}
@ -146,10 +150,10 @@ type WalkHandle struct {
}
// sortWalkHandles detects bounds of each DB and sorts handles.
func sortWalkHandles(h []WalkHandle) error {
func sortWalkHandles(h []WalkHandle, shredRevision int) error {
for i, db := range h {
// Find lowest and highest available slot in DB.
start, err := getLowestCompletedSlot(db.DB)
start, err := getLowestCompletedSlot(db.DB, shredRevision)
if err != nil {
return err
}
@ -170,7 +174,7 @@ func sortWalkHandles(h []WalkHandle) error {
}
// getLowestCompletedSlot finds the lowest slot in a RocksDB from which slots are complete onwards.
func getLowestCompletedSlot(d *DB) (uint64, error) {
func getLowestCompletedSlot(d *DB, shredRevision int) (uint64, error) {
iter := d.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), d.CfMeta)
defer iter.Close()
iter.SeekToFirst()
@ -195,7 +199,7 @@ func getLowestCompletedSlot(d *DB) (uint64, error) {
"getLowestCompletedSlot(%s): choked on invalid meta for slot %d", d.DB.Name(), slot)
}
if _, err = d.GetEntries(meta); err == nil {
if _, err = d.GetEntries(meta, shredRevision); err == nil {
// Success!
return slot, nil
}
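
NewBlockWalk now takes the shred revision once and reuses it for every database handle, both when probing the lowest complete slot and when recovering entries. A hedged usage sketch, assuming handles is an already-populated []blockstore.WalkHandle and leaving out any cleanup the real caller would do:

package example

import (
	"go.firedancer.io/radiance/pkg/blockstore"
	"go.firedancer.io/radiance/pkg/shred"
)

// walkEntries drains a BlockWalk constructed with an explicit shred revision;
// Entries returns one []shred.Entry per entry batch of the slot.
func walkEntries(handles []blockstore.WalkHandle, revision int) ([][][]shred.Entry, error) {
	walk, err := blockstore.NewBlockWalk(handles, revision)
	if err != nil {
		return nil, err
	}
	var out [][][]shred.Entry
	for {
		meta, ok := walk.Next()
		if !ok {
			break
		}
		batches, err := walk.Entries(meta)
		if err != nil {
			return nil, err
		}
		out = append(out, batches)
	}
	return out, nil
}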

View File

@ -73,7 +73,7 @@ func (e *Entries) Slot() uint64 {
if len(e.Shreds) == 0 {
return math.MaxUint64
}
return e.Shreds[0].CommonHeader().Slot
return e.Shreds[0].Slot
}
// DataShredsToEntries reassembles shreds to entries containing transactions.
@ -81,10 +81,7 @@ func DataShredsToEntries(meta *SlotMeta, shreds []shred.Shred) (entries []Entrie
ranges := meta.entryRanges()
for _, r := range ranges {
parts := shreds[r.startIdx : r.endIdx+1]
entryBytes, err := shred.Concat(parts)
if err != nil {
return nil, err
}
entryBytes := shred.Concat(parts)
if len(entryBytes) == 0 {
continue
}
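
With the new API, shred.Concat returns only the concatenated payload, so an empty result means there is nothing to deshred for that range rather than an error. A minimal sketch of driving the reassembly path from outside the package, mirroring what DB.GetEntries does in the next file (helper name illustrative):

package example

import (
	"go.firedancer.io/radiance/pkg/blockstore"
)

// slotEntries fetches a slot's data shreds at the given revision and
// reassembles them into entry batches, the same steps DB.GetEntries takes.
func slotEntries(db *blockstore.DB, meta *blockstore.SlotMeta, revision int) ([]blockstore.Entries, error) {
	shreds, err := db.GetDataShreds(meta.Slot, 0, uint32(meta.Received), revision)
	if err != nil {
		return nil, err
	}
	return blockstore.DataShredsToEntries(meta, shreds)
}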

View File

@ -10,29 +10,34 @@ import (
"go.firedancer.io/radiance/pkg/shred"
)
func (d *DB) GetEntries(meta *SlotMeta) ([]Entries, error) {
shreds, err := d.GetDataShreds(meta.Slot, 0, uint32(meta.Received))
func (d *DB) GetEntries(meta *SlotMeta, shredRevision int) ([]Entries, error) {
shreds, err := d.GetDataShreds(meta.Slot, 0, uint32(meta.Received), shredRevision)
if err != nil {
return nil, err
}
return DataShredsToEntries(meta, shreds)
}
func (d *DB) GetAllDataShreds(slot uint64) ([]shred.Shred, error) {
return d.getAllShreds(d.CfDataShred, slot)
func (d *DB) GetAllDataShreds(slot uint64, revision int) ([]shred.Shred, error) {
return d.getAllShreds(d.CfDataShred, slot, revision)
}
func (d *DB) GetDataShreds(slot uint64, startIdx, endIdx uint32) ([]shred.Shred, error) {
func (d *DB) GetDataShreds(slot uint64, startIdx, endIdx uint32, revision int) ([]shred.Shred, error) {
iter := d.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), d.CfDataShred)
defer iter.Close()
key := MakeShredKey(slot, uint64(startIdx))
iter.Seek(key[:])
return GetDataShredsFromIter(iter, slot, startIdx, endIdx)
return GetDataShredsFromIter(iter, slot, startIdx, endIdx, revision)
}
// GetDataShredsFromIter is like GetDataShreds, but takes a custom iterator.
// The iterator must be seeked to the indicated slot/startIdx.
func GetDataShredsFromIter(iter *grocksdb.Iterator, slot uint64, startIdx, endIdx uint32) ([]shred.Shred, error) {
func GetDataShredsFromIter(
iter *grocksdb.Iterator,
slot uint64,
startIdx, endIdx uint32,
revision int,
) ([]shred.Shred, error) {
var shreds []shred.Shred
for i := startIdx; i < endIdx; i++ {
var curSlot, index uint64
@ -51,8 +56,8 @@ func GetDataShredsFromIter(iter *grocksdb.Iterator, slot uint64, startIdx, endId
if index != uint64(i) {
return nil, fmt.Errorf("missing shred %d for slot %d", i, index)
}
s := shred.NewShredFromSerialized(iter.Value().Data(), shred.VersionByMainnetSlot(slot))
if s == nil {
s := shred.NewShredFromSerialized(iter.Value().Data(), revision)
if !s.Ok() {
return nil, fmt.Errorf("failed to deserialize shred %d/%d", slot, i)
}
shreds = append(shreds, s)
@ -61,8 +66,8 @@ func GetDataShredsFromIter(iter *grocksdb.Iterator, slot uint64, startIdx, endId
return shreds, nil
}
func (d *DB) GetDataShred(slot, index uint64) (shred.Shred, error) {
return d.getShred(d.CfDataShred, slot, index)
func (d *DB) GetDataShred(slot, index uint64, revision int) shred.Shred {
return d.getShred(d.CfDataShred, slot, index, revision)
}
func (d *DB) GetRawDataShred(slot, index uint64) (*grocksdb.Slice, error) {
@ -70,11 +75,11 @@ func (d *DB) GetRawDataShred(slot, index uint64) (*grocksdb.Slice, error) {
}
func (d *DB) GetAllCodeShreds(slot uint64) ([]shred.Shred, error) {
return d.getAllShreds(d.CfDataShred, slot)
return d.getAllShreds(d.CfDataShred, slot, shred.RevisionV2)
}
func (d *DB) GetCodeShred(slot, index uint64) (shred.Shred, error) {
return d.getShred(d.CfCodeShred, slot, index)
func (d *DB) GetCodeShred(slot, index uint64) shred.Shred {
return d.getShred(d.CfCodeShred, slot, index, shred.RevisionV2)
}
func (d *DB) GetRawCodeShred(slot, index uint64) (*grocksdb.Slice, error) {
@ -93,30 +98,32 @@ func (d *DB) getRawShred(
func (d *DB) getShred(
cf *grocksdb.ColumnFamilyHandle,
slot, index uint64,
) (shred.Shred, error) {
revision int,
) (s shred.Shred) {
value, err := d.getRawShred(cf, slot, index)
if err != nil {
return nil, err
return
}
defer value.Free()
s := shred.NewShredFromSerialized(value.Data(), shred.VersionByMainnetSlot(slot))
return s, nil
return shred.NewShredFromSerialized(value.Data(), revision)
}
func (d *DB) getAllShreds(
cf *grocksdb.ColumnFamilyHandle,
slot uint64,
revision int,
) ([]shred.Shred, error) {
iter := d.DB.NewIteratorCF(grocksdb.NewDefaultReadOptions(), cf)
defer iter.Close()
prefix := MakeShredKey(slot, 0)
prefix := MakeSlotKey(slot)
iter.Seek(prefix[:])
var shreds []shred.Shred
for iter.ValidForPrefix(prefix[:8]) {
s := shred.NewShredFromSerialized(iter.Value().Data(), shred.VersionByMainnetSlot(slot))
if s != nil {
shreds = append(shreds, s)
for iter.ValidForPrefix(prefix[:]) {
s := shred.NewShredFromSerialized(iter.Value().Data(), revision)
if !s.Ok() {
return nil, fmt.Errorf("invalid shred %d/%d", slot, len(shreds))
}
shreds = append(shreds, s)
iter.Next()
}
return shreds, nil
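
The core change in this file: shred.NewShredFromSerialized takes an explicit revision instead of shred.VersionByMainnetSlot(slot), and returns a value whose Ok() method reports whether deserialization succeeded, where the old API returned nil on failure. A hedged sketch of the new contract from a caller's side:

package example

import (
	"fmt"

	"go.firedancer.io/radiance/pkg/shred"
)

// parseShred deserializes one raw shred at the given revision and turns the
// Ok() flag into an error, the way the blockstore getters above do.
func parseShred(raw []byte, slot, index uint64, revision int) (shred.Shred, error) {
	s := shred.NewShredFromSerialized(raw, revision)
	if !s.Ok() {
		return shred.Shred{}, fmt.Errorf("failed to deserialize shred %d/%d", slot, index)
	}
	return s, nil
}

For code shreds the getters above hard-code shred.RevisionV2; data-shred callers thread the revision through from the CLI flag or the block walker.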

View File

@ -450,7 +450,7 @@ func parseShreds(t testing.TB, raw [][]byte, version int) (shreds []shred.Shred)
shreds[i] = shred.NewShredFromSerialized(buf, version)
require.NotNil(t, shreds[i], "invalid shred %d", i)
// Forgetting this assert cost me half an hour of time
assert.Equal(t, shreds[i].CommonHeader().Index, uint32(i))
assert.Equal(t, shreds[i].Index, uint32(i))
}
return shreds
}
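
Since Shred is now a value type, the require.NotNil in this helper is always satisfied; a hypothetical tightened assertion could use the new Ok() accessor instead (testify's assert and require are assumed, as in the existing test):

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.firedancer.io/radiance/pkg/shred"
)

// requireValidShred asserts that a raw shred parses at the given revision and
// carries the expected index, using Ok() instead of a nil check.
func requireValidShred(t testing.TB, raw []byte, revision, i int) shred.Shred {
	s := shred.NewShredFromSerialized(raw, revision)
	require.True(t, s.Ok(), "invalid shred %d", i)
	assert.Equal(t, uint32(i), s.Index)
	return s
}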