From f9292177e98627d3063dd3295d665b4e3ca036b1 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Mon, 22 Apr 2024 19:07:22 +0200 Subject: [PATCH] fix(hermes): reconnect on wh connection termination (#1488) * fix(hermes): reconnect on wh connection termination `tokio::select` disables the branch that runs the wh connection if it returns OK and it never gets checked again. This change changes the `run` return to never return OK. * refactor(hermes): use Result in pythnet network listener thread --- apps/hermes/Cargo.lock | 2 +- apps/hermes/Cargo.toml | 2 +- apps/hermes/src/network/pythnet.rs | 97 ++++++++++++++--------------- apps/hermes/src/network/wormhole.rs | 16 ++++- 4 files changed, 61 insertions(+), 56 deletions(-) diff --git a/apps/hermes/Cargo.lock b/apps/hermes/Cargo.lock index 23eeeaf6..e7fb0f53 100644 --- a/apps/hermes/Cargo.lock +++ b/apps/hermes/Cargo.lock @@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.5.5" +version = "0.5.6" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/Cargo.toml b/apps/hermes/Cargo.toml index 93de5a3e..d67f3ba9 100644 --- a/apps/hermes/Cargo.toml +++ b/apps/hermes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.5.5" +version = "0.5.6" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/src/network/pythnet.rs b/apps/hermes/src/network/pythnet.rs index c6eae7d7..7af25cde 100644 --- a/apps/hermes/src/network/pythnet.rs +++ b/apps/hermes/src/network/pythnet.rs @@ -139,7 +139,7 @@ async fn fetch_bridge_data( } } -pub async fn run(store: Arc, pythnet_ws_endpoint: String) -> Result<()> { +pub async fn run(store: Arc, pythnet_ws_endpoint: String) -> Result { let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?; let config = RpcProgramAccountsConfig { @@ -160,59 +160,54 @@ pub async fn run(store: Arc, pythnet_ws_endpoint: String) -> Result<()> { .program_subscribe(&system_program::id(), Some(config)) .await?; - loop { - match notif.next().await { - Some(update) => { - let account: Account = match update.value.account.decode() { - Some(account) => account, - None => { - tracing::error!(?update, "Failed to decode account from update."); - continue; - } - }; - - let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data); - match accumulator_messages { - Ok(accumulator_messages) => { - let (candidate, _) = Pubkey::find_program_address( - &[ - b"AccumulatorState", - &accumulator_messages.ring_index().to_be_bytes(), - ], - &system_program::id(), - ); - - if candidate.to_string() == update.value.pubkey { - let store = store.clone(); - tokio::spawn(async move { - if let Err(err) = Aggregates::store_update( - &*store, - Update::AccumulatorMessages(accumulator_messages), - ) - .await - { - tracing::error!(error = ?err, "Failed to store accumulator messages."); - } - }); - } else { - tracing::error!( - ?candidate, - ?update.value.pubkey, - "Failed to verify message public keys.", - ); - } - } - - Err(err) => { - tracing::error!(error = ?err, "Failed to parse AccumulatorMessages."); - } - }; - } + while let Some(update) = notif.next().await { + let account: Account = match update.value.account.decode() { + Some(account) => account, None => { - return Err(anyhow!("Pythnet network listener terminated")); + tracing::error!(?update, "Failed to decode account from update."); + continue; } - } + }; + + let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data); + match accumulator_messages { + Ok(accumulator_messages) => { + let (candidate, _) = Pubkey::find_program_address( + &[ + b"AccumulatorState", + &accumulator_messages.ring_index().to_be_bytes(), + ], + &system_program::id(), + ); + + if candidate.to_string() == update.value.pubkey { + let store = store.clone(); + tokio::spawn(async move { + if let Err(err) = Aggregates::store_update( + &*store, + Update::AccumulatorMessages(accumulator_messages), + ) + .await + { + tracing::error!(error = ?err, "Failed to store accumulator messages."); + } + }); + } else { + tracing::error!( + ?candidate, + ?update.value.pubkey, + "Failed to verify message public keys.", + ); + } + } + + Err(err) => { + tracing::error!(error = ?err, "Failed to parse AccumulatorMessages."); + } + }; } + + Err(anyhow!("Pythnet network listener connection terminated")) } /// Fetch existing GuardianSet accounts from Wormhole. diff --git a/apps/hermes/src/network/wormhole.rs b/apps/hermes/src/network/wormhole.rs index 77891863..a8302fcf 100644 --- a/apps/hermes/src/network/wormhole.rs +++ b/apps/hermes/src/network/wormhole.rs @@ -49,7 +49,11 @@ use { Digest, Keccak256, }, - std::sync::Arc, + std::{ + sync::Arc, + time::Duration, + }, + tokio::time::Instant, tonic::Request, wormhole_sdk::{ vaa::{ @@ -158,10 +162,16 @@ mod proto { pub async fn spawn(opts: RunOptions, state: Arc) -> Result<()> { let mut exit = crate::EXIT.subscribe(); loop { + let current_time = Instant::now(); tokio::select! { _ = exit.changed() => break, Err(err) = run(opts.clone(), state.clone()) => { tracing::error!(error = ?err, "Wormhole gRPC service failed."); + + if current_time.elapsed() < Duration::from_secs(30) { + tracing::error!("Wormhole listener restarting too quickly. Sleep 1s."); + tokio::time::sleep(Duration::from_secs(1)).await; + } } } } @@ -170,7 +180,7 @@ pub async fn spawn(opts: RunOptions, state: Arc) -> Result<()> { } #[tracing::instrument(skip(opts, state))] -async fn run(opts: RunOptions, state: Arc) -> Result<()> { +async fn run(opts: RunOptions, state: Arc) -> Result { let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?; let mut stream = client .subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest { @@ -190,7 +200,7 @@ async fn run(opts: RunOptions, state: Arc) -> Result<()> { } } - Ok(()) + Err(anyhow!("Wormhole gRPC stream terminated.")) } /// Process a message received via a Wormhole gRPC connection.