diff --git a/clusterinfo/src/main.rs b/clusterinfo/src/main.rs index d68b62e..74141e2 100644 --- a/clusterinfo/src/main.rs +++ b/clusterinfo/src/main.rs @@ -2,6 +2,8 @@ use async_stream::stream; use futures::{Stream, StreamExt}; +use solana_client::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::collections::HashSet; use std::pin::pin; @@ -12,6 +14,7 @@ use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo; use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}; use yellowstone_grpc_proto::tonic::Status; +const RPC_URL: &str = "http://localhost:8899"; const GRPC_URL: &str = "http://localhost:10000"; #[tokio::main] @@ -47,13 +50,25 @@ where let mut cluster_info_stream = pin!(cluster_info_stream); //Log current size of the cluster node info list size. - let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(15)); + //spik first tick + log_interval.tick().await; loop { tokio::select! { //log interval TODO remove _ = log_interval.tick() => { log::info!("Current cluster info list size:{}", node_list.len()); + //verify cluster_infos + let geyzer_clluster = node_list.clone(); + tokio::task::spawn_blocking(move ||{ + match get_rpc_cluster_info() { + Ok(rpc_cluster) => { + verify_clusters(geyzer_clluster, rpc_cluster); + }, + Err(err) => log::warn!("Error during get RPC cluster nodes: {err}"), + } + }); } Some(update) = cluster_info_stream.next() => { node_list.insert(update.pubkey.clone(), update); @@ -118,3 +133,30 @@ async fn subscribe_geyzer_clusterinfo( Ok(confirmed_stream) } + +fn get_rpc_cluster_info() -> anyhow::Result> { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + RPC_URL.to_string(), + tokio::time::Duration::from_secs(600), + CommitmentConfig::confirmed(), + ); + + let cluster_nodes = rpc_client.get_cluster_nodes()?; + log::info!("RPC cluster nodes size:{}", cluster_nodes.len()); + Ok(cluster_nodes) +} + +fn verify_clusters( + mut geyzer_cluster: HashMap, + rpc_cluster: Vec, +) { + //verify if all rpc are in geyzer + for rpc in &rpc_cluster { + if let None = geyzer_cluster.remove(&rpc.pubkey) { + log::info!("Rpc node not present in geyzer: {}", rpc.pubkey); + } + } + geyzer_cluster + .keys() + .for_each(|node| log::info!("Geyzer node not present in RPC: {}", node)); +}