add verify gossip addr before inserting
This commit is contained in:
parent
da3a3cd39c
commit
4c6af9b3ec
|
@ -36,6 +36,7 @@ solana-rpc-client-api = { path = "../../solana-geyzer-cluster/rpc-client-api" }
|
||||||
solana-version = { path = "../../solana-geyzer-cluster/version"}
|
solana-version = { path = "../../solana-geyzer-cluster/version"}
|
||||||
solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" }
|
solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" }
|
||||||
solana-ledger = { path = "../../solana-geyzer-cluster/ledger"}
|
solana-ledger = { path = "../../solana-geyzer-cluster/ledger"}
|
||||||
|
solana-streamer = { path = "../../solana-geyzer-cluster/streamer"}
|
||||||
|
|
||||||
|
|
||||||
#patch to build locally
|
#patch to build locally
|
||||||
|
|
|
@ -4,9 +4,11 @@ use async_stream::stream;
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use solana_client::rpc_client::RpcClient;
|
use solana_client::rpc_client::RpcClient;
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
|
use solana_streamer::socket::SocketAddrSpace;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
|
use std::str::FromStr;
|
||||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||||
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeUpdate;
|
use yellowstone_grpc_proto::prelude::SubscribeUpdate;
|
||||||
|
@ -23,6 +25,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let ctrl_c_signal = tokio::signal::ctrl_c();
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
||||||
|
|
||||||
|
//config
|
||||||
|
let allow_private_addr = false;
|
||||||
|
let addr_verifier = SocketAddrSpace::new(allow_private_addr);
|
||||||
|
//let genesis_config_hash = solana_sdk::Hash();
|
||||||
|
// let shred_version =
|
||||||
|
// solana_sdk::shred_version::compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
|
||||||
|
|
||||||
let cluster_info_stream = create_geyser_clusterinfo_stream().await?;
|
let cluster_info_stream = create_geyser_clusterinfo_stream().await?;
|
||||||
|
|
||||||
let jh = tokio::spawn(async move {
|
let jh = tokio::spawn(async move {
|
||||||
|
@ -42,6 +51,19 @@ async fn main() -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
|
||||||
|
// pub shred_version: u16 = 0,
|
||||||
|
|
||||||
|
const SHRED_VERSION: u16 = 0;
|
||||||
|
|
||||||
|
fn verify_node(node: &SubscribeUpdateClusterInfo, addr_verifier: &SocketAddrSpace) -> bool {
|
||||||
|
node.gossip
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|addr| std::net::SocketAddr::from_str(addr).ok())
|
||||||
|
.map(|addr| addr_verifier.check(&addr))
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_loop<S>(cluster_info_stream: S)
|
async fn run_loop<S>(cluster_info_stream: S)
|
||||||
where
|
where
|
||||||
S: Stream<Item = SubscribeUpdateClusterInfo>,
|
S: Stream<Item = SubscribeUpdateClusterInfo>,
|
||||||
|
@ -54,11 +76,15 @@ where
|
||||||
//spik first tick
|
//spik first tick
|
||||||
log_interval.tick().await;
|
log_interval.tick().await;
|
||||||
|
|
||||||
|
//TODO remove only to see how shred version change.
|
||||||
|
let mut current_shred_version = 0;
|
||||||
|
let socket_addr_space = SocketAddrSpace::new(false);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
//log interval TODO remove
|
//log interval TODO remove
|
||||||
_ = log_interval.tick() => {
|
_ = log_interval.tick() => {
|
||||||
log::info!("Current cluster info list size:{}", node_list.len());
|
log::info!("Current cluster shred_version:{current_shred_version} info list size:{}", node_list.len());
|
||||||
//verify cluster_infos
|
//verify cluster_infos
|
||||||
let geyzer_clluster = node_list.clone();
|
let geyzer_clluster = node_list.clone();
|
||||||
tokio::task::spawn_blocking(move ||{
|
tokio::task::spawn_blocking(move ||{
|
||||||
|
@ -71,7 +97,12 @@ where
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Some(update) = cluster_info_stream.next() => {
|
Some(update) = cluster_info_stream.next() => {
|
||||||
node_list.insert(update.pubkey.clone(), update);
|
current_shred_version = update.shred_version;
|
||||||
|
match verify_node(&update, &socket_addr_space) {
|
||||||
|
true => {node_list.insert(update.pubkey.clone(), update);},
|
||||||
|
false=> log::info!("verify_node:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())),
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue