Node/CCQ: Solana should retry block read (#4068)
* Node/CCQ: Solana should retry block read * Code review rework
This commit is contained in:
parent
b150623df4
commit
cb94143e3f
|
@ -7,6 +7,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -27,6 +28,12 @@ const (
|
|||
|
||||
// CCQ_FAST_RETRY_INTERVAL is how long we sleep between fast retry attempts.
|
||||
CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond
|
||||
|
||||
// CCQ_MAX_BLOCK_READ_ATTEMPTS is the total number of times we will try to read the block when it returns "Block not available".
|
||||
CCQ_MAX_BLOCK_READ_ATTEMPTS = 3
|
||||
|
||||
// CCQ_BLOCK_RETRY_DELAY is how long we sleep between attempts to read the block time.
|
||||
CCQ_BLOCK_RETRY_DELAY = 250 * time.Millisecond
|
||||
)
|
||||
|
||||
// ccqStart starts up CCQ query processing.
|
||||
|
@ -140,22 +147,44 @@ func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest(
|
|||
}
|
||||
|
||||
// Read the block for this slot to get the block time.
|
||||
maxSupportedTransactionVersion := uint64(0)
|
||||
block, err := w.rpcClient.GetBlockWithOpts(rCtx, info.Context.Slot, &rpc.GetBlockOpts{
|
||||
Encoding: solana.EncodingBase64,
|
||||
Commitment: params.Commitment,
|
||||
TransactionDetails: rpc.TransactionDetailsNone,
|
||||
MaxSupportedTransactionVersion: &maxSupportedTransactionVersion,
|
||||
})
|
||||
if err != nil {
|
||||
w.ccqLogger.Error(fmt.Sprintf("failed to read block time for %s query request", tag),
|
||||
zap.String("requestId", requestId),
|
||||
zap.Uint64("slotNumber", info.Context.Slot),
|
||||
zap.Error(err),
|
||||
)
|
||||
var block *rpc.GetBlockResult
|
||||
var numBlockReadAttempts int
|
||||
for {
|
||||
maxSupportedTransactionVersion := uint64(0)
|
||||
block, err = w.rpcClient.GetBlockWithOpts(rCtx, info.Context.Slot, &rpc.GetBlockOpts{
|
||||
Encoding: solana.EncodingBase64,
|
||||
Commitment: params.Commitment,
|
||||
TransactionDetails: rpc.TransactionDetailsNone,
|
||||
MaxSupportedTransactionVersion: &maxSupportedTransactionVersion,
|
||||
})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded)
|
||||
return
|
||||
if !ccqIsBlockNotAvailable(err) {
|
||||
w.ccqLogger.Error(fmt.Sprintf("failed to read block time for %s query request", tag),
|
||||
zap.String("requestId", requestId),
|
||||
zap.Uint64("slotNumber", info.Context.Slot),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded)
|
||||
return
|
||||
}
|
||||
|
||||
numBlockReadAttempts += 1
|
||||
if numBlockReadAttempts >= CCQ_MAX_BLOCK_READ_ATTEMPTS {
|
||||
w.ccqLogger.Error(fmt.Sprintf("repeatedly failed to read block time for %s query request, giving up", tag),
|
||||
zap.String("requestId", requestId),
|
||||
zap.Uint64("slotNumber", info.Context.Slot),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded)
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(CCQ_BLOCK_RETRY_DELAY)
|
||||
}
|
||||
|
||||
if info == nil {
|
||||
|
@ -539,3 +568,21 @@ func (w *SolanaWatcher) getMultipleAccountsWithOpts(
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ccqIsBlockNotAvailable parses an error to see if it is a "Block not available for slot" error.
|
||||
func ccqIsBlockNotAvailable(err error) bool {
|
||||
/*
|
||||
A "Block not available for slot" error looks like this:
|
||||
"(*jsonrpc.RPCError)(0xc0208a0270)({\n Code: (int) -32004,\n Message: (string) (len=38) \"Block not available for slot 282135928\",\n Data: (interface {}) <nil>\n})\n"
|
||||
*/
|
||||
var rpcErr *jsonrpc.RPCError
|
||||
if !errors.As(err, &rpcErr) {
|
||||
return false // Some other kind of error.
|
||||
}
|
||||
|
||||
if rpcErr.Code != -32004 { // Block not available for slot
|
||||
return false // Some other kind of RPC error.
|
||||
}
|
||||
|
||||
return strings.Contains(rpcErr.Message, "Block not available for slot")
|
||||
}
|
||||
|
|
|
@ -101,3 +101,37 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
|
|||
require.True(t, isMinContext)
|
||||
assert.Equal(t, uint64(0), currentSlot)
|
||||
}
|
||||
|
||||
func TestCcqIsBlockNotAvailableSuccess(t *testing.T) {
|
||||
myErr := &jsonrpc.RPCError{
|
||||
Code: -32004,
|
||||
Message: "Block not available for slot 282135928",
|
||||
Data: nil,
|
||||
}
|
||||
|
||||
assert.True(t, ccqIsBlockNotAvailable(error(myErr)))
|
||||
}
|
||||
|
||||
func TestCcqIsBlockNotAvailableWrongErrorNumberFailure(t *testing.T) {
|
||||
myErr := &jsonrpc.RPCError{
|
||||
Code: -32016,
|
||||
Message: "Minimum context slot has not been reached",
|
||||
Data: map[string]interface{}{
|
||||
"contextSlot": json.Number("13526"),
|
||||
},
|
||||
}
|
||||
|
||||
assert.False(t, ccqIsBlockNotAvailable(error(myErr)))
|
||||
}
|
||||
|
||||
func TestCcqIsBlockNotAvailableWrongTextFailure(t *testing.T) {
|
||||
myErr := &jsonrpc.RPCError{
|
||||
Code: -32004,
|
||||
Message: "Minimum context slot has not been reached",
|
||||
Data: map[string]interface{}{
|
||||
"contextSlot": json.Number("13526"),
|
||||
},
|
||||
}
|
||||
|
||||
assert.False(t, ccqIsBlockNotAvailable(error(myErr)))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue