node/pkg/solana: retry failed requests

Change-Id: I4ee6e0c16c3c6f392ad1d6dbf4b20bdbe9c70f47
This commit is contained in:
Leo 2021-09-18 10:46:48 +02:00
parent 305fcdc393
commit c0bee5e5f0
1 changed files with 31 additions and 5 deletions

View File

@ -60,6 +60,9 @@ var (
const rpcTimeout = time.Second * 5
// Maximum retries for Solana fetching
const maxRetries = 5
type ConsistencyLevel uint8
// Mappings from consistency levels constants to commitment level.
@ -185,7 +188,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
continue
}
go s.fetchBlock(ctx, slot)
go s.retryFetchBlock(ctx, slot, 0)
}
lastSlot = slot
@ -201,7 +204,28 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
}
}
func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, slot uint64, retry uint) {
ok := s.fetchBlock(ctx, slot)
if !ok {
if retry >= maxRetries {
s.logger.Error("max retries for block",
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
return
}
s.logger.Info("retrying block",
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Uint("retry", retry))
go s.retryFetchBlock(ctx, slot, retry + 1)
}
}
func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) bool {
s.logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment)))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel()
@ -220,7 +244,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
s.logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
return
return false
}
if out == nil {
@ -228,7 +252,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
s.logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
return
return false
}
s.logger.Info("fetched block",
@ -289,7 +313,7 @@ OUTER:
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Stringer("signature", signature))
return
return false
}
s.logger.Info("fetched transaction",
@ -311,6 +335,8 @@ OUTER:
}
}
}
return true
}
func (s *SolanaWatcher) processInstruction(ctx context.Context, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) {