From 8356f43d6078df5b714055bffe4aaa60a4a5f0bf Mon Sep 17 00:00:00 2001 From: Leopold Schabel Date: Wed, 15 Jun 2022 21:53:19 +0200 Subject: [PATCH] Move leader schedule tracker to pkg/leaderschedule --- cmd/rpc/slots/slot.go | 5 +++-- {cmd/rpc/slots => pkg/leaderschedule}/epoch.go | 14 +++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) rename {cmd/rpc/slots => pkg/leaderschedule}/epoch.go (91%) diff --git a/cmd/rpc/slots/slot.go b/cmd/rpc/slots/slot.go index 30e5d42..40e56dc 100644 --- a/cmd/rpc/slots/slot.go +++ b/cmd/rpc/slots/slot.go @@ -11,6 +11,7 @@ import ( "time" "github.com/certusone/radiance/pkg/envfile" + "github.com/certusone/radiance/pkg/leaderschedule" "github.com/certusone/radiance/proto/envv1" "github.com/gagliardetto/solana-go/rpc/ws" "k8s.io/klog/v2" @@ -88,7 +89,7 @@ func main() { highest := &sync.Map{} - sched := &leaderSchedule{} + sched := &leaderschedule.Tracker{} go sched.Run(ctx, env.Nodes) @@ -118,7 +119,7 @@ func main() { select {} } -func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderSchedule) error { +func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker) error { timeout, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() diff --git a/cmd/rpc/slots/epoch.go b/pkg/leaderschedule/epoch.go similarity index 91% rename from cmd/rpc/slots/epoch.go rename to pkg/leaderschedule/epoch.go index c732c8f..d1caf1c 100644 --- a/cmd/rpc/slots/epoch.go +++ b/pkg/leaderschedule/epoch.go @@ -1,4 +1,4 @@ -package main +package leaderschedule import ( "context" @@ -13,7 +13,7 @@ import ( "k8s.io/klog/v2" ) -type leaderSchedule struct { +type Tracker struct { // mu guards max and cur mu sync.Mutex // max is the current leader schedule's highest slot @@ -39,14 +39,14 @@ const ( ) // FirstSlot returns the epoch number and first slot of the epoch. -func (t *leaderSchedule) FirstSlot(slotInEpoch uint64) (uint64, uint64) { +func (t *Tracker) FirstSlot(slotInEpoch uint64) (uint64, uint64) { epoch := slotInEpoch / t.slotsPerEpoch firstSlot := epoch * t.slotsPerEpoch return epoch, firstSlot } // Update updates the current highest slot. Non-blocking. -func (t *leaderSchedule) Update(slot uint64) { +func (t *Tracker) Update(slot uint64) { t.mu.Lock() defer t.mu.Unlock() @@ -59,7 +59,7 @@ func (t *leaderSchedule) Update(slot uint64) { // is greater than the highest slot in the schedule, the schedule is updated. // // A random node is picked from nodes to do the request against. -func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) { +func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode) { t.initCh = make(chan struct{}) for { @@ -137,7 +137,7 @@ func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) { } } -func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error { +func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error { t.bySlotMu.Lock() defer t.bySlotMu.Unlock() @@ -182,7 +182,7 @@ func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot // Get returns the scheduled leader for the given slot. // It blocks until the leader schedule is available. -func (t *leaderSchedule) Get(slot uint64) solana.PublicKey { +func (t *Tracker) Get(slot uint64) solana.PublicKey { // Block until the leader schedule is available if t.initCh != nil { <-t.initCh