diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go
index 1cd41a7b5..9c1b1bc64 100644
--- a/node/pkg/query/query.go
+++ b/node/pkg/query/query.go
@@ -22,9 +22,12 @@ const (
 	// RequestTimeout indicates how long before a request is considered to have timed out.
 	RequestTimeout = 1 * time.Minute
 
-	// RetryInterval specifies how long we will wait between retry intervals. This is the interval of our ticker.
+	// RetryInterval specifies how long we will wait between retry attempts.
 	RetryInterval = 10 * time.Second
 
+	// AuditInterval specifies how often to audit the list of pending queries.
+	AuditInterval = time.Second
+
 	// SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
 	SignedQueryRequestChannelSize = 50
 
@@ -105,7 +108,7 @@ func (qh *QueryHandler) Start(ctx context.Context) error {
 
 // handleQueryRequests multiplexes observation requests to the appropriate chain
 func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
-	return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval)
+	return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
 }
 
 // handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
@@ -120,6 +123,7 @@ func handleQueryRequestsImpl(
 	env common.Environment,
 	requestTimeoutImpl time.Duration,
 	retryIntervalImpl time.Duration,
+	auditIntervalImpl time.Duration,
 ) error {
 	qLogger := logger.With(zap.String("component", "ccqhandler"))
 	qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
@@ -165,7 +169,7 @@ func handleQueryRequestsImpl(
 		}
 	}
 
-	ticker := time.NewTicker(retryIntervalImpl)
+	ticker := time.NewTicker(auditIntervalImpl)
 	defer ticker.Stop()
 
 	for {
diff --git a/node/pkg/query/query_test.go b/node/pkg/query/query_test.go
index 55b939012..ce7051d05 100644
--- a/node/pkg/query/query_test.go
+++ b/node/pkg/query/query_test.go
@@ -36,6 +36,7 @@ const (
 	// Speed things up for testing purposes.
 	requestTimeoutForTest = 100 * time.Millisecond
 	retryIntervalForTest  = 10 * time.Millisecond
+	auditIntervalForTest  = 10 * time.Millisecond
 	pollIntervalForTest   = 5 * time.Millisecond
 )
 
@@ -436,7 +437,7 @@ func createQueryHandlerForTestWithoutPublisher(t *testing.T, ctx context.Context
 
 	go func() {
 		err := handleQueryRequestsImpl(ctx, logger, md.signedQueryReqReadC, md.chainQueryReqC, ccqAllowedRequestersList,
-			md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest)
+			md.queryResponseReadC, md.queryResponsePublicationWriteC, common.GoTest, requestTimeoutForTest, retryIntervalForTest, auditIntervalForTest)
 		assert.NoError(t, err)
 	}()
 
diff --git a/node/pkg/watchers/solana/ccq.go b/node/pkg/watchers/solana/ccq.go
index 6cde4f449..6387d314c 100644
--- a/node/pkg/watchers/solana/ccq.go
+++ b/node/pkg/watchers/solana/ccq.go
@@ -3,7 +3,10 @@ package solana
 import (
 	"context"
 	"encoding/hex"
+	"encoding/json"
 	"errors"
+	"fmt"
+	"strconv"
 	"time"
 
 	"go.uber.org/zap"
 
@@ -11,6 +14,19 @@ import (
 	"github.com/certusone/wormhole/node/pkg/query"
 	"github.com/gagliardetto/solana-go"
 	"github.com/gagliardetto/solana-go/rpc"
+	"github.com/gagliardetto/solana-go/rpc/jsonrpc"
+)
+
+const (
+	// CCQ_RETRY_SLOP gets subtracted from the query retry interval to determine how long we can continue fast retries.
+	// We don't want the fast retry time to be too close to the query retry interval.
+	CCQ_RETRY_SLOP = 250 * time.Millisecond
+
+	// CCQ_ESTIMATED_SLOT_TIME is the estimated Solana slot time, used to estimate how long until the MinContextSlot will be reached.
+	CCQ_ESTIMATED_SLOT_TIME = 400 * time.Millisecond
+
+	// CCQ_FAST_RETRY_INTERVAL is how long we sleep between fast retry attempts.
+	CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond
 )
 
 // ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
@@ -37,7 +53,8 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.
 	switch req := queryRequest.Request.Query.(type) {
 	case *query.SolanaAccountQueryRequest:
-		w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req)
+		giveUpTime := start.Add(query.RetryInterval).Add(-CCQ_RETRY_SLOP)
+		w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, false)
 	default:
 		w.ccqLogger.Warn("received unsupported request type",
 			zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())),
@@ -49,15 +66,17 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.
 }
 
 // ccqHandleSolanaAccountQueryRequest is the query handler for a sol_account request.
-func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest) {
+func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, giveUpTime time.Time, isRetry bool) {
 	requestId := "sol_account:" + queryRequest.ID()
-	w.ccqLogger.Info("received a sol_account query",
-		zap.Uint64("minContextSlot", req.MinContextSlot),
-		zap.Uint64("dataSliceOffset", req.DataSliceOffset),
-		zap.Uint64("dataSliceLength", req.DataSliceLength),
-		zap.Int("numAccounts", len(req.Accounts)),
-		zap.String("requestId", requestId),
-	)
+	if !isRetry {
+		w.ccqLogger.Info("received a sol_account query",
+			zap.Uint64("minContextSlot", req.MinContextSlot),
+			zap.Uint64("dataSliceOffset", req.DataSliceOffset),
+			zap.Uint64("dataSliceLength", req.DataSliceLength),
+			zap.Int("numAccounts", len(req.Accounts)),
+			zap.String("requestId", requestId),
+		)
+	}
 
 	rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
 	defer cancel()
@@ -88,6 +107,10 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
 	// Read the accounts.
 	info, err := w.getMultipleAccountsWithOpts(rCtx, accounts, &params)
 	if err != nil {
+		if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry) {
+			// Return without posting a response because a go routine was created to handle it.
+			return
+		}
 		w.ccqLogger.Error("read failed for sol_account query request",
 			zap.String("requestId", requestId),
 			zap.Any("accounts", accounts),
@@ -182,6 +205,116 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
 	w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, resp)
 }
 
+// ccqCheckForMinSlotContext checks to see if the returned error was due to the min context slot not being reached.
+// If so, and the estimated time in the future is not too great, it kicks off a go routine to sleep and do a retry.
+// In that case, it returns true, telling the caller that it is handling the request so it should not post a response.
+// Note that the go routine only does a single retry, but it may kick off yet another go routine to do the next retry, and so on.
+func (w *SolanaWatcher) ccqCheckForMinSlotContext(
+	ctx context.Context,
+	queryRequest *query.PerChainQueryInternal,
+	req *query.SolanaAccountQueryRequest,
+	requestId string,
+	err error,
+	giveUpTime time.Time,
+	log bool,
+) bool {
+	if req.MinContextSlot == 0 {
+		return false
+	}
+
+	if time.Now().After(giveUpTime) {
+		w.ccqLogger.Info("giving up on fast retry", zap.String("requestId", requestId))
+		return false
+	}
+
+	isMinContext, currentSlot, err := ccqIsMinContextSlotError(err)
+	if err != nil {
+		w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(err))
+		return false
+	}
+
+	if !isMinContext {
+		return false
+	}
+
+	// Estimate how far in the future the requested slot is, using our estimated slot time.
+	futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME
+
+	// If the requested slot is further in the future than the regular retry interval, let the regular retry mechanism handle it.
+	if futureSlotEstimate > query.RetryInterval {
+		w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
+			zap.String("requestId", requestId),
+			zap.Uint64("currentSlot", currentSlot),
+			zap.Uint64("minContextSlot", req.MinContextSlot),
+			zap.Stringer("futureSlotEstimate", futureSlotEstimate),
+		)
+		return false
+	}
+
+	// Kick off the retry after a short delay.
+	go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log)
+	return true
+}
+
+// ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry.
+func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) {
+	if log {
+		w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
+			zap.String("requestId", requestId),
+			zap.Uint64("currentSlot", currentSlot),
+			zap.Uint64("minContextSlot", req.MinContextSlot),
+			zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL),
+		)
+	}
+
+	time.Sleep(CCQ_FAST_RETRY_INTERVAL)
+
+	if log {
+		w.ccqLogger.Info("initiating fast retry", zap.String("requestId", requestId))
+	}
+
+	w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, true)
+}
+
+// ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the current slot number from the error.
+func ccqIsMinContextSlotError(err error) (bool, uint64, error) {
+	/*
+		A MinContextSlot error looks like this (and contains the context slot):
+		"(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
+	*/
+	var rpcErr *jsonrpc.RPCError
+	if !errors.As(err, &rpcErr) {
+		return false, 0, nil // Some other kind of error. That's okay.
+	}
+
+	if rpcErr.Code != -32016 { // Minimum context slot has not been reached
+		return false, 0, nil // Some other kind of RPC error. That's okay.
+	}
+
+	// From here on down, any error is bad because the MinContextSlot error is not in the expected format.
+	m, ok := rpcErr.Data.(map[string]interface{})
+	if !ok {
+		return false, 0, fmt.Errorf("failed to extract data from min context slot error")
+	}
+
+	contextSlot, ok := m["contextSlot"]
+	if !ok {
+		return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`)
+	}
+
+	currentSlotAsJson, ok := contextSlot.(json.Number)
+	if !ok {
+		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`)
+	}
+
+	currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
+	if typeErr != nil {
+		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, typeErr)
+	}
+
+	return true, currentSlot, nil
+}
+
 type M map[string]interface{}
 
 // getMultipleAccountsWithOpts is a work-around for the fact that the library call doesn't honor MinContextSlot.
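
Aside (not part of the patch): the fast-retry decision above combines three durations. The query handler retries every query.RetryInterval (10s); the watcher stops fast-retrying at giveUpTime = start + RetryInterval - CCQ_RETRY_SLOP; and a request only qualifies for a fast retry if the requested slot is estimated, at CCQ_ESTIMATED_SLOT_TIME per slot, to arrive within one RetryInterval. A minimal standalone Go sketch of that decision, with the constant values copied from the diff (the shouldFastRetry helper and its sample inputs are illustrative only):

package main

import (
	"fmt"
	"time"
)

const (
	RetryInterval           = 10 * time.Second       // mirrors query.RetryInterval
	CCQ_RETRY_SLOP          = 250 * time.Millisecond // copied from the diff above
	CCQ_ESTIMATED_SLOT_TIME = 400 * time.Millisecond // copied from the diff above
)

// shouldFastRetry mirrors the checks in ccqCheckForMinSlotContext: give up once
// giveUpTime has passed (the regular retry is about to fire anyway), and defer
// to the regular retry mechanism if the requested slot is estimated to be
// further out than one regular retry interval.
func shouldFastRetry(now, giveUpTime time.Time, minContextSlot, currentSlot uint64) bool {
	if now.After(giveUpTime) {
		return false
	}
	futureSlotEstimate := time.Duration(minContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME
	return futureSlotEstimate <= RetryInterval
}

func main() {
	start := time.Now()
	giveUpTime := start.Add(RetryInterval).Add(-CCQ_RETRY_SLOP)

	// 10 slots ahead is ~4s at 400ms per slot: fast retry.
	fmt.Println(shouldFastRetry(start, giveUpTime, 1010, 1000)) // true

	// 50 slots ahead is ~20s, beyond RetryInterval: fall back to the slow path.
	fmt.Println(shouldFastRetry(start, giveUpTime, 1050, 1000)) // false
}
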
diff --git a/node/pkg/watchers/solana/ccq_test.go b/node/pkg/watchers/solana/ccq_test.go
new file mode 100644
index 000000000..9f6185414
--- /dev/null
+++ b/node/pkg/watchers/solana/ccq_test.go
@@ -0,0 +1,103 @@
+package solana
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/certusone/wormhole/node/pkg/query"
+	"github.com/gagliardetto/solana-go/rpc/jsonrpc"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRetrySlopIsValid(t *testing.T) {
+	assert.Less(t, CCQ_RETRY_SLOP, query.RetryInterval)
+}
+
+func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32016,
+		Message: "Minimum context slot has not been reached",
+		Data: map[string]interface{}{
+			"contextSlot": json.Number("13526"),
+		},
+	}
+
+	isMinContext, currentSlot, err := ccqIsMinContextSlotError(error(myErr))
+	require.NoError(t, err)
+	require.True(t, isMinContext)
+	assert.Equal(t, uint64(13526), currentSlot)
+}
+
+func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
+	myErr := fmt.Errorf("Some other error")
+	isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
+	require.NoError(t, err)
+	require.False(t, isMinContext)
+}
+
+func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32000,
+		Message: "Some other RPC error",
+		Data: map[string]interface{}{
+			"contextSlot": json.Number("13526"),
+		},
+	}
+
+	isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
+	require.NoError(t, err)
+	require.False(t, isMinContext)
+}
+
+func TestCcqIsMinContextSlotErrorNoData(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32016,
+		Message: "Minimum context slot has not been reached",
+	}
+
+	_, _, err := ccqIsMinContextSlotError(error(myErr))
+	assert.EqualError(t, err, `failed to extract data from min context slot error`)
+}
+
+func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32016,
+		Message: "Minimum context slot has not been reached",
+		Data: map[string]interface{}{
+			"someOtherField": json.Number("13526"),
+		},
+	}
+
+	_, _, err := ccqIsMinContextSlotError(error(myErr))
+	assert.EqualError(t, err, `min context slot error does not contain "contextSlot"`)
+}
+
+func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32016,
+		Message: "Minimum context slot has not been reached",
+		Data: map[string]interface{}{
+			"contextSlot": "13526",
+		},
+	}
+
+	_, _, err := ccqIsMinContextSlotError(error(myErr))
+	assert.EqualError(t, err, `min context slot error "contextSlot" is not json.Number`)
+}
+
+func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
+	myErr := &jsonrpc.RPCError{
+		Code:    -32016,
+		Message: "Minimum context slot has not been reached",
+		Data: map[string]interface{}{
+			"contextSlot": json.Number("HelloWorld"),
+		},
+	}
+
+	_, _, err := ccqIsMinContextSlotError(error(myErr))
+	assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`))
+}
diff --git a/sdk/js-query/src/query/solana.test.ts b/sdk/js-query/src/query/solana.test.ts
index a45e6d821..e788db6e5 100644
--- a/sdk/js-query/src/query/solana.test.ts
+++ b/sdk/js-query/src/query/solana.test.ts
@@ -27,6 +27,7 @@ const ENV = "DEVNET";
 const SERVER_URL = CI ?
"http://query-server:" : "http://localhost:"; const CCQ_SERVER_URL = SERVER_URL + "6069/v1"; const QUERY_URL = CCQ_SERVER_URL + "/query"; +const SOLANA_NODE_URL = CI ? "http://solana-devnet:8899" : "http://localhost:8899"; const PRIVATE_KEY = "cfb12303a19cde580bb4dd771639b0d26bc68353645571a8cff516ab2ee113a0"; @@ -36,6 +37,17 @@ const ACCOUNTS = [ "BVxyYhm498L79r4HMQ9sxZ5bi41DmJmeWZ7SCS7Cyvna", // Example NFT in devnet ]; +async function getSolanaSlot(comm: string): Promise { + const response = await axios.post(SOLANA_NODE_URL, { + jsonrpc: "2.0", + id: 1, + method: "getSlot", + params: [{ commitment: comm, transactionDetails: "none" }], + }); + + return response.data.result; +} + describe("solana", () => { test("serialize and deserialize sol_account request with defaults", () => { const solAccountReq = new SolanaAccountQueryRequest("finalized", ACCOUNTS); @@ -175,6 +187,66 @@ describe("solana", () => { "01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d0000e8890423c78a0901000000000000000000000000000000000000000000000000000000000000000000000000" ); + expect(sar.results[1].lamports).toEqual(BigInt(1461600)); + expect(sar.results[1].rentEpoch).toEqual(BigInt(0)); + expect(sar.results[1].executable).toEqual(false); + expect(base58.encode(Buffer.from(sar.results[1].owner))).toEqual( + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ); + expect(Buffer.from(sar.results[1].data).toString("hex")).toEqual( + "01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d01000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000" + ); + }); + test("sol_account query with future min context slot", async () => { + const currSlot = await getSolanaSlot("finalized"); + const minContextSlot = BigInt(currSlot) + BigInt(10); + const solAccountReq = new SolanaAccountQueryRequest( + "finalized", + ACCOUNTS, + minContextSlot + ); + const nonce = 42; + const query = new PerChainQueryRequest(1, solAccountReq); + const request = new QueryRequest(nonce, [query]); + const serialized = request.serialize(); + const digest = QueryRequest.digest(ENV, serialized); + const signature = sign(PRIVATE_KEY, digest); + const response = await axios.put( + QUERY_URL, + { + signature, + bytes: Buffer.from(serialized).toString("hex"), + }, + { headers: { "X-API-Key": "my_secret_key" } } + ); + expect(response.status).toBe(200); + + const queryResponse = QueryResponse.from(response.data.bytes); + expect(queryResponse.version).toEqual(1); + expect(queryResponse.requestChainId).toEqual(0); + expect(queryResponse.request.version).toEqual(1); + expect(queryResponse.request.requests.length).toEqual(1); + expect(queryResponse.request.requests[0].chainId).toEqual(1); + expect(queryResponse.request.requests[0].query.type()).toEqual( + ChainQueryType.SolanaAccount + ); + + const sar = queryResponse.responses[0] + .response as SolanaAccountQueryResponse; + expect(sar.slotNumber).toEqual(minContextSlot); + expect(sar.blockTime).not.toEqual(BigInt(0)); + expect(sar.results.length).toEqual(2); + + expect(sar.results[0].lamports).toEqual(BigInt(1461600)); + expect(sar.results[0].rentEpoch).toEqual(BigInt(0)); + expect(sar.results[0].executable).toEqual(false); + expect(base58.encode(Buffer.from(sar.results[0].owner))).toEqual( + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ); + expect(Buffer.from(sar.results[0].data).toString("hex")).toEqual( + 
"01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d0000e8890423c78a0901000000000000000000000000000000000000000000000000000000000000000000000000" + ); + expect(sar.results[1].lamports).toEqual(BigInt(1461600)); expect(sar.results[1].rentEpoch).toEqual(BigInt(0)); expect(sar.results[1].executable).toEqual(false);