diff --git a/src/rpcnode_check_alive.rs b/src/rpcnode_check_alive.rs index 7ba1eac..fdf22be 100644 --- a/src/rpcnode_check_alive.rs +++ b/src/rpcnode_check_alive.rs @@ -31,22 +31,24 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; type Slot = u64; enum Check { - Gpa + Gpa, + TokenAccouns, + Gsfa, + GetAccountInfo, + GeyserAllAccounts, + GeyserTokenAccount, + WebsocketAccount, } #[tokio::main(flavor = "multi_thread", worker_threads = 16)] async fn main() { tracing_subscriber::fmt::init(); - let ws_url2 = format!("wss://mango.rpcpool.com/{MAINNET_API_TOKEN}", - MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); + let ws_url = format!("wss://mango.rpcpool.com/{MAINNET_API_TOKEN}", + MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); let rpc_url = format!("https://mango.rpcpool.com/{MAINNET_API_TOKEN}", MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); - let rpc_url = Url::parse(rpc_url.as_str()).unwrap(); - let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); - let grpc_addr = std::env::var("GRPC_ADDR").unwrap(); - let geyser_grpc_timeouts = GrpcConnectionTimeouts { connect_timeout: Duration::from_secs(10), request_timeout: Duration::from_secs(10), @@ -54,26 +56,22 @@ async fn main() { receive_timeout: Duration::from_secs(10), }; - let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); + let rpc_url = Url::parse(rpc_url.as_str()).unwrap(); + let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); + let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); let mut all_check_tasks: JoinSet = JoinSet::new(); + all_check_tasks.spawn(rpc_gpa(rpc_client.clone()).then(|_| async { Check::Gpa })); + all_check_tasks.spawn(rpc_get_token_accounts_by_owner(rpc_client.clone()).then(|_| async { Check::TokenAccouns })); + all_check_tasks.spawn(rpc_get_signatures_for_address(rpc_client.clone()).then(|_| async { Check::Gsfa })); + all_check_tasks.spawn(rpc_get_account_info(rpc_client.clone()).then(|_| async { Check::GetAccountInfo })); - all_check_tasks.spawn(rpc_gpa(rpc_client.clone()).then(|x| async { Check::Gpa })); + all_check_tasks.spawn(websocket_account_subscribe(Url::parse(ws_url.as_str()).unwrap()).then(|_| async { Check::WebsocketAccount })); - - all_check_tasks.spawn(rpc_get_token_accounts_by_owner(rpc_client.clone()).then(|x| async { Check::Gpa })); - - all_check_tasks.spawn(rpc_get_signatures_for_address(rpc_client.clone()).then(|x| async { Check::Gpa })); - - - all_check_tasks.spawn(rpc_get_account_info(rpc_client.clone()).then(|x| async { Check::Gpa })); - all_check_tasks.spawn(create_geyser_all_accounts_task(geyser_grpc_config.clone()).then(|x| async { Check::Gpa })); - - all_check_tasks.spawn(create_geyser_token_account_task(geyser_grpc_config.clone()).then(|x| async { Check::Gpa })); - - all_check_tasks.spawn(websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).then(|x| async { Check::Gpa })); + all_check_tasks.spawn(create_geyser_all_accounts_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserAllAccounts })); + all_check_tasks.spawn(create_geyser_token_account_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserTokenAccount })); info!("all tasks started..."); @@ -82,7 +80,6 @@ async fn main() { } info!("all tasks completed..."); - } diff --git a/src/slot_latency_tester.rs b/src/slot_latency_tester.rs index 3c7be42..e5a8d82 100644 --- a/src/slot_latency_tester.rs +++ b/src/slot_latency_tester.rs @@ -36,11 +36,9 @@ async fn main() { let rpc_url = format!("https://mango.rpcpool.com/{MAINNET_API_TOKEN}", MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); let rpc_url = Url::parse(rpc_url.as_str()).unwrap(); - let rpc_client = RpcClient::new(rpc_url.to_string()); let grpc_addr = std::env::var("GRPC_ADDR").unwrap(); - let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); let timeouts = GrpcConnectionTimeouts { connect_timeout: Duration::from_secs(10), @@ -52,6 +50,8 @@ async fn main() { let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone()); + let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); + start_geyser_slots_task(config.clone(), slots_tx.clone()); tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); @@ -184,5 +184,3 @@ pub fn slots() -> SubscribeRequest { ping: None, } } - -x \ No newline at end of file