add cluster verification with RPC
This commit is contained in:
parent
32db10f040
commit
da3a3cd39c
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
|
use solana_client::rpc_client::RpcClient;
|
||||||
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
|
@ -12,6 +14,7 @@ use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo;
|
||||||
use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots};
|
use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots};
|
||||||
use yellowstone_grpc_proto::tonic::Status;
|
use yellowstone_grpc_proto::tonic::Status;
|
||||||
|
|
||||||
|
const RPC_URL: &str = "http://localhost:8899";
|
||||||
const GRPC_URL: &str = "http://localhost:10000";
|
const GRPC_URL: &str = "http://localhost:10000";
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -47,13 +50,25 @@ where
|
||||||
let mut cluster_info_stream = pin!(cluster_info_stream);
|
let mut cluster_info_stream = pin!(cluster_info_stream);
|
||||||
|
|
||||||
//Log current size of the cluster node info list size.
|
//Log current size of the cluster node info list size.
|
||||||
let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
|
let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
|
||||||
|
//spik first tick
|
||||||
|
log_interval.tick().await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
//log interval TODO remove
|
//log interval TODO remove
|
||||||
_ = log_interval.tick() => {
|
_ = log_interval.tick() => {
|
||||||
log::info!("Current cluster info list size:{}", node_list.len());
|
log::info!("Current cluster info list size:{}", node_list.len());
|
||||||
|
//verify cluster_infos
|
||||||
|
let geyzer_clluster = node_list.clone();
|
||||||
|
tokio::task::spawn_blocking(move ||{
|
||||||
|
match get_rpc_cluster_info() {
|
||||||
|
Ok(rpc_cluster) => {
|
||||||
|
verify_clusters(geyzer_clluster, rpc_cluster);
|
||||||
|
},
|
||||||
|
Err(err) => log::warn!("Error during get RPC cluster nodes: {err}"),
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
Some(update) = cluster_info_stream.next() => {
|
Some(update) = cluster_info_stream.next() => {
|
||||||
node_list.insert(update.pubkey.clone(), update);
|
node_list.insert(update.pubkey.clone(), update);
|
||||||
|
@ -118,3 +133,30 @@ async fn subscribe_geyzer_clusterinfo(
|
||||||
|
|
||||||
Ok(confirmed_stream)
|
Ok(confirmed_stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_rpc_cluster_info() -> anyhow::Result<Vec<solana_client::rpc_response::RpcContactInfo>> {
|
||||||
|
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
||||||
|
RPC_URL.to_string(),
|
||||||
|
tokio::time::Duration::from_secs(600),
|
||||||
|
CommitmentConfig::confirmed(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let cluster_nodes = rpc_client.get_cluster_nodes()?;
|
||||||
|
log::info!("RPC cluster nodes size:{}", cluster_nodes.len());
|
||||||
|
Ok(cluster_nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_clusters(
|
||||||
|
mut geyzer_cluster: HashMap<String, SubscribeUpdateClusterInfo>,
|
||||||
|
rpc_cluster: Vec<solana_client::rpc_response::RpcContactInfo>,
|
||||||
|
) {
|
||||||
|
//verify if all rpc are in geyzer
|
||||||
|
for rpc in &rpc_cluster {
|
||||||
|
if let None = geyzer_cluster.remove(&rpc.pubkey) {
|
||||||
|
log::info!("Rpc node not present in geyzer: {}", rpc.pubkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
geyzer_cluster
|
||||||
|
.keys()
|
||||||
|
.for_each(|node| log::info!("Geyzer node not present in RPC: {}", node));
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue