From 3af233e3eb94b0c0d0f28266d3c5ba695f842028 Mon Sep 17 00:00:00 2001 From: Leo Date: Sat, 31 Jul 2021 18:51:38 +0200 Subject: [PATCH] node: add GuardianSetState and verify heartbeat signature Bug: certusone/wormhole#267 Change-Id: Ica8015fbbd52506d800670d933051f410bb1caa7 --- bridge/cmd/guardiand/bridge.go | 6 +++- bridge/pkg/common/guardianset.go | 20 +++++++++++++ bridge/pkg/p2p/p2p.go | 49 +++++++++++++++++++++---------- bridge/pkg/processor/processor.go | 7 +++++ 4 files changed, 65 insertions(+), 17 deletions(-) diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index ed6210405..fddf6447a 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -368,6 +368,9 @@ func runBridge(cmd *cobra.Command, args []string) { // Injected VAAs (manually generated rather than created via observation) injectC := make(chan *vaa.VAA) + // Guardian set state managed by processor + gst := &common.GuardianSetState{} + // Load p2p private key var priv crypto.PrivKey if *unsafeDevMode { @@ -404,7 +407,7 @@ func runBridge(cmd *cobra.Command, args []string) { // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, rawHeartbeatListeners, priv, gk, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil { + obsvC, sendC, rawHeartbeatListeners, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil { return err } @@ -443,6 +446,7 @@ func runBridge(cmd *cobra.Command, args []string) { obsvC, injectC, gk, + gst, *unsafeDevMode, *devNumGuardians, *ethRPC, diff --git a/bridge/pkg/common/guardianset.go b/bridge/pkg/common/guardianset.go index a3ed9e74d..de76fc641 100644 --- a/bridge/pkg/common/guardianset.go +++ b/bridge/pkg/common/guardianset.go @@ -2,6 +2,7 @@ package common import ( "github.com/ethereum/go-ethereum/common" + "sync" ) // Matching constants: @@ -39,3 +40,22 @@ func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) { return -1, false } + +type GuardianSetState struct { + mu sync.Mutex + current *GuardianSet +} + +func (st *GuardianSetState) Set(set *GuardianSet) { + st.mu.Lock() + defer st.mu.Unlock() + + st.current = set +} + +func (st *GuardianSetState) Get() *GuardianSet { + st.mu.Lock() + defer st.mu.Unlock() + + return st.current +} diff --git a/bridge/pkg/p2p/p2p.go b/bridge/pkg/p2p/p2p.go index 841f61473..419dbbf7b 100644 --- a/bridge/pkg/p2p/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -3,7 +3,9 @@ package p2p import ( "context" "crypto/ecdsa" + "errors" "fmt" + bridge_common "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/version" "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" @@ -57,17 +59,7 @@ func heartbeatDigest(b []byte) common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } -func Run( - obsvC chan *gossipv1.SignedObservation, - sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns, - priv crypto.PrivKey, - gk *ecdsa.PrivateKey, - port uint, - networkID string, - bootstrapPeers string, - nodeName string, - rootCtxCancel context.CancelFunc) func(ctx context.Context) error { - +func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) @@ -301,9 +293,16 @@ func Run( p2pMessagesReceived.WithLabelValues("heartbeat").Inc() case *gossipv1.GossipMessage_SignedHeartbeat: s := m.SignedHeartbeat - if heartbeat, err := processSignedHeartbeat(s); err != nil { + gs := gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Debug("skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(s, gs); err != nil { p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - logger.Warn("invalid signed heartbeat received", zap.Error(err), zap.Any("payload", msg.Message), @@ -331,11 +330,29 @@ func Run( } } -func processSignedHeartbeat(s *gossipv1.SignedHeartbeat) (*gossipv1.Heartbeat, error) { - // TODO: verify signature here +func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet) (*gossipv1.Heartbeat, error) { + envelopeAddr := common.BytesToAddress(s.GuardianAddr) + idx, ok := gs.KeyIndex(envelopeAddr) + if !ok { + return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr) + } + + pk := gs.Keys[idx] + + digest := heartbeatDigest(s.Heartbeat) + + pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature) + if err != nil { + return nil, errors.New("failed to recover public key") + } + + signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) + if pk != signerAddr { + return nil, fmt.Errorf("invalid signer: %v", signerAddr) + } var h gossipv1.Heartbeat - err := proto.Unmarshal(s.Heartbeat, &h) + err = proto.Unmarshal(s.Heartbeat, &h) if err != nil { return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err) } diff --git a/bridge/pkg/processor/processor.go b/bridge/pkg/processor/processor.go index 181685b62..8abc4b715 100644 --- a/bridge/pkg/processor/processor.go +++ b/bridge/pkg/processor/processor.go @@ -83,6 +83,10 @@ type Processor struct { // gs is the currently valid guardian set gs *common.GuardianSet + // gst is managed by the processor and allows concurrent access to the + // guardian set by other components. + gst *common.GuardianSetState + // state is the current runtime VAA view state *aggregationState // gk pk as eth address @@ -100,6 +104,7 @@ func NewProcessor( obsvC chan *gossipv1.SignedObservation, injectC chan *vaa.VAA, gk *ecdsa.PrivateKey, + gst *common.GuardianSetState, devnetMode bool, devnetNumGuardians uint, devnetEthRPC string, @@ -114,6 +119,7 @@ func NewProcessor( obsvC: obsvC, injectC: injectC, gk: gk, + gst: gst, devnetMode: devnetMode, devnetNumGuardians: devnetNumGuardians, devnetEthRPC: devnetEthRPC, @@ -140,6 +146,7 @@ func (p *Processor) Run(ctx context.Context) error { p.logger.Info("guardian set updated", zap.Strings("set", p.gs.KeysAsHexStrings()), zap.Uint32("index", p.gs.Index)) + p.gst.Set(p.gs) case k := <-p.lockC: p.handleMessage(ctx, k) case v := <-p.injectC: