Compare commits

...

3 Commits

Author SHA1 Message Date
musitdev 6c3a998f9b remove some logs 2023-12-18 19:05:15 +01:00
musitdev 0752cfa65c add shred version verification 2023-12-18 19:00:43 +01:00
musitdev 577ea296d6 add some logs 2023-12-18 16:12:35 +01:00
1 changed files with 38 additions and 15 deletions

View File

@ -34,8 +34,12 @@ async fn main() -> anyhow::Result<()> {
let cluster_info_stream = create_geyser_clusterinfo_stream().await?;
let jh = tokio::spawn(async move {
run_loop(cluster_info_stream).await;
let jh = tokio::spawn({
let shred_version = TESTNET_SHRED_VERSION;
async move {
run_loop(cluster_info_stream, shred_version).await;
}
});
tokio::select! {
@ -54,17 +58,26 @@ async fn main() -> anyhow::Result<()> {
//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
// pub shred_version: u16 = 0,
const SHRED_VERSION: u16 = 0;
const TESTNET_SHRED_VERSION: u16 = 5106;
const TESTNET_GENESIS_HASH: &str = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY";
fn verify_node(node: &SubscribeUpdateClusterInfo, addr_verifier: &SocketAddrSpace) -> bool {
const MAINNET_SHRED_VERSION: u16 = 5106;
const MAINNET_GENESIS_HASH: &str = "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d";
fn verify_node(
node: &SubscribeUpdateClusterInfo,
addr_verifier: &SocketAddrSpace,
shred_version: u16,
) -> bool {
node.gossip
.as_ref()
.and_then(|addr| std::net::SocketAddr::from_str(addr).ok())
.map(|addr| addr_verifier.check(&addr))
.unwrap_or(false)
&& node.shred_version as u16 == shred_version
}
async fn run_loop<S>(cluster_info_stream: S)
async fn run_loop<S>(cluster_info_stream: S, shred_version: u16)
where
S: Stream<Item = SubscribeUpdateClusterInfo>,
{
@ -77,14 +90,13 @@ where
log_interval.tick().await;
//TODO remove only to see how shred version change.
let mut current_shred_version = 0;
let socket_addr_space = SocketAddrSpace::new(false);
loop {
tokio::select! {
//log interval TODO remove
_ = log_interval.tick() => {
log::info!("Current cluster shred_version:{current_shred_version} info list size:{}", node_list.len());
log::info!("Current cluster info list size:{}", node_list.len());
//verify cluster_infos
let geyzer_clluster = node_list.clone();
tokio::task::spawn_blocking(move ||{
@ -97,11 +109,13 @@ where
});
}
Some(update) = cluster_info_stream.next() => {
current_shred_version = update.shred_version;
match verify_node(&update, &socket_addr_space) {
true => {node_list.insert(update.pubkey.clone(), update);},
false=> log::info!("verify_node:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())),
if verify_node(&update, &socket_addr_space, shred_version) {
node_list.insert(update.pubkey.clone(), update);
}
// match verify_node(&update, &socket_addr_space, shred_version) {
// true => {node_list.insert(update.pubkey.clone(), update);},
// false=> log::info!("verify_node fail for:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())),
// }
}
}
@ -184,10 +198,19 @@ fn verify_clusters(
//verify if all rpc are in geyzer
for rpc in &rpc_cluster {
if let None = geyzer_cluster.remove(&rpc.pubkey) {
log::info!("Rpc node not present in geyzer: {}", rpc.pubkey);
log::info!("Rpc node not present in geyzer: {} ", rpc.pubkey);
}
}
geyzer_cluster
.keys()
.for_each(|node| log::info!("Geyzer node not present in RPC: {}", node));
geyzer_cluster.iter().for_each(|(key, node)| {
log::info!(
"Geyzer node not present in RPC: {} gossip:{} shred_version:{} node:{:?}",
key,
node.gossip
.as_ref()
.map(|addr| addr.to_string())
.unwrap_or("None".to_string()),
node.shred_version,
node
)
});
}