Node: make karura/acala watcher use finalized blocks (#1747)
* Node: acala watcher use finalized blocks * node: acala not in safe mode assume finalized mode * Remove unused function * Changes signature of checkForSafeMode() * Beef up the safe mode check * Remove unnecessary function
This commit is contained in:
parent
a00a4824b2
commit
49b3b6ab61
|
@ -23,22 +23,19 @@ type PollFinalizer interface {
|
||||||
// finalizer which will be used to only return finalized blocks on subscriptions.
|
// finalizer which will be used to only return finalized blocks on subscriptions.
|
||||||
type BlockPollConnector struct {
|
type BlockPollConnector struct {
|
||||||
Connector
|
Connector
|
||||||
Delay time.Duration
|
Delay time.Duration
|
||||||
isEthPoS bool
|
useFinalized bool
|
||||||
hasEthSwitchedToPoS bool
|
finalizer PollFinalizer
|
||||||
finalizer PollFinalizer
|
blockFeed ethEvent.Feed
|
||||||
|
errFeed ethEvent.Feed
|
||||||
blockFeed ethEvent.Feed
|
|
||||||
errFeed ethEvent.Feed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, isEthPoS bool) (*BlockPollConnector, error) {
|
func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool) (*BlockPollConnector, error) {
|
||||||
connector := &BlockPollConnector{
|
connector := &BlockPollConnector{
|
||||||
Connector: baseConnector,
|
Connector: baseConnector,
|
||||||
Delay: delay,
|
Delay: delay,
|
||||||
isEthPoS: isEthPoS,
|
useFinalized: useFinalized,
|
||||||
hasEthSwitchedToPoS: false,
|
finalizer: finalizer,
|
||||||
finalizer: finalizer,
|
|
||||||
}
|
}
|
||||||
err := supervisor.Run(ctx, "blockPoller", connector.run)
|
err := supervisor.Run(ctx, "blockPoller", connector.run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -134,10 +131,6 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlockPollConnector) SetEthSwitched() {
|
|
||||||
b.hasEthSwitchedToPoS = true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
|
func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
|
||||||
sub := NewPollSubscription()
|
sub := NewPollSubscription()
|
||||||
blockSub := b.blockFeed.Subscribe(sink)
|
blockSub := b.blockFeed.Subscribe(sink)
|
||||||
|
@ -171,7 +164,7 @@ func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, n
|
||||||
var numStr string
|
var numStr string
|
||||||
if number != nil {
|
if number != nil {
|
||||||
numStr = ethHexUtils.EncodeBig(number)
|
numStr = ethHexUtils.EncodeBig(number)
|
||||||
} else if b.hasEthSwitchedToPoS {
|
} else if b.useFinalized {
|
||||||
numStr = "finalized"
|
numStr = "finalized"
|
||||||
} else {
|
} else {
|
||||||
numStr = "latest"
|
numStr = "latest"
|
||||||
|
@ -196,12 +189,6 @@ func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, n
|
||||||
)
|
)
|
||||||
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
|
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
|
||||||
}
|
}
|
||||||
d := big.Int(*m.Difficulty)
|
|
||||||
if b.isEthPoS && !b.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 {
|
|
||||||
logger.Info("switching from latest to finalized", zap.Duration("delay", b.Delay))
|
|
||||||
b.SetEthSwitched()
|
|
||||||
return b.getBlock(ctx, logger, number)
|
|
||||||
}
|
|
||||||
n := big.Int(*m.Number)
|
n := big.Int(*m.Number)
|
||||||
return &NewBlock{
|
return &NewBlock{
|
||||||
Number: &n,
|
Number: &n,
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
eth_common "github.com/ethereum/go-ethereum/common"
|
eth_common "github.com/ethereum/go-ethereum/common"
|
||||||
|
eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/certusone/wormhole/node/pkg/common"
|
"github.com/certusone/wormhole/node/pkg/common"
|
||||||
|
@ -102,9 +103,8 @@ type (
|
||||||
minConfirmations uint64
|
minConfirmations uint64
|
||||||
|
|
||||||
// Interface to the chain specific ethereum library.
|
// Interface to the chain specific ethereum library.
|
||||||
ethConn connectors.Connector
|
ethConn connectors.Connector
|
||||||
shouldCheckSafeMode bool
|
unsafeDevMode bool
|
||||||
unsafeDevMode bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pendingKey struct {
|
pendingKey struct {
|
||||||
|
@ -133,28 +133,33 @@ func NewEthWatcher(
|
||||||
unsafeDevMode bool) *Watcher {
|
unsafeDevMode bool) *Watcher {
|
||||||
|
|
||||||
return &Watcher{
|
return &Watcher{
|
||||||
url: url,
|
url: url,
|
||||||
contract: contract,
|
contract: contract,
|
||||||
networkName: networkName,
|
networkName: networkName,
|
||||||
readiness: readiness,
|
readiness: readiness,
|
||||||
minConfirmations: minConfirmations,
|
minConfirmations: minConfirmations,
|
||||||
chainID: chainID,
|
chainID: chainID,
|
||||||
msgChan: messageEvents,
|
msgChan: messageEvents,
|
||||||
setChan: setEvents,
|
setChan: setEvents,
|
||||||
obsvReqC: obsvReqC,
|
obsvReqC: obsvReqC,
|
||||||
pending: map[pendingKey]*pendingMessage{},
|
pending: map[pendingKey]*pendingMessage{},
|
||||||
shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode),
|
unsafeDevMode: unsafeDevMode,
|
||||||
unsafeDevMode: unsafeDevMode,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Watcher) Run(ctx context.Context) error {
|
func (w *Watcher) Run(ctx context.Context) error {
|
||||||
logger := supervisor.Logger(ctx)
|
logger := supervisor.Logger(ctx)
|
||||||
|
|
||||||
if w.shouldCheckSafeMode {
|
useFinalizedBlocks := (w.chainID == vaa.ChainIDEthereum && (!w.unsafeDevMode))
|
||||||
if err := w.checkForSafeMode(ctx); err != nil {
|
if (w.chainID == vaa.ChainIDKarura || w.chainID == vaa.ChainIDAcala) && (!w.unsafeDevMode) {
|
||||||
|
ufb, err := w.getAcalaMode(ctx)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ufb {
|
||||||
|
useFinalizedBlocks = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
|
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
|
||||||
|
@ -175,7 +180,8 @@ func (w *Watcher) Run(ctx context.Context) error {
|
||||||
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
||||||
return fmt.Errorf("dialing eth client failed: %w", err)
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
||||||
}
|
}
|
||||||
} else if w.chainID == vaa.ChainIDEthereum && !w.unsafeDevMode {
|
} else if useFinalizedBlocks {
|
||||||
|
logger.Info("using finalized blocks")
|
||||||
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
||||||
|
@ -702,24 +708,39 @@ func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector)
|
||||||
return currentIndex, &gs, nil
|
return currentIndex, &gs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Watcher) checkForSafeMode(ctx context.Context) error {
|
func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, errRet error) {
|
||||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
c, err := rpc.DialContext(timeout, w.url)
|
c, err := rpc.DialContext(timeout, w.url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", w.url, err)
|
errRet = fmt.Errorf("failed to connect to url %s to check acala mode: %w", w.url, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First check to see if polling for finalized blocks is suported.
|
||||||
|
type Marshaller struct {
|
||||||
|
Number *eth_hexutil.Big
|
||||||
|
}
|
||||||
|
|
||||||
|
var m Marshaller
|
||||||
|
err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false)
|
||||||
|
if err == nil {
|
||||||
|
useFinalizedBlocks = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If finalized blocks are not supported, then we had better be in safe mode!
|
||||||
var safe bool
|
var safe bool
|
||||||
err = c.CallContext(ctx, &safe, "net_isSafeMode")
|
err = c.CallContext(ctx, &safe, "net_isSafeMode")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err)
|
errRet = fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !safe {
|
if !safe {
|
||||||
return fmt.Errorf("url %s is not using safe mode", w.url)
|
errRet = fmt.Errorf("url %s does not support finalized blocks and is not using safe mode", w.url)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue