car: allow multiplexer to seek to a slot
This commit is contained in:
parent
31e05eec91
commit
7aea7df6fd
|
@ -1,7 +1,7 @@
|
|||
package car
|
||||
|
||||
import (
|
||||
"github.com/certusone/radiance/cmd/radiance/car/create"
|
||||
"go.firedancer.io/radiance/cmd/radiance/car/create"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,21 +1,53 @@
|
|||
package create
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"go.firedancer.io/radiance/pkg/blockstore"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var Cmd = cobra.Command{
|
||||
Use: "create <epoch...>",
|
||||
Use: "create <out.car> <epoch>",
|
||||
Short: "Create CAR archives from blockstore",
|
||||
Long: "Extracts Solana ledger data from blockstore (RocksDB) databases,\n" +
|
||||
"and outputs IPLD CARs (content-addressable archives).\n" +
|
||||
"\n" +
|
||||
"Produces at least one CAR per epoch.\n" +
|
||||
"CAR archive contents are deterministic.",
|
||||
Args: cobra.ExactArgs(2),
|
||||
}
|
||||
|
||||
// TODO: Actually write the CAR!
|
||||
// |
|
||||
// Our plan is to transform epochs of Solana history (432000 slots) into batches of CAR files.
|
||||
// The CAR output must be byte-by-byte deterministic with regard to Solana's authenticated ledger content.
|
||||
// In other words, regardless of which node operator runs this tool, they should always get the same CAR file.
|
||||
// |
|
||||
// github.com/ipld/go-car/v2 is not helpful because it wants to traverse a complete IPLD link system.
|
||||
// However we create an IPLD link system (Merkle-DAG) on the fly in a single pass as we read the chain.
|
||||
// CARv1 is simple enough that we can roll a custom block writer, so no big deal. Vec<(len, cid, data)>
|
||||
// We only need to reserve sufficient space for the CARv1 header at the beginning of the file.
|
||||
// Of course, the root CID is not known yet, so we leave a placeholder hash value and fill it in on completion.
|
||||
// |
|
||||
// The procedure needs to respect Filecoin's 32GB sector size and split data across multiple CARs if needed.
|
||||
// We use Solana blocks as an atomic unit that is never split across CARs.
|
||||
// This allows us to assign a slot range to each CAR for the reader's convenience, at negligible alignment cost.
|
||||
// |
|
||||
// A single epoch, which spans about a day on mainnet, should take a few hours to transform into CAR.
|
||||
// Because of epoch alignment, the CAR generation process can be trivially parallelized by launching multiple instances.
|
||||
// In theory, the ledger data extraction process for even a single CAR can be parallelized, at questionable gains.
|
||||
// We can synchronize multiple RocksDB iterators that jump over each other block-by-block.
|
||||
// CAR writing cannot be parallelized because of strict ordering requirements (determinism).
|
||||
// |
|
||||
// Open question: Do we want to use CARv2 indexes? Probably yes.
|
||||
// Will complicate our bespoke CAR writing approach though and make it less maintainable.
|
||||
// Maybe we can construct indexes in-memory using go-car while we are writing CARv1 and then append them once done.
|
||||
|
||||
// TODO: there are a number of things [above] that are conceptually incorrect -- @ribasushi
|
||||
|
||||
var flags = Cmd.Flags()
|
||||
|
||||
var (
|
||||
|
@ -26,7 +58,21 @@ func init() {
|
|||
Cmd.Run = run
|
||||
}
|
||||
|
||||
func run(_ *cobra.Command, _ []string) {
|
||||
func run(c *cobra.Command, args []string) {
|
||||
outPath := args[0]
|
||||
epochStr := args[1]
|
||||
epoch, err := strconv.ParseUint(epochStr, 10, 32)
|
||||
if err != nil {
|
||||
klog.Exitf("Invalid epoch arg: %s", epochStr)
|
||||
}
|
||||
|
||||
// TODO mainnet history later on requires multiple CAR files per epoch
|
||||
f, err := os.OpenFile(outPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
klog.Exit(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Open blockstores
|
||||
dbPaths := *flagDBs
|
||||
handles := make([]dbHandle, len(*flagDBs))
|
||||
|
@ -45,16 +91,32 @@ func run(_ *cobra.Command, _ []string) {
|
|||
klog.Exitf("Failed to open all DBs: %s", err)
|
||||
}
|
||||
|
||||
for {
|
||||
// TODO context handling
|
||||
// Seek to epoch start and make sure we have all data
|
||||
const epochLen = 432000
|
||||
start := epoch * epochLen
|
||||
stop := start + epochLen
|
||||
mw.seek(start)
|
||||
klog.Infof("Starting at slot %d", start)
|
||||
slotsAvailable := mw.len()
|
||||
if slotsAvailable < epochLen {
|
||||
klog.Exitf("Need %d slots but got %d", epochLen, slotsAvailable)
|
||||
}
|
||||
|
||||
ctx := c.Context()
|
||||
|
||||
for ctx.Err() == nil {
|
||||
meta, ok := mw.next()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if meta.Slot > stop {
|
||||
break
|
||||
}
|
||||
entries, err := mw.get(meta)
|
||||
if err != nil {
|
||||
klog.Exitf("FATAL: Failed to get entry at slot %d: %s", meta.Slot, err)
|
||||
}
|
||||
_ = entries
|
||||
panic("unimplemented")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,19 +4,52 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/certusone/radiance/pkg/blockstore"
|
||||
"go.firedancer.io/radiance/pkg/blockstore"
|
||||
"github.com/linxGnu/grocksdb"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// TODO: Multiplexer should stop iterating when non-rooted slots are reached
|
||||
|
||||
// multiWalk walks blocks in ascending order over multiple RocksDB databases.
|
||||
type multiWalk struct {
|
||||
iter *grocksdb.Iterator
|
||||
handles []dbHandle // sorted
|
||||
}
|
||||
|
||||
// seek skips ahead to a specific slot.
|
||||
// The caller must call multiWalk.next after seek.
|
||||
func (m *multiWalk) seek(slot uint64) bool {
|
||||
for len(m.handles) > 0 {
|
||||
h := m.handles[0]
|
||||
if slot < h.start {
|
||||
// trying to seek to slot below lowest available
|
||||
return false
|
||||
}
|
||||
if slot <= h.stop {
|
||||
h.start = slot
|
||||
return true
|
||||
}
|
||||
m.pop()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// len returns the number of contiguous slots that lay ahead.
|
||||
func (m *multiWalk) len() (total uint64) {
|
||||
if len(m.handles) == 0 {
|
||||
return 0
|
||||
}
|
||||
start := m.handles[0].start
|
||||
for _, h := range m.handles {
|
||||
if h.start > start {
|
||||
return
|
||||
}
|
||||
stop := h.stop + 1
|
||||
total += stop - start
|
||||
start = stop
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// next seeks to the next slot.
|
||||
func (m *multiWalk) next() (meta *blockstore.SlotMeta, ok bool) {
|
||||
if len(m.handles) == 0 {
|
||||
|
@ -31,10 +64,7 @@ func (m *multiWalk) next() (meta *blockstore.SlotMeta, ok bool) {
|
|||
}
|
||||
if !m.iter.Valid() {
|
||||
// Close current DB and go to next
|
||||
m.iter.Close()
|
||||
m.iter = nil
|
||||
h.db.Close()
|
||||
m.handles = m.handles[1:]
|
||||
m.pop()
|
||||
return m.next() // TODO tail recursion optimization?
|
||||
}
|
||||
|
||||
|
@ -43,6 +73,11 @@ func (m *multiWalk) next() (meta *blockstore.SlotMeta, ok bool) {
|
|||
if !ok {
|
||||
klog.Exitf("Invalid slot key: %x", m.iter.Key().Data())
|
||||
}
|
||||
if slot > h.stop {
|
||||
m.pop()
|
||||
return m.next()
|
||||
}
|
||||
h.start = slot
|
||||
|
||||
// Get value at current position.
|
||||
meta, err := blockstore.ParseBincode[blockstore.SlotMeta](m.iter.Value().Data())
|
||||
|
@ -74,6 +109,14 @@ func (m *multiWalk) get(meta *blockstore.SlotMeta) ([]blockstore.Entries, error)
|
|||
return h.db.GetEntries(meta)
|
||||
}
|
||||
|
||||
// pop closes the current open DB.
|
||||
func (m *multiWalk) pop() {
|
||||
m.iter.Close()
|
||||
m.iter = nil
|
||||
m.handles[0].db.Close()
|
||||
m.handles = m.handles[1:]
|
||||
}
|
||||
|
||||
func (m *multiWalk) close() {
|
||||
if m.iter != nil {
|
||||
m.iter.Close()
|
||||
|
@ -86,20 +129,26 @@ func (m *multiWalk) close() {
|
|||
}
|
||||
|
||||
type dbHandle struct {
|
||||
start uint64
|
||||
db *blockstore.DB
|
||||
start uint64
|
||||
stop uint64 // inclusive
|
||||
}
|
||||
|
||||
// sortDBs detects bounds of each DB and sorts handles.
|
||||
func sortDBs(h []dbHandle) error {
|
||||
for i, db := range h {
|
||||
// Find lowest available slot in DB.
|
||||
// Find lowest and highest available slot in DB.
|
||||
start, err := getLowestCompletedSlot(db.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stop, err := db.db.MaxRoot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h[i] = dbHandle{
|
||||
start: start,
|
||||
stop: stop,
|
||||
db: db.db,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package create
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMultiWalk_Len(t *testing.T) {
|
||||
mw := multiWalk{
|
||||
handles: []dbHandle{
|
||||
{start: 0, stop: 16},
|
||||
{start: 14, stop: 31},
|
||||
{start: 32, stop: 34},
|
||||
{start: 36, stop: 100},
|
||||
},
|
||||
}
|
||||
assert.Equal(t, uint64(35), mw.len())
|
||||
}
|
4
go.mod
4
go.mod
|
@ -21,7 +21,7 @@ require (
|
|||
github.com/prometheus/client_golang v1.13.0
|
||||
github.com/segmentio/textio v1.2.0
|
||||
github.com/slok/go-http-metrics v0.10.0
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
|
||||
github.com/spaolacci/murmur3 v1.1.0
|
||||
github.com/spf13/cobra v1.5.0
|
||||
github.com/stretchr/testify v1.8.0
|
||||
github.com/twmb/franz-go v1.7.1
|
||||
|
@ -72,6 +72,7 @@ require (
|
|||
github.com/inconshreveable/mousetrap v1.0.1 // indirect
|
||||
github.com/josharian/native v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
|
||||
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
|
||||
github.com/marten-seemann/qtls-go1-17 v0.1.2 // indirect
|
||||
|
@ -120,6 +121,7 @@ require (
|
|||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b // indirect
|
||||
google.golang.org/grpc v1.48.0 // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
)
|
||||
|
||||
|
|
15
go.sum
15
go.sum
|
@ -156,6 +156,7 @@ github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
|||
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CLnGVd57E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
|
@ -386,12 +387,14 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
|
|||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/leoluk/quic-go v0.27.1-0.20220617104859-ebc95102ed1c h1:AcfDNX/6D/ITmaooWVQUeYpxraPRbrUrrolIN3GSH0U=
|
||||
github.com/leoluk/quic-go v0.27.1-0.20220617104859-ebc95102ed1c/go.mod h1:vXgO/11FBSKM+js1NxoaQ/bPtVFYfB7uxhfHXyMhl1A=
|
||||
github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg=
|
||||
|
@ -528,6 +531,8 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
|
|||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
|
@ -571,8 +576,9 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
|
|||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
|
||||
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
|
||||
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
|
||||
|
@ -1208,8 +1214,9 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
|
|||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
|
|
Loading…
Reference in New Issue