From f1bb5dfba92bb38374a7b891206f38fff6bfdc43 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 7 Jun 2024 08:40:48 +0200 Subject: [PATCH] make checks configurable --- Cargo.lock | 1 + Cargo.toml | 1 + src/rpcnode_check_alive.rs | 147 ++++++++++++++++++++++--------------- 3 files changed, 88 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 144bd0d..9d94325 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2027,6 +2027,7 @@ dependencies = [ "enum-iterator 2.1.0", "futures-util", "geyser-grpc-connector", + "itertools 0.10.5", "jsonrpsee-types", "reqwest 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index ad17ec7..4986923 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,3 +37,4 @@ enum-iterator = "2.1.0" url = "2.5.0" config = "0.14.0" +itertools = "0.10.5" diff --git a/src/rpcnode_check_alive.rs b/src/rpcnode_check_alive.rs index d43fc0d..c04b098 100644 --- a/src/rpcnode_check_alive.rs +++ b/src/rpcnode_check_alive.rs @@ -45,7 +45,7 @@ enum CheckResult { Timeout(Check), } -#[derive(Debug, PartialEq, Sequence)] +#[derive(Clone, Debug, PartialEq, Sequence)] enum Check { Gpa, TokenAccouns, @@ -73,7 +73,6 @@ async fn send_webook_discord() { error!("webhook failed: {:?}", e); } } - } #[tokio::main(flavor = "multi_thread", worker_threads = 16)] @@ -83,18 +82,19 @@ async fn main() -> ExitCode { // send_webook_discord().await; + // name of rpc node for logging/discord (e.g. hostname) + let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap(); let map_checks_by_name: HashMap = - enum_iterator::all::().map(|check| { - (format!("{:?}", check), check) - }).collect(); - info!("map_checks_by_name: {:?}", map_checks_by_name); + enum_iterator::all::().map(|check| { + (format!("{:?}", check), check) + }).collect(); // comma separated let checks_enabled = std::env::var("CHECKS_ENABLED").unwrap(); debug!("checks_enabled unparsed: {}", checks_enabled); - let checks_enabled = checks_enabled.split(",").map(|s| { + let checks_enabled: Vec = checks_enabled.split(",").map(|s| { let s = s.trim(); match map_checks_by_name.get(s) { @@ -104,50 +104,48 @@ async fn main() -> ExitCode { exit(1); } } - }).collect_vec(); + }).cloned().collect_vec(); - info!("checks enabled: {:?}", checks_enabled); + info!("checks enabled for rpcnode <{}>: {:?}", rpcnode_label, checks_enabled); - // name of rpc node for logging/discord (e.g. hostname) - let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap(); - // http://... - let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap(); - // wss://... - let ws_addr = std::env::var("RPC_WS_ADDR").unwrap(); - - // http://... - let grpc_addr = std::env::var("GRPC_ADDR").unwrap(); - - let geyser_grpc_timeouts = GrpcConnectionTimeouts { - connect_timeout: Duration::from_secs(10), - request_timeout: Duration::from_secs(10), - subscribe_timeout: Duration::from_secs(10), - receive_timeout: Duration::from_secs(10), - }; - - info!("rpcnode_check_alive against {}: (rpc {}, ws {}, gprc {})", rpcnode_label, rpc_addr, ws_addr, grpc_addr); - - let rpc_url = Url::parse(rpc_addr.as_str()).unwrap(); - let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); - - let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); let mut all_check_tasks: JoinSet = JoinSet::new(); - add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks); - add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks); - add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks); - add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks); + if checks_enabled.contains(&Check::Gpa) { + let rpc_client = read_rpc_config(); + add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks); + } + if checks_enabled.contains(&Check::TokenAccouns) { + let rpc_client = read_rpc_config(); + add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks); + } + if checks_enabled.contains(&Check::Gsfa) { + let rpc_client = read_rpc_config(); + add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks); + } + if checks_enabled.contains(&Check::GetAccountInfo) { + let rpc_client = read_rpc_config(); + add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks); + } - add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), &mut all_check_tasks); + if checks_enabled.contains(&Check::GeyserAllAccounts) { + let geyser_grpc_config = read_geyser_config(); + add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config), &mut all_check_tasks); + } + if checks_enabled.contains(&Check::GeyserTokenAccount) { + let geyser_grpc_config = read_geyser_config(); + add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config), &mut all_check_tasks); + } + if checks_enabled.contains(&Check::WebsocketAccount) { + let ws_addr = read_ws_config(); + add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), &mut all_check_tasks); + } - add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config.clone()), &mut all_check_tasks); - add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config.clone()), &mut all_check_tasks); - - info!("all tasks started..."); + let tasks_total = all_check_tasks.len(); + info!("all {} tasks started...", tasks_total); let mut tasks_success = Vec::new(); let mut tasks_successful = 0; @@ -157,7 +155,7 @@ async fn main() -> ExitCode { match res { Ok(CheckResult::Success(check)) => { tasks_successful += 1; - info!("one more task completed <{:?}>, {} left", check, all_check_tasks.len()); + info!("one more task completed <{:?}>, {}/{} left", check, all_check_tasks.len(), tasks_total); tasks_success.push(check); } Ok(CheckResult::Timeout(check)) => { @@ -176,7 +174,8 @@ async fn main() -> ExitCode { if tasks_failed + tasks_timeout > 0 { - warn!("tasks failed ({}) or timed out ({}) of {} total", tasks_failed, tasks_timeout, tasks_total); + warn!("rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total", + rpcnode_label, tasks_failed, tasks_timeout, tasks_total); for check in enum_iterator::all::() { if !tasks_success.contains(&check) { warn!("!! did not complet task <{:?}>", check); @@ -184,24 +183,55 @@ async fn main() -> ExitCode { } return ExitCode::SUCCESS; } else { - info!("all {} tasks completed...", tasks_total); + info!("rpcnode <{}> - all {} tasks completed: {:?}", rpcnode_label, tasks_total, tasks_success); return ExitCode::FAILURE; } - } -fn add_task(check: Check, task: impl Future + Send + 'static, all_check_tasks: &mut JoinSet) { + +fn read_rpc_config() -> Arc { + // http://... + let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap(); + + let rpc_url = Url::parse(rpc_addr.as_str()).unwrap(); + let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); + + rpc_client +} + +fn read_ws_config() -> String { + // wss://... + let ws_addr = std::env::var("RPC_WS_ADDR").unwrap(); + + ws_addr +} + +fn read_geyser_config() -> GrpcSourceConfig { + let grpc_addr = std::env::var("GRPC_ADDR").unwrap(); + + let geyser_grpc_timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(10), + request_timeout: Duration::from_secs(10), + subscribe_timeout: Duration::from_secs(10), + receive_timeout: Duration::from_secs(10), + }; + let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); + + geyser_grpc_config +} + +fn add_task(check: Check, task: impl Future + Send + 'static, all_check_tasks: &mut JoinSet) { let timeout = timeout(TASK_TIMEOUT, task).then(|res| async move { - match res { - Ok(()) => { - CheckResult::Success(check) + match res { + Ok(()) => { + CheckResult::Success(check) + } + Err(_) => { + CheckResult::Timeout(check) + } } - Err(_) => { - CheckResult::Timeout(check) - } - } - }); + }); all_check_tasks.spawn(timeout); } @@ -265,8 +295,7 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) { panic!("failed to receive the requested token accounts"); } -async fn rpc_gpa(rpc_client: Arc) { - +async fn rpc_gpa(rpc_client: Arc) { let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap(); let filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42])); @@ -308,7 +337,6 @@ async fn rpc_get_account_info(rpc_client: Arc) { debug!("Account info: {:?}", account_info); assert!(account_info.lamports > 0, "account lamports is zero"); - } async fn rpc_get_token_accounts_by_owner(rpc_client: Arc) { @@ -355,8 +383,7 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc) { async fn websocket_account_subscribe( rpc_url: Url -) { - +) { let sysvar_subscribe = json!({ "jsonrpc": "2.0", @@ -393,7 +420,6 @@ async fn websocket_account_subscribe( } - pub fn all_accounts() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( @@ -419,7 +445,6 @@ pub fn all_accounts() -> SubscribeRequest { } - pub fn token_accounts() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert(