From c73b5a783b1acc9c15c89bb2ad7030fa81248c06 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 5 Jun 2024 18:44:19 +0200 Subject: [PATCH] combine nicely --- src/rpcnode_check_alive.rs | 72 ++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/src/rpcnode_check_alive.rs b/src/rpcnode_check_alive.rs index fdf22be..c37e6f3 100644 --- a/src/rpcnode_check_alive.rs +++ b/src/rpcnode_check_alive.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::future::Future; use std::pin::pin; use std::str::FromStr; use std::sync::Arc; @@ -18,11 +19,11 @@ use solana_sdk::pubkey::Pubkey; use tokio::{join, select}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; -use tokio::task::{JoinHandle, JoinSet}; -use tokio::time::Instant; +use tokio::task::{JoinError, JoinHandle, JoinSet}; +use tokio::time::{Instant, timeout}; use tokio_stream::{Stream, StreamExt}; use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; use url::Url; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate}; @@ -30,6 +31,14 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; type Slot = u64; +const TASK_TIMEOUT: Duration = Duration::from_millis(5000); + +enum CheckResult { + Success(Check), + Timeout(Check), +} + +#[derive(Debug)] enum Check { Gpa, TokenAccouns, @@ -61,27 +70,52 @@ async fn main() { let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); - let mut all_check_tasks: JoinSet = JoinSet::new(); + let mut all_check_tasks: JoinSet = JoinSet::new(); - all_check_tasks.spawn(rpc_gpa(rpc_client.clone()).then(|_| async { Check::Gpa })); - all_check_tasks.spawn(rpc_get_token_accounts_by_owner(rpc_client.clone()).then(|_| async { Check::TokenAccouns })); - all_check_tasks.spawn(rpc_get_signatures_for_address(rpc_client.clone()).then(|_| async { Check::Gsfa })); - all_check_tasks.spawn(rpc_get_account_info(rpc_client.clone()).then(|_| async { Check::GetAccountInfo })); + add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks); + add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks); + add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks); + add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks); - all_check_tasks.spawn(websocket_account_subscribe(Url::parse(ws_url.as_str()).unwrap()).then(|_| async { Check::WebsocketAccount })); + add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_url.as_str()).unwrap()), &mut all_check_tasks); - all_check_tasks.spawn(create_geyser_all_accounts_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserAllAccounts })); - all_check_tasks.spawn(create_geyser_token_account_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserTokenAccount })); + add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config.clone()), &mut all_check_tasks); + add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config.clone()), &mut all_check_tasks); info!("all tasks started..."); while let Some(res) = all_check_tasks.join_next().await { - info!("one more Task completed: {} left", all_check_tasks.len()); + match res { + Ok(CheckResult::Success(check)) => { + info!("one more Task completed: {:?}, {} left", check, all_check_tasks.len()); + } + Ok(CheckResult::Timeout(check)) => { + info!("timeout running Task {:?}", check); + } + Err(_) => { + warn!("Task failed"); + } + } } info!("all tasks completed..."); } +fn add_task(check: Check, task: impl Future + Send + 'static, all_check_tasks: &mut JoinSet) { + let timeout = + timeout(TASK_TIMEOUT, task).then(|res| async move { + match res { + Ok(()) => { + CheckResult::Success(check) + } + Err(_) => { + CheckResult::Timeout(check) + } + } + }); + all_check_tasks.spawn(timeout); +} + // note: this might fail if the yellowstone plugin does not allow "any broadcast filter" async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) { @@ -97,7 +131,7 @@ async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) { Message::GeyserSubscribeUpdate(subscriber_update) => { match subscriber_update.update_oneof { Some(UpdateOneof::Account(update)) => { - info!("Account from geyser: {:?}", update.account.unwrap().pubkey); + debug!("Account from geyser: {:?}", update.account.unwrap().pubkey); count += 1; if count > 3 { return; @@ -124,7 +158,7 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) { Message::GeyserSubscribeUpdate(subscriber_update) => { match subscriber_update.update_oneof { Some(UpdateOneof::Account(update)) => { - info!("Token Account: {:?}", update.account.unwrap().pubkey); + debug!("Token Account: {:?}", update.account.unwrap().pubkey); count += 1; if count > 3 { return; @@ -150,7 +184,7 @@ async fn rpc_gpa(rpc_client: Arc) { .await .unwrap(); - info!("Program accounts: {:?}", program_accounts.len()); + debug!("Program accounts: {:?}", program_accounts.len()); // mango 12400 on mainnet // CPL: 107 on mainnet @@ -164,7 +198,7 @@ async fn rpc_get_account_info(rpc_client: Arc) { .await .unwrap(); - info!("Account info: {:?}", account_info); + debug!("Account info: {:?}", account_info); } @@ -181,7 +215,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc) { .unwrap(); // 1 account - info!("Token accounts: {:?}", token_accounts.len()); + debug!("Token accounts: {:?}", token_accounts.len()); } async fn rpc_get_signatures_for_address(rpc_client: Arc) { @@ -200,7 +234,7 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc) { .unwrap(); // 42 - info!("Signatures: {:?}", signatures.len()); + debug!("Signatures: {:?}", signatures.len()); } @@ -231,7 +265,7 @@ async fn websocket_account_subscribe( let mut count = 0; while let Ok(msg) = channel.recv().await { if let WsMessage::Text(payload) = msg { - info!("SysvarC1ock: {:?}", payload); + debug!("SysvarC1ock: {:?}", payload); count += 1; } if count > 3 {