bridge: move initial guardian set fetching to pkg/ethereum/watcher.go
This removes the special case in the processor. The initial guardian set
is now treated like a regular guardian set update, and the devnet
update check is executed on every update.
Fixes an edge case where processing a guardian set update would fail
with a spurious `abi: attempting to unmarshall an empty string while arguments are expected`
error, leaving the node in a bad state since restarting ethwatch
wouldn't cause the guardian set to be re-fetched.
ghstack-source-id: e580a65e90
Pull Request resolved: https://github.com/certusone/wormhole/pull/46
This commit is contained in:
parent
3b35ddc1ce
commit
08156ca438
|
@ -15,7 +15,6 @@ import (
|
||||||
|
|
||||||
"github.com/certusone/wormhole/bridge/pkg/common"
|
"github.com/certusone/wormhole/bridge/pkg/common"
|
||||||
"github.com/certusone/wormhole/bridge/pkg/devnet"
|
"github.com/certusone/wormhole/bridge/pkg/devnet"
|
||||||
"github.com/certusone/wormhole/bridge/pkg/ethereum"
|
|
||||||
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
|
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
|
||||||
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
||||||
"github.com/certusone/wormhole/bridge/pkg/vaa"
|
"github.com/certusone/wormhole/bridge/pkg/vaa"
|
||||||
|
@ -42,46 +41,7 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
|
||||||
our_addr := crypto.PubkeyToAddress(gk.PublicKey)
|
our_addr := crypto.PubkeyToAddress(gk.PublicKey)
|
||||||
state := &aggregationState{vaaMap{}}
|
state := &aggregationState{vaaMap{}}
|
||||||
|
|
||||||
// Get initial validator set from Ethereum. We could also fetch it from Solana,
|
var gs *common.GuardianSet
|
||||||
// because both sets are synchronized, we simply made an arbitrary decision to use Ethereum.
|
|
||||||
|
|
||||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
idx, ethGs, err := ethereum.FetchCurrentGuardianSet(timeout, logger, *ethRPC, ethcommon.HexToAddress(*ethContract))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
gs := &common.GuardianSet{
|
|
||||||
Keys: ethGs.Keys,
|
|
||||||
Index: idx,
|
|
||||||
}
|
|
||||||
|
|
||||||
// In debug mode, node 0 submits a VAA that configures the desired number of guardians to both chains.
|
|
||||||
if *unsafeDevMode {
|
|
||||||
idx, err := devnet.GetDevnetIndex()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if idx == 0 && (uint(len(gs.Keys)) != *devNumGuardians) {
|
|
||||||
v := devnet.DevnetGuardianSetVSS(*devNumGuardians)
|
|
||||||
|
|
||||||
logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
|
|
||||||
len(gs.Keys), *devNumGuardians),
|
|
||||||
zap.Any("v", v))
|
|
||||||
|
|
||||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
tx, err := devnet.SubmitVAA(timeout, *ethRPC, v)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("failed to submit devnet guardian set change", zap.Error(err))
|
|
||||||
}
|
|
||||||
logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("tx", tx))
|
|
||||||
|
|
||||||
vaaC <- v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
||||||
|
|
||||||
|
@ -93,6 +53,12 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
|
||||||
logger.Info("guardian set updated",
|
logger.Info("guardian set updated",
|
||||||
zap.Strings("set", gs.KeysAsHexStrings()),
|
zap.Strings("set", gs.KeysAsHexStrings()),
|
||||||
zap.Uint32("index", gs.Index))
|
zap.Uint32("index", gs.Index))
|
||||||
|
|
||||||
|
// Dev mode guardian set update check (no-op in production)
|
||||||
|
err := checkDevModeGuardianSetUpdate(ctx, vaaC, gs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
case k := <-lockC:
|
case k := <-lockC:
|
||||||
supervisor.Logger(ctx).Info("lockup confirmed",
|
supervisor.Logger(ctx).Info("lockup confirmed",
|
||||||
zap.Stringer("source_chain", k.SourceChain),
|
zap.Stringer("source_chain", k.SourceChain),
|
||||||
|
@ -321,3 +287,36 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkDevModeGuardianSetUpdate(ctx context.Context, vaaC chan *vaa.VAA, gs *common.GuardianSet) error {
|
||||||
|
logger := supervisor.Logger(ctx)
|
||||||
|
|
||||||
|
if *unsafeDevMode {
|
||||||
|
idx, err := devnet.GetDevnetIndex()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get devnet index: %s")
|
||||||
|
}
|
||||||
|
|
||||||
|
if idx == 0 && (uint(len(gs.Keys)) != *devNumGuardians) {
|
||||||
|
v := devnet.DevnetGuardianSetVSS(*devNumGuardians)
|
||||||
|
|
||||||
|
logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
|
||||||
|
len(gs.Keys), *devNumGuardians),
|
||||||
|
zap.Any("v", v))
|
||||||
|
|
||||||
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
tx, err := devnet.SubmitVAA(timeout, *ethRPC, v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to submit devnet guardian set change: %v")
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("tx", tx))
|
||||||
|
|
||||||
|
// Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
|
||||||
|
vaaC <- v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -83,6 +83,20 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
|
||||||
errC := make(chan error)
|
errC := make(chan error)
|
||||||
logger := supervisor.Logger(ctx)
|
logger := supervisor.Logger(ctx)
|
||||||
|
|
||||||
|
// Get initial validator set from Ethereum. We could also fetch it from Solana,
|
||||||
|
// because both sets are synchronized, we simply made an arbitrary decision to use Ethereum.
|
||||||
|
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info("initial guardian set fetched", zap.Any("value", gs), zap.Uint32("index", idx))
|
||||||
|
e.setChan <- &common.GuardianSet{
|
||||||
|
Keys: gs.Keys,
|
||||||
|
Index: idx,
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -132,7 +146,9 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
|
||||||
|
|
||||||
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.NewGuardianIndex)
|
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.NewGuardianIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errC <- fmt.Errorf("error requesting new guardian set value: %w", err)
|
// We failed to process the guardian set update and are now out of sync with the chain.
|
||||||
|
// Recover by crashing the runnable, which causes the guardian set to be re-fetched.
|
||||||
|
errC <- fmt.Errorf("error requesting new guardian set value for %d: %w", ev.NewGuardianIndex, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,7 +220,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the current guardian set ID and guardian set from the chain.
|
// Fetch the current guardian set ID and guardian set from the chain.
|
||||||
func FetchCurrentGuardianSet(ctx context.Context, logger *zap.Logger, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.WormholeGuardianSet, error) {
|
func FetchCurrentGuardianSet(ctx context.Context, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.WormholeGuardianSet, error) {
|
||||||
c, err := ethclient.DialContext(ctx, rpcURL)
|
c, err := ethclient.DialContext(ctx, rpcURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
|
return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
|
||||||
|
@ -227,7 +243,5 @@ func FetchCurrentGuardianSet(ctx context.Context, logger *zap.Logger, rpcURL str
|
||||||
return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
|
return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("current guardian set fetched", zap.Any("value", gs), zap.Uint32("index", currentIndex))
|
|
||||||
|
|
||||||
return currentIndex, &gs, nil
|
return currentIndex, &gs, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue