diff --git a/clusterinfo/Cargo.toml b/clusterinfo/Cargo.toml new file mode 100644 index 0000000..b1c42dd --- /dev/null +++ b/clusterinfo/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "clusterinfo" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.71" +bincode = "1.3.3" +borsh = "0.10.3" +bs58 = "0.4.0" +hex = "0.4.3" +log = "0.4.17" +tracing-subscriber = "0.3.16" +nom = "7.1.3" +itertools = "0.11.0" +reqwest = "0.11" +serde = "1.0" +serde_json = "1.0" +thiserror = "1.0.40" + +futures = { version = "0.3.28", default-features = false } +futures-util = "0.3.28" +tokio = { version = "1.*", features = ["full"] } +async-stream = "0.3.5" + +yellowstone-grpc-client = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-client" } +yellowstone-grpc-proto = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-proto" } + +#patch to build locally +solana-program = { path = "../../solana-geyzer-cluster/sdk/program"} +solana-sdk = { path = "../../solana-geyzer-cluster/sdk"} +solana-client = { path = "../../solana-geyzer-cluster/client"} +solana-rpc-client-api = { path = "../../solana-geyzer-cluster/rpc-client-api" } +solana-version = { path = "../../solana-geyzer-cluster/version"} +solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" } +solana-ledger = { path = "../../solana-geyzer-cluster/ledger"} +solana-streamer = { path = "../../solana-geyzer-cluster/streamer"} + + +#patch to build locally +[patch.crates-io] +solana-program = { path = "../../solana-geyzer-cluster/sdk/program"} +solana-zk-token-sdk = { path = "../../solana-geyzer-cluster/zk-token-sdk"} diff --git a/clusterinfo/src/main.rs b/clusterinfo/src/main.rs new file mode 100644 index 0000000..ce89894 --- /dev/null +++ b/clusterinfo/src/main.rs @@ -0,0 +1,226 @@ +// RUST_LOG=info cargo run + +use async_stream::stream; +use futures::{Stream, StreamExt}; +use solana_client::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_streamer::socket::SocketAddrSpace; +use std::collections::HashMap; +use std::collections::HashSet; +use std::pin::pin; +use std::str::FromStr; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::CommitmentLevel; +use yellowstone_grpc_proto::prelude::subscribe_update_cluster_info; +use yellowstone_grpc_proto::prelude::SubscribeUpdate; +use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo; +use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfoUpdate; +use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}; +use yellowstone_grpc_proto::tonic::Status; + +const RPC_URL: &str = "http://localhost:8899"; +const GRPC_URL: &str = "http://localhost:10000"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let ctrl_c_signal = tokio::signal::ctrl_c(); + + //config + let allow_private_addr = false; + let addr_verifier = SocketAddrSpace::new(allow_private_addr); + //let genesis_config_hash = solana_sdk::Hash(); + // let shred_version = + // solana_sdk::shred_version::compute_shred_version(&genesis_config.hash(), Some(&hard_forks)); + + let cluster_info_stream = create_geyser_clusterinfo_stream().await?; + + let jh = tokio::spawn({ + let shred_version = TESTNET_SHRED_VERSION; + + async move { + run_loop(cluster_info_stream, shred_version, addr_verifier).await; + } + }); + + tokio::select! { + res = jh => { + log::error!("Process quit unexpectedly {res:?}"); + + } + _ = ctrl_c_signal => { + log::info!("Received ctrl+c signal"); + } + } + + Ok(()) +} + +//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr")); +// pub shred_version: u16 = 0, + +const TESTNET_SHRED_VERSION: u16 = 5106; +const TESTNET_GENESIS_HASH: &str = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY"; + +const MAINNET_SHRED_VERSION: u16 = 5106; +const MAINNET_GENESIS_HASH: &str = "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d"; + +fn verify_node( + node: &SubscribeUpdateClusterInfoUpdate, + addr_verifier: &SocketAddrSpace, + shred_version: u16, +) -> bool { + node.gossip + .as_ref() + .and_then(|addr| std::net::SocketAddr::from_str(addr).ok()) + .map(|addr| addr_verifier.check(&addr)) + .unwrap_or(false) + && node.shred_version as u16 == shred_version +} + +async fn run_loop(cluster_info_stream: S, shred_version: u16, socket_addr_space: SocketAddrSpace) +where + S: Stream, +{ + let mut node_list: HashMap = HashMap::new(); + let mut cluster_info_stream = pin!(cluster_info_stream); + + //Log current size of the cluster node info list size. + let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(15)); + //spik first tick + log_interval.tick().await; + + loop { + tokio::select! { + //log interval TODO remove + _ = log_interval.tick() => { + log::info!("Current cluster info list size:{}", node_list.len()); + //verify cluster_infos + let geyzer_clluster = node_list.clone(); + tokio::task::spawn_blocking(move ||{ + match get_rpc_cluster_info() { + Ok(rpc_cluster) => { + verify_clusters(geyzer_clluster, rpc_cluster); + }, + Err(err) => log::warn!("Error during get RPC cluster nodes: {err}"), + } + }); + } + Some(update) = cluster_info_stream.next() => { + match update.data { + Some(subscribe_update_cluster_info::Data::Update(update)) => { + log::trace!("{}, ClusterInfo update", update.pubkey); + if verify_node(&update, &socket_addr_space, shred_version) { + node_list.insert(update.pubkey.clone(), update); + } + } + Some(subscribe_update_cluster_info::Data::Remove(pk)) => { + log::info!("{}, ClusterInfo remove", pk); + node_list.remove(&pk); + } + None => {} + } + + // match verify_node(&update, &socket_addr_space, shred_version) { + // true => {node_list.insert(update.pubkey.clone(), update);}, + // false=> log::info!("verify_node fail for:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())), + // } + + } + } + } +} + +async fn create_geyser_clusterinfo_stream( +) -> anyhow::Result> { + let geyzer_stream = subscribe_geyzer_clusterinfo(GRPC_URL.to_string()).await?; + + Ok(stream! { + for await update_message in geyzer_stream { + match update_message { + Ok(update_message) => { + match update_message.update_oneof { + Some(UpdateOneof::ClusterInfo(update_cluster_info)) => + { + yield update_cluster_info; + } + _ => (), + } + + } + Err(tonic_status) => { + // TODO identify non-recoverable errors and cancel stream + panic!("Receive error on geyzer stream {:?}", tonic_status); + } + } + } // -- production loop + }) // -- stream! +} + +//subscribe Geyser grpc +async fn subscribe_geyzer_clusterinfo( + grpc_url: String, +) -> anyhow::Result>> { + let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?; + + let mut slots = HashMap::new(); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); + + //account subscription + let mut cluster_filter: HashSet = HashSet::new(); + cluster_filter.insert("client".to_owned()); + + let confirmed_stream = client + .subscribe_once( + slots.clone(), + Default::default(), //accounts + Default::default(), //tx + Default::default(), //entry + Default::default(), //full block + Default::default(), //block meta + Some(CommitmentLevel::Processed), + vec![], + None, + Some(cluster_filter), + ) + .await?; + + Ok(confirmed_stream) +} + +fn get_rpc_cluster_info() -> anyhow::Result> { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + RPC_URL.to_string(), + tokio::time::Duration::from_secs(600), + CommitmentConfig::confirmed(), + ); + + let cluster_nodes = rpc_client.get_cluster_nodes()?; + log::info!("RPC cluster nodes size:{}", cluster_nodes.len()); + Ok(cluster_nodes) +} + +fn verify_clusters( + mut geyzer_cluster: HashMap, + rpc_cluster: Vec, +) { + //verify if all rpc are in geyzer + for rpc in &rpc_cluster { + if let None = geyzer_cluster.remove(&rpc.pubkey) { + log::info!("Rpc node not present in geyzer: {} ", rpc.pubkey); + } + } + geyzer_cluster.iter().for_each(|(key, node)| { + log::info!( + "Geyzer node not present in RPC: {} gossip:{} shred_version:{} node:{:?}", + key, + node.gossip + .as_ref() + .map(|addr| addr.to_string()) + .unwrap_or("None".to_string()), + node.shred_version, + node + ) + }); +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 22048ac..743f7cd 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.70.0" +channel = "1.72.0" diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml index c66e3c6..cbf6165 100644 --- a/stake_aggregate/Cargo.toml +++ b/stake_aggregate/Cargo.toml @@ -27,6 +27,10 @@ path = "bin/sysvaraccount.rs" name = "send_get_vote_account" path = "bin/send_get_vote_account.rs" +[[bin]] +name = "testdeleteacc" +path = "bin/testdeleteacc.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -55,26 +59,42 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] } #jsonrpsee-types = "0.20.0" thiserror = "1.0.40" -#yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } -#yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } +#yellowstone-grpc-client = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-client" } +#yellowstone-grpc-proto = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-proto" } #yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } #yellowstone-grpc-proto = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } -#yellowstone-grpc-client = "1.11.0+solana.1.16.14" -#yellowstone-grpc-proto = "1.10.0+solana.1.16.14" +yellowstone-grpc-client = "1.11.0+solana.1.16.14" +yellowstone-grpc-proto = "1.10.0+solana.1.16.14" #yellowstone-grpc-client = "v1.10.0+solana.1.16.17" #yellowstone-grpc-proto = "v1.10.0+solana.1.16.17" -yellowstone-grpc-client = "1.10.0" -yellowstone-grpc-proto = "1.10.0" +#yellowstone-grpc-client = "1.10.0" +#yellowstone-grpc-proto = "1.10.0" -solana-sdk = "1.16.14" -solana-client = "1.16.14" -solana-ledger = "1.16.14" -solana-rpc-client-api = "1.16.14" -solana-version = "1.16.14" -solana-account-decoder = "1.16.14" -solana-program = "1.16.14" + solana-sdk = "1.16.14" + solana-client = "1.16.14" + solana-ledger = "1.16.14" + solana-rpc-client-api = "1.16.14" + solana-version = "1.16.14" + solana-account-decoder = "1.16.14" + solana-program = "1.16.14" + + +#patch to build locally +#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"} +#solana-sdk = { path = "../../solana/sdk" , version = "=1.18.0"} +#solana-client = { path = "../../solana/client" , version = "=1.18.0"} +#solana-rpc-client-api = { path = "../../solana/rpc-client-api" , version = "=1.18.0"} +#solana-version = { path = "../../solana/version" , version = "=1.18.0"} +#solana-account-decoder = { path = "../../solana/account-decoder" , version = "=1.18.0"} +#solana-ledger = { path = "../../solana/ledger" , version = "=1.18.0"} + + +#patch to build locally +#[patch.crates-io] +#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"} +#solana-zk-token-sdk = { path = "../../solana/zk-token-sdk" , version = "=1.18.0"} diff --git a/stake_aggregate/bin/parse_validator_stake.rs b/stake_aggregate/bin/parse_validator_stake.rs index cc8d188..1c0c4f9 100644 --- a/stake_aggregate/bin/parse_validator_stake.rs +++ b/stake_aggregate/bin/parse_validator_stake.rs @@ -18,6 +18,7 @@ use std::time::Duration; //const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt"; //const RPC_URL: &str = "http://localhost:8899"; const RPC_URL: &str = "https://api.testnet.solana.com"; +//const RPC_URL: &str = "https://api.mainnet-beta.solana.com"; #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/stake_aggregate/bin/sysvaraccount.rs b/stake_aggregate/bin/sysvaraccount.rs index c73fc24..bdacc0a 100644 --- a/stake_aggregate/bin/sysvaraccount.rs +++ b/stake_aggregate/bin/sysvaraccount.rs @@ -62,7 +62,7 @@ async fn run_loop( //subscribe Geyser grpc //slot subscription let mut slots = HashMap::new(); - slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); //account subscription let mut accounts_filter: HashMap = HashMap::new(); @@ -87,6 +87,7 @@ async fn run_loop( Default::default(), //block meta Some(CommitmentLevel::Confirmed), vec![], + None, ) .await?; diff --git a/stake_aggregate/bin/testdeleteacc.rs b/stake_aggregate/bin/testdeleteacc.rs new file mode 100644 index 0000000..a1c6ae2 --- /dev/null +++ b/stake_aggregate/bin/testdeleteacc.rs @@ -0,0 +1,104 @@ +// cargo run --bin testdeleteacc + +use anyhow::bail; +use futures_util::StreamExt; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::CommitmentLevel; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; +use yellowstone_grpc_proto::{ + prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}, + tonic::service::Interceptor, +}; + +const GRPC_URL: &str = "http://localhost:10000"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let client = GeyserGrpcClient::connect(GRPC_URL, None::<&'static str>, None)?; + + let ctrl_c_signal = tokio::signal::ctrl_c(); + + tokio::select! { + res = run_loop(client) => { + // This should never happen + log::error!("Services quit unexpectedly {res:?}"); + } + _ = ctrl_c_signal => { + log::info!("Received ctrl+c signal"); + } + } + + Ok(()) +} + +async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Result<()> { + //subscribe Geyser grpc + //slot subscription + let mut slots = HashMap::new(); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); + + //account subscription + let mut accounts_filter: HashMap = HashMap::new(); + accounts_filter.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![solana_sdk::stake::program::ID.to_string()], + filters: vec![], + }, + ); + + let mut confirmed_stream = client + .subscribe_once( + slots.clone(), + accounts_filter, //accounts + Default::default(), //tx + Default::default(), //entry + Default::default(), //full block + Default::default(), //block meta + Some(CommitmentLevel::Confirmed), + vec![], + None, + ) + .await?; + + while let Some(Ok(update)) = confirmed_stream.next().await { + match update.update_oneof { + Some(UpdateOneof::Account(account)) => { + log::info!("Geyser receive account at slot:{}", account.slot); + log::info!( + "Geyser notif with previous account owner:{:?}", + account + .previous_account_state + .map(|acc| Pubkey::try_from(acc.owner).expect("valid pubkey")) + ); + if let Some(account) = account.account { + log::info!( + "Geyser notif for account account:{:?}", + Pubkey::try_from(account.pubkey).expect("valid pubkey") + ); + log::info!( + "Geyser notif account owner:{:?}", + Pubkey::try_from(account.owner).expect("valid pubkey") + ); + } + } + Some(UpdateOneof::Slot(slot)) => log::trace!( + "Geyser receive slot:{} commitment:{:?}", + slot.slot, + slot.status() + ), + Some(UpdateOneof::Ping(_)) => { + log::trace!("GRPC Ping"); + } + k => { + bail!("Unexpected update: {k:?}"); + } + }; + } + Ok(()) +} diff --git a/stake_aggregate/rust-toolchain.toml b/stake_aggregate/rust-toolchain.toml new file mode 100644 index 0000000..743f7cd --- /dev/null +++ b/stake_aggregate/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.72.0" diff --git a/stake_aggregate/src/epoch.rs b/stake_aggregate/src/epoch.rs index bd03108..2257490 100644 --- a/stake_aggregate/src/epoch.rs +++ b/stake_aggregate/src/epoch.rs @@ -7,10 +7,92 @@ use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::epoch_info::EpochInfo; use solana_sdk::sysvar::epoch_schedule::EpochSchedule; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::ops::Bound; use std::sync::Arc; use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel; +use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock; use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot; +#[derive(Debug)] +pub struct BlockSlotVerifier { + block_cache: BTreeMap, + slot_cache: BTreeSet, +} + +impl BlockSlotVerifier { + pub fn new() -> Self { + BlockSlotVerifier { + block_cache: BTreeMap::new(), + slot_cache: BTreeSet::new(), + } + } + pub fn process_slot(&mut self, slot: u64) -> Option<(u64, SubscribeUpdateBlock)> { + match self.block_cache.remove(&slot) { + //the block is already seen. Return slot/block. + Some(block) => Some((slot, block)), + None => { + self.slot_cache.insert(slot); + self.verify(slot); + None + } + } + } + pub fn process_block( + &mut self, + block: SubscribeUpdateBlock, + ) -> Option<(u64, SubscribeUpdateBlock)> { + let slot = block.slot; + if self.slot_cache.remove(&slot) { + //the slot is already seen. Return slot/block. + Some((slot, block)) + } else { + //Cache block and wait for the slot + let old = self.block_cache.insert(slot, block); + if old.is_some() { + log::warn!("Receive 2 blocks for the same slot:{slot}"); + } + None + } + } + + fn verify(&mut self, current_slot: u64) { + //do some verification on cached block and slot + let old_slot: Vec<_> = self + .slot_cache + .range(( + Bound::Unbounded, + Bound::Included(current_slot.saturating_sub(2)), + )) + .copied() + .collect(); + if old_slot.len() > 0 { + log::error!("Missing block for slots:{:?}", old_slot); + for slot in &old_slot { + self.slot_cache.remove(&slot); + } + } + + //verify that there's no too old block. + let old_block_slots: Vec<_> = self + .block_cache + .range(( + Bound::Unbounded, + Bound::Included(current_slot.saturating_sub(2)), + )) + .map(|(slot, _)| slot) + .copied() + .collect(); + if old_block_slots.len() > 0 { + log::error!("Missing slot for block slot:{:?}", old_slot); + for slot in old_block_slots { + self.block_cache.remove(&slot); + } + } + } +} + #[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub struct Epoch { pub epoch: u64, diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index be30a42..97fc070 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -36,6 +36,7 @@ curl http://localhost:3001 -X POST -H "Content-Type: application/json" -d ' use crate::bootstrap::BootstrapData; use crate::bootstrap::BootstrapEvent; +use crate::epoch::BlockSlotVerifier; use crate::leader_schedule::LeaderScheduleData; use crate::stakestore::StakeStore; use crate::votestore::VoteStore; @@ -55,6 +56,7 @@ use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; +use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock; use yellowstone_grpc_proto::{ prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}, tonic::service::Interceptor, @@ -184,7 +186,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //subscribe Geyser grpc //slot subscription let mut slots = HashMap::new(); - slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + slots.insert("client".to_string(), SubscribeRequestFilterSlots::default()); //account subscription let mut accounts: HashMap = HashMap::new(); @@ -195,8 +197,6 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re owner: vec![ solana_sdk::stake::program::ID.to_string(), solana_sdk::vote::program::ID.to_string(), - solana_sdk::sysvar::stake_history::ID.to_string(), - // solana_sdk::system_program::ID.to_string(), ], filters: vec![], }, @@ -230,6 +230,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re Default::default(), //block meta Some(CommitmentLevel::Confirmed), vec![], + None, ) .await?; @@ -254,11 +255,13 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //For DEBUG TODO remove: //start stake verification loop - let mut stake_verification_sender = - crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await; + // let mut stake_verification_sender = + // crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await; - //TODO remove. Store parent hash to see if we don't miss a block. - let mut parent_block_slot = None; + //use to process block at confirm slot. + //at confirm slot are send before the block. + //Verify that the last confirmed slot receive is the block slot. + let mut block_slot_verifier = BlockSlotVerifier::new(); loop { tokio::select! { Some(req) = request_rx.recv() => { @@ -409,6 +412,12 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re } } + if let CommitmentLevel::Confirmed = slot.status() { + log::trace!("Receive confirmed slot:{}", slot.slot); + if let Some((slot, block)) = block_slot_verifier.process_slot(slot.slot) { + process_block(block, slot); + } + } } Some(UpdateOneof::BlockMeta(block_meta)) => { log::info!("Receive Block Meta at slot: {}", block_meta.slot); @@ -420,49 +429,8 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re block.parent_slot, ); - //TODO remove; Detect missing block - if let Some(parent_block_slot) = parent_block_slot { - if parent_block_slot != block.parent_slot { - log::error!("Bad parent slot stored:{} block:{}, miss a block" - ,parent_block_slot,block.parent_slot - ); - } - } - - parent_block_slot = Some(block.slot); - - //parse to detect stake merge tx. - //first in the main thread then in a specific thread. - let stake_public_key: Vec = solana_sdk::stake::program::id().to_bytes().to_vec(); - for notif_tx in block.transactions { - if !notif_tx.is_vote { - if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) { - for instruction in message.instructions { - //filter stake tx - if message.account_keys[instruction.program_id_index as usize] == stake_public_key { - let source_bytes: [u8; 64] = notif_tx.signature[..solana_sdk::signature::SIGNATURE_BYTES] - .try_into() - .unwrap(); - log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}" - , solana_sdk::signature::Signature::from(source_bytes).to_string() - , block.slot - , current_epoch_state.current_slot.confirmed_slot - , instruction.accounts - ); - let program_index = instruction.program_id_index; - crate::stakestore::process_stake_tx_message( - &mut stake_verification_sender, - &mut stakestore - , &message.account_keys - , instruction - , program_index - , block.slot - , current_epoch_state.current_epoch_end_slot(), - ).await; - } - } - } - } + if let Some((slot, block)) = block_slot_verifier.process_block(block) { + process_block(block, slot); } } @@ -509,6 +477,42 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re Ok(()) } +fn process_block(block: SubscribeUpdateBlock, confirmed_slot: Slot) { + //parse to detect stake merge tx. + //first in the main thread then in a specific thread. + let stake_public_key: Vec = solana_sdk::stake::program::id().to_bytes().to_vec(); + for notif_tx in block.transactions { + if !notif_tx.is_vote { + if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) { + for instruction in message.instructions { + //filter stake tx + if message.account_keys[instruction.program_id_index as usize] + == stake_public_key + { + let source_bytes: [u8; 64] = notif_tx.signature + [..solana_sdk::signature::SIGNATURE_BYTES] + .try_into() + .unwrap(); + log::info!( + "New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}", + solana_sdk::signature::Signature::from(source_bytes).to_string(), + block.slot, + confirmed_slot, + instruction.accounts + ); + let program_index = instruction.program_id_index; + crate::stakestore::process_stake_tx_message( + &message.account_keys, + instruction, + program_index, + ); + } + } + } + } + } +} + #[derive(Debug)] #[allow(dead_code)] pub struct AccountPretty { diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index c20a8a7..2cad793 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -346,19 +346,6 @@ pub async fn start_stake_verification_loop( request_tx } -async fn send_verification( - _stake_sender: &mut Sender<(String, Pubkey, Option)>, - _stakestore: &mut StakeStore, - _instr: &str, - _stake_pybkey: Pubkey, -) { - // let current_stake = stakestore.stakes.get(&stake_pybkey).cloned(); - // stake_sender - // .send((instr.to_string(), stake_pybkey, current_stake)) - // .await - // .unwrap(); -} - fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec) -> bool { !indexes .into_iter() @@ -367,15 +354,11 @@ fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: V .is_some() } -pub async fn process_stake_tx_message( - stake_sender: &mut Sender<(String, Pubkey, Option)>, - stakestore: &mut StakeStore, +pub fn process_stake_tx_message( account_keys_vec: &[Vec], instruction: CompiledInstruction, //for debug and trace purpose. program_id_index: u32, - _tx_slot: Slot, - _current_end_epoch_slot: u64, ) { //for tracing purpose let account_keys: Vec = account_keys_vec @@ -427,14 +410,6 @@ pub async fn process_stake_tx_message( } else { log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts); } - - send_verification( - stake_sender, - stakestore, - "Initialize", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Authorize(new_authorized, authority_type) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { @@ -458,14 +433,6 @@ pub async fn process_stake_tx_message( ) }); log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}"); - - send_verification( - stake_sender, - stakestore, - "Authorize", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::DelegateStake => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) { @@ -484,14 +451,6 @@ pub async fn process_stake_tx_message( "stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(), }); log::info!("StakeInstruction::DelegateStake infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "DelegateStake", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Split(lamports) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { @@ -508,14 +467,6 @@ pub async fn process_stake_tx_message( "lamports": lamports, }); log::info!("StakeInstruction::Split infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "Split", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Withdraw(lamports) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { @@ -536,14 +487,6 @@ pub async fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 6) .then(|| json!(account_keys[instruction.accounts[5] as usize].to_string())); log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "Withdraw", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Deactivate => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { @@ -585,14 +528,6 @@ pub async fn process_stake_tx_message( "custodian": account_keys[instruction.accounts[1] as usize].to_string(), }); log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "SetLockup", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::AuthorizeWithSeed(args) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) { @@ -615,14 +550,6 @@ pub async fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 4) .then(|| json!(account_keys[instruction.accounts[3] as usize].to_string())); log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "AuthorizeWithSeed", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::InitializeChecked => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { @@ -639,14 +566,6 @@ pub async fn process_stake_tx_message( "withdrawer": account_keys[instruction.accounts[3] as usize].to_string(), }); log::info!("StakeInstruction::InitializeChecked infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "InitializeChecked", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::AuthorizeChecked(authority_type) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { @@ -666,14 +585,6 @@ pub async fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 5) .then(|| json!(account_keys[instruction.accounts[4] as usize].to_string())); log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "AuthorizeChecked", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::AuthorizeCheckedWithSeed(args) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { @@ -697,14 +608,6 @@ pub async fn process_stake_tx_message( log::info!( "StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}" ); - - send_verification( - stake_sender, - stakestore, - "AuthorizeCheckedWithSeed", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::SetLockupChecked(lockup_args) => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { @@ -729,14 +632,6 @@ pub async fn process_stake_tx_message( "custodian": account_keys[instruction.accounts[1] as usize].to_string(), }); log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "SetLockupChecked", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::GetMinimumDelegation => { log::info!("StakeInstruction::GetMinimumDelegation"); @@ -755,14 +650,6 @@ pub async fn process_stake_tx_message( "referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(), }); log::info!("StakeInstruction::DeactivateDelinquent infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "DeactivateDelinquent", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Redelegate => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { @@ -780,14 +667,6 @@ pub async fn process_stake_tx_message( "stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(), }); log::info!("StakeInstruction::Redelegate infos:{info}"); - - send_verification( - stake_sender, - stakestore, - "Redelegate", - account_keys[instruction.accounts[0] as usize], - ) - .await; } StakeInstruction::Merge => { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { @@ -827,22 +706,6 @@ pub async fn process_stake_tx_message( // }, // current_end_epoch_slot, // ); - - // send_verification( - // stake_sender, - // stakestore, - // "Merge Destination", - // account_keys[instruction.accounts[0] as usize], - // ) - // .await; - - // send_verification( - // stake_sender, - // stakestore, - // "Merge Source", - // account_keys[instruction.accounts[1] as usize], - // ) - // .await; } } }