node/pkg/solana: add initial logic for block-by-block requests

CPI part is untested.

Commitment level is hardcoded to "finalized", but can be refactored
to use both "committed" and "finalized" later.

certusone/wormhole#248

Change-Id: I5ae7711c306b33650367e6f7a417ab9d88753612
This commit is contained in:
Leo 2021-07-28 18:33:12 +02:00 committed by Leopold Schabel
parent b70466d185
commit d5e6c844e3
3 changed files with 279 additions and 128 deletions

View File

@ -63,3 +63,10 @@ Tear down cluster:
tilt down --delete-namespaces
Once you're done, press Ctrl-C. Run `tilt down` to tear down the devnet.
### Post messages
To Solana:
kubectl exec solana-devnet-0 -c setup -- client post-message Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o 1 confirmed ffff

View File

@ -54,6 +54,8 @@ var (
}, []string{"operation", "commitment"})
)
const rpcTimeout = time.Second * 5
func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher {
return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents}
}
@ -68,9 +70,10 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
rpcClient := rpc.New(s.rpcUrl)
logger := supervisor.Logger(ctx)
errC := make(chan error)
var lastSlot uint64
go func() {
timer := time.NewTicker(time.Second * 5)
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
for {
@ -78,139 +81,69 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case <-timer.C:
func() {
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
start := time.Now()
slot, err := rpcClient.GetSlot(rCtx, "")
queryLatency.WithLabelValues("get_slot", "processed").Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
errC <- err
return
}
currentSolanaHeight.Set(float64(slot))
readiness.SetReady(common.ReadinessSolanaSyncing)
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
Height: int64(slot),
BridgeAddress: bridgeAddr,
})
commitment := rpc.CommitmentFinalized
logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
slot, err := rpcClient.GetSlot(rCtx, commitment)
queryLatency.WithLabelValues("get_slot", string(commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
errC <- err
return
}
if lastSlot == 0 {
lastSlot = slot - 1
}
currentSolanaHeight.Set(float64(slot))
readiness.SetReady(common.ReadinessSolanaSyncing)
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
Height: int64(slot),
BridgeAddress: bridgeAddr,
})
logger.Info("fetched current Solana height",
zap.String("commitment", string(commitment)),
zap.Uint64("slot", slot),
zap.Uint64("lastSlot", lastSlot),
zap.Uint64("pendingSlots", slot-lastSlot),
zap.Duration("took", time.Since(start)))
// Find MessagePublicationAccount accounts without a VAA
rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
start = time.Now()
// Determine which slots we're missing
//
// Get list of confirmed blocks since the last request. The result
// won't contain skipped slots.
rangeStart := lastSlot + 1
rangeEnd := slot
rCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start = time.Now()
slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, commitment)
queryLatency.WithLabelValues("get_confirmed_blocks", string(commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_blocks_error").Inc()
errC <- err
return
}
// Get finalized accounts
fAccounts, err := rpcClient.GetProgramAccountsWithOpts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
Commitment: rpc.CommitmentFinalized,
Filters: []rpc.RPCFilter{
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 0, // Start of the account
Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
},
},
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 4, // Start of the ConsistencyLevel value
Bytes: solana.Base58{32}, // Only grab messages that require max confirmations
},
},
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 5, // Offset of VaaTime
Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
},
},
},
})
queryLatency.WithLabelValues("get_program_accounts", "max").Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
errC <- err
return
logger.Info("fetched slots in range",
zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
zap.Duration("took", time.Since(start)),
zap.String("commitment", string(commitment)))
// Requesting each slot
for _, slot := range slots {
if slot <= lastSlot {
// Skip out-of-range result
// https://github.com/solana-labs/solana/issues/18946
continue
}
// Get confirmed accounts
cAccounts, err := rpcClient.GetProgramAccountsWithOpts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
Commitment: rpc.CommitmentConfirmed,
Filters: []rpc.RPCFilter{
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 0, // Start of the account
Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
},
},
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 4, // Start of the ConsistencyLevel value
Bytes: solana.Base58{1}, // Only grab messages that require the Confirmed level
},
},
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 5, // Offset of VaaTime
Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
},
},
},
})
queryLatency.WithLabelValues("get_program_accounts", "single").Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
errC <- err
return
}
go s.fetchBlock(ctx, logger, commitment, rpcClient, slot)
}
// Merge accounts
accounts := append(fAccounts, cAccounts...)
logger.Info("fetched transfer proposals without VAA",
zap.Int("n", len(accounts)),
zap.Duration("took", time.Since(start)),
)
for _, acc := range accounts {
proposal, err := ParseTransferOutProposal(acc.Account.Data.GetBinary())
if err != nil {
solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
logger.Warn(
"failed to parse transfer proposal",
zap.Stringer("account", acc.Pubkey),
zap.Error(err),
)
continue
}
// VAA submitted
if proposal.VaaTime != 0 {
solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
continue
}
var txHash eth_common.Hash
copy(txHash[:], acc.Pubkey[:])
lock := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
Nonce: proposal.Nonce,
Sequence: proposal.Sequence,
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: proposal.EmitterAddress,
Payload: proposal.Payload,
ConsistencyLevel: proposal.ConsistencyLevel,
}
solanaMessagesConfirmed.Inc()
logger.Info("found message account without VAA", zap.Stringer("address", acc.Pubkey))
s.messageEvent <- lock
}
}()
lastSlot = slot
}
}
}()
@ -223,6 +156,201 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
}
}
func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64) {
logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(commitment)))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
rewards := false
out, err := rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{
Encoding: "json",
TransactionDetails: "full",
Rewards: &rewards,
Commitment: commitment,
})
queryLatency.WithLabelValues("get_confirmed_block", string(commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_block_error").Inc()
logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
return
}
if out == nil {
logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
return
}
logger.Info("fetched block",
zap.Uint64("slot", slot),
zap.Int("num_tx", len(out.Transactions)),
zap.Duration("took", time.Since(start)),
zap.String("commitment", string(commitment)))
OUTER:
for _, tx := range out.Transactions {
signature := tx.Transaction.Signatures[0]
var programIndex uint16
for n, key := range tx.Transaction.Message.AccountKeys {
if key.Equals(s.bridge) {
programIndex = uint16(n)
}
}
if programIndex == 0 {
continue
}
logger.Info("found Wormhole transaction",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
// Find top-level instructions
for _, inst := range tx.Transaction.Message.Instructions {
if inst.ProgramIDIndex == programIndex {
// The second account in a well-formed Wormhole instruction is the
// VAA program account.
if len(inst.Accounts) != 9 {
logger.Error("malformed Wormhole instruction: wrong number of accounts",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
continue OUTER
}
acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot)
continue OUTER
}
}
// Call GetConfirmedTransaction to get at innerTransactions
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
tr, err := rpcClient.GetConfirmedTransaction(rCtx, signature)
queryLatency.WithLabelValues("get_confirmed_transaction", string(commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_transaction_error").Inc()
logger.Error("failed to request transaction",
zap.Error(err),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("signature", signature))
return
}
logger.Info("fetched transaction",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("signature", signature),
zap.Duration("took", time.Since(start)))
for _, inner := range tr.Meta.InnerInstructions {
for _, inst := range inner.Instructions {
if inst.ProgramIDIndex == programIndex {
if len(inst.Accounts) != 9 {
logger.Error("malformed Wormhole instruction: wrong number of accounts",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
continue OUTER
}
acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot)
continue OUTER
}
}
}
}
}
func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, rpcClient *rpc.Client, commitment rpc.CommitmentType, slot uint64) {
// Fetching account
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
info, err := rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{
Encoding: solana.EncodingBase64,
Commitment: commitment,
})
queryLatency.WithLabelValues("get_account_info", string(commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_account_info_error").Inc()
logger.Error("failed to request account",
zap.Error(err),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("account", acc))
return
}
if !info.Value.Owner.Equals(s.bridge) {
solanaConnectionErrors.WithLabelValues("account_owner_mismatch").Inc()
logger.Error("account has invalid owner",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("account", acc),
zap.Stringer("unexpected_owner", info.Value.Owner))
return
}
data := info.Value.Data.GetBinary()
if string(data[:3]) != "msg" {
solanaConnectionErrors.WithLabelValues("bad_account_data").Inc()
logger.Error("account is not a message account",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("account", acc))
return
}
logger.Info("found valid VAA account",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.Stringer("account", acc),
zap.Binary("data", data))
s.processMessageAccount(logger, data, acc)
}
func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {
proposal, err := ParseTransferOutProposal(data)
if err != nil {
solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
logger.Error(
"failed to parse transfer proposal",
zap.Stringer("account", acc),
zap.Binary("data", data),
zap.Error(err))
return
}
var txHash eth_common.Hash
copy(txHash[:], acc[:])
lock := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
Nonce: proposal.Nonce,
Sequence: proposal.Sequence,
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: proposal.EmitterAddress,
Payload: proposal.Payload,
ConsistencyLevel: proposal.ConsistencyLevel,
}
solanaMessagesConfirmed.Inc()
logger.Info("message observed",
zap.Stringer("account", acc))
s.messageEvent <- lock
}
type (
MessagePublicationAccount struct {
VaaVersion uint8

View File

@ -31,6 +31,22 @@ you on whether an event has actually been observed. The whole point of Wormhole
We strongly recommend running your own full nodes for both testnet and mainnet (where applicable)
so you can test changes for your mainnet full nodes and gain operational experience.
### Solana node requirements
Your Solana RPC node needs the following parameters enabled:
```
--enable-rpc-transaction-history
--enable-cpi-and-log-storage
```
`--enable-rpc-transaction-history` enables historic transactions to be retrieved via the *getConfirmedBlock* API, which is required for Wormhole to find transactions.
`--enable-cpi-and-log-storage` stores metadata about CPI calls.
Note that these indexes require extra disk space and may slow down catchup. The first startup after
adding these parameters will be slow since Solana needs to recreate all indexes.
### Ethereum node requirements
In order to observe events on the Ethereum chain, you need access to an Ethereum RPC endpoint. The most common