From 9514b3562cffdba1ba9cd62a8d2334fc6a6de0cd Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Wed, 20 Mar 2024 08:42:31 -0500 Subject: [PATCH] Node/CCQ: Concurrent queries (#3844) * Node/CCQ: Concurrent queries * Substantial rework --- node/pkg/query/query.go | 131 +++++++++++++++++-------- node/pkg/query/query_test.go | 8 ++ node/pkg/watchers/evm/ccq.go | 13 ++- node/pkg/watchers/evm/watcher.go | 22 ++--- node/pkg/watchers/solana/ccq.go | 9 +- node/pkg/watchers/solana/client.go | 17 +--- sdk/js-query/src/query/ethCall.test.ts | 67 +++++++++++++ sdk/js-query/src/query/solana.test.ts | 67 +++++++++++++ 8 files changed, 259 insertions(+), 75 deletions(-) diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index 80d00937a..dced530c4 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -56,6 +56,11 @@ func NewQueryHandler( } type ( + // Watcher is the interface that any watcher that supports cross chain queries must implement. + Watcher interface { + QueryHandler(ctx context.Context, queryRequest *PerChainQueryInternal) + } + // QueryHandler defines the cross chain query handler. QueryHandler struct { logger *zap.Logger @@ -87,8 +92,56 @@ type ( channel chan *PerChainQueryInternal lastUpdateTime time.Time } + + PerChainConfig struct { + TimestampCacheSupported bool + NumWorkers int + } ) +// perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries. +// Every chain listed here must have at least one worker specified. +var perChainConfig = map[vaa.ChainID]PerChainConfig{ + vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false}, + vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, +} + +// GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct, +// which is not an error. It just means that queries are not supported for that chain. +func GetPerChainConfig(chainID vaa.ChainID) PerChainConfig { + if pcc, exists := perChainConfig[chainID]; exists { + return pcc + } + return PerChainConfig{} +} + +// QueriesSupported can be used by the watcher to determine if queries are supported for the chain. +func (config PerChainConfig) QueriesSupported() bool { + return config.NumWorkers > 0 +} + // Start initializes the query handler and starts the runnable. func (qh *QueryHandler) Start(ctx context.Context) error { qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr)) @@ -130,40 +183,15 @@ func handleQueryRequestsImpl( pendingQueries := make(map[string]*pendingQuery) // Key is requestID. - // CCQ is currently only supported on EVM and Solana. - supportedChains := map[vaa.ChainID]struct{}{ - vaa.ChainIDSolana: {}, - vaa.ChainIDEthereum: {}, - vaa.ChainIDBSC: {}, - vaa.ChainIDPolygon: {}, - vaa.ChainIDAvalanche: {}, - vaa.ChainIDOasis: {}, - vaa.ChainIDAurora: {}, - vaa.ChainIDFantom: {}, - vaa.ChainIDKarura: {}, - vaa.ChainIDAcala: {}, - vaa.ChainIDKlaytn: {}, - vaa.ChainIDCelo: {}, - vaa.ChainIDMoonbeam: {}, - vaa.ChainIDArbitrum: {}, - vaa.ChainIDOptimism: {}, - vaa.ChainIDBase: {}, - vaa.ChainIDScroll: {}, - vaa.ChainIDMantle: {}, - vaa.ChainIDSepolia: {}, - vaa.ChainIDHolesky: {}, - vaa.ChainIDArbitrumSepolia: {}, - vaa.ChainIDBaseSepolia: {}, - vaa.ChainIDOptimismSepolia: {}, - vaa.ChainIDPolygonSepolia: {}, - } - - // But we don't want to allow CCQ if the chain is not enabled. - for chainID := range supportedChains { - if _, exists := chainQueryReqC[chainID]; !exists { - delete(supportedChains, chainID) - } else { - logger.Info("queries supported on chain", zap.Stringer("chainID", chainID)) + // Create the set of chains for which CCQ is actually enabled. Those are the ones in the config for which we actually have a watcher enabled. + supportedChains := make(map[vaa.ChainID]struct{}) + for chainID, config := range perChainConfig { + if _, exists := chainQueryReqC[chainID]; exists { + if config.NumWorkers <= 0 { + panic(fmt.Sprintf(`invalid per chain config entry for "%s", no workers specified`, chainID.String())) + } + logger.Info("queries supported on chain", zap.Stringer("chainID", chainID), zap.Int("numWorkers", config.NumWorkers)) + supportedChains[chainID] = struct{}{} // Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled. totalRequestsByChain.WithLabelValues(chainID.String()).Add(0) @@ -452,13 +480,30 @@ func (pq *pendingQuery) numPendingRequests() int { return numPending } -func SupportsTimestampCaching(chainID vaa.ChainID) bool { - /* - - P1: Ethereum, Base, Optimism - - P1.5: Arbitrum, Polygon, Avalanche - - P2: BNB Chain, Moonbeam - - P3: Acala, Celo, Fantom, Karura, Klaytn, Oasis - */ - - return true +// StartWorkers is used by the watchers to start the query handler worker routines. +func StartWorkers( + ctx context.Context, + logger *zap.Logger, + errC chan error, + w Watcher, + queryReqC <-chan *PerChainQueryInternal, + config PerChainConfig, + tag string, +) { + for count := 0; count < config.NumWorkers; count++ { + workerId := count + common.RunWithScissors(ctx, errC, fmt.Sprintf("%s_fetch_query_req", tag), func(ctx context.Context) error { + logger.Debug("CONCURRENT: starting worker", zap.Int("worker", workerId)) + for { + select { + case <-ctx.Done(): + return nil + case queryRequest := <-queryReqC: + logger.Debug("CONCURRENT: processing query request", zap.Int("worker", workerId)) + w.QueryHandler(ctx, queryRequest) + logger.Debug("CONCURRENT: finished processing query request", zap.Int("worker", workerId)) + } + } + }) + } } diff --git a/node/pkg/query/query_test.go b/node/pkg/query/query_test.go index ce7051d05..f75daaa02 100644 --- a/node/pkg/query/query_test.go +++ b/node/pkg/query/query_test.go @@ -810,3 +810,11 @@ func TestPublishRetrySucceeds(t *testing.T) { assert.Equal(t, 1, md.getRequestsPerChain(vaa.ChainIDPolygon)) assert.True(t, validateResponseForTest(t, queryResponsePublication, signedQueryRequest, queryRequest, expectedResults)) } + +func TestPerChainConfigValid(t *testing.T) { + for chainID, config := range perChainConfig { + if config.NumWorkers <= 0 { + assert.Equal(t, "", fmt.Sprintf(`perChainConfig for "%s" has an invalid NumWorkers: %d`, chainID.String(), config.NumWorkers)) + } + } +} diff --git a/node/pkg/watchers/evm/ccq.go b/node/pkg/watchers/evm/ccq.go index 1cac8120f..ab1999c4c 100644 --- a/node/pkg/watchers/evm/ccq.go +++ b/node/pkg/watchers/evm/ccq.go @@ -19,6 +19,15 @@ import ( "github.com/certusone/wormhole/node/pkg/query" ) +// ccqStart starts up CCQ query processing. +func (w *Watcher) ccqStart(ctx context.Context, errC chan error) { + if w.ccqTimestampCache != nil && w.ccqBackfillCache { + w.ccqBackfillStart(ctx, errC) + } + + query.StartWorkers(ctx, w.ccqLogger, errC, w, w.queryReqC, w.ccqConfig, w.chainID.String()) +} + // ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil. func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status query.QueryStatus, response query.ChainSpecificResponse) { queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, response) @@ -30,8 +39,8 @@ func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status } } -// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler. -func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) { +// QueryHandler is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler. +func (w *Watcher) QueryHandler(ctx context.Context, queryRequest *query.PerChainQueryInternal) { // This can't happen unless there is a programming error - the caller // is expected to send us only requests for our chainID. diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 1f7b4fd24..77aac8d05 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -129,6 +129,7 @@ type ( latestFinalizedBlockNumber uint64 l1Finalizer interfaces.L1Finalizer + ccqConfig query.PerChainConfig ccqMaxBlockNumber *big.Int ccqTimestampCache *BlocksByTimestamp ccqBackfillChannel chan *ccqBackfillRequest @@ -179,6 +180,7 @@ func NewEthWatcher( queryResponseC: queryResponseC, pending: map[pendingKey]*pendingMessage{}, unsafeDevMode: unsafeDevMode, + ccqConfig: query.GetPerChainConfig(chainID), ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64), ccqBackfillCache: ccqBackfillCache, ccqBackfillChannel: make(chan *ccqBackfillRequest, 50), @@ -188,6 +190,7 @@ func NewEthWatcher( func (w *Watcher) Run(parentCtx context.Context) error { var err error logger := supervisor.Logger(parentCtx) + w.ccqLogger = logger.With(zap.String("component", "ccqevm")) logger.Info("Starting watcher", zap.String("watcher_name", "evm"), @@ -198,8 +201,6 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Bool("unsafeDevMode", w.unsafeDevMode), ) - w.ccqLogger = logger.With(zap.String("component", "ccqevm")) - // later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics. // If any of them panic, this function will return, causing this child context to be canceled // such that the other go-routines can free up resources @@ -258,7 +259,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } } - if query.SupportsTimestampCaching(w.chainID) { + if w.ccqConfig.TimestampCacheSupported { w.ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS) } @@ -421,21 +422,10 @@ func (w *Watcher) Run(parentCtx context.Context) error { } }) - if w.ccqTimestampCache != nil && w.ccqBackfillCache { - w.ccqBackfillStart(ctx, errC) + if w.ccqConfig.QueriesSupported() { + w.ccqStart(ctx, errC) } - common.RunWithScissors(ctx, errC, "evm_fetch_query_req", func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return nil - case queryRequest := <-w.queryReqC: - w.ccqHandleQuery(ctx, queryRequest) - } - } - }) - common.RunWithScissors(ctx, errC, "evm_fetch_messages", func(ctx context.Context) error { for { select { diff --git a/node/pkg/watchers/solana/ccq.go b/node/pkg/watchers/solana/ccq.go index 1ec91a605..24e173273 100644 --- a/node/pkg/watchers/solana/ccq.go +++ b/node/pkg/watchers/solana/ccq.go @@ -29,6 +29,11 @@ const ( CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond ) +// ccqStart starts up CCQ query processing. +func (w *SolanaWatcher) ccqStart(ctx context.Context) { + query.StartWorkers(ctx, w.ccqLogger, w.errC, w, w.queryReqC, w.ccqConfig, w.chainID.String()) +} + // ccqSendQueryResponse sends a response back to the query handler. func (w *SolanaWatcher) ccqSendQueryResponse(queryResponse *query.PerChainQueryResponseInternal) { select { @@ -45,8 +50,8 @@ func (w *SolanaWatcher) ccqSendErrorResponse(req *query.PerChainQueryInternal, s w.ccqSendQueryResponse(queryResponse) } -// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler. -func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) { +// QueryHandler is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler. +func (w *SolanaWatcher) QueryHandler(ctx context.Context, queryRequest *query.PerChainQueryInternal) { // This can't happen unless there is a programming error - the caller // is expected to send us only requests for our chainID. if queryRequest.Request.ChainId != w.chainID { diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index b42d2d492..9612d1337 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -67,6 +67,7 @@ type ( // Outbound query responses to query requests queryResponseC chan<- *query.PerChainQueryResponseInternal + ccqConfig query.PerChainConfig ccqLogger *zap.Logger } @@ -228,6 +229,7 @@ func NewSolanaWatcher( networkName: chainID.String(), queryReqC: queryReqC, queryResponseC: queryResponseC, + ccqConfig: query.GetPerChainConfig(chainID), } } @@ -306,6 +308,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { }) logger := supervisor.Logger(ctx) + s.ccqLogger = logger.With(zap.String("component", "ccqsol")) wsUrl := "" if s.wsUrl != nil { @@ -413,18 +416,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { } }) - if s.commitment == rpc.CommitmentType("finalized") { - s.ccqLogger = logger.With(zap.String("component", "ccqsol")) - common.RunWithScissors(ctx, s.errC, "solana_fetch_query_req", func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return nil - case queryRequest := <-s.queryReqC: - s.ccqHandleQuery(ctx, queryRequest) - } - } - }) + if s.commitment == rpc.CommitmentType("finalized") && s.ccqConfig.QueriesSupported() { + s.ccqStart(ctx) } select { diff --git a/sdk/js-query/src/query/ethCall.test.ts b/sdk/js-query/src/query/ethCall.test.ts index e07864aa3..14bba3db5 100644 --- a/sdk/js-query/src/query/ethCall.test.ts +++ b/sdk/js-query/src/query/ethCall.test.ts @@ -8,6 +8,7 @@ import { } from "@jest/globals"; import Web3, { ETH_DATA_FORMAT } from "web3"; import axios from "axios"; +import { AxiosResponse } from "axios"; import { ChainQueryType, EthCallData, @@ -829,4 +830,70 @@ describe("eth call", () => { }); expect(err).toBe(true); }); + test("concurrent queries", async () => { + const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string"); + const decimalsCallData = createTestEthCallData( + WETH_ADDRESS, + "decimals", + "uint8" + ); + const blockNumber = await web3.eth.getBlockNumber(ETH_DATA_FORMAT); + const ethCall = new EthCallQueryRequest(blockNumber, [ + nameCallData, + decimalsCallData, + ]); + const chainId = 2; + const ethQuery = new PerChainQueryRequest(chainId, ethCall); + let nonce = 1; + let promises: Promise>[] = []; + for (let count = 0; count < 20; count++) { + nonce += 1; + const request = new QueryRequest(nonce, [ethQuery]); + const serialized = request.serialize(); + const digest = QueryRequest.digest(ENV, serialized); + const signature = sign(PRIVATE_KEY, digest); + const response = axios.put( + QUERY_URL, + { + signature, + bytes: Buffer.from(serialized).toString("hex"), + }, + { headers: { "X-API-Key": "my_secret_key" } } + ); + promises.push(response); + } + + const responses = await Promise.all(promises); + + expect(responses.length).toEqual(promises.length); + for (let idx = 0; idx < responses.length; idx++) { + const response = responses[idx]; + expect(response.status).toBe(200); + + const queryResponse = QueryResponse.from(response.data.bytes); + expect(queryResponse.version).toEqual(1); + expect(queryResponse.requestChainId).toEqual(0); + expect(queryResponse.request.version).toEqual(1); + expect(queryResponse.request.requests.length).toEqual(1); + expect(queryResponse.request.requests[0].chainId).toEqual(2); + expect(queryResponse.request.requests[0].query.type()).toEqual( + ChainQueryType.EthCall + ); + + const ecr = queryResponse.responses[0].response as EthCallQueryResponse; + expect(ecr.blockNumber).toEqual(BigInt(blockNumber)); + expect(ecr.blockHash).toEqual( + (await web3.eth.getBlock(BigInt(blockNumber))).hash + ); + expect(ecr.results.length).toEqual(2); + expect(ecr.results[0]).toEqual( + // Name + "0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000d5772617070656420457468657200000000000000000000000000000000000000" + ); + expect(ecr.results[1]).toEqual( + // Decimals + "0x0000000000000000000000000000000000000000000000000000000000000012" + ); + } + }); }); diff --git a/sdk/js-query/src/query/solana.test.ts b/sdk/js-query/src/query/solana.test.ts index 0c7074dd3..51c474b57 100644 --- a/sdk/js-query/src/query/solana.test.ts +++ b/sdk/js-query/src/query/solana.test.ts @@ -8,6 +8,7 @@ import { } from "@jest/globals"; import Web3, { ETH_DATA_FORMAT } from "web3"; import axios from "axios"; +import { AxiosResponse } from "axios"; import base58 from "bs58"; import { ChainQueryType, @@ -438,4 +439,70 @@ describe("solana", () => { "57cd18b7f8a4d91a2da9ab4af05d0fbe" ); }); + test("concurrent queries", async () => { + const solAccountReq = new SolanaAccountQueryRequest("finalized", ACCOUNTS); + const query = new PerChainQueryRequest(1, solAccountReq); + let nonce = 42; + let promises: Promise>[] = []; + for (let count = 0; count < 20; count++) { + nonce += 1; + const request = new QueryRequest(nonce, [query]); + const serialized = request.serialize(); + const digest = QueryRequest.digest(ENV, serialized); + const signature = sign(PRIVATE_KEY, digest); + const response = axios.put( + QUERY_URL, + { + signature, + bytes: Buffer.from(serialized).toString("hex"), + }, + { headers: { "X-API-Key": "my_secret_key" } } + ); + promises.push(response); + } + + const responses = await Promise.all(promises); + + expect(responses.length).toEqual(promises.length); + for (let idx = 0; idx < responses.length; idx++) { + const response = responses[idx]; + expect(response.status).toBe(200); + + const queryResponse = QueryResponse.from(response.data.bytes); + expect(queryResponse.version).toEqual(1); + expect(queryResponse.requestChainId).toEqual(0); + expect(queryResponse.request.version).toEqual(1); + expect(queryResponse.request.requests.length).toEqual(1); + expect(queryResponse.request.requests[0].chainId).toEqual(1); + expect(queryResponse.request.requests[0].query.type()).toEqual( + ChainQueryType.SolanaAccount + ); + + const sar = queryResponse.responses[0] + .response as SolanaAccountQueryResponse; + expect(Number(sar.slotNumber)).not.toEqual(0); + expect(Number(sar.blockTime)).not.toEqual(0); + expect(sar.results.length).toEqual(2); + + expect(Number(sar.results[0].lamports)).toEqual(1461600); + expect(Number(sar.results[0].rentEpoch)).toEqual(0); + expect(sar.results[0].executable).toEqual(false); + expect(base58.encode(Buffer.from(sar.results[0].owner))).toEqual( + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ); + expect(Buffer.from(sar.results[0].data).toString("hex")).toEqual( + "01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d0000e8890423c78a0901000000000000000000000000000000000000000000000000000000000000000000000000" + ); + + expect(Number(sar.results[1].lamports)).toEqual(1461600); + expect(Number(sar.results[1].rentEpoch)).toEqual(0); + expect(sar.results[1].executable).toEqual(false); + expect(base58.encode(Buffer.from(sar.results[1].owner))).toEqual( + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ); + expect(Buffer.from(sar.results[1].data).toString("hex")).toEqual( + "01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d01000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000" + ); + } + }); });