remove ipld, car packages

The Solana IPLD project has moved to
https://github.com/rpcpool/yellowstone-faithful
This commit is contained in:
Richard Patel 2023-06-10 15:50:49 +00:00
parent 058eba3bde
commit ddf48545d2
18 changed files with 4 additions and 7732 deletions

View File

@ -1,22 +0,0 @@
//go:build !lite
package car
import (
"github.com/spf13/cobra"
"go.firedancer.io/radiance/cmd/radiance/car/create"
"go.firedancer.io/radiance/cmd/radiance/car/dump"
)
var Cmd = cobra.Command{
Use: "car",
Short: "Manage IPLD Content-addressable ARchives",
Long: "https://ipld.io/specs/transport/car/",
}
func init() {
Cmd.AddCommand(
&create.Cmd,
&dump.Cmd,
)
}

View File

@ -1,9 +0,0 @@
//go:build lite
package car
import (
"github.com/spf13/cobra"
)
var Cmd cobra.Command

View File

@ -1,81 +0,0 @@
//go:build !lite
package create
import (
"path/filepath"
"strconv"
"time"
"github.com/spf13/cobra"
"go.firedancer.io/radiance/pkg/ipld/cargen"
"k8s.io/klog/v2"
"go.firedancer.io/radiance/pkg/blockstore"
)
var Cmd = cobra.Command{
Use: "create <epoch>",
Short: "Create CAR files from blockstore",
Long: "Extracts Solana ledger data from blockstore (RocksDB) databases,\n" +
"and outputs IPLD CARs (content-addressable archives).\n" +
"\n" +
"Produces at least one CAR per epoch.\n" +
"CAR archive contents are deterministic.",
Args: cobra.ExactArgs(1),
}
var flags = Cmd.Flags()
var (
flagOut = flags.StringP("out", "o", "", "Output directory")
flagDBs = flags.StringArray("db", nil, "Path to RocksDB (can be specified multiple times)")
)
func init() {
Cmd.Run = run
}
func run(c *cobra.Command, args []string) {
start := time.Now()
outPath := filepath.Clean(*flagOut)
epochStr := args[0]
epoch, err := strconv.ParseUint(epochStr, 10, 32)
if err != nil {
klog.Exitf("Invalid epoch arg: %s", epochStr)
}
// Open blockstores
dbPaths := *flagDBs
handles := make([]blockstore.WalkHandle, len(*flagDBs))
for i := range handles {
var err error
handles[i].DB, err = blockstore.OpenReadOnly(dbPaths[i])
if err != nil {
klog.Exitf("Failed to open blockstore at %s: %s", dbPaths[i], err)
}
}
// Create new walker object
walker, err := blockstore.NewBlockWalk(handles, 2 /*TODO*/)
if err != nil {
klog.Exitf("Failed to create multi-DB iterator: %s", err)
}
defer walker.Close()
// Create new cargen worker.
w, err := cargen.NewWorker(outPath, epoch, walker)
if err != nil {
klog.Exitf("Failed to init cargen: %s", err)
}
ctx := c.Context()
if err = w.Run(ctx); err != nil {
klog.Exitf("FATAL: %s", err)
}
klog.Info("DONE")
timeTaken := time.Since(start)
klog.Infof("Time taken: %s", timeTaken)
}

View File

@ -1,57 +0,0 @@
package dump
import (
"errors"
"fmt"
"io"
"os"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/ipld/go-car"
"github.com/multiformats/go-multicodec"
"github.com/spf13/cobra"
"k8s.io/klog/v2"
)
var Cmd = cobra.Command{
Use: "dump <car>",
Short: "Dump the contents of a CAR file",
Args: cobra.ExactArgs(1),
}
func init() {
Cmd.Run = run
}
func run(_ *cobra.Command, args []string) {
f, err := os.Open(args[0])
if err != nil {
klog.Exit(err.Error())
}
defer f.Close()
rd, err := car.NewCarReader(f)
if err != nil {
klog.Exitf("Failed to open CAR: %s", err)
}
for {
block, err := rd.Next()
if errors.Is(err, io.EOF) {
break
} else if err != nil {
klog.Exitf("Failed to read block: %s", err)
}
if multicodec.Code(block.Cid().Type()) == multicodec.Raw {
var tx solana.Transaction
if err := bin.UnmarshalBin(&tx, block.RawData()); err != nil {
klog.Errorf("Invalid CID %s: %s", block.Cid(), err)
continue
} else if len(tx.Signatures) == 0 {
klog.Errorf("Invalid CID %s: tx has zero signatures", block.Cid())
continue
}
fmt.Println(tx.Signatures[0].String())
}
}
}

View File

@ -3,15 +3,15 @@ package main
import (
"context"
"flag"
"go.firedancer.io/radiance/cmd/radiance/bigtable"
"go.firedancer.io/radiance/cmd/radiance/tpu_quic"
"go.firedancer.io/radiance/cmd/radiance/tpu_udp"
"os"
"os/signal"
"go.firedancer.io/radiance/cmd/radiance/bigtable"
"go.firedancer.io/radiance/cmd/radiance/tpu_quic"
"go.firedancer.io/radiance/cmd/radiance/tpu_udp"
"github.com/spf13/cobra"
"go.firedancer.io/radiance/cmd/radiance/blockstore"
"go.firedancer.io/radiance/cmd/radiance/car"
"go.firedancer.io/radiance/cmd/radiance/gossip"
"go.firedancer.io/radiance/cmd/radiance/replay"
"k8s.io/klog/v2"
@ -34,7 +34,6 @@ func init() {
cmd.AddCommand(
&bigtable.Cmd,
&blockstore.Cmd,
&car.Cmd,
&gossip.Cmd,
&replay.Cmd,
&tpu_udp.Cmd,

View File

@ -1,79 +0,0 @@
// Package car implements the CARv1 file format.
package car
import (
"bytes"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/multiformats/go-multicodec"
)
// IdentityCID is the "zero-length "identity" multihash with "raw" codec".
//
// This is the best-practices placeholder value to refer to a non-existent or unknown object.
var IdentityCID cid.Cid
func init() {
id, err := cid.Cast([]byte{0x01, 0x55, 0x00, 0x00})
if err != nil {
panic("failed to create zero-length identity multihash with raw codec lmfao")
}
IdentityCID = id
}
// Block is a length-cid-data tuple.
// These make up most of CARv1.
//
// See https://ipld.io/specs/transport/car/carv1/#data
type Block struct {
Length int
Cid cid.Cid
Data []byte
}
// NewBlockFromRaw creates a new CIDv1 with the given multicodec contentType on the fly.
func NewBlockFromRaw(data []byte, contentType uint64) Block {
cidBuilder := cid.V1Builder{
Codec: contentType,
MhType: uint64(multicodec.Sha2_256),
}
id, err := cidBuilder.Sum(data)
if err != nil {
// Something is wrong with go-cid if this fails.
panic("failed to construct CID: " + err.Error())
}
return Block{
Length: id.ByteLen() + len(data),
Cid: id,
Data: data,
}
}
func NewBlockFromCBOR(node datamodel.Node, contentType uint64) (Block, error) {
// TODO: This could be rewritten as zero-copy
var buf bytes.Buffer
if err := dagcbor.Encode(node, &buf); err != nil {
return Block{}, err
}
return NewBlockFromRaw(buf.Bytes(), contentType), nil
}
// TotalLen returns the total length of the block, including the length prefix.
func (b Block) TotalLen() int {
return leb128Len(uint64(b.Length)) + b.Length
}
// leb128Len is like len(leb128.FromUInt64(x)).
// But without an allocation, therefore should be preferred.
func leb128Len(x uint64) (n int) {
n = 1
for {
x >>= 7
if x == 0 {
return
}
n++
}
}

View File

@ -1,59 +0,0 @@
package car
import (
"encoding/binary"
"fmt"
"testing"
"github.com/filecoin-project/go-leb128"
"github.com/stretchr/testify/assert"
)
func TestIdentityCIDStr(t *testing.T) {
assert.Equal(t, "bafkqaaa", IdentityCID.String())
}
func TestLeb128Len(t *testing.T) {
cases := []struct {
n uint64
len int
}{
{0, 1},
{1, 1},
{2, 1},
{3, 1},
{4, 1},
{5, 1},
{63, 1},
{64, 1},
{65, 1},
{100, 1},
{127, 1},
{128, 2},
{129, 2},
{2141192192, 5},
{^uint64(0), 10},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("N_%d", tc.n), func(t *testing.T) {
assert.Equal(t, tc.len, leb128Len(tc.n))
})
}
}
// Sanity-check asserting that Go Uvarint and LEB128 are compatible.
func TestUvarintIsLeb128(t *testing.T) {
cases := []uint64{
0, 1, 2, 3, 4, 5, 63, 64,
65, 100, 127, 128, 129,
2141192192, ^uint64(0),
}
for _, tc := range cases {
t.Run(fmt.Sprintf("N_%d", tc), func(t *testing.T) {
var v0Buf [binary.MaxVarintLen64]byte
v0 := v0Buf[:binary.PutUvarint(v0Buf[:], tc)]
v1 := leb128.FromUInt64(tc)
assert.Equal(t, v0, v1)
})
}
}

View File

@ -1,91 +0,0 @@
package car
import (
"io"
"github.com/filecoin-project/go-leb128"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
)
type OutStream interface {
WriteBlock(Block) error
}
// Writer produces CARv1 files with size tracking.
//
// The implementation is kinda memory-efficient.
// Needs up to IPLD block size plus peanuts of memory.
//
// # Rationale
//
// github.com/ipld/go-car/v2 is not helpful because it wants to traverse a complete IPLD link system.
// However we create an IPLD link system (Merkle-DAG) on the fly in a single pass as we read the chain.
// CARv1 is simple enough that we can roll a custom block writer, so no big deal.
type Writer struct {
out countingWriter
}
// NewWriter creates a new CARv1 Writer and writes the header.
func NewWriter(out io.Writer) (*Writer, error) {
w := &Writer{out: newCountingWriter(out)}
// Deliberately using the go-car v0 library here.
// go-car v2 doesn't seem to expose the CARv1 header format.
hdr := car.CarHeader{
Roots: []cid.Cid{IdentityCID}, // placeholder
Version: 1,
}
if err := car.WriteHeader(&hdr, w.out); err != nil {
return nil, err
}
return w, nil
}
// Write raw bytes to CAR.
func (w *Writer) Write(data []byte) (n int, err error) {
return w.out.Write(data)
}
// WriteBlock writes out a length-CID-value tuple.
func (w *Writer) WriteBlock(b Block) (err error) {
if _, err = w.out.Write(leb128.FromUInt64(uint64(b.Length))); err != nil {
return err
}
if _, err = w.out.Write(b.Cid.Bytes()); err != nil {
return err
}
_, err = w.out.Write(b.Data)
return
}
// Written returns the number of bytes written so far.
func (w *Writer) Written() int64 {
return w.out.written()
}
// countingWriter wraps io.Writer, but counts number of written bytes.
// Not thread safe.
type countingWriter struct {
io.Writer
n *int64
}
func newCountingWriter(w io.Writer) countingWriter {
return countingWriter{
Writer: w,
n: new(int64),
}
}
func (c countingWriter) Write(data []byte) (n int, err error) {
n, err = c.Writer.Write(data)
*c.n += int64(n)
return
}
// written returns number of bytes written so far.
func (c countingWriter) written() int64 {
return *c.n
}

View File

@ -1,134 +0,0 @@
package car
import (
"bytes"
"context"
"io"
"testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/ipld/go-car"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewWriter(t *testing.T) {
var buf bytes.Buffer
w, err := NewWriter(&buf)
require.NoError(t, err)
require.NotNil(t, w)
// Ensure that CAR header has been written.
assert.Equal(t, []byte{
0x19, // length 0x19 follows
0xa2, // map with two items
0x65, 0x72, 0x6f, 0x6f, 0x74, 0x73, // map key: "roots"
0x81, // map value: array of one root
0xd8, 0x2a, 0x45, 0x00, 0x01, 0x55, 0x00, 0x00, // CID 00 01 55 00 00 (identity)
0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, // map key: "version"
0x01, // integer 0x01
}, buf.Bytes())
// Ensure CBOR can be deserialized.
var x struct {
Roots []cid.Cid `refmt:"roots"`
Version uint64 `refmt:"version"`
}
cbornode.RegisterCborType(x)
err = cbornode.DecodeInto(buf.Bytes()[1:], &x)
require.NoError(t, err)
assert.Equal(t, []cid.Cid{IdentityCID}, x.Roots)
assert.Equal(t, uint64(1), x.Version)
}
func TestNewWriter_Error(t *testing.T) {
var mock mockWriter
mock.err = io.ErrClosedPipe
w, err := NewWriter(&mock)
assert.Nil(t, w)
assert.Same(t, mock.err, err)
}
func TestWriter(t *testing.T) {
var buf bytes.Buffer
w, err := NewWriter(&buf)
require.NoError(t, err)
require.NotNil(t, w)
// Write a bunch of data
require.NoError(t, w.WriteBlock(NewBlockFromRaw([]byte{}, uint64(multicodec.Raw))))
require.NoError(t, w.WriteBlock(NewBlockFromRaw([]byte("hello world"), uint64(multicodec.Raw))))
assert.Equal(t, int64(buf.Len()), w.Written())
// Load using ipld/go-car library
ctx := context.Background()
store := blockstore.NewBlockstore(ds.NewMapDatastore())
ch, err := car.LoadCar(ctx, store, &buf)
require.NoError(t, err)
assert.Equal(t, &car.CarHeader{
Roots: []cid.Cid{IdentityCID},
Version: 1,
}, ch)
allKeys, err := store.AllKeysChan(ctx)
require.NoError(t, err)
var keys []cid.Cid
for key := range allKeys {
keys = append(keys, key)
t.Log(key.String())
}
assert.Len(t, keys, 2)
b, err := store.Get(ctx, cid.MustParse("bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"))
require.NoError(t, err)
assert.Equal(t, b.RawData(), []byte{})
b, err = store.Get(ctx, cid.MustParse("bafkreifzjut3te2nhyekklss27nh3k72ysco7y32koao5eei66wof36n5e"))
require.NoError(t, err)
assert.Equal(t, b.RawData(), []byte("hello world"))
}
func TestCountingWriter(t *testing.T) {
var mock mockWriter
w := newCountingWriter(&mock)
assert.Equal(t, int64(0), w.written())
// successful write
mock.n, mock.err = 5, nil
n, err := w.Write([]byte("hello"))
assert.Equal(t, mock.n, n)
assert.Equal(t, mock.err, err)
assert.Equal(t, int64(5), w.written())
// partial write
mock.n, mock.err = 3, io.ErrClosedPipe
n, err = w.Write([]byte("hello"))
assert.Equal(t, mock.n, n)
assert.Equal(t, mock.err, err)
assert.Equal(t, int64(8), w.written())
// failed write
mock.n, mock.err = 0, io.ErrClosedPipe
n, err = w.Write([]byte("hello"))
assert.Equal(t, mock.n, n)
assert.Equal(t, mock.err, err)
assert.Equal(t, int64(8), w.written())
}
type mockWriter struct {
n int
err error
}
func (m *mockWriter) Write(_ []byte) (int, error) {
return m.n, m.err
}

View File

@ -1,294 +0,0 @@
// Package cargen transforms blockstores into CAR files.
package cargen
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"go.firedancer.io/radiance/pkg/blockstore"
"go.firedancer.io/radiance/pkg/ipld/car"
"go.firedancer.io/radiance/pkg/ipld/ipldgen"
"go.firedancer.io/radiance/pkg/shred"
"k8s.io/klog/v2"
)
// MaxCARSize is the maximum size of a CARv1 file.
//
// Dictated by Filecoin's preferred sector size (currently 32 GiB).
// cargen will attempt to pack CARs as large as possible but never exceed.
//
// Filecoin miners may append CARv2 indexes, which would exceed the total CAR size.
const MaxCARSize = 1 << 35
// Worker extracts Solana blocks from blockstore and produces CAR files.
//
// # Data Source
//
// Solana validator RocksDB archives serve as input.
// Any gaps in history will be detected and thrown as an error.
//
// The worker uses a blockstore.BlockWalk to seamlessly iterate over multiple RocksDBs if needed.
// This is useful if an epoch is split across multiple archives.
//
// # Epochs
//
// Each worker processes a single epoch of Solana history (432000 slots) in rooted order.
//
// Transforming a single epoch, which takes about a day on mainnet, should take a few hours to transform into CAR.
//
// Because of epoch alignment, it is safe to run multiple workers in parallel on distinct epochs.
// Therefore, the CAR generation process can be trivially parallelized by launching multiple instances.
//
// The CAR output will be byte-by-byte deterministic with regard to Solana's authenticated ledger content.
// In other words, regardless of which node operator runs this tool, they should always get the same CAR file.
//
// # Blocks
//
// Each block will be parsed and turned into an IPLD graph.
//
// The procedure respects Worker.CARSize and splits data across multiple CARs if needed.
// This allows us to assign a slot range to each CAR for the reader's convenience, at negligible alignment cost.
//
// # Output
//
// Each run produces one or more car files in the target directory,
// named `ledger-e{epoch}-s{slot}.car`, where slot is the first slot number in the epoch.
//
// The interlinked IPLD blocks are internally encoded with DAG-CBOR.
// Except for the ipldgen.SolanaTx "leaf" nodes, which are encoded using bincode (native).
type Worker struct {
dir string
walk blockstore.BlockWalkI
epoch uint64
stop uint64 // exclusive
handle carHandle
CARSize uint
}
// NewWorker creates a new worker to transform an epoch from blockstore.BlockWalk into CAR files in the given dir.
//
// Creates the directory if it doesn't exist yet.
func NewWorker(dir string, epoch uint64, walk blockstore.BlockWalkI) (*Worker, error) {
if err := os.Mkdir(dir, 0777); err != nil && !errors.Is(err, fs.ErrExist) {
return nil, err
}
// Seek to epoch start and make sure we have all data
const epochLen = 432000
start := epoch * epochLen
stop := start + epochLen
if !walk.Seek(start) {
return nil, fmt.Errorf("slot %d not available in any DB", start)
}
// TODO: This is not robust; if the DB starts in the middle of the epoch, the first slots are going to be skipped.
klog.Infof("Starting at slot %d", start)
slotsAvailable := walk.SlotsAvailable()
if slotsAvailable < epochLen {
return nil, fmt.Errorf("need slots [%d:%d] (epoch %d) but only have up to %d",
start, stop, epoch, start+slotsAvailable)
}
w := &Worker{
dir: dir,
walk: walk,
epoch: epoch,
stop: stop,
CARSize: MaxCARSize,
}
return w, nil
}
func (w *Worker) Run(ctx context.Context) error {
for ctx.Err() == nil {
next, err := w.step()
if err != nil {
return err
}
if !next {
break
}
}
if w.handle.ok() {
if err := w.handle.close(); err != nil {
return err
}
}
return ctx.Err()
}
// step iterates one block forward.
func (w *Worker) step() (next bool, err error) {
meta, ok := w.walk.Next()
if !ok {
return false, nil
}
if meta.Slot > w.stop {
return false, nil
}
entries, err := w.walk.Entries(meta)
if err != nil {
return false, fmt.Errorf("failed to get entry at slot %d: %w", meta.Slot, err)
}
if err := w.ensureHandle(meta.Slot); err != nil {
return false, err
}
if err := w.writeSlot(meta, entries); err != nil {
return false, err
}
if err := w.splitHandle(meta.Slot); err != nil {
return false, err
}
return true, nil
}
// ensureHandle makes sure we have a CAR handle that we can write to.
func (w *Worker) ensureHandle(slot uint64) error {
if w.handle.ok() {
w.handle.lastOffset = w.handle.writer.Written()
return nil
}
return w.handle.open(w.dir, w.epoch, slot)
}
// splitHandle creates a new CAR file if the current one is oversized.
//
// Internally moves blocks that exceed max CAR size from old to new file.
func (w *Worker) splitHandle(slot uint64) (err error) {
size := w.handle.writer.Written()
if size <= int64(w.CARSize) {
return nil
}
// CAR is too large and needs to be split.
klog.Infof("CAR file %s too large, splitting...", w.handle.file.Name())
// Create new target CAR.
var newCAR carHandle
if err = newCAR.open(w.dir, w.epoch, slot); err != nil {
return err
}
// Seek old CAR back to before block.
w.handle.writer = nil
if err = w.handle.cache.Flush(); err != nil {
return fmt.Errorf("failed to flush CAR cache: %w", err)
}
klog.Infof("Seeking to offset %d and copying rest", w.handle.lastOffset)
if _, err = w.handle.file.Seek(w.handle.lastOffset, io.SeekStart); err != nil {
return fmt.Errorf("failed to rewind CAR: %w", err)
}
// Move block from old to new.
if _, err = io.Copy(newCAR.writer, w.handle.file); err != nil {
return fmt.Errorf("failed to move block between CARs: %w", err)
}
// Truncate old handle to make it fit max size.
if err = w.handle.file.Truncate(w.handle.lastOffset); err != nil {
return fmt.Errorf("failed to truncate old CAR (%s) to %d bytes: %w",
w.handle.file.Name(), w.handle.lastOffset, err)
}
// Swap handles.
err = w.handle.close()
w.handle = newCAR
if written := w.handle.writer.Written(); written > int64(w.CARSize) {
klog.Errorf("Slot %d exceeds size of a single CAR (%d > %d)", slot, written, w.CARSize)
}
return err
}
// writeSlot writes a filled Solana slot to the CAR.
// Creates multiple IPLD blocks internally.
func (w *Worker) writeSlot(meta *blockstore.SlotMeta, entries [][]shred.Entry) error {
slot := meta.Slot
asm := ipldgen.NewBlockAssembler(w.handle.writer, slot)
entryNum := 0
klog.V(3).Infof("Slot %d", slot)
for i, batch := range entries {
klog.V(6).Infof("Slot %d batch %d", slot, i)
for j, entry := range batch {
pos := ipldgen.EntryPos{
Slot: slot,
EntryIndex: entryNum,
Batch: i,
BatchIndex: j,
LastShred: -1,
}
if j == len(batch)-1 {
// We map "last shred of batch" to each "last entry of batch"
// so we can reconstruct the shred/entry-batch assignments.
if i >= len(meta.EntryEndIndexes) {
return fmt.Errorf("out-of-bounds batch index %d (have %d batches in slot %d)",
i, len(meta.EntryEndIndexes), slot)
}
pos.LastShred = int(meta.EntryEndIndexes[i])
}
if err := asm.WriteEntry(entry, pos); err != nil {
return fmt.Errorf("failed to write slot %d shred %d (batch %d index %d): %s",
slot, entryNum, i, j, err)
}
entryNum++
}
}
// TODO roll up into ledger entries
if _, err := asm.Finish(); err != nil {
klog.Exitf("Failed to write block: %s", err)
}
return nil
}
type carHandle struct {
file *os.File
cache *bufio.Writer
writer *car.Writer
lastOffset int64
}
const writeBufSize = 16384
func (c *carHandle) open(dir string, epoch uint64, slot uint64) error {
if c.ok() {
return fmt.Errorf("handle not closed")
}
p := filepath.Join(dir, fmt.Sprintf("ledger-e%d-s%d.car", epoch, slot))
f, err := os.OpenFile(p, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return fmt.Errorf("failed to create CAR: %w", err)
}
cache := bufio.NewWriterSize(f, writeBufSize)
writer, err := car.NewWriter(cache)
if err != nil {
return fmt.Errorf("failed to start CAR at %s: %w", p, err)
}
*c = carHandle{
file: f,
cache: cache,
writer: writer,
lastOffset: 0,
}
klog.Infof("Created new CAR file %s", f.Name())
return nil
}
func (c *carHandle) ok() bool {
return c.writer != nil
}
func (c *carHandle) close() (err error) {
if err = c.cache.Flush(); err != nil {
return err
}
err = c.file.Close()
*c = carHandle{}
return
}

View File

@ -1,270 +0,0 @@
package cargen
import (
"bytes"
"context"
"errors"
"io"
"os"
"path/filepath"
"sort"
"strings"
"testing"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/go-logr/logr/testr"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.firedancer.io/radiance/pkg/blockstore"
"go.firedancer.io/radiance/pkg/shred"
"k8s.io/klog/v2"
)
// mockBlockWalk is a mock implementation of blockstore.BlockWalkI.
type mockBlockWalk struct {
queue []*blockstore.SlotMeta
entries map[*blockstore.SlotMeta][][]shred.Entry
staged [][]shred.Entry
slot uint64
left int64
}
func newMockBlockWalk() *mockBlockWalk {
return &mockBlockWalk{
queue: nil,
entries: make(map[*blockstore.SlotMeta][][]shred.Entry),
staged: nil,
}
}
func (m *mockBlockWalk) append(meta *blockstore.SlotMeta, entries [][]shred.Entry) {
m.queue = append(m.queue, meta)
m.entries[meta] = entries
}
func (m *mockBlockWalk) Seek(slot uint64) (ok bool) {
for len(m.queue) > 0 && m.queue[len(m.queue)-1].Slot < slot {
delete(m.entries, m.queue[0])
m.queue = m.queue[1:]
}
return len(m.queue) > 0
}
func (m *mockBlockWalk) SlotsAvailable() uint64 {
if m.left <= 0 {
return 0
}
return uint64(m.left)
}
func (m *mockBlockWalk) Next() (meta *blockstore.SlotMeta, ok bool) {
if len(m.queue) == 0 {
m.left = 0
return nil, false
}
meta = m.queue[0]
m.queue = m.queue[1:]
m.left -= int64(meta.Slot) - int64(m.slot)
m.staged = m.entries[meta]
delete(m.entries, meta)
m.slot = meta.Slot
ok = true
return
}
func (m *mockBlockWalk) Entries(*blockstore.SlotMeta) ([][]shred.Entry, error) {
return m.staged, nil
}
func (m *mockBlockWalk) Close() {
m.queue = nil
m.entries = nil
m.staged = nil
}
func TestGen_Empty(t *testing.T) {
walk := newMockBlockWalk()
dir := t.TempDir()
worker, err := NewWorker(dir, 0, walk)
assert.EqualError(t, err, "slot 0 not available in any DB")
assert.Nil(t, worker)
}
func TestGen_Split(t *testing.T) {
klog.SetLogger(testr.New(t))
t.Cleanup(klog.ClearLogger)
walk := newMockBlockWalk()
walk.left = 432000
walk.append(
&blockstore.SlotMeta{Slot: 432000, EntryEndIndexes: []uint32{0}},
[][]shred.Entry{
{
{
Txns: []solana.Transaction{
{
Signatures: make([]solana.Signature, 1),
Message: solana.Message{
AccountKeys: make([]solana.PublicKey, 3),
Header: solana.MessageHeader{},
RecentBlockhash: solana.Hash{},
Instructions: []solana.CompiledInstruction{
{
Accounts: make([]uint16, 2),
Data: make(solana.Base58, 64),
},
},
},
},
},
},
},
})
walk.append(
&blockstore.SlotMeta{Slot: 432002, EntryEndIndexes: []uint32{0, 1}},
[][]shred.Entry{
{
{
Txns: []solana.Transaction{
{
Signatures: make([]solana.Signature, 1),
Message: solana.Message{
AccountKeys: make([]solana.PublicKey, 1),
Header: solana.MessageHeader{},
RecentBlockhash: solana.Hash{},
Instructions: []solana.CompiledInstruction{
{
Accounts: make([]uint16, 1),
Data: make(solana.Base58, 20),
},
},
},
},
},
},
},
{
{
Txns: []solana.Transaction{
{
Signatures: make([]solana.Signature, 1),
Message: solana.Message{
AccountKeys: make([]solana.PublicKey, 1),
Header: solana.MessageHeader{},
RecentBlockhash: solana.Hash{},
Instructions: []solana.CompiledInstruction{
{
Accounts: make([]uint16, 1),
Data: make(solana.Base58, 20),
},
},
},
},
},
},
},
})
dir := t.TempDir()
worker, err := NewWorker(dir, 1, walk)
require.NoError(t, err)
require.NotNil(t, worker)
worker.CARSize = 1024
require.NoError(t, worker.Run(context.Background()))
entries, err := os.ReadDir(dir)
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return strings.Compare(entries[i].Name(), entries[j].Name()) < 0
})
assert.Len(t, entries, 2)
cases := map[string]struct {
size int64
cids []string
}{
"ledger-e1-s432000.car": {
size: 534,
cids: []string{
"bafkreicloicbw5yk5bpkthpwgnxrjy4iwqiitbvxs52tbkxm3fqcmgvl7a", // KindTx
"bafyreiawzny3vxq6bir23qjai6fcujyiciqz7iftnibrfzbuvtoy7fvqtq", // KindEntry
"bafyreiaihhxk2rfo5lgilus3wp3mqglv2do3zgtfqpp5zfygrpzdbez3am", // KindBlock
},
},
"ledger-e1-s432002.car": {
size: 782,
cids: []string{
"bafkreihgemfmup2imylvtyp2wj3fymakbx2bfyi2rlzxxq4xv3ycgmc3da", // KindTx
"bafyreih5wmt3ly25phouneamyxg5fs4uu3ma6kdvgwfregnmjsibkuipeq", // KindEntry
"bafkreihgemfmup2imylvtyp2wj3fymakbx2bfyi2rlzxxq4xv3ycgmc3da", // KindTx
"bafyreih5wmt3ly25phouneamyxg5fs4uu3ma6kdvgwfregnmjsibkuipeq", // KindEntry
"bafyreibqnqnmnhunzwcopnc7srlvq5p5bbgex2xspb6ayzf265rcwtaiie", // KindBlock
},
},
}
for name, tc := range cases {
filePath := filepath.Join(dir, name)
t.Run("Parse_"+name, func(t *testing.T) {
// match file size
info, err := os.Stat(filePath)
require.NoError(t, err)
assert.Equal(t, tc.size, info.Size())
// ensure CARs decode
f, err := os.Open(filepath.Join(dir, name))
require.NoError(t, err)
defer f.Close()
rd, err := car.NewCarReader(f)
require.NoError(t, err)
var cids []cid.Cid
for {
block, err := rd.Next()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
cid := block.Cid()
cidType := cid.Type()
t.Logf("CID=%s Multicodec=%#x", cid, cidType)
switch multicodec.Code(cidType) {
case multicodec.Raw:
var tx solana.Transaction
require.NoError(t, bin.UnmarshalBin(&tx, block.RawData()))
t.Logf(" Txn: %s", &tx)
case multicodec.DagCbor:
decodeOpt := dagcbor.DecodeOptions{
AllowLinks: true,
}
builder := basicnode.Prototype.Any.NewBuilder()
require.NoError(t, decodeOpt.Decode(builder, bytes.NewReader(block.RawData())))
node := builder.Build()
t.Logf(" Entry: %s", node.Kind())
default:
panic("Unexpected entry")
}
cids = append(cids, cid)
}
// match CIDs
cidStrs := make([]string, len(cids))
for i, c := range cids {
cidStrs[i] = c.String()
}
assert.Equal(t, tc.cids, cidStrs)
})
}
}

View File

@ -1,316 +0,0 @@
// Package ipldgen transforms Solana ledger data into IPLD DAGs.
package ipldgen
import (
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multicodec"
"go.firedancer.io/radiance/pkg/ipld/car"
"go.firedancer.io/radiance/pkg/ipld/ipldsch"
"go.firedancer.io/radiance/pkg/shred"
)
// CIDLen is the serialized size in bytes of a Raw/DagCbor CIDv1
const CIDLen = 36
// TargetBlockSize is the target size of variable-length IPLD blocks (e.g. link lists).
// Don't set this to the IPFS max block size, as we might overrun by a few kB.
const TargetBlockSize = 1 << 19
// lengthPrefixSize is the max practical size of an array length prefix.
const lengthPrefixSize = 4
// IPLD node identifier
const (
KindTx = iota
KindEntry
KindBlock
)
type BlockAssembler struct {
writer car.OutStream
slot uint64
entries []cidlink.Link
shredding []shredding
}
type shredding struct {
entryEndIdx uint
shredEndIdx uint
}
func NewBlockAssembler(writer car.OutStream, slot uint64) *BlockAssembler {
return &BlockAssembler{
writer: writer,
slot: slot,
}
}
type EntryPos struct {
Slot uint64
EntryIndex int
Batch int
BatchIndex int
LastShred int
}
// WriteEntry appends a ledger entry to the CAR.
func (b *BlockAssembler) WriteEntry(entry shred.Entry, pos EntryPos) error {
txList, err := NewTxListAssembler(b.writer).Assemble(entry.Txns)
if err != nil {
return err
}
builder := ipldsch.Type.Entry.NewBuilder()
entryMap, err := builder.BeginMap(9)
if err != nil {
return err
}
if pos.LastShred > 0 {
b.shredding = append(b.shredding, shredding{
entryEndIdx: uint(pos.EntryIndex),
shredEndIdx: uint(pos.LastShred),
})
}
var nodeAsm datamodel.NodeAssembler
nodeAsm, err = entryMap.AssembleEntry("kind")
if err != nil {
return err
}
if err = nodeAsm.AssignInt(int64(KindEntry)); err != nil {
return err
}
nodeAsm, err = entryMap.AssembleEntry("numHashes")
if err != nil {
return err
}
if err = nodeAsm.AssignInt(int64(entry.NumHashes)); err != nil {
return err
}
nodeAsm, err = entryMap.AssembleEntry("hash")
if err != nil {
return err
}
if err = nodeAsm.AssignBytes(entry.Hash[:]); err != nil {
return err
}
nodeAsm, err = entryMap.AssembleEntry("txs")
if err != nil {
return err
}
if err = nodeAsm.AssignNode(txList); err != nil {
return err
}
if err = entryMap.Finish(); err != nil {
return err
}
node := builder.Build().(ipldsch.Entry).Representation()
block, err := car.NewBlockFromCBOR(node, uint64(multicodec.DagCbor))
if err != nil {
return err
}
b.entries = append(b.entries, cidlink.Link{Cid: block.Cid})
return b.writer.WriteBlock(block)
}
// Finish appends block metadata to the CAR and returns the root CID.
func (b *BlockAssembler) Finish() (link cidlink.Link, err error) {
builder := ipldsch.Type.Block.NewBuilder()
entryMap, err := builder.BeginMap(4)
if err != nil {
return link, err
}
var nodeAsm datamodel.NodeAssembler
nodeAsm, err = entryMap.AssembleEntry("kind")
if err != nil {
return link, err
}
if err = nodeAsm.AssignInt(int64(KindBlock)); err != nil {
return link, err
}
nodeAsm, err = entryMap.AssembleEntry("slot")
if err != nil {
return link, err
}
if err = nodeAsm.AssignInt(int64(b.slot)); err != nil {
return link, err
}
nodeAsm, err = entryMap.AssembleEntry("shredding")
if err != nil {
return link, err
}
list, err := nodeAsm.BeginList(int64(len(b.shredding)))
if err != nil {
return link, err
}
for _, s := range b.shredding {
tuple, err := list.AssembleValue().BeginMap(2)
if err != nil {
return link, err
}
entry, err := tuple.AssembleEntry("entryEndIdx")
if err != nil {
return link, err
}
if err = entry.AssignInt(int64(s.entryEndIdx)); err != nil {
return link, err
}
entry, err = tuple.AssembleEntry("shredEndIdx")
if err != nil {
return link, err
}
if err = entry.AssignInt(int64(s.shredEndIdx)); err != nil {
return link, err
}
if err = tuple.Finish(); err != nil {
return link, err
}
}
if err = list.Finish(); err != nil {
return link, err
}
nodeAsm, err = entryMap.AssembleEntry("entries")
if err != nil {
return link, err
}
list, err = nodeAsm.BeginList(int64(len(b.entries)))
if err != nil {
return link, err
}
for _, entry := range b.entries {
if err = list.AssembleValue().AssignLink(entry); err != nil {
return link, err
}
}
if err = list.Finish(); err != nil {
return link, err
}
if err = entryMap.Finish(); err != nil {
return link, err
}
node := builder.Build().(ipldsch.Block).Representation()
block, err := car.NewBlockFromCBOR(node, uint64(multicodec.DagCbor))
if err != nil {
return link, err
}
if err = b.writer.WriteBlock(block); err != nil {
return link, err
}
return cidlink.Link{Cid: block.Cid}, nil
}
// TxListAssembler produces a Merkle tree of transactions with wide branches.
type TxListAssembler struct {
writer car.OutStream
links []cidlink.Link
}
func NewTxListAssembler(writer car.OutStream) TxListAssembler {
return TxListAssembler{writer: writer}
}
// Assemble produces a transaction list DAG and returns the root node.
func (t TxListAssembler) Assemble(txs []solana.Transaction) (datamodel.Node, error) {
for _, tx := range txs {
if err := t.writeTx(tx); err != nil {
return nil, err
}
}
return t.finish()
}
// writeTx writes out SolanaTx to the CAR and appends CID to memory.
func (t *TxListAssembler) writeTx(tx solana.Transaction) error {
buf, err := bin.MarshalBin(tx)
if err != nil {
panic("failed to marshal tx: " + err.Error())
}
leaf := car.NewBlockFromRaw(buf, uint64(multicodec.Raw))
if err := t.writer.WriteBlock(leaf); err != nil {
return err
}
t.links = append(t.links, cidlink.Link{Cid: leaf.Cid})
return nil
}
// finish recursively writes out RadianceTx into a tree structure until the root fits.
func (t *TxListAssembler) finish() (datamodel.Node, error) {
node, err := t.pack()
if err != nil {
return nil, err
}
// Terminator: Reached root, stop merklerizing.
if len(t.links) == 0 {
return node, nil
}
// Create left link.
block, err := car.NewBlockFromCBOR(node, uint64(multicodec.DagCbor))
if err != nil {
return nil, err
}
var links []cidlink.Link
links = append(links, cidlink.Link{Cid: block.Cid})
if err := t.writer.WriteBlock(block); err != nil {
return nil, err
}
// Create right links.
for len(t.links) > 0 {
node, err = t.pack()
if err != nil {
return nil, err
}
block, err = car.NewBlockFromCBOR(node, uint64(multicodec.DagCbor))
if err != nil {
return nil, err
}
if err = t.writer.WriteBlock(block); err != nil {
return nil, err
}
links = append(links, cidlink.Link{Cid: block.Cid})
}
// Move up layer.
t.links = links
return t.finish()
}
// pack moves as many CIDs as possible into a node.
func (t *TxListAssembler) pack() (node datamodel.Node, err error) {
builder := ipldsch.Type.TransactionList.NewBuilder()
list, err := builder.BeginList(0)
if err != nil {
return nil, err
}
// Pack nodes until we fill TargetBlockSize.
left := TargetBlockSize - lengthPrefixSize
for ; len(t.links) > 0 && left >= CIDLen; left -= CIDLen {
link := t.links[0]
t.links = t.links[1:]
if err := list.AssembleValue().AssignLink(link); err != nil {
return nil, err
}
}
if err := list.Finish(); err != nil {
return nil, err
}
node = builder.Build()
return node, nil
}

View File

@ -1,46 +0,0 @@
package ipldgen
import (
"fmt"
"testing"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.firedancer.io/radiance/pkg/ipld/car"
)
func TestCIDLen(t *testing.T) {
// Check whether codecs actually result in a CID sized CIDLen.
// This is important for our allocation strategies during merklerization.
codecs := []uint64{
uint64(multicodec.Raw),
uint64(multicodec.DagCbor),
}
for _, codec := range codecs {
t.Run(fmt.Sprintf("Codec_%#x", codec), func(t *testing.T) {
builder := cid.V1Builder{
Codec: codec,
MhType: multihash.SHA2_256,
}
id, err := builder.Sum(nil)
require.NoError(t, err)
assert.Equal(t, id.ByteLen(), CIDLen)
})
}
}
type nullCARWriter struct{}
func (nullCARWriter) WriteBlock(car.Block) error {
return nil
}
func TestBlockAssembler_Empty(t *testing.T) {
asm := NewBlockAssembler(nullCARWriter{}, 42)
link, err := asm.Finish()
require.NoError(t, err)
t.Log(link)
}

View File

@ -1,2 +0,0 @@
// Package ipldsch provides Solana ledger IPLD schemas.
package ipldsch

View File

@ -1,51 +0,0 @@
package ipldsch
// Code generated by go-ipld-prime gengo. DO NOT EDIT.
import (
"fmt"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/schema"
)
const (
midvalue = schema.Maybe(4)
allowNull = schema.Maybe(5)
)
type maState uint8
const (
maState_initial maState = iota
maState_midKey
maState_expectValue
maState_midValue
maState_finished
)
type laState uint8
const (
laState_initial laState = iota
laState_midValue
laState_finished
)
type _ErrorThunkAssembler struct {
e error
}
func (ea _ErrorThunkAssembler) BeginMap(_ int64) (datamodel.MapAssembler, error) { return nil, ea.e }
func (ea _ErrorThunkAssembler) BeginList(_ int64) (datamodel.ListAssembler, error) { return nil, ea.e }
func (ea _ErrorThunkAssembler) AssignNull() error { return ea.e }
func (ea _ErrorThunkAssembler) AssignBool(bool) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignInt(int64) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignFloat(float64) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignString(string) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignBytes([]byte) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignLink(datamodel.Link) error { return ea.e }
func (ea _ErrorThunkAssembler) AssignNode(datamodel.Node) error { return ea.e }
func (ea _ErrorThunkAssembler) Prototype() datamodel.NodePrototype {
panic(fmt.Errorf("cannot get prototype from error-carrying assembler: already derailed with error: %w", ea.e))
}

File diff suppressed because it is too large Load Diff

View File

@ -1,127 +0,0 @@
package ipldsch
// Code generated by go-ipld-prime gengo. DO NOT EDIT.
import (
"github.com/ipld/go-ipld-prime/datamodel"
)
var _ datamodel.Node = nil // suppress errors when this dependency is not referenced
// Type is a struct embeding a NodePrototype/Type for every Node implementation in this package.
// One of its major uses is to start the construction of a value.
// You can use it like this:
//
// ipldsch.Type.YourTypeName.NewBuilder().BeginMap() //...
//
// and:
//
// ipldsch.Type.OtherTypeName.NewBuilder().AssignString("x") // ...
var Type typeSlab
type typeSlab struct {
Block _Block__Prototype
Block__Repr _Block__ReprPrototype
Bool _Bool__Prototype
Bool__Repr _Bool__ReprPrototype
Bytes _Bytes__Prototype
Bytes__Repr _Bytes__ReprPrototype
Entry _Entry__Prototype
Entry__Repr _Entry__ReprPrototype
Float _Float__Prototype
Float__Repr _Float__ReprPrototype
Hash _Hash__Prototype
Hash__Repr _Hash__ReprPrototype
Int _Int__Prototype
Int__Repr _Int__ReprPrototype
Link _Link__Prototype
Link__Repr _Link__ReprPrototype
List__Link _List__Link__Prototype
List__Link__Repr _List__Link__ReprPrototype
List__Shredding _List__Shredding__Prototype
List__Shredding__Repr _List__Shredding__ReprPrototype
Shredding _Shredding__Prototype
Shredding__Repr _Shredding__ReprPrototype
String _String__Prototype
String__Repr _String__ReprPrototype
Transaction _Transaction__Prototype
Transaction__Repr _Transaction__ReprPrototype
TransactionList _TransactionList__Prototype
TransactionList__Repr _TransactionList__ReprPrototype
}
// --- type definitions follow ---
// Block matches the IPLD Schema type "Block". It has struct type-kind, and may be interrogated like map kind.
type Block = *_Block
type _Block struct {
kind _Int
slot _Int
entries _List__Link
shredding _List__Shredding
}
// Bool matches the IPLD Schema type "Bool". It has bool kind.
type Bool = *_Bool
type _Bool struct{ x bool }
// Bytes matches the IPLD Schema type "Bytes". It has bytes kind.
type Bytes = *_Bytes
type _Bytes struct{ x []byte }
// Entry matches the IPLD Schema type "Entry". It has struct type-kind, and may be interrogated like map kind.
type Entry = *_Entry
type _Entry struct {
kind _Int
numHashes _Int
hash _Hash
txs _TransactionList
}
// Float matches the IPLD Schema type "Float". It has float kind.
type Float = *_Float
type _Float struct{ x float64 }
// Hash matches the IPLD Schema type "Hash". It has bytes kind.
type Hash = *_Hash
type _Hash struct{ x []byte }
// Int matches the IPLD Schema type "Int". It has int kind.
type Int = *_Int
type _Int struct{ x int64 }
// Link matches the IPLD Schema type "Link". It has link kind.
type Link = *_Link
type _Link struct{ x datamodel.Link }
// List__Link matches the IPLD Schema type "List__Link". It has list kind.
type List__Link = *_List__Link
type _List__Link struct {
x []_Link
}
// List__Shredding matches the IPLD Schema type "List__Shredding". It has list kind.
type List__Shredding = *_List__Shredding
type _List__Shredding struct {
x []_Shredding
}
// Shredding matches the IPLD Schema type "Shredding". It has struct type-kind, and may be interrogated like map kind.
type Shredding = *_Shredding
type _Shredding struct {
entryEndIdx _Int
shredEndIdx _Int
}
// String matches the IPLD Schema type "String". It has string kind.
type String = *_String
type _String struct{ x string }
// Transaction matches the IPLD Schema type "Transaction". It has bytes kind.
type Transaction = *_Transaction
type _Transaction struct{ x []byte }
// TransactionList matches the IPLD Schema type "TransactionList". It has list kind.
type TransactionList = *_TransactionList
type _TransactionList struct {
x []_Link
}

View File

@ -1,24 +0,0 @@
type Block struct {
kind Int
slot Int
entries [ Link ]
shredding [ Shredding ]
} representation tuple
type Shredding struct {
entryEndIdx Int
shredEndIdx Int
} representation tuple
type Entry struct {
kind Int
numHashes Int
hash Hash
txs TransactionList
} representation tuple
type Hash bytes
type TransactionList [ Link ]
type Transaction bytes