diff --git a/node/pkg/watchers/solana/ccq.go b/node/pkg/watchers/solana/ccq.go index 137c5f6c4..01e70ec8f 100644 --- a/node/pkg/watchers/solana/ccq.go +++ b/node/pkg/watchers/solana/ccq.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "strconv" + "strings" "time" "go.uber.org/zap" @@ -27,6 +28,12 @@ const ( // CCQ_FAST_RETRY_INTERVAL is how long we sleep between fast retry attempts. CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond + + // CCQ_MAX_BLOCK_READ_ATTEMPTS the total number of times we will try to read the block when it returns "Block not available". + CCQ_MAX_BLOCK_READ_ATTEMPTS = 3 + + // CCQ_BLOCK_RETRY_DELAY is how long we sleep between attempts to read the block time. + CCQ_BLOCK_RETRY_DELAY = 250 * time.Millisecond ) // ccqStart starts up CCQ query processing. @@ -140,22 +147,44 @@ func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest( } // Read the block for this slot to get the block time. - maxSupportedTransactionVersion := uint64(0) - block, err := w.rpcClient.GetBlockWithOpts(rCtx, info.Context.Slot, &rpc.GetBlockOpts{ - Encoding: solana.EncodingBase64, - Commitment: params.Commitment, - TransactionDetails: rpc.TransactionDetailsNone, - MaxSupportedTransactionVersion: &maxSupportedTransactionVersion, - }) - if err != nil { - w.ccqLogger.Error(fmt.Sprintf("failed to read block time for %s query request", tag), - zap.String("requestId", requestId), - zap.Uint64("slotNumber", info.Context.Slot), - zap.Error(err), - ) + var block *rpc.GetBlockResult + var numBlockReadAttempts int + for { + maxSupportedTransactionVersion := uint64(0) + block, err = w.rpcClient.GetBlockWithOpts(rCtx, info.Context.Slot, &rpc.GetBlockOpts{ + Encoding: solana.EncodingBase64, + Commitment: params.Commitment, + TransactionDetails: rpc.TransactionDetailsNone, + MaxSupportedTransactionVersion: &maxSupportedTransactionVersion, + }) + if err == nil { + break + } - w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded) - return + if !ccqIsBlockNotAvailable(err) { + w.ccqLogger.Error(fmt.Sprintf("failed to read block time for %s query request", tag), + zap.String("requestId", requestId), + zap.Uint64("slotNumber", info.Context.Slot), + zap.Error(err), + ) + + w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded) + return + } + + numBlockReadAttempts += 1 + if numBlockReadAttempts >= CCQ_MAX_BLOCK_READ_ATTEMPTS { + w.ccqLogger.Error(fmt.Sprintf("repeatedly failed to read block time for %s query request, giving up", tag), + zap.String("requestId", requestId), + zap.Uint64("slotNumber", info.Context.Slot), + zap.Error(err), + ) + + w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded) + return + } + + time.Sleep(CCQ_BLOCK_RETRY_DELAY) } if info == nil { @@ -539,3 +568,21 @@ func (w *SolanaWatcher) getMultipleAccountsWithOpts( } return } + +// ccqIsBlockNotAvailable parses an error to see if it is a "Block not available for slot" error. +func ccqIsBlockNotAvailable(err error) bool { + /* + A "Block not available for slot" error looks like this: + "(*jsonrpc.RPCError)(0xc0208a0270)({\n Code: (int) -32004,\n Message: (string) (len=38) \"Block not available for slot 282135928\",\n Data: (interface {}) \n})\n" + */ + var rpcErr *jsonrpc.RPCError + if !errors.As(err, &rpcErr) { + return false // Some other kind of error. + } + + if rpcErr.Code != -32004 { // Block not available for slot + return false // Some other kind of RPC error. + } + + return strings.Contains(rpcErr.Message, "Block not available for slot") +} diff --git a/node/pkg/watchers/solana/ccq_test.go b/node/pkg/watchers/solana/ccq_test.go index 9c0931b3f..cace58d2d 100644 --- a/node/pkg/watchers/solana/ccq_test.go +++ b/node/pkg/watchers/solana/ccq_test.go @@ -101,3 +101,37 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) { require.True(t, isMinContext) assert.Equal(t, uint64(0), currentSlot) } + +func TestCcqIsBlockNotAvailableSuccess(t *testing.T) { + myErr := &jsonrpc.RPCError{ + Code: -32004, + Message: "Block not available for slot 282135928", + Data: nil, + } + + assert.True(t, ccqIsBlockNotAvailable(error(myErr))) +} + +func TestCcqIsBlockNotAvailableWrongErrorNumberFailure(t *testing.T) { + myErr := &jsonrpc.RPCError{ + Code: -32016, + Message: "Minimum context slot has not been reached", + Data: map[string]interface{}{ + "contextSlot": json.Number("13526"), + }, + } + + assert.False(t, ccqIsBlockNotAvailable(error(myErr))) +} + +func TestCcqIsBlockNotAvailableWrongTextFailure(t *testing.T) { + myErr := &jsonrpc.RPCError{ + Code: -32004, + Message: "Minimum context slot has not been reached", + Data: map[string]interface{}{ + "contextSlot": json.Number("13526"), + }, + } + + assert.False(t, ccqIsBlockNotAvailable(error(myErr))) +}