node/pkg/solana: retry account fetches

Change-Id: I310a94a065e883b1a0e200dbd3321d2ec2e03fc0
This commit is contained in:
Leo 2021-09-30 21:47:55 +02:00 committed by Leopold Schabel
parent 2022b55fd4
commit ef34c30049
1 changed files with 33 additions and 6 deletions

View File

@ -61,6 +61,7 @@ const rpcTimeout = time.Second * 5
// Maximum retries for Solana fetching
const maxRetries = 5
const retryDelay = 5 * time.Second
type ConsistencyLevel uint8
@ -236,7 +237,7 @@ func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger,
return
}
time.Sleep(5 * time.Second)
time.Sleep(retryDelay)
logger.Info("retrying block",
zap.Uint64("slot", slot),
@ -394,12 +395,37 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg
// The second account in a well-formed Wormhole instruction is the VAA program account.
acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
go s.fetchMessageAccount(ctx, logger, acc, slot)
go s.retryFetchMessageAccount(ctx, logger, acc, slot, 0)
return true, nil
}
func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64) {
func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint) {
retryable := s.fetchMessageAccount(ctx, logger, acc, slot)
if retryable {
if retry >= maxRetries {
logger.Error("max retries for account",
zap.Uint64("slot", slot),
zap.Stringer("account", acc),
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
return
}
time.Sleep(retryDelay)
logger.Info("retrying account",
zap.Uint64("slot", slot),
zap.Stringer("account", acc),
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
go s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1)
}
}
func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64) (retryable bool) {
// Fetching account
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
@ -417,7 +443,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc))
return
return true
}
if !info.Value.Owner.Equals(s.contract) {
@ -428,7 +454,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc),
zap.Stringer("unexpected_owner", info.Value.Owner))
return
return false
}
data := info.Value.Data.GetBinary()
@ -439,7 +465,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Stringer("account", acc))
return
return false
}
logger.Info("found valid VAA account",
@ -449,6 +475,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
zap.Binary("data", data))
s.processMessageAccount(logger, data, acc)
return false
}
func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {