node: add GuardianSetState and verify heartbeat signature

Bug: certusone/wormhole#267
Change-Id: Ica8015fbbd52506d800670d933051f410bb1caa7
This commit is contained in:
Leo 2021-07-31 18:51:38 +02:00
parent 08a8f390ca
commit 3af233e3eb
4 changed files with 65 additions and 17 deletions

View File

@ -368,6 +368,9 @@ func runBridge(cmd *cobra.Command, args []string) {
// Injected VAAs (manually generated rather than created via observation)
injectC := make(chan *vaa.VAA)
// Guardian set state managed by processor
gst := &common.GuardianSetState{}
// Load p2p private key
var priv crypto.PrivKey
if *unsafeDevMode {
@ -404,7 +407,7 @@ func runBridge(cmd *cobra.Command, args []string) {
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, rawHeartbeatListeners, priv, gk, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
obsvC, sendC, rawHeartbeatListeners, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
return err
}
@ -443,6 +446,7 @@ func runBridge(cmd *cobra.Command, args []string) {
obsvC,
injectC,
gk,
gst,
*unsafeDevMode,
*devNumGuardians,
*ethRPC,

View File

@ -2,6 +2,7 @@ package common
import (
"github.com/ethereum/go-ethereum/common"
"sync"
)
// Matching constants:
@ -39,3 +40,22 @@ func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
return -1, false
}
type GuardianSetState struct {
mu sync.Mutex
current *GuardianSet
}
func (st *GuardianSetState) Set(set *GuardianSet) {
st.mu.Lock()
defer st.mu.Unlock()
st.current = set
}
func (st *GuardianSetState) Get() *GuardianSet {
st.mu.Lock()
defer st.mu.Unlock()
return st.current
}

View File

@ -3,7 +3,9 @@ package p2p
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
bridge_common "github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/version"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
@ -57,17 +59,7 @@ func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func Run(
obsvC chan *gossipv1.SignedObservation,
sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns,
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
port uint,
networkID string,
bootstrapPeers string,
nodeName string,
rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)
@ -301,9 +293,16 @@ func Run(
p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
case *gossipv1.GossipMessage_SignedHeartbeat:
s := m.SignedHeartbeat
if heartbeat, err := processSignedHeartbeat(s); err != nil {
gs := gst.Get()
if gs == nil {
// No valid guardian set yet - dropping heartbeat
logger.Debug("skipping heartbeat - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
break
}
if heartbeat, err := processSignedHeartbeat(s, gs); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Warn("invalid signed heartbeat received",
zap.Error(err),
zap.Any("payload", msg.Message),
@ -331,11 +330,29 @@ func Run(
}
}
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat) (*gossipv1.Heartbeat, error) {
// TODO: verify signature here
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet) (*gossipv1.Heartbeat, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
if !ok {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
}
pk := gs.Keys[idx]
digest := heartbeatDigest(s.Heartbeat)
pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
if err != nil {
return nil, errors.New("failed to recover public key")
}
signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}
var h gossipv1.Heartbeat
err := proto.Unmarshal(s.Heartbeat, &h)
err = proto.Unmarshal(s.Heartbeat, &h)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
}

View File

@ -83,6 +83,10 @@ type Processor struct {
// gs is the currently valid guardian set
gs *common.GuardianSet
// gst is managed by the processor and allows concurrent access to the
// guardian set by other components.
gst *common.GuardianSetState
// state is the current runtime VAA view
state *aggregationState
// gk pk as eth address
@ -100,6 +104,7 @@ func NewProcessor(
obsvC chan *gossipv1.SignedObservation,
injectC chan *vaa.VAA,
gk *ecdsa.PrivateKey,
gst *common.GuardianSetState,
devnetMode bool,
devnetNumGuardians uint,
devnetEthRPC string,
@ -114,6 +119,7 @@ func NewProcessor(
obsvC: obsvC,
injectC: injectC,
gk: gk,
gst: gst,
devnetMode: devnetMode,
devnetNumGuardians: devnetNumGuardians,
devnetEthRPC: devnetEthRPC,
@ -140,6 +146,7 @@ func (p *Processor) Run(ctx context.Context) error {
p.logger.Info("guardian set updated",
zap.Strings("set", p.gs.KeysAsHexStrings()),
zap.Uint32("index", p.gs.Index))
p.gst.Set(p.gs)
case k := <-p.lockC:
p.handleMessage(ctx, k)
case v := <-p.injectC: