diff --git a/devnet/spy.yaml b/devnet/spy.yaml index 94cae5fe0..cd85ed3df 100644 --- a/devnet/spy.yaml +++ b/devnet/spy.yaml @@ -48,6 +48,10 @@ spec: # Hardcoded devnet bootstrap (generated from deterministic key in guardiand) - --bootstrap - /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw + - --ethRPC + - http://eth-devnet:8545 + - --ethContract + - "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550" - --logLevel=warn ports: - containerPort: 7072 diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index d5724a65a..ae616ced6 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -2,6 +2,7 @@ package spy import ( "context" + "errors" "fmt" "net" "net/http" @@ -48,6 +49,9 @@ var ( spyRPC *string sendTimeout *time.Duration + + ethRPC *string + ethContract *string ) func init() { @@ -65,6 +69,9 @@ func init() { spyRPC = SpyCmd.Flags().String("spyRPC", "", "Listen address for gRPC interface") sendTimeout = SpyCmd.Flags().Duration("sendTimeout", 5*time.Second, "Timeout for sending a message to a subscriber") + + ethRPC = SpyCmd.Flags().String("ethRPC", "", "Ethereum RPC for verifying VAAs (optional)") + ethContract = SpyCmd.Flags().String("ethContract", "", "Ethereum core bridge address for verifying VAAs (required if ethRPC is specified)") } // SpyCmd represents the node command @@ -79,6 +86,7 @@ type spyServer struct { logger *zap.Logger subsSignedVaa map[string]*subscriptionSignedVaa subsSignedVaaMu sync.Mutex + vaaVerifier *VaaVerifier } type message struct { @@ -103,15 +111,23 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error { defer s.subsSignedVaaMu.Unlock() var v *vaa.VAA + var err error + verified := s.vaaVerifier == nil for _, sub := range s.subsSignedVaa { if len(sub.filters) == 0 { + if !verified { + verified = true + v, err = s.verifyVAA(v, vaaBytes) + if err != nil { + return err + } + } sub.ch <- message{vaaBytes: vaaBytes} continue } if v == nil { - var err error v, err = vaa.Unmarshal(vaaBytes) if err != nil { return err @@ -120,6 +136,13 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error { for _, fi := range sub.filters { if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress { + if !verified { + verified = true + v, err = s.verifyVAA(v, vaaBytes) + if err != nil { + return err + } + } sub.ch <- message{vaaBytes: vaaBytes} } } @@ -129,6 +152,31 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error { return nil } +func (s *spyServer) verifyVAA(v *vaa.VAA, vaaBytes []byte) (*vaa.VAA, error) { + if s.vaaVerifier == nil { + panic("verifier is nil") + } + + if v == nil { + var err error + v, err = vaa.Unmarshal(vaaBytes) + if err != nil { + return v, fmt.Errorf(`failed to unmarshal VAA: %w`, err) + } + } + + valid, err := s.vaaVerifier.VerifySignatures(v) + if err != nil { + return v, fmt.Errorf(`failed to verify VAA: %w`, err) + } + + if !valid { + return v, errors.New(`invalid VAA signature`) + } + + return v, nil +} + func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error { var fi []filterSignedVaa if req.Filters != nil { @@ -311,6 +359,17 @@ func runSpy(cmd *cobra.Command, args []string) { logger.Fatal("failed to start RPC server", zap.Error(err)) } + // VAA verifier (optional) + if *ethRPC != "" { + if *ethContract == "" { + logger.Fatal(`If "--ethRPC" is specified, "--ethContract" must also be specified`) + } + s.vaaVerifier = NewVaaVerifier(logger, *ethRPC, *ethContract) + if err := s.vaaVerifier.GetInitialGuardianSet(); err != nil { + logger.Fatal(`Failed to read initial guardian set for VAA verification`, zap.Error(err)) + } + } + // Ignore observations go func() { for { @@ -344,7 +403,7 @@ func runSpy(cmd *cobra.Command, args []string) { logger.Info("Received signed VAA", zap.Any("vaa", v.Vaa)) if err := s.PublishSignedVAA(v.Vaa); err != nil { - logger.Error("failed to publish signed VAA", zap.Error(err)) + logger.Error("failed to publish signed VAA", zap.Error(err), zap.Any("vaa", v.Vaa)) } } } diff --git a/node/cmd/spy/vaa_verifier.go b/node/cmd/spy/vaa_verifier.go new file mode 100644 index 000000000..c2aab5123 --- /dev/null +++ b/node/cmd/spy/vaa_verifier.go @@ -0,0 +1,131 @@ +package spy + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" + + ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" + ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethCommon "github.com/ethereum/go-ethereum/common" + ethClient "github.com/ethereum/go-ethereum/ethclient" + ethRpc "github.com/ethereum/go-ethereum/rpc" +) + +// VaaVerifier is an object that can be used to validate VAA signatures. +// It reads the guardian set on chain whenever a new guardian set index is detected. +type VaaVerifier struct { + logger *zap.Logger + rpcUrl string + coreAddr ethCommon.Address + lock sync.Mutex + guardianSets map[uint32]*common.GuardianSet +} + +// RpcTimeout is the context timeout on RPC calls. +const RpcTimeout = time.Second * 5 + +// NewVaaVerifier creates a VaaVerifier. +func NewVaaVerifier(logger *zap.Logger, rpcUrl string, coreAddr string) *VaaVerifier { + return &VaaVerifier{ + logger: logger, + rpcUrl: rpcUrl, + coreAddr: ethCommon.HexToAddress(coreAddr), + guardianSets: make(map[uint32]*common.GuardianSet), + } +} + +// GetInitialGuardianSet gets the current guardian set and adds it to the map. It is not necessary +// to call this function, but doing so will allow you to verify that the RPC endpoint works on start up, +// rather than having it fail the first VAA is received. +func (v *VaaVerifier) GetInitialGuardianSet() error { + timeout, cancel := context.WithTimeout(context.Background(), RpcTimeout) + defer cancel() + + rawClient, err := ethRpc.DialContext(timeout, v.rpcUrl) + if err != nil { + return fmt.Errorf("failed to connect to ethereum: %w", err) + } + + client := ethClient.NewClient(rawClient) + caller, err := ethAbi.NewAbiCaller(v.coreAddr, client) + if err != nil { + return fmt.Errorf("failed to create caller: %w", err) + } + + gsIndex, err := caller.GetCurrentGuardianSetIndex(ðBind.CallOpts{Context: timeout}) + if err != nil { + return fmt.Errorf("error requesting current guardian set index: %w", err) + } + + result, err := caller.GetGuardianSet(ðBind.CallOpts{Context: timeout}, gsIndex) + if err != nil { + return fmt.Errorf("error requesting guardian set for index %d: %w", gsIndex, err) + } + + gs := &common.GuardianSet{ + Keys: result.Keys, + Index: gsIndex, + } + + v.logger.Warn("read current guardian set", zap.Uint32("index", gsIndex), zap.Any("gs", *gs)) + v.guardianSets[gsIndex] = gs + return nil +} + +// VerifySignatures verifies that the signature on a VAA is valid, based on the guardian set contained in the VAA. +// If the guardian set is not currently in our map, it queries that guardian set and adds it. +func (v *VaaVerifier) VerifySignatures(vv *vaa.VAA) (bool, error) { + v.lock.Lock() + defer v.lock.Unlock() + + gs, exists := v.guardianSets[vv.GuardianSetIndex] + if !exists { + var err error + gs, err = v.fetchGuardianSet(vv.GuardianSetIndex) + if err != nil { + return false, fmt.Errorf("failed to fetch guardian set for index %d: %w", vv.GuardianSetIndex, err) + } + + v.logger.Warn("read guardian set", zap.Uint32("index", gs.Index), zap.Any("gs", *gs)) + v.guardianSets[gs.Index] = gs + } + + if err := vv.Verify(gs.Keys); err != nil { + return false, nil + } + + return true, nil +} + +// fetchGuardianSet reads the guardian set for the index passed in. +func (v *VaaVerifier) fetchGuardianSet(gsIndex uint32) (*common.GuardianSet, error) { + timeout, cancel := context.WithTimeout(context.Background(), RpcTimeout) + defer cancel() + + rawClient, err := ethRpc.DialContext(timeout, v.rpcUrl) + if err != nil { + return nil, fmt.Errorf("failed to connect to ethereum: %w", err) + } + + client := ethClient.NewClient(rawClient) + caller, err := ethAbi.NewAbiCaller(v.coreAddr, client) + if err != nil { + return nil, fmt.Errorf("failed to create caller: %w", err) + } + + gs, err := caller.GetGuardianSet(ðBind.CallOpts{Context: timeout}, gsIndex) + if err != nil { + return nil, fmt.Errorf("error requesting current guardian set for index %d: %w", gsIndex, err) + } + + return &common.GuardianSet{ + Keys: gs.Keys, + Index: gsIndex, + }, nil +}