CCQ/Node/EVM: Refactor (#3542)

* CCQ/Node/EVM: Refactor

* Code review rework
This commit is contained in:
bruce-riley 2023-11-28 08:51:30 -06:00 committed by GitHub
parent 21a1129049
commit 36a82af4f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 259 additions and 328 deletions

View File

@ -251,14 +251,14 @@ func (ccq *ccqP2p) publisher(ctx context.Context, gk *ecdsa.PrivateKey, queryRes
ccqP2pMessagesSent.Inc()
if err != nil {
ccq.logger.Error("failed to publish query response",
zap.String("requestID", msg.RequestID()),
zap.String("requestSignature", msg.Signature()),
zap.Any("query_response", msg),
zap.Any("signature", sig),
zap.Error(err),
)
} else {
ccq.logger.Info("published signed query response", //TODO: Change to Debug
zap.String("requestID", msg.RequestID()),
zap.String("requestSignature", msg.Signature()),
zap.Any("query_response", msg),
zap.Any("signature", sig),
)

View File

@ -402,7 +402,7 @@ func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]
var nullAddr ethCommon.Address
result := make(map[ethCommon.Address]struct{})
for _, str := range strings.Split(ccqAllowedRequesters, ",") {
addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(str))
addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
if addr == nullAddr {
return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
}

View File

@ -121,6 +121,10 @@ type PerChainQueryInternal struct {
Request *PerChainQueryRequest
}
func (pcqi *PerChainQueryInternal) ID() string {
return fmt.Sprintf("%s:%d", pcqi.RequestID, pcqi.RequestIdx)
}
// QueryRequestDigest returns the query signing prefix based on the environment.
func QueryRequestDigest(env common.Environment, b []byte) ethCommon.Hash {
var queryRequestPrefix []byte

View File

@ -266,7 +266,7 @@ func (left *QueryResponsePublication) Equal(right *QueryResponsePublication) boo
return true
}
func (resp *QueryResponsePublication) RequestID() string {
func (resp *QueryResponsePublication) Signature() string {
if resp == nil || resp.Request == nil {
return "nil"
}

View File

@ -3,6 +3,7 @@ package evm
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"time"
@ -18,17 +19,18 @@ import (
"github.com/certusone/wormhole/node/pkg/query"
)
// ccqSendQueryResponseForError sends an error response back to the query handler.
func (w *Watcher) ccqSendQueryResponseForError(req *query.PerChainQueryInternal, status query.QueryStatus) {
queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, nil)
// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status query.QueryStatus, response query.ChainSpecificResponse) {
queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, response)
select {
case w.queryResponseC <- queryResponse:
w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm"))
w.ccqLogger.Debug("published query response error to handler")
default:
w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm"))
w.ccqLogger.Error("failed to published query response error to handler")
}
}
// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
// This can't happen unless there is a programming error - the caller
@ -50,40 +52,55 @@ func (w *Watcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerCha
w.ccqLogger.Warn("received unsupported request type",
zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
}
query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds()))
}
// EvmCallData contains the details of a single query in the batch.
// EvmCallData contains the details of a single call in the batch.
type EvmCallData struct {
to eth_common.Address
data string
callTransactionArg map[string]interface{}
callResult *eth_hexutil.Bytes
To eth_common.Address
Data string
CallResult *eth_hexutil.Bytes
// These are lowercase so they don't get marshaled for logging purposes. JSON doesn't print anything meaningful for them anyway.
callErr error
callTransactionArg map[string]interface{}
}
func (ecd EvmCallData) String() string {
bytes, err := json.Marshal(ecd)
if err != nil {
bytes = []byte("invalid json")
}
return string(bytes)
}
// ccqHandleEthCallQueryRequest is the query handler for an eth_call request.
func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallQueryRequest) {
requestId := "eth_call:" + queryRequest.ID()
block := req.BlockId
w.ccqLogger.Info("received eth_call query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Int("numRequests", len(req.CallData)),
)
// Create the block query args.
blockMethod, callBlockArg, err := ccqCreateBlockRequest(block)
if err != nil {
w.ccqLogger.Error("invalid block id in eth_call query request",
zap.Error(err),
zap.String("requestId", requestId),
zap.String("block", block),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block.
// The second is the data associated with each request (but not the block request). The index into both is the index into the request call data.
// Create the batch of requested calls for the specified block.
batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg)
// Add the block query to the batch.
@ -100,148 +117,111 @@ func (w *Watcher) ccqHandleEthCallQueryRequest(ctx context.Context, queryRequest
})
// Query the RPC.
start := time.Now()
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = w.ethConn.RawBatchCallContext(timeout, batch)
if err != nil {
w.ccqLogger.Error("failed to process eth_call query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
zap.Error(err),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if blockError != nil {
w.ccqLogger.Error("failed to process eth_call query block request",
zap.Error(blockError),
// Verify that the block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to verify block for eth_call query",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if blockResult.Number == nil {
w.ccqLogger.Error("invalid eth_call query block result",
zap.String("eth_network", w.networkName),
w.ccqLogger.Info("query complete for eth_call",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("blockNumber", blockResult.Number.String()),
zap.String("blockHash", blockResult.Hash.Hex()),
zap.String("blockTime", blockResult.Time.String()),
zap.Int64("duration", time.Since(start).Milliseconds()),
)
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call query call request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 {
w.ccqLogger.Error("block number too large for eth_call",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
// Finally, build the response and publish it.
resp := query.EthCallQueryResponse{
BlockNumber: blockResult.Number.ToInt().Uint64(),
Hash: blockResult.Hash,
Time: time.Unix(int64(blockResult.Time), 0),
Results: [][]byte{},
Results: results,
}
errFound := false
for idx := range req.CallData {
if evmCallData[idx].callErr != nil {
w.ccqLogger.Error("failed to process eth_call query call request",
zap.Error(evmCallData[idx].callErr),
zap.String("block", block),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
// Nil or Empty results are not valid
// eth_call will return empty when the state doesn't exist for a block
if len(*evmCallData[idx].callResult) == 0 {
w.ccqLogger.Error("invalid call result for eth_call",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
w.ccqLogger.Info("query result for eth_call",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("blockNumber", blockResult.Number.String()),
zap.String("blockHash", blockResult.Hash.Hex()),
zap.String("blockTime", blockResult.Time.String()),
zap.Int("idx", idx),
zap.String("to", evmCallData[idx].to.Hex()),
zap.Any("data", evmCallData[idx].data),
zap.String("result", evmCallData[idx].callResult.String()),
)
resp.Results = append(resp.Results, *evmCallData[idx].callResult)
}
if !errFound {
queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp)
select {
case w.queryResponseC <- queryResponse:
w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm"))
default:
w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm"))
}
}
w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp)
}
// ccqHandleEthCallByTimestampQueryRequest is the query handler for an eth_call_by_timestamp request.
func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallByTimestampQueryRequest) {
requestId := "eth_call_by_timestamp:" + queryRequest.ID()
block := req.TargetBlockIdHint
nextBlock := req.FollowingBlockIdHint
w.ccqLogger.Info("received eth_call_by_timestamp query request",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Int("numRequests", len(req.CallData)),
)
// Verify that the two block hints are consistent, either both set, or both unset.
if (block == "") != (nextBlock == "") {
w.ccqLogger.Error("invalid block id hints in eth_call_by_timestamp query request, if one is unset they both must be unset",
w.ccqLogger.Error("invalid block id hints in eth_call_by_timestamp query request, both should be either set or unset",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// Look up the blocks based on the timestamp if necessary.
if block == "" {
if w.ccqTimestampCache == nil {
w.ccqLogger.Error("error in block id hints in eth_call_by_timestamp query request, they are unset and chain does not support timestamp caching")
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses milliseconds, so we have to convert.
// Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses microseconds, so we have to convert.
blockNum, nextBlockNum, found := w.ccqTimestampCache.LookUp(req.TargetTimestamp / 1000000)
if !found {
w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp not in cache, will retry",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Uint64("blockNum", blockNum),
zap.Uint64("nextBlockNum", nextBlockNum),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
@ -249,6 +229,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
nextBlock = fmt.Sprintf("0x%x", nextBlockNum)
w.ccqLogger.Info("cache look up in eth_call_by_timestamp query request mapped timestamp to blocks",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
@ -257,30 +238,32 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
)
}
// Create the query args for both blocks.
blockMethod, callBlockArg, err := ccqCreateBlockRequest(block)
if err != nil {
w.ccqLogger.Error("invalid target block id hint in eth_call_by_timestamp query request",
zap.Error(err),
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
nextBlockMethod, _, err := ccqCreateBlockRequest(nextBlock)
if err != nil {
w.ccqLogger.Error("invalid following block id hint in eth_call_by_timestamp query request",
zap.Error(err),
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block and one for the next block.
// The second is the data associated with each request (but not the block requests). The index into both is the index into the request call data.
// Create the batch of requested calls for the specified block.
batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg)
// Add the block query to the batch.
@ -310,86 +293,45 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
})
// Query the RPC.
start := time.Now()
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = w.ethConn.RawBatchCallContext(timeout, batch)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
zap.Error(err),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
// Checks on the target block.
if blockError != nil {
// Verify the target block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query target block request",
zap.Error(blockError),
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if blockResult.Number == nil {
w.ccqLogger.Error("invalid eth_call_by_timestamp query target block result",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 {
w.ccqLogger.Error("target block number too large for eth_call_by_timestamp",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
// Checks on the following block.
if nextBlockError != nil {
// Verify the following block read was successful.
if err := w.ccqVerifyBlockResult(nextBlockError, nextBlockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query following block request",
zap.Error(nextBlockError),
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
if nextBlockResult.Number == nil {
w.ccqLogger.Error("invalid eth_call_by_timestamp query following block result",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
if nextBlockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 {
w.ccqLogger.Error("following block number too large for eth_call_by_timestamp",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
@ -407,8 +349,8 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
followingTimestamp := uint64(nextBlockResult.Time * 1000000)
if targetBlockNum+1 != followingBlockNum {
w.ccqLogger.Error(" eth_call_by_timestamp query blocks are not adjacent",
zap.String("eth_network", w.networkName),
w.ccqLogger.Error("eth_call_by_timestamp query blocks are not adjacent",
zap.String("requestId", requestId),
zap.Uint64("desiredTimestamp", req.TargetTimestamp),
zap.Uint64("targetTimestamp", targetTimestamp),
zap.Uint64("followingTimestamp", followingTimestamp),
@ -419,13 +361,13 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
zap.String("targetBlockTime", blockResult.Time.String()),
zap.String("followingBlockTime", nextBlockResult.Time.String()),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
if req.TargetTimestamp < targetTimestamp || req.TargetTimestamp >= followingTimestamp {
w.ccqLogger.Error(" eth_call_by_timestamp desired timestamp falls outside of block range",
zap.String("eth_network", w.networkName),
w.ccqLogger.Error("eth_call_by_timestamp desired timestamp falls outside of block range",
zap.String("requestId", requestId),
zap.Uint64("desiredTimestamp", req.TargetTimestamp),
zap.Uint64("targetTimestamp", targetTimestamp),
zap.Uint64("followingTimestamp", followingTimestamp),
@ -436,10 +378,39 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
zap.String("targetBlockTime", blockResult.Time.String()),
zap.String("followingBlockTime", nextBlockResult.Time.String()),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
w.ccqLogger.Info("query complete for eth_call_by_timestamp",
zap.String("requestId", requestId),
zap.Uint64("desiredTimestamp", req.TargetTimestamp),
zap.Uint64("targetTimestamp", targetTimestamp),
zap.Uint64("followingTimestamp", followingTimestamp),
zap.String("targetBlockNumber", blockResult.Number.String()),
zap.String("followingBlockNumber", nextBlockResult.Number.String()),
zap.String("targetBlockHash", blockResult.Hash.Hex()),
zap.String("followingBlockHash", nextBlockResult.Hash.Hex()),
zap.String("targetBlockTime", blockResult.Time.String()),
zap.String("followingBlockTime", nextBlockResult.Time.String()),
zap.Int64("duration", time.Since(start).Milliseconds()),
)
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query call request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
// Finally, build the response and publish it.
resp := query.EthCallByTimestampQueryResponse{
TargetBlockNumber: targetBlockNum,
TargetBlockHash: blockResult.Hash,
@ -447,97 +418,48 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
FollowingBlockNumber: followingBlockNum,
FollowingBlockHash: nextBlockResult.Hash,
FollowingBlockTime: time.Unix(int64(nextBlockResult.Time), 0),
Results: [][]byte{},
Results: results,
}
errFound := false
for idx := range req.CallData {
if evmCallData[idx].callErr != nil {
w.ccqLogger.Error("failed to process eth_call_by_timestamp query call request",
zap.Error(evmCallData[idx].callErr),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
// Nil or Empty results are not valid
// eth_call will return empty when the state doesn't exist for a block
if len(*evmCallData[idx].callResult) == 0 {
w.ccqLogger.Error("invalid call result for eth_call_by_timestamp",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
w.ccqLogger.Info(" eth_call_by_timestamp query result",
zap.String("eth_network", w.networkName),
zap.Uint64("desiredTimestamp", req.TargetTimestamp),
zap.Uint64("targetTimestamp", targetTimestamp),
zap.Uint64("followingTimestamp", followingTimestamp),
zap.String("targetBlockNumber", blockResult.Number.String()),
zap.String("followingBlockNumber", nextBlockResult.Number.String()),
zap.String("targetBlockHash", blockResult.Hash.Hex()),
zap.String("followingBlockHash", nextBlockResult.Hash.Hex()),
zap.String("targetBlockTime", blockResult.Time.String()),
zap.String("followingBlockTime", nextBlockResult.Time.String()),
zap.Int("idx", idx),
zap.String("to", evmCallData[idx].to.Hex()),
zap.Any("data", evmCallData[idx].data),
zap.String("result", evmCallData[idx].callResult.String()),
)
resp.Results = append(resp.Results, *evmCallData[idx].callResult)
}
if !errFound {
queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp)
select {
case w.queryResponseC <- queryResponse:
w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm"))
default:
w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm"))
}
}
w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp)
}
// ccqHandleEthCallWithFinalityQueryRequest is the query handler for an eth_call_with_finality request.
func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.EthCallWithFinalityQueryRequest) {
requestId := "eth_call:" + queryRequest.ID()
block := req.BlockId
w.ccqLogger.Info("received eth_call_with_finality query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("finality", req.Finality),
zap.Int("numRequests", len(req.CallData)),
)
// Validate the requested finality.
safeMode := req.Finality == "safe"
if req.Finality != "finalized" && !safeMode {
w.ccqLogger.Error("invalid finality in eth_call_with_finality query request", zap.String("block", block), zap.String("finality", req.Finality), zap.String("block", block))
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqLogger.Error("invalid finality in eth_call_with_finality query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.String("finality", req.Finality),
)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// Create the block query args.
blockMethod, callBlockArg, err := ccqCreateBlockRequest(block)
if err != nil {
w.ccqLogger.Error("invalid block id in eth_call_with_finality query request",
zap.Error(err),
zap.String("requestId", requestId),
zap.String("block", block),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryFatalError)
w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
return
}
// We build two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block.
// The second is the data associated with each request (but not the block request). The index into both is the index into the request call data.
// Create the batch of requested calls for the specified block.
batch, evmCallData := ccqBuildBatchFromCallData(req, callBlockArg)
// Add the block query to the batch.
@ -554,51 +476,34 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context,
})
// Query the RPC.
start := time.Now()
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = w.ethConn.RawBatchCallContext(timeout, batch)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query request",
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
zap.Error(err),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if blockError != nil {
// Verify that the block read was successful.
if err := w.ccqVerifyBlockResult(blockError, blockResult); err != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query block request",
zap.Error(blockError),
zap.String("requestId", requestId),
zap.String("block", block),
zap.Any("batch", batch),
zap.Error(err),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if blockResult.Number == nil {
w.ccqLogger.Error("invalid eth_call_with_finality query block result",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 {
w.ccqLogger.Error("block number too large for eth_call_with_finality",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
blockNumber := blockResult.Number.ToInt().Uint64()
// Get the latest block number based on the requested finality.
var latestBlockNum uint64
if safeMode {
latestBlockNum = w.getLatestSafeBlockNumber()
@ -606,80 +511,56 @@ func (w *Watcher) ccqHandleEthCallWithFinalityQueryRequest(ctx context.Context,
latestBlockNum = w.GetLatestFinalizedBlockNumber()
}
// Make sure the block has reached requested finality.
blockNumber := blockResult.Number.ToInt().Uint64()
if blockNumber > latestBlockNum {
w.ccqLogger.Info("requested block for eth_call_with_finality has not yet reached the requested finality",
zap.String("finality", req.Finality),
zap.Uint64("requestedBlockNumber", blockNumber),
zap.Uint64("latestBlockNumber", latestBlockNum),
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
return
}
resp := query.EthCallWithFinalityQueryResponse{
BlockNumber: blockNumber,
Hash: blockResult.Hash,
Time: time.Unix(int64(blockResult.Time), 0),
Results: [][]byte{},
}
errFound := false
for idx := range req.CallData {
if evmCallData[idx].callErr != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query call request",
zap.Error(evmCallData[idx].callErr),
zap.String("block", block),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
// Nil or Empty results are not valid
// eth_call_with_finality will return empty when the state doesn't exist for a block
if len(*evmCallData[idx].callResult) == 0 {
w.ccqLogger.Error("invalid call result for eth_call_with_finality",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.Int("errorIdx", idx),
zap.Any("batch", batch),
)
w.ccqSendQueryResponseForError(queryRequest, query.QueryRetryNeeded)
errFound = true
break
}
w.ccqLogger.Info("query result for eth_call_with_finality",
zap.String("eth_network", w.networkName),
zap.String("block", block),
zap.String("requestId", requestId),
zap.String("finality", req.Finality),
zap.Uint64("requestedBlockNumber", blockNumber),
zap.Uint64("latestBlockNumber", latestBlockNum),
zap.String("blockHash", blockResult.Hash.Hex()),
zap.String("blockTime", blockResult.Time.String()),
zap.Int("idx", idx),
zap.String("to", evmCallData[idx].to.Hex()),
zap.Any("data", evmCallData[idx].data),
zap.String("result", evmCallData[idx].callResult.String()),
)
resp.Results = append(resp.Results, *evmCallData[idx].callResult)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
if !errFound {
queryResponse := query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, &resp)
select {
case w.queryResponseC <- queryResponse:
w.ccqLogger.Debug("published query response error to handler", zap.String("component", "ccqevm"))
default:
w.ccqLogger.Error("failed to published query response error to handler", zap.String("component", "ccqevm"))
}
w.ccqLogger.Info("query complete for eth_call_with_finality",
zap.String("requestId", requestId),
zap.String("finality", req.Finality),
zap.Uint64("requestedBlockNumber", blockNumber),
zap.Uint64("latestBlockNumber", latestBlockNum),
zap.String("blockHash", blockResult.Hash.Hex()),
zap.String("blockTime", blockResult.Time.String()),
zap.Int64("duration", time.Since(start).Milliseconds()),
)
// Verify all the call results and build the batch of results.
results, err := w.ccqVerifyAndExtractQueryResults(requestId, evmCallData)
if err != nil {
w.ccqLogger.Error("failed to process eth_call_with_finality query call request",
zap.String("requestId", requestId),
zap.String("finality", req.Finality),
zap.Uint64("requestedBlockNumber", blockNumber),
zap.Uint64("latestBlockNumber", latestBlockNum),
zap.String("blockHash", blockResult.Hash.Hex()),
zap.String("blockTime", blockResult.Time.String()),
zap.Error(err),
)
w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
return
}
// Finally, build the response and publish it.
resp := query.EthCallWithFinalityQueryResponse{
BlockNumber: blockNumber,
Hash: blockResult.Hash,
Time: time.Unix(int64(blockResult.Time), 0),
Results: results,
}
w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, &resp)
}
// ccqCreateBlockRequest creates a block query. It parses the block string, allowing for both a block number or a block hash. Note that for now, strings like "latest", "finalized" or "safe"
@ -732,6 +613,8 @@ type EthCallDataIntf interface {
CallDataList() []*query.EthCallData
}
// ccqBuildBatchFromCallData builds two slices. The first is the batch submitted to the RPC call. It contains one entry for each query plus one to query the block.
// The second is the data associated with each request (but not the block request). The index into both is the index into the request call data.
func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([]rpc.BatchElem, []EvmCallData) {
batch := []rpc.BatchElem{}
evmCallData := []EvmCallData{}
@ -741,13 +624,13 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([
to := eth_common.BytesToAddress(callData.To)
data := eth_hexutil.Encode(callData.Data)
ecd := EvmCallData{
to: to,
data: data,
To: to,
Data: data,
callTransactionArg: map[string]interface{}{
"to": to,
"data": data,
},
callResult: &eth_hexutil.Bytes{},
CallResult: &eth_hexutil.Bytes{},
}
evmCallData = append(evmCallData, ecd)
@ -757,7 +640,7 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([
ecd.callTransactionArg,
callBlockArg,
},
Result: ecd.callResult,
Result: ecd.CallResult,
Error: ecd.callErr,
})
}
@ -765,6 +648,50 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([
return batch, evmCallData
}
// ccqVerifyBlockResult does basic verification on the results of the block query.
func (w *Watcher) ccqVerifyBlockResult(blockError error, blockResult connectors.BlockMarshaller) error { //nolint:unparam
if blockError != nil {
return blockError
}
if blockResult.Number == nil {
return fmt.Errorf("block result is nil")
}
if blockResult.Number.ToInt().Cmp(w.ccqMaxBlockNumber) > 0 {
return fmt.Errorf("block number is too large")
}
return nil
}
// ccqVerifyAndExtractQueryResults verifies the array of call results and returns a vector of those results to be published.
func (w *Watcher) ccqVerifyAndExtractQueryResults(requestId string, evmCallData []EvmCallData) ([][]byte, error) {
var err error
results := [][]byte{}
for idx, evmCD := range evmCallData {
if evmCD.callErr != nil {
return nil, fmt.Errorf("call %d failed: %w", idx, evmCD.callErr)
}
// Nil or Empty results are not valid eth_call will return empty when the state doesn't exist for a block
if len(*evmCD.CallResult) == 0 {
return nil, fmt.Errorf("call %d failed: result is empty", idx)
}
w.ccqLogger.Info("query call data result",
zap.String("requestId", requestId),
zap.Int("idx", idx),
zap.Stringer("callData", evmCD),
)
results = append(results, *evmCD.CallResult)
}
return results, err
}
// ccqAddLatestBlock adds the latest block to the timestamp cache. The cache handles rollbacks.
func (w *Watcher) ccqAddLatestBlock(ev *connectors.NewBlock) {
if w.ccqTimestampCache != nil {
w.ccqTimestampCache.AddLatest(w.ccqLogger, ev.Time, ev.Number.Uint64())