diff --git a/Tiltfile b/Tiltfile index a37d55aff..ea1a91d45 100644 --- a/Tiltfile +++ b/Tiltfile @@ -45,6 +45,7 @@ config.define_bool("aptos", False, "Enable Aptos component") config.define_bool("algorand", False, "Enable Algorand component") config.define_bool("evm2", False, "Enable second Eth component") config.define_bool("solana", False, "Enable Solana component") +config.define_bool("pythnet", False, "Enable PythNet component") config.define_bool("terra_classic", False, "Enable Terra Classic component") config.define_bool("terra2", False, "Enable Terra 2 component") config.define_bool("spy_relayer", False, "Enable spy relayer") @@ -69,6 +70,7 @@ aptos = cfg.get("aptos", ci) sui = cfg.get("sui", False) evm2 = cfg.get("evm2", ci) solana = cfg.get("solana", ci) +pythnet = cfg.get("pythnet", False) terra_classic = cfg.get("terra_classic", ci) terra2 = cfg.get("terra2", ci) wormchain = cfg.get("wormchain", ci) @@ -208,6 +210,18 @@ def build_node_yaml(): "http://solana-devnet:8899", ] + if pythnet: + container["command"] += [ + "--pythnetRPC", +# "http://solana-devnet:8899", + "http://pythnet.rpcpool.com", + "--pythnetWS", +# "ws://solana-devnet:8900", + "wss://pythnet.rpcpool.com", + "--pythnetContract", + "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU", + ] + if terra_classic: container["command"] += [ "--terraWS", @@ -270,7 +284,7 @@ k8s_yaml_with_ns(build_node_yaml()) guardian_resource_deps = ["eth-devnet"] if evm2: guardian_resource_deps = guardian_resource_deps + ["eth-devnet2"] -if solana: +if solana or pythnet: guardian_resource_deps = guardian_resource_deps + ["solana-devnet"] if near: guardian_resource_deps = guardian_resource_deps + ["near"] @@ -366,7 +380,7 @@ k8s_resource( trigger_mode = trigger_mode, ) -if solana: +if solana or pythnet: # solana client cli (used for devnet setup) docker_build( diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 7afd92e9d..de72e350d 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -165,6 +165,7 @@ var ( pythnetContract *string pythnetRPC *string + pythnetWS *string arbitrumRPC *string arbitrumContract *string @@ -307,10 +308,11 @@ func init() { suiAccount = NodeCmd.Flags().String("suiAccount", "", "sui account") suiPackage = NodeCmd.Flags().String("suiPackage", "", "sui package") - solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required") + solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required)") pythnetContract = NodeCmd.Flags().String("pythnetContract", "", "Address of the PythNet program (required)") - pythnetRPC = NodeCmd.Flags().String("pythnetRPC", "", "PythNet RPC URL (required") + pythnetRPC = NodeCmd.Flags().String("pythnetRPC", "", "PythNet RPC URL (required)") + pythnetWS = NodeCmd.Flags().String("pythnetWS", "", "PythNet WS URL") arbitrumRPC = NodeCmd.Flags().String("arbitrumRPC", "", "Arbitrum RPC URL") arbitrumContract = NodeCmd.Flags().String("arbitrumContract", "", "Arbitrum contract address") @@ -1280,11 +1282,11 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessSolanaSyncing) chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "solwatch-confirmed", - solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil { return err } - solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) - if err := supervisor.Run(ctx, "solwatch-finalized", solanaFinalizedWatcher.Run); err != nil { + solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) + if err := supervisor.Run(ctx, "solwatch-finalized", common.WrapWithScissors(solanaFinalizedWatcher.Run, "solwatch-finalized")); err != nil { return err } } @@ -1294,11 +1296,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessPythNetSyncing) chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "pythwatch-confirmed", - solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "pythwatch-finalized", - solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, chainObsvReqC[vaa.ChainIDPythNet], rpc.CommitmentFinalized, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil { return err } } diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index cfcd0ceaa..bbf71341c 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -4,9 +4,13 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" + "encoding/base64" + "encoding/json" + "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" @@ -16,34 +20,90 @@ import ( "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/gagliardetto/solana-go/rpc/jsonrpc" + "github.com/google/uuid" "github.com/mr-tron/base58" "github.com/near/borsh-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" + "nhooyr.io/websocket" ) -type SolanaWatcher struct { - contract solana.PublicKey - rpcUrl string - commitment rpc.CommitmentType - messageEvent chan *common.MessagePublication - obsvReqC chan *gossipv1.ObservationRequest - rpcClient *rpc.Client - // Readiness component - readiness readiness.Component - // VAA ChainID of the network we're connecting to. - chainID vaa.ChainID - // Human readable name of network - networkName string - // The last slot processed by the watcher. - lastSlot uint64 +type ( + SolanaWatcher struct { + contract solana.PublicKey + rawContract string + rpcUrl string + wsUrl *string + commitment rpc.CommitmentType + messageEvent chan *common.MessagePublication + obsvReqC chan *gossipv1.ObservationRequest + errC chan error + pumpData chan []byte + rpcClient *rpc.Client + // Readiness component + readiness readiness.Component + // VAA ChainID of the network we're connecting to. + chainID vaa.ChainID + // Human readable name of network + networkName string + // The last slot processed by the watcher. + lastSlot uint64 + // subscriber id + subId string - // latestFinalizedBlockNumber is the latest block processed by this watcher. - latestBlockNumber uint64 - latestBlockNumberMu sync.Mutex -} + // latestFinalizedBlockNumber is the latest block processed by this watcher. + latestBlockNumber uint64 + latestBlockNumberMu sync.Mutex + } + + EventSubscriptionError struct { + Jsonrpc string `json:"jsonrpc"` + Error struct { + Code int `json:"code"` + Message *string `json:"message"` + } `json:"error"` + ID string `json:"id"` + } + + EventSubscriptionData struct { + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params *struct { + Result struct { + Context struct { + Slot int64 `json:"slot"` + } `json:"context"` + Value struct { + Pubkey string `json:"pubkey"` + Account struct { + Lamports int64 `json:"lamports"` + Data []string `json:"data"` + Owner string `json:"owner"` + Executable bool `json:"executable"` + RentEpoch int64 `json:"rentEpoch"` + } `json:"account"` + } `json:"value"` + } `json:"result"` + Subscription int `json:"subscription"` + } `json:"params"` + } + + MessagePublicationAccount struct { + VaaVersion uint8 + // Borsh does not seem to support booleans, so 0=false / 1=true + ConsistencyLevel uint8 + VaaTime uint32 + VaaSignatureAccount vaa.Address + SubmissionTime uint32 + Nonce uint32 + Sequence uint64 + EmitterChain uint16 + EmitterAddress vaa.Address + Payload []byte + } +) var ( solanaConnectionErrors = promauto.NewCounterVec( @@ -116,7 +176,9 @@ type PostMessageData struct { func NewSolanaWatcher( rpcUrl string, + wsUrl *string, contractAddress solana.PublicKey, + rawContract string, messageEvents chan *common.MessagePublication, obsvReqC chan *gossipv1.ObservationRequest, commitment rpc.CommitmentType, @@ -124,7 +186,9 @@ func NewSolanaWatcher( chainID vaa.ChainID) *SolanaWatcher { return &SolanaWatcher{ rpcUrl: rpcUrl, + wsUrl: wsUrl, contract: contractAddress, + rawContract: rawContract, messageEvent: messageEvents, obsvReqC: obsvReqC, commitment: commitment, @@ -135,6 +199,81 @@ func NewSolanaWatcher( } } +func (s *SolanaWatcher) SetupSubscription(ctx context.Context) (error, *websocket.Conn) { + logger := supervisor.Logger(ctx) + + logger.Info("Solana watcher connecting to WS node ", zap.String("url", *s.wsUrl)) + + ws, _, err := websocket.Dial(ctx, *s.wsUrl, nil) + + if err != nil { + return err, nil + } + + s.subId = uuid.New().String() + + s.pumpData = make(chan []byte) + + const temp = `{"jsonrpc": "2.0", "id": "%s", "method": "programSubscribe", "params": ["%s", {"encoding": "base64", "commitment": "%s", "filters": []}]}` + var p = fmt.Sprintf(temp, s.subId, s.rawContract, string(s.commitment)) + + logger.Info("Subscribing using", zap.String("filter", p)) + + if err := ws.Write(ctx, websocket.MessageText, []byte(p)); err != nil { + logger.Error(fmt.Sprintf("write: %s", err.Error())) + return err, nil + } + return nil, ws +} + +func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error { + if vaa.ChainID(s.chainID) != vaa.ChainIDPythNet { + panic("unsupported chain id") + } + + logger := supervisor.Logger(ctx) + + err, ws := s.SetupSubscription(ctx) + if err != nil { + return err + } + + common.RunWithScissors(ctx, s.errC, "SolanaDataPump", func(ctx context.Context) error { + defer ws.Close(websocket.StatusNormalClosure, "") + + for { + select { + case <-ctx.Done(): + return nil + default: + rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute + defer cancel() + + if _, msg, err := ws.Read(rCtx); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // When a websocket context times out, it closes the websocket... This means we have to re-subscribe + ws.Close(websocket.StatusNormalClosure, "") + err, ws = s.SetupSubscription(ctx) + if err != nil { + return err + } + continue + } + + logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error())) + if errors.Is(err, io.EOF) { + return err + } + } else { + s.pumpData <- msg + } + } + } + }) + + return nil +} + func (s *SolanaWatcher) Run(ctx context.Context) error { // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing) contractAddr := base58.Encode(s.contract[:]) @@ -143,16 +282,35 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { }) logger := supervisor.Logger(ctx) - errC := make(chan error) - go func() { + logger.Info("Solana watcher connecting to RPC node ", zap.String("url", s.rpcUrl)) + + s.errC = make(chan error) + s.pumpData = make(chan []byte) + + if s.wsUrl != nil { + err := s.SetupWebSocket(ctx) + if err != nil { + return err + } + } + + common.RunWithScissors(ctx, s.errC, "SolanaWatcher", func(ctx context.Context) error { timer := time.NewTicker(time.Second * 1) defer timer.Stop() for { select { case <-ctx.Done(): - return + return nil + case msg := <-s.pumpData: + err := s.processAccountSubscriptionData(ctx, logger, msg) + if err != nil { + p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) + solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "account_subscription_data").Inc() + s.errC <- err + return err + } case m := <-s.obsvReqC: if m.ChainId != uint32(s.chainID) { panic("unexpected chain id") @@ -174,8 +332,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { if err != nil { p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "get_slot_error").Inc() - errC <- err - return + s.errC <- err + return err } lastSlot := s.lastSlot @@ -189,31 +347,37 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { ContractAddress: contractAddr, }) - rangeStart := lastSlot + 1 - rangeEnd := slot + if s.wsUrl == nil { + rangeStart := lastSlot + 1 + rangeEnd := slot - logger.Debug("fetched current Solana height", - zap.String("commitment", string(s.commitment)), - zap.Uint64("slot", slot), - zap.Uint64("lastSlot", lastSlot), - zap.Uint64("pendingSlots", slot-lastSlot), - zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd), - zap.Duration("took", time.Since(start))) + logger.Debug("fetched current Solana height", + zap.String("commitment", string(s.commitment)), + zap.Uint64("slot", slot), + zap.Uint64("lastSlot", lastSlot), + zap.Uint64("pendingSlots", slot-lastSlot), + zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd), + zap.Duration("took", time.Since(start))) - // Requesting each slot - for slot := rangeStart; slot <= rangeEnd; slot++ { - go s.retryFetchBlock(ctx, logger, slot, 0) + // Requesting each slot + for slot := rangeStart; slot <= rangeEnd; slot++ { + _slot := slot + common.RunWithScissors(ctx, s.errC, "SolanaWatcherSlotFetcher", func(ctx context.Context) error { + s.retryFetchBlock(ctx, logger, _slot, 0) + return nil + }) + } } s.lastSlot = slot } } - }() + }) select { case <-ctx.Done(): return ctx.Err() - case err := <-errC: + case err := <-s.errC: return err } } @@ -237,7 +401,10 @@ func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, zap.String("commitment", string(s.commitment)), zap.Uint("retry", retry)) - go s.retryFetchBlock(ctx, logger, slot, retry+1) + common.RunWithScissors(ctx, s.errC, "retryFetchBlock", func(ctx context.Context) error { + s.retryFetchBlock(ctx, logger, slot, retry+1) + return nil + }) } } @@ -279,10 +446,11 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot // Schedule a single retry just in case the Solana node was confused about the block being missing. if emptyRetry < maxEmptyRetry { - go func() { + common.RunWithScissors(ctx, s.errC, "delayedFetchBlock", func(ctx context.Context) error { time.Sleep(retryDelay) s.fetchBlock(ctx, logger, slot, emptyRetry+1) - }() + return nil + }) } return true } else { @@ -465,7 +633,10 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg logger.Debug("fetching VAA account", zap.Stringer("acc", acc), zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx)) - go s.retryFetchMessageAccount(ctx, logger, acc, slot, 0) + common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error { + s.retryFetchMessageAccount(ctx, logger, acc, slot, 0) + return nil + }) return true, nil } @@ -491,7 +662,10 @@ func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *za zap.String("commitment", string(s.commitment)), zap.Uint("retry", retry)) - go s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1) + common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error { + s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1) + return nil + }) } } @@ -548,6 +722,64 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log return false } +func (s *SolanaWatcher) processAccountSubscriptionData(ctx context.Context, logger *zap.Logger, data []byte) error { + // Do we have an error on the subscription? + var e EventSubscriptionError + err := json.Unmarshal(data, &e) + if err != nil { + logger.Error(*s.wsUrl, zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) + return err + } + + if e.Error.Message != nil { + return errors.New(*e.Error.Message) + } + + var res EventSubscriptionData + err = json.Unmarshal(data, &res) + if err != nil { + logger.Error(*s.wsUrl, zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) + return err + } + + if res.Params == nil { + return nil + } + + value := (*res.Params).Result.Value + + if value.Account.Owner != s.rawContract { + // We got a message for the wrong contract on the websocket... uncomfortable... + solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "invalid_websocket_account").Inc() + return errors.New("Update for account with wrong owner") + } + + data, err = base64.StdEncoding.DecodeString(value.Account.Data[0]) + if err != nil { + logger.Error(*s.wsUrl, zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) + return err + } + + // ignore truncated messages + if len(data) < 3 { + return nil + } + + // Other accounts owned by the wormhole contract seem to send updates... + switch string(data[:3]) { + case accountPrefixReliable, accountPrefixUnreliable: + acc := solana.PublicKeyFromBytes([]byte(value.Pubkey)) + s.processMessageAccount(logger, data, acc) + default: + break + } + + return nil +} + func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) { proposal, err := ParseMessagePublicationAccount(data) if err != nil { @@ -618,22 +850,6 @@ func (s *SolanaWatcher) GetLatestFinalizedBlockNumber() uint64 { return s.latestBlockNumber } -type ( - MessagePublicationAccount struct { - VaaVersion uint8 - // Borsh does not seem to support booleans, so 0=false / 1=true - ConsistencyLevel uint8 - VaaTime uint32 - VaaSignatureAccount vaa.Address - SubmissionTime uint32 - Nonce uint32 - Sequence uint64 - EmitterChain uint16 - EmitterAddress vaa.Address - Payload []byte - } -) - func ParseMessagePublicationAccount(data []byte) (*MessagePublicationAccount, error) { prop := &MessagePublicationAccount{} // Skip the b"msg" prefix