Decouple lifecycle of processor and ethwatch

We now do an independent fetch of the guardian set.
This commit is contained in:
Leo 2020-08-22 00:21:41 +02:00
parent 0d5bef7366
commit ef2aab5998
4 changed files with 47 additions and 34 deletions

View File

@ -178,12 +178,8 @@ func main() {
return err
}
// We need to re-fetch the initial initiator set when ethwatch dies, so we want to restart the watcher as well.
// TODO: on-demand fetching of guardian set to avoid restarting ethwatch?
if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
"ethwatch": ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, *ethConfirmations, lockC, setC).Run,
"processor": vaaConsensusProcessor(lockC, setC, gk, sendC, obsvC, solanaVaaC),
}); err != nil {
if err := supervisor.Run(ctx, "ethwatch",
ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, *ethConfirmations, lockC, setC).Run); err != nil {
return err
}
@ -192,6 +188,10 @@ func main() {
return err
}
if err := supervisor.Run(ctx, "processor", vaaConsensusProcessor(lockC, setC, gk, sendC, obsvC, solanaVaaC)); err != nil {
return err
}
logger.Info("Started internal services")
supervisor.Signal(ctx, supervisor.SignalHealthy)

View File

@ -15,6 +15,7 @@ import (
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/devnet"
"github.com/certusone/wormhole/bridge/pkg/ethereum"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/supervisor"
"github.com/certusone/wormhole/bridge/pkg/vaa"
@ -44,13 +45,17 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
// Get initial validator set from Ethereum. We could also fetch it from Solana,
// because both sets are synchronized, we simply made an arbitrary decision to use Ethereum.
// TODO: this can deadlock even in the same RunGroup - needs to fetch the set independently
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, ethGs, err := ethereum.FetchCurrentGuardianSet(timeout, logger, *ethRPC, ethcommon.HexToAddress(*ethContract))
if err != nil {
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
}
logger.Info("waiting for initial validator set to be fetched from Ethereum")
gs := <-setC
logger.Info("current guardian set received",
zap.Strings("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index))
gs := &common.GuardianSet{
Keys: ethGs.Keys,
Index: idx,
}
// In debug mode, node 0 submits a VAA that configures the desired number of guardians to both chains.
if *unsafeDevMode {

View File

@ -68,7 +68,7 @@ func SubmitVAA(ctx context.Context, rpcURL string, vaa *vaa.VAA) (*types.Transac
panic(err)
}
supervisor.Logger(ctx).Info("initial guardian set VAA", zap.Binary("binary", b)) // TODO
supervisor.Logger(ctx).Info("submitted VAA to devnet", zap.Binary("binary", b)) // TODO
tx, err := bridge.SubmitVAA(opts, b)
if err != nil {

View File

@ -194,27 +194,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Fetch current guardian set
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
opts := &bind.CallOpts{Context: timeout}
currentIndex, err := caller.GuardianSetIndex(opts)
if err != nil {
return fmt.Errorf("error requesting current guardian set index: %w", err)
}
gs, err := caller.GetGuardianSet(opts, currentIndex)
if err != nil {
return fmt.Errorf("error requesting current guardian set value: %w", err)
}
logger.Info("current guardian set fetched", zap.Any("value", gs), zap.Uint32("index", currentIndex))
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: currentIndex,
}
select {
case <-ctx.Done():
return ctx.Err()
@ -222,3 +201,32 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return err
}
}
// Fetch the current guardian set ID and guardian set from the chain.
func FetchCurrentGuardianSet(ctx context.Context, logger *zap.Logger, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.WormholeGuardianSet, error) {
c, err := ethclient.DialContext(ctx, rpcURL)
if err != nil {
return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
}
caller, err := abi.NewAbiCaller(bridgeContract, c)
if err != nil {
panic(err)
}
opts := &bind.CallOpts{Context: ctx}
currentIndex, err := caller.GuardianSetIndex(opts)
if err != nil {
return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
}
gs, err := caller.GetGuardianSet(opts, currentIndex)
if err != nil {
return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
}
logger.Info("current guardian set fetched", zap.Any("value", gs), zap.Uint32("index", currentIndex))
return currentIndex, &gs, nil
}