From 32db10f0407eba536b97bf058e7ae63badb113a0 Mon Sep 17 00:00:00 2001 From: musitdev Date: Thu, 14 Dec 2023 10:43:07 +0100 Subject: [PATCH] add cluster info feed and delete account pr test --- clusterinfo/Cargo.toml | 44 +++++++ clusterinfo/src/main.rs | 120 +++++++++++++++++++ rust-toolchain.toml | 2 +- stake_aggregate/Cargo.toml | 46 +++++-- stake_aggregate/bin/parse_validator_stake.rs | 4 +- stake_aggregate/bin/sysvaraccount.rs | 3 +- stake_aggregate/bin/testdeleteacc.rs | 104 ++++++++++++++++ stake_aggregate/rust-toolchain.toml | 2 + stake_aggregate/src/main.rs | 5 +- 9 files changed, 310 insertions(+), 20 deletions(-) create mode 100644 clusterinfo/Cargo.toml create mode 100644 clusterinfo/src/main.rs create mode 100644 stake_aggregate/bin/testdeleteacc.rs create mode 100644 stake_aggregate/rust-toolchain.toml diff --git a/clusterinfo/Cargo.toml b/clusterinfo/Cargo.toml new file mode 100644 index 0000000..d550ba8 --- /dev/null +++ b/clusterinfo/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "clusterinfo" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.71" +bincode = "1.3.3" +borsh = "0.10.3" +bs58 = "0.4.0" +hex = "0.4.3" +log = "0.4.17" +tracing-subscriber = "0.3.16" +nom = "7.1.3" +itertools = "0.11.0" +reqwest = "0.11" +serde = "1.0" +serde_json = "1.0" +thiserror = "1.0.40" + +futures = { version = "0.3.28", default-features = false } +futures-util = "0.3.28" +tokio = { version = "1.*", features = ["full"] } +async-stream = "0.3.5" + +yellowstone-grpc-client = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-client" } +yellowstone-grpc-proto = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-proto" } + +#patch to build locally +solana-program = { path = "../../solana-geyzer-cluster/sdk/program"} +solana-sdk = { path = "../../solana-geyzer-cluster/sdk"} +solana-client = { path = "../../solana-geyzer-cluster/client"} +solana-rpc-client-api = { path = "../../solana-geyzer-cluster/rpc-client-api" } +solana-version = { path = "../../solana-geyzer-cluster/version"} +solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" } +solana-ledger = { path = "../../solana-geyzer-cluster/ledger"} + + +#patch to build locally +[patch.crates-io] +solana-program = { path = "../../solana-geyzer-cluster/sdk/program"} +solana-zk-token-sdk = { path = "../../solana-geyzer-cluster/zk-token-sdk"} diff --git a/clusterinfo/src/main.rs b/clusterinfo/src/main.rs new file mode 100644 index 0000000..d68b62e --- /dev/null +++ b/clusterinfo/src/main.rs @@ -0,0 +1,120 @@ +// RUST_LOG=info cargo run + +use async_stream::stream; +use futures::{Stream, StreamExt}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::pin::pin; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::CommitmentLevel; +use yellowstone_grpc_proto::prelude::SubscribeUpdate; +use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo; +use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}; +use yellowstone_grpc_proto::tonic::Status; + +const GRPC_URL: &str = "http://localhost:10000"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let ctrl_c_signal = tokio::signal::ctrl_c(); + + let cluster_info_stream = create_geyser_clusterinfo_stream().await?; + + let jh = tokio::spawn(async move { + run_loop(cluster_info_stream).await; + }); + + tokio::select! { + res = jh => { + log::error!("Process quit unexpectedly {res:?}"); + + } + _ = ctrl_c_signal => { + log::info!("Received ctrl+c signal"); + } + } + + Ok(()) +} + +async fn run_loop(cluster_info_stream: S) +where + S: Stream, +{ + let mut node_list: HashMap = HashMap::new(); + let mut cluster_info_stream = pin!(cluster_info_stream); + + //Log current size of the cluster node info list size. + let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + + loop { + tokio::select! { + //log interval TODO remove + _ = log_interval.tick() => { + log::info!("Current cluster info list size:{}", node_list.len()); + } + Some(update) = cluster_info_stream.next() => { + node_list.insert(update.pubkey.clone(), update); + } + } + } +} + +async fn create_geyser_clusterinfo_stream( +) -> anyhow::Result> { + let geyzer_stream = subscribe_geyzer_clusterinfo(GRPC_URL.to_string()).await?; + + Ok(stream! { + for await update_message in geyzer_stream { + match update_message { + Ok(update_message) => { + match update_message.update_oneof { + Some(UpdateOneof::ClusterInfo(update_cluster_info)) => + { + yield update_cluster_info; + } + _ => (), + } + + } + Err(tonic_status) => { + // TODO identify non-recoverable errors and cancel stream + panic!("Receive error on geyzer stream {:?}", tonic_status); + } + } + } // -- production loop + }) // -- stream! +} + +//subscribe Geyser grpc +async fn subscribe_geyzer_clusterinfo( + grpc_url: String, +) -> anyhow::Result>> { + let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?; + + let mut slots = HashMap::new(); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); + + //account subscription + let mut cluster_filter: HashSet = HashSet::new(); + cluster_filter.insert("client".to_owned()); + + let confirmed_stream = client + .subscribe_once( + slots.clone(), + Default::default(), //accounts + Default::default(), //tx + Default::default(), //entry + Default::default(), //full block + Default::default(), //block meta + Some(CommitmentLevel::Processed), + vec![], + None, + Some(cluster_filter), + ) + .await?; + + Ok(confirmed_stream) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 22048ac..743f7cd 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.70.0" +channel = "1.72.0" diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml index c66e3c6..cbf6165 100644 --- a/stake_aggregate/Cargo.toml +++ b/stake_aggregate/Cargo.toml @@ -27,6 +27,10 @@ path = "bin/sysvaraccount.rs" name = "send_get_vote_account" path = "bin/send_get_vote_account.rs" +[[bin]] +name = "testdeleteacc" +path = "bin/testdeleteacc.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -55,26 +59,42 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] } #jsonrpsee-types = "0.20.0" thiserror = "1.0.40" -#yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } -#yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } +#yellowstone-grpc-client = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-client" } +#yellowstone-grpc-proto = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-proto" } #yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } #yellowstone-grpc-proto = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } -#yellowstone-grpc-client = "1.11.0+solana.1.16.14" -#yellowstone-grpc-proto = "1.10.0+solana.1.16.14" +yellowstone-grpc-client = "1.11.0+solana.1.16.14" +yellowstone-grpc-proto = "1.10.0+solana.1.16.14" #yellowstone-grpc-client = "v1.10.0+solana.1.16.17" #yellowstone-grpc-proto = "v1.10.0+solana.1.16.17" -yellowstone-grpc-client = "1.10.0" -yellowstone-grpc-proto = "1.10.0" +#yellowstone-grpc-client = "1.10.0" +#yellowstone-grpc-proto = "1.10.0" -solana-sdk = "1.16.14" -solana-client = "1.16.14" -solana-ledger = "1.16.14" -solana-rpc-client-api = "1.16.14" -solana-version = "1.16.14" -solana-account-decoder = "1.16.14" -solana-program = "1.16.14" + solana-sdk = "1.16.14" + solana-client = "1.16.14" + solana-ledger = "1.16.14" + solana-rpc-client-api = "1.16.14" + solana-version = "1.16.14" + solana-account-decoder = "1.16.14" + solana-program = "1.16.14" + + +#patch to build locally +#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"} +#solana-sdk = { path = "../../solana/sdk" , version = "=1.18.0"} +#solana-client = { path = "../../solana/client" , version = "=1.18.0"} +#solana-rpc-client-api = { path = "../../solana/rpc-client-api" , version = "=1.18.0"} +#solana-version = { path = "../../solana/version" , version = "=1.18.0"} +#solana-account-decoder = { path = "../../solana/account-decoder" , version = "=1.18.0"} +#solana-ledger = { path = "../../solana/ledger" , version = "=1.18.0"} + + +#patch to build locally +#[patch.crates-io] +#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"} +#solana-zk-token-sdk = { path = "../../solana/zk-token-sdk" , version = "=1.18.0"} diff --git a/stake_aggregate/bin/parse_validator_stake.rs b/stake_aggregate/bin/parse_validator_stake.rs index 8b990df..1c0c4f9 100644 --- a/stake_aggregate/bin/parse_validator_stake.rs +++ b/stake_aggregate/bin/parse_validator_stake.rs @@ -17,8 +17,8 @@ use std::time::Duration; //const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt"; //const RPC_URL: &str = "http://localhost:8899"; -//const RPC_URL: &str = "https://api.testnet.solana.com"; -const RPC_URL: &str = "https://api.mainnet-beta.solana.com"; +const RPC_URL: &str = "https://api.testnet.solana.com"; +//const RPC_URL: &str = "https://api.mainnet-beta.solana.com"; #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/stake_aggregate/bin/sysvaraccount.rs b/stake_aggregate/bin/sysvaraccount.rs index c73fc24..bdacc0a 100644 --- a/stake_aggregate/bin/sysvaraccount.rs +++ b/stake_aggregate/bin/sysvaraccount.rs @@ -62,7 +62,7 @@ async fn run_loop( //subscribe Geyser grpc //slot subscription let mut slots = HashMap::new(); - slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); //account subscription let mut accounts_filter: HashMap = HashMap::new(); @@ -87,6 +87,7 @@ async fn run_loop( Default::default(), //block meta Some(CommitmentLevel::Confirmed), vec![], + None, ) .await?; diff --git a/stake_aggregate/bin/testdeleteacc.rs b/stake_aggregate/bin/testdeleteacc.rs new file mode 100644 index 0000000..a1c6ae2 --- /dev/null +++ b/stake_aggregate/bin/testdeleteacc.rs @@ -0,0 +1,104 @@ +// cargo run --bin testdeleteacc + +use anyhow::bail; +use futures_util::StreamExt; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::CommitmentLevel; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; +use yellowstone_grpc_proto::{ + prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}, + tonic::service::Interceptor, +}; + +const GRPC_URL: &str = "http://localhost:10000"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let client = GeyserGrpcClient::connect(GRPC_URL, None::<&'static str>, None)?; + + let ctrl_c_signal = tokio::signal::ctrl_c(); + + tokio::select! { + res = run_loop(client) => { + // This should never happen + log::error!("Services quit unexpectedly {res:?}"); + } + _ = ctrl_c_signal => { + log::info!("Received ctrl+c signal"); + } + } + + Ok(()) +} + +async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Result<()> { + //subscribe Geyser grpc + //slot subscription + let mut slots = HashMap::new(); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); + + //account subscription + let mut accounts_filter: HashMap = HashMap::new(); + accounts_filter.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![solana_sdk::stake::program::ID.to_string()], + filters: vec![], + }, + ); + + let mut confirmed_stream = client + .subscribe_once( + slots.clone(), + accounts_filter, //accounts + Default::default(), //tx + Default::default(), //entry + Default::default(), //full block + Default::default(), //block meta + Some(CommitmentLevel::Confirmed), + vec![], + None, + ) + .await?; + + while let Some(Ok(update)) = confirmed_stream.next().await { + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + log::info!("Geyser receive account at slot:{}", account.slot); + log::info!( + "Geyser notif with previous account owner:{:?}", + account + .previous_account_state + .map(|acc| Pubkey::try_from(acc.owner).expect("valid pubkey")) + ); + if let Some(account) = account.account { + log::info!( + "Geyser notif for account account:{:?}", + Pubkey::try_from(account.pubkey).expect("valid pubkey") + ); + log::info!( + "Geyser notif account owner:{:?}", + Pubkey::try_from(account.owner).expect("valid pubkey") + ); + } + } + Some(UpdateOneof::Slot(slot)) => log::trace!( + "Geyser receive slot:{} commitment:{:?}", + slot.slot, + slot.status() + ), + Some(UpdateOneof::Ping(_)) => { + log::trace!("GRPC Ping"); + } + k => { + bail!("Unexpected update: {k:?}"); + } + }; + } + Ok(()) +} diff --git a/stake_aggregate/rust-toolchain.toml b/stake_aggregate/rust-toolchain.toml new file mode 100644 index 0000000..743f7cd --- /dev/null +++ b/stake_aggregate/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.72.0" diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index cd5479b..97fc070 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -186,7 +186,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //subscribe Geyser grpc //slot subscription let mut slots = HashMap::new(); - slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); //account subscription let mut accounts: HashMap = HashMap::new(); @@ -197,8 +197,6 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re owner: vec![ solana_sdk::stake::program::ID.to_string(), solana_sdk::vote::program::ID.to_string(), - solana_sdk::sysvar::stake_history::ID.to_string(), - // solana_sdk::system_program::ID.to_string(), ], filters: vec![], }, @@ -232,6 +230,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re Default::default(), //block meta Some(CommitmentLevel::Confirmed), vec![], + None, ) .await?;