diff --git a/cmd/rpc/txd/txd.go b/cmd/rpc/txd/txd.go
index c88d7e2..77d689c 100644
--- a/cmd/rpc/txd/txd.go
+++ b/cmd/rpc/txd/txd.go
@@ -9,6 +9,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/certusone/radiance/pkg/clusternodes"
 	"github.com/certusone/radiance/pkg/envfile"
 	"github.com/certusone/radiance/pkg/leaderschedule"
 	envv1 "github.com/certusone/radiance/proto/env/v1"
@@ -56,13 +57,17 @@ func main() {
 	sched := &leaderschedule.Tracker{}
 	go sched.Run(ctx, env.Nodes)
 
+	// Gossip helper
+	gossip := clusternodes.New()
+	go gossip.Run(ctx, env.Nodes, time.Minute)
+
 	var highest uint64
 
 	for _, node := range nodes {
 		node := node
 		go func() {
 			for {
-				if err := watchSlotUpdates(ctx, node, sched, &highest); err != nil {
+				if err := watchSlotUpdates(ctx, node, sched, gossip, &highest); err != nil {
 					klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
 				}
 				time.Sleep(time.Second * 5)
@@ -73,7 +78,7 @@ func main() {
 	select {}
 }
 
-func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, highest *uint64) error {
+func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, gossip *clusternodes.Tracker, highest *uint64) error {
 	timeout, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
 
@@ -101,6 +106,17 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leadersch
 		if m.Slot > atomic.LoadUint64(highest) {
 			atomic.StoreUint64(highest, m.Slot)
 			klog.Infof("%s: highest slot is now %d", node.Name, m.Slot)
+			leader, ok := sched.TryGet(m.Slot)
+			if !ok {
+				klog.Infof("could not fetch leader for slot %d", m.Slot)
+				continue
+			}
+			g := gossip.GetByPubkey(leader)
+			if g == nil || g.TPU == nil {
+				klog.Infof("could not fetch gossip entry for leader %s", leader)
+				continue
+			}
+			klog.Infof("current leader: %s, tpu: %s", leader, *g.TPU)
 		}
 	}
 }
diff --git a/pkg/clusternodes/gossip.go b/pkg/clusternodes/gossip.go
new file mode 100644
index 0000000..a32cf73
--- /dev/null
+++ b/pkg/clusternodes/gossip.go
@@ -0,0 +1,68 @@
+package clusternodes
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"time"
+
+	envv1 "github.com/certusone/radiance/proto/env/v1"
+	"github.com/gagliardetto/solana-go"
+	"github.com/gagliardetto/solana-go/rpc"
+	"k8s.io/klog/v2"
+)
+
+type Tracker struct {
+	mu       sync.Mutex
+	current  []*rpc.GetClusterNodesResult
+	byPubkey map[solana.PublicKey]*rpc.GetClusterNodesResult
+}
+
+func New() *Tracker {
+	return &Tracker{
+		byPubkey: make(map[solana.PublicKey]*rpc.GetClusterNodesResult),
+	}
+}
+
+// Run periodically fetches the current cluster nodes (gossip) from a random RPC node.
+func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode, interval time.Duration) {
+	t.update(ctx, nodes)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(interval):
+			t.update(ctx, nodes)
+		}
+	}
+}
+
+func (t *Tracker) update(ctx context.Context, nodes []*envv1.RPCNode) {
+	now := time.Now()
+
+	// Fetch gossip
+	node := nodes[rand.Intn(len(nodes))]
+	c := rpc.New(node.Http)
+	klog.Infof("Fetching cluster nodes from %s", node.Http)
+	out, err := c.GetClusterNodes(ctx)
+	if err != nil {
+		klog.Errorf("Failed to update cluster nodes: %v", err)
+		return
+	}
+
+	klog.Infof("Fetched %d nodes in %v", len(out), time.Since(now))
+
+	t.mu.Lock()
+	t.current = out
+	for _, n := range out {
+		t.byPubkey[n.Pubkey] = n
+	}
+	t.mu.Unlock()
+}
+
+func (t *Tracker) GetByPubkey(pubkey solana.PublicKey) *rpc.GetClusterNodesResult {
+	t.mu.Lock()
+	entry := t.byPubkey[pubkey]
+	t.mu.Unlock()
+	return entry
+}
diff --git a/pkg/leaderschedule/epoch.go b/pkg/leaderschedule/epoch.go
index 1922c67..a737f89 100644
--- a/pkg/leaderschedule/epoch.go
+++ b/pkg/leaderschedule/epoch.go
@@ -138,12 +138,7 @@ func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode) {
 }
 
 func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
-	t.bySlotMu.Lock()
-	defer t.bySlotMu.Unlock()
-
-	if t.bySlot == nil {
-		t.bySlot = make(map[uint64]solana.PublicKey)
-	}
+	now := time.Now()
 
 	// Pick random node from nodes
 	node := nodes[rand.Intn(len(nodes))]
@@ -160,6 +155,17 @@ func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64
 
 	epoch, firstSlot := t.FirstSlot(slot)
 
+	klog.Infof("Fetched epoch schedule from %s in %v", node.Http, time.Since(now))
+
+	now = time.Now()
+	defer func() { klog.V(1).Infof("bySlotMu: %v", time.Since(now)) }()
+	t.bySlotMu.Lock()
+	defer t.bySlotMu.Unlock()
+
+	if t.bySlot == nil {
+		t.bySlot = make(map[uint64]solana.PublicKey)
+	}
+
 	// Update the leader schedule
 	m := uint64(0)
 	for pk, slots := range out {
@@ -193,3 +199,16 @@ func (t *Tracker) Get(slot uint64) solana.PublicKey {
 
 	return t.bySlot[slot]
 }
+
+// TryGet returns the scheduled leader for the given slot.
+// It returns false if the leader schedule is not yet available.
+func (t *Tracker) TryGet(slot uint64) (solana.PublicKey, bool) {
+	t.bySlotMu.RLock()
+	defer t.bySlotMu.RUnlock()
+
+	if t.bySlot == nil {
+		return solana.PublicKey{}, false
+	}
+
+	return t.bySlot[slot], true
+}
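
For reference, a minimal, hypothetical sketch of driving the new clusternodes package on its own, outside txd. The RPC endpoint, node name, and pubkey below are placeholders and not part of this change; txd wires the tracker to the env file and to leaderschedule.Tracker.TryGet instead.

package main

import (
	"context"
	"time"

	"github.com/certusone/radiance/pkg/clusternodes"
	envv1 "github.com/certusone/radiance/proto/env/v1"
	"github.com/gagliardetto/solana-go"
	"k8s.io/klog/v2"
)

func main() {
	ctx := context.Background()

	// Placeholder endpoint; txd reads these from the env file instead.
	nodes := []*envv1.RPCNode{{Name: "example", Http: "http://127.0.0.1:8899"}}

	// Fetch the cluster node list once at startup, then refresh every minute.
	gossip := clusternodes.New()
	go gossip.Run(ctx, nodes, time.Minute)

	// Give the first update a moment to land before querying.
	time.Sleep(5 * time.Second)

	// Placeholder identity; in txd this comes from leaderschedule.Tracker.TryGet.
	pk := solana.MustPublicKeyFromBase58("11111111111111111111111111111111")
	if g := gossip.GetByPubkey(pk); g != nil && g.TPU != nil {
		klog.Infof("%s advertises TPU %s", pk, *g.TPU)
	} else {
		klog.Infof("%s not (yet) seen in gossip", pk)
	}
}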