diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index 9130fb317..4f6ae4c5e 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -42,16 +42,20 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz publishSafeBlocks: publishSafeBlocks, finalizer: finalizer, } - err := supervisor.Run(ctx, "blockPoller", connector.run) + err := supervisor.Run(ctx, "blockPoller", connector.runFromSupervisor) if err != nil { return nil, err } return connector, nil } -func (b *BlockPollConnector) run(ctx context.Context) error { +func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error { logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) + supervisor.Signal(ctx, supervisor.SignalHealthy) + return b.run(ctx, logger) +} +func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error { lastBlock, err := b.getBlock(ctx, logger, nil, false) if err != nil { return err @@ -66,7 +70,6 @@ func (b *BlockPollConnector) run(ctx context.Context) error { } timer := time.NewTimer(time.Millisecond) // Start immediately. - supervisor.Signal(ctx, supervisor.SignalHealthy) for { select { diff --git a/node/pkg/watchers/evm/connectors/poller_test.go b/node/pkg/watchers/evm/connectors/poller_test.go new file mode 100644 index 000000000..5e24433af --- /dev/null +++ b/node/pkg/watchers/evm/connectors/poller_test.go @@ -0,0 +1,350 @@ +package connectors + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" + + ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" + + ethereum "github.com/ethereum/go-ethereum" + ethCommon "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" + ethClient "github.com/ethereum/go-ethereum/ethclient" + ethEvent "github.com/ethereum/go-ethereum/event" +) + +// mockConnectorForPoller implements the connector interface for testing purposes. +type mockConnectorForPoller struct { + address ethCommon.Address + client *ethClient.Client + mutex sync.Mutex + err error + persistentError bool + blockNumber uint64 +} + +// setError takes an error which will be returned on the next RPC call. The error will persist until cleared. +func (m *mockConnectorForPoller) setError(err error) { + m.mutex.Lock() + m.err = err + m.persistentError = true + m.mutex.Unlock() +} + +// setSingleError takes an error which will be returned on the next RPC call. After that, the error is reset to nil. +func (m *mockConnectorForPoller) setSingleError(err error) { + m.mutex.Lock() + m.err = err + m.persistentError = false + m.mutex.Unlock() +} + +func (e *mockConnectorForPoller) NetworkName() string { + return "mockConnectorForPoller" +} + +func (e *mockConnectorForPoller) ContractAddress() ethCommon.Address { + return e.address +} + +func (e *mockConnectorForPoller) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { + return 0, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { + return ethAbi.StructsGuardianSet{}, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { + var s ethEvent.Subscription + return s, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { + return nil, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { + return 0, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { + return nil, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { + var s ethEvent.Subscription + return s, fmt.Errorf("not implemented") +} + +func (e *mockConnectorForPoller) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) { + if method != "eth_getBlockByNumber" { + panic("method not implemented by mockConnectorForPoller") + } + + e.mutex.Lock() + // If they set the error, return that immediately. + if e.err != nil { + err = e.err + if !e.persistentError { + e.err = nil + } + } else { + str := fmt.Sprintf(`{"author":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","baseFeePerGas":"0x3b9aca00","difficulty":"0x0","extraData":"0x","gasLimit":"0xe4e1c0","gasUsed":"0x0","hash":"0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","nonce":"0x0000000000000000","number":"0x%x","parentHash":"0x09d6d33a658b712f41db7fb9f775f94911ae0132123116aa4f8cf3da9f774e89","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x201","stateRoot":"0x0409ed10e03fd49424ae1489c6fbc6ff1897f45d0e214655ebdb8df94eedc3c0","timestamp":"0x6373ec24","totalDifficulty":"0x0","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`, e.blockNumber) + err = json.Unmarshal([]byte(str), &result) + } + e.mutex.Unlock() + + return +} + +func (e *mockConnectorForPoller) setBlockNumber(blockNumber uint64) { + e.mutex.Lock() + e.blockNumber = blockNumber + e.mutex.Unlock() +} + +func (e *mockConnectorForPoller) expectedHash() ethCommon.Hash { + return ethCommon.HexToHash("0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf") +} + +func (e *mockConnectorForPoller) Client() *ethClient.Client { + return e.client +} + +type mockFinalizerForPoller struct { + mutex sync.Mutex + finalized bool +} + +func newMockFinalizerForPoller(initialState bool) *mockFinalizerForPoller { + return &mockFinalizerForPoller{finalized: initialState} +} + +func (f *mockFinalizerForPoller) setFinalized(finalized bool) { + f.mutex.Lock() + defer f.mutex.Unlock() + f.finalized = finalized +} + +func (f *mockFinalizerForPoller) IsBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + return f.finalized, nil +} + +// TestBlockPoller is one big, ugly test because of all the set up required. +func TestBlockPoller(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + baseConnector := mockConnectorForPoller{} + + finalizer := newMockFinalizerForPoller(true) // Start by assuming blocks are finalized. + assert.NotNil(t, finalizer) + + poller := &BlockPollConnector{ + Connector: &baseConnector, + Delay: 1 * time.Millisecond, + useFinalized: false, + finalizer: finalizer, + } + + // Set the starting block. + baseConnector.setBlockNumber(0x309a0c) + + // The go routines will post results here. + var mutex sync.Mutex + var block *NewBlock + var err error + var pollerStatus int + + // Start the poller running. + go func() { + mutex.Lock() + pollerStatus = 1 + mutex.Unlock() + err := poller.run(ctx, logger) + require.NoError(t, err) + mutex.Lock() + pollerStatus = 2 + mutex.Unlock() + }() + + // Subscribe for events to be processed by our go routine. + headSink := make(chan *NewBlock, 2) + headerSubscription, suberr := poller.SubscribeForBlocks(ctx, headSink) + require.NoError(t, suberr) + + go func() { + for { + select { + case <-ctx.Done(): + return + case thisErr := <-headerSubscription.Err(): + mutex.Lock() + err = thisErr + mutex.Unlock() + case thisBlock := <-headSink: + require.NotNil(t, thisBlock) + mutex.Lock() + block = thisBlock + mutex.Unlock() + } + } + }() + + // First sleep a bit and make sure there were no start up errors. + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + assert.Nil(t, block) + mutex.Unlock() + + // Post the first new block and verify we get it. + baseConnector.setBlockNumber(0x309a0d) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a0d), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // Sleep some more and verify we don't see any more blocks, since we haven't posted a new one. + baseConnector.setBlockNumber(0x309a0d) + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.Nil(t, block) + mutex.Unlock() + + // Post the next block and verify we get it. + baseConnector.setBlockNumber(0x309a0e) + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a0e), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // Post the next block but mark it as not finalized, so we shouldn't see it yet. + mutex.Lock() + finalizer.setFinalized(false) + baseConnector.setBlockNumber(0x309a0f) + mutex.Unlock() + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.Nil(t, block) + mutex.Unlock() + + // Once it goes finalized we should see it. + mutex.Lock() + finalizer.setFinalized(true) + mutex.Unlock() + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a0f), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // An RPC error should be returned to us. + err = nil + baseConnector.setError(fmt.Errorf("RPC failed")) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + assert.Error(t, err) + assert.Nil(t, block) + baseConnector.setError(nil) + err = nil + mutex.Unlock() + + // Post the next block and verify we get it (so we survived the RPC error). + baseConnector.setBlockNumber(0x309a10) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a10), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // Post an old block and we should not hear about it. + baseConnector.setBlockNumber(0x309a0c) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.Nil(t, block) + mutex.Unlock() + + // But we should keep going when we get a new one. + baseConnector.setBlockNumber(0x309a11) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a11), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // If there's a gap in the blocks, we should keep going. + baseConnector.setBlockNumber(0x309a13) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a13), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() + + // Should retry on a transient error and be able to continue. + baseConnector.setSingleError(fmt.Errorf("RPC failed")) + baseConnector.setBlockNumber(0x309a14) + + time.Sleep(10 * time.Millisecond) + mutex.Lock() + require.Equal(t, 1, pollerStatus) + require.NoError(t, err) + require.NotNil(t, block) + assert.Equal(t, uint64(0x309a14), block.Number.Uint64()) + assert.Equal(t, baseConnector.expectedHash(), block.Hash) + block = nil + mutex.Unlock() +} diff --git a/node/pkg/watchers/evm/finalizers/arbitrum.go b/node/pkg/watchers/evm/finalizers/arbitrum.go index 9cfa37cb0..229c6c9e1 100644 --- a/node/pkg/watchers/evm/finalizers/arbitrum.go +++ b/node/pkg/watchers/evm/finalizers/arbitrum.go @@ -7,8 +7,6 @@ import ( "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" "github.com/certusone/wormhole/node/pkg/watchers/interfaces" - ethClient "github.com/ethereum/go-ethereum/ethclient" - "go.uber.org/zap" ) @@ -17,20 +15,22 @@ import ( type ArbitrumFinalizer struct { logger *zap.Logger - connector connectors.Connector l1Finalizer interfaces.L1Finalizer } -func NewArbitrumFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client, l1Finalizer interfaces.L1Finalizer) *ArbitrumFinalizer { +func NewArbitrumFinalizer(logger *zap.Logger, l1Finalizer interfaces.L1Finalizer) *ArbitrumFinalizer { return &ArbitrumFinalizer{ logger: logger, - connector: connector, l1Finalizer: l1Finalizer, } } // IsBlockFinalized compares the number of the L1 block containing the Arbitrum block with the latest finalized block on Ethereum. func (a *ArbitrumFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + if block == nil { + return false, fmt.Errorf("block is nil") + } + if block.L1BlockNumber == nil { return false, fmt.Errorf("l1 block number is nil") } diff --git a/node/pkg/watchers/evm/finalizers/arbitrum_test.go b/node/pkg/watchers/evm/finalizers/arbitrum_test.go new file mode 100644 index 000000000..34a9b105f --- /dev/null +++ b/node/pkg/watchers/evm/finalizers/arbitrum_test.go @@ -0,0 +1,131 @@ +package finalizers + +import ( + "context" + "math/big" + "testing" + + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + + ethCommon "github.com/ethereum/go-ethereum/common" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" +) + +// mockL1Finalizer implements the L1Finalizer interface for testing purposes. +type mockL1Finalizer struct { + LatestFinalizedBlockNumber uint64 +} + +func (m *mockL1Finalizer) GetLatestFinalizedBlockNumber() uint64 { + return m.LatestFinalizedBlockNumber +} + +func TestArbitrumErrorReturnedIfBlockIsNil(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + _, err := finalizer.IsBlockFinalized(ctx, nil) + require.Error(t, err) +} + +func TestArbitrumErrorReturnedIfL1BlockIsNil(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: nil, + } + + _, err := finalizer.IsBlockFinalized(ctx, block) + require.Error(t, err) +} + +func TestArbitrumNotFinalizedIfNoFinalizedL1BlockYet(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: big.NewInt(225), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestArbitrumNotFinalizedWhenFinalizedL1IsLessThanTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 225} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(127), + Hash: ethCommon.Hash{}, + L1BlockNumber: big.NewInt(226), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestArbitrumIsFinalizedWhenFinalizedL1IsEqualsTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 225} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: big.NewInt(225), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} + +func TestArbitrumIsFinalizedWhenFinalizedL1IsGreaterThanTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 227} + + finalizer := NewArbitrumFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: big.NewInt(225), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} diff --git a/node/pkg/watchers/evm/finalizers/moonbeam.go b/node/pkg/watchers/evm/finalizers/moonbeam.go index f5c41b70e..9c4356044 100644 --- a/node/pkg/watchers/evm/finalizers/moonbeam.go +++ b/node/pkg/watchers/evm/finalizers/moonbeam.go @@ -2,6 +2,7 @@ package finalizers import ( "context" + "fmt" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" @@ -25,6 +26,10 @@ func NewMoonbeamFinalizer(logger *zap.Logger, connector connectors.Connector) *M } func (m *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + if block == nil { + return false, fmt.Errorf("block is nil") + } + var finalized bool err := m.connector.RawCallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex()) if err != nil { diff --git a/node/pkg/watchers/evm/finalizers/moonbeam_test.go b/node/pkg/watchers/evm/finalizers/moonbeam_test.go new file mode 100644 index 000000000..91814a675 --- /dev/null +++ b/node/pkg/watchers/evm/finalizers/moonbeam_test.go @@ -0,0 +1,132 @@ +package finalizers + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "testing" + + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + + ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" + + ethereum "github.com/ethereum/go-ethereum" + ethCommon "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" + ethEvent "github.com/ethereum/go-ethereum/event" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" +) + +type moonbeamMockConnector struct { + isFinalized string + err error +} + +func (e *moonbeamMockConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) { + if method != "moon_isBlockFinalized" { + panic("method not implemented by moonbeamMockConnector") + } + + err = json.Unmarshal([]byte(e.isFinalized), &result) + return +} + +func (e *moonbeamMockConnector) NetworkName() string { + return "moonbeamMockConnector" +} + +func (e *moonbeamMockConnector) ContractAddress() ethCommon.Address { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { + panic("not implemented by moonbeamMockConnector") +} + +func (e *moonbeamMockConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *connectors.NewBlock) (ethereum.Subscription, error) { + panic("not implemented by moonbeamMockConnector") +} + +func TestMoonbeamErrorReturnedIfBlockIsNil(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + baseConnector := moonbeamMockConnector{isFinalized: "true", err: nil} + + finalizer := NewMoonbeamFinalizer(logger, &baseConnector) + assert.NotNil(t, finalizer) + + _, err := finalizer.IsBlockFinalized(ctx, nil) + require.Error(t, err) +} + +func TestMoonbeamBlockNotFinalized(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + baseConnector := moonbeamMockConnector{isFinalized: "false", err: nil} + + finalizer := NewMoonbeamFinalizer(logger, &baseConnector) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.HexToHash("0x1076cd8c207f31e1638b37bb358c458f216f5451f06e2ccb4eb9db66ad669f30"), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} +func TestMoonbeamBlockIsFinalized(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + baseConnector := moonbeamMockConnector{isFinalized: "true", err: nil} + + finalizer := NewMoonbeamFinalizer(logger, &baseConnector) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.HexToHash("0x1076cd8c207f31e1638b37bb358c458f216f5451f06e2ccb4eb9db66ad669f30"), + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} + +func TestMoonbeamRpcError(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + baseConnector := moonbeamMockConnector{isFinalized: "true", err: fmt.Errorf("RPC failed")} + + finalizer := NewMoonbeamFinalizer(logger, &baseConnector) + assert.NotNil(t, finalizer) + + _, err := finalizer.IsBlockFinalized(ctx, nil) + require.Error(t, err) +} diff --git a/node/pkg/watchers/evm/finalizers/neon.go b/node/pkg/watchers/evm/finalizers/neon.go index 3ffa54343..3e9d72256 100644 --- a/node/pkg/watchers/evm/finalizers/neon.go +++ b/node/pkg/watchers/evm/finalizers/neon.go @@ -2,12 +2,11 @@ package finalizers import ( "context" + "fmt" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" "github.com/certusone/wormhole/node/pkg/watchers/interfaces" - ethClient "github.com/ethereum/go-ethereum/ethclient" - "go.uber.org/zap" ) @@ -16,20 +15,22 @@ import ( // Neon team on 11/12/2022. Also confirmed that they do not have a websocket interface so we need to poll for log events. type NeonFinalizer struct { logger *zap.Logger - connector connectors.Connector l1Finalizer interfaces.L1Finalizer } -func NewNeonFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client, l1Finalizer interfaces.L1Finalizer) *NeonFinalizer { +func NewNeonFinalizer(logger *zap.Logger, l1Finalizer interfaces.L1Finalizer) *NeonFinalizer { return &NeonFinalizer{ logger: logger, - connector: connector, l1Finalizer: l1Finalizer, } } // IsBlockFinalized compares the number of the Neon block with the latest finalized block on Solana. func (f *NeonFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + if block == nil { + return false, fmt.Errorf("block is nil") + } + latestL1Block := f.l1Finalizer.GetLatestFinalizedBlockNumber() if latestL1Block == 0 { // This happens on start up. diff --git a/node/pkg/watchers/evm/finalizers/neon_test.go b/node/pkg/watchers/evm/finalizers/neon_test.go new file mode 100644 index 000000000..cb5a0cab0 --- /dev/null +++ b/node/pkg/watchers/evm/finalizers/neon_test.go @@ -0,0 +1,104 @@ +package finalizers + +import ( + "context" + "math/big" + "testing" + + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + + ethCommon "github.com/ethereum/go-ethereum/common" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" +) + +func TestNeonErrorReturnedIfBlockIsNil(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + + finalizer := NewNeonFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + _, err := finalizer.IsBlockFinalized(ctx, nil) + require.Error(t, err) +} + +func TestNeonNotFinalizedIfNoFinalizedL1BlockYet(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{} + + finalizer := NewNeonFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: nil, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestNeonNotFinalizedWhenL1IsLessThanL2(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + + finalizer := NewNeonFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(127), + Hash: ethCommon.Hash{}, + L1BlockNumber: nil, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestNeonIsFinalizedWhenL1EqualsL2(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + + finalizer := NewNeonFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: nil, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} + +func TestNeonIsFinalizedWhenL1GreaterThanL2(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 127} + + finalizer := NewNeonFinalizer(logger, &l1Finalizer) + assert.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + L1BlockNumber: nil, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} diff --git a/node/pkg/watchers/evm/finalizers/optimism.go b/node/pkg/watchers/evm/finalizers/optimism.go index 34e1f7ec9..cb3884023 100644 --- a/node/pkg/watchers/evm/finalizers/optimism.go +++ b/node/pkg/watchers/evm/finalizers/optimism.go @@ -32,7 +32,6 @@ import ( type OptimismFinalizer struct { logger *zap.Logger - connector connectors.Connector l1Finalizer interfaces.L1Finalizer latestFinalizedL2Block *big.Int @@ -44,13 +43,17 @@ type OptimismFinalizer struct { ctcClient *ethClient.Client // This is used to grab the rollup information from the ctc contract - ctcCaller *ctcAbi.OptimismCtcAbiCaller + ctcCaller ctcCallerIntf +} + +type ctcCallerIntf interface { + GetTotalElements(opts *ethBind.CallOpts) (*big.Int, error) + GetLastBlockNumber(opts *ethBind.CallOpts) (*big.Int, error) } func NewOptimismFinalizer( ctx context.Context, logger *zap.Logger, - connector connectors.Connector, l1Finalizer interfaces.L1Finalizer, ctcChainUrl string, ctcChainAddress string, @@ -72,7 +75,6 @@ func NewOptimismFinalizer( finalizer := &OptimismFinalizer{ logger: logger, - connector: connector, l1Finalizer: l1Finalizer, latestFinalizedL2Block: big.NewInt(0), finalizerMapping: make([]RollupInfo, 0), @@ -113,6 +115,10 @@ func (f *OptimismFinalizer) GetRollupInfo(ctx context.Context) (RollupInfo, erro } func (f *OptimismFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + if block == nil { + return false, fmt.Errorf("block is nil") + } + finalizedL1Block := f.l1Finalizer.GetLatestFinalizedBlockNumber() // Uint64 if finalizedL1Block == 0 { // This happens on start up. diff --git a/node/pkg/watchers/evm/finalizers/optimism_test.go b/node/pkg/watchers/evm/finalizers/optimism_test.go new file mode 100644 index 000000000..109955c5a --- /dev/null +++ b/node/pkg/watchers/evm/finalizers/optimism_test.go @@ -0,0 +1,276 @@ +package finalizers + +import ( + "context" + "fmt" + "math/big" + "sync" + "testing" + "time" + + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + "github.com/certusone/wormhole/node/pkg/watchers/interfaces" + + ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethCommon "github.com/ethereum/go-ethereum/common" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" +) + +type ( + mockCtcCaller struct { + mutex sync.Mutex + totalElements []*big.Int + lastBlockNumbers []*big.Int + totalElementsErr error + lastBlockNumbersErr error + } +) + +// SetTotalElements takes an array of big int pointers that represent the L2 block numbers to be returned by GetTotalElements() +func (m *mockCtcCaller) SetTotalElements(totalElements []*big.Int) { + m.mutex.Lock() + m.totalElements = totalElements + m.mutex.Unlock() +} + +// SetLastBlockNumber takes an array of big int pointers that represent the L1 block numbers to be returned by GetLastBlockNumber() +func (m *mockCtcCaller) SetLastBlockNumbers(lastBlockNumbers []*big.Int) { + m.mutex.Lock() + m.lastBlockNumbers = lastBlockNumbers + m.mutex.Unlock() +} + +// SetTotalElementsError takes an error (or nil) which will be returned on the next call to GetTotalElements. The error will persist until cleared. +func (m *mockCtcCaller) SetTotalElementsError(err error) { + m.mutex.Lock() + m.totalElementsErr = err + m.mutex.Unlock() +} + +// SetLastBlockNumber takes an error (or nil) which will be returned on the next call to GetLastBlockNumber. The error will persist until cleared. +func (m *mockCtcCaller) SetLastBlockNumberError(err error) { + m.mutex.Lock() + m.lastBlockNumbersErr = err + m.mutex.Unlock() +} + +func (m *mockCtcCaller) GetTotalElements(opts *ethBind.CallOpts) (result *big.Int, err error) { + m.totalElements, result, err = m.getResult(m.totalElements, m.totalElementsErr) + return +} + +func (m *mockCtcCaller) GetLastBlockNumber(opts *ethBind.CallOpts) (result *big.Int, err error) { + m.lastBlockNumbers, result, err = m.getResult(m.lastBlockNumbers, m.lastBlockNumbersErr) + return +} + +func (m *mockCtcCaller) getResult(resultsIn []*big.Int, errIn error) (resultsOut []*big.Int, result *big.Int, err error) { + for { + m.mutex.Lock() + // If they set the error, return that immediately. + if errIn != nil { + err = errIn + break + } + + // If there are pending results, return the first one. + if len(resultsIn) != 0 { + result = resultsIn[0] + resultsOut = resultsIn[1:] + break + } + + // If we don't have any results, sleep and try again. + m.mutex.Unlock() + time.Sleep(1 * time.Millisecond) + } + + m.mutex.Unlock() + return +} + +func NewOptimismFinalizerForTest( + ctx context.Context, + logger *zap.Logger, + l1Finalizer interfaces.L1Finalizer, + ctcCaller ctcCallerIntf, +) *OptimismFinalizer { + finalizer := &OptimismFinalizer{ + logger: logger, + l1Finalizer: l1Finalizer, + latestFinalizedL2Block: big.NewInt(0), + finalizerMapping: make([]RollupInfo, 0), + ctcCaller: ctcCaller, + } + + return finalizer +} + +func TestOptimismErrorReturnedIfBlockIsNil(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + ctcCaller := &mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, ctcCaller) + require.NotNil(t, finalizer) + + _, err := finalizer.IsBlockFinalized(ctx, nil) + assert.Error(t, err) +} + +func TestOptimismNotFinalizedIfNoFinalizedL1BlockYet(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestOptimismNotFinalizedWhenFinalizedL1IsLessThanTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 7954401} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + ctcCaller.SetLastBlockNumbers([]*big.Int{big.NewInt(7954402)}) + ctcCaller.SetTotalElements([]*big.Int{big.NewInt(127)}) + + block := &connectors.NewBlock{ + Number: big.NewInt(127), + Hash: ethCommon.Hash{}, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, false, finalized) +} + +func TestOptimismNotFinalizedWhenFinalizedL1IsEqualsTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 7954402} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + ctcCaller.SetLastBlockNumbers([]*big.Int{big.NewInt(7954402)}) + ctcCaller.SetTotalElements([]*big.Int{big.NewInt(125)}) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} + +func TestOptimismIsFinalizedWhenFinalizedL1IsGreaterThanTargetL1(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 7954403} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + ctcCaller.SetLastBlockNumbers([]*big.Int{big.NewInt(7954402)}) + ctcCaller.SetTotalElements([]*big.Int{big.NewInt(125)}) + + finalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + assert.Equal(t, true, finalized) +} + +func TestOptimismL2BlockNumberMustNotGoBackwards(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 7954400} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + ctcCaller.SetLastBlockNumbers([]*big.Int{big.NewInt(7954402), big.NewInt(7954403)}) + ctcCaller.SetTotalElements([]*big.Int{big.NewInt(124), big.NewInt(123)}) + + isFinalized, err := finalizer.IsBlockFinalized(ctx, block) + require.NoError(t, err) + require.Equal(t, false, isFinalized) + + _, err = finalizer.IsBlockFinalized(ctx, block) + require.Error(t, err) +} + +func TestOptimismGetTotalElementsRpcError(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + ctcCaller.SetTotalElementsError(fmt.Errorf("RPC failed")) + ctcCaller.SetLastBlockNumbers([]*big.Int{big.NewInt(7954402)}) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + _, err := finalizer.IsBlockFinalized(ctx, block) + require.Error(t, err) +} + +func TestOptimismGetLastBlockNumberRpcError(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + l1Finalizer := mockL1Finalizer{LatestFinalizedBlockNumber: 125} + ctcCaller := mockCtcCaller{} + + finalizer := NewOptimismFinalizerForTest(ctx, logger, &l1Finalizer, &ctcCaller) + require.NotNil(t, finalizer) + + ctcCaller.SetLastBlockNumberError(fmt.Errorf("RPC failed")) + ctcCaller.SetTotalElements([]*big.Int{big.NewInt(125)}) + + block := &connectors.NewBlock{ + Number: big.NewInt(125), + Hash: ethCommon.Hash{}, + } + + _, err := finalizer.IsBlockFinalized(ctx, block) + require.Error(t, err) +} diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index c5e0528b4..1402f4241 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -242,7 +242,7 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - finalizer := finalizers.NewNeonFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) + finalizer := finalizers.NewNeonFinalizer(logger, w.l1Finalizer) pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() @@ -265,7 +265,7 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) + finalizer := finalizers.NewArbitrumFinalizer(logger, w.l1Finalizer) pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() @@ -288,7 +288,7 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - finalizer, err := finalizers.NewOptimismFinalizer(timeout, logger, baseConnector, w.l1Finalizer, w.rootChainRpc, w.rootChainContract) + finalizer, err := finalizers.NewOptimismFinalizer(timeout, logger, w.l1Finalizer, w.rootChainRpc, w.rootChainContract) if err != nil { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("creating optimism finalizer failed: %w", err)