diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 10579731e..3a0b8ab96 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -3,6 +3,7 @@ package guardiand import ( "context" "encoding/base64" + "encoding/hex" "fmt" "github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/notify/discord" @@ -521,7 +522,7 @@ func runNode(cmd *cobra.Command, args []string) { // Inbound signed VAAs signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) - // Inbound observation requests + // Inbound observation requests from the p2p service (for all chains) obsvReqC := make(chan *gossipv1.ObservationRequest, 50) // Outbound observation requests @@ -533,6 +534,38 @@ func runNode(cmd *cobra.Command, args []string) { // Guardian set state managed by processor gst := common.NewGuardianSetState() + // Per-chain observation requests + chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest) + + // Observation request channel for each chain supporting observation requests. + chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest) + if *testnetMode { + chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest) + } + + // Multiplex observation requests to the appropriate chain + go func() { + for { + select { + case <-rootCtx.Done(): + return + case req := <-obsvReqC: + if channel, ok := chainObsvReqC[vaa.ChainID(req.ChainId)]; ok { + channel <- req + } else { + logger.Error("unknown chain ID for reobservation request", + zap.Uint32("chain_id", req.ChainId), + zap.String("tx_hash", hex.EncodeToString(req.TxHash))) + } + } + } + }() + var notifier *discord.DiscordNotifier if *discordToken != "" { notifier, err = discord.NewDiscordNotifier(*discordToken, *discordChannel, logger) @@ -624,38 +657,36 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "ethwatch", - ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1).Run); err != nil { + ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum]).Run); err != nil { return err } if err := supervisor.Run(ctx, "bscwatch", - ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1).Run); err != nil { + ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC]).Run); err != nil { return err } if err := supervisor.Run(ctx, "polygonwatch", - ethereum.NewEthWatcher( - *polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, - // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is - // - // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect - // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon - // as specific public guidance exists for Polygon developers. - 512).Run); err != nil { + ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, 512, chainObsvReqC[vaa.ChainIDPolygon]).Run); err != nil { + // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is + // + // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect + // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon + // as specific public guidance exists for Polygon developers. return err } if err := supervisor.Run(ctx, "avalanchewatch", - ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1).Run); err != nil { + ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche]).Run); err != nil { return err } if err := supervisor.Run(ctx, "oasiswatch", - ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1).Run); err != nil { + ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis]).Run); err != nil { return err } if *testnetMode { if err := supervisor.Run(ctx, "ethropstenwatch", - ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, setC, 1).Run); err != nil { + ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten]).Run); err != nil { return err } } @@ -680,7 +711,7 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "solwatch-finalized", - solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, obsvReqC, rpc.CommitmentFinalized).Run); err != nil { + solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized).Run); err != nil { return err } diff --git a/node/hack/parse_eth_tx/parse_eth_tx.go b/node/hack/parse_eth_tx/parse_eth_tx.go index 45c52e8d2..caa3728da 100644 --- a/node/hack/parse_eth_tx/parse_eth_tx.go +++ b/node/hack/parse_eth_tx/parse_eth_tx.go @@ -32,7 +32,7 @@ func main() { contractAddr := common.HexToAddress(*flagContractAddr) transactionHash := common.HexToHash(*flagTx) - msgs, err := ethereum.MessageEventsForTransaction(ctx, c, contractAddr, vaa.ChainIDEthereum, transactionHash) + block, msgs, err := ethereum.MessageEventsForTransaction(ctx, c, contractAddr, vaa.ChainIDEthereum, transactionHash) if err != nil { log.Fatal(err) } @@ -52,6 +52,7 @@ func main() { } log.Println("------------------------------------------------------") + log.Printf("Block: %d", block) log.Printf("Message ID: %s", v.MessageID()) log.Printf("Digest: %s", v.HexDigest()) log.Printf("VAA: %+v", v) diff --git a/node/pkg/ethereum/by_transaction.go b/node/pkg/ethereum/by_transaction.go index eabcbf2c4..82ea55557 100644 --- a/node/pkg/ethereum/by_transaction.go +++ b/node/pkg/ethereum/by_transaction.go @@ -18,28 +18,30 @@ var ( logMessagePublishedTopic = eth_common.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2") ) +// MessageEventsForTransaction returns the lockup events for a given transaction. +// Returns the block number and a list of MessagePublication events. func MessageEventsForTransaction( ctx context.Context, c *ethclient.Client, contract eth_common.Address, chainId vaa.ChainID, - tx eth_common.Hash) ([]*common.MessagePublication, error) { + tx eth_common.Hash) (uint64, []*common.MessagePublication, error) { f, err := abi.NewAbiFilterer(contract, c) if err != nil { - return nil, fmt.Errorf("failed to create ABI filterer: %w", err) + return 0, nil, fmt.Errorf("failed to create ABI filterer: %w", err) } // Get transactions logs from transaction receipt, err := c.TransactionReceipt(ctx, tx) if err != nil { - return nil, fmt.Errorf("failed to get transaction receipt: %w", err) + return 0, nil, fmt.Errorf("failed to get transaction receipt: %w", err) } // Get block block, err := c.BlockByHash(ctx, receipt.BlockHash) if err != nil { - return nil, fmt.Errorf("failed to get block: %w", err) + return 0, nil, fmt.Errorf("failed to get block: %w", err) } msgs := make([]*common.MessagePublication, 0, len(receipt.Logs)) @@ -61,7 +63,7 @@ func MessageEventsForTransaction( ev, err := f.ParseLogMessagePublished(*l) if err != nil { - return nil, fmt.Errorf("failed to parse log: %w", err) + return 0, nil, fmt.Errorf("failed to parse log: %w", err) } message := &common.MessagePublication{ @@ -78,5 +80,5 @@ func MessageEventsForTransaction( msgs = append(msgs, message) } - return msgs, nil + return receipt.BlockNumber.Uint64(), msgs, nil } diff --git a/node/pkg/ethereum/watcher.go b/node/pkg/ethereum/watcher.go index 328cf0480..8881cb5ff 100644 --- a/node/pkg/ethereum/watcher.go +++ b/node/pkg/ethereum/watcher.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "math/big" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -87,6 +88,10 @@ type ( // the governance mechanism lives there), setChan chan *common.GuardianSet + // Incoming re-observation requests from the network. Pre-filtered to only + // include requests for our chainID. + obsvReqC chan *gossipv1.ObservationRequest + pending map[pendingKey]*pendingMessage pendingMu sync.Mutex @@ -118,7 +123,8 @@ func NewEthWatcher( chainID vaa.ChainID, messageEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet, - minConfirmations uint64) *Watcher { + minConfirmations uint64, + obsvReqC chan *gossipv1.ObservationRequest) *Watcher { return &Watcher{ url: url, contract: contract, @@ -128,6 +134,7 @@ func NewEthWatcher( chainID: chainID, msgChan: messageEvents, setChan: setEvents, + obsvReqC: obsvReqC, pending: map[pendingKey]*pendingMessage{}} } @@ -193,6 +200,93 @@ func (e *Watcher) Run(ctx context.Context) error { } }() + // Track the current block number so we can compare it to the block number of + // the message publication for observation requests. + var currentBlockNumber uint64 + + go func() { + for { + select { + case <-ctx.Done(): + return + case r := <-e.obsvReqC: + // This can't happen unless there is a programming error - the caller + // is expected to send us only requests for our chainID. + if vaa.ChainID(r.ChainId) != e.chainID { + panic("invalid chain ID") + } + + tx := eth_common.BytesToHash(r.TxHash) + logger.Info("received observation request", + zap.String("eth_network", e.networkName), + zap.String("tx_hash", tx.Hex())) + + // SECURITY: Load the block number before requesting the transaction to avoid a + // race condition where requesting the tx succeeds and is then dropped due to a fork, + // but blockNumberU had already advanced beyond the required threshold. + // + // In the primary watcher flow, this is of no concern since we assume the node + // always sends the head before it sends the logs (implicit synchronization + // by relying on the same websocket connection). + blockNumberU := atomic.LoadUint64(¤tBlockNumber) + if blockNumberU == 0 { + logger.Error("no block number available, ignoring observation request", + zap.String("eth_network", e.networkName)) + continue + } + + timeout, cancel := context.WithTimeout(ctx, 5*time.Second) + blockNumber, msgs, err := MessageEventsForTransaction(timeout, c, e.contract, e.chainID, tx) + cancel() + + if err != nil { + logger.Error("failed to process observation request", + zap.Error(err), zap.String("eth_network", e.networkName)) + continue + } + + for _, msg := range msgs { + expectedConfirmations := uint64(msg.ConsistencyLevel) + if expectedConfirmations < e.minConfirmations { + expectedConfirmations = e.minConfirmations + } + + // SECURITY: In the recovery flow, we already know which transaction to + // observe, and we can assume that it has reached the expected finality + // level a long time ago. Therefore, the logic is much simpler than the + // primary watcher, which has to wait for finality. + // + // Instead, we can simply check if the transaction's block number is in + // the past by more than the expected confirmation number. + // + // Ensure that the current block number is at least expectedConfirmations + // larger than the message observation's block number. + if blockNumber+expectedConfirmations <= blockNumberU { + logger.Info("re-observed message publication transaction", + zap.Stringer("tx", msg.TxHash), + zap.Stringer("emitter_address", msg.EmitterAddress), + zap.Uint64("sequence", msg.Sequence), + zap.Uint64("current_block", blockNumberU), + zap.Uint64("observed_block", blockNumber), + zap.String("eth_network", e.networkName), + ) + e.msgChan <- msg + } else { + logger.Info("ignoring re-observed message publication transaction", + zap.Stringer("tx", msg.TxHash), + zap.Stringer("emitter_address", msg.EmitterAddress), + zap.Uint64("sequence", msg.Sequence), + zap.Uint64("current_block", blockNumberU), + zap.Uint64("observed_block", blockNumber), + zap.Uint64("expected_confirmations", expectedConfirmations), + zap.String("eth_network", e.networkName), + ) + } + } + } + } + }() + errC := make(chan error) go func() { for { @@ -288,6 +382,7 @@ func (e *Watcher) Run(ctx context.Context) error { e.pendingMu.Lock() blockNumberU := ev.Number.Uint64() + atomic.StoreUint64(¤tBlockNumber, blockNumberU) for key, pLock := range e.pending { expectedConfirmations := uint64(pLock.message.ConsistencyLevel) diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 2a5fe450d..724564fb7 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -491,12 +491,6 @@ func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *n return nil, fmt.Errorf("failed to unmarshal observation request: %w", err) } - // For now, this supports Solana only. Once we add more chains, we'll have to add a - // multiplexer/router in node.go. - if h.ChainId != uint32(vaa.ChainIDSolana) { - return nil, fmt.Errorf("unsupported chain id: %d", h.ChainId) - } - // TODO: implement per-guardian rate limiting return &h, nil