diff --git a/DEVELOP.md b/DEVELOP.md index ada707e51..5dbed3c00 100644 --- a/DEVELOP.md +++ b/DEVELOP.md @@ -63,3 +63,10 @@ Tear down cluster: tilt down --delete-namespaces Once you're done, press Ctrl-C. Run `tilt down` to tear down the devnet. + + +### Post messages + +To Solana: + + kubectl exec solana-devnet-0 -c setup -- client post-message Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o 1 confirmed ffff diff --git a/bridge/pkg/solana/client.go b/bridge/pkg/solana/client.go index e602a43e3..5bce8ac5d 100644 --- a/bridge/pkg/solana/client.go +++ b/bridge/pkg/solana/client.go @@ -54,6 +54,8 @@ var ( }, []string{"operation", "commitment"}) ) +const rpcTimeout = time.Second * 5 + func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher { return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents} } @@ -68,9 +70,10 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { rpcClient := rpc.New(s.rpcUrl) logger := supervisor.Logger(ctx) errC := make(chan error) + var lastSlot uint64 go func() { - timer := time.NewTicker(time.Second * 5) + timer := time.NewTicker(time.Second * 1) defer timer.Stop() for { @@ -78,139 +81,69 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { case <-ctx.Done(): return case <-timer.C: - func() { - // Get current slot height - rCtx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - start := time.Now() - slot, err := rpcClient.GetSlot(rCtx, "") - queryLatency.WithLabelValues("get_slot", "processed").Observe(time.Since(start).Seconds()) - if err != nil { - solanaConnectionErrors.WithLabelValues("get_slot_error").Inc() - errC <- err - return - } - currentSolanaHeight.Set(float64(slot)) - readiness.SetReady(common.ReadinessSolanaSyncing) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{ - Height: int64(slot), - BridgeAddress: bridgeAddr, - }) + commitment := rpc.CommitmentFinalized - logger.Info("current Solana height", zap.Uint64("slot", uint64(slot))) + // Get current slot height + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + start := time.Now() + slot, err := rpcClient.GetSlot(rCtx, commitment) + queryLatency.WithLabelValues("get_slot", string(commitment)).Observe(time.Since(start).Seconds()) + if err != nil { + solanaConnectionErrors.WithLabelValues("get_slot_error").Inc() + errC <- err + return + } + if lastSlot == 0 { + lastSlot = slot - 1 + } + currentSolanaHeight.Set(float64(slot)) + readiness.SetReady(common.ReadinessSolanaSyncing) + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{ + Height: int64(slot), + BridgeAddress: bridgeAddr, + }) + logger.Info("fetched current Solana height", + zap.String("commitment", string(commitment)), + zap.Uint64("slot", slot), + zap.Uint64("lastSlot", lastSlot), + zap.Uint64("pendingSlots", slot-lastSlot), + zap.Duration("took", time.Since(start))) - // Find MessagePublicationAccount accounts without a VAA - rCtx, cancel = context.WithTimeout(ctx, time.Second*5) - defer cancel() - start = time.Now() + // Determine which slots we're missing + // + // Get list of confirmed blocks since the last request. The result + // won't contain skipped slots. + rangeStart := lastSlot + 1 + rangeEnd := slot + rCtx, cancel = context.WithTimeout(ctx, rpcTimeout) + defer cancel() + start = time.Now() + slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, commitment) + queryLatency.WithLabelValues("get_confirmed_blocks", string(commitment)).Observe(time.Since(start).Seconds()) + if err != nil { + solanaConnectionErrors.WithLabelValues("get_confirmed_blocks_error").Inc() + errC <- err + return + } - // Get finalized accounts - fAccounts, err := rpcClient.GetProgramAccountsWithOpts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{ - Commitment: rpc.CommitmentFinalized, - Filters: []rpc.RPCFilter{ - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 0, // Start of the account - Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts - }, - }, - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 4, // Start of the ConsistencyLevel value - Bytes: solana.Base58{32}, // Only grab messages that require max confirmations - }, - }, - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 5, // Offset of VaaTime - Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet - }, - }, - }, - }) - queryLatency.WithLabelValues("get_program_accounts", "max").Observe(time.Since(start).Seconds()) - if err != nil { - solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc() - errC <- err - return + logger.Info("fetched slots in range", + zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd), + zap.Duration("took", time.Since(start)), + zap.String("commitment", string(commitment))) + + // Requesting each slot + for _, slot := range slots { + if slot <= lastSlot { + // Skip out-of-range result + // https://github.com/solana-labs/solana/issues/18946 + continue } - // Get confirmed accounts - cAccounts, err := rpcClient.GetProgramAccountsWithOpts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{ - Commitment: rpc.CommitmentConfirmed, - Filters: []rpc.RPCFilter{ - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 0, // Start of the account - Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts - }, - }, - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 4, // Start of the ConsistencyLevel value - Bytes: solana.Base58{1}, // Only grab messages that require the Confirmed level - }, - }, - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 5, // Offset of VaaTime - Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet - }, - }, - }, - }) - queryLatency.WithLabelValues("get_program_accounts", "single").Observe(time.Since(start).Seconds()) - if err != nil { - solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc() - errC <- err - return - } + go s.fetchBlock(ctx, logger, commitment, rpcClient, slot) + } - // Merge accounts - accounts := append(fAccounts, cAccounts...) - - logger.Info("fetched transfer proposals without VAA", - zap.Int("n", len(accounts)), - zap.Duration("took", time.Since(start)), - ) - - for _, acc := range accounts { - proposal, err := ParseTransferOutProposal(acc.Account.Data.GetBinary()) - if err != nil { - solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc() - logger.Warn( - "failed to parse transfer proposal", - zap.Stringer("account", acc.Pubkey), - zap.Error(err), - ) - continue - } - - // VAA submitted - if proposal.VaaTime != 0 { - solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc() - continue - } - - var txHash eth_common.Hash - copy(txHash[:], acc.Pubkey[:]) - - lock := &common.MessagePublication{ - TxHash: txHash, - Timestamp: time.Unix(int64(proposal.SubmissionTime), 0), - Nonce: proposal.Nonce, - Sequence: proposal.Sequence, - EmitterChain: vaa.ChainIDSolana, - EmitterAddress: proposal.EmitterAddress, - Payload: proposal.Payload, - ConsistencyLevel: proposal.ConsistencyLevel, - } - - solanaMessagesConfirmed.Inc() - logger.Info("found message account without VAA", zap.Stringer("address", acc.Pubkey)) - s.messageEvent <- lock - } - }() + lastSlot = slot } } }() @@ -223,6 +156,201 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { } } +func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64) { + logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(commitment))) + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + start := time.Now() + rewards := false + out, err := rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{ + Encoding: "json", + TransactionDetails: "full", + Rewards: &rewards, + Commitment: commitment, + }) + + queryLatency.WithLabelValues("get_confirmed_block", string(commitment)).Observe(time.Since(start).Seconds()) + if err != nil { + solanaConnectionErrors.WithLabelValues("get_confirmed_block_error").Inc() + logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot), + zap.String("commitment", string(commitment))) + return + } + + if out == nil { + logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot), + zap.String("commitment", string(commitment))) + return + } + + logger.Info("fetched block", + zap.Uint64("slot", slot), + zap.Int("num_tx", len(out.Transactions)), + zap.Duration("took", time.Since(start)), + zap.String("commitment", string(commitment))) + +OUTER: + for _, tx := range out.Transactions { + signature := tx.Transaction.Signatures[0] + var programIndex uint16 + for n, key := range tx.Transaction.Message.AccountKeys { + if key.Equals(s.bridge) { + programIndex = uint16(n) + } + } + if programIndex == 0 { + continue + } + + logger.Info("found Wormhole transaction", + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment))) + + // Find top-level instructions + for _, inst := range tx.Transaction.Message.Instructions { + if inst.ProgramIDIndex == programIndex { + // The second account in a well-formed Wormhole instruction is the + // VAA program account. + if len(inst.Accounts) != 9 { + logger.Error("malformed Wormhole instruction: wrong number of accounts", + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment))) + continue OUTER + } + + acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]] + go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot) + continue OUTER + } + } + + // Call GetConfirmedTransaction to get at innerTransactions + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + start := time.Now() + tr, err := rpcClient.GetConfirmedTransaction(rCtx, signature) + queryLatency.WithLabelValues("get_confirmed_transaction", string(commitment)).Observe(time.Since(start).Seconds()) + if err != nil { + solanaConnectionErrors.WithLabelValues("get_confirmed_transaction_error").Inc() + logger.Error("failed to request transaction", + zap.Error(err), + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("signature", signature)) + return + } + + logger.Info("fetched transaction", + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("signature", signature), + zap.Duration("took", time.Since(start))) + + for _, inner := range tr.Meta.InnerInstructions { + for _, inst := range inner.Instructions { + if inst.ProgramIDIndex == programIndex { + if len(inst.Accounts) != 9 { + logger.Error("malformed Wormhole instruction: wrong number of accounts", + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment))) + continue OUTER + } + + acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]] + go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot) + continue OUTER + } + } + } + } +} + +func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, rpcClient *rpc.Client, commitment rpc.CommitmentType, slot uint64) { + // Fetching account + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + start := time.Now() + info, err := rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{ + Encoding: solana.EncodingBase64, + Commitment: commitment, + }) + queryLatency.WithLabelValues("get_account_info", string(commitment)).Observe(time.Since(start).Seconds()) + if err != nil { + solanaConnectionErrors.WithLabelValues("get_account_info_error").Inc() + logger.Error("failed to request account", + zap.Error(err), + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("account", acc)) + return + } + + if !info.Value.Owner.Equals(s.bridge) { + solanaConnectionErrors.WithLabelValues("account_owner_mismatch").Inc() + logger.Error("account has invalid owner", + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("account", acc), + zap.Stringer("unexpected_owner", info.Value.Owner)) + return + } + + data := info.Value.Data.GetBinary() + if string(data[:3]) != "msg" { + solanaConnectionErrors.WithLabelValues("bad_account_data").Inc() + logger.Error("account is not a message account", + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("account", acc)) + return + } + + logger.Info("found valid VAA account", + zap.Uint64("slot", slot), + zap.String("commitment", string(commitment)), + zap.Stringer("account", acc), + zap.Binary("data", data)) + + s.processMessageAccount(logger, data, acc) +} + +func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) { + proposal, err := ParseTransferOutProposal(data) + if err != nil { + solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc() + logger.Error( + "failed to parse transfer proposal", + zap.Stringer("account", acc), + zap.Binary("data", data), + zap.Error(err)) + return + } + + var txHash eth_common.Hash + copy(txHash[:], acc[:]) + + lock := &common.MessagePublication{ + TxHash: txHash, + Timestamp: time.Unix(int64(proposal.SubmissionTime), 0), + Nonce: proposal.Nonce, + Sequence: proposal.Sequence, + EmitterChain: vaa.ChainIDSolana, + EmitterAddress: proposal.EmitterAddress, + Payload: proposal.Payload, + ConsistencyLevel: proposal.ConsistencyLevel, + } + + solanaMessagesConfirmed.Inc() + + logger.Info("message observed", + zap.Stringer("account", acc)) + + s.messageEvent <- lock +} + type ( MessagePublicationAccount struct { VaaVersion uint8 diff --git a/docs/operations.md b/docs/operations.md index b3f9c2266..0c085f109 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -31,6 +31,22 @@ you on whether an event has actually been observed. The whole point of Wormhole We strongly recommend running your own full nodes for both testnet and mainnet (where applicable) so you can test changes for your mainnet full nodes and gain operational experience. +### Solana node requirements + +Your Solana RPC node needs the following parameters enabled: + +``` +--enable-rpc-transaction-history +--enable-cpi-and-log-storage +``` + +`--enable-rpc-transaction-history` enables historic transactions to be retrieved via the *getConfirmedBlock* API, which is required for Wormhole to find transactions. + +`--enable-cpi-and-log-storage` stores metadata about CPI calls. + +Note that these indexes require extra disk space and may slow down catchup. The first startup after +adding these parameters will be slow since Solana needs to recreate all indexes. + ### Ethereum node requirements In order to observe events on the Ethereum chain, you need access to an Ethereum RPC endpoint. The most common