cmd/rpc/txd: leader tracking
This commit is contained in:
parent
8cab82248f
commit
a4ff232b04
|
@ -9,6 +9,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/certusone/radiance/pkg/clusternodes"
|
||||
"github.com/certusone/radiance/pkg/envfile"
|
||||
"github.com/certusone/radiance/pkg/leaderschedule"
|
||||
envv1 "github.com/certusone/radiance/proto/env/v1"
|
||||
|
@ -56,13 +57,17 @@ func main() {
|
|||
sched := &leaderschedule.Tracker{}
|
||||
go sched.Run(ctx, env.Nodes)
|
||||
|
||||
// Gossip helper
|
||||
gossip := clusternodes.New()
|
||||
go gossip.Run(ctx, env.Nodes, time.Minute)
|
||||
|
||||
var highest uint64
|
||||
|
||||
for _, node := range nodes {
|
||||
node := node
|
||||
go func() {
|
||||
for {
|
||||
if err := watchSlotUpdates(ctx, node, sched, &highest); err != nil {
|
||||
if err := watchSlotUpdates(ctx, node, sched, gossip, &highest); err != nil {
|
||||
klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
|
@ -73,7 +78,7 @@ func main() {
|
|||
select {}
|
||||
}
|
||||
|
||||
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, highest *uint64) error {
|
||||
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, gossip *clusternodes.Tracker, highest *uint64) error {
|
||||
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
|
@ -101,6 +106,17 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leadersch
|
|||
if m.Slot > atomic.LoadUint64(highest) {
|
||||
atomic.StoreUint64(highest, m.Slot)
|
||||
klog.Infof("%s: highest slot is now %d", node.Name, m.Slot)
|
||||
leader, ok := sched.TryGet(m.Slot)
|
||||
if !ok {
|
||||
klog.Infof("could not fetch leader for slot %d", m.Slot)
|
||||
continue
|
||||
}
|
||||
g := gossip.GetByPubkey(leader)
|
||||
if g == nil || g.TPU == nil {
|
||||
klog.Infof("could not fetch gossip entry for leader %s", leader)
|
||||
continue
|
||||
}
|
||||
klog.Infof("current leader: %s, tpu: %s", leader, *g.TPU)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package clusternodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
envv1 "github.com/certusone/radiance/proto/env/v1"
|
||||
"github.com/gagliardetto/solana-go"
|
||||
"github.com/gagliardetto/solana-go/rpc"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// Tracker maintains a periodically refreshed snapshot of the cluster's
// gossip table (as returned by getClusterNodes), indexed by node
// identity pubkey. All fields are guarded by mu.
type Tracker struct {
	mu sync.Mutex
	// current holds the full node list from the most recent successful fetch.
	current []*rpc.GetClusterNodesResult
	// byPubkey maps a node identity pubkey to its gossip entry.
	// NOTE(review): entries are added on each update but never removed,
	// so departed nodes keep their last-known entry — confirm intended.
	byPubkey map[solana.PublicKey]*rpc.GetClusterNodesResult
}
|
||||
|
||||
func New() *Tracker {
|
||||
return &Tracker{
|
||||
byPubkey: make(map[solana.PublicKey]*rpc.GetClusterNodesResult),
|
||||
}
|
||||
}
|
||||
|
||||
// Run periodically fetches the gossip
|
||||
func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode, interval time.Duration) {
|
||||
t.update(ctx, nodes)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(interval):
|
||||
t.update(ctx, nodes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tracker) update(ctx context.Context, nodes []*envv1.RPCNode) {
|
||||
now := time.Now()
|
||||
|
||||
// Fetch gossip
|
||||
node := nodes[rand.Intn(len(nodes))]
|
||||
c := rpc.New(node.Http)
|
||||
klog.Infof("Fetching cluster nodes from %s", node.Http)
|
||||
out, err := c.GetClusterNodes(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update cluster nodes: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
klog.Infof("Fetched %d nodes in %v", len(out), time.Since(now))
|
||||
|
||||
t.mu.Lock()
|
||||
t.current = out
|
||||
for _, n := range out {
|
||||
t.byPubkey[n.Pubkey] = n
|
||||
}
|
||||
t.mu.Unlock()
|
||||
}
|
||||
|
||||
func (t *Tracker) GetByPubkey(pubkey solana.PublicKey) *rpc.GetClusterNodesResult {
|
||||
t.mu.Lock()
|
||||
entry := t.byPubkey[pubkey]
|
||||
t.mu.Unlock()
|
||||
return entry
|
||||
}
|
|
@ -138,12 +138,7 @@ func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode) {
|
|||
}
|
||||
|
||||
func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
|
||||
t.bySlotMu.Lock()
|
||||
defer t.bySlotMu.Unlock()
|
||||
|
||||
if t.bySlot == nil {
|
||||
t.bySlot = make(map[uint64]solana.PublicKey)
|
||||
}
|
||||
now := time.Now()
|
||||
|
||||
// Pick random node from nodes
|
||||
node := nodes[rand.Intn(len(nodes))]
|
||||
|
@ -160,6 +155,17 @@ func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64
|
|||
|
||||
epoch, firstSlot := t.FirstSlot(slot)
|
||||
|
||||
klog.Infof("Fetched epoch schedule from %s in %v", node.Http, time.Since(now))
|
||||
|
||||
now = time.Now()
|
||||
defer klog.V(1).Infof("bySlotMu: %v", time.Since(now))
|
||||
t.bySlotMu.Lock()
|
||||
defer t.bySlotMu.Unlock()
|
||||
|
||||
if t.bySlot == nil {
|
||||
t.bySlot = make(map[uint64]solana.PublicKey)
|
||||
}
|
||||
|
||||
// Update the leader schedule
|
||||
m := uint64(0)
|
||||
for pk, slots := range out {
|
||||
|
@ -193,3 +199,16 @@ func (t *Tracker) Get(slot uint64) solana.PublicKey {
|
|||
|
||||
return t.bySlot[slot]
|
||||
}
|
||||
|
||||
// TryGet returns the scheduled leader for the given slot.
|
||||
// It returns false if the leader schedule is not yet available.
|
||||
func (t *Tracker) TryGet(slot uint64) (solana.PublicKey, bool) {
|
||||
t.bySlotMu.RLock()
|
||||
defer t.bySlotMu.RUnlock()
|
||||
|
||||
if t.bySlot == nil {
|
||||
return solana.PublicKey{}, false
|
||||
}
|
||||
|
||||
return t.bySlot[slot], true
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue