From f91965e1dd12878f12993cdd4bd82d5032350aa9 Mon Sep 17 00:00:00 2001 From: musitdev Date: Fri, 6 Oct 2023 17:20:17 +0200 Subject: [PATCH] manage account removal from notification --- stake_aggregate/src/main.rs | 1 - stake_aggregate/src/stakestore.rs | 50 ++++++++++++----------------- stake_aggregate/src/votestore.rs | 52 +++++++++++++++++++++---------- 3 files changed, 55 insertions(+), 48 deletions(-) diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 74b485f..2e14271 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -258,7 +258,6 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //TODO remove. Store parent hash to see if we don't miss a block. let mut parent_block_slot = None; - loop { tokio::select! { Some(req) = request_rx.recv() => { diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index 98f1978..1c8bad3 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -135,7 +135,12 @@ impl StakeStore { account: AccountPretty, current_end_epoch_slot: Slot, ) -> anyhow::Result<()> { + //if lamport == 0 the account has been removed. if account.lamports == 0 { + self.notify_stake_action( + ExtractedAction::Remove(account.pubkey, account.slot), + current_end_epoch_slot, + ); } else { let Ok(delegated_stake_opt) = account.read_stake() else { bail!("Can't read stake from account data"); @@ -224,21 +229,6 @@ impl StakeStore { for action in self.updates { stakestore.process_stake_action(action); } - - //verify one stake account to test. TODO remove - // let stake_account = stakestore - // .stakes - // .get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap()); - // let stake = stake_account.map(|stake| { - // stake - // .stake - // .stake(current_epoch, stakestore.stake_history.as_ref(), Some(0)) - // }); - // log::info!( - // "merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}", - // stake - // ); - Ok(stakestore) } @@ -249,7 +239,7 @@ impl StakeStore { .map(|stake| stake.last_update_slot <= update_slot) .unwrap_or(true) { - log::info!("remove_from_store for {}", account_pk.to_string()); + log::info!("Stake remove_from_store for {}", account_pk.to_string()); self.stakes.remove(account_pk); } } @@ -726,21 +716,21 @@ pub async fn process_stake_tx_message( current_end_epoch_slot, ); - send_verification( - stake_sender, - stakestore, - "Merge Destination", - account_keys[instruction.accounts[0] as usize], - ) - .await; + // send_verification( + // stake_sender, + // stakestore, + // "Merge Destination", + // account_keys[instruction.accounts[0] as usize], + // ) + // .await; - send_verification( - stake_sender, - stakestore, - "Merge Source", - account_keys[instruction.accounts[1] as usize], - ) - .await; + // send_verification( + // stake_sender, + // stakestore, + // "Merge Source", + // account_keys[instruction.accounts[1] as usize], + // ) + // .await; } } } diff --git a/stake_aggregate/src/votestore.rs b/stake_aggregate/src/votestore.rs index 624df9e..34ceba7 100644 --- a/stake_aggregate/src/votestore.rs +++ b/stake_aggregate/src/votestore.rs @@ -105,32 +105,50 @@ impl VoteStore { new_account: AccountPretty, current_end_epoch_slot: Slot, ) -> anyhow::Result<()> { - let Ok(vote_data) = new_account.read_vote() else { - bail!("Can't read Vote from account data"); - }; + if new_account.lamports == 0 { + self.remove_from_store(&new_account.pubkey, new_account.slot); + } else { + let Ok(vote_data) = new_account.read_vote() else { + bail!("Can't read Vote from account data"); + }; - //log::info!("add_vote {} :{vote_data:?}", new_account.pubkey); + //log::info!("add_vote {} :{vote_data:?}", new_account.pubkey); - let new_voteacc = StoredVote { - pubkey: new_account.pubkey, - vote_data, - last_update_slot: new_account.slot, - write_version: new_account.write_version, - }; + let new_voteacc = StoredVote { + pubkey: new_account.pubkey, + vote_data, + last_update_slot: new_account.slot, + write_version: new_account.write_version, + }; - //during extract push the new update or - //don't insertnow account change that has been done in next epoch. - //put in update pool to be merged next epoch change. - let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot; - match insert_stake { - false => self.updates.push((new_account.pubkey, new_voteacc)), - true => self.insert_vote(new_account.pubkey, new_voteacc), + //during extract push the new update or + //don't insertnow account change that has been done in next epoch. + //put in update pool to be merged next epoch change. + let insert_stake = + !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot; + match insert_stake { + false => self.updates.push((new_account.pubkey, new_voteacc)), + true => self.insert_vote(new_account.pubkey, new_voteacc), + } } + Ok(()) } fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) { vote_map_insert_vote(&mut self.votes, vote_account, vote_data); } + + fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) { + if self + .votes + .get(account_pk) + .map(|vote| vote.last_update_slot <= update_slot) + .unwrap_or(true) + { + log::info!("Vote remove_from_store for {}", account_pk.to_string()); + self.votes.remove(account_pk); + } + } } pub fn merge_program_account_in_vote_map(