diff --git a/data-streams/src/chain_data.rs b/data-streams/src/chain_data.rs index 901b28b..af81f2f 100644 --- a/data-streams/src/chain_data.rs +++ b/data-streams/src/chain_data.rs @@ -39,6 +39,7 @@ pub struct ChainData { accounts: HashMap>, newest_rooted_slot: u64, newest_processed_slot: u64, + best_chain_slot: u64, account_versions_stored: usize, account_bytes_stored: usize, metric_accounts_stored: MetricU64, @@ -53,6 +54,7 @@ impl ChainData { accounts: HashMap::new(), newest_rooted_slot: 0, newest_processed_slot: 0, + best_chain_slot: 0, account_versions_stored: 0, account_bytes_stored: 0, metric_accounts_stored: metrics_sender @@ -78,6 +80,13 @@ impl ChainData { self.newest_rooted_slot = new_slot.slot; } + // Use the highest slot that has a known parent as best chain + // (sometimes slots OptimisticallyConfirm before we even know the parent!) + let new_best_chain = new_slot.parent.is_some() && new_slot.slot > self.best_chain_slot; + if new_best_chain { + self.best_chain_slot = new_slot.slot; + } + let mut parent_update = false; use std::collections::hash_map::Entry; @@ -89,16 +98,19 @@ impl ChainData { let v = o.into_mut(); parent_update = v.parent != new_slot.parent && new_slot.parent.is_some(); v.parent = v.parent.or(new_slot.parent); - v.status = new_slot.status; + // Never decrease the slot status + if v.status == SlotStatus::Processed || new_slot.status == SlotStatus::Rooted { + v.status = new_slot.status; + } } }; - if new_processed_head || parent_update { + if new_best_chain || parent_update { // update the "chain" field down to the first rooted slot - let mut slot = self.newest_processed_slot; + let mut slot = self.best_chain_slot; loop { if let Some(data) = self.slots.get_mut(&slot) { - data.chain = self.newest_processed_slot; + data.chain = self.best_chain_slot; if data.status == SlotStatus::Rooted { break; } @@ -131,7 +143,7 @@ impl ChainData { // getting rooted, hence assume non-uncle slots < newest_rooted_slot // are rooted too s.status == SlotStatus::Rooted - || s.chain == self.newest_processed_slot + || s.chain == self.best_chain_slot }) // preserved account writes for deleted slots <= newest_rooted_slot // are expected to be rooted @@ -196,11 +208,13 @@ impl ChainData { self.slots .get(&write.slot) // either the slot is rooted or in the current chain - .map(|s| s.status == SlotStatus::Rooted || s.chain == self.newest_processed_slot) + .map(|s| { + s.status == SlotStatus::Rooted + || s.chain == self.best_chain_slot + || write.slot > self.best_chain_slot + }) // if the slot can't be found but preceeds newest rooted, use it too (old rooted slots are removed) - .unwrap_or( - write.slot <= self.newest_rooted_slot || write.slot > self.newest_processed_slot, - ) + .unwrap_or(write.slot <= self.newest_rooted_slot || write.slot > self.best_chain_slot) } /// Cloned snapshot of all the most recent live writes per pubkey @@ -212,7 +226,7 @@ impl ChainData { .iter() .rev() .find(|w| self.is_account_write_live(w))?; - Some((pubkey.clone(), latest_good_write.clone())) + Some((*pubkey, latest_good_write.clone())) }) .collect() } @@ -221,10 +235,44 @@ impl ChainData { pub fn account<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> { self.accounts .get(pubkey) - .ok_or(anyhow::anyhow!("account {} not found", pubkey))? + .ok_or_else(|| anyhow::anyhow!("account {} not found", pubkey))? .iter() .rev() .find(|w| self.is_account_write_live(w)) - .ok_or(anyhow::anyhow!("account {} has no live data", pubkey)) + .ok_or_else(|| anyhow::anyhow!("account {} has no live data", pubkey)) + } + + pub fn iter_accounts<'a>(&'a self) -> impl Iterator { + self.accounts.iter().filter_map(|(pk, writes)| { + writes + .iter() + .rev() + .find(|w| self.is_account_write_live(w)) + .map(|latest_write| (pk, latest_write)) + }) + } + + pub fn slots_count(&self) -> usize { + self.slots.len() + } + + pub fn accounts_count(&self) -> usize { + self.accounts.len() + } + + pub fn account_writes_count(&self) -> usize { + self.account_versions_stored + } + + pub fn account_bytes(&self) -> usize { + self.account_bytes_stored + } + + pub fn best_chain_slot(&self) -> u64 { + self.best_chain_slot + } + + pub fn newest_rooted_slot(&self) -> u64 { + self.newest_rooted_slot } }