2023-12-14 01:43:07 -08:00
|
|
|
// RUST_LOG=info cargo run
|
|
|
|
|
|
|
|
use async_stream::stream;
|
|
|
|
use futures::{Stream, StreamExt};
|
2023-12-14 06:25:42 -08:00
|
|
|
use solana_client::rpc_client::RpcClient;
|
|
|
|
use solana_sdk::commitment_config::CommitmentConfig;
|
2023-12-18 03:29:46 -08:00
|
|
|
use solana_streamer::socket::SocketAddrSpace;
|
2023-12-14 01:43:07 -08:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::collections::HashSet;
|
|
|
|
use std::pin::pin;
|
2023-12-18 03:29:46 -08:00
|
|
|
use std::str::FromStr;
|
2023-12-14 01:43:07 -08:00
|
|
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
|
|
|
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
2023-12-22 09:09:12 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::subscribe_update_cluster_info;
|
2023-12-14 01:43:07 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::SubscribeUpdate;
|
|
|
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo;
|
2023-12-22 09:09:12 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfoUpdate;
|
2023-12-14 01:43:07 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots};
|
|
|
|
use yellowstone_grpc_proto::tonic::Status;
|
|
|
|
|
2023-12-14 06:25:42 -08:00
|
|
|
/// URL of the Solana JSON-RPC endpoint that `get_rpc_cluster_info` connects to.
const RPC_URL: &str = "http://localhost:8899";
|
2023-12-14 01:43:07 -08:00
|
|
|
/// URL of the Geyser gRPC endpoint that `create_geyser_clusterinfo_stream` subscribes to.
const GRPC_URL: &str = "http://localhost:10000";
|
|
|
|
|
|
|
|
#[tokio::main]
|
|
|
|
async fn main() -> anyhow::Result<()> {
|
|
|
|
tracing_subscriber::fmt::init();
|
|
|
|
|
|
|
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
|
|
|
|
2023-12-18 03:29:46 -08:00
|
|
|
//config
|
|
|
|
let allow_private_addr = false;
|
|
|
|
let addr_verifier = SocketAddrSpace::new(allow_private_addr);
|
|
|
|
//let genesis_config_hash = solana_sdk::Hash();
|
|
|
|
// let shred_version =
|
|
|
|
// solana_sdk::shred_version::compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
|
|
|
|
|
2023-12-14 01:43:07 -08:00
|
|
|
let cluster_info_stream = create_geyser_clusterinfo_stream().await?;
|
|
|
|
|
2023-12-18 10:00:43 -08:00
|
|
|
let jh = tokio::spawn({
|
|
|
|
let shred_version = TESTNET_SHRED_VERSION;
|
|
|
|
|
|
|
|
async move {
|
2023-12-22 09:09:12 -08:00
|
|
|
run_loop(cluster_info_stream, shred_version, addr_verifier).await;
|
2023-12-18 10:00:43 -08:00
|
|
|
}
|
2023-12-14 01:43:07 -08:00
|
|
|
});
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
res = jh => {
|
|
|
|
log::error!("Process quit unexpectedly {res:?}");
|
|
|
|
|
|
|
|
}
|
|
|
|
_ = ctrl_c_signal => {
|
|
|
|
log::info!("Received ctrl+c signal");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-12-18 03:29:46 -08:00
|
|
|
//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
|
|
|
|
// pub shred_version: u16 = 0,
|
|
|
|
|
2023-12-18 10:00:43 -08:00
|
|
|
/// Shred version that `main` passes to `run_loop`; updates whose shred version
/// differs are rejected by `verify_node`.
const TESTNET_SHRED_VERSION: u16 = 5106;
|
|
|
|
/// Genesis hash string for testnet. Not referenced in the visible part of this file.
const TESTNET_GENESIS_HASH: &str = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY";
|
|
|
|
|
|
|
|
/// Shred version for mainnet. Not referenced in the visible part of this file.
// NOTE(review): this value is identical to TESTNET_SHRED_VERSION, which looks
// like a copy-paste; confirm the actual mainnet shred version before using it.
const MAINNET_SHRED_VERSION: u16 = 5106;
|
|
|
|
/// Genesis hash string for mainnet. Not referenced in the visible part of this file.
const MAINNET_GENESIS_HASH: &str = "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d";
|
2023-12-18 03:29:46 -08:00
|
|
|
|
2023-12-18 10:00:43 -08:00
|
|
|
fn verify_node(
|
2023-12-22 09:09:12 -08:00
|
|
|
node: &SubscribeUpdateClusterInfoUpdate,
|
2023-12-18 10:00:43 -08:00
|
|
|
addr_verifier: &SocketAddrSpace,
|
|
|
|
shred_version: u16,
|
|
|
|
) -> bool {
|
2023-12-18 03:29:46 -08:00
|
|
|
node.gossip
|
|
|
|
.as_ref()
|
|
|
|
.and_then(|addr| std::net::SocketAddr::from_str(addr).ok())
|
|
|
|
.map(|addr| addr_verifier.check(&addr))
|
|
|
|
.unwrap_or(false)
|
2023-12-18 10:00:43 -08:00
|
|
|
&& node.shred_version as u16 == shred_version
|
2023-12-18 03:29:46 -08:00
|
|
|
}
|
|
|
|
|
2023-12-22 09:09:12 -08:00
|
|
|
async fn run_loop<S>(cluster_info_stream: S, shred_version: u16, socket_addr_space: SocketAddrSpace)
|
2023-12-14 01:43:07 -08:00
|
|
|
where
|
|
|
|
S: Stream<Item = SubscribeUpdateClusterInfo>,
|
|
|
|
{
|
2023-12-22 09:09:12 -08:00
|
|
|
let mut node_list: HashMap<String, SubscribeUpdateClusterInfoUpdate> = HashMap::new();
|
2023-12-14 01:43:07 -08:00
|
|
|
let mut cluster_info_stream = pin!(cluster_info_stream);
|
|
|
|
|
|
|
|
//Log current size of the cluster node info list size.
|
2023-12-14 06:25:42 -08:00
|
|
|
let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
|
|
|
|
//spik first tick
|
|
|
|
log_interval.tick().await;
|
2023-12-14 01:43:07 -08:00
|
|
|
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
//log interval TODO remove
|
|
|
|
_ = log_interval.tick() => {
|
2023-12-18 10:00:43 -08:00
|
|
|
log::info!("Current cluster info list size:{}", node_list.len());
|
2023-12-14 06:25:42 -08:00
|
|
|
//verify cluster_infos
|
|
|
|
let geyzer_clluster = node_list.clone();
|
|
|
|
tokio::task::spawn_blocking(move ||{
|
|
|
|
match get_rpc_cluster_info() {
|
|
|
|
Ok(rpc_cluster) => {
|
|
|
|
verify_clusters(geyzer_clluster, rpc_cluster);
|
|
|
|
},
|
|
|
|
Err(err) => log::warn!("Error during get RPC cluster nodes: {err}"),
|
|
|
|
}
|
|
|
|
});
|
2023-12-14 01:43:07 -08:00
|
|
|
}
|
|
|
|
Some(update) = cluster_info_stream.next() => {
|
2023-12-22 09:09:12 -08:00
|
|
|
match update.data {
|
|
|
|
Some(subscribe_update_cluster_info::Data::Update(update)) => {
|
|
|
|
log::trace!("{}, ClusterInfo update", update.pubkey);
|
|
|
|
if verify_node(&update, &socket_addr_space, shred_version) {
|
|
|
|
node_list.insert(update.pubkey.clone(), update);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Some(subscribe_update_cluster_info::Data::Remove(pk)) => {
|
|
|
|
log::info!("{}, ClusterInfo remove", pk);
|
|
|
|
node_list.remove(&pk);
|
|
|
|
}
|
|
|
|
None => {}
|
2023-12-18 03:29:46 -08:00
|
|
|
}
|
2023-12-22 09:09:12 -08:00
|
|
|
|
2023-12-18 10:05:15 -08:00
|
|
|
// match verify_node(&update, &socket_addr_space, shred_version) {
|
|
|
|
// true => {node_list.insert(update.pubkey.clone(), update);},
|
|
|
|
// false=> log::info!("verify_node fail for:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())),
|
|
|
|
// }
|
2023-12-18 03:29:46 -08:00
|
|
|
|
2023-12-14 01:43:07 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn create_geyser_clusterinfo_stream(
|
|
|
|
) -> anyhow::Result<impl Stream<Item = SubscribeUpdateClusterInfo>> {
|
|
|
|
let geyzer_stream = subscribe_geyzer_clusterinfo(GRPC_URL.to_string()).await?;
|
|
|
|
|
|
|
|
Ok(stream! {
|
|
|
|
for await update_message in geyzer_stream {
|
|
|
|
match update_message {
|
|
|
|
Ok(update_message) => {
|
|
|
|
match update_message.update_oneof {
|
|
|
|
Some(UpdateOneof::ClusterInfo(update_cluster_info)) =>
|
|
|
|
{
|
|
|
|
yield update_cluster_info;
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
Err(tonic_status) => {
|
|
|
|
// TODO identify non-recoverable errors and cancel stream
|
|
|
|
panic!("Receive error on geyzer stream {:?}", tonic_status);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} // -- production loop
|
|
|
|
}) // -- stream!
|
|
|
|
}
|
|
|
|
|
|
|
|
//subscribe Geyser grpc
|
|
|
|
async fn subscribe_geyzer_clusterinfo(
|
|
|
|
grpc_url: String,
|
|
|
|
) -> anyhow::Result<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
|
|
|
|
let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?;
|
|
|
|
|
|
|
|
let mut slots = HashMap::new();
|
|
|
|
slots.insert("client".to_string(), SubscribeRequestFilterSlots::default());
|
|
|
|
|
|
|
|
//account subscription
|
|
|
|
let mut cluster_filter: HashSet<String> = HashSet::new();
|
|
|
|
cluster_filter.insert("client".to_owned());
|
|
|
|
|
|
|
|
let confirmed_stream = client
|
|
|
|
.subscribe_once(
|
|
|
|
slots.clone(),
|
|
|
|
Default::default(), //accounts
|
|
|
|
Default::default(), //tx
|
|
|
|
Default::default(), //entry
|
|
|
|
Default::default(), //full block
|
|
|
|
Default::default(), //block meta
|
|
|
|
Some(CommitmentLevel::Processed),
|
|
|
|
vec![],
|
|
|
|
None,
|
|
|
|
Some(cluster_filter),
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(confirmed_stream)
|
|
|
|
}
|
2023-12-14 06:25:42 -08:00
|
|
|
|
|
|
|
fn get_rpc_cluster_info() -> anyhow::Result<Vec<solana_client::rpc_response::RpcContactInfo>> {
|
|
|
|
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
|
|
|
RPC_URL.to_string(),
|
|
|
|
tokio::time::Duration::from_secs(600),
|
|
|
|
CommitmentConfig::confirmed(),
|
|
|
|
);
|
|
|
|
|
|
|
|
let cluster_nodes = rpc_client.get_cluster_nodes()?;
|
|
|
|
log::info!("RPC cluster nodes size:{}", cluster_nodes.len());
|
|
|
|
Ok(cluster_nodes)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn verify_clusters(
|
2023-12-22 09:09:12 -08:00
|
|
|
mut geyzer_cluster: HashMap<String, SubscribeUpdateClusterInfoUpdate>,
|
2023-12-14 06:25:42 -08:00
|
|
|
rpc_cluster: Vec<solana_client::rpc_response::RpcContactInfo>,
|
|
|
|
) {
|
|
|
|
//verify if all rpc are in geyzer
|
|
|
|
for rpc in &rpc_cluster {
|
|
|
|
if let None = geyzer_cluster.remove(&rpc.pubkey) {
|
2023-12-18 10:05:15 -08:00
|
|
|
log::info!("Rpc node not present in geyzer: {} ", rpc.pubkey);
|
2023-12-14 06:25:42 -08:00
|
|
|
}
|
|
|
|
}
|
2023-12-18 07:12:35 -08:00
|
|
|
geyzer_cluster.iter().for_each(|(key, node)| {
|
|
|
|
log::info!(
|
2023-12-18 10:00:43 -08:00
|
|
|
"Geyzer node not present in RPC: {} gossip:{} shred_version:{} node:{:?}",
|
2023-12-18 07:12:35 -08:00
|
|
|
key,
|
|
|
|
node.gossip
|
|
|
|
.as_ref()
|
|
|
|
.map(|addr| addr.to_string())
|
|
|
|
.unwrap_or("None".to_string()),
|
2023-12-18 10:00:43 -08:00
|
|
|
node.shred_version,
|
|
|
|
node
|
2023-12-18 07:12:35 -08:00
|
|
|
)
|
|
|
|
});
|
2023-12-14 06:25:42 -08:00
|
|
|
}
|