cmd/rpc/slots: add leader tracking
This commit is contained in:
parent
8d076e33de
commit
8f46262bab
|
@ -0,0 +1,195 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/certusone/radiance/proto/envv1"
|
||||
"github.com/gagliardetto/solana-go"
|
||||
"github.com/gagliardetto/solana-go/rpc"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// leaderSchedule tracks the cluster leader schedule and the highest slot
// observed on the network, so callers can look up the leader for a slot.
type leaderSchedule struct {
	// mu guards max and cur
	mu sync.Mutex
	// max is the current leader schedule's highest slot
	max uint64
	// cur is the current highest slot observed on the network
	cur uint64

	// bySlot maps slots to their leader. Grows indefinitely as epochs progress.
	bySlot map[uint64]solana.PublicKey
	// bySlotMu guards bySlot.
	bySlotMu sync.Mutex

	// slotsPerEpoch is the number of slots per epoch on the network.
	// Fetched once via RPC. Used to calculate epoch boundaries.
	slotsPerEpoch uint64

	// initCh is used to signal that the leader schedule is available.
	// Closed by Run once the initial schedule has been fetched; Get blocks on it.
	initCh chan struct{}
}
|
||||
|
||||
const (
	// prefetchSlots is the number of slots to prefetch: when the observed tip
	// gets within this many slots of the schedule's end, the next schedule is fetched.
	prefetchSlots = 1000
)
|
||||
|
||||
// FirstSlot returns the epoch number and first slot of the epoch.
|
||||
func (t *leaderSchedule) FirstSlot(slotInEpoch uint64) (uint64, uint64) {
|
||||
epoch := slotInEpoch / t.slotsPerEpoch
|
||||
firstSlot := epoch * t.slotsPerEpoch
|
||||
return epoch, firstSlot
|
||||
}
|
||||
|
||||
// Update updates the current highest slot. Non-blocking.
|
||||
func (t *leaderSchedule) Update(slot uint64) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if slot > t.cur {
|
||||
t.cur = slot
|
||||
}
|
||||
}
|
||||
|
||||
// Run periodically updates the leader schedule. When the current slot + prefetchSlots
|
||||
// is greater than the highest slot in the schedule, the schedule is updated.
|
||||
//
|
||||
// A random node is picked from nodes to do the request against.
|
||||
func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) {
|
||||
t.initCh = make(chan struct{})
|
||||
|
||||
for {
|
||||
// Fetch slots per epoch
|
||||
node := nodes[rand.Intn(len(nodes))]
|
||||
c := rpc.New(node.Http)
|
||||
klog.Infof("Fetching epoch schedule from %s", node.Http)
|
||||
out, err := c.GetEpochSchedule(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("get epoch schedule: %w", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
if out.FirstNormalEpoch != 0 {
|
||||
panic("first normal epoch should be 0")
|
||||
}
|
||||
if out.FirstNormalSlot != 0 {
|
||||
panic("first normal slot should be 0")
|
||||
}
|
||||
if out.LeaderScheduleSlotOffset <= prefetchSlots {
|
||||
panic("leader schedule slot offset should be greater than prefetch slots")
|
||||
}
|
||||
t.slotsPerEpoch = out.SlotsPerEpoch
|
||||
klog.Infof("Got epoch schedule: slotsPerEpoch=%d", t.slotsPerEpoch)
|
||||
break
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
t.mu.Lock()
|
||||
fetch := false
|
||||
|
||||
// If we're less than prefetchSlots slots away from the epoch boundary,
|
||||
// fetch the new leader schedule.
|
||||
threshold := t.cur + prefetchSlots
|
||||
|
||||
if threshold > t.max && t.cur != 0 {
|
||||
fetch = true
|
||||
}
|
||||
|
||||
// If we have no current leader schedule, fetch the current one.
|
||||
var slot uint64
|
||||
var prefetch bool
|
||||
if t.max == 0 {
|
||||
slot = t.cur
|
||||
} else {
|
||||
// If we have a leader schedule, prefetch the next one
|
||||
slot = t.max + 1
|
||||
prefetch = true
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if fetch {
|
||||
if prefetch {
|
||||
klog.Infof("Prefetching leader schedule for cur=%d, threshold=%d, max=%d, slot=%d",
|
||||
t.cur, threshold, t.max, slot)
|
||||
if err := t.fetch(ctx, nodes, slot); err != nil {
|
||||
klog.Errorf("Failed to fetch leader schedule: %v", err)
|
||||
}
|
||||
} else {
|
||||
klog.Infof("Fetching initial leader schedule for cur=%d, threshold=%d, max=%d, slot=%d",
|
||||
t.cur, threshold, t.max, slot)
|
||||
if err := t.fetch(ctx, nodes, slot); err != nil {
|
||||
klog.Errorf("Failed to fetch leader schedule: %v", err)
|
||||
}
|
||||
|
||||
// Signal that the leader schedule is available
|
||||
close(t.initCh)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
|
||||
t.bySlotMu.Lock()
|
||||
defer t.bySlotMu.Unlock()
|
||||
|
||||
if t.bySlot == nil {
|
||||
t.bySlot = make(map[uint64]solana.PublicKey)
|
||||
}
|
||||
|
||||
// Pick random node from nodes
|
||||
node := nodes[rand.Intn(len(nodes))]
|
||||
klog.Infof("Using node %s", node.Http)
|
||||
|
||||
// Fetch the leader schedule
|
||||
c := rpc.New(node.Http)
|
||||
out, err := c.GetLeaderScheduleWithOpts(ctx, &rpc.GetLeaderScheduleOpts{
|
||||
Epoch: &slot,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("get leader schedule: %w", err)
|
||||
}
|
||||
|
||||
epoch, firstSlot := t.FirstSlot(slot)
|
||||
|
||||
// Update the leader schedule
|
||||
m := uint64(0)
|
||||
for pk, slots := range out {
|
||||
for _, s := range slots {
|
||||
t.bySlot[firstSlot+s] = pk
|
||||
if firstSlot+s > m {
|
||||
m = firstSlot + s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.mu.Lock()
|
||||
t.max = m
|
||||
t.mu.Unlock()
|
||||
|
||||
klog.Infof("Updated leader schedule for epoch=%d, slot=%d, first=%d, max=%d",
|
||||
epoch, slot, firstSlot, t.max)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get returns the scheduled leader for the given slot.
|
||||
// It blocks until the leader schedule is available.
|
||||
func (t *leaderSchedule) Get(slot uint64) solana.PublicKey {
|
||||
// Block until the leader schedule is available
|
||||
if t.initCh != nil {
|
||||
<-t.initCh
|
||||
}
|
||||
|
||||
t.bySlotMu.Lock()
|
||||
defer t.bySlotMu.Unlock()
|
||||
|
||||
return t.bySlot[slot]
|
||||
}
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -18,9 +20,12 @@ var (
|
|||
flagEnv = flag.String("env", ".env.prototxt", "Env file (.prototxt)")
|
||||
flagOnly = flag.String("only", "", "Only watch specified nodes (comma-separated)")
|
||||
flagType = flag.String("type", "", "Only print specific types")
|
||||
|
||||
flagDebugAddr = flag.String("debugAddr", "localhost:6060", "pprof/metrics listen address")
|
||||
)
|
||||
|
||||
func init() {
|
||||
klog.InitFlags(nil)
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
|
@ -68,19 +73,30 @@ func main() {
|
|||
klog.Fatalf("No nodes found in env file")
|
||||
}
|
||||
|
||||
go func() {
|
||||
klog.Error(http.ListenAndServe(*flagDebugAddr, nil))
|
||||
}()
|
||||
|
||||
nodes = filterNodes(nodes, parseOnlyFlag(*flagOnly))
|
||||
|
||||
if len(nodes) == 0 {
|
||||
klog.Exitf("No nodes in environment or all nodes filtered")
|
||||
}
|
||||
klog.Infof("Watching %d nodes", len(nodes))
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
highest := &sync.Map{}
|
||||
|
||||
sched := &leaderSchedule{}
|
||||
|
||||
go sched.Run(ctx, env.Nodes)
|
||||
|
||||
for _, node := range nodes {
|
||||
node := node
|
||||
go func() {
|
||||
for {
|
||||
if err := watchSlotUpdates(ctx, node, highest); err != nil {
|
||||
if err := watchSlotUpdates(ctx, node, highest, sched); err != nil {
|
||||
klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
|
@ -102,7 +118,7 @@ func main() {
|
|||
select {}
|
||||
}
|
||||
|
||||
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map) error {
|
||||
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderSchedule) error {
|
||||
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
|
@ -123,12 +139,14 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Ma
|
|||
}
|
||||
|
||||
ts := m.Timestamp.Time()
|
||||
delta := time.Since(ts)
|
||||
delay := time.Since(ts)
|
||||
|
||||
if *flagType != "" && string(m.Type) != *flagType {
|
||||
continue
|
||||
}
|
||||
|
||||
sched.Update(m.Slot)
|
||||
|
||||
var first time.Time
|
||||
if m.Type == ws.SlotsUpdatesFirstShredReceived {
|
||||
value, _ := highest.LoadOrStore(m.Slot, ts)
|
||||
|
@ -146,13 +164,13 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Ma
|
|||
|
||||
var prop int64
|
||||
if !first.IsZero() {
|
||||
prop = time.Since(first).Milliseconds()
|
||||
prop = ts.Sub(first).Milliseconds()
|
||||
} else {
|
||||
prop = -1
|
||||
}
|
||||
|
||||
klog.Infof("%s: slot=%d type=%s delta=%dms prop=%dms parent=%d stats=%v",
|
||||
node.Name, m.Slot, m.Type, delta.Milliseconds(), prop, m.Parent, m.Stats)
|
||||
klog.Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s",
|
||||
node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, sched.Get(m.Slot))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue