diff --git a/clusterinfo/Cargo.toml b/clusterinfo/Cargo.toml index d550ba8..b1c42dd 100644 --- a/clusterinfo/Cargo.toml +++ b/clusterinfo/Cargo.toml @@ -36,6 +36,7 @@ solana-rpc-client-api = { path = "../../solana-geyzer-cluster/rpc-client-api" } solana-version = { path = "../../solana-geyzer-cluster/version"} solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" } solana-ledger = { path = "../../solana-geyzer-cluster/ledger"} +solana-streamer = { path = "../../solana-geyzer-cluster/streamer"} #patch to build locally diff --git a/clusterinfo/src/main.rs b/clusterinfo/src/main.rs index 74141e2..ef186ea 100644 --- a/clusterinfo/src/main.rs +++ b/clusterinfo/src/main.rs @@ -4,9 +4,11 @@ use async_stream::stream; use futures::{Stream, StreamExt}; use solana_client::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; +use solana_streamer::socket::SocketAddrSpace; use std::collections::HashMap; use std::collections::HashSet; use std::pin::pin; +use std::str::FromStr; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::geyser::CommitmentLevel; use yellowstone_grpc_proto::prelude::SubscribeUpdate; @@ -23,6 +25,13 @@ async fn main() -> anyhow::Result<()> { let ctrl_c_signal = tokio::signal::ctrl_c(); + //config + let allow_private_addr = false; + let addr_verifier = SocketAddrSpace::new(allow_private_addr); + //let genesis_config_hash = solana_sdk::Hash(); + // let shred_version = + // solana_sdk::shred_version::compute_shred_version(&genesis_config.hash(), Some(&hard_forks)); + let cluster_info_stream = create_geyser_clusterinfo_stream().await?; let jh = tokio::spawn(async move { @@ -42,6 +51,19 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr")); +// pub shred_version: u16 = 0, + +const SHRED_VERSION: u16 = 0; + +fn verify_node(node: &SubscribeUpdateClusterInfo, addr_verifier: &SocketAddrSpace) -> bool { + node.gossip + .as_ref() + .and_then(|addr| std::net::SocketAddr::from_str(addr).ok()) + .map(|addr| addr_verifier.check(&addr)) + .unwrap_or(false) +} + async fn run_loop(cluster_info_stream: S) where S: Stream, @@ -54,11 +76,15 @@ where //spik first tick log_interval.tick().await; + //TODO remove only to see how shred version change. + let mut current_shred_version = 0; + let socket_addr_space = SocketAddrSpace::new(false); + loop { tokio::select! { //log interval TODO remove _ = log_interval.tick() => { - log::info!("Current cluster info list size:{}", node_list.len()); + log::info!("Current cluster shred_version:{current_shred_version} info list size:{}", node_list.len()); //verify cluster_infos let geyzer_clluster = node_list.clone(); tokio::task::spawn_blocking(move ||{ @@ -71,7 +97,12 @@ where }); } Some(update) = cluster_info_stream.next() => { - node_list.insert(update.pubkey.clone(), update); + current_shred_version = update.shred_version; + match verify_node(&update, &socket_addr_space) { + true => {node_list.insert(update.pubkey.clone(), update);}, + false=> log::info!("verify_node:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())), + } + } } }