CCQ/Node/EVM: Cache timestamps (#3491)

* NODE/EVM: Cache timestamps

* Code review rework

* Code review rework
This commit is contained in:
bruce-riley 2023-11-08 16:21:30 -06:00 committed by GitHub
parent 0a89e23589
commit 3ba2a46671
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 641 additions and 17 deletions

View File

@ -78,6 +78,10 @@ func (c mockEVMConnector) Client() *ethclient.Client {
panic("unimplemented")
}
func (c mockEVMConnector) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
panic("unimplemented")
}
func generateGS(num int) (keys []*ecdsa.PrivateKey, addrs []common.Address) {
for i := 0; i < num; i++ {
key, err := ethcrypto.GenerateKey()

View File

@ -420,3 +420,14 @@ func (pq *pendingQuery) numPendingRequests() int {
return numPending
}
func SupportsTimestampCaching(chainID vaa.ChainID) bool {
/*
- P1: Ethereum, Base, Optimism
- P1.5: Arbitrum, Polygon, Avalanche
- P2: BNB Chain, Moonbeam
- P3: Acala, Celo, Fantom, Karura, Klaytn, Oasis
*/
return chainID == vaa.ChainIDEthereum || chainID == vaa.ChainIDBase || chainID == vaa.ChainIDOptimism
}

View File

@ -631,19 +631,16 @@ func (ecd *EthCallByTimestampQueryRequest) Validate() error {
if len(ecd.TargetBlockIdHint) > math.MaxUint32 {
return fmt.Errorf("target block id hint too long")
}
if ecd.TargetBlockIdHint == "" {
return fmt.Errorf("target block id is required")
if (ecd.TargetBlockIdHint == "") != (ecd.FollowingBlockIdHint == "") {
return fmt.Errorf("if either the target or following block id is unset, they both must be unset")
}
if !strings.HasPrefix(ecd.TargetBlockIdHint, "0x") {
if ecd.TargetBlockIdHint != "" && !strings.HasPrefix(ecd.TargetBlockIdHint, "0x") {
return fmt.Errorf("target block id must be a hex number or hash starting with 0x")
}
if len(ecd.FollowingBlockIdHint) > math.MaxUint32 {
return fmt.Errorf("following block id hint too long")
}
if ecd.FollowingBlockIdHint == "" {
return fmt.Errorf("following block id is required")
}
if !strings.HasPrefix(ecd.FollowingBlockIdHint, "0x") {
if ecd.FollowingBlockIdHint != "" && !strings.HasPrefix(ecd.FollowingBlockIdHint, "0x") {
return fmt.Errorf("following block id must be a hex number or hash starting with 0x")
}
if len(ecd.CallData) <= 0 {

View File

@ -0,0 +1,161 @@
package evm
import (
"sort"
"sync"
"go.uber.org/zap"
)
const (
BTS_MAX_BLOCKS = 10000
)
type (
BlocksByTimestamp struct {
// cache is ordered by timestamp, blockNum. There may be multiple entries for the same timestamp, but not the same block.
cache Blocks
// maxCacheSize is used to trim the cache.
maxCacheSize int
// mutex is used to protect the cache.
mutex sync.Mutex
}
Blocks []Block
Block struct {
Timestamp uint64
BlockNum uint64
}
)
// NewBlocksByTimestamp creates an empty cache of blocks by timestamp.
func NewBlocksByTimestamp(maxCacheSize int) *BlocksByTimestamp {
return &BlocksByTimestamp{
cache: Blocks{},
maxCacheSize: maxCacheSize,
}
}
// AddLatest adds a block to the end of the cache. This is meant to be used in the normal scenario when a new latest block is received. If the specified
// timestamp or block number is less than the latest in the cache (most likely a rollback), the cache will be truncated and the new value inserted.
func (bts *BlocksByTimestamp) AddLatest(logger *zap.Logger, timestamp uint64, blockNum uint64) {
bts.mutex.Lock()
defer bts.mutex.Unlock()
l := len(bts.cache)
if l > 0 && (blockNum <= bts.cache[l-1].BlockNum || timestamp < bts.cache[l-1].Timestamp) {
// The cache is in order of both timestamp and block number. Search backwards until we find the entry where the block number is less than the one
// passed in and the timestamp is less than or equal to the one passed in. We then truncate everything after that before adding the new one.
idx := l - 1
for ; idx >= 0; idx-- {
if bts.cache[idx].BlockNum < blockNum && bts.cache[idx].Timestamp <= timestamp {
break
}
}
logger.Warn("rollback detected in timestamp cache",
zap.Uint64("oldLatestBlockNum", bts.cache[l-1].BlockNum),
zap.Uint64("oldLatestTimestamp", bts.cache[l-1].Timestamp),
zap.Uint64("newLatestBlockNum", blockNum),
zap.Uint64("newLatestTimestamp", timestamp),
)
bts.cache = bts.cache[:idx+1]
}
bts.cache = append(bts.cache, Block{Timestamp: timestamp, BlockNum: blockNum})
if len(bts.cache) > bts.maxCacheSize {
bts.cache = bts.cache[1:]
}
logger.Debug("cache updated", zap.Int("len", len(bts.cache)), zap.Uint64("lastTimestamp", timestamp), zap.Uint64("lastBlockNum", blockNum))
}
// AddBatch adds a batch of blocks to the cache. This is meant to be used for backfilling the cache. It makes sure there are no duplicate blocks and regenerates the cache in the correct order by timestamp.
func (bts *BlocksByTimestamp) AddBatch(blocks Blocks) {
bts.mutex.Lock()
defer bts.mutex.Unlock()
// First build a map of all the existing blocks so we can avoid duplicates.
blockMap := make(map[uint64]uint64)
for _, block := range bts.cache {
blockMap[block.BlockNum] = block.Timestamp
}
// Now add the new blocks to the map, overwriting any duplicates. (Maybe there was a reorg. . .)
for _, block := range blocks {
blockMap[block.BlockNum] = block.Timestamp
}
// Now put everything into the cache in random order.
cache := Blocks{}
for blockNum, timestamp := range blockMap {
cache = append(cache, Block{Timestamp: timestamp, BlockNum: blockNum})
}
// Sort the cache into timestamp order.
sort.SliceStable(cache, func(i, j int) bool {
return cache[i].Cmp(cache[j]) < 0
})
if len(cache) > bts.maxCacheSize {
// Trim the cache.
trimIdx := len(cache) - bts.maxCacheSize
cache = cache[trimIdx:]
}
bts.cache = cache
}
// LookUp searches the cache for the specified timestamp and returns the blocks surrounding that timestamp. It also returns true if the results are complete or false if they are not.
// The following rules apply:
// - If timestamp is less than the first timestamp in the cache, it returns (0, <theFirstBlockInTheCache>, false)
// - If timestamp is greater than or equal to the last timestamp in the cache, it returns (<theLastBlockInTheCache>, 0, false)
// - If timestamp exactly matches one in the cache, it returns (<theLastBlockForThatTimestamp>, <theFirstBlockForTheNextTimestamp>, true)
// - If timestamp is not in the cache, but there are blocks around it, it returns (<theLastBlockForThePreviousTimestamp>, <theFirstBlockForTheNextTimestamp>, false)
func (bts *BlocksByTimestamp) LookUp(timestamp uint64) (uint64, uint64, bool) {
bts.mutex.Lock()
defer bts.mutex.Unlock()
if len(bts.cache) == 0 { // Empty cache.
return 0, 0, false
}
if timestamp < bts.cache[0].Timestamp { // Before the start of the cache.
return 0, bts.cache[0].BlockNum, false
}
if timestamp >= bts.cache[len(bts.cache)-1].Timestamp { // After the end of the cache (including matching the final timestamp).
return bts.cache[len(bts.cache)-1].BlockNum, 0, false
}
// The search returns the first entry where the timestamp is greater than requested.
idx := bts.cache.SearchForTimestamp(timestamp)
// If the two blocks are adjacent, then we found what we are looking for.
found := bts.cache[idx-1].BlockNum+1 == bts.cache[idx].BlockNum
return bts.cache[idx-1].BlockNum, bts.cache[idx].BlockNum, found
}
func (blocks Blocks) SearchForTimestamp(timestamp uint64) int {
return sort.Search(len(blocks), func(i int) bool { return blocks[i].Timestamp > timestamp })
}
// Cmp compares two blocks, returning the usual -1, 0, +1.
func (lhs Block) Cmp(rhs Block) int {
if lhs.Timestamp < rhs.Timestamp {
return -1
}
if lhs.Timestamp > rhs.Timestamp {
return 1
}
if lhs.BlockNum < rhs.BlockNum {
return -1
}
if lhs.BlockNum > rhs.BlockNum {
return 1
}
return 0
}

View File

@ -0,0 +1,310 @@
package evm
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func cacheIsValid(t *testing.T, bts *BlocksByTimestamp) bool {
t.Helper()
prevBlock := Block{}
for idx := range bts.cache {
// fmt.Println("Compare: prev: ", prevBlock, ", this: ", bts.cache[idx], ": ", prevBlock.Cmp(bts.cache[idx]))
if prevBlock.Cmp(bts.cache[idx]) != -1 {
return false
}
prevBlock = bts.cache[idx]
}
return true
}
func TestBlocksByTimestamp_TestCacheIsValid(t *testing.T) {
bts := NewBlocksByTimestamp(BTS_MAX_BLOCKS)
// Empty cache is valid.
assert.True(t, cacheIsValid(t, bts))
bts.cache = append(bts.cache, Block{Timestamp: 1698621628, BlockNum: 420}) // 0
bts.cache = append(bts.cache, Block{Timestamp: 1698621629, BlockNum: 430}) // 1
bts.cache = append(bts.cache, Block{Timestamp: 1698621629, BlockNum: 440}) // 2
bts.cache = append(bts.cache, Block{Timestamp: 1698621631, BlockNum: 450}) // 3
bts.cache = append(bts.cache, Block{Timestamp: 1698621632, BlockNum: 460}) // 4
// Make sure a valid cache is valid.
assert.True(t, cacheIsValid(t, bts))
// Timestamps match but duplicate block should fail.
bts.cache[2] = Block{Timestamp: 1698621629, BlockNum: 430}
assert.False(t, cacheIsValid(t, bts))
// Restore things.
bts.cache[2] = Block{Timestamp: 1698621629, BlockNum: 440}
assert.True(t, cacheIsValid(t, bts))
// Timestamps match but block out of order should fail.
bts.cache[2] = Block{Timestamp: 1698621629, BlockNum: 425}
assert.False(t, cacheIsValid(t, bts))
// Restore things.
bts.cache[2] = Block{Timestamp: 1698621629, BlockNum: 440}
assert.True(t, cacheIsValid(t, bts))
// Timestamps out of order should fail.
bts.cache[2] = Block{Timestamp: 1698621620, BlockNum: 440}
assert.False(t, cacheIsValid(t, bts))
}
func TestBlocksByTimestamp_AddLatest(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(BTS_MAX_BLOCKS)
bts.AddLatest(logger, 1698621628, 420)
bts.AddLatest(logger, 1698621628, 421)
bts.AddLatest(logger, 1698621628, 422)
bts.AddLatest(logger, 1698621629, 423)
bts.AddLatest(logger, 1698621630, 424)
bts.AddLatest(logger, 1698621630, 425)
require.Equal(t, 6, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
// Timestamp going back should trim by timestamp.
bts.AddLatest(logger, 1698621629, 427)
require.Equal(t, 5, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(427), bts.cache[4].BlockNum)
assert.Equal(t, uint64(1698621629), bts.cache[4].Timestamp)
// Block number only going back should trim by block number only.
bts.AddLatest(logger, 1698621629, 426)
require.Equal(t, 5, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(426), bts.cache[4].BlockNum)
assert.Equal(t, uint64(1698621629), bts.cache[4].Timestamp)
}
func TestBlocksByTimestamp_AddLatestRollbackEverything(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(BTS_MAX_BLOCKS)
bts.AddLatest(logger, 1698621628, 420)
require.Equal(t, 1, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
// Rollback before only block in cache, but not before the timestamp.
bts.AddLatest(logger, 1698621628, 419)
require.Equal(t, 1, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(419), bts.cache[0].BlockNum)
assert.Equal(t, uint64(1698621628), bts.cache[0].Timestamp)
// Rollback before only timestamp and block in cache.
bts.AddLatest(logger, 1698621627, 418)
require.Equal(t, 1, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(418), bts.cache[0].BlockNum)
assert.Equal(t, uint64(1698621627), bts.cache[0].Timestamp)
// Add two more blocks at the end, giving a total of three entries.
bts.AddLatest(logger, 1698621628, 419)
bts.AddLatest(logger, 1698621629, 420)
require.Equal(t, 3, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
// Rollback before first block in cache.
bts.AddLatest(logger, 1698621627, 417)
require.Equal(t, 1, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(417), bts.cache[0].BlockNum)
assert.Equal(t, uint64(1698621627), bts.cache[0].Timestamp)
// Add two more blocks at the end, giving a total of three entries.
bts.AddLatest(logger, 1698621628, 418)
bts.AddLatest(logger, 1698621629, 419)
require.Equal(t, 3, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
// Rollback before first timestamp and block in cache.
bts.AddLatest(logger, 1698621626, 416)
require.Equal(t, 1, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(416), bts.cache[0].BlockNum)
assert.Equal(t, uint64(1698621626), bts.cache[0].Timestamp)
}
func TestBlocksByTimestamp_AddLatestShouldTrimTheCache(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(5)
bts.AddLatest(logger, 1698621628, 420)
bts.AddLatest(logger, 1698621628, 421)
bts.AddLatest(logger, 1698621628, 422)
bts.AddLatest(logger, 1698621628, 423)
bts.AddLatest(logger, 1698621629, 424)
require.Equal(t, 5, len(bts.cache), 5)
require.True(t, cacheIsValid(t, bts))
bts.AddLatest(logger, 1698621629, 425)
assert.Equal(t, 5, len(bts.cache))
assert.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(421), bts.cache[0].BlockNum)
assert.Equal(t, uint64(422), bts.cache[1].BlockNum)
assert.Equal(t, uint64(423), bts.cache[2].BlockNum)
assert.Equal(t, uint64(424), bts.cache[3].BlockNum)
assert.Equal(t, uint64(425), bts.cache[4].BlockNum)
}
func TestBlocksByTimestamp_AddBatch(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(BTS_MAX_BLOCKS)
// First create a cache with some gaps in it.
bts.AddLatest(logger, 1698621628, 420)
bts.AddLatest(logger, 1698621628, 430)
bts.AddLatest(logger, 1698621728, 440)
bts.AddLatest(logger, 1698621729, 450)
bts.AddLatest(logger, 1698621828, 460)
require.Equal(t, 5, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
batch := []Block{
// Add a couple afterwards.
{Timestamp: 1698621928, BlockNum: 470},
{Timestamp: 1698621929, BlockNum: 480},
// Add a few in the middle.
{Timestamp: 1698621630, BlockNum: 431},
{Timestamp: 1698621631, BlockNum: 432},
// Add one at the front.
{Timestamp: 1698621528, BlockNum: 410},
}
bts.AddBatch(batch)
assert.Equal(t, 10, len(bts.cache))
assert.True(t, cacheIsValid(t, bts))
}
func TestBlocksByTimestamp_AddBatchShouldTrim(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(8)
// First create a cache with some gaps in it.
bts.AddLatest(logger, 1698621628, 420)
bts.AddLatest(logger, 1698621628, 430)
bts.AddLatest(logger, 1698621728, 440)
bts.AddLatest(logger, 1698621729, 450)
bts.AddLatest(logger, 1698621828, 460)
require.Equal(t, 5, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
batch := []Block{
// Add a couple afterwards.
{Timestamp: 1698621928, BlockNum: 470},
{Timestamp: 1698621929, BlockNum: 480},
// Add a few in the middle.
{Timestamp: 1698621630, BlockNum: 431},
{Timestamp: 1698621631, BlockNum: 432},
// Add one at the front.
{Timestamp: 1698621528, BlockNum: 410},
}
bts.AddBatch(batch)
assert.Equal(t, 8, len(bts.cache))
assert.True(t, cacheIsValid(t, bts))
assert.Equal(t, uint64(430), bts.cache[0].BlockNum)
assert.Equal(t, uint64(431), bts.cache[1].BlockNum)
assert.Equal(t, uint64(432), bts.cache[2].BlockNum)
assert.Equal(t, uint64(440), bts.cache[3].BlockNum)
assert.Equal(t, uint64(450), bts.cache[4].BlockNum)
assert.Equal(t, uint64(460), bts.cache[5].BlockNum)
assert.Equal(t, uint64(470), bts.cache[6].BlockNum)
assert.Equal(t, uint64(480), bts.cache[7].BlockNum)
}
func TestBlocksByTimestamp_SearchForTimestamp(t *testing.T) {
blocks := Blocks{
{Timestamp: 1698621228, BlockNum: 420}, // 0
{Timestamp: 1698621328, BlockNum: 430}, // 1
{Timestamp: 1698621428, BlockNum: 440}, // 2
{Timestamp: 1698621428, BlockNum: 450}, // 3
{Timestamp: 1698621528, BlockNum: 460}, // 4
}
// Returns the first entry where the timestamp is greater than requested.
assert.Equal(t, 0, blocks.SearchForTimestamp(1698621128))
assert.Equal(t, 1, blocks.SearchForTimestamp(1698621228))
assert.Equal(t, 2, blocks.SearchForTimestamp(1698621328))
assert.Equal(t, 4, blocks.SearchForTimestamp(1698621428))
assert.Equal(t, 5, blocks.SearchForTimestamp(1698621528))
assert.Equal(t, 5, blocks.SearchForTimestamp(1698621628))
}
func TestBlocksByTimestamp_LookUp(t *testing.T) {
logger := zap.NewNop()
bts := NewBlocksByTimestamp(BTS_MAX_BLOCKS)
// Empty cache.
prev, next, found := bts.LookUp(1698621627)
assert.False(t, found)
assert.Equal(t, uint64(0), prev)
assert.Equal(t, uint64(0), next)
bts.AddLatest(logger, 1698621528, 420)
bts.AddLatest(logger, 1698621528, 421)
bts.AddLatest(logger, 1698621628, 422)
bts.AddLatest(logger, 1698621728, 423)
bts.AddLatest(logger, 1698621728, 424)
bts.AddLatest(logger, 1698621828, 426)
require.Equal(t, 6, len(bts.cache))
require.True(t, cacheIsValid(t, bts))
// Before the beginning of the cache.
prev, next, found = bts.LookUp(1698621527)
assert.False(t, found)
assert.Equal(t, uint64(0), prev)
assert.Equal(t, uint64(420), next)
// After the end of the cache.
prev, next, found = bts.LookUp(1698621928)
assert.False(t, found)
assert.Equal(t, uint64(426), prev)
assert.Equal(t, uint64(0), next)
// Last timestamp in the cache.
prev, next, found = bts.LookUp(1698621828)
assert.False(t, found)
assert.Equal(t, uint64(426), prev)
assert.Equal(t, uint64(0), next)
// In the cache, one block for the timestamp.
prev, next, found = bts.LookUp(1698621628)
assert.True(t, found)
assert.Equal(t, uint64(422), prev)
assert.Equal(t, uint64(423), next)
// In the cache, multiple blocks for the same timestamp.
prev, next, found = bts.LookUp(1698621528)
assert.True(t, found)
assert.Equal(t, uint64(421), prev)
assert.Equal(t, uint64(422), next)
// Not in the cache, no gap.
prev, next, found = bts.LookUp(1698621600)
assert.True(t, found)
assert.Equal(t, uint64(421), prev)
assert.Equal(t, uint64(422), next)
// Not in the cache, gap.
prev, next, found = bts.LookUp(1698621800)
assert.False(t, found)
assert.Equal(t, uint64(424), prev)
assert.Equal(t, uint64(426), next)
}

View File

@ -211,6 +211,49 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(logger *zap.Logger, ct
zap.Int("numRequests", len(req.CallData)),
)
if (block == "") != (nextBlock == "") {
logger.Error("invalid block id hints in eth_call_by_timestamp query request, if one is unset they both must be unset",
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
)
w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryFatalError)
return
}
if block == "" {
if w.ccqTimestampCache == nil {
logger.Error("error in block id hints in eth_call_by_timestamp query request, they are unset and chain does not support timestamp caching")
w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryFatalError)
return
}
// Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses milliseconds, so we have to convert.
blockNum, nextBlockNum, found := w.ccqTimestampCache.LookUp(req.TargetTimestamp / 1000000)
if !found {
logger.Error("block look up failed in eth_call_by_timestamp query request, timestamp not in cache, will retry",
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Uint64("blockNum", blockNum),
zap.Uint64("nextBlockNum", nextBlockNum),
)
w.ccqSendQueryResponseForError(logger, queryRequest, query.QueryRetryNeeded)
return
}
block = fmt.Sprintf("0x%x", blockNum)
nextBlock = fmt.Sprintf("0x%x", nextBlockNum)
logger.Info("cache look up in eth_call_by_timestamp query request mapped timestamp to blocks",
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Uint64("blockNum", blockNum),
zap.Uint64("nextBlockNum", nextBlockNum),
)
}
blockMethod, callBlockArg, err := ccqCreateBlockRequest(block)
if err != nil {
logger.Error("invalid target block id hint in eth_call_by_timestamp query request",
@ -718,3 +761,9 @@ func ccqBuildBatchFromCallData(req EthCallDataIntf, callBlockArg interface{}) ([
return batch, evmCallData
}
func (w *Watcher) ccqAddLatestBlock(ev *connectors.NewBlock) {
if w.ccqTimestampCache != nil {
w.ccqTimestampCache.AddLatest(w.ccqLogger, ev.Time, ev.Number.Uint64())
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethEvent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
@ -19,6 +20,7 @@ import (
// BatchPollConnector uses batch requests to poll for latest, safe and finalized blocks.
type BatchPollConnector struct {
Connector
logger *zap.Logger
Delay time.Duration
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
@ -41,16 +43,16 @@ type (
const MAX_GAP_BATCH_SIZE uint64 = 5
func NewBatchPollConnector(ctx context.Context, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) {
func NewBatchPollConnector(ctx context.Context, logger *zap.Logger, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) {
// Create the batch data in the order we want to report them to the watcher, so finalized is most important, latest is least.
batchData := []BatchEntry{
{tag: "finalized", finality: Finalized},
{tag: "safe", finality: Safe},
{tag: "latest", finality: Latest},
}
connector := &BatchPollConnector{
Connector: baseConnector,
logger: logger,
Delay: delay,
batchData: batchData,
}
@ -70,6 +72,17 @@ func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)
// Use the standard geth head sink to get latest blocks. We do this so that we will be notified of rollbacks. The following document
// indicates that the subscription will receive a replay of all blocks affected by a rollback. This is important for latest because the
// timestamp cache needs to be updated on a rollback. We can only consider polling for latest if we can guarantee that we won't miss rollbacks.
// https://ethereum.org/en/developers/tutorials/using-websockets/#subscription-types
headSink := make(chan *ethTypes.Header, 2)
_, err := b.Connector.SubscribeNewHead(ctx, headSink)
if err != nil {
return nil, fmt.Errorf("failed to subscribe for latest blocks: %w", err)
}
// Use the poller for finalized and safe.
common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error {
for {
select {
@ -84,6 +97,21 @@ func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
return nil
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
case ev := <-headSink:
if ev == nil {
b.logger.Error("new latest header event is nil")
continue
}
if ev.Number == nil {
b.logger.Error("new latest header block number is nil")
continue
}
b.blockFeed.Send(&NewBlock{
Number: ev.Number,
Time: ev.Time,
Hash: ev.Hash(),
Finality: Latest,
})
}
}
})

View File

@ -17,6 +17,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
@ -153,6 +154,10 @@ func (e *mockConnectorForBatchPoller) Client() *ethClient.Client {
return e.client
}
func (e *mockConnectorForBatchPoller) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil
}
func batchShouldHaveAllThree(t *testing.T, block []*NewBlock, blockNum uint64, expectedHash ethCommon.Hash) {
require.Equal(t, 3, len(block))
assert.Equal(t, uint64(blockNum), block[0].Number.Uint64())

View File

@ -15,6 +15,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
@ -212,6 +213,10 @@ func (c *CeloConnector) Client() *ethClient.Client {
panic("unimplemented")
}
func (c *CeloConnector) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
panic("unimplemented")
}
func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished {
return &ethAbi.AbiLogMessagePublished{
Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()),

View File

@ -59,6 +59,7 @@ type Connector interface {
RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
RawBatchCallContext(ctx context.Context, b []rpc.BatchElem) error
Client() *ethClient.Client
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}
type PollSubscription struct {

View File

@ -11,6 +11,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
@ -112,3 +113,7 @@ func (e *EthereumBaseConnector) RawBatchCallContext(ctx context.Context, b []eth
func (e *EthereumBaseConnector) Client() *ethClient.Client {
return e.client
}
func (e *EthereumBaseConnector) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return e.client.SubscribeNewHead(ctx, ch)
}

View File

@ -17,6 +17,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
@ -139,6 +140,10 @@ func (e *mockConnectorForPoller) Client() *ethClient.Client {
return e.client
}
func (e *mockConnectorForPoller) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil
}
type mockFinalizerForPoller struct {
mutex sync.Mutex
finalized bool

View File

@ -144,6 +144,8 @@ type (
rootChainContract string
ccqMaxBlockNumber *big.Int
ccqTimestampCache *BlocksByTimestamp
ccqLogger *zap.Logger
}
pendingKey struct {
@ -171,6 +173,10 @@ func NewEthWatcher(
queryResponseC chan<- *query.PerChainQueryResponseInternal,
unsafeDevMode bool,
) *Watcher {
var ccqTimestampCache *BlocksByTimestamp
if query.SupportsTimestampCaching(chainID) {
ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS)
}
return &Watcher{
url: url,
@ -188,6 +194,7 @@ func NewEthWatcher(
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64),
ccqTimestampCache: ccqTimestampCache,
}
}
@ -204,6 +211,8 @@ func (w *Watcher) Run(parentCtx context.Context) error {
zap.Bool("unsafeDevMode", w.unsafeDevMode),
)
w.ccqLogger = logger.With(zap.String("component", "ccqevm"))
// later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics.
// If any of them panic, this function will return, causing this child context to be canceled
// such that the other go-routines can free up resources
@ -232,7 +241,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
w.ethConn, err = connectors.NewBatchPollConnector(ctx, baseConnector, 250*time.Millisecond)
w.ethConn, err = connectors.NewBatchPollConnector(ctx, logger, baseConnector, 1000*time.Millisecond)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
@ -616,6 +625,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
currentEthHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
stats.Height = int64(blockNumberU)
w.updateNetworkStats(&stats)
w.ccqAddLatestBlock(ev)
continue
}

View File

@ -72,9 +72,10 @@ async function getEthCallByTimestampArgs(): Promise<[bigint, bigint, bigint]> {
let targetBlockTime = BigInt(0);
while (targetBlockNumber === BigInt(0)) {
let followingBlock = await web3.eth.getBlock(followingBlockNumber);
while (Number(followingBlock) <= 0) {
while (Number(followingBlock.number) <= 0) {
await sleep(1000);
followingBlock = await web3.eth.getBlock(followingBlockNumber);
followingBlock = await web3.eth.getBlock(followingBlock.number);
followingBlockNumber = followingBlock.number;
}
const targetBlock = await web3.eth.getBlock(
(Number(followingBlockNumber) - 1).toString()
@ -385,7 +386,7 @@ describe("eth call", () => {
"0100000001010005020000005b0006079bf7fad4800000000930783238643936333000000009307832386439363331020d500b1d8e8ef31e21c99d1db9a6444d3adf12700000000406fdde030d500b1d8e8ef31e21c99d1db9a6444d3adf12700000000418160ddd"
);
});
test("successful eth_call_by_timestamp query", async () => {
test("successful eth_call_by_timestamp query with block hints", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const totalSupplyCallData = createTestEthCallData(
WETH_ADDRESS,
@ -417,6 +418,38 @@ describe("eth call", () => {
);
expect(response.status).toBe(200);
});
test("successful eth_call_by_timestamp query without block hints", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const totalSupplyCallData = createTestEthCallData(
WETH_ADDRESS,
"totalSupply",
"uint256"
);
const [targetBlockTime, targetBlockNumber, followingBlockNumber] =
await getEthCallByTimestampArgs();
const ethCall = new EthCallByTimestampQueryRequest(
targetBlockTime + BigInt(5000),
"",
"",
[nameCallData, totalSupplyCallData]
);
const chainId = 2;
const ethQuery = new PerChainQueryRequest(chainId, ethCall);
const nonce = 1;
const request = new QueryRequest(nonce, [ethQuery]);
const serialized = request.serialize();
const digest = QueryRequest.digest(ENV, serialized);
const signature = sign(PRIVATE_KEY, digest);
const response = await axios.put(
QUERY_URL,
{
signature,
bytes: Buffer.from(serialized).toString("hex"),
},
{ headers: { "X-API-Key": "my_secret_key" } }
);
expect(response.status).toBe(200);
});
test("eth_call_by_timestamp query without target timestamp", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const totalSupplyCallData = createTestEthCallData(
@ -461,7 +494,7 @@ describe("eth call", () => {
});
expect(err).toBe(true);
});
test("eth_call_by_timestamp query without target hint should fail for now", async () => {
test("eth_call_by_timestamp query with following hint but not target hint should fail", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const totalSupplyCallData = createTestEthCallData(
WETH_ADDRESS,
@ -501,12 +534,12 @@ describe("eth call", () => {
err = true;
expect(error.response.status).toBe(400);
expect(error.response.data).toBe(
`failed to validate request: failed to validate per chain query 0: chain specific query is invalid: target block id is required\n`
`failed to validate request: failed to validate per chain query 0: chain specific query is invalid: if either the target or following block id is unset, they both must be unset\n`
);
});
expect(err).toBe(true);
});
test("eth_call_by_timestamp query without following hint should fail for now", async () => {
test("eth_call_by_timestamp query with target hint but not following hint should fail", async () => {
const nameCallData = createTestEthCallData(WETH_ADDRESS, "name", "string");
const totalSupplyCallData = createTestEthCallData(
WETH_ADDRESS,
@ -545,7 +578,7 @@ describe("eth call", () => {
err = true;
expect(error.response.status).toBe(400);
expect(error.response.data).toBe(
`failed to validate request: failed to validate per chain query 0: chain specific query is invalid: following block id is required\n`
`failed to validate request: failed to validate per chain query 0: chain specific query is invalid: if either the target or following block id is unset, they both must be unset\n`
);
});
expect(err).toBe(true);