diff --git a/cmd/rpc/slots/epoch.go b/cmd/rpc/slots/epoch.go
new file mode 100644
index 0000000..c732c8f
--- /dev/null
+++ b/cmd/rpc/slots/epoch.go
@@ -0,0 +1,195 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/certusone/radiance/proto/envv1"
+	"github.com/gagliardetto/solana-go"
+	"github.com/gagliardetto/solana-go/rpc"
+	"k8s.io/klog/v2"
+)
+
+type leaderSchedule struct {
+	// mu guards max and cur
+	mu sync.Mutex
+	// max is the current leader schedule's highest slot
+	max uint64
+	// cur is the current highest slot observed on the network
+	cur uint64
+
+	// bySlot maps slots to their leader. Grows indefinitely as epochs progress.
+	bySlot   map[uint64]solana.PublicKey
+	bySlotMu sync.Mutex
+
+	// slotsPerEpoch is the number of slots per epoch on the network.
+	// Fetched once via RPC. Used to calculate epoch boundaries.
+	slotsPerEpoch uint64
+
+	// initCh is used to signal that the leader schedule is available.
+	initCh chan struct{}
+}
+
+const (
+	// prefetchSlots is how many slots before the end of the known schedule
+	// the next epoch's leader schedule is prefetched.
+	prefetchSlots = 1000
+)
+
+// FirstSlot returns the epoch number and the epoch's first slot for the given absolute slot.
+func (t *leaderSchedule) FirstSlot(slotInEpoch uint64) (uint64, uint64) {
+	epoch := slotInEpoch / t.slotsPerEpoch
+	firstSlot := epoch * t.slotsPerEpoch
+	return epoch, firstSlot
+}
+
+// Update updates the current highest slot. Non-blocking.
+func (t *leaderSchedule) Update(slot uint64) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	if slot > t.cur {
+		t.cur = slot
+	}
+}
+
+// Run periodically updates the leader schedule. When the current slot + prefetchSlots
+// is greater than the highest slot in the schedule, the schedule is updated.
+//
+// A random node from nodes is picked for each request.
+func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) {
+	t.initCh = make(chan struct{})
+
+	for {
+		// Fetch slots per epoch
+		node := nodes[rand.Intn(len(nodes))]
+		c := rpc.New(node.Http)
+		klog.Infof("Fetching epoch schedule from %s", node.Http)
+		out, err := c.GetEpochSchedule(ctx)
+		if err != nil {
+			klog.Errorf("get epoch schedule: %v", err)
+			time.Sleep(time.Second)
+			continue
+		}
+		if out.FirstNormalEpoch != 0 {
+			panic("first normal epoch should be 0")
+		}
+		if out.FirstNormalSlot != 0 {
+			panic("first normal slot should be 0")
+		}
+		if out.LeaderScheduleSlotOffset <= prefetchSlots {
+			panic("leader schedule slot offset should be greater than prefetch slots")
+		}
+		t.slotsPerEpoch = out.SlotsPerEpoch
+		klog.Infof("Got epoch schedule: slotsPerEpoch=%d", t.slotsPerEpoch)
+		break
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			t.mu.Lock()
+			fetch := false
+
+			// If we're less than prefetchSlots slots away from the epoch boundary,
+			// fetch the new leader schedule.
+			threshold := t.cur + prefetchSlots
+
+			if threshold > t.max && t.cur != 0 {
+				fetch = true
+			}
+
+			// If we have no current leader schedule, fetch the current one.
+			var slot uint64
+			var prefetch bool
+			if t.max == 0 {
+				slot = t.cur
+			} else {
+				// If we have a leader schedule, prefetch the next one
+				slot = t.max + 1
+				prefetch = true
+			}
+			t.mu.Unlock()
+			if fetch {
+				if prefetch {
+					klog.Infof("Prefetching leader schedule for cur=%d, threshold=%d, max=%d, slot=%d",
+						t.cur, threshold, t.max, slot)
+					if err := t.fetch(ctx, nodes, slot); err != nil {
+						klog.Errorf("Failed to fetch leader schedule: %v", err)
+					}
+				} else {
+					klog.Infof("Fetching initial leader schedule for cur=%d, threshold=%d, max=%d, slot=%d",
+						t.cur, threshold, t.max, slot)
+					if err := t.fetch(ctx, nodes, slot); err != nil {
+						klog.Errorf("Failed to fetch leader schedule: %v", err)
+					} else {
+						// Signal that the leader schedule is available.
+						// Only close on success, so a failed initial fetch is retried
+						// instead of closing the channel twice.
+						close(t.initCh)
+					}
+				}
+			}
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
+	t.bySlotMu.Lock()
+	defer t.bySlotMu.Unlock()
+
+	if t.bySlot == nil {
+		t.bySlot = make(map[uint64]solana.PublicKey)
+	}
+
+	// Pick random node from nodes
+	node := nodes[rand.Intn(len(nodes))]
+	klog.Infof("Using node %s", node.Http)
+
+	// Fetch the leader schedule
+	c := rpc.New(node.Http)
+	out, err := c.GetLeaderScheduleWithOpts(ctx, &rpc.GetLeaderScheduleOpts{
+		Epoch: &slot,
+	})
+	if err != nil {
+		return fmt.Errorf("get leader schedule: %w", err)
+	}
+
+	epoch, firstSlot := t.FirstSlot(slot)
+
+	// Update the leader schedule
+	m := uint64(0)
+	for pk, slots := range out {
+		for _, s := range slots {
+			t.bySlot[firstSlot+s] = pk
+			if firstSlot+s > m {
+				m = firstSlot + s
+			}
+		}
+	}
+
+	t.mu.Lock()
+	t.max = m
+	t.mu.Unlock()
+
+	klog.Infof("Updated leader schedule for epoch=%d, slot=%d, first=%d, max=%d",
+		epoch, slot, firstSlot, t.max)
+	return nil
+}
+
+// Get returns the scheduled leader for the given slot.
+// It blocks until the leader schedule is available.
+func (t *leaderSchedule) Get(slot uint64) solana.PublicKey {
+	// Block until the leader schedule is available
+	if t.initCh != nil {
+		<-t.initCh
+	}
+
+	t.bySlotMu.Lock()
+	defer t.bySlotMu.Unlock()
+
+	return t.bySlot[slot]
+}
diff --git a/cmd/rpc/slots/slot.go b/cmd/rpc/slots/slot.go
index 5de5b32..30e5d42 100644
--- a/cmd/rpc/slots/slot.go
+++ b/cmd/rpc/slots/slot.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"net/http"
+	_ "net/http/pprof"
 	"strings"
 	"sync"
 	"time"
@@ -18,9 +20,12 @@ var (
 	flagEnv  = flag.String("env", ".env.prototxt", "Env file (.prototxt)")
 	flagOnly = flag.String("only", "", "Only watch specified nodes (comma-separated)")
 	flagType = flag.String("type", "", "Only print specific types")
+
+	flagDebugAddr = flag.String("debugAddr", "localhost:6060", "pprof/metrics listen address")
 )
 
 func init() {
+	klog.InitFlags(nil)
 	flag.Parse()
 }
 
@@ -68,19 +73,30 @@ func main() {
 		klog.Fatalf("No nodes found in env file")
 	}
 
+	go func() {
+		klog.Error(http.ListenAndServe(*flagDebugAddr, nil))
+	}()
+
 	nodes = filterNodes(nodes, parseOnlyFlag(*flagOnly))
+	if len(nodes) == 0 {
+		klog.Exitf("No nodes in environment or all nodes filtered")
+	}
 
 	klog.Infof("Watching %d nodes", len(nodes))
 
 	ctx := context.Background()
 
 	highest := &sync.Map{}
 
+	sched := &leaderSchedule{}
+
+	go sched.Run(ctx, env.Nodes)
+
 	for _, node := range nodes {
 		node := node
 		go func() {
 			for {
-				if err := watchSlotUpdates(ctx, node, highest); err != nil {
+				if err := watchSlotUpdates(ctx, node, highest, sched); err != nil {
 					klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
 				}
 				time.Sleep(time.Second * 5)
@@ -102,7 +118,7 @@ func main() {
 	select {}
 }
 
-func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map) error {
+func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderSchedule) error {
 	timeout, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
 
@@ -123,12 +139,14 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Ma
 		}
 
 		ts := m.Timestamp.Time()
-		delta := time.Since(ts)
+		delay := time.Since(ts)
 
 		if *flagType != "" && string(m.Type) != *flagType {
 			continue
 		}
 
+		sched.Update(m.Slot)
+
 		var first time.Time
 		if m.Type == ws.SlotsUpdatesFirstShredReceived {
 			value, _ := highest.LoadOrStore(m.Slot, ts)
@@ -146,13 +164,13 @@
 
 		var prop int64
 		if !first.IsZero() {
-			prop = time.Since(first).Milliseconds()
+			prop = ts.Sub(first).Milliseconds()
 		} else {
 			prop = -1
 		}
 
-		klog.Infof("%s: slot=%d type=%s delta=%dms prop=%dms parent=%d stats=%v",
-			node.Name, m.Slot, m.Type, delta.Milliseconds(), prop, m.Parent, m.Stats)
+		klog.Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s",
+			node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, sched.Get(m.Slot))
 	}
 }
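
Note on the slot arithmetic in epoch.go: getLeaderSchedule returns, per validator identity, slot indices relative to the first slot of the requested epoch, so fetch() rebases them onto absolute slots via FirstSlot before storing them in bySlot. A minimal standalone sketch of that arithmetic, assuming mainnet-like 432,000-slot epochs purely for illustration (the code always takes the real value from GetEpochSchedule at startup):

	package main

	import "fmt"

	func main() {
		// Assumed for illustration only; epoch.go reads the real value
		// from GetEpochSchedule's SlotsPerEpoch field.
		const slotsPerEpoch uint64 = 432000

		slot := uint64(864123) // an absolute slot somewhere in epoch 2

		// Same arithmetic as leaderSchedule.FirstSlot.
		epoch := slot / slotsPerEpoch      // 2
		firstSlot := epoch * slotsPerEpoch // 864000

		// A relative index of 123 from getLeaderSchedule for this epoch
		// therefore denotes absolute slot firstSlot+123 = 864123.
		relative := uint64(123)
		fmt.Println(epoch, firstSlot, firstSlot+relative)
	}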
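
Similarly, the prefetch trigger in Run fires once the newest slot seen via Update comes within prefetchSlots of the end of the schedule already held, and the follow-up fetch targets slot max+1, i.e. the first slot of the next epoch; the LeaderScheduleSlotOffset panic in Run appears to guard that the next schedule is already known that far ahead. A rough sketch of the condition under the same assumed epoch length:

	package main

	import "fmt"

	func main() {
		const prefetchSlots = 1000 // same constant as epoch.go
		max := uint64(431999)      // held schedule ends at the last slot of epoch 0

		for _, cur := range []uint64{430999, 431000, 431500} {
			threshold := cur + prefetchSlots
			fetch := threshold > max && cur != 0
			// 430999 -> false; from 431000 on -> true, and the fetch would
			// request slot max+1 = 432000, the first slot of epoch 1.
			fmt.Printf("cur=%d threshold=%d fetch=%v\n", cur, threshold, fetch)
		}
	}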