Node/CCQ: Concurrent queries (#3844)

* Node/CCQ: Concurrent queries

* Substantial rework
This commit is contained in:
bruce-riley 2024-03-20 08:42:31 -05:00 committed by GitHub
parent 281a3514ad
commit 9514b3562c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 259 additions and 75 deletions

View File

@ -56,6 +56,11 @@ func NewQueryHandler(
}
type (
// Watcher is the interface that any watcher that supports cross chain queries must implement.
Watcher interface {
QueryHandler(ctx context.Context, queryRequest *PerChainQueryInternal)
}
// QueryHandler defines the cross chain query handler.
QueryHandler struct {
logger *zap.Logger
@ -87,8 +92,56 @@ type (
channel chan *PerChainQueryInternal
lastUpdateTime time.Time
}
PerChainConfig struct {
TimestampCacheSupported bool
NumWorkers int
}
)
// perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries.
// Every chain listed here must have at least one worker specified.
var perChainConfig = map[vaa.ChainID]PerChainConfig{
vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false},
vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true},
vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true},
vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true},
vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true},
vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true},
vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
}
// GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct,
// which is not an error. It just means that queries are not supported for that chain.
func GetPerChainConfig(chainID vaa.ChainID) PerChainConfig {
if pcc, exists := perChainConfig[chainID]; exists {
return pcc
}
return PerChainConfig{}
}
// QueriesSupported can be used by the watcher to determine if queries are supported for the chain.
func (config PerChainConfig) QueriesSupported() bool {
return config.NumWorkers > 0
}
// Start initializes the query handler and starts the runnable.
func (qh *QueryHandler) Start(ctx context.Context) error {
qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr))
@ -130,40 +183,15 @@ func handleQueryRequestsImpl(
pendingQueries := make(map[string]*pendingQuery) // Key is requestID.
// CCQ is currently only supported on EVM and Solana.
supportedChains := map[vaa.ChainID]struct{}{
vaa.ChainIDSolana: {},
vaa.ChainIDEthereum: {},
vaa.ChainIDBSC: {},
vaa.ChainIDPolygon: {},
vaa.ChainIDAvalanche: {},
vaa.ChainIDOasis: {},
vaa.ChainIDAurora: {},
vaa.ChainIDFantom: {},
vaa.ChainIDKarura: {},
vaa.ChainIDAcala: {},
vaa.ChainIDKlaytn: {},
vaa.ChainIDCelo: {},
vaa.ChainIDMoonbeam: {},
vaa.ChainIDArbitrum: {},
vaa.ChainIDOptimism: {},
vaa.ChainIDBase: {},
vaa.ChainIDScroll: {},
vaa.ChainIDMantle: {},
vaa.ChainIDSepolia: {},
vaa.ChainIDHolesky: {},
vaa.ChainIDArbitrumSepolia: {},
vaa.ChainIDBaseSepolia: {},
vaa.ChainIDOptimismSepolia: {},
vaa.ChainIDPolygonSepolia: {},
}
// But we don't want to allow CCQ if the chain is not enabled.
for chainID := range supportedChains {
if _, exists := chainQueryReqC[chainID]; !exists {
delete(supportedChains, chainID)
} else {
logger.Info("queries supported on chain", zap.Stringer("chainID", chainID))
// Create the set of chains for which CCQ is actually enabled. Those are the ones in the config for which we actually have a watcher enabled.
supportedChains := make(map[vaa.ChainID]struct{})
for chainID, config := range perChainConfig {
if _, exists := chainQueryReqC[chainID]; exists {
if config.NumWorkers <= 0 {
panic(fmt.Sprintf(`invalid per chain config entry for "%s", no workers specified`, chainID.String()))
}
logger.Info("queries supported on chain", zap.Stringer("chainID", chainID), zap.Int("numWorkers", config.NumWorkers))
supportedChains[chainID] = struct{}{}
// Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
@ -452,13 +480,30 @@ func (pq *pendingQuery) numPendingRequests() int {
return numPending
}
func SupportsTimestampCaching(chainID vaa.ChainID) bool {
/*
- P1: Ethereum, Base, Optimism
- P1.5: Arbitrum, Polygon, Avalanche
- P2: BNB Chain, Moonbeam
- P3: Acala, Celo, Fantom, Karura, Klaytn, Oasis
*/
return true
// StartWorkers is used by the watchers to start the query handler worker routines.
func StartWorkers(
ctx context.Context,
logger *zap.Logger,
errC chan error,
w Watcher,
queryReqC <-chan *PerChainQueryInternal,
config PerChainConfig,
tag string,
) {
for count := 0; count < config.NumWorkers; count++ {
workerId := count
common.RunWithScissors(ctx, errC, fmt.Sprintf("%s_fetch_query_req", tag), func(ctx context.Context) error {
logger.Debug("CONCURRENT: starting worker", zap.Int("worker", workerId))
for {
select {
case <-ctx.Done():
return nil
case queryRequest := <-queryReqC:
logger.Debug("CONCURRENT: processing query request", zap.Int("worker", workerId))
w.QueryHandler(ctx, queryRequest)
logger.Debug("CONCURRENT: finished processing query request", zap.Int("worker", workerId))
}
}
})
}
}

View File

@ -810,3 +810,11 @@ func TestPublishRetrySucceeds(t *testing.T) {
assert.Equal(t, 1, md.getRequestsPerChain(vaa.ChainIDPolygon))
assert.True(t, validateResponseForTest(t, queryResponsePublication, signedQueryRequest, queryRequest, expectedResults))
}
func TestPerChainConfigValid(t *testing.T) {
for chainID, config := range perChainConfig {
if config.NumWorkers <= 0 {
assert.Equal(t, "", fmt.Sprintf(`perChainConfig for "%s" has an invalid NumWorkers: %d`, chainID.String(), config.NumWorkers))
}
}
}

View File

@ -19,6 +19,15 @@ import (
"github.com/certusone/wormhole/node/pkg/query"
)
// ccqStart starts up CCQ query processing.
func (w *Watcher) ccqStart(ctx context.Context, errC chan error) {
if w.ccqTimestampCache != nil && w.ccqBackfillCache {
w.ccqBackfillStart(ctx, errC)
}
query.StartWorkers(ctx, w.ccqLogger, errC, w, w.queryReqC, w.ccqConfig, w.chainID.String())
}
// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status query.QueryStatus, response query.ChainSpecificResponse) {
queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, response)
@ -30,8 +39,8 @@ func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status
}
}
// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
// QueryHandler is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
func (w *Watcher) QueryHandler(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
// This can't happen unless there is a programming error - the caller
// is expected to send us only requests for our chainID.

View File

@ -129,6 +129,7 @@ type (
latestFinalizedBlockNumber uint64
l1Finalizer interfaces.L1Finalizer
ccqConfig query.PerChainConfig
ccqMaxBlockNumber *big.Int
ccqTimestampCache *BlocksByTimestamp
ccqBackfillChannel chan *ccqBackfillRequest
@ -179,6 +180,7 @@ func NewEthWatcher(
queryResponseC: queryResponseC,
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
ccqConfig: query.GetPerChainConfig(chainID),
ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64),
ccqBackfillCache: ccqBackfillCache,
ccqBackfillChannel: make(chan *ccqBackfillRequest, 50),
@ -188,6 +190,7 @@ func NewEthWatcher(
func (w *Watcher) Run(parentCtx context.Context) error {
var err error
logger := supervisor.Logger(parentCtx)
w.ccqLogger = logger.With(zap.String("component", "ccqevm"))
logger.Info("Starting watcher",
zap.String("watcher_name", "evm"),
@ -198,8 +201,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Bool("unsafeDevMode", w.unsafeDevMode),
)
w.ccqLogger = logger.With(zap.String("component", "ccqevm"))
// later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics.
// If any of them panic, this function will return, causing this child context to be canceled
// such that the other go-routines can free up resources
@ -258,7 +259,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
}
}
if query.SupportsTimestampCaching(w.chainID) {
if w.ccqConfig.TimestampCacheSupported {
w.ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS)
}
@ -421,21 +422,10 @@ func (w *Watcher) Run(parentCtx context.Context) error {
}
})
if w.ccqTimestampCache != nil && w.ccqBackfillCache {
w.ccqBackfillStart(ctx, errC)
if w.ccqConfig.QueriesSupported() {
w.ccqStart(ctx, errC)
}
common.RunWithScissors(ctx, errC, "evm_fetch_query_req", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case queryRequest := <-w.queryReqC:
w.ccqHandleQuery(ctx, queryRequest)
}
}
})
common.RunWithScissors(ctx, errC, "evm_fetch_messages", func(ctx context.Context) error {
for {
select {

View File

@ -29,6 +29,11 @@ const (
CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond
)
// ccqStart starts up CCQ query processing.
func (w *SolanaWatcher) ccqStart(ctx context.Context) {
query.StartWorkers(ctx, w.ccqLogger, w.errC, w, w.queryReqC, w.ccqConfig, w.chainID.String())
}
// ccqSendQueryResponse sends a response back to the query handler.
func (w *SolanaWatcher) ccqSendQueryResponse(queryResponse *query.PerChainQueryResponseInternal) {
select {
@ -45,8 +50,8 @@ func (w *SolanaWatcher) ccqSendErrorResponse(req *query.PerChainQueryInternal, s
w.ccqSendQueryResponse(queryResponse)
}
// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
// QueryHandler is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
func (w *SolanaWatcher) QueryHandler(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
// This can't happen unless there is a programming error - the caller
// is expected to send us only requests for our chainID.
if queryRequest.Request.ChainId != w.chainID {

View File

@ -67,6 +67,7 @@ type (
// Outbound query responses to query requests
queryResponseC chan<- *query.PerChainQueryResponseInternal
ccqConfig query.PerChainConfig
ccqLogger *zap.Logger
}
@ -228,6 +229,7 @@ func NewSolanaWatcher(
networkName: chainID.String(),
queryReqC: queryReqC,
queryResponseC: queryResponseC,
ccqConfig: query.GetPerChainConfig(chainID),
}
}
@ -306,6 +308,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
})
logger := supervisor.Logger(ctx)
s.ccqLogger = logger.With(zap.String("component", "ccqsol"))
wsUrl := ""
if s.wsUrl != nil {
@ -413,18 +416,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
}
})
if s.commitment == rpc.CommitmentType("finalized") {
s.ccqLogger = logger.With(zap.String("component", "ccqsol"))
common.RunWithScissors(ctx, s.errC, "solana_fetch_query_req", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case queryRequest := <-s.queryReqC:
s.ccqHandleQuery(ctx, queryRequest)
}
}
})
if s.commitment == rpc.CommitmentType("finalized") && s.ccqConfig.QueriesSupported() {
s.ccqStart(ctx)
}
select {

View File

@ -8,6 +8,7 @@ import {
} from "@jest/globals";
import Web3, { ETH_DATA_FORMAT } from "web3";
import axios from "axios";
import { AxiosResponse } from "axios";
import {
ChainQueryType,
EthCallData,
@ -829,4 +830,70 @@ describe("eth call", () => {
});
expect(err).toBe(true);
});
test("concurrent queries", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const decimalsCallData = createTestEthCallData(
WETH_ADDRESS,
"decimals",
"uint8"
);
const blockNumber = await web3.eth.getBlockNumber(ETH_DATA_FORMAT);
const ethCall = new EthCallQueryRequest(blockNumber, [
nameCallData,
decimalsCallData,
]);
const chainId = 2;
const ethQuery = new PerChainQueryRequest(chainId, ethCall);
let nonce = 1;
let promises: Promise<AxiosResponse<any, any>>[] = [];
for (let count = 0; count < 20; count++) {
nonce += 1;
const request = new QueryRequest(nonce, [ethQuery]);
const serialized = request.serialize();
const digest = QueryRequest.digest(ENV, serialized);
const signature = sign(PRIVATE_KEY, digest);
const response = axios.put(
QUERY_URL,
{
signature,
bytes: Buffer.from(serialized).toString("hex"),
},
{ headers: { "X-API-Key": "my_secret_key" } }
);
promises.push(response);
}
const responses = await Promise.all(promises);
expect(responses.length).toEqual(promises.length);
for (let idx = 0; idx < responses.length; idx++) {
const response = responses[idx];
expect(response.status).toBe(200);
const queryResponse = QueryResponse.from(response.data.bytes);
expect(queryResponse.version).toEqual(1);
expect(queryResponse.requestChainId).toEqual(0);
expect(queryResponse.request.version).toEqual(1);
expect(queryResponse.request.requests.length).toEqual(1);
expect(queryResponse.request.requests[0].chainId).toEqual(2);
expect(queryResponse.request.requests[0].query.type()).toEqual(
ChainQueryType.EthCall
);
const ecr = queryResponse.responses[0].response as EthCallQueryResponse;
expect(ecr.blockNumber).toEqual(BigInt(blockNumber));
expect(ecr.blockHash).toEqual(
(await web3.eth.getBlock(BigInt(blockNumber))).hash
);
expect(ecr.results.length).toEqual(2);
expect(ecr.results[0]).toEqual(
// Name
"0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000d5772617070656420457468657200000000000000000000000000000000000000"
);
expect(ecr.results[1]).toEqual(
// Decimals
"0x0000000000000000000000000000000000000000000000000000000000000012"
);
}
});
});

View File

@ -8,6 +8,7 @@ import {
} from "@jest/globals";
import Web3, { ETH_DATA_FORMAT } from "web3";
import axios from "axios";
import { AxiosResponse } from "axios";
import base58 from "bs58";
import {
ChainQueryType,
@ -438,4 +439,70 @@ describe("solana", () => {
"57cd18b7f8a4d91a2da9ab4af05d0fbe"
);
});
test("concurrent queries", async () => {
const solAccountReq = new SolanaAccountQueryRequest("finalized", ACCOUNTS);
const query = new PerChainQueryRequest(1, solAccountReq);
let nonce = 42;
let promises: Promise<AxiosResponse<any, any>>[] = [];
for (let count = 0; count < 20; count++) {
nonce += 1;
const request = new QueryRequest(nonce, [query]);
const serialized = request.serialize();
const digest = QueryRequest.digest(ENV, serialized);
const signature = sign(PRIVATE_KEY, digest);
const response = axios.put(
QUERY_URL,
{
signature,
bytes: Buffer.from(serialized).toString("hex"),
},
{ headers: { "X-API-Key": "my_secret_key" } }
);
promises.push(response);
}
const responses = await Promise.all(promises);
expect(responses.length).toEqual(promises.length);
for (let idx = 0; idx < responses.length; idx++) {
const response = responses[idx];
expect(response.status).toBe(200);
const queryResponse = QueryResponse.from(response.data.bytes);
expect(queryResponse.version).toEqual(1);
expect(queryResponse.requestChainId).toEqual(0);
expect(queryResponse.request.version).toEqual(1);
expect(queryResponse.request.requests.length).toEqual(1);
expect(queryResponse.request.requests[0].chainId).toEqual(1);
expect(queryResponse.request.requests[0].query.type()).toEqual(
ChainQueryType.SolanaAccount
);
const sar = queryResponse.responses[0]
.response as SolanaAccountQueryResponse;
expect(Number(sar.slotNumber)).not.toEqual(0);
expect(Number(sar.blockTime)).not.toEqual(0);
expect(sar.results.length).toEqual(2);
expect(Number(sar.results[0].lamports)).toEqual(1461600);
expect(Number(sar.results[0].rentEpoch)).toEqual(0);
expect(sar.results[0].executable).toEqual(false);
expect(base58.encode(Buffer.from(sar.results[0].owner))).toEqual(
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
);
expect(Buffer.from(sar.results[0].data).toString("hex")).toEqual(
"01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d0000e8890423c78a0901000000000000000000000000000000000000000000000000000000000000000000000000"
);
expect(Number(sar.results[1].lamports)).toEqual(1461600);
expect(Number(sar.results[1].rentEpoch)).toEqual(0);
expect(sar.results[1].executable).toEqual(false);
expect(base58.encode(Buffer.from(sar.results[1].owner))).toEqual(
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
);
expect(Buffer.from(sar.results[1].data).toString("hex")).toEqual(
"01000000574108aed69daf7e625a361864b1f74d13702f2ca56de9660e566d1d8691848d01000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000"
);
}
});
});