diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 42ca25c8..67b1b1c9 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -411,6 +411,9 @@ func runNode(cmd *cobra.Command, args []string) { // Inbound observations obsvC := make(chan *gossipv1.SignedObservation, 50) + // Inbound signed VAAs + signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) + // Injected VAAs (manually generated rather than created via observation) injectC := make(chan *vaa.VAA) @@ -456,7 +459,7 @@ func runNode(cmd *cobra.Command, args []string) { // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { + obsvC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { return err } @@ -494,6 +497,7 @@ func runNode(cmd *cobra.Command, args []string) { sendC, obsvC, injectC, + signedInC, gk, gst, *unsafeDevMode, diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 5b51d18c..6cd704c9 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -17,7 +17,7 @@ type VAAID struct { Sequence uint64 } -func vaaIDFromVAA(v *vaa.VAA) *VAAID { +func VaaIDFromVAA(v *vaa.VAA) *VAAID { return &VAAID{ EmitterChain: v.EmitterChain, EmitterAddress: v.EmitterAddress, @@ -57,7 +57,7 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { // TODO: panic if same VAA is stored with different value err := d.db.Update(func(txn *badger.Txn) error { - if err := txn.Set(vaaIDFromVAA(v).Bytes(), b); err != nil { + if err := txn.Set(VaaIDFromVAA(v).Bytes(), b); err != nil { return err } return nil diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index d5e7ef41..57ed8ab4 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -59,7 +59,7 @@ func heartbeatDigest(b []byte) common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } -func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { +func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) @@ -320,6 +320,9 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto. case *gossipv1.GossipMessage_SignedObservation: obsvC <- m.SignedObservation p2pMessagesReceived.WithLabelValues("observation").Inc() + case *gossipv1.GossipMessage_SignedVaaWithQuorum: + signedInC <- m.SignedVaaWithQuorum + p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type (running outdated software?)", diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index d67566f4..3ce8fbb1 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" node_common "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/reporter" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -255,3 +256,82 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs } } + +func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { + v, err := vaa.Unmarshal(m.Vaa) + if err != nil { + p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message", + zap.Error(err), zap.Any("message", m)) + return + } + + // Calculate digest for logging + digest, err := v.SigningMsg() + if err != nil { + panic(err) + } + hash := hex.EncodeToString(digest.Bytes()) + + if p.gs == nil { + p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet", + zap.String("digest", hash), + zap.Any("message", m), + ) + return + } + + // Verify VAA signature to prevent a DoS attack on our local store. + if !v.VerifySignatures(p.gs.Keys) { + p.logger.Warn("received SignedVAAWithQuorum message with invalid VAA signatures", + zap.String("digest", hash), + zap.Any("message", m), + zap.Any("vaa", v), + ) + return + } + + quorum := CalculateQuorum(len(p.gs.Keys)) + + if len(v.Signatures) < quorum { + p.logger.Warn("received SignedVAAWithQuorum message without quorum", + zap.String("digest", hash), + zap.Any("message", m), + zap.Any("vaa", v), + zap.Int("wanted_sigs", quorum), + zap.Int("got_sigs", len(v.Signatures)), + ) + return + } + + // We now established that: + // - all signatures on the VAA are valid + // - the signature's addresses match the node's current guardian set + // - enough signatures are present for the VAA to reach quorum + + // Check if we already store this VAA + _, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)) + if err == nil { + p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store", + zap.String("digest", hash), + ) + return + } else if err != db.ErrVAANotFound { + p.logger.Error("failed to look up VAA in database", + zap.String("digest", hash), + zap.Error(err), + ) + return + } + + // Store signed VAA in database. + p.logger.Info("storing inbound signed VAA with quorum", + zap.String("digest", hash), + zap.Any("vaa", v), + zap.String("bytes", hex.EncodeToString(m.Vaa)), + zap.String("message_id", v.MessageID())) + + if err := p.db.StoreSignedVAA(v); err != nil { + p.logger.Error("failed to store signed VAA", zap.Error(err)) + return + } +} diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index d7dcaeeb..8eeedbc0 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -60,6 +60,8 @@ type Processor struct { sendC chan []byte // obsvC is a channel of inbound decoded observations from p2p obsvC chan *gossipv1.SignedObservation + // signedInC is a channel of inbound signed VAA observations from p2p + signedInC chan *gossipv1.SignedVAAWithQuorum // injectC is a channel of VAAs injected locally. injectC chan *vaa.VAA @@ -106,6 +108,7 @@ func NewProcessor( sendC chan []byte, obsvC chan *gossipv1.SignedObservation, injectC chan *vaa.VAA, + signedInC chan *gossipv1.SignedVAAWithQuorum, gk *ecdsa.PrivateKey, gst *common.GuardianSetState, devnetMode bool, @@ -114,13 +117,15 @@ func NewProcessor( terraLCD string, terraChainID string, terraContract string, - attestationEvents *reporter.AttestationEventReporter) *Processor { + attestationEvents *reporter.AttestationEventReporter, +) *Processor { return &Processor{ lockC: lockC, setC: setC, sendC: sendC, obsvC: obsvC, + signedInC: signedInC, injectC: injectC, gk: gk, gst: gst, @@ -159,6 +164,8 @@ func (p *Processor) Run(ctx context.Context) error { p.handleInjection(ctx, v) case m := <-p.obsvC: p.handleObservation(ctx, m) + case m := <-p.signedInC: + p.handleInboundSignedVAAWithQuorum(ctx, m) case <-p.cleanup.C: p.handleCleanup(ctx) } diff --git a/node/pkg/vaa/structs.go b/node/pkg/vaa/structs.go index 447d564c..86f25453 100644 --- a/node/pkg/vaa/structs.go +++ b/node/pkg/vaa/structs.go @@ -202,7 +202,8 @@ func (v *VAA) SigningMsg() (common.Hash, error) { return hash, nil } -// VerifySignature verifies the signature of the VAA given the signer addresses. +// VerifySignatures verifies the signature of the VAA given the signer addresses. +// Returns true if the signatures were verified successfully. func (v *VAA) VerifySignatures(addresses []common.Address) bool { if len(addresses) < len(v.Signatures) { return false