Move leader schedule tracker to pkg/leaderschedule

This commit is contained in:
Leopold Schabel 2022-06-15 21:53:19 +02:00
parent 8f46262bab
commit 8356f43d60
2 changed files with 10 additions and 9 deletions

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/certusone/radiance/pkg/envfile"
"github.com/certusone/radiance/pkg/leaderschedule"
"github.com/certusone/radiance/proto/envv1"
"github.com/gagliardetto/solana-go/rpc/ws"
"k8s.io/klog/v2"
@ -88,7 +89,7 @@ func main() {
highest := &sync.Map{}
sched := &leaderSchedule{}
sched := &leaderschedule.Tracker{}
go sched.Run(ctx, env.Nodes)
@ -118,7 +119,7 @@ func main() {
select {}
}
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderSchedule) error {
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker) error {
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

View File

@ -1,4 +1,4 @@
package main
package leaderschedule
import (
"context"
@ -13,7 +13,7 @@ import (
"k8s.io/klog/v2"
)
type leaderSchedule struct {
type Tracker struct {
// mu guards max and cur
mu sync.Mutex
// max is the current leader schedule's highest slot
@ -39,14 +39,14 @@ const (
)
// FirstSlot returns the epoch number and first slot of the epoch.
func (t *leaderSchedule) FirstSlot(slotInEpoch uint64) (uint64, uint64) {
func (t *Tracker) FirstSlot(slotInEpoch uint64) (uint64, uint64) {
epoch := slotInEpoch / t.slotsPerEpoch
firstSlot := epoch * t.slotsPerEpoch
return epoch, firstSlot
}
// Update updates the current highest slot. Non-blocking.
func (t *leaderSchedule) Update(slot uint64) {
func (t *Tracker) Update(slot uint64) {
t.mu.Lock()
defer t.mu.Unlock()
@ -59,7 +59,7 @@ func (t *leaderSchedule) Update(slot uint64) {
// is greater than the highest slot in the schedule, the schedule is updated.
//
// A random node is picked from nodes to do the request against.
func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) {
func (t *Tracker) Run(ctx context.Context, nodes []*envv1.RPCNode) {
t.initCh = make(chan struct{})
for {
@ -137,7 +137,7 @@ func (t *leaderSchedule) Run(ctx context.Context, nodes []*envv1.RPCNode) {
}
}
func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
func (t *Tracker) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot uint64) error {
t.bySlotMu.Lock()
defer t.bySlotMu.Unlock()
@ -182,7 +182,7 @@ func (t *leaderSchedule) fetch(ctx context.Context, nodes []*envv1.RPCNode, slot
// Get returns the scheduled leader for the given slot.
// It blocks until the leader schedule is available.
func (t *leaderSchedule) Get(slot uint64) solana.PublicKey {
func (t *Tracker) Get(slot uint64) solana.PublicKey {
// Block until the leader schedule is available
if t.initCh != nil {
<-t.initCh