cmd/rpc/txd: blockhash tracking

This commit is contained in:
Leopold Schabel 2022-06-23 15:43:29 +02:00
parent a4ff232b04
commit dccb8adf29
2 changed files with 137 additions and 2 deletions

View File

@ -9,10 +9,12 @@ import (
"sync/atomic"
"time"
"github.com/certusone/radiance/pkg/blockhash"
"github.com/certusone/radiance/pkg/clusternodes"
"github.com/certusone/radiance/pkg/envfile"
"github.com/certusone/radiance/pkg/leaderschedule"
envv1 "github.com/certusone/radiance/proto/env/v1"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc/ws"
"k8s.io/klog/v2"
)
@ -61,13 +63,17 @@ func main() {
gossip := clusternodes.New()
go gossip.Run(ctx, env.Nodes, time.Minute)
// Blockhash helper
bh := blockhash.New(nodes)
go bh.Run(ctx, time.Second)
var highest uint64
for _, node := range nodes {
node := node
go func() {
for {
if err := watchSlotUpdates(ctx, node, sched, gossip, &highest); err != nil {
if err := watchSlotUpdates(ctx, node, sched, gossip, bh, &highest); err != nil {
klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
}
time.Sleep(time.Second * 5)
@ -78,7 +84,7 @@ func main() {
select {}
}
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, gossip *clusternodes.Tracker, highest *uint64) error {
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leaderschedule.Tracker, gossip *clusternodes.Tracker, bh *blockhash.Tracker, highest *uint64) error {
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
@ -100,6 +106,8 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leadersch
sched.Update(m.Slot)
var lastBlockhash solana.Hash
if m.Type == ws.SlotsUpdatesFirstShredReceived {
klog.V(1).Infof("%s: first shred received for slot %d", node.Name, m.Slot)
@ -117,6 +125,11 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, sched *leadersch
continue
}
klog.Infof("current leader: %s, tpu: %s", leader, *g.TPU)
b := bh.MostRecent()
if b != lastBlockhash {
klog.Infof("new blockhash: %s", b)
lastBlockhash = b
}
}
}
}

122
pkg/blockhash/blockhash.go Normal file
View File

@ -0,0 +1,122 @@
package blockhash
import (
"context"
"sync"
"time"
envv1 "github.com/certusone/radiance/proto/env/v1"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"k8s.io/klog/v2"
)
type Tracker struct {
mu sync.Mutex
byNode map[string]struct {
Blockhash solana.Hash
HighestValidSlot uint64
}
nodes []*envv1.RPCNode
c map[string]*rpc.Client
}
func New(nodes []*envv1.RPCNode) *Tracker {
t := &Tracker{
byNode: make(map[string]struct {
Blockhash solana.Hash
HighestValidSlot uint64
}),
c: make(map[string]*rpc.Client),
nodes: nodes,
}
for _, node := range nodes {
t.c[node.Name] = rpc.New(node.Http)
}
return t
}
func (t *Tracker) MostPopular() solana.Hash {
t.mu.Lock()
defer t.mu.Unlock()
// Return the most frequently occurring value of t.byNode
var mostPopular solana.Hash
var mostPopularCount int
for _, h := range t.byNode {
count := 0
for _, h2 := range t.byNode {
if h.Blockhash.Equals(h2.Blockhash) {
count++
}
}
if count > mostPopularCount {
mostPopular = h.Blockhash
mostPopularCount = count
}
klog.V(2).Infof("%s: %d", h, count)
}
return mostPopular
}
func (t *Tracker) MostRecent() solana.Hash {
t.mu.Lock()
defer t.mu.Unlock()
// Return the blockhash which has the highest valid slot
var mostRecent solana.Hash
var highestValidSlot uint64
for _, h := range t.byNode {
if h.HighestValidSlot > highestValidSlot {
highestValidSlot = h.HighestValidSlot
mostRecent = h.Blockhash
}
}
return mostRecent
}
func (t *Tracker) Run(ctx context.Context, interval time.Duration) {
t.update(ctx)
for {
select {
case <-ctx.Done():
return
case <-time.After(interval):
t.update(ctx)
}
}
}
func (t *Tracker) update(ctx context.Context) {
now := time.Now()
for _, node := range t.nodes {
node := node
go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
klog.V(1).Infof("Fetching blockhash from %s", node.Http)
h, err := t.c[node.Name].GetLatestBlockhash(ctx, rpc.CommitmentConfirmed)
if err != nil {
klog.Errorf("%s: failed to request blockhash: %v", node.Name, err)
return
}
klog.V(1).Infof("%s: fetched blockhash %d -> %s in %v",
node.Name, h.Value.LastValidBlockHeight, h.Value.Blockhash, time.Since(now))
t.mu.Lock()
t.byNode[node.Name] = struct {
Blockhash solana.Hash
HighestValidSlot uint64
}{
Blockhash: h.Value.Blockhash,
HighestValidSlot: h.Value.LastValidBlockHeight,
}
t.mu.Unlock()
}()
}
}