node: handle inbound SignedVAAWithQuorum messages

Change-Id: I539155bb4e59d728ea528e6e2f70b6fbb3338a41
This commit is contained in:
Leo 2021-09-13 15:03:26 +02:00 committed by Leopold Schabel
parent ec07ed0288
commit 7914512797
6 changed files with 101 additions and 6 deletions

View File

@ -411,6 +411,9 @@ func runNode(cmd *cobra.Command, args []string) {
// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
// Injected VAAs (manually generated rather than created via observation)
injectC := make(chan *vaa.VAA)
@ -456,7 +459,7 @@ func runNode(cmd *cobra.Command, args []string) {
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
obsvC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
return err
}
@ -494,6 +497,7 @@ func runNode(cmd *cobra.Command, args []string) {
sendC,
obsvC,
injectC,
signedInC,
gk,
gst,
*unsafeDevMode,

View File

@ -17,7 +17,7 @@ type VAAID struct {
Sequence uint64
}
func vaaIDFromVAA(v *vaa.VAA) *VAAID {
func VaaIDFromVAA(v *vaa.VAA) *VAAID {
return &VAAID{
EmitterChain: v.EmitterChain,
EmitterAddress: v.EmitterAddress,
@ -57,7 +57,7 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
// TODO: panic if same VAA is stored with different value
err := d.db.Update(func(txn *badger.Txn) error {
if err := txn.Set(vaaIDFromVAA(v).Bytes(), b); err != nil {
if err := txn.Set(VaaIDFromVAA(v).Bytes(), b); err != nil {
return err
}
return nil

View File

@ -59,7 +59,7 @@ func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)
@ -320,6 +320,9 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.
case *gossipv1.GossipMessage_SignedObservation:
obsvC <- m.SignedObservation
p2pMessagesReceived.WithLabelValues("observation").Inc()
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
signedInC <- m.SignedVaaWithQuorum
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",

View File

@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
node_common "github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -255,3 +256,82 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
}
}
func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {
v, err := vaa.Unmarshal(m.Vaa)
if err != nil {
p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message",
zap.Error(err), zap.Any("message", m))
return
}
// Calculate digest for logging
digest, err := v.SigningMsg()
if err != nil {
panic(err)
}
hash := hex.EncodeToString(digest.Bytes())
if p.gs == nil {
p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet",
zap.String("digest", hash),
zap.Any("message", m),
)
return
}
// Verify VAA signature to prevent a DoS attack on our local store.
if !v.VerifySignatures(p.gs.Keys) {
p.logger.Warn("received SignedVAAWithQuorum message with invalid VAA signatures",
zap.String("digest", hash),
zap.Any("message", m),
zap.Any("vaa", v),
)
return
}
quorum := CalculateQuorum(len(p.gs.Keys))
if len(v.Signatures) < quorum {
p.logger.Warn("received SignedVAAWithQuorum message without quorum",
zap.String("digest", hash),
zap.Any("message", m),
zap.Any("vaa", v),
zap.Int("wanted_sigs", quorum),
zap.Int("got_sigs", len(v.Signatures)),
)
return
}
// We now established that:
// - all signatures on the VAA are valid
// - the signature's addresses match the node's current guardian set
// - enough signatures are present for the VAA to reach quorum
// Check if we already store this VAA
_, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v))
if err == nil {
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store",
zap.String("digest", hash),
)
return
} else if err != db.ErrVAANotFound {
p.logger.Error("failed to look up VAA in database",
zap.String("digest", hash),
zap.Error(err),
)
return
}
// Store signed VAA in database.
p.logger.Info("storing inbound signed VAA with quorum",
zap.String("digest", hash),
zap.Any("vaa", v),
zap.String("bytes", hex.EncodeToString(m.Vaa)),
zap.String("message_id", v.MessageID()))
if err := p.db.StoreSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
return
}
}

View File

@ -60,6 +60,8 @@ type Processor struct {
sendC chan []byte
// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *gossipv1.SignedObservation
// signedInC is a channel of inbound signed VAA observations from p2p
signedInC chan *gossipv1.SignedVAAWithQuorum
// injectC is a channel of VAAs injected locally.
injectC chan *vaa.VAA
@ -106,6 +108,7 @@ func NewProcessor(
sendC chan []byte,
obsvC chan *gossipv1.SignedObservation,
injectC chan *vaa.VAA,
signedInC chan *gossipv1.SignedVAAWithQuorum,
gk *ecdsa.PrivateKey,
gst *common.GuardianSetState,
devnetMode bool,
@ -114,13 +117,15 @@ func NewProcessor(
terraLCD string,
terraChainID string,
terraContract string,
attestationEvents *reporter.AttestationEventReporter) *Processor {
attestationEvents *reporter.AttestationEventReporter,
) *Processor {
return &Processor{
lockC: lockC,
setC: setC,
sendC: sendC,
obsvC: obsvC,
signedInC: signedInC,
injectC: injectC,
gk: gk,
gst: gst,
@ -159,6 +164,8 @@ func (p *Processor) Run(ctx context.Context) error {
p.handleInjection(ctx, v)
case m := <-p.obsvC:
p.handleObservation(ctx, m)
case m := <-p.signedInC:
p.handleInboundSignedVAAWithQuorum(ctx, m)
case <-p.cleanup.C:
p.handleCleanup(ctx)
}

View File

@ -202,7 +202,8 @@ func (v *VAA) SigningMsg() (common.Hash, error) {
return hash, nil
}
// VerifySignature verifies the signature of the VAA given the signer addresses.
// VerifySignatures verifies the signature of the VAA given the signer addresses.
// Returns true if the signatures were verified successfully.
func (v *VAA) VerifySignatures(addresses []common.Address) bool {
if len(addresses) < len(v.Signatures) {
return false