diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 083a79b..dd50c2f 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -201,6 +201,11 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap }); spawned_bootstrap_task.push(jh); + //For DEBUG TODO remove: + //start stake verification loop + let mut stake_verification_sender = + crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await; + loop { tokio::select! { Some(req) = request_rx.recv() => { @@ -366,11 +371,12 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re ); let program_index = instruction.program_id_index; crate::stakestore::process_stake_tx_message( + &mut stake_verification_sender, &mut stakestore , &message.account_keys , instruction , program_index - ); + ).await; } } } diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index 862301a..b71af97 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -10,6 +10,7 @@ use solana_sdk::stake::instruction::StakeInstruction; use solana_sdk::stake::state::Delegation; use solana_sdk::stake::state::StakeState; use std::collections::HashMap; +use tokio::sync::mpsc::Sender; use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction; pub type StakeMap = HashMap; @@ -281,7 +282,75 @@ pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result Sender<(String, Pubkey, Option)> { + let (request_tx, mut request_rx) = + tokio::sync::mpsc::channel::<(String, Pubkey, Option)>(100); + tokio::spawn(async move { + while let Some((instruction, stake_pk, stake)) = request_rx.recv().await { + tokio::spawn({ + let rpc_url = rpc_url.clone(); + async move { + tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; + //get stake from RPC + let rpc_client = + solana_client::nonblocking::rpc_client::RpcClient::new_with_commitment( + rpc_url, + solana_sdk::commitment_config::CommitmentConfig::finalized(), + ); + match rpc_client.get_account(&stake_pk).await { + Ok(account) => { + let stake_data = crate::stakestore::read_stake_from_account_data( + account.data.as_slice(), + ); + log::info!( + "VERIFICATION Instruction:{instruction} Account:{} verification current stake:{stake:?} Rpc stake:{stake_data:?}", + stake_pk + ); + } + Err(solana_client::client_error::ClientError { + kind: + solana_client::client_error::ClientErrorKind::RpcError( + solana_client::rpc_request::RpcError::ForUser(msg), + ), + .. + }) => { + log::info!( + "VERIFICATION Instruction:{instruction} Account:{} verification RPC not found:{}", + stake_pk, + msg + ); + } + Err(err) => { + log::info!( + "VERIFICATION Instruction:{instruction} Account:{} verification RPC error:{err}", + stake_pk + ); + } + } + } + }); + } + }); + request_tx +} + +async fn send_verification( + stake_sender: &mut Sender<(String, Pubkey, Option)>, + stakestore: &mut StakeStore, + instr: &str, + stake_pybkey: Pubkey, +) { + let current_stake = stakestore.stakes.get(&stake_pybkey).cloned(); + stake_sender + .send((instr.to_string(), stake_pybkey, current_stake)) + .await + .unwrap(); +} + +pub async fn process_stake_tx_message( + stake_sender: &mut Sender<(String, Pubkey, Option)>, stakestore: &mut StakeStore, account_keys_vec: &[Vec], instruction: CompiledInstruction, @@ -330,6 +399,14 @@ pub fn process_stake_tx_message( , account_keys[instruction.accounts[0] as usize].to_string() , account_keys[instruction.accounts[1] as usize].to_string() ); + + send_verification( + stake_sender, + stakestore, + "Initialize", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Authorize(new_authorized, authority_type) => { let value = json!({ @@ -346,6 +423,14 @@ pub fn process_stake_tx_message( ) }); log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}"); + + send_verification( + stake_sender, + stakestore, + "Authorize", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::DelegateStake => { let info = json!({ @@ -357,6 +442,14 @@ pub fn process_stake_tx_message( "stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(), }); log::info!("StakeInstruction::DelegateStake infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "DelegateStake", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Split(lamports) => { let info = json!({ @@ -366,6 +459,14 @@ pub fn process_stake_tx_message( "lamports": lamports, }); log::info!("StakeInstruction::Split infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "Split", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Withdraw(lamports) => { let info = json!({ @@ -379,6 +480,14 @@ pub fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 6) .then(|| json!(account_keys[instruction.accounts[5] as usize].to_string())); log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "Withdraw", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Deactivate => { let info = json!({ @@ -406,6 +515,14 @@ pub fn process_stake_tx_message( "custodian": account_keys[instruction.accounts[1] as usize].to_string(), }); log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "SetLockup", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::AuthorizeWithSeed(args) => { let info = json!({ @@ -421,6 +538,14 @@ pub fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 4) .then(|| json!(account_keys[instruction.accounts[3] as usize].to_string())); log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "AuthorizeWithSeed", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::InitializeChecked => { let info = json!({ @@ -430,6 +555,14 @@ pub fn process_stake_tx_message( "withdrawer": account_keys[instruction.accounts[3] as usize].to_string(), }); log::info!("StakeInstruction::InitializeChecked infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "InitializeChecked", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::AuthorizeChecked(authority_type) => { let info = json!({ @@ -442,6 +575,14 @@ pub fn process_stake_tx_message( let custodian = (instruction.accounts.len() >= 5) .then(|| json!(account_keys[instruction.accounts[4] as usize].to_string())); log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "AuthorizeChecked", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::AuthorizeCheckedWithSeed(args) => { let info = json!({ @@ -458,6 +599,14 @@ pub fn process_stake_tx_message( log::info!( "StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}" ); + + send_verification( + stake_sender, + stakestore, + "AuthorizeCheckedWithSeed", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::SetLockupChecked(lockup_args) => { let unix_timestamp = lockup_args @@ -475,6 +624,14 @@ pub fn process_stake_tx_message( "custodian": account_keys[instruction.accounts[1] as usize].to_string(), }); log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "SetLockupChecked", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::GetMinimumDelegation => { log::info!("StakeInstruction::GetMinimumDelegation"); @@ -486,6 +643,14 @@ pub fn process_stake_tx_message( "referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(), }); log::info!("StakeInstruction::DeactivateDelinquent infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "DeactivateDelinquent", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Redelegate => { let info = json!({ @@ -496,6 +661,14 @@ pub fn process_stake_tx_message( "stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(), }); log::info!("StakeInstruction::Redelegate infos:{info}"); + + send_verification( + stake_sender, + stakestore, + "Redelegate", + account_keys[instruction.accounts[0] as usize], + ) + .await; } StakeInstruction::Merge => { let info = json!({ @@ -516,6 +689,22 @@ pub fn process_stake_tx_message( source_pubkey.to_string() ); stakestore.remove_stake(source_pubkey); + + send_verification( + stake_sender, + stakestore, + "Merge Destination", + account_keys[instruction.accounts[0] as usize], + ) + .await; + + send_verification( + stake_sender, + stakestore, + "Merge Source", + account_keys[instruction.accounts[1] as usize], + ) + .await; } } }