diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f10c9451e..a8e94d43f 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -379,11 +379,11 @@ pub enum ForwardOption { ForwardTransaction, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct FilterForwardingResults { - total_forwardable_packets: usize, - total_tracer_packets_in_buffer: usize, - total_forwardable_tracer_packets: usize, + pub(crate) total_forwardable_packets: usize, + pub(crate) total_tracer_packets_in_buffer: usize, + pub(crate) total_forwardable_tracer_packets: usize, } impl BankingStage { diff --git a/core/src/latest_unprocessed_votes.rs b/core/src/latest_unprocessed_votes.rs index 1bd5506ea..699e87015 100644 --- a/core/src/latest_unprocessed_votes.rs +++ b/core/src/latest_unprocessed_votes.rs @@ -139,8 +139,8 @@ pub(crate) fn weighted_random_order_by_stake<'a>( } pub struct VoteBatchInsertionMetrics { - num_dropped_gossip: usize, - num_dropped_tpu: usize, + pub(crate) num_dropped_gossip: usize, + pub(crate) num_dropped_tpu: usize, } #[derive(Debug, Default)] @@ -277,6 +277,24 @@ impl LatestUnprocessedVotes { .count() } + /// Drains all votes yet to be processed sorted by a weighted random ordering by stake + pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { + let pubkeys_by_stake = weighted_random_order_by_stake( + &bank, + self.latest_votes_per_pubkey.read().unwrap().keys(), + ) + .collect_vec(); + pubkeys_by_stake + .into_iter() + .filter_map(|pubkey| { + self.get_entry(pubkey).and_then(|lock| { + let mut latest_vote = lock.write().unwrap(); + latest_vote.take_vote() + }) + }) + .collect_vec() + } + /// Sometimes we forward and hold the packets, sometimes we forward and clear. /// This also clears all gossip votes since by definition they have been forwarded pub fn clear_forwarded_packets(&self) { diff --git a/core/src/lib.rs b/core/src/lib.rs index 7ba9f931d..d521075f6 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -75,6 +75,7 @@ pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod unprocessed_packet_batches; +pub mod unprocessed_transaction_storage; pub mod validator; pub mod verified_vote_packets; pub mod vote_simulator; diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 5fd9e93b1..7caf7d0c1 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -78,7 +78,7 @@ impl Ord for DeserializedPacket { /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. -#[derive(Default)] +#[derive(Debug, Default)] pub struct UnprocessedPacketBatches { pub packet_priority_queue: MinMaxHeap>, pub message_hash_to_transaction: HashMap, diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs new file mode 100644 index 000000000..9b5be9883 --- /dev/null +++ b/core/src/unprocessed_transaction_storage.rs @@ -0,0 +1,471 @@ +#![allow(dead_code)] +use { + crate::{ + banking_stage::{BankingStage, FilterForwardingResults, ForwardOption}, + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + immutable_deserialized_packet::ImmutableDeserializedPacket, + latest_unprocessed_votes::{ + self, LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics, + VoteSource, + }, + unprocessed_packet_batches::{self, DeserializedPacket, UnprocessedPacketBatches}, + }, + itertools::Itertools, + min_max_heap::MinMaxHeap, + solana_perf::packet::PacketBatch, + solana_runtime::bank::Bank, + std::{rc::Rc, sync::Arc}, +}; + +const MAX_STAKED_VALIDATORS: usize = 10_000; + +#[derive(Debug)] +pub enum UnprocessedTransactionStorage { + VoteStorage(VoteStorage), + LocalTransactionStorage(ThreadLocalUnprocessedPackets), +} + +#[derive(Debug)] +pub struct ThreadLocalUnprocessedPackets { + unprocessed_packet_batches: UnprocessedPacketBatches, + thread_type: ThreadType, +} + +#[derive(Debug)] +pub struct VoteStorage { + latest_unprocessed_votes: Arc, + vote_source: VoteSource, +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum ThreadType { + Voting(VoteSource), + Transactions, +} + +#[derive(Debug, Default)] +pub struct InsertPacketBatchesSummary { + pub(crate) num_dropped_packets: usize, + pub(crate) num_dropped_gossip_vote_packets: usize, + pub(crate) num_dropped_tpu_vote_packets: usize, + pub(crate) num_dropped_tracer_packets: usize, +} + +fn filter_processed_packets<'a, F>( + retryable_transaction_indexes: impl Iterator, + mut f: F, +) where + F: FnMut(usize, usize), +{ + let mut prev_retryable_index = 0; + for (i, retryable_index) in retryable_transaction_indexes.enumerate() { + let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; + + let end = *retryable_index; + prev_retryable_index = *retryable_index; + + if start < end { + f(start, end) + } + } +} + +impl UnprocessedTransactionStorage { + pub fn new_transaction_storage( + unprocessed_packet_batches: UnprocessedPacketBatches, + thread_type: ThreadType, + ) -> Self { + Self::LocalTransactionStorage(ThreadLocalUnprocessedPackets { + unprocessed_packet_batches, + thread_type, + }) + } + + pub fn new_vote_storage( + latest_unprocessed_votes: Arc, + vote_source: VoteSource, + ) -> Self { + Self::VoteStorage(VoteStorage { + latest_unprocessed_votes, + vote_source, + }) + } + + pub fn is_empty(&self) -> bool { + match self { + Self::VoteStorage(vote_storage) => vote_storage.is_empty(), + Self::LocalTransactionStorage(transaction_storage) => transaction_storage.is_empty(), + } + } + + pub fn len(&self) -> usize { + match self { + Self::VoteStorage(vote_storage) => vote_storage.len(), + Self::LocalTransactionStorage(transaction_storage) => transaction_storage.len(), + } + } + + pub fn capacity(&self) -> usize { + match self { + Self::VoteStorage(vote_storage) => vote_storage.capacity(), + Self::LocalTransactionStorage(transaction_storage) => transaction_storage.capacity(), + } + } + + pub fn should_not_process(&self) -> bool { + // The gossip vote thread does not need to process or forward any votes, that is + // handled by the tpu vote thread + if let Self::VoteStorage(vote_storage) = self { + return matches!(vote_storage.vote_source, VoteSource::Gossip); + } + false + } + + #[cfg(test)] + pub fn iter(&mut self) -> impl Iterator { + match self { + Self::LocalTransactionStorage(transaction_storage) => transaction_storage.iter(), + _ => panic!(), + } + } + + pub fn forward_option(&self) -> ForwardOption { + match self { + Self::VoteStorage(vote_storage) => vote_storage.forward_option(), + Self::LocalTransactionStorage(transaction_storage) => { + transaction_storage.forward_option() + } + } + } + + pub fn clear_forwarded_packets(&mut self) { + match self { + Self::LocalTransactionStorage(transaction_storage) => transaction_storage.clear(), // Since we set everything as forwarded this is the same + Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(), + } + } + + pub fn deserialize_and_insert_batch( + &mut self, + packet_batch: &PacketBatch, + packet_indexes: &[usize], + ) -> InsertPacketBatchesSummary { + match self { + Self::VoteStorage(vote_storage) => { + let VoteBatchInsertionMetrics { + num_dropped_gossip, + num_dropped_tpu, + } = vote_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); + InsertPacketBatchesSummary { + num_dropped_packets: num_dropped_gossip + num_dropped_tpu, + num_dropped_gossip_vote_packets: num_dropped_gossip, + num_dropped_tpu_vote_packets: num_dropped_tpu, + ..InsertPacketBatchesSummary::default() + } + } + Self::LocalTransactionStorage(transaction_storage) => { + let (num_dropped_packets, num_dropped_tracer_packets) = + transaction_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); + InsertPacketBatchesSummary { + num_dropped_packets, + num_dropped_tracer_packets, + ..InsertPacketBatchesSummary::default() + } + } + } + } + + pub fn filter_forwardable_packets_and_add_batches( + &mut self, + forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, + ) -> FilterForwardingResults { + match self { + Self::LocalTransactionStorage(transaction_storage) => transaction_storage + .filter_forwardable_packets_and_add_batches(forward_packet_batches_by_accounts), + Self::VoteStorage(vote_storage) => vote_storage + .filter_forwardable_packets_and_add_batches(forward_packet_batches_by_accounts), + } + } + + /// The processing function takes a stream of packets ready to process, and returns the indices + /// of the unprocessed packets that are eligible for retry. A return value of None means that + /// all packets are unprocessed and eligible for retry. + pub fn process_packets( + &mut self, + bank: Option>, + batch_size: usize, + processing_function: F, + ) where + F: FnMut(&Vec>) -> Option>, + { + match (self, bank) { + (Self::LocalTransactionStorage(transaction_storage), _) => { + transaction_storage.process_packets(batch_size, processing_function) + } + (Self::VoteStorage(vote_storage), Some(bank)) => { + vote_storage.process_packets(bank, batch_size, processing_function) + } + _ => {} + } + } +} + +impl VoteStorage { + fn is_empty(&self) -> bool { + self.latest_unprocessed_votes.is_empty() + } + + fn len(&self) -> usize { + self.latest_unprocessed_votes.len() + } + + fn capacity(&self) -> usize { + MAX_STAKED_VALIDATORS + } + + fn forward_option(&self) -> ForwardOption { + match self.vote_source { + VoteSource::Tpu => ForwardOption::ForwardTpuVote, + VoteSource::Gossip => ForwardOption::NotForward, + } + } + + fn clear_forwarded_packets(&mut self) { + self.latest_unprocessed_votes.clear_forwarded_packets(); + } + + fn deserialize_and_insert_batch( + &mut self, + packet_batch: &PacketBatch, + packet_indexes: &[usize], + ) -> VoteBatchInsertionMetrics { + self.latest_unprocessed_votes + .insert_batch(latest_unprocessed_votes::deserialize_packets( + packet_batch, + packet_indexes, + self.vote_source, + )) + } + + fn filter_forwardable_packets_and_add_batches( + &mut self, + forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, + ) -> FilterForwardingResults { + if matches!(self.vote_source, VoteSource::Tpu) { + let total_forwardable_packets = self + .latest_unprocessed_votes + .get_and_insert_forwardable_packets(forward_packet_batches_by_accounts); + return FilterForwardingResults { + total_forwardable_packets, + ..FilterForwardingResults::default() + }; + } + FilterForwardingResults::default() + } + + fn process_packets(&mut self, bank: Arc, batch_size: usize, mut processing_function: F) + where + F: FnMut(&Vec>) -> Option>, + { + if matches!(self.vote_source, VoteSource::Gossip) { + panic!("Gossip vote thread should not be processing transactions"); + } + + // Insert the retryable votes back in + self.latest_unprocessed_votes.insert_batch( + // Based on the stake distribution present in the supplied bank, drain the unprocessed votes + // from each validator using a weighted random ordering. Votes from validators with + // 0 stake are ignored. + self.latest_unprocessed_votes + .drain_unprocessed(bank) + .into_iter() + .chunks(batch_size) + .into_iter() + .flat_map(|vote_packets| { + let vote_packets = vote_packets.into_iter().collect_vec(); + if let Some(retryable_vote_indices) = processing_function(&vote_packets) { + retryable_vote_indices + .iter() + .map(|i| vote_packets[*i].clone()) + .collect_vec() + } else { + vote_packets + } + }) + .filter_map(|packet| { + LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() + }), + ); + } +} + +impl ThreadLocalUnprocessedPackets { + fn is_empty(&self) -> bool { + self.unprocessed_packet_batches.is_empty() + } + + pub fn thread_type(&self) -> ThreadType { + self.thread_type + } + + fn len(&self) -> usize { + self.unprocessed_packet_batches.len() + } + + fn capacity(&self) -> usize { + self.unprocessed_packet_batches.capacity() + } + + #[cfg(test)] + fn iter(&mut self) -> impl Iterator { + self.unprocessed_packet_batches.iter() + } + + pub fn iter_mut(&mut self) -> impl Iterator { + self.unprocessed_packet_batches.iter_mut() + } + + fn forward_option(&self) -> ForwardOption { + match self.thread_type { + ThreadType::Transactions => ForwardOption::ForwardTransaction, + ThreadType::Voting(VoteSource::Tpu) => ForwardOption::ForwardTpuVote, + ThreadType::Voting(VoteSource::Gossip) => ForwardOption::NotForward, + } + } + + fn clear(&mut self) { + self.unprocessed_packet_batches.clear(); + } + + fn deserialize_and_insert_batch( + &mut self, + packet_batch: &PacketBatch, + packet_indexes: &[usize], + ) -> (usize, usize) { + self.unprocessed_packet_batches.insert_batch( + unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes), + ) + } + + fn filter_forwardable_packets_and_add_batches( + &mut self, + forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, + ) -> FilterForwardingResults { + BankingStage::filter_valid_packets_for_forwarding( + &mut self.unprocessed_packet_batches, + forward_packet_batches_by_accounts, + ) + } + + fn process_packets(&mut self, batch_size: usize, mut processing_function: F) + where + F: FnMut(&Vec>) -> Option>, + { + let mut retryable_packets = { + let capacity = self.unprocessed_packet_batches.capacity(); + std::mem::replace( + &mut self.unprocessed_packet_batches.packet_priority_queue, + MinMaxHeap::with_capacity(capacity), + ) + }; + let retryable_packets: MinMaxHeap> = retryable_packets + .drain_desc() + .chunks(batch_size) + .into_iter() + .flat_map(|packets_to_process| { + let packets_to_process = packets_to_process.into_iter().collect_vec(); + if let Some(retryable_transaction_indexes) = + processing_function(&packets_to_process) + { + // Remove the non-retryable packets, packets that were either: + // 1) Successfully processed + // 2) Failed but not retryable + filter_processed_packets( + retryable_transaction_indexes + .iter() + .chain(std::iter::once(&packets_to_process.len())), + |start, end| { + for processed_packet in &packets_to_process[start..end] { + self.unprocessed_packet_batches + .message_hash_to_transaction + .remove(processed_packet.message_hash()); + } + }, + ); + retryable_transaction_indexes + .iter() + .map(|i| packets_to_process[*i].clone()) + .collect_vec() + } else { + packets_to_process + } + }) + .collect::>(); + + self.unprocessed_packet_batches.packet_priority_queue = retryable_packets; + + // Assert unprocessed queue is still consistent + assert_eq!( + self.unprocessed_packet_batches.packet_priority_queue.len(), + self.unprocessed_packet_batches + .message_hash_to_transaction + .len() + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filter_processed_packets() { + let retryable_indexes = [0, 1, 2, 3]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert!(non_retryable_indexes.is_empty()); + + let retryable_indexes = [0, 1, 2, 3, 5]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(4, 5)]); + + let retryable_indexes = [1, 2, 3]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1)]); + + let retryable_indexes = [1, 2, 3, 5]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]); + + let retryable_indexes = [1, 2, 3, 5, 8]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + + let retryable_indexes = [1, 2, 3, 5, 8, 8]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + } +}