node/pkg/ethereum: fetch guardian set at startup

Rather than waiting 15s for the timer to tick, request the
guardian set immediately at startup.

This has the added advantage of being able to crash the
runnable if the guardian set can't be fetched initially.

Change-Id: Ib4cb476c05e92ecd06496043d248eb3ca25b8065
This commit is contained in:
Leo 2021-08-21 20:56:43 +02:00 committed by Leopold Schabel
parent 369a18a21c
commit 739e0f2bb9
1 changed files with 47 additions and 33 deletions

View File

@ -83,6 +83,9 @@ type (
pending map[eth_common.Hash]*pendingMessage
pendingMu sync.Mutex
// 0 is a valid guardian set, so we need a nil value here
currentGuardianSet *uint32
}
pendingMessage struct {
@ -150,8 +153,10 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
}
// 0 is a valid guardian set, so we need a nil value here
var currentGuardianSet *uint32
// Fetch initial guardian set
if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
return fmt.Errorf("failed to request guardian set: %v", err)
}
// Poll for guardian set.
go func() {
@ -162,38 +167,9 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case <-t.C:
msm := time.Now()
logger.Info("fetching guardian set")
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
logger.Error("failed requesting guardian set",
if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
logger.Error("failed updating guardian set",
zap.Error(err), zap.String("eth_network", e.networkName))
cancel()
continue
}
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
cancel()
if currentGuardianSet != nil && *currentGuardianSet == idx {
continue
}
logger.Info("updated guardian set found",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))
currentGuardianSet = &idx
if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
}
}
}
@ -319,6 +295,44 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
}
}
func (e *EthBridgeWatcher) fetchAndUpdateGuardianSet(
logger *zap.Logger,
ctx context.Context,
caller *abi.AbiCaller,
) error {
msm := time.Now()
logger.Info("fetching guardian set")
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return err
}
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
if e.currentGuardianSet != nil && *(e.currentGuardianSet) == idx {
return nil
}
logger.Info("updated guardian set found",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))
e.currentGuardianSet = &idx
if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
}
return nil
}
// Fetch the current guardian set ID and guardian set from the chain.
func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) {
opts := &bind.CallOpts{Context: ctx}