node/pythnet: websocket subscription (#2219)

* node/pythnet: websocket subscription
This commit is contained in:
jumpsiegel 2023-01-18 10:24:55 -06:00 committed by GitHub
parent e6d3bb8731
commit 3580f51ccf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 299 additions and 71 deletions

View File

@ -45,6 +45,7 @@ config.define_bool("aptos", False, "Enable Aptos component")
config.define_bool("algorand", False, "Enable Algorand component")
config.define_bool("evm2", False, "Enable second Eth component")
config.define_bool("solana", False, "Enable Solana component")
config.define_bool("pythnet", False, "Enable PythNet component")
config.define_bool("terra_classic", False, "Enable Terra Classic component")
config.define_bool("terra2", False, "Enable Terra 2 component")
config.define_bool("spy_relayer", False, "Enable spy relayer")
@ -69,6 +70,7 @@ aptos = cfg.get("aptos", ci)
sui = cfg.get("sui", False)
evm2 = cfg.get("evm2", ci)
solana = cfg.get("solana", ci)
pythnet = cfg.get("pythnet", False)
terra_classic = cfg.get("terra_classic", ci)
terra2 = cfg.get("terra2", ci)
wormchain = cfg.get("wormchain", ci)
@ -208,6 +210,18 @@ def build_node_yaml():
"http://solana-devnet:8899",
]
if pythnet:
container["command"] += [
"--pythnetRPC",
# "http://solana-devnet:8899",
"http://pythnet.rpcpool.com",
"--pythnetWS",
# "ws://solana-devnet:8900",
"wss://pythnet.rpcpool.com",
"--pythnetContract",
"H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU",
]
if terra_classic:
container["command"] += [
"--terraWS",
@ -270,7 +284,7 @@ k8s_yaml_with_ns(build_node_yaml())
guardian_resource_deps = ["eth-devnet"]
if evm2:
guardian_resource_deps = guardian_resource_deps + ["eth-devnet2"]
if solana:
if solana or pythnet:
guardian_resource_deps = guardian_resource_deps + ["solana-devnet"]
if near:
guardian_resource_deps = guardian_resource_deps + ["near"]
@ -366,7 +380,7 @@ k8s_resource(
trigger_mode = trigger_mode,
)
if solana:
if solana or pythnet:
# solana client cli (used for devnet setup)
docker_build(

View File

@ -165,6 +165,7 @@ var (
pythnetContract *string
pythnetRPC *string
pythnetWS *string
arbitrumRPC *string
arbitrumContract *string
@ -307,10 +308,11 @@ func init() {
suiAccount = NodeCmd.Flags().String("suiAccount", "", "sui account")
suiPackage = NodeCmd.Flags().String("suiPackage", "", "sui package")
solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required")
solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required)")
pythnetContract = NodeCmd.Flags().String("pythnetContract", "", "Address of the PythNet program (required)")
pythnetRPC = NodeCmd.Flags().String("pythnetRPC", "", "PythNet RPC URL (required")
pythnetRPC = NodeCmd.Flags().String("pythnetRPC", "", "PythNet RPC URL (required)")
pythnetWS = NodeCmd.Flags().String("pythnetWS", "", "PythNet WS URL")
arbitrumRPC = NodeCmd.Flags().String("arbitrumRPC", "", "Arbitrum RPC URL")
arbitrumContract = NodeCmd.Flags().String("arbitrumContract", "", "Arbitrum contract address")
@ -1280,11 +1282,11 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessSolanaSyncing)
chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "solwatch-confirmed",
solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil {
common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil {
return err
}
solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana)
if err := supervisor.Run(ctx, "solwatch-finalized", solanaFinalizedWatcher.Run); err != nil {
solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana)
if err := supervisor.Run(ctx, "solwatch-finalized", common.WrapWithScissors(solanaFinalizedWatcher.Run, "solwatch-finalized")); err != nil {
return err
}
}
@ -1294,11 +1296,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessPythNetSyncing)
chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "pythwatch-confirmed",
solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "pythwatch-finalized",
solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, chainObsvReqC[vaa.ChainIDPythNet], rpc.CommitmentFinalized, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil {
common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil {
return err
}
}

View File

@ -4,9 +4,13 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"encoding/base64"
"encoding/json"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@ -16,34 +20,90 @@ import (
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
"github.com/google/uuid"
"github.com/mr-tron/base58"
"github.com/near/borsh-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"nhooyr.io/websocket"
)
type SolanaWatcher struct {
contract solana.PublicKey
rpcUrl string
commitment rpc.CommitmentType
messageEvent chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
rpcClient *rpc.Client
// Readiness component
readiness readiness.Component
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
// Human readable name of network
networkName string
// The last slot processed by the watcher.
lastSlot uint64
type (
SolanaWatcher struct {
contract solana.PublicKey
rawContract string
rpcUrl string
wsUrl *string
commitment rpc.CommitmentType
messageEvent chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
errC chan error
pumpData chan []byte
rpcClient *rpc.Client
// Readiness component
readiness readiness.Component
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
// Human readable name of network
networkName string
// The last slot processed by the watcher.
lastSlot uint64
// subscriber id
subId string
// latestFinalizedBlockNumber is the latest block processed by this watcher.
latestBlockNumber uint64
latestBlockNumberMu sync.Mutex
}
// latestFinalizedBlockNumber is the latest block processed by this watcher.
latestBlockNumber uint64
latestBlockNumberMu sync.Mutex
}
EventSubscriptionError struct {
Jsonrpc string `json:"jsonrpc"`
Error struct {
Code int `json:"code"`
Message *string `json:"message"`
} `json:"error"`
ID string `json:"id"`
}
EventSubscriptionData struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params *struct {
Result struct {
Context struct {
Slot int64 `json:"slot"`
} `json:"context"`
Value struct {
Pubkey string `json:"pubkey"`
Account struct {
Lamports int64 `json:"lamports"`
Data []string `json:"data"`
Owner string `json:"owner"`
Executable bool `json:"executable"`
RentEpoch int64 `json:"rentEpoch"`
} `json:"account"`
} `json:"value"`
} `json:"result"`
Subscription int `json:"subscription"`
} `json:"params"`
}
MessagePublicationAccount struct {
VaaVersion uint8
// Borsh does not seem to support booleans, so 0=false / 1=true
ConsistencyLevel uint8
VaaTime uint32
VaaSignatureAccount vaa.Address
SubmissionTime uint32
Nonce uint32
Sequence uint64
EmitterChain uint16
EmitterAddress vaa.Address
Payload []byte
}
)
var (
solanaConnectionErrors = promauto.NewCounterVec(
@ -116,7 +176,9 @@ type PostMessageData struct {
func NewSolanaWatcher(
rpcUrl string,
wsUrl *string,
contractAddress solana.PublicKey,
rawContract string,
messageEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
commitment rpc.CommitmentType,
@ -124,7 +186,9 @@ func NewSolanaWatcher(
chainID vaa.ChainID) *SolanaWatcher {
return &SolanaWatcher{
rpcUrl: rpcUrl,
wsUrl: wsUrl,
contract: contractAddress,
rawContract: rawContract,
messageEvent: messageEvents,
obsvReqC: obsvReqC,
commitment: commitment,
@ -135,6 +199,81 @@ func NewSolanaWatcher(
}
}
func (s *SolanaWatcher) SetupSubscription(ctx context.Context) (error, *websocket.Conn) {
logger := supervisor.Logger(ctx)
logger.Info("Solana watcher connecting to WS node ", zap.String("url", *s.wsUrl))
ws, _, err := websocket.Dial(ctx, *s.wsUrl, nil)
if err != nil {
return err, nil
}
s.subId = uuid.New().String()
s.pumpData = make(chan []byte)
const temp = `{"jsonrpc": "2.0", "id": "%s", "method": "programSubscribe", "params": ["%s", {"encoding": "base64", "commitment": "%s", "filters": []}]}`
var p = fmt.Sprintf(temp, s.subId, s.rawContract, string(s.commitment))
logger.Info("Subscribing using", zap.String("filter", p))
if err := ws.Write(ctx, websocket.MessageText, []byte(p)); err != nil {
logger.Error(fmt.Sprintf("write: %s", err.Error()))
return err, nil
}
return nil, ws
}
func (s *SolanaWatcher) SetupWebSocket(ctx context.Context) error {
if vaa.ChainID(s.chainID) != vaa.ChainIDPythNet {
panic("unsupported chain id")
}
logger := supervisor.Logger(ctx)
err, ws := s.SetupSubscription(ctx)
if err != nil {
return err
}
common.RunWithScissors(ctx, s.errC, "SolanaDataPump", func(ctx context.Context) error {
defer ws.Close(websocket.StatusNormalClosure, "")
for {
select {
case <-ctx.Done():
return nil
default:
rCtx, cancel := context.WithTimeout(ctx, time.Second*300) // 5 minute
defer cancel()
if _, msg, err := ws.Read(rCtx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// When a websocket context times out, it closes the websocket... This means we have to re-subscribe
ws.Close(websocket.StatusNormalClosure, "")
err, ws = s.SetupSubscription(ctx)
if err != nil {
return err
}
continue
}
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
if errors.Is(err, io.EOF) {
return err
}
} else {
s.pumpData <- msg
}
}
}
})
return nil
}
func (s *SolanaWatcher) Run(ctx context.Context) error {
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
contractAddr := base58.Encode(s.contract[:])
@ -143,16 +282,35 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
})
logger := supervisor.Logger(ctx)
errC := make(chan error)
go func() {
logger.Info("Solana watcher connecting to RPC node ", zap.String("url", s.rpcUrl))
s.errC = make(chan error)
s.pumpData = make(chan []byte)
if s.wsUrl != nil {
err := s.SetupWebSocket(ctx)
if err != nil {
return err
}
}
common.RunWithScissors(ctx, s.errC, "SolanaWatcher", func(ctx context.Context) error {
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
return nil
case msg := <-s.pumpData:
err := s.processAccountSubscriptionData(ctx, logger, msg)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "account_subscription_data").Inc()
s.errC <- err
return err
}
case m := <-s.obsvReqC:
if m.ChainId != uint32(s.chainID) {
panic("unexpected chain id")
@ -174,8 +332,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
if err != nil {
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "get_slot_error").Inc()
errC <- err
return
s.errC <- err
return err
}
lastSlot := s.lastSlot
@ -189,31 +347,37 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
ContractAddress: contractAddr,
})
rangeStart := lastSlot + 1
rangeEnd := slot
if s.wsUrl == nil {
rangeStart := lastSlot + 1
rangeEnd := slot
logger.Debug("fetched current Solana height",
zap.String("commitment", string(s.commitment)),
zap.Uint64("slot", slot),
zap.Uint64("lastSlot", lastSlot),
zap.Uint64("pendingSlots", slot-lastSlot),
zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
zap.Duration("took", time.Since(start)))
logger.Debug("fetched current Solana height",
zap.String("commitment", string(s.commitment)),
zap.Uint64("slot", slot),
zap.Uint64("lastSlot", lastSlot),
zap.Uint64("pendingSlots", slot-lastSlot),
zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
zap.Duration("took", time.Since(start)))
// Requesting each slot
for slot := rangeStart; slot <= rangeEnd; slot++ {
go s.retryFetchBlock(ctx, logger, slot, 0)
// Requesting each slot
for slot := rangeStart; slot <= rangeEnd; slot++ {
_slot := slot
common.RunWithScissors(ctx, s.errC, "SolanaWatcherSlotFetcher", func(ctx context.Context) error {
s.retryFetchBlock(ctx, logger, _slot, 0)
return nil
})
}
}
s.lastSlot = slot
}
}
}()
})
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
case err := <-s.errC:
return err
}
}
@ -237,7 +401,10 @@ func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger,
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
go s.retryFetchBlock(ctx, logger, slot, retry+1)
common.RunWithScissors(ctx, s.errC, "retryFetchBlock", func(ctx context.Context) error {
s.retryFetchBlock(ctx, logger, slot, retry+1)
return nil
})
}
}
@ -279,10 +446,11 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
// Schedule a single retry just in case the Solana node was confused about the block being missing.
if emptyRetry < maxEmptyRetry {
go func() {
common.RunWithScissors(ctx, s.errC, "delayedFetchBlock", func(ctx context.Context) error {
time.Sleep(retryDelay)
s.fetchBlock(ctx, logger, slot, emptyRetry+1)
}()
return nil
})
}
return true
} else {
@ -465,7 +633,10 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg
logger.Debug("fetching VAA account", zap.Stringer("acc", acc),
zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx))
go s.retryFetchMessageAccount(ctx, logger, acc, slot, 0)
common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error {
s.retryFetchMessageAccount(ctx, logger, acc, slot, 0)
return nil
})
return true, nil
}
@ -491,7 +662,10 @@ func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *za
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
go s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1)
common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error {
s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1)
return nil
})
}
}
@ -548,6 +722,64 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
return false
}
func (s *SolanaWatcher) processAccountSubscriptionData(ctx context.Context, logger *zap.Logger, data []byte) error {
// Do we have an error on the subscription?
var e EventSubscriptionError
err := json.Unmarshal(data, &e)
if err != nil {
logger.Error(*s.wsUrl, zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
return err
}
if e.Error.Message != nil {
return errors.New(*e.Error.Message)
}
var res EventSubscriptionData
err = json.Unmarshal(data, &res)
if err != nil {
logger.Error(*s.wsUrl, zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
return err
}
if res.Params == nil {
return nil
}
value := (*res.Params).Result.Value
if value.Account.Owner != s.rawContract {
// We got a message for the wrong contract on the websocket... uncomfortable...
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "invalid_websocket_account").Inc()
return errors.New("Update for account with wrong owner")
}
data, err = base64.StdEncoding.DecodeString(value.Account.Data[0])
if err != nil {
logger.Error(*s.wsUrl, zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
return err
}
// ignore truncated messages
if len(data) < 3 {
return nil
}
// Other accounts owned by the wormhole contract seem to send updates...
switch string(data[:3]) {
case accountPrefixReliable, accountPrefixUnreliable:
acc := solana.PublicKeyFromBytes([]byte(value.Pubkey))
s.processMessageAccount(logger, data, acc)
default:
break
}
return nil
}
func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {
proposal, err := ParseMessagePublicationAccount(data)
if err != nil {
@ -618,22 +850,6 @@ func (s *SolanaWatcher) GetLatestFinalizedBlockNumber() uint64 {
return s.latestBlockNumber
}
type (
MessagePublicationAccount struct {
VaaVersion uint8
// Borsh does not seem to support booleans, so 0=false / 1=true
ConsistencyLevel uint8
VaaTime uint32
VaaSignatureAccount vaa.Address
SubmissionTime uint32
Nonce uint32
Sequence uint64
EmitterChain uint16
EmitterAddress vaa.Address
Payload []byte
}
)
func ParseMessagePublicationAccount(data []byte) (*MessagePublicationAccount, error) {
prop := &MessagePublicationAccount{}
// Skip the b"msg" prefix