From fad4f65569b44a7987ca05ccccaa563f71a030dc Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 7 Jun 2024 09:38:59 +0200 Subject: [PATCH] clippy+fmt --- src/rpcnode_check_alive.rs | 165 +++++++++++++--------------- src/rpcnode_define_checks.rs | 203 +++++++++++++++++------------------ src/slot_latency_tester.rs | 121 +++++++++------------ 3 files changed, 227 insertions(+), 262 deletions(-) diff --git a/src/rpcnode_check_alive.rs b/src/rpcnode_check_alive.rs index dfdbdee..2f5d99f 100644 --- a/src/rpcnode_check_alive.rs +++ b/src/rpcnode_check_alive.rs @@ -1,45 +1,14 @@ mod rpcnode_define_checks; -use std::collections::{HashMap, HashSet}; -use std::future::Future; -use std::pin::pin; -use std::process::{exit, ExitCode}; -use std::str::FromStr; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; -use futures_util::FutureExt; -use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; -use serde_json::{json, Value}; -use solana_account_decoder::parse_token::spl_token_ids; -use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config; -use solana_rpc_client_api::request::TokenAccountsFilter; -use solana_rpc_client_api::response::SlotInfo; -use solana_sdk::commitment_config::CommitmentConfig; -use solana_sdk::pubkey::Pubkey; -use tokio::{join, select}; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; -use tokio::task::{JoinError, JoinHandle, JoinSet}; -use tokio::time::{Instant, timeout}; -use tokio_stream::{Stream, StreamExt}; -use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; -use tracing::{debug, error, info, trace, warn}; -use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; -use url::Url; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate}; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use anyhow::Context; use enum_iterator::Sequence; use gethostname::gethostname; -use solana_account_decoder::UiAccountEncoding; -use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use solana_rpc_client_api::filter::{Memcmp, RpcFilterType}; use itertools::Itertools; - -type Slot = u64; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::process::{exit, ExitCode}; +use std::time::Duration; +use tokio::task::JoinSet; +use tracing::{debug, error, info, warn}; const TASK_TIMEOUT: Duration = Duration::from_millis(15000); @@ -59,7 +28,6 @@ enum Check { WebsocketAccount, } - async fn send_webook_discord(discord_body: Value) { let Ok(url) = std::env::var("DISCORD_WEBHOOK") else { info!("sending to discord is disabled"); @@ -67,9 +35,7 @@ async fn send_webook_discord(discord_body: Value) { }; let client = reqwest::Client::new(); - let res = client.post(url) - .json(&discord_body) - .send().await; + let res = client.post(url).json(&discord_body).send().await; match res { Ok(_) => { info!("webhook sent"); @@ -87,28 +53,34 @@ async fn main() -> ExitCode { // name of rpc node for logging/discord (e.g. hostname) let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap(); - let map_checks_by_name: HashMap = - enum_iterator::all::().map(|check| { - (format!("{:?}", check), check) - }).collect(); + let map_checks_by_name: HashMap = enum_iterator::all::() + .map(|check| (format!("{:?}", check), check)) + .collect(); // comma separated let checks_enabled = std::env::var("CHECKS_ENABLED").unwrap(); debug!("checks_enabled unparsed: {}", checks_enabled); - let checks_enabled: Vec = checks_enabled.split(",").map(|s| { - let s = s.trim(); + let checks_enabled: Vec = checks_enabled + .split(',') + .map(|s| { + let s = s.trim(); - match map_checks_by_name.get(s) { - Some(check) => check, - None => { - error!("unknown check: {}", s); - exit(1); + match map_checks_by_name.get(s) { + Some(check) => check, + None => { + error!("unknown check: {}", s); + exit(1); + } } - } - }).cloned().collect_vec(); + }) + .cloned() + .collect_vec(); - info!("checks enabled for rpcnode <{}>: {:?}", rpcnode_label, checks_enabled); + info!( + "checks enabled for rpcnode <{}>: {:?}", + rpcnode_label, checks_enabled + ); let mut all_check_tasks: JoinSet = JoinSet::new(); @@ -126,7 +98,12 @@ async fn main() -> ExitCode { match res { Ok(CheckResult::Success(check)) => { tasks_successful += 1; - info!("one more task completed <{:?}>, {}/{} left", check, all_check_tasks.len(), tasks_total); + info!( + "one more task completed <{:?}>, {}/{} left", + check, + all_check_tasks.len(), + tasks_total + ); tasks_success.push(check); } Ok(CheckResult::Timeout(check)) => { @@ -145,13 +122,20 @@ async fn main() -> ExitCode { assert!(tasks_total > 0, "no results"); - - let discord_body = create_discord_message(&rpcnode_label, checks_enabled, &mut tasks_success, tasks_timedout, success); + let discord_body = create_discord_message( + &rpcnode_label, + checks_enabled, + &mut tasks_success, + tasks_timedout, + success, + ); send_webook_discord(discord_body).await; if !success { - warn!("rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total", - rpcnode_label, tasks_failed, tasks_timeout, tasks_total); + warn!( + "rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total", + rpcnode_label, tasks_failed, tasks_timeout, tasks_total + ); for check in enum_iterator::all::() { if !tasks_success.contains(&check) { warn!("!! did not complet task <{:?}>", check); @@ -159,42 +143,48 @@ async fn main() -> ExitCode { } return ExitCode::FAILURE; } else { - info!("rpcnode <{}> - all {} tasks completed: {:?}", rpcnode_label, tasks_total, tasks_success); + info!( + "rpcnode <{}> - all {} tasks completed: {:?}", + rpcnode_label, tasks_total, tasks_success + ); return ExitCode::SUCCESS; } } fn create_discord_message( - rpcnode_label: &str, checks_enabled: Vec, mut tasks_success: &mut Vec, mut tasks_timedout: Vec, success: bool) -> Value { - let result_per_check = enum_iterator::all::().map(|check| { - let name = format!("{:?}", check); - let disabled = !checks_enabled.contains(&check); - let timedout = tasks_timedout.contains(&check); - let success = tasks_success.contains(&check); - let value = if disabled { - "disabled" - } else if timedout { - "timed out" - } else if success { - "OK" - } else { - "failed" - }; - json! { - { - "name": name, - "value": value + rpcnode_label: &str, + checks_enabled: Vec, + tasks_success: &mut [Check], + tasks_timedout: Vec, + success: bool, +) -> Value { + let result_per_check = enum_iterator::all::() + .map(|check| { + let name = format!("{:?}", check); + let disabled = !checks_enabled.contains(&check); + let timedout = tasks_timedout.contains(&check); + let success = tasks_success.contains(&check); + let value = if disabled { + "disabled" + } else if timedout { + "timed out" + } else if success { + "OK" + } else { + "failed" + }; + json! { + { + "name": name, + "value": value + } } - } - }).collect_vec(); + }) + .collect_vec(); let fields = result_per_check; - let status_color = if success { - 0x00FF00 - } else { - 0xFC4100 - }; + let status_color = if success { 0x00FF00 } else { 0xFC4100 }; let hostname_executed = gethostname(); @@ -219,4 +209,3 @@ fn create_discord_message( }; body } - diff --git a/src/rpcnode_define_checks.rs b/src/rpcnode_define_checks.rs index 7680d7d..5852d1e 100644 --- a/src/rpcnode_define_checks.rs +++ b/src/rpcnode_define_checks.rs @@ -1,9 +1,4 @@ -use std::collections::HashMap; -use std::future::Future; -use std::pin::pin; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; +use crate::{Check, CheckResult, TASK_TIMEOUT}; use anyhow::Context; use futures_util::{FutureExt, StreamExt}; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; @@ -17,6 +12,12 @@ use solana_rpc_client_api::filter::{Memcmp, RpcFilterType}; use solana_rpc_client_api::request::TokenAccountsFilter; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use std::future::Future; +use std::pin::pin; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use tokio::task::JoinSet; use tokio::time::timeout; use tracing::debug; @@ -24,57 +25,76 @@ use url::Url; use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts}; -use crate::{Check, CheckResult, TASK_TIMEOUT}; -pub fn define_checks(checks_enabled: &Vec, mut all_check_tasks: &mut JoinSet) { +pub fn define_checks(checks_enabled: &[Check], all_check_tasks: &mut JoinSet) { if checks_enabled.contains(&Check::Gpa) { let rpc_client = read_rpc_config(); - add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks); + add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), all_check_tasks); } if checks_enabled.contains(&Check::TokenAccouns) { let rpc_client = read_rpc_config(); - add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks); + add_task( + Check::TokenAccouns, + rpc_get_token_accounts_by_owner(rpc_client.clone()), + all_check_tasks, + ); } if checks_enabled.contains(&Check::Gsfa) { let rpc_client = read_rpc_config(); - add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks); + add_task( + Check::Gsfa, + rpc_get_signatures_for_address(rpc_client.clone()), + all_check_tasks, + ); } if checks_enabled.contains(&Check::GetAccountInfo) { let rpc_client = read_rpc_config(); - add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks); + add_task( + Check::GetAccountInfo, + rpc_get_account_info(rpc_client.clone()), + all_check_tasks, + ); } if checks_enabled.contains(&Check::GeyserAllAccounts) { let geyser_grpc_config = read_geyser_config(); - add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config), &mut all_check_tasks); + add_task( + Check::GeyserAllAccounts, + create_geyser_all_accounts_task(geyser_grpc_config), + all_check_tasks, + ); } if checks_enabled.contains(&Check::GeyserTokenAccount) { let geyser_grpc_config = read_geyser_config(); - add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config), &mut all_check_tasks); + add_task( + Check::GeyserTokenAccount, + create_geyser_token_account_task(geyser_grpc_config), + all_check_tasks, + ); } if checks_enabled.contains(&Check::WebsocketAccount) { let ws_addr = read_ws_config(); - add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), &mut all_check_tasks); + add_task( + Check::WebsocketAccount, + websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), + all_check_tasks, + ); } } - - fn read_rpc_config() -> Arc { // http://... let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap(); let rpc_url = Url::parse(rpc_addr.as_str()).unwrap(); - let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); - rpc_client + Arc::new(RpcClient::new(rpc_url.to_string())) } fn read_ws_config() -> String { // wss://... - let ws_addr = std::env::var("RPC_WS_ADDR").unwrap(); - ws_addr + std::env::var("RPC_WS_ADDR").unwrap() } fn read_geyser_config() -> GrpcSourceConfig { @@ -86,84 +106,66 @@ fn read_geyser_config() -> GrpcSourceConfig { subscribe_timeout: Duration::from_secs(10), receive_timeout: Duration::from_secs(10), }; - let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone()); - geyser_grpc_config + GrpcSourceConfig::new( + grpc_addr.to_string(), + None, + None, + geyser_grpc_timeouts.clone(), + ) } - - -fn add_task(check: Check, task: impl Future + Send + 'static, all_check_tasks: &mut JoinSet) { - let timeout = - timeout(TASK_TIMEOUT, task).then(|res| async move { - match res { - Ok(()) => { - CheckResult::Success(check) - } - Err(_) => { - CheckResult::Timeout(check) - } - } - }); +fn add_task( + check: Check, + task: impl Future + Send + 'static, + all_check_tasks: &mut JoinSet, +) { + let timeout = timeout(TASK_TIMEOUT, task).then(|res| async move { + match res { + Ok(()) => CheckResult::Success(check), + Err(_) => CheckResult::Timeout(check), + } + }); all_check_tasks.spawn(timeout); } - // note: this might fail if the yellowstone plugin does not allow "any broadcast filter" async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) { - let green_stream = create_geyser_reconnecting_stream( - config.clone(), - all_accounts(), - ); + let green_stream = create_geyser_reconnecting_stream(config.clone(), all_accounts()); let mut count = 0; let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { - match message { - Message::GeyserSubscribeUpdate(subscriber_update) => { - match subscriber_update.update_oneof { - Some(UpdateOneof::Account(update)) => { - debug!("Account from geyser: {:?}", update.account.unwrap().pubkey); - count += 1; - if count > 3 { - return; - } - } - _ => {} + if let Message::GeyserSubscribeUpdate(subscriber_update) = message { + if let Some(UpdateOneof::Account(update)) = subscriber_update.update_oneof { + debug!("Account from geyser: {:?}", update.account.unwrap().pubkey); + count += 1; + if count > 3 { + return; } } - _ => {} } - }; + } panic!("failed to receive the requested accounts"); } async fn create_geyser_token_account_task(config: GrpcSourceConfig) { - let green_stream = create_geyser_reconnecting_stream( - config.clone(), - token_accounts(), - ); + let green_stream = create_geyser_reconnecting_stream(config.clone(), token_accounts()); let mut count = 0; let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { - match message { - Message::GeyserSubscribeUpdate(subscriber_update) => { - match subscriber_update.update_oneof { - Some(UpdateOneof::Account(update)) => { - debug!("Token Account: {:?}", update.account.unwrap().pubkey); - count += 1; - if count > 3 { - return; - } - } - _ => {} + if let Message::GeyserSubscribeUpdate(subscriber_update) = message { + if let Some(UpdateOneof::Account(update)) = subscriber_update.update_oneof { + debug!("Token Account: {:?}", update.account.unwrap().pubkey); + count += 1; + if count > 3 { + return; } } - _ => {} } - }; + } panic!("failed to receive the requested token accounts"); } @@ -171,7 +173,7 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) { async fn rpc_gpa(rpc_client: Arc) { let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap(); - let filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42])); + let _filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42])); let _config = RpcProgramAccountsConfig { // filters: Some(vec![filter]), @@ -196,16 +198,16 @@ async fn rpc_gpa(rpc_client: Arc) { debug!("Program accounts: {:?}", program_accounts.len()); // mango 12400 on mainnet - assert!(program_accounts.len() > 1000, "program accounts count is too low"); + assert!( + program_accounts.len() > 1000, + "program accounts count is too low" + ); } async fn rpc_get_account_info(rpc_client: Arc) { let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap(); - let account_info = rpc_client - .get_account(&program_pubkey) - .await - .unwrap(); + let account_info = rpc_client.get_account(&program_pubkey).await.unwrap(); debug!("Account info: {:?}", account_info); @@ -217,10 +219,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc) { let mint = Pubkey::from_str("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v").unwrap(); let token_accounts = rpc_client - .get_token_accounts_by_owner( - &owner_pubkey, - TokenAccountsFilter::Mint(mint), - ) + .get_token_accounts_by_owner(&owner_pubkey, TokenAccountsFilter::Mint(mint)) .await .context("rpc_get_token_accounts_by_owner") .unwrap(); @@ -228,7 +227,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc) { // 1 account debug!("Token accounts: {:?}", token_accounts.len()); - assert!(token_accounts.len() > 0, "token accounts count is zero"); + assert!(!token_accounts.is_empty(), "token accounts count is zero"); } async fn rpc_get_signatures_for_address(rpc_client: Arc) { @@ -253,28 +252,24 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc) { assert!(signatures.len() > 10, "signatures count is too low"); } - -async fn websocket_account_subscribe( - rpc_url: Url -) { - let sysvar_subscribe = - json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "accountSubscribe", - "params": [ - "SysvarC1ock11111111111111111111111111111111" - ] - }); +async fn websocket_account_subscribe(rpc_url: Url) { + let sysvar_subscribe = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "accountSubscribe", + "params": [ + "SysvarC1ock11111111111111111111111111111111" + ] + }); let mut ws1 = StableWebSocket::new_with_timeout( rpc_url, sysvar_subscribe.clone(), Duration::from_secs(3), ) - .await - .context("new websocket") - .unwrap(); + .await + .context("new websocket") + .unwrap(); let mut channel = ws1.subscribe_message_channel(); @@ -292,7 +287,6 @@ async fn websocket_account_subscribe( panic!("failed to receive the requested sysvar clock accounts"); } - pub fn all_accounts() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( @@ -317,7 +311,6 @@ pub fn all_accounts() -> SubscribeRequest { } } - pub fn token_accounts() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( @@ -325,13 +318,14 @@ pub fn token_accounts() -> SubscribeRequest { SubscribeRequestFilterAccounts { account: vec![], // vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], - owner: - spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), + owner: spl_token_ids() + .iter() + .map(|pubkey| pubkey.to_string()) + .collect(), filters: vec![], }, ); - SubscribeRequest { slots: HashMap::new(), accounts: accounts_subs, @@ -344,4 +338,3 @@ pub fn token_accounts() -> SubscribeRequest { ping: None, } } - diff --git a/src/slot_latency_tester.rs b/src/slot_latency_tester.rs index e5a8d82..8f988c3 100644 --- a/src/slot_latency_tester.rs +++ b/src/slot_latency_tester.rs @@ -1,10 +1,5 @@ -use std::collections::HashMap; -use std::pin::pin; -use std::str::FromStr; -use std::thread::sleep; -use std::time::Duration; -use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; +use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use serde_json::json; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config; @@ -12,17 +7,22 @@ use solana_rpc_client_api::request::TokenAccountsFilter; use solana_rpc_client_api::response::SlotInfo; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use std::pin::pin; +use std::str::FromStr; +use std::thread::sleep; +use std::time::Duration; use tokio::select; use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; use tokio::time::Instant; -use tokio_stream::{Stream, StreamExt}; -use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; -use tracing::{info, trace, warn}; -use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; +use tokio_stream::StreamExt; +use tracing::info; use url::Url; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate}; +use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::{ + SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate, +}; type Slot = u64; @@ -31,15 +31,18 @@ async fn main() { tracing_subscriber::fmt::init(); let ws_url1 = format!("wss://api.mainnet-beta.solana.com"); - let ws_url2 = format!("wss://mango.rpcpool.com/{MAINNET_API_TOKEN}", - MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); - let rpc_url = format!("https://mango.rpcpool.com/{MAINNET_API_TOKEN}", - MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()); + let ws_url2 = format!( + "wss://mango.rpcpool.com/{MAINNET_API_TOKEN}", + MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap() + ); + let rpc_url = format!( + "https://mango.rpcpool.com/{MAINNET_API_TOKEN}", + MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap() + ); let rpc_url = Url::parse(rpc_url.as_str()).unwrap(); let grpc_addr = std::env::var("GRPC_ADDR").unwrap(); - let timeouts = GrpcConnectionTimeouts { connect_timeout: Duration::from_secs(10), request_timeout: Duration::from_secs(10), @@ -47,18 +50,22 @@ async fn main() { receive_timeout: Duration::from_secs(10), }; - let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone()); let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); start_geyser_slots_task(config.clone(), slots_tx.clone()); - tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); - tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); + tokio::spawn(websocket_source( + Url::parse(ws_url1.as_str()).unwrap(), + slots_tx.clone(), + )); + tokio::spawn(websocket_source( + Url::parse(ws_url2.as_str()).unwrap(), + slots_tx.clone(), + )); tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); - let started_at = Instant::now(); while let Some(slot) = slots_rx.recv().await { println!("Slot: {}", slot); @@ -68,16 +75,10 @@ async fn main() { } } - sleep(Duration::from_secs(15)); + sleep(Duration::from_secs(15)); } - - -async fn rpc_getslot_source( - rpc_url: Url, - mpsc_downstream: tokio::sync::mpsc::Sender, -) { - +async fn rpc_getslot_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender) { let rpc = RpcClient::new(rpc_url.to_string()); loop { tokio::time::sleep(Duration::from_millis(100)).await; @@ -87,82 +88,64 @@ async fn rpc_getslot_source( .unwrap(); match mpsc_downstream.send(slot).await { Ok(_) => {} - Err(_) => return + Err(_) => return, } } - } - -async fn websocket_source( - rpc_url: Url, - mpsc_downstream: tokio::sync::mpsc::Sender, -) { - - let processed_slot_subscribe = - json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "slotSubscribe", - }); +async fn websocket_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender) { + let processed_slot_subscribe = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "slotSubscribe", + }); let mut ws1 = StableWebSocket::new_with_timeout( rpc_url, processed_slot_subscribe.clone(), Duration::from_secs(3), ) - .await - .unwrap(); + .await + .unwrap(); let mut channel = ws1.subscribe_message_channel(); while let Ok(msg) = channel.recv().await { if let WsMessage::Text(payload) = msg { - let ws_result: jsonrpsee_types::SubscriptionResponse = serde_json::from_str(&payload).unwrap(); + let ws_result: jsonrpsee_types::SubscriptionResponse = + serde_json::from_str(&payload).unwrap(); let slot_info = ws_result.params.result; match mpsc_downstream.send(slot_info.slot).await { Ok(_) => {} - Err(_) => return + Err(_) => return, } } } - } - // note: this might fail if the yellowstone plugin does not allow "any broadcast filter" -fn start_geyser_slots_task(config: GrpcSourceConfig, - mpsc_downstream: tokio::sync::mpsc::Sender, +fn start_geyser_slots_task( + config: GrpcSourceConfig, + mpsc_downstream: tokio::sync::mpsc::Sender, ) { - let green_stream = create_geyser_reconnecting_stream( - config.clone(), - slots(), - ); + let green_stream = create_geyser_reconnecting_stream(config.clone(), slots()); tokio::spawn(async move { let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { - match message { - Message::GeyserSubscribeUpdate(subscriber_update) => { - match subscriber_update.update_oneof { - Some(UpdateOneof::Slot(slot_info)) => { - info!("Slot from geyser: {:?}", slot_info.slot); - match mpsc_downstream.send(slot_info.slot).await { - Ok(_) => {} - Err(_) => return - } - } - _ => {} + if let Message::GeyserSubscribeUpdate(subscriber_update) = message { + if let Some(UpdateOneof::Slot(slot_info)) = subscriber_update.update_oneof { + info!("Slot from geyser: {:?}", slot_info.slot); + match mpsc_downstream.send(slot_info.slot).await { + Ok(_) => {} + Err(_) => return, } } - _ => {} } } }); } - - pub fn slots() -> SubscribeRequest { let mut slot_subs = HashMap::new(); slot_subs.insert(