node/pkg/solana: simplify client method signatures

Change-Id: Ic317c78d7415dc11baf39d696ebdd57336b219bc
This commit is contained in:
Leo 2021-07-29 18:07:53 +02:00 committed by Leopold Schabel
parent 2d9ae8e860
commit d37375e9a3
1 changed files with 55 additions and 53 deletions

View File

@ -26,6 +26,8 @@ type SolanaWatcher struct {
rpcUrl string
commitment rpc.CommitmentType
messageEvent chan *common.MessagePublication
logger *zap.Logger
rpcClient *rpc.Client
}
var (
@ -100,6 +102,7 @@ func NewSolanaWatcher(
wsUrl: wsUrl, rpcUrl: rpcUrl,
messageEvent: messageEvents,
commitment: commitment,
rpcClient: rpc.New(rpcUrl),
}
}
@ -110,8 +113,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
BridgeAddress: bridgeAddr,
})
rpcClient := rpc.New(s.rpcUrl)
logger := supervisor.Logger(ctx)
s.logger = supervisor.Logger(ctx)
errC := make(chan error)
var lastSlot uint64
@ -128,7 +130,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
slot, err := rpcClient.GetSlot(rCtx, s.commitment)
slot, err := s.rpcClient.GetSlot(rCtx, s.commitment)
queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc()
@ -144,7 +146,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
Height: int64(slot),
BridgeAddress: bridgeAddr,
})
logger.Info("fetched current Solana height",
s.logger.Info("fetched current Solana height",
zap.String("commitment", string(s.commitment)),
zap.Uint64("slot", slot),
zap.Uint64("lastSlot", lastSlot),
@ -160,7 +162,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
rCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start = time.Now()
slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment)
slots, err := s.rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment)
queryLatency.WithLabelValues("get_confirmed_blocks", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_blocks_error").Inc()
@ -168,7 +170,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
return
}
logger.Info("fetched slots in range",
s.logger.Info("fetched slots in range",
zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
zap.Duration("took", time.Since(start)),
zap.String("commitment", string(s.commitment)))
@ -181,7 +183,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
continue
}
go s.fetchBlock(ctx, logger, s.commitment, rpcClient, slot)
go s.fetchBlock(ctx, slot)
}
lastSlot = slot
@ -197,38 +199,38 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
}
}
func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64) {
logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(commitment)))
func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
s.logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment)))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
rewards := false
out, err := rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{
out, err := s.rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{
Encoding: "json",
TransactionDetails: "full",
Rewards: &rewards,
Commitment: commitment,
Commitment: s.commitment,
})
queryLatency.WithLabelValues("get_confirmed_block", string(commitment)).Observe(time.Since(start).Seconds())
queryLatency.WithLabelValues("get_confirmed_block", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_block_error").Inc()
logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
s.logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
return
}
if out == nil {
logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
s.logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
return
}
logger.Info("fetched block",
s.logger.Info("fetched block",
zap.Uint64("slot", slot),
zap.Int("num_tx", len(out.Transactions)),
zap.Duration("took", time.Since(start)),
zap.String("commitment", string(commitment)))
zap.String("commitment", string(s.commitment)))
OUTER:
for _, tx := range out.Transactions {
@ -243,20 +245,20 @@ OUTER:
continue
}
logger.Info("found Wormhole transaction",
s.logger.Info("found Wormhole transaction",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
zap.String("commitment", string(s.commitment)))
// Find top-level instructions
for _, inst := range tx.Transaction.Message.Instructions {
found, err := s.processInstruction(ctx, logger, commitment, rpcClient, slot, inst, programIndex, tx)
found, err := s.processInstruction(ctx, slot, inst, programIndex, tx)
if err != nil {
logger.Error("malformed Wormhole instruction",
s.logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
continue OUTER
}
@ -269,43 +271,43 @@ OUTER:
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
tr, err := rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{
tr, err := s.rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{
Encoding: "json",
Commitment: commitment,
Commitment: s.commitment,
})
queryLatency.WithLabelValues("get_confirmed_transaction", string(commitment)).Observe(time.Since(start).Seconds())
queryLatency.WithLabelValues("get_confirmed_transaction", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_transaction_error").Inc()
logger.Error("failed to request transaction",
s.logger.Error("failed to request transaction",
zap.Error(err),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("signature", signature))
return
}
logger.Info("fetched transaction",
s.logger.Info("fetched transaction",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("signature", signature),
zap.Duration("took", time.Since(start)))
for _, inner := range tr.Meta.InnerInstructions {
for _, inst := range inner.Instructions {
_, err := s.processInstruction(ctx, logger, commitment, rpcClient, slot, inst, programIndex, tx)
_, err := s.processInstruction(ctx, slot, inst, programIndex, tx)
if err != nil {
logger.Error("malformed Wormhole instruction",
s.logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)))
zap.String("commitment", string(s.commitment)))
}
}
}
}
}
func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, commitment rpc.CommitmentType, rpcClient *rpc.Client, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) {
func (s *SolanaWatcher) processInstruction(ctx context.Context, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) {
if inst.ProgramIDIndex != programIndex {
return false, nil
}
@ -325,49 +327,49 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg
return false, fmt.Errorf("failed to deserialize instruction data: %w", err)
}
logger.Info("post message data", zap.Any("deserialized_data", data))
s.logger.Info("post message data", zap.Any("deserialized_data", data))
level, err := data.ConsistencyLevel.Commitment()
if err != nil {
return false, fmt.Errorf("failed to determine commitment: %w", err)
}
if level != commitment {
if level != s.commitment {
return true, nil
}
// The second account in a well-formed Wormhole instruction is the VAA program account.
acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
go s.fetchMessageAccount(ctx, logger, acc, rpcClient, commitment, slot)
go s.fetchMessageAccount(ctx, acc, slot)
return true, nil
}
func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, rpcClient *rpc.Client, commitment rpc.CommitmentType, slot uint64) {
func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.PublicKey, slot uint64) {
// Fetching account
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
start := time.Now()
info, err := rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{
info, err := s.rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{
Encoding: solana.EncodingBase64,
Commitment: commitment,
Commitment: s.commitment,
})
queryLatency.WithLabelValues("get_account_info", string(commitment)).Observe(time.Since(start).Seconds())
queryLatency.WithLabelValues("get_account_info", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_account_info_error").Inc()
logger.Error("failed to request account",
s.logger.Error("failed to request account",
zap.Error(err),
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc))
return
}
if !info.Value.Owner.Equals(s.bridge) {
solanaConnectionErrors.WithLabelValues("account_owner_mismatch").Inc()
logger.Error("account has invalid owner",
s.logger.Error("account has invalid owner",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc),
zap.Stringer("unexpected_owner", info.Value.Owner))
return
@ -376,27 +378,27 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
data := info.Value.Data.GetBinary()
if string(data[:3]) != "msg" {
solanaConnectionErrors.WithLabelValues("bad_account_data").Inc()
logger.Error("account is not a message account",
s.logger.Error("account is not a message account",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc))
return
}
logger.Info("found valid VAA account",
s.logger.Info("found valid VAA account",
zap.Uint64("slot", slot),
zap.String("commitment", string(commitment)),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc),
zap.Binary("data", data))
s.processMessageAccount(logger, data, acc)
s.processMessageAccount(data, acc)
}
func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {
func (s *SolanaWatcher) processMessageAccount(data []byte, acc solana.PublicKey) {
proposal, err := ParseTransferOutProposal(data)
if err != nil {
solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
logger.Error(
s.logger.Error(
"failed to parse transfer proposal",
zap.Stringer("account", acc),
zap.Binary("data", data),
@ -420,7 +422,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
solanaMessagesConfirmed.Inc()
logger.Info("message observed",
s.logger.Info("message observed",
zap.Stringer("account", acc),
zap.Time("timestamp", observation.Timestamp),
zap.Uint32("nonce", observation.Nonce),