diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml index 8373bff..c66e3c6 100644 --- a/stake_aggregate/Cargo.toml +++ b/stake_aggregate/Cargo.toml @@ -23,6 +23,10 @@ path = "bin/stakehistory.rs" name = "sysvaraccount" path = "bin/sysvaraccount.rs" +[[bin]] +name = "send_get_vote_account" +path = "bin/send_get_vote_account.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/stake_aggregate/bin/parse_validator_stake.rs b/stake_aggregate/bin/parse_validator_stake.rs index 8d507b7..cc8d188 100644 --- a/stake_aggregate/bin/parse_validator_stake.rs +++ b/stake_aggregate/bin/parse_validator_stake.rs @@ -98,7 +98,7 @@ async fn main() -> anyhow::Result<()> { }; let leader_schedule = calculate_leader_schedule(validator_stakes, &epoch).unwrap(); - verify_schedule(leader_schedule).unwrap(); + verify_schedule(leader_schedule, &aggregate_stake_file_path).unwrap(); Ok(()) } @@ -128,7 +128,7 @@ fn calculate_leader_schedule( )) } -fn verify_schedule(schedule: LeaderSchedule) -> anyhow::Result<()> { +fn verify_schedule(schedule: LeaderSchedule, file_name: &str) -> anyhow::Result<()> { log::info!("verify_schedule Start."); let rpc_client = RpcClient::new_with_timeout_and_commitment( RPC_URL.to_string(), @@ -154,6 +154,16 @@ fn verify_schedule(schedule: LeaderSchedule) -> anyhow::Result<()> { //map leaderscheudle to HashMap> let slot_leaders = schedule.get_slot_leaders(); + + //save_slot_leaders + let string_slot_leader: Vec = slot_leaders.iter().map(|pk| pk.to_string()).collect(); + let serialized_schedule = serde_json::to_string(&string_slot_leader).unwrap(); + let filename = format!("{file_name}.schedule.json",); + // Write to the file + let mut file = File::create(filename).unwrap(); + file.write_all(serialized_schedule.as_bytes()).unwrap(); + file.flush().unwrap(); + log::info!("aggregate_leader_schedule len:{}", slot_leaders.len()); let mut input_leader_schedule: HashMap> = HashMap::new(); for (slot, pubkey) in slot_leaders.iter().copied().enumerate() { diff --git a/stake_aggregate/bin/send_get_vote_account.rs b/stake_aggregate/bin/send_get_vote_account.rs new file mode 100644 index 0000000..acb2b08 --- /dev/null +++ b/stake_aggregate/bin/send_get_vote_account.rs @@ -0,0 +1,45 @@ +use futures_util::future::join_all; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; + +const LITE_RPC_URL: &str = "http://127.0.0.1:8890"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + log::info!("Start send get vote account"); + let mut handles = vec![]; + + for _ in 0..10 { + let h = tokio::task::spawn(async move { + call_get_vote_account().await; + }); + handles.push(h); + } + join_all(handles).await; + //std::thread::sleep(std::time::Duration::from_millis(1000000)); + Ok(()) +} + +async fn call_get_vote_account() { + let rpc_client = + RpcClient::new_with_commitment(LITE_RPC_URL.to_string(), CommitmentConfig::confirmed()); + loop { + match tokio::time::timeout( + std::time::Duration::from_millis(1000), + rpc_client.get_vote_accounts(), + ) + .await + { + Ok(Ok(accounts)) => { + if accounts.current.len() == 0 { + log::info!("get vote account len:0",); + } + } + Err(_) => panic!("get_vote_account timeout"), + Ok(Err(err)) => panic!("get_vote_account rpc error:{err}"), + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } +} diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index e0b0a14..c20a8a7 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -46,14 +46,14 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) { //doesn't erase new state with an old one. Can arrive during bootstrapping. //several instructions can be done in the same slot. if strstake.last_update_slot <= stake.last_update_slot { - log::trace!("stake_map_notify_stake Stake store updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey); + log::info!("stake_map_notify_stake updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey); *strstake = stake; } } // If value doesn't exist yet, then insert a new value of 1 std::collections::hash_map::Entry::Vacant(vacant) => { - log::trace!( - "stake_map_notify_stake Stake store insert stake: {} stake:{stake:?}", + log::info!( + "stake_map_notify_stake insert stake: {} stake:{stake:?}", stake.pubkey ); vacant.insert(stake); @@ -347,16 +347,24 @@ pub async fn start_stake_verification_loop( } async fn send_verification( - stake_sender: &mut Sender<(String, Pubkey, Option)>, - stakestore: &mut StakeStore, - instr: &str, - stake_pybkey: Pubkey, + _stake_sender: &mut Sender<(String, Pubkey, Option)>, + _stakestore: &mut StakeStore, + _instr: &str, + _stake_pybkey: Pubkey, ) { - let current_stake = stakestore.stakes.get(&stake_pybkey).cloned(); - stake_sender - .send((instr.to_string(), stake_pybkey, current_stake)) - .await - .unwrap(); + // let current_stake = stakestore.stakes.get(&stake_pybkey).cloned(); + // stake_sender + // .send((instr.to_string(), stake_pybkey, current_stake)) + // .await + // .unwrap(); +} + +fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec) -> bool { + !indexes + .into_iter() + .filter(|index| (instr_accounts[*index] as usize) >= account_keys.len()) + .next() + .is_some() } pub async fn process_stake_tx_message( @@ -366,8 +374,8 @@ pub async fn process_stake_tx_message( instruction: CompiledInstruction, //for debug and trace purpose. program_id_index: u32, - tx_slot: Slot, - current_end_epoch_slot: u64, + _tx_slot: Slot, + _current_end_epoch_slot: u64, ) { //for tracing purpose let account_keys: Vec = account_keys_vec @@ -381,20 +389,21 @@ pub async fn process_stake_tx_message( .collect(); //for debug get stake account index - log::info!( + log::trace!( "Found tx with stake account accounts:{:?} stake account index:{program_id_index:?}", account_keys, ); //merge and delegate has 1 instruction. Create as 2 instructions and the first is not a StakeInstruction. - log::info!( + log::trace!( "Before read instruction of program_id_index:{}", instruction.program_id_index ); let Ok(stake_inst) = bincode::deserialize::(&instruction.data) else { - log::info!( - "Error during stake instruction decoding :{:?}", - &instruction.data + log::warn!( + "Error during stake instruction decoding :{:?} stake_account:{:?}", + &instruction.data, + account_keys, ); return; }; @@ -410,10 +419,14 @@ pub async fn process_stake_tx_message( "custodian": lockup.custodian.to_string(), }); - log::info!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} stakeAccount:{} rentSysvar:{}" + if verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) { + log::info!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} stakeAccount:{} rentSysvar:{}" , account_keys[instruction.accounts[0] as usize].to_string() , account_keys[instruction.accounts[1] as usize].to_string() ); + } else { + log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts); + } send_verification( stake_sender, @@ -424,6 +437,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Authorize(new_authorized, authority_type) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { + log::warn!( + "StakeInstruction::Authorize authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let value = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), @@ -448,6 +468,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::DelegateStake => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) { + log::warn!( + "StakeInstruction::DelegateStake authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "voteAccount": account_keys[instruction.accounts[1] as usize].to_string(), @@ -467,6 +494,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Split(lamports) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { + log::warn!( + "StakeInstruction::Split authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "newSplitAccount": account_keys[instruction.accounts[1] as usize].to_string(), @@ -484,6 +518,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Withdraw(lamports) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { + log::warn!( + "StakeInstruction::Withdraw authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "destination": account_keys[instruction.accounts[1] as usize].to_string(), @@ -505,6 +546,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Deactivate => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { + log::warn!( + "StakeInstruction::Deactivate authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), @@ -513,6 +561,13 @@ pub async fn process_stake_tx_message( log::info!("StakeInstruction::Deactivate infos:{info}"); } StakeInstruction::SetLockup(lockup_args) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) { + log::warn!( + "StakeInstruction::SetLockup authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let unix_timestamp = lockup_args .unix_timestamp .map(|timestamp| json!(timestamp)) @@ -540,6 +595,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::AuthorizeWithSeed(args) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) { + log::warn!( + "StakeInstruction::AuthorizeWithSeed authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "authorityBase": account_keys[instruction.accounts[1] as usize].to_string(), @@ -563,6 +625,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::InitializeChecked => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { + log::warn!( + "StakeInstruction::InitializeChecked authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "rentSysvar": account_keys[instruction.accounts[1] as usize].to_string(), @@ -580,6 +649,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::AuthorizeChecked(authority_type) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { + log::warn!( + "StakeInstruction::AuthorizeChecked authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), @@ -600,6 +676,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::AuthorizeCheckedWithSeed(args) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { + log::warn!( + "StakeInstruction::AuthorizeCheckedWithSeed authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "authorityBase": account_keys[instruction.accounts[1] as usize].to_string(), @@ -624,6 +707,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::SetLockupChecked(lockup_args) => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { + log::warn!( + "StakeInstruction::SetLockupChecked authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let unix_timestamp = lockup_args .unix_timestamp .map(|timestamp| json!(timestamp)) @@ -652,6 +742,13 @@ pub async fn process_stake_tx_message( log::info!("StakeInstruction::GetMinimumDelegation"); } StakeInstruction::DeactivateDelinquent => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { + log::warn!( + "StakeInstruction::DeactivateDelinquent authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "voteAccount": account_keys[instruction.accounts[1] as usize].to_string(), @@ -668,6 +765,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Redelegate => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { + log::warn!( + "StakeInstruction::Redelegate authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "newStakeAccount": account_keys[instruction.accounts[1] as usize].to_string(), @@ -686,6 +790,13 @@ pub async fn process_stake_tx_message( .await; } StakeInstruction::Merge => { + if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { + log::warn!( + "StakeInstruction::Merge authorized Index error in instruction:{:?}", + instruction.accounts + ); + return; + } let info = json!({ "destination": account_keys[instruction.accounts[0] as usize].to_string(), "source": account_keys[instruction.accounts[1] as usize].to_string(), @@ -694,28 +805,28 @@ pub async fn process_stake_tx_message( "stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(), }); log::info!("StakeInstruction::Merge infos:{info}"); - let source_account = &account_keys_vec[instruction.accounts[1] as usize]; - let source_bytes: [u8; 32] = source_account[..solana_sdk::pubkey::PUBKEY_BYTES] - .try_into() - .unwrap(); - let source_pubkey = Pubkey::new_from_array(source_bytes); - log::info!( - "DETECT MERGE for source account:{}", - source_pubkey.to_string() - ); + // let source_account = &account_keys_vec[instruction.accounts[1] as usize]; + // let source_bytes: [u8; 32] = source_account[..solana_sdk::pubkey::PUBKEY_BYTES] + // .try_into() + // .unwrap(); + //let source_pubkey = Pubkey::new_from_array(source_bytes); + // log::info!( + // "DETECT MERGE for source account:{}", + // source_pubkey.to_string() + // ); - stakestore.notify_stake_action( - ExtractedAction::Remove(source_pubkey, tx_slot), - current_end_epoch_slot, - ); - stakestore.notify_stake_action( - ExtractedAction::Merge { - source_account: account_keys[instruction.accounts[1] as usize], - destination_account: account_keys[instruction.accounts[0] as usize], - update_slot: tx_slot, - }, - current_end_epoch_slot, - ); + // stakestore.notify_stake_action( + // ExtractedAction::Remove(source_pubkey, tx_slot), + // current_end_epoch_slot, + // ); + // stakestore.notify_stake_action( + // ExtractedAction::Merge { + // source_account: account_keys[instruction.accounts[1] as usize], + // destination_account: account_keys[instruction.accounts[0] as usize], + // update_slot: tx_slot, + // }, + // current_end_epoch_slot, + // ); // send_verification( // stake_sender,