From 36a82af4f050cdfda695b50257de908fb20d5121 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Tue, 28 Nov 2023 08:51:30 -0600 Subject: [PATCH] CCQ/Node/EVM: Refactor (#3542) * CCQ/Node/EVM: Refactor * Code review rework --- node/pkg/p2p/ccq_p2p.go | 4 +- node/pkg/query/query.go | 2 +- node/pkg/query/request.go | 4 + node/pkg/query/response.go | 2 +- node/pkg/watchers/evm/ccq.go | 575 +++++++++++++++-------------------- 5 files changed, 259 insertions(+), 328 deletions(-) diff --git a/node/pkg/p2p/ccq_p2p.go b/node/pkg/p2p/ccq_p2p.go index 276c9bb9a..e098f5b35 100644 --- a/node/pkg/p2p/ccq_p2p.go +++ b/node/pkg/p2p/ccq_p2p.go @@ -251,14 +251,14 @@ func (ccq *ccqP2p) publisher(ctx context.Context, gk *ecdsa.PrivateKey, queryRes ccqP2pMessagesSent.Inc() if err != nil { ccq.logger.Error("failed to publish query response", - zap.String("requestID", msg.RequestID()), + zap.String("requestSignature", msg.Signature()), zap.Any("query_response", msg), zap.Any("signature", sig), zap.Error(err), ) } else { ccq.logger.Info("published signed query response", //TODO: Change to Debug - zap.String("requestID", msg.RequestID()), + zap.String("requestSignature", msg.Signature()), zap.Any("query_response", msg), zap.Any("signature", sig), ) diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index 4a87f4f67..ecdbab1fb 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -402,7 +402,7 @@ func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address] var nullAddr ethCommon.Address result := make(map[ethCommon.Address]struct{}) for _, str := range strings.Split(ccqAllowedRequesters, ",") { - addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(str)) + addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x"))) if addr == nullAddr { return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str) } diff --git a/node/pkg/query/request.go b/node/pkg/query/request.go index 488ab46a8..9d539583e 100644 --- a/node/pkg/query/request.go +++ b/node/pkg/query/request.go @@ -121,6 +121,10 @@ type PerChainQueryInternal struct { Request *PerChainQueryRequest } +func (pcqi *PerChainQueryInternal) ID() string { + return fmt.Sprintf("%s:%d", pcqi.RequestID, pcqi.RequestIdx) +} + // QueryRequestDigest returns the query signing prefix based on the environment. func QueryRequestDigest(env common.Environment, b []byte) ethCommon.Hash { var queryRequestPrefix []byte diff --git a/node/pkg/query/response.go b/node/pkg/query/response.go index 6d707d979..8e113ae50 100644 --- a/node/pkg/query/response.go +++ b/node/pkg/query/response.go @@ -266,7 +266,7 @@ func (left *QueryResponsePublication) Equal(right *QueryResponsePublication) boo return true } -func (resp *QueryResponsePublication) RequestID() string { +func (resp *QueryResponsePublication) Signature() string { if resp == nil || resp.Request == nil { return "nil" } diff --git a/node/pkg/watchers/evm/ccq.go b/node/pkg/watchers/evm/ccq.go index cfff66a3a..078682824 100644 --- a/node/pkg/watchers/evm/ccq.go +++ b/node/pkg/watchers/evm/ccq.go @@ -3,6 +3,7 @@ package evm import ( "context" "encoding/hex" + "encoding/json" "fmt" "strings" "time" @@ -18,17 +19,18 @@ import ( "github.com/certusone/wormhole/node/pkg/query" ) -// ccqSendQueryResponseForError sends an error response back to the query handler. -func (w *Watcher) ccqSendQueryResponseForError(req *query.PerChainQueryInternal, status query.QueryStatus) { - queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, nil) +// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil. +func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status query.QueryStatus, response query.ChainSpecificResponse) { + queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, response) select { case w.queryResponseC <- queryResponse: - w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm")) + w.ccqLogger.Debug("published query response error to handler") default: - w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm")) + w.ccqLogger.Error("failed to published query response error to handler") } } +// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler. func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) { // This can't happen unless there is a programming error - the caller @@ -50,40 +52,55 @@ func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerCha w.ccqLogger.Warn("received unsupported request type", zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) } query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds())) } -// EvmCallData contains the details of a single query in the batch. +// EvmCallData contains the details of a single call in the batch. type EvmCallData struct { - to eth_common.Address - data string - callTransactionArg map[string]interface{} - callResult *eth_hexutil.Bytes + To eth_common.Address + Data string + CallResult *eth_hexutil.Bytes + + // These are lowercase so they don't get marshaled for logging purposes. JSON doesn't print anything meaningful for them anyway. callErr error + callTransactionArg map[string]interface{} } +func (ecd EvmCallData) String() string { + bytes, err := json.Marshal(ecd) + if err != nil { + bytes = []byte("invalid json") + } + + return string(bytes) +} + +// ccqHandleEthCallQueryRequest is the query handler for an eth_call request. func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallQueryRequest) { + requestId := "eth_call:" + queryRequest.ID() block := req.BlockId w.ccqLogger.Info("received eth_call query request", + zap.String("requestId", requestId), zap.String("block", block), zap.Int("numRequests", len(req.CallData)), ) + // Create the block query args. blockMethod, callBlockArg, err := ccqCreateBlockRequest(block) if err != nil { w.ccqLogger.Error("invalid block id in eth_call query request", - zap.Error(err), + zap.String("requestId", requestId), zap.String("block", block), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } - // We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block. - // The second is the data associated with each request (but not the block request). The index into both is the index into the request call data. + // Create the batch of requested calls for the specified block. batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg) // Add the block query to the batch. @@ -100,148 +117,111 @@ func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest }) // Query the RPC. + start := time.Now() timeout, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err = w.ethConn.RawBatchCallContext(timeout, batch) if err != nil { w.ccqLogger.Error("failed to process eth_call query request", + zap.String("requestId", requestId), + zap.String("block", block), + zap.Any("batch", batch), zap.Error(err), - zap.String("block", block), - zap.Any("batch", batch), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - if blockError != nil { - w.ccqLogger.Error("failed to process eth_call query block request", - zap.Error(blockError), + // Verify that the block read was successful. + if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil { + w.ccqLogger.Error("failed to verify block for eth_call query", + zap.String("requestId", requestId), zap.String("block", block), zap.Any("batch", batch), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - if blockResult.Number == nil { - w.ccqLogger.Error("invalid eth_call query block result", - zap.String("eth_network", w.networkName), + w.ccqLogger.Info("query complete for eth_call", + zap.String("requestId", requestId), + zap.String("block", block), + zap.String("blockNumber", blockResult.Number.String()), + zap.String("blockHash", blockResult.Hash.Hex()), + zap.String("blockTime", blockResult.Time.String()), + zap.Int64("duration", time.Since(start).Milliseconds()), + ) + + // Verify all the call results and build the batch of results. + results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData) + if err != nil { + w.ccqLogger.Error("failed to process eth_call query call request", + zap.String("requestId", requestId), zap.String("block", block), zap.Any("batch", batch), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 { - w.ccqLogger.Error("block number too large for eth_call", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } + // Finally, build the response and publish it. resp := query.EthCallQueryResponse{ BlockNumber: blockResult.Number.ToInt().Uint64(), Hash: blockResult.Hash, Time: time.Unix(int64(blockResult.Time), 0), - Results: [][]byte{}, + Results: results, } - errFound := false - for idx := range req.CallData { - if evmCallData[idx].callErr != nil { - w.ccqLogger.Error("failed to process eth_call query call request", - zap.Error(evmCallData[idx].callErr), - zap.String("block", block), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - // Nil or Empty results are not valid - // eth_call will return empty when the state doesn't exist for a block - if len(*evmCallData[idx].callResult) == 0 { - w.ccqLogger.Error("invalid call result for eth_call", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - w.ccqLogger.Info("query result for eth_call", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("blockNumber", blockResult.Number.String()), - zap.String("blockHash", blockResult.Hash.Hex()), - zap.String("blockTime", blockResult.Time.String()), - zap.Int("idx", idx), - zap.String("to", evmCallData[idx].to.Hex()), - zap.Any("data", evmCallData[idx].data), - zap.String("result", evmCallData[idx].callResult.String()), - ) - - resp.Results = append(resp.Results, *evmCallData[idx].callResult) - } - - if !errFound { - queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp) - select { - case w.queryResponseC <- queryResponse: - w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm")) - default: - w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm")) - } - } + w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp) } +// ccqHandleEthCallByTimestampQueryRequest is the query handler for an eth_call_by_timestamp request. func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallByTimestampQueryRequest) { + requestId := "eth_call_by_timestamp:" + queryRequest.ID() block := req.TargetBlockIdHint nextBlock := req.FollowingBlockIdHint w.ccqLogger.Info("received eth_call_by_timestamp query request", + zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), zap.String("nextBlock", nextBlock), zap.Int("numRequests", len(req.CallData)), ) + // Verify that the two block hints are consistent, either both set, or both unset. if (block == "") != (nextBlock == "") { - w.ccqLogger.Error("invalid block id hints in eth_call_by_timestamp query request, if one is unset they both must be unset", + w.ccqLogger.Error("invalid block id hints in eth_call_by_timestamp query request, both should be either set or unset", + zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), zap.String("nextBlock", nextBlock), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } + // Look up the blocks based on the timestamp if necessary. if block == "" { if w.ccqTimestampCache == nil { w.ccqLogger.Error("error in block id hints in eth_call_by_timestamp query request, they are unset and chain does not support timestamp caching") - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } - // Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses milliseconds, so we have to convert. + // Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses microseconds, so we have to convert. blockNum, nextBlockNum, found := w.ccqTimestampCache.LookUp(req.TargetTimestamp / 1000000) if !found { w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp not in cache, will retry", + zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), zap.String("nextBlock", nextBlock), zap.Uint64("blockNum", blockNum), zap.Uint64("nextBlockNum", nextBlockNum), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } @@ -249,6 +229,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q nextBlock = fmt.Sprintf("0x%x", nextBlockNum) w.ccqLogger.Info("cache look up in eth_call_by_timestamp query request mapped timestamp to blocks", + zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), zap.String("nextBlock", nextBlock), @@ -257,30 +238,32 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q ) } + // Create the query args for both blocks. blockMethod, callBlockArg, err := ccqCreateBlockRequest(block) if err != nil { w.ccqLogger.Error("invalid target block id hint in eth_call_by_timestamp query request", - zap.Error(err), + zap.String("requestId", requestId), zap.String("block", block), zap.String("nextBlock", nextBlock), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } nextBlockMethod, _, err := ccqCreateBlockRequest(nextBlock) if err != nil { w.ccqLogger.Error("invalid following block id hint in eth_call_by_timestamp query request", - zap.Error(err), + zap.String("requestId", requestId), zap.String("block", block), zap.String("nextBlock", nextBlock), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } - // We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block and one for the next block. - // The second is the data associated with each request (but not the block requests). The index into both is the index into the request call data. + // Create the batch of requested calls for the specified block. batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg) // Add the block query to the batch. @@ -310,86 +293,45 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q }) // Query the RPC. + start := time.Now() timeout, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err = w.ethConn.RawBatchCallContext(timeout, batch) - if err != nil { w.ccqLogger.Error("failed to process eth_call_by_timestamp query request", + zap.String("requestId", requestId), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Any("batch", batch), zap.Error(err), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Any("batch", batch), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - // Checks on the target block. - if blockError != nil { + // Verify the target block read was successful. + if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil { w.ccqLogger.Error("failed to process eth_call_by_timestamp query target block request", - zap.Error(blockError), + zap.String("requestId", requestId), zap.String("block", block), zap.String("nextBlock", nextBlock), zap.Any("batch", batch), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - if blockResult.Number == nil { - w.ccqLogger.Error("invalid eth_call_by_timestamp query target block result", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 { - w.ccqLogger.Error("target block number too large for eth_call_by_timestamp", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - // Checks on the following block. - if nextBlockError != nil { + // Verify the following block read was successful. + if err := w.ccqVerifyBlockResult(nextBlockError, nextBlockResult); err != nil { w.ccqLogger.Error("failed to process eth_call_by_timestamp query following block request", - zap.Error(nextBlockError), + zap.String("requestId", requestId), zap.String("block", block), zap.String("nextBlock", nextBlock), zap.Any("batch", batch), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - if nextBlockResult.Number == nil { - w.ccqLogger.Error("invalid eth_call_by_timestamp query following block result", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - if nextBlockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 { - w.ccqLogger.Error("following block number too large for eth_call_by_timestamp", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } @@ -407,8 +349,8 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q followingTimestamp := uint64(nextBlockResult.Time * 1000000) if targetBlockNum+1 != followingBlockNum { - w.ccqLogger.Error(" eth_call_by_timestamp query blocks are not adjacent", - zap.String("eth_network", w.networkName), + w.ccqLogger.Error("eth_call_by_timestamp query blocks are not adjacent", + zap.String("requestId", requestId), zap.Uint64("desiredTimestamp", req.TargetTimestamp), zap.Uint64("targetTimestamp", targetTimestamp), zap.Uint64("followingTimestamp", followingTimestamp), @@ -419,13 +361,13 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q zap.String("targetBlockTime", blockResult.Time.String()), zap.String("followingBlockTime", nextBlockResult.Time.String()), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } if req.TargetTimestamp < targetTimestamp || req.TargetTimestamp >= followingTimestamp { - w.ccqLogger.Error(" eth_call_by_timestamp desired timestamp falls outside of block range", - zap.String("eth_network", w.networkName), + w.ccqLogger.Error("eth_call_by_timestamp desired timestamp falls outside of block range", + zap.String("requestId", requestId), zap.Uint64("desiredTimestamp", req.TargetTimestamp), zap.Uint64("targetTimestamp", targetTimestamp), zap.Uint64("followingTimestamp", followingTimestamp), @@ -436,10 +378,39 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q zap.String("targetBlockTime", blockResult.Time.String()), zap.String("followingBlockTime", nextBlockResult.Time.String()), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } + w.ccqLogger.Info("query complete for eth_call_by_timestamp", + zap.String("requestId", requestId), + zap.Uint64("desiredTimestamp", req.TargetTimestamp), + zap.Uint64("targetTimestamp", targetTimestamp), + zap.Uint64("followingTimestamp", followingTimestamp), + zap.String("targetBlockNumber", blockResult.Number.String()), + zap.String("followingBlockNumber", nextBlockResult.Number.String()), + zap.String("targetBlockHash", blockResult.Hash.Hex()), + zap.String("followingBlockHash", nextBlockResult.Hash.Hex()), + zap.String("targetBlockTime", blockResult.Time.String()), + zap.String("followingBlockTime", nextBlockResult.Time.String()), + zap.Int64("duration", time.Since(start).Milliseconds()), + ) + + // Verify all the call results and build the batch of results. + results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData) + if err != nil { + w.ccqLogger.Error("failed to process eth_call_by_timestamp query call request", + zap.String("requestId", requestId), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Any("batch", batch), + zap.Error(err), + ) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) + return + } + + // Finally, build the response and publish it. resp := query.EthCallByTimestampQueryResponse{ TargetBlockNumber: targetBlockNum, TargetBlockHash: blockResult.Hash, @@ -447,97 +418,48 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q FollowingBlockNumber: followingBlockNum, FollowingBlockHash: nextBlockResult.Hash, FollowingBlockTime: time.Unix(int64(nextBlockResult.Time), 0), - Results: [][]byte{}, + Results: results, } - errFound := false - for idx := range req.CallData { - if evmCallData[idx].callErr != nil { - w.ccqLogger.Error("failed to process eth_call_by_timestamp query call request", - zap.Error(evmCallData[idx].callErr), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - // Nil or Empty results are not valid - // eth_call will return empty when the state doesn't exist for a block - if len(*evmCallData[idx].callResult) == 0 { - w.ccqLogger.Error("invalid call result for eth_call_by_timestamp", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - w.ccqLogger.Info(" eth_call_by_timestamp query result", - zap.String("eth_network", w.networkName), - zap.Uint64("desiredTimestamp", req.TargetTimestamp), - zap.Uint64("targetTimestamp", targetTimestamp), - zap.Uint64("followingTimestamp", followingTimestamp), - zap.String("targetBlockNumber", blockResult.Number.String()), - zap.String("followingBlockNumber", nextBlockResult.Number.String()), - zap.String("targetBlockHash", blockResult.Hash.Hex()), - zap.String("followingBlockHash", nextBlockResult.Hash.Hex()), - zap.String("targetBlockTime", blockResult.Time.String()), - zap.String("followingBlockTime", nextBlockResult.Time.String()), - zap.Int("idx", idx), - zap.String("to", evmCallData[idx].to.Hex()), - zap.Any("data", evmCallData[idx].data), - zap.String("result", evmCallData[idx].callResult.String()), - ) - - resp.Results = append(resp.Results, *evmCallData[idx].callResult) - } - - if !errFound { - queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp) - select { - case w.queryResponseC <- queryResponse: - w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm")) - default: - w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm")) - } - } + w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp) } +// ccqHandleEthCallWithFinalityQueryRequest is the query handler for an eth_call_with_finality request. func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallWithFinalityQueryRequest) { + requestId := "eth_call:" + queryRequest.ID() block := req.BlockId w.ccqLogger.Info("received eth_call_with_finality query request", + zap.String("requestId", requestId), zap.String("block", block), zap.String("finality", req.Finality), zap.Int("numRequests", len(req.CallData)), ) + // Validate the requested finality. safeMode := req.Finality == "safe" if req.Finality != "finalized" && !safeMode { - w.ccqLogger.Error("invalid finality in eth_call_with_finality query request", zap.String("block", block), zap.String("finality", req.Finality), zap.String("block", block)) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqLogger.Error("invalid finality in eth_call_with_finality query request", + zap.String("requestId", requestId), + zap.String("block", block), + zap.String("finality", req.Finality), + ) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } + // Create the block query args. blockMethod, callBlockArg, err := ccqCreateBlockRequest(block) if err != nil { w.ccqLogger.Error("invalid block id in eth_call_with_finality query request", - zap.Error(err), + zap.String("requestId", requestId), zap.String("block", block), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError) + w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil) return } - // We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block. - // The second is the data associated with each request (but not the block request). The index into both is the index into the request call data. + // Create the batch of requested calls for the specified block. batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg) // Add the block query to the batch. @@ -554,51 +476,34 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context, }) // Query the RPC. + start := time.Now() timeout, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err = w.ethConn.RawBatchCallContext(timeout, batch) - if err != nil { w.ccqLogger.Error("failed to process eth_call_with_finality query request", + zap.String("requestId", requestId), + zap.String("block", block), + zap.Any("batch", batch), zap.Error(err), - zap.String("block", block), - zap.Any("batch", batch), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - if blockError != nil { + // Verify that the block read was successful. + if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil { w.ccqLogger.Error("failed to process eth_call_with_finality query block request", - zap.Error(blockError), + zap.String("requestId", requestId), zap.String("block", block), zap.Any("batch", batch), + zap.Error(err), ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) return } - if blockResult.Number == nil { - w.ccqLogger.Error("invalid eth_call_with_finality query block result", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 { - w.ccqLogger.Error("block number too large for eth_call_with_finality", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - blockNumber := blockResult.Number.ToInt().Uint64() + // Get the latest block number based on the requested finality. var latestBlockNum uint64 if safeMode { latestBlockNum = w.getLatestSafeBlockNumber() @@ -606,80 +511,56 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context, latestBlockNum = w.GetLatestFinalizedBlockNumber() } + // Make sure the block has reached requested finality. + blockNumber := blockResult.Number.ToInt().Uint64() if blockNumber > latestBlockNum { w.ccqLogger.Info("requested block for eth_call_with_finality has not yet reached the requested finality", - zap.String("finality", req.Finality), - zap.Uint64("requestedBlockNumber", blockNumber), - zap.Uint64("latestBlockNumber", latestBlockNum), - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - return - } - - resp := query.EthCallWithFinalityQueryResponse{ - BlockNumber: blockNumber, - Hash: blockResult.Hash, - Time: time.Unix(int64(blockResult.Time), 0), - Results: [][]byte{}, - } - - errFound := false - for idx := range req.CallData { - if evmCallData[idx].callErr != nil { - w.ccqLogger.Error("failed to process eth_call_with_finality query call request", - zap.Error(evmCallData[idx].callErr), - zap.String("block", block), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - // Nil or Empty results are not valid - // eth_call_with_finality will return empty when the state doesn't exist for a block - if len(*evmCallData[idx].callResult) == 0 { - w.ccqLogger.Error("invalid call result for eth_call_with_finality", - zap.String("eth_network", w.networkName), - zap.String("block", block), - zap.Int("errorIdx", idx), - zap.Any("batch", batch), - ) - w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded) - errFound = true - break - } - - w.ccqLogger.Info("query result for eth_call_with_finality", - zap.String("eth_network", w.networkName), - zap.String("block", block), + zap.String("requestId", requestId), zap.String("finality", req.Finality), zap.Uint64("requestedBlockNumber", blockNumber), zap.Uint64("latestBlockNumber", latestBlockNum), zap.String("blockHash", blockResult.Hash.Hex()), zap.String("blockTime", blockResult.Time.String()), - zap.Int("idx", idx), - zap.String("to", evmCallData[idx].to.Hex()), - zap.Any("data", evmCallData[idx].data), - zap.String("result", evmCallData[idx].callResult.String()), ) - - resp.Results = append(resp.Results, *evmCallData[idx].callResult) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) + return } - if !errFound { - queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp) - select { - case w.queryResponseC <- queryResponse: - w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm")) - default: - w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm")) - } + w.ccqLogger.Info("query complete for eth_call_with_finality", + zap.String("requestId", requestId), + zap.String("finality", req.Finality), + zap.Uint64("requestedBlockNumber", blockNumber), + zap.Uint64("latestBlockNumber", latestBlockNum), + zap.String("blockHash", blockResult.Hash.Hex()), + zap.String("blockTime", blockResult.Time.String()), + zap.Int64("duration", time.Since(start).Milliseconds()), + ) + + // Verify all the call results and build the batch of results. + results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData) + if err != nil { + w.ccqLogger.Error("failed to process eth_call_with_finality query call request", + zap.String("requestId", requestId), + zap.String("finality", req.Finality), + zap.Uint64("requestedBlockNumber", blockNumber), + zap.Uint64("latestBlockNumber", latestBlockNum), + zap.String("blockHash", blockResult.Hash.Hex()), + zap.String("blockTime", blockResult.Time.String()), + zap.Error(err), + ) + w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) + return } + + // Finally, build the response and publish it. + resp := query.EthCallWithFinalityQueryResponse{ + BlockNumber: blockNumber, + Hash: blockResult.Hash, + Time: time.Unix(int64(blockResult.Time), 0), + Results: results, + } + + w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp) } // ccqCreateBlockRequest creates a block query. It parses the block string, allowing for both a block number or a block hash. Note that for now, strings like "latest", "finalized" or "safe" @@ -732,6 +613,8 @@ type EthCallDataIntf interface { CallDataList() []*query.EthCallData } +// ccqBuildBatchFromCallData builds two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block. +// The second is the data associated with each request (but not the block request). The index into both is the index into the request call data. func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([]rpc.BatchElem, []EvmCallData) { batch := []rpc.BatchElem{} evmCallData := []EvmCallData{} @@ -741,13 +624,13 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([ to := eth_common.BytesToAddress(callData.To) data := eth_hexutil.Encode(callData.Data) ecd := EvmCallData{ - to: to, - data: data, + To: to, + Data: data, callTransactionArg: map[string]interface{}{ "to": to, "data": data, }, - callResult: ð_hexutil.Bytes{}, + CallResult: ð_hexutil.Bytes{}, } evmCallData = append(evmCallData, ecd) @@ -757,7 +640,7 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([ ecd.callTransactionArg, callBlockArg, }, - Result: ecd.callResult, + Result: ecd.CallResult, Error: ecd.callErr, }) } @@ -765,6 +648,50 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([ return batch, evmCallData } +// ccqVerifyBlockResult does basic verification on the results of the block query. +func (w *Watcher) ccqVerifyBlockResult(blockError error, blockResult connectors.BlockMarshaller) error { //nolint:unparam + if blockError != nil { + return blockError + } + + if blockResult.Number == nil { + return fmt.Errorf("block result is nil") + } + + if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 { + return fmt.Errorf("block number is too large") + } + + return nil +} + +// ccqVerifyAndExtractQueryResults verifies the array of call results and returns a vector of those results to be published. +func (w *Watcher) ccqVerifyAndExtractQueryResults(requestId string, evmCallData []EvmCallData) ([][]byte, error) { + var err error + results := [][]byte{} + for idx, evmCD := range evmCallData { + if evmCD.callErr != nil { + return nil, fmt.Errorf("call %d failed: %w", idx, evmCD.callErr) + } + + // Nil or Empty results are not valid eth_call will return empty when the state doesn't exist for a block + if len(*evmCD.CallResult) == 0 { + return nil, fmt.Errorf("call %d failed: result is empty", idx) + } + + w.ccqLogger.Info("query call data result", + zap.String("requestId", requestId), + zap.Int("idx", idx), + zap.Stringer("callData", evmCD), + ) + + results = append(results, *evmCD.CallResult) + } + + return results, err +} + +// ccqAddLatestBlock adds the latest block to the timestamp cache. The cache handles rollbacks. func (w *Watcher) ccqAddLatestBlock(ev *connectors.NewBlock) { if w.ccqTimestampCache != nil { w.ccqTimestampCache.AddLatest(w.ccqLogger, ev.Time, ev.Number.Uint64())