manage account removal from notification
This commit is contained in:
parent
db8a3079cb
commit
f91965e1dd
|
@ -258,7 +258,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
|
||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
||||
let mut parent_block_slot = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(req) = request_rx.recv() => {
|
||||
|
|
|
@ -135,7 +135,12 @@ impl StakeStore {
|
|||
account: AccountPretty,
|
||||
current_end_epoch_slot: Slot,
|
||||
) -> anyhow::Result<()> {
|
||||
//if lamport == 0 the account has been removed.
|
||||
if account.lamports == 0 {
|
||||
self.notify_stake_action(
|
||||
ExtractedAction::Remove(account.pubkey, account.slot),
|
||||
current_end_epoch_slot,
|
||||
);
|
||||
} else {
|
||||
let Ok(delegated_stake_opt) = account.read_stake() else {
|
||||
bail!("Can't read stake from account data");
|
||||
|
@ -224,21 +229,6 @@ impl StakeStore {
|
|||
for action in self.updates {
|
||||
stakestore.process_stake_action(action);
|
||||
}
|
||||
|
||||
//verify one stake account to test. TODO remove
|
||||
// let stake_account = stakestore
|
||||
// .stakes
|
||||
// .get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
|
||||
// let stake = stake_account.map(|stake| {
|
||||
// stake
|
||||
// .stake
|
||||
// .stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
|
||||
// });
|
||||
// log::info!(
|
||||
// "merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
|
||||
// stake
|
||||
// );
|
||||
|
||||
Ok(stakestore)
|
||||
}
|
||||
|
||||
|
@ -249,7 +239,7 @@ impl StakeStore {
|
|||
.map(|stake| stake.last_update_slot <= update_slot)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
log::info!("remove_from_store for {}", account_pk.to_string());
|
||||
log::info!("Stake remove_from_store for {}", account_pk.to_string());
|
||||
self.stakes.remove(account_pk);
|
||||
}
|
||||
}
|
||||
|
@ -726,21 +716,21 @@ pub async fn process_stake_tx_message(
|
|||
current_end_epoch_slot,
|
||||
);
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Merge Destination",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Destination",
|
||||
// account_keys[instruction.accounts[0] as usize],
|
||||
// )
|
||||
// .await;
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Merge Source",
|
||||
account_keys[instruction.accounts[1] as usize],
|
||||
)
|
||||
.await;
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Source",
|
||||
// account_keys[instruction.accounts[1] as usize],
|
||||
// )
|
||||
// .await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,32 +105,50 @@ impl VoteStore {
|
|||
new_account: AccountPretty,
|
||||
current_end_epoch_slot: Slot,
|
||||
) -> anyhow::Result<()> {
|
||||
let Ok(vote_data) = new_account.read_vote() else {
|
||||
bail!("Can't read Vote from account data");
|
||||
};
|
||||
if new_account.lamports == 0 {
|
||||
self.remove_from_store(&new_account.pubkey, new_account.slot);
|
||||
} else {
|
||||
let Ok(vote_data) = new_account.read_vote() else {
|
||||
bail!("Can't read Vote from account data");
|
||||
};
|
||||
|
||||
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
|
||||
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
|
||||
|
||||
let new_voteacc = StoredVote {
|
||||
pubkey: new_account.pubkey,
|
||||
vote_data,
|
||||
last_update_slot: new_account.slot,
|
||||
write_version: new_account.write_version,
|
||||
};
|
||||
let new_voteacc = StoredVote {
|
||||
pubkey: new_account.pubkey,
|
||||
vote_data,
|
||||
last_update_slot: new_account.slot,
|
||||
write_version: new_account.write_version,
|
||||
};
|
||||
|
||||
//during extract push the new update or
|
||||
//don't insertnow account change that has been done in next epoch.
|
||||
//put in update pool to be merged next epoch change.
|
||||
let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
||||
match insert_stake {
|
||||
false => self.updates.push((new_account.pubkey, new_voteacc)),
|
||||
true => self.insert_vote(new_account.pubkey, new_voteacc),
|
||||
//during extract push the new update or
|
||||
//don't insertnow account change that has been done in next epoch.
|
||||
//put in update pool to be merged next epoch change.
|
||||
let insert_stake =
|
||||
!self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
||||
match insert_stake {
|
||||
false => self.updates.push((new_account.pubkey, new_voteacc)),
|
||||
true => self.insert_vote(new_account.pubkey, new_voteacc),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
|
||||
vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
|
||||
}
|
||||
|
||||
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
|
||||
if self
|
||||
.votes
|
||||
.get(account_pk)
|
||||
.map(|vote| vote.last_update_slot <= update_slot)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
log::info!("Vote remove_from_store for {}", account_pk.to_string());
|
||||
self.votes.remove(account_pk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge_program_account_in_vote_map(
|
||||
|
|
Loading…
Reference in New Issue