From e405747409b3604156ead58ffee5110e450bf036 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 7 Apr 2021 11:15:38 -0400 Subject: [PATCH] Revert "Add limit and shrink policy for recycler (#15320)" This reverts commit c2e8814dcee0f8741bcf680dc9bea88b59862dcb. --- Cargo.lock | 1 - bench-streamer/src/main.rs | 2 +- core/src/cluster_info.rs | 12 +- core/src/fetch_stage.rs | 7 +- core/src/gossip_service.rs | 2 +- core/src/serve_repair.rs | 24 +- core/src/serve_repair_service.rs | 2 +- core/src/shred_fetch_stage.rs | 4 +- core/src/sigverify.rs | 4 +- core/src/sigverify_shreds.rs | 5 +- core/src/tpu.rs | 3 - core/src/tvu.rs | 1 - ledger/benches/sigverify_shreds.rs | 2 +- ledger/src/entry.rs | 17 +- ledger/src/sigverify_shreds.rs | 24 +- perf/Cargo.toml | 1 - perf/benches/recycler.rs | 6 +- perf/benches/sigverify.rs | 6 +- perf/src/cuda_runtime.rs | 3 - perf/src/packet.rs | 30 ++- perf/src/recycler.rs | 414 +++-------------------------- perf/src/recycler_cache.rs | 15 +- perf/src/sigverify.rs | 26 +- streamer/src/streamer.rs | 9 +- 24 files changed, 129 insertions(+), 491 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d9c2069a0..4719e024fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4841,7 +4841,6 @@ dependencies = [ "serde", "solana-budget-program", "solana-logger 1.7.0", - "solana-measure", "solana-metrics", "solana-rayon-threadlimit", "solana-sdk", diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 80fe815211..18fa4f0d0e 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -75,7 +75,7 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); - let recycler = PacketsRecycler::new_without_limit("bench-streamer-recycler-shrink-stats"); + let recycler = PacketsRecycler::default(); for _ in 0..num_sockets { let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index be7ed79bf1..dde6f07168 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1923,7 +1923,7 @@ impl ClusterInfo { let mut last_contact_info_trace = timestamp(); let mut last_contact_info_save = timestamp(); let mut entrypoints_processed = false; - let recycler = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats"); + let recycler = PacketsRecycler::default(); let crds_data = vec![ CrdsData::Version(Version::new(self.id())), CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())), @@ -2187,7 +2187,7 @@ impl ClusterInfo { .process_pull_requests(callers.cloned(), timestamp()); let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; - let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap(); + let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); let check_pull_request = @@ -2472,7 +2472,8 @@ impl ClusterInfo { if packets.is_empty() { None } else { - let packets = Packets::new_with_recycler_data(recycler, packets).unwrap(); + let packets = + Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets); Some(packets) } } @@ -3164,8 +3165,7 @@ impl ClusterInfo { exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); - let recycler = - PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats"); + let recycler = PacketsRecycler::default(); Builder::new() .name("solana-listen".to_string()) .spawn(move || { @@ -3611,7 +3611,7 @@ mod tests { .iter() .map(|ping| Pong::new(ping, &this_node).unwrap()) .collect(); - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); let packets = cluster_info .handle_ping_messages( remote_nodes diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 46999ad37f..418e748d6c 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -35,7 +35,6 @@ impl FetchStage { exit, &sender, &poh_recorder, - None, coalesce_ms, ), receiver, @@ -47,7 +46,6 @@ impl FetchStage { exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, - allocated_packet_limit: Option, coalesce_ms: u64, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); @@ -58,7 +56,6 @@ impl FetchStage { exit, &sender, &poh_recorder, - allocated_packet_limit, coalesce_ms, ) } @@ -104,11 +101,9 @@ impl FetchStage { exit: &Arc, sender: &PacketSender, poh_recorder: &Arc>, - limit: Option, coalesce_ms: u64, ) -> Self { - let recycler: PacketsRecycler = - Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink"); + let recycler: PacketsRecycler = Recycler::warmed(1000, 1024); let tpu_threads = sockets.into_iter().map(|socket| { streamer::receiver( diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 3bf4786629..d73b24dca9 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -47,7 +47,7 @@ impl GossipService { gossip_socket.clone(), &exit, request_sender, - Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"), + Recycler::default(), "gossip_receiver", 1, ); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index ecde46cc97..58682ef56f 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -278,7 +278,7 @@ impl ServeRepair { exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); - let recycler = PacketsRecycler::new_without_limit("serve-repair-recycler-shrink-stats"); + let recycler = PacketsRecycler::default(); Builder::new() .name("solana-repair-listen".to_string()) .spawn(move || { @@ -490,7 +490,11 @@ impl ServeRepair { if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); - return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap(); + return Some(Packets::new_with_recycler_data( + recycler, + "run_window_request", + vec![packet], + )); } } @@ -526,7 +530,11 @@ impl ServeRepair { from_addr, nonce, )?; - return Packets::new_with_recycler_data(recycler, vec![packet]); + return Some(Packets::new_with_recycler_data( + recycler, + "run_highest_window_request", + vec![packet], + )); } None } @@ -539,7 +547,7 @@ impl ServeRepair { max_responses: usize, nonce: Nonce, ) -> Option { - let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap(); + let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blockstore.meta(slot) { @@ -593,7 +601,7 @@ mod tests { /// test run_window_request responds with the right shred, and do not overrun fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) { - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -661,7 +669,7 @@ mod tests { /// test window requests respond with the right shred, and do not overrun fn run_window_request(slot: Slot, nonce: Nonce) { - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -829,7 +837,7 @@ mod tests { fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) { solana_logger::setup(); - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -900,7 +908,7 @@ mod tests { #[test] fn run_orphan_corrupted_shred_size() { solana_logger::setup(); - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 2f8bb7819b..f2ef152e9c 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -30,7 +30,7 @@ impl ServeRepairService { serve_repair_socket.clone(), &exit, request_sender, - Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"), + Recycler::default(), "serve_repair_receiver", 1, ); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 2a5e1b7fd3..4f79f8ba0e 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -168,10 +168,8 @@ impl ShredFetchStage { sender: &PacketSender, bank_forks: Option>>, exit: &Arc, - limit: Option, ) -> Self { - let recycler: PacketsRecycler = - Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink"); + let recycler: PacketsRecycler = Recycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 2103c1b3fa..fbf6ea11ae 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier { fn default() -> Self { init(); Self { - recycler: Recycler::warmed(50, 4096, None, ""), - recycler_out: Recycler::warmed(50, 4096, None, ""), + recycler: Recycler::warmed(50, 4096), + recycler_out: Recycler::warmed(50, 4096), } } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 99f74036d3..98ff0bd2b6 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -25,10 +25,7 @@ impl ShredSigVerifier { Self { bank_forks, leader_schedule_cache, - recycler_cache: RecyclerCache::warmed( - "shred-sig-verifier-offsets-recycler-shrink-stats", - "shred-sig-verifier-buffer-recycler-shrink-stats", - ), + recycler_cache: RecyclerCache::warmed(), } } fn read_slots(batches: &[Packets]) -> HashSet { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 600fb891e6..f01fcf97ba 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -75,9 +75,6 @@ impl Tpu { &exit, &packet_sender, &poh_recorder, - // At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly - // 20GB - Some(20_000), tpu_coalesce_ms, ); let (verified_sender, verified_receiver) = unbounded(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index eea4b27524..d94e6edf1e 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -151,7 +151,6 @@ impl Tvu { &fetch_sender, Some(bank_forks.clone()), &exit, - None, ); let (verified_sender, verified_receiver) = unbounded(); diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 0fd83dd037..116e1fd77a 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256; const NUM_BATCHES: usize = 1; #[bench] fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { - let recycler_cache = RecyclerCache::new("", ""); + let recycler_cache = RecyclerCache::default(); let mut packets = Packets::default(); packets.packets.set_pinnable(); diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 4df5257386..9697d15778 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -258,21 +258,12 @@ pub struct EntryVerificationState { device_verification_data: DeviceVerificationData, } -#[derive(Clone)] +#[derive(Default, Clone)] pub struct VerifyRecyclers { hash_recycler: Recycler>, tick_count_recycler: Recycler>, } -impl Default for VerifyRecyclers { - fn default() -> Self { - Self { - hash_recycler: Recycler::new_without_limit("hash_recycler_shrink_stats"), - tick_count_recycler: Recycler::new_without_limit("tick_count_recycler_shrink_stats"), - } - } -} - #[derive(PartialEq, Clone, Copy, Debug)] pub enum EntryVerificationStatus { Failure, @@ -590,12 +581,14 @@ impl EntrySlice for [Entry] { .take(self.len()) .collect(); - let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap(); + let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash"); hashes_pinned.set_pinnable(); hashes_pinned.resize(hashes.len(), Hash::default()); hashes_pinned.copy_from_slice(&hashes); - let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap(); + let mut num_hashes_vec = recyclers + .tick_count_recycler + .allocate("poh_verify_num_hashes"); num_hashes_vec.reserve_and_pin(cmp::max(1, self.len())); for entry in self { num_hashes_vec.push(entry.num_hashes.saturating_sub(1)); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 6601f07a36..effe496a2b 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -137,7 +137,7 @@ fn slot_key_data_for_gpu< .push(*slot); } } - let mut keyvec = recycler_cache.buffer().allocate().unwrap(); + let mut keyvec = recycler_cache.buffer().allocate("shred_gpu_pubkeys"); keyvec.set_pinnable(); let mut slot_to_key_ix = HashMap::new(); @@ -152,7 +152,7 @@ fn slot_key_data_for_gpu< slot_to_key_ix.insert(s, i); } } - let mut offsets = recycler_cache.offsets().allocate().unwrap(); + let mut offsets = recycler_cache.offsets().allocate("shred_offsets"); offsets.set_pinnable(); slots.iter().for_each(|packet_slots| { packet_slots.iter().for_each(|slot| { @@ -185,11 +185,11 @@ fn shred_gpu_offsets( batches: &[Packets], recycler_cache: &RecyclerCache, ) -> (TxOffset, TxOffset, TxOffset, Vec>) { - let mut signature_offsets = recycler_cache.offsets().allocate().unwrap(); + let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures"); signature_offsets.set_pinnable(); - let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap(); + let mut msg_start_offsets = recycler_cache.offsets().allocate("shred_msg_starts"); msg_start_offsets.set_pinnable(); - let mut msg_sizes = recycler_cache.offsets().allocate().unwrap(); + let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes"); msg_sizes.set_pinnable(); let mut v_sig_lens = vec![]; for batch in batches.iter() { @@ -242,7 +242,7 @@ pub fn verify_shreds_gpu( trace!("pubkeys_len: {}", pubkeys_len); let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) = shred_gpu_offsets(pubkeys_len, batches, recycler_cache); - let mut out = recycler_cache.buffer().allocate().unwrap(); + let mut out = recycler_cache.buffer().allocate("out_buffer"); out.set_pinnable(); elems.push( perf_libs::Elems { @@ -332,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) { } pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec { - let mut vec = cache.buffer().allocate().unwrap(); + let mut vec = cache.buffer().allocate("pinned_keypair"); let pubkey = keypair.pubkey().to_bytes(); let secret = keypair.secret().to_bytes(); let mut hasher = Sha512::default(); @@ -370,17 +370,17 @@ pub fn sign_shreds_gpu( let mut num_packets = num_keypair_packets; //should be zero - let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap(); + let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets"); pubkey_offsets.resize(count, 0); - let mut secret_offsets = recycler_cache.offsets().allocate().unwrap(); + let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); secret_offsets.resize(count, pubkey_size as u32); trace!("offset: {}", offset); let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = shred_gpu_offsets(offset, batches, recycler_cache); let total_sigs = signature_offsets.len(); - let mut signatures_out = recycler_cache.buffer().allocate().unwrap(); + let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures"); signatures_out.set_pinnable(); signatures_out.resize(total_sigs * sig_size, 0); elems.push( @@ -560,7 +560,7 @@ pub mod tests { fn run_test_sigverify_shreds_gpu(slot: Slot) { solana_logger::setup(); - let recycler_cache = RecyclerCache::new("", ""); + let recycler_cache = RecyclerCache::default(); let mut batch = [Packets::default()]; let mut shred = Shred::new_from_data( @@ -624,7 +624,7 @@ pub mod tests { fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { solana_logger::setup(); - let recycler_cache = RecyclerCache::new("", ""); + let recycler_cache = RecyclerCache::default(); let mut packets = Packets::default(); let num_packets = 32; diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 5e55de4049..8ab4a157fd 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -22,7 +22,6 @@ solana-sdk = { path = "../sdk", version = "=1.7.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.7.0" } solana-budget-program = { path = "../programs/budget", version = "=1.7.0" } solana-logger = { path = "../logger", version = "=1.7.0" } -solana-measure = { path = "../measure", version = "=1.7.0" } solana-metrics = { path = "../metrics", version = "=1.7.0" } curve25519-dalek = { version = "2" } diff --git a/perf/benches/recycler.rs b/perf/benches/recycler.rs index a2a9788a07..ed7b36534f 100644 --- a/perf/benches/recycler.rs +++ b/perf/benches/recycler.rs @@ -10,13 +10,13 @@ use test::Bencher; fn bench_recycler(bencher: &mut Bencher) { solana_logger::setup(); - let recycler: PacketsRecycler = Recycler::new_without_limit("me"); + let recycler: PacketsRecycler = Recycler::default(); for _ in 0..1000 { - recycler.recycle_for_test(recycler.allocate().expect("There is no limit")); + let _packet = recycler.allocate(""); } bencher.iter(move || { - recycler.recycle_for_test(recycler.allocate().expect("There is no limit")); + let _packet = recycler.allocate(""); }); } diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 75aea3eaa4..588392fa55 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) { // generate packet vector let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::>(), 128); - let recycler = Recycler::new_without_limit(""); - let recycler_out = Recycler::new_without_limit(""); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); @@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) { // generate packet vector let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::>(), 1024); - let recycler = Recycler::new_without_limit(""); + let recycler = Recycler::default(); // verify packets bencher.iter(|| { let _ans = sigverify::generate_offsets(&batches, &recycler); diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index fee2405b06..158fe8d369 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -70,9 +70,6 @@ impl Reset for PinnedVec { fn set_recycler(&mut self, recycler: Weak>) { self.recycler = recycler; } - fn unset_recycler(&mut self) { - self.recycler = Weak::default(); - } } impl From> for Vec { diff --git a/perf/src/packet.rs b/perf/src/packet.rs index a6cf3d1d10..946124e7fe 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -28,21 +28,19 @@ impl Packets { Packets { packets } } - pub fn new_with_recycler(recycler: PacketsRecycler, size: usize) -> Option { - let maybe_packets = recycler.allocate(); - maybe_packets.map(|mut packets| { - packets.reserve_and_pin(size); - Packets { packets } - }) + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { + let mut packets = recycler.allocate(name); + packets.reserve_and_pin(size); + Packets { packets } } pub fn new_with_recycler_data( recycler: &PacketsRecycler, + name: &'static str, mut packets: Vec, - ) -> Option { - Self::new_with_recycler(recycler.clone(), packets.len()).map(|mut vec| { - vec.packets.append(&mut packets); - vec - }) + ) -> Self { + let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name); + vec.packets.append(&mut packets); + vec } pub fn set_addr(&mut self, addr: &SocketAddr) { @@ -78,7 +76,11 @@ pub fn to_packets_with_destination( recycler: PacketsRecycler, dests_and_data: &[(SocketAddr, T)], ) -> Packets { - let mut out = Packets::new_with_recycler(recycler, dests_and_data.len()).unwrap(); + let mut out = Packets::new_with_recycler( + recycler, + dests_and_data.len(), + "to_packets_with_destination", + ); out.packets.resize(dests_and_data.len(), Packet::default()); for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 { @@ -136,9 +138,9 @@ mod tests { #[test] fn test_to_packets_pinning() { - let recycler = PacketsRecycler::new_without_limit(""); + let recycler = PacketsRecycler::default(); for i in 0..2 { - let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1); + let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one"); } } } diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index e1dfaa937e..1d94d8e4e2 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -1,24 +1,7 @@ use rand::{thread_rng, Rng}; -use solana_measure::measure::Measure; -use std::{ - convert::TryFrom, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, Weak, - }, - time::Instant, -}; - -pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000; -pub const DEFAULT_SHRINK_PCT: u32 = 80; -pub const DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT: u32 = 10; -pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000; - -enum AllocationDecision { - Reuse(T), - Allocate(u32, usize), - AllocationLimitReached, -} +use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Weak}; #[derive(Debug, Default)] struct RecyclerStats { @@ -28,226 +11,36 @@ struct RecyclerStats { max_gc: AtomicUsize, } -#[derive(Debug, Default)] -struct RecyclerShrinkStats { - resulting_size: u32, - target_size: u32, - ideal_num_to_remove: u32, - shrink_elapsed: u64, - drop_elapsed: u64, -} - -impl RecyclerShrinkStats { - fn report(&self, shrink_metric_name: &'static str) { - datapoint_info!( - shrink_metric_name, - ("target_size", self.target_size as i64, i64), - ("resulting_size", self.resulting_size as i64, i64), - ("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64), - ("recycler_shrink_elapsed", self.shrink_elapsed as i64, i64), - ("drop_elapsed", self.drop_elapsed as i64, i64) - ); - } -} - -#[derive(Clone)] -pub struct Recycler { +#[derive(Clone, Default)] +pub struct Recycler { recycler: Arc>, - shrink_metric_name: &'static str, -} - -impl Recycler { - pub fn new_without_limit(shrink_metric_name: &'static str) -> Self { - Self { - recycler: Arc::new(RecyclerX::default()), - shrink_metric_name, - } - } - - pub fn new_with_limit(shrink_metric_name: &'static str, limit: u32) -> Self { - Self { - recycler: Arc::new(RecyclerX::new(Some(limit))), - shrink_metric_name, - } - } } #[derive(Debug)] -pub struct ObjectPool { - objects: Vec, - shrink_pct: u32, - minimum_object_count: u32, - above_shrink_pct_count: u32, - max_above_shrink_pct_count: u32, - check_shrink_interval_ms: u32, - last_shrink_check_time: Instant, - pub total_allocated_count: u32, - limit: Option, -} -impl Default for ObjectPool { - fn default() -> Self { - ObjectPool { - objects: vec![], - shrink_pct: DEFAULT_SHRINK_PCT, - minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT, - above_shrink_pct_count: 0, - max_above_shrink_pct_count: DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT, - check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS, - last_shrink_check_time: Instant::now(), - total_allocated_count: 0, - limit: None, - } - } -} - -impl ObjectPool { - fn new(limit: Option) -> Self { - Self { - limit, - ..Self::default() - } - } - - fn len(&self) -> usize { - self.objects.len() - } - - fn get_shrink_target(shrink_pct: u32, current_size: u32) -> u32 { - let shrink_pct = u64::from(shrink_pct); - let current_size = u64::from(current_size); - let shrink_target = shrink_pct - .saturating_mul(current_size) - .saturating_add(99) - .checked_div(100) - .unwrap_or(0); - u32::try_from(shrink_target).unwrap_or(u32::MAX) - } - - fn shrink_if_necessary( - &mut self, - recycler_name: &'static str, - ) -> Option<(RecyclerShrinkStats, Vec)> { - let is_consistent = self.total_allocated_count as usize >= self.len(); - assert!( - is_consistent, - "Object pool inconsistent: {} {} {}", - self.total_allocated_count, - self.len(), - recycler_name - ); - if self.last_shrink_check_time.elapsed().as_millis() > self.check_shrink_interval_ms as u128 - { - self.last_shrink_check_time = Instant::now(); - let shrink_threshold_count = - Self::get_shrink_target(self.shrink_pct, self.total_allocated_count); - - // If more than the shrink threshold of all allocated objects are sitting doing nothing, - // increment the `above_shrink_pct_count`. - if self.len() > self.minimum_object_count as usize - && self.len() > shrink_threshold_count as usize - { - self.above_shrink_pct_count = self.above_shrink_pct_count.saturating_add(1); - } else { - self.above_shrink_pct_count = 0; - } - - if self.above_shrink_pct_count as usize >= self.max_above_shrink_pct_count as usize { - let mut recycler_shrink_elapsed = Measure::start("recycler_shrink"); - // Do the shrink - let target_size = std::cmp::max(self.minimum_object_count, shrink_threshold_count); - let ideal_num_to_remove = self.total_allocated_count.saturating_sub(target_size); - let mut shrink_removed_objects = Vec::with_capacity(ideal_num_to_remove as usize); - for _ in 0..ideal_num_to_remove { - if let Some(mut expired_object) = self.objects.pop() { - expired_object.unset_recycler(); - // Drop these outside of the lock because the Drop() implmentation for - // certain objects like PinnedVec's can be expensive - shrink_removed_objects.push(expired_object); - // May not be able to shrink exactly `ideal_num_to_remove` objects since - // in the case of new allocations, `total_allocated_count` is incremented - // before the object is allocated (see `should_allocate_new` logic below). - // This race allows a difference of up to the number of threads allocating - // with this recycler. - self.total_allocated_count = self.total_allocated_count.saturating_sub(1); - } else { - break; - } - } - recycler_shrink_elapsed.stop(); - self.above_shrink_pct_count = 0; - Some(( - RecyclerShrinkStats { - resulting_size: self.total_allocated_count, - target_size, - ideal_num_to_remove, - shrink_elapsed: recycler_shrink_elapsed.as_us(), - // Filled in later - drop_elapsed: 0, - }, - shrink_removed_objects, - )) - } else { - None - } - } else { - None - } - } - - fn make_allocation_decision(&mut self) -> AllocationDecision { - if let Some(reused_object) = self.objects.pop() { - AllocationDecision::Reuse(reused_object) - } else if let Some(limit) = self.limit { - if self.total_allocated_count < limit { - self.total_allocated_count = self.total_allocated_count.saturating_add(1); - AllocationDecision::Allocate(self.total_allocated_count, self.len()) - } else { - AllocationDecision::AllocationLimitReached - } - } else { - self.total_allocated_count = self.total_allocated_count.saturating_add(1); - AllocationDecision::Allocate(self.total_allocated_count, self.len()) - } - } -} - -#[derive(Debug)] -pub struct RecyclerX { - gc: Mutex>, +pub struct RecyclerX { + gc: Mutex>, stats: RecyclerStats, id: usize, } -impl Default for RecyclerX { +impl Default for RecyclerX { fn default() -> RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); RecyclerX { - gc: Mutex::new(ObjectPool::default()), + gc: Mutex::new(vec![]), stats: RecyclerStats::default(), id, } } } -impl RecyclerX { - fn new(limit: Option) -> Self { - RecyclerX { - gc: Mutex::new(ObjectPool::new(limit)), - ..Self::default() - } - } -} - pub trait Reset { fn reset(&mut self); fn warm(&mut self, size_hint: usize); fn set_recycler(&mut self, recycler: Weak>) where Self: std::marker::Sized; - fn unset_recycler(&mut self) - where - Self: std::marker::Sized; } lazy_static! { @@ -263,21 +56,12 @@ fn warm_recyclers() -> bool { } impl Recycler { - pub fn warmed( - num: u32, - size_hint: usize, - limit: Option, - shrink_metric_name: &'static str, - ) -> Self { - assert!(num <= limit.unwrap_or(std::u32::MAX)); - let new = Self { - recycler: Arc::new(RecyclerX::new(limit)), - shrink_metric_name, - }; + pub fn warmed(num: usize, size_hint: usize) -> Self { + let new = Self::default(); if warm_recyclers() { let warmed_items: Vec<_> = (0..num) .map(|_| { - let mut item = new.allocate().unwrap(); + let mut item = new.allocate("warming"); item.warm(size_hint); item }) @@ -289,55 +73,33 @@ impl Recycler { new } - pub fn allocate(&self) -> Option { - let (allocation_decision, shrink_output) = { - let mut object_pool = self - .recycler - .gc - .lock() - .expect("recycler lock in pb fn allocate"); + pub fn allocate(&self, name: &'static str) -> T { + let new = self + .recycler + .gc + .lock() + .expect("recycler lock in pb fn allocate") + .pop(); - let shrink_output = object_pool.shrink_if_necessary(self.shrink_metric_name); - - // Grab the allocation decision and shrinking stats, do the expensive - // allocations/deallocations outside of the lock. - (object_pool.make_allocation_decision(), shrink_output) - }; - - if let Some((mut shrink_stats, shrink_removed_objects)) = shrink_output { - let mut shrink_removed_object_elapsed = Measure::start("shrink_removed_object_elapsed"); - drop(shrink_removed_objects); - shrink_removed_object_elapsed.stop(); - shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us(); - shrink_stats.report(self.shrink_metric_name); + if let Some(mut x) = new { + self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); + x.reset(); + return x; } - match allocation_decision { - AllocationDecision::Reuse(mut reused_object) => { - self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); - reused_object.reset(); - Some(reused_object) - } - AllocationDecision::Allocate(total_allocated_count, recycled_len) => { - let mut t = T::default(); - t.set_recycler(Arc::downgrade(&self.recycler)); - if total_allocated_count % 1000 == 0 { - datapoint_info!( - "recycler_total_allocated_count", - ("name", self.shrink_metric_name, String), - ("count", total_allocated_count as i64, i64), - ("recycled_len", recycled_len as i64, i64), - ) - } - Some(t) - } + let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed); + trace!( + "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", + total, + name, + self.recycler.id, + self.recycler.stats.reuse.load(Ordering::Relaxed), + self.recycler.stats.max_gc.load(Ordering::Relaxed), + ); - AllocationDecision::AllocationLimitReached => None, - } - } - - pub fn recycle_for_test(&self, x: T) { - self.recycler.recycle(x); + let mut t = T::default(); + t.set_recycler(Arc::downgrade(&self.recycler)); + t } } @@ -345,7 +107,7 @@ impl RecyclerX { pub fn recycle(&self, x: T) { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); - gc.objects.push(x); + gc.push(x); gc.len() }; @@ -375,8 +137,6 @@ impl RecyclerX { #[cfg(test)] mod tests { use super::*; - use crate::packet::PacketsRecycler; - use std::{thread::sleep, time::Duration}; impl Reset for u64 { fn reset(&mut self) { @@ -384,115 +144,19 @@ mod tests { } fn warm(&mut self, _size_hint: usize) {} fn set_recycler(&mut self, _recycler: Weak>) {} - fn unset_recycler(&mut self) {} } #[test] fn test_recycler() { - let recycler = Recycler::new_without_limit(""); - let mut y: u64 = recycler.allocate().unwrap(); + let recycler = Recycler::default(); + let mut y: u64 = recycler.allocate("test_recycler1"); assert_eq!(y, 0); y = 20; let recycler2 = recycler.clone(); recycler2.recycler.recycle(y); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); - let z = recycler.allocate().unwrap(); + let z = recycler.allocate("test_recycler2"); assert_eq!(z, 10); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } - - #[test] - fn test_recycler_limit() { - let limit = 10; - assert!(limit <= DEFAULT_MINIMUM_OBJECT_COUNT); - // Use PacketRecycler so that dropping the allocated object - // actually recycles - let recycler = PacketsRecycler::new_with_limit("", limit); - let mut allocated_items = vec![]; - for i in 0..limit * 2 { - let x = recycler.allocate(); - if i < limit { - allocated_items.push(x.unwrap()); - } else { - assert!(x.is_none()); - } - } - assert_eq!( - recycler.recycler.gc.lock().unwrap().total_allocated_count, - limit - ); - assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0_usize); - drop(allocated_items); - assert_eq!( - recycler.recycler.gc.lock().unwrap().total_allocated_count, - limit - ); - assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); - } - - #[test] - fn test_recycler_shrink() { - let limit = DEFAULT_MINIMUM_OBJECT_COUNT * 2; - let max_above_shrink_pct_count = 2; - let shrink_pct = 80; - let recycler = PacketsRecycler::new_with_limit("", limit); - { - let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); - // Make the shrink interval a long time so shrinking doesn't happen yet - locked_recycler.check_shrink_interval_ms = std::u32::MAX; - // Set the count to one so that we shrink on every other allocation later. - locked_recycler.max_above_shrink_pct_count = max_above_shrink_pct_count; - locked_recycler.shrink_pct = shrink_pct; - } - - let mut allocated_items = vec![]; - for _ in 0..limit { - allocated_items.push(recycler.allocate().unwrap()); - } - assert_eq!( - recycler.recycler.gc.lock().unwrap().total_allocated_count, - limit - ); - assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); - drop(allocated_items); - assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); - - let shrink_interval = 10; - { - let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); - locked_recycler.check_shrink_interval_ms = shrink_interval; - } - - let mut current_total_allocated_count = - recycler.recycler.gc.lock().unwrap().total_allocated_count; - - // Shrink the recycler until it hits the minimum - let mut i = 0; - while current_total_allocated_count != DEFAULT_MINIMUM_OBJECT_COUNT { - sleep(Duration::from_millis(shrink_interval as u64 * 2)); - recycler.allocate().unwrap(); - let expected_above_shrink_pct_count = (i + 1) % max_above_shrink_pct_count; - assert_eq!( - recycler.recycler.gc.lock().unwrap().above_shrink_pct_count, - (i + 1) % max_above_shrink_pct_count - ); - if expected_above_shrink_pct_count == 0 { - // Shrink happened, update the expected `current_total_allocated_count`; - current_total_allocated_count = std::cmp::max( - ObjectPool::::get_shrink_target(shrink_pct, current_total_allocated_count), - DEFAULT_MINIMUM_OBJECT_COUNT, - ); - assert_eq!( - recycler.recycler.gc.lock().unwrap().total_allocated_count, - current_total_allocated_count - ); - assert_eq!( - recycler.recycler.gc.lock().unwrap().len(), - current_total_allocated_count as usize - ); - } - - i += 1; - } - } } diff --git a/perf/src/recycler_cache.rs b/perf/src/recycler_cache.rs index b46875bd0f..5dcf777230 100644 --- a/perf/src/recycler_cache.rs +++ b/perf/src/recycler_cache.rs @@ -2,24 +2,17 @@ use crate::cuda_runtime::PinnedVec; use crate::recycler::Recycler; use crate::sigverify::TxOffset; -#[derive(Clone)] +#[derive(Default, Clone)] pub struct RecyclerCache { recycler_offsets: Recycler, recycler_buffer: Recycler>, } impl RecyclerCache { - pub fn new(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self { + pub fn warmed() -> Self { Self { - recycler_offsets: Recycler::new_without_limit(offsets_shrink_name), - recycler_buffer: Recycler::new_without_limit(buffer_shrink_name), - } - } - - pub fn warmed(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self { - Self { - recycler_offsets: Recycler::warmed(50, 4096, None, offsets_shrink_name), - recycler_buffer: Recycler::warmed(50, 4096, None, buffer_shrink_name), + recycler_offsets: Recycler::warmed(50, 4096), + recycler_buffer: Recycler::warmed(50, 4096), } } pub fn offsets(&self) -> &Recycler { diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 356d217cb8..a012f1bead 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -247,13 +247,13 @@ fn get_packet_offsets(packet: &Packet, current_offset: usize) -> PacketOffsets { pub fn generate_offsets(batches: &[Packets], recycler: &Recycler) -> TxOffsets { debug!("allocating.."); - let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap(); + let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); signature_offsets.set_pinnable(); - let mut pubkey_offsets: PinnedVec<_> = recycler.allocate().unwrap(); + let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets"); pubkey_offsets.set_pinnable(); - let mut msg_start_offsets: PinnedVec<_> = recycler.allocate().unwrap(); + let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets"); msg_start_offsets.set_pinnable(); - let mut msg_sizes: PinnedVec<_> = recycler.allocate().unwrap(); + let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); msg_sizes.set_pinnable(); let mut current_offset: usize = 0; let mut v_sig_lens = Vec::new(); @@ -405,7 +405,7 @@ pub fn ed25519_verify( debug!("CUDA ECDSA for {}", batch_size(batches)); debug!("allocating out.."); - let mut out = recycler_out.allocate().unwrap(); + let mut out = recycler_out.allocate("out_buffer"); out.set_pinnable(); let mut elems = Vec::new(); let mut rvs = Vec::new(); @@ -748,8 +748,8 @@ mod tests { let mut batches = generate_packet_vec(&packet, n, 2); - let recycler = Recycler::new_without_limit(""); - let recycler_out = Recycler::new_without_limit(""); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); @@ -770,8 +770,8 @@ mod tests { let mut batches = generate_packet_vec(&packet, 1, 1); - let recycler = Recycler::new_without_limit(""); - let recycler_out = Recycler::new_without_limit(""); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); assert!(batches @@ -810,8 +810,8 @@ mod tests { batches[0].packets.push(packet); - let recycler = Recycler::new_without_limit(""); - let recycler_out = Recycler::new_without_limit(""); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out); @@ -840,8 +840,8 @@ mod tests { let tx = test_multisig_tx(); let packet = sigverify::make_packet_from_transaction(tx); - let recycler = Recycler::new_without_limit(""); - let recycler_out = Recycler::new_without_limit(""); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); for _ in 0..50 { let n = thread_rng().gen_range(1, 30); let num_batches = thread_rng().gen_range(2, 30); diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 339d45a891..349142a565 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -42,10 +42,7 @@ fn recv_loop( let mut now = Instant::now(); let mut num_max_received = 0; // Number of times maximum packets were received loop { - let (mut msgs, should_send) = - Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH) - .map(|allocated| (allocated, true)) - .unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false)); + let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); loop { // Check for exit signal, even if socket is busy // (for instance the leader transaction socket) @@ -58,7 +55,7 @@ fn recv_loop( } recv_count += len; call_count += 1; - if len > 0 && should_send { + if len > 0 { channel.send(msgs)?; } break; @@ -211,7 +208,7 @@ mod test { Arc::new(read), &exit, s_reader, - Recycler::new_without_limit(""), + Recycler::default(), "test", 1, );