node: introduce SignedHeartbeat

Bug: certusone/wormhole#267
Change-Id: Ia34fa053240d7b340287cc4cc1e15556d3ff2893
This commit is contained in:
Leo 2021-07-31 14:36:57 +02:00
parent b02d782f1c
commit 2ebf473531
3 changed files with 92 additions and 19 deletions

View File

@ -404,7 +404,7 @@ func runBridge(cmd *cobra.Command, args []string) {
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, rawHeartbeatListeners, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
obsvC, sendC, rawHeartbeatListeners, priv, gk, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
return err
}

View File

@ -2,8 +2,11 @@ package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/certusone/wormhole/bridge/pkg/version"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"strings"
@ -48,10 +51,17 @@ var (
}, []string{"type"})
)
func Run(obsvC chan *gossipv1.SignedObservation,
sendC chan []byte,
rawHeartbeatListeners *publicrpc.RawHeartbeatConns,
var heartbeatMessagePrefix = []byte("heartbeat|")
func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func Run(
obsvC chan *gossipv1.SignedObservation,
sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns,
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
port uint,
networkID string,
bootstrapPeers string,
@ -190,24 +200,43 @@ func Run(obsvC chan *gossipv1.SignedObservation,
networks = append(networks, v)
}
msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{
Heartbeat: &gossipv1.Heartbeat{
NodeName: nodeName,
Counter: ctr,
Timestamp: time.Now().UnixNano(),
Networks: networks,
Version: version.Version(),
GuardianAddr: DefaultRegistry.guardianAddress,
}}}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,
Counter: ctr,
Timestamp: time.Now().UnixNano(),
Networks: networks,
Version: version.Version(),
GuardianAddr: DefaultRegistry.guardianAddress,
}
rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat())
rawHeartbeatListeners.PublishHeartbeat(heartbeat)
b, err := proto.Marshal(&msg)
b, err := proto.Marshal(heartbeat)
if err != nil {
panic(err)
}
DefaultRegistry.mu.Unlock()
// Sign the heartbeat using our node's guardian key.
digest := heartbeatDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}
msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedHeartbeat{
SignedHeartbeat: &gossipv1.SignedHeartbeat{
Heartbeat: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(),
}}}
b, err = proto.Marshal(&msg)
if err != nil {
panic(err)
}
err = th.Publish(ctx, b)
if err != nil {
logger.Warn("failed to publish heartbeat message", zap.Error(err))
@ -263,12 +292,31 @@ func Run(obsvC chan *gossipv1.SignedObservation,
zap.String("from", envelope.GetFrom().String()))
switch m := msg.Message.(type) {
// TODO(leo): remove Heartbeat support after upgrade
case *gossipv1.GossipMessage_Heartbeat:
logger.Debug("heartbeat received",
logger.Debug("unsigned heartbeat received",
zap.Any("value", m.Heartbeat),
zap.String("from", envelope.GetFrom().String()))
rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat())
p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
case *gossipv1.GossipMessage_SignedHeartbeat:
s := m.SignedHeartbeat
if heartbeat, err := processSignedHeartbeat(s); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Warn("invalid signed heartbeat received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
logger.Debug("valid signed heartbeat received",
zap.Any("value", heartbeat),
zap.String("from", envelope.GetFrom().String()))
rawHeartbeatListeners.PublishHeartbeat(heartbeat)
}
case *gossipv1.GossipMessage_SignedObservation:
obsvC <- m.SignedObservation
p2pMessagesReceived.WithLabelValues("observation").Inc()
@ -282,3 +330,15 @@ func Run(obsvC chan *gossipv1.SignedObservation,
}
}
}
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat) (*gossipv1.Heartbeat, error) {
// TODO: verify signature here
var h gossipv1.Heartbeat
err := proto.Unmarshal(s.Heartbeat, &h)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
}
return &h, nil
}

View File

@ -6,12 +6,27 @@ option go_package = "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1;go
message GossipMessage {
oneof message {
// Deprecated: use SignedHeartbeat.
Heartbeat heartbeat = 1;
SignedObservation signed_observation = 2;
SignedHeartbeat signed_heartbeat = 3;
}
}
// P2P gossip heartbeats for network introspection purposes. ALL FIELDS ARE UNTRUSTED.
message SignedHeartbeat {
// Serialized Heartbeat message.
bytes heartbeat = 1;
// ECDSA signature using the node's guardian public key.
bytes signature = 2;
// Guardian address that signed this payload (truncated Eth address).
// This is already contained in Heartbeat, however, we want to verify
// the payload before we deserialize it.
bytes guardian_addr = 3;
}
// P2P gossip heartbeats for network introspection purposes.
message Heartbeat {
// The node's arbitrarily chosen, untrusted nodeName.
string node_name = 1;
@ -35,8 +50,6 @@ message Heartbeat {
// Human-readable representation of the guardian key's address.
string guardian_addr = 6;
// TODO: include signed statement of gk public key?
}
// A SignedObservation is a signed statement by a given guardian node