diff --git a/Cargo.lock b/Cargo.lock index 1e6eea8bf..e27d84d44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1436,6 +1436,35 @@ name = "itoa" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "jemalloc-ctl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "jemalloc-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", + "paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "jemalloc-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.47 (registry+https://github.com/rust-lang/crates.io-index)", + "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "jemallocator" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "jemalloc-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "jobserver" version = "0.1.16" @@ -2069,6 +2098,26 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "paste" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "paste-impl 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "paste-impl" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "pbkdf2" version = "0.3.0" @@ -3294,6 +3343,8 @@ dependencies = [ "hex-literal 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "jemalloc-ctl 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-derive 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5536,6 +5587,9 @@ dependencies = [ "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d" "checksum itertools 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "87fa75c9dea7b07be3138c49abbb83fd4bea199b5cdc76f9804458edc5da0d6e" "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f" +"checksum jemalloc-ctl 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c502a5ff9dd2924f1ed32ba96e3b65735d837b4bfd978d3161b1702e66aca4b7" +"checksum jemalloc-sys 
0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45" +"checksum jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69" "checksum jobserver 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "f74e73053eaf95399bf926e48fc7a2a3ce50bd0eaaa2357d391e95b2dcdd4f10" "checksum js-sys 0.3.27 (registry+https://github.com/rust-lang/crates.io-index)" = "1efc4f2a556c58e79c5500912e221dd826bec64ff4aabd8ce71ccef6da02d7d4" "checksum jsonrpc-core 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "34651edf3417637cc45e70ed0182ecfa9ced0b7e8131805fccf7400d989845ca" @@ -5605,6 +5659,8 @@ dependencies = [ "checksum parking_lot_core 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" "checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9" "checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b" +"checksum paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "423a519e1c6e828f1e73b720f9d9ed2fa643dce8a7737fb43235ce0b41eeaa49" +"checksum paste-impl 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "4214c9e912ef61bf42b81ba9a47e8aad1b2ffaf739ab162bf96d1e011f54e6c5" "checksum pbkdf2 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "006c038a43a45995a9670da19e67600114740e8511d4333bf97a56e66a7542d9" "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" diff --git a/core/Cargo.toml b/core/Cargo.toml index 27d1f501d..dc9e7484e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -72,6 +72,8 @@ tokio-io = "0.1" untrusted = "0.7.0" solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] } +jemallocator = "0.3.2" +jemalloc-ctl = "0.3.2" [dev-dependencies] hex-literal = "0.2.1" diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7af7887be..ac50b4414 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -7,6 +7,7 @@ use crate::{ poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_service::PohService, result::{Error, Result}, + thread_mem_usage, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; @@ -110,6 +111,7 @@ impl BankingStage { Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { + thread_mem_usage::datapoint("solana-banking-stage-tx"); Self::process_loop( my_pubkey, &verified_receiver, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b97a2648a..57a314d0a 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -24,11 +24,13 @@ use crate::{ repair_service::RepairType, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, + thread_mem_usage, weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, 
serialized_size}; use core::cmp; use itertools::Itertools; +use jemalloc_ctl::thread::allocatedp; use rand::{thread_rng, Rng}; use solana_ledger::{bank_forks::BankForks, blocktree::Blocktree, staking_utils}; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; @@ -36,9 +38,9 @@ use solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, }; -use solana_perf::packet::{to_packets_with_destination, Packets}; +use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler}; use solana_sdk::{ - clock::Slot, + clock::{Slot, DEFAULT_MS_PER_SLOT}, pubkey::Pubkey, signature::{Keypair, KeypairUtil, Signable, Signature}, timing::{duration_as_ms, timestamp}, @@ -362,6 +364,7 @@ impl ClusterInfo { .collect(); let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since); let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).collect(); + inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); (txs, max_ts) } @@ -403,10 +406,6 @@ impl ClusterInfo { .map(|x| x.value.contact_info().unwrap()) } - pub fn purge(&mut self, now: u64) { - self.gossip.purge(now); - } - pub fn rpc_peers(&self) -> Vec<ContactInfo> { let me = self.my_data().id; self.gossip @@ -1010,12 +1009,15 @@ impl ClusterInfo { /// At random pick a node and try to get updated changes from them fn run_gossip( obj: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, stakes: &HashMap<Pubkey, u64>, sender: &PacketSender, ) -> Result<()> { let reqs = obj.write().unwrap().gossip_request(&stakes); - let packets = to_packets_with_destination(&reqs); - sender.send(packets)?; + if !reqs.is_empty() { + let packets = to_packets_with_destination(recycler.clone(), &reqs); + sender.send(packets)?; + } Ok(()) } @@ -1032,8 +1034,10 @@ impl ClusterInfo { .spawn(move || { let mut last_push = timestamp(); let mut last_contact_info_trace = timestamp(); + let recycler = PacketsRecycler::default(); loop { let start = timestamp(); + thread_mem_usage::datapoint("solana-gossip"); if start - last_contact_info_trace > 10000 { // Log contact info every 10 seconds info!("\n{}", obj.read().unwrap().contact_info_trace()); @@ -1046,11 +1050,30 @@ impl ClusterInfo { } None => HashMap::new(), }; - let _ = Self::run_gossip(&obj, &stakes, &sender); + let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender); if exit.load(Ordering::Relaxed) { return; } - obj.write().unwrap().purge(timestamp()); + let timeout = { + if let Some(ref bank_forks) = bank_forks { + let bank = bank_forks.read().unwrap().working_bank(); + let epoch = bank.epoch(); + let epoch_schedule = bank.epoch_schedule(); + epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT + } else { + inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + } + }; + let timeouts = obj.read().unwrap().gossip.make_timeouts(&stakes, timeout); + let num_purged = obj.write().unwrap().gossip.purge(timestamp(), &timeouts); + inc_new_counter_info!("cluster_info-purge-count", num_purged); + let table_size = obj.read().unwrap().gossip.crds.table.len(); + datapoint_debug!( + "cluster_info-purge", + ("table_size", table_size as i64, i64), + ("purge_stake_timeout", timeout as i64, i64) + ); //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { @@ -1084,20 +1107,25 @@ impl ClusterInfo { } fn run_window_request( + recycler: &PacketsRecycler, from: &ContactInfo, from_addr: &SocketAddr, blocktree:
Option<&Arc<Blocktree>>, me: &ContactInfo, slot: Slot, shred_index: u64, - ) -> Packets { + ) -> Option<Packets> { if let Some(blocktree) = blocktree { // Try to find the requested index in one of the slots let packet = Self::get_data_shred_as_packet(blocktree, slot, shred_index, from_addr); if let Ok(Some(packet)) = packet { inc_new_counter_debug!("cluster_info-window-request-ledger", 1); - return Packets::new(vec![packet]); + return Some(Packets::new_with_recycler_data( + recycler, + "run_window_request", + vec![packet], + )); } } @@ -1110,46 +1138,41 @@ impl ClusterInfo { shred_index, ); - Packets::default() + None } fn run_highest_window_request( + recycler: &PacketsRecycler, from_addr: &SocketAddr, blocktree: Option<&Arc<Blocktree>>, slot: Slot, highest_index: u64, - ) -> Packets { - if let Some(blocktree) = blocktree { - // Try to find the requested index in one of the slots - let meta = blocktree.meta(slot); - - if let Ok(Some(meta)) = meta { - if meta.received > highest_index { - // meta.received must be at least 1 by this point - let packet = Self::get_data_shred_as_packet( - blocktree, - slot, - meta.received - 1, - from_addr, - ); - - if let Ok(Some(packet)) = packet { - return Packets::new(vec![packet]); - } - } - } + ) -> Option<Packets> { + let blocktree = blocktree?; + // Try to find the requested index in one of the slots + let meta = blocktree.meta(slot).ok()??; + if meta.received > highest_index { + // meta.received must be at least 1 by this point + let packet = + Self::get_data_shred_as_packet(blocktree, slot, meta.received - 1, from_addr) + .ok()??; + return Some(Packets::new_with_recycler_data( + recycler, + "run_highest_window_request", + vec![packet], + )); } - - Packets::default() + None } fn run_orphan( + recycler: &PacketsRecycler, from_addr: &SocketAddr, blocktree: Option<&Arc<Blocktree>>, mut slot: Slot, max_responses: usize, - ) -> Packets { - let mut res = Packets::default(); + ) -> Option<Packets> { + let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blocktree) = blocktree { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blocktree.meta(slot) { @@ -1168,18 +1191,23 @@ impl ClusterInfo { } } } - - res + if res.is_empty() { + return None; + } + Some(res) } fn handle_packets( me: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, blocktree: Option<&Arc<Blocktree>>, stakes: &HashMap<Pubkey, u64>, packets: Packets, response_sender: &PacketSender, ) { // iter over the packets, collect pulls separately and process everything else + let allocated = allocatedp::mib().unwrap(); + let allocated = allocated.read().unwrap(); let mut gossip_pull_data: Vec<PullData> = vec![]; packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); @@ -1187,6 +1215,7 @@ .into_iter() .for_each(|request| match request { Protocol::PullRequest(filter, caller) => { + let start = allocated.get(); if !caller.verify() { inc_new_counter_error!( "cluster_info-gossip_pull_request_verify_fail", @@ -1204,8 +1233,13 @@ }); } } + datapoint_debug!( + "solana-gossip-listen-memory", + ("pull_request", (allocated.get() - start) as i64, i64), + ); } Protocol::PullResponse(from, mut data) => { + let start = allocated.get(); data.retain(|v| { let ret = v.verify(); if !ret { @@ -1217,8 +1251,13 @@ ret }); Self::handle_pull_response(me, &from, data); + datapoint_debug!( + "solana-gossip-listen-memory", + ("pull_response", (allocated.get() - start) as i64, i64), + ); } Protocol::PushMessage(from, mut data) => { + let start = allocated.get(); data.retain(|v| { let
ret = v.verify(); if !ret { @@ -1229,10 +1268,17 @@ } ret }); - let _ignore_disconnect = response_sender - .send(Self::handle_push_message(me, &from, data, stakes)); + let rsp = Self::handle_push_message(me, recycler, &from, data, stakes); + if let Some(rsp) = rsp { + let _ignore_disconnect = response_sender.send(rsp); + } + datapoint_debug!( + "solana-gossip-listen-memory", + ("push_message", (allocated.get() - start) as i64, i64), + ); } Protocol::PruneMessage(from, data) => { + let start = allocated.get(); if data.verify() { inc_new_counter_debug!("cluster_info-prune_message", 1); inc_new_counter_debug!( @@ -1257,19 +1303,31 @@ } else { inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1); } + datapoint_debug!( + "solana-gossip-listen-memory", + ("prune_message", (allocated.get() - start) as i64, i64), + ); } _ => { - let _ignore_disconnect = response_sender - .send(Self::handle_repair(me, &from_addr, blocktree, request)); + let rsp = Self::handle_repair(me, recycler, &from_addr, blocktree, request); + if let Some(rsp) = rsp { + let _ignore_disconnect = response_sender.send(rsp); + } } }) }); // process the collected pulls together - let _ignore_disconnect = - response_sender.send(Self::handle_pull_requests(me, gossip_pull_data)); + let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data); + if let Some(rsp) = rsp { + let _ignore_disconnect = response_sender.send(rsp); + } } - fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Packets { + fn handle_pull_requests( + me: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, + requests: Vec<PullData>, + ) -> Option<Packets> { // split the requests into addrs and filters let mut caller_and_filters = vec![]; let mut addrs = vec![]; @@ -1284,7 +1342,7 @@ .unwrap() .gossip .process_pull_requests(caller_and_filters, now); - let mut packets = Packets::default(); + let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); pull_responses .into_iter() .zip(addrs.into_iter()) @@ -1304,7 +1362,10 @@ .push(Packet::from_data(&from_addr, protocol)) }) }); - packets + if packets.is_empty() { + return None; + } + Some(packets) } fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) { @@ -1324,10 +1385,11 @@ fn handle_push_message( me: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, from: &Pubkey, data: Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>, - ) -> Packets { + ) -> Option<Packets> { let self_id = me.read().unwrap().gossip.id; inc_new_counter_debug!("cluster_info-push_message", 1); @@ -1362,8 +1424,10 @@ }) }) .collect(); - let mut packets = to_packets_with_destination(&rsp); - + if rsp.is_empty() { + return None; + } + let mut packets = to_packets_with_destination(recycler.clone(), &rsp); if !packets.is_empty() { let pushes: Vec<_> = me.write().unwrap().new_push_requests(); inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); @@ -1371,9 +1435,9 @@ let p = Packet::from_data(&remote_gossip_addr, &req); packets.packets.push(p); }); - packets + Some(packets) } else { - Packets::default() + None } } @@ -1388,10 +1452,11 @@ fn handle_repair( me: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, from_addr: &SocketAddr, blocktree: Option<&Arc<Blocktree>>, request: Protocol, - ) -> Packets { + ) -> Option<Packets> { let now = Instant::now(); //TODO this doesn't depend on cluster_info module, could be moved @@ -1406,7 +1471,7 @@ self_id, from.id, ); inc_new_counter_debug!("cluster_info-handle-repair--eq", 1);
- return Packets::default(); + return None; } me.write() @@ -1422,6 +1487,7 @@ inc_new_counter_debug!("cluster_info-request-window-index", 1); ( Self::run_window_request( + recycler, from, &from_addr, blocktree, @@ -1437,6 +1503,7 @@ inc_new_counter_debug!("cluster_info-request-highest-window-index", 1); ( Self::run_highest_window_request( + recycler, &from_addr, blocktree, *slot, @@ -1448,7 +1515,13 @@ Protocol::RequestOrphan(_, slot) => { inc_new_counter_debug!("cluster_info-request-orphan", 1); ( - Self::run_orphan(&from_addr, blocktree, *slot, MAX_ORPHAN_REPAIR_RESPONSES), + Self::run_orphan( + recycler, + &from_addr, + blocktree, + *slot, + MAX_ORPHAN_REPAIR_RESPONSES, + ), "RequestOrphan", ) } @@ -1464,6 +1537,7 @@ /// Process messages from the network fn run_listen( obj: &Arc<RwLock<Self>>, + recycler: &PacketsRecycler, blocktree: Option<&Arc<Blocktree>>, bank_forks: Option<&Arc<RwLock<BankForks>>>, requests_receiver: &PacketReceiver, @@ -1472,7 +1546,6 @@ //TODO cache connections let timeout = Duration::new(1, 0); let reqs = requests_receiver.recv_timeout(timeout)?; - let stakes: HashMap<_, _> = match bank_forks { Some(ref bank_forks) => { staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank()) @@ -1480,7 +1553,7 @@ None => HashMap::new(), }; - Self::handle_packets(obj, blocktree, &stakes, reqs, response_sender); + Self::handle_packets(obj, &recycler, blocktree, &stakes, reqs, response_sender); Ok(()) } pub fn listen( @@ -1492,11 +1565,13 @@ exit: &Arc<AtomicBool>, ) -> JoinHandle<()> { let exit = exit.clone(); + let recycler = PacketsRecycler::default(); Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { let e = Self::run_listen( &me, + &recycler, blocktree.as_ref(), bank_forks.as_ref(), &requests_receiver, @@ -1513,6 +1588,7 @@ me.gossip.crds.table.len() ); } + thread_mem_usage::datapoint("solana-listen"); }) .unwrap() } @@ -1961,6 +2037,7 @@ mod tests { /// test window requests respond with the right shred, and do not overrun #[test] fn run_window_request() { + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { @@ -1979,6 +2056,7 @@ 0, ); let rv = ClusterInfo::run_window_request( + &recycler, &me, &socketaddr_any!(), Some(&blocktree), @@ -1986,7 +2064,7 @@ 0, 0, ); - assert!(rv.is_empty()); + assert!(rv.is_none()); let mut common_header = ShredCommonHeader::default(); common_header.slot = 2; common_header.index = 1; @@ -2003,6 +2081,7 @@ .expect("Expect successful ledger write"); let rv = ClusterInfo::run_window_request( + &recycler, &me, &socketaddr_any!(), Some(&blocktree), @@ -2010,8 +2089,9 @@ 2, 1, ); - assert!(!rv.is_empty()); + assert!(!rv.is_none()); let rv: Vec<Shred> = rv + .expect("packets") .packets .into_iter() .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) .collect(); @@ -2026,13 +2106,19 @@ /// test run_highest_window_request responds with the right shred, and does not overrun #[test] fn run_highest_window_request() { + let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); - let rv = - ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 0, 0); - assert!(rv.is_empty()); + let rv = ClusterInfo::run_highest_window_request( + &recycler, + &socketaddr_any!(),
Some(&blocktree), + 0, + 0, + ); + assert!(rv.is_none()); let _ = fill_blocktree_slot_with_ticks( &blocktree, @@ -2042,9 +2128,15 @@ Hash::default(), ); - let rv = - ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1); + let rv = ClusterInfo::run_highest_window_request( + &recycler, + &socketaddr_any!(), + Some(&blocktree), + 2, + 1, + ); let rv: Vec<Shred> = rv + .expect("packets") .packets .into_iter() .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) .collect(); @@ -2055,12 +2147,13 @@ assert_eq!(rv[0].slot(), 2); let rv = ClusterInfo::run_highest_window_request( + &recycler, &socketaddr_any!(), Some(&blocktree), 2, index + 1, ); - assert!(rv.is_empty()); + assert!(rv.is_none()); } Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); @@ -2069,11 +2162,12 @@ #[test] fn run_orphan() { solana_logger::setup(); + let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); - let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 2, 0); - assert!(rv.is_empty()); + let rv = ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 2, 0); + assert!(rv.is_none()); // Create slots 1, 2, 3 with 5 shreds apiece let (shreds, _) = make_many_slot_entries(1, 3, 5); @@ -2083,16 +2177,18 @@ .expect("Expect successful ledger write"); // We don't have slot 4, so we don't know how to service this request - let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 4, 5); - assert!(rv.is_empty()); + let rv = ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 4, 5); + assert!(rv.is_none()); // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively // for this request - let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5) - .packets - .iter() - .map(|b| b.clone()) - .collect(); + let rv: Vec<_> = + ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 3, 5) + .expect("run_orphan packets") + .packets + .iter() + .map(|b| b.clone()) + .collect(); let expected: Vec<_> = (1..=3) .rev() .map(|slot| { diff --git a/core/src/crds.rs b/core/src/crds.rs index bf107eb40..61a0bfb99 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -30,6 +30,7 @@ use indexmap::map::IndexMap; use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; use std::cmp; +use std::collections::HashMap; #[derive(Clone)] pub struct Crds { @@ -141,12 +142,25 @@ impl Crds { } } - /// find all the keys that are older or equal to min_ts - pub fn find_old_labels(&self, min_ts: u64) -> Vec<CrdsValueLabel> { + /// Find all the keys that are older than or equal to the timeout. + /// * timeouts - Pubkey specific timeouts with Pubkey::default() as the default timeout.
+ pub fn find_old_labels( + &self, + now: u64, + timeouts: &HashMap<Pubkey, u64>, + ) -> Vec<CrdsValueLabel> { + let min_ts = *timeouts + .get(&Pubkey::default()) + .expect("must have default timeout"); self.table .iter() .filter_map(|(k, v)| { - if v.local_timestamp <= min_ts { + if now < v.local_timestamp + || (timeouts.get(&k.pubkey()).is_some() + && now - v.local_timestamp < timeouts[&k.pubkey()]) + { + None + } else if now - v.local_timestamp >= min_ts { Some(k) } else { None } @@ -237,25 +251,69 @@ mod test { assert_eq!(crds.table[&val2.label()].insert_timestamp, 3); } #[test] - fn test_find_old_records() { + fn test_find_old_records_default() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_eq!(crds.insert(val.clone(), 1), Ok(None)); - - assert!(crds.find_old_labels(0).is_empty()); - assert_eq!(crds.find_old_labels(1), vec![val.label()]); - assert_eq!(crds.find_old_labels(2), vec![val.label()]); + let mut set = HashMap::new(); + set.insert(Pubkey::default(), 0); + assert!(crds.find_old_labels(0, &set).is_empty()); + set.insert(Pubkey::default(), 1); + assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + set.insert(Pubkey::default(), 2); + assert_eq!(crds.find_old_labels(4, &set), vec![val.label()]); } #[test] - fn test_remove() { + fn test_remove_default() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); assert_matches!(crds.insert(val.clone(), 1), Ok(_)); - - assert_eq!(crds.find_old_labels(1), vec![val.label()]); + let mut set = HashMap::new(); + set.insert(Pubkey::default(), 1); + assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); crds.remove(&val.label()); - assert!(crds.find_old_labels(1).is_empty()); + assert!(crds.find_old_labels(2, &set).is_empty()); } + #[test] + fn test_find_old_records_staked() { + let mut crds = Crds::default(); + let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); + assert_eq!(crds.insert(val.clone(), 1), Ok(None)); + let mut set = HashMap::new(); + //now < timestamp + set.insert(Pubkey::default(), 0); + set.insert(val.pubkey(), 0); + assert!(crds.find_old_labels(0, &set).is_empty()); + + //pubkey shouldn't expire since its timeout is MAX + set.insert(val.pubkey(), std::u64::MAX); + assert!(crds.find_old_labels(2, &set).is_empty()); + + //default has max timeout, but pubkey should still expire + set.insert(Pubkey::default(), std::u64::MAX); + set.insert(val.pubkey(), 1); + assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + + set.insert(val.pubkey(), 2); + assert!(crds.find_old_labels(2, &set).is_empty()); + assert_eq!(crds.find_old_labels(3, &set), vec![val.label()]); + } + + #[test] + fn test_remove_staked() { + let mut crds = Crds::default(); + let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); + assert_matches!(crds.insert(val.clone(), 1), Ok(_)); + let mut set = HashMap::new(); + + //default has max timeout, but pubkey should still expire + set.insert(Pubkey::default(), std::u64::MAX); + set.insert(val.pubkey(), 1); + assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]); + crds.remove(&val.label()); + assert!(crds.find_old_labels(2, &set).is_empty()); + } + #[test] fn test_equal() { let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 54e4f1bb7..a75a6f929 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -162,7 +162,21 @@ 
impl CrdsGossip { self.pull .process_pull_response(&mut self.crds, from, response, now) } - pub fn purge(&mut self, now: u64) { + + pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> { + self.make_timeouts(&HashMap::new(), self.pull.crds_timeout) + } + + pub fn make_timeouts( + &self, + stakes: &HashMap<Pubkey, u64>, + epoch_ms: u64, + ) -> HashMap<Pubkey, u64> { + self.pull.make_timeouts(&self.id, stakes, epoch_ms) + } + + pub fn purge(&mut self, now: u64, timeouts: &HashMap<Pubkey, u64>) -> usize { + let mut rv = 0; if now > self.push.msg_timeout { let min = now - self.push.msg_timeout; self.push.purge_old_pending_push_messages(&self.crds, min); @@ -172,13 +186,17 @@ impl CrdsGossip { self.push.purge_old_received_cache(min); } if now > self.pull.crds_timeout { - let min = now - self.pull.crds_timeout; - self.pull.purge_active(&mut self.crds, &self.id, min); + //sanity check + let min = self.pull.crds_timeout; + assert_eq!(timeouts[&self.id], std::u64::MAX); + assert_eq!(timeouts[&Pubkey::default()], min); + rv = self.pull.purge_active(&mut self.crds, now, &timeouts); } if now > 5 * self.pull.crds_timeout { let min = now - 5 * self.pull.crds_timeout; self.pull.purge_purged(min); } + rv } } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 31e19d893..487ab371f 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -263,13 +263,39 @@ impl CrdsGossipPull { } ret } + pub fn make_timeouts_def( + &self, + self_id: &Pubkey, + stakes: &HashMap<Pubkey, u64>, + epoch_ms: u64, + min_ts: u64, + ) -> HashMap<Pubkey, u64> { + let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect(); + timeouts.insert(*self_id, std::u64::MAX); + timeouts.insert(Pubkey::default(), min_ts); + timeouts + } + + pub fn make_timeouts( + &self, + self_id: &Pubkey, + stakes: &HashMap<Pubkey, u64>, + epoch_ms: u64, + ) -> HashMap<Pubkey, u64> { + self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout) + } + /// Purge values from the crds that are older than `active_timeout` /// The value_hash of an active item is put into self.purged_values queue - pub fn purge_active(&mut self, crds: &mut Crds, self_id: &Pubkey, min_ts: u64) { - let old = crds.find_old_labels(min_ts); + pub fn purge_active( + &mut self, + crds: &mut Crds, + now: u64, + timeouts: &HashMap<Pubkey, u64>, + ) -> usize { + let old = crds.find_old_labels(now, timeouts); let mut purged: VecDeque<_> = old .iter() - .filter(|label| label.pubkey() != *self_id) .filter_map(|label| { let rv = crds .lookup_versioned(label) @@ -278,7 +304,9 @@ rv }) .collect(); + let ret = purged.len(); self.purged_values.append(&mut purged); + ret } /// Purge values from the `self.purged_values` queue that are older than purge_timeout pub fn purge_purged(&mut self, min_ts: u64) { @@ -551,7 +579,8 @@ mod test { assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); // purge - node.purge_active(&mut node_crds, &node_pubkey, 1); + let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1); + node.purge_active(&mut node_crds, 2, &timeouts); //verify self is still valid after purge assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index e1e88abcd..8dd495c1d 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -5,6 +5,7 @@ use crate::packet::PacketsRecycler; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::streamer::{self, PacketReceiver, PacketSender}; +use crate::thread_mem_usage; use solana_metrics::{inc_new_counter_debug, 
inc_new_counter_info}; use solana_perf::recycler::Recycler; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; @@ -121,6 +122,7 @@ impl FetchStage { let fwd_thread_hdl = Builder::new() .name("solana-fetch-stage-fwd-rcvr".to_string()) .spawn(move || loop { + thread_mem_usage::datapoint("solana-fetch-stage-fwd-rcvr"); if let Err(e) = Self::handle_forwarded_packets(&forward_receiver, &sender, &poh_recorder) { diff --git a/core/src/lib.rs b/core/src/lib.rs index d94bd5924..78f90e3b3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -12,6 +12,7 @@ pub mod chacha_cuda; pub mod cluster_info_vote_listener; pub mod commitment; pub mod shred_fetch_stage; +pub mod thread_mem_usage; #[macro_use] pub mod contact_info; pub mod archiver; @@ -82,3 +83,8 @@ extern crate solana_metrics; #[cfg(test)] #[macro_use] extern crate matches; + +extern crate jemallocator; + +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 51b83e3c3..135584b90 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -8,6 +8,8 @@ use crate::consensus::{StakeLockout, Tower}; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; +use crate::thread_mem_usage; +use jemalloc_ctl::thread::allocatedp; use solana_ledger::{ bank_forks::BankForks, block_error::BlockError, @@ -86,6 +88,7 @@ struct ForkStats { block_height: u64, has_voted: bool, is_recent: bool, + is_empty: bool, vote_threshold: bool, is_locked_out: bool, stake_lockouts: HashMap<u64, StakeLockout>, @@ -212,20 +215,30 @@ impl ReplayStage { let mut last_reset = Hash::default(); let mut partition = false; loop { + let allocated = allocatedp::mib().unwrap(); + let allocated = allocated.read().unwrap(); + + thread_mem_usage::datapoint("solana-replay-stage"); let now = Instant::now(); // Stop getting entries if we get exit signal if exit_.load(Ordering::Relaxed) { break; } + let start = allocated.get(); Self::generate_new_bank_forks( &blocktree, &mut bank_forks.write().unwrap(), &leader_schedule_cache, ); + datapoint_debug!( + "replay_stage-memory", + ("generate_new_bank_forks", (allocated.get() - start) as i64, i64), + ); let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank(); + let start = allocated.get(); let did_complete_bank = Self::replay_active_banks( &blocktree, &bank_forks, @@ -233,14 +246,23 @@ &mut progress, &slot_full_senders, ); + datapoint_debug!( + "replay_stage-memory", + ("replay_active_banks", (allocated.get() - start) as i64, i64), + ); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); loop { - let (vote_bank, heaviest) = - Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress); + let start = allocated.get(); + let (vote_bank, heaviest) = Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress); + datapoint_debug!( + "replay_stage-memory", + ("select_fork", (allocated.get() - start) as i64, i64), + ); let done = vote_bank.is_none(); let mut vote_bank_slot = 0; let reset_bank = vote_bank.as_ref().map(|b| b.0.clone()).or(heaviest); + let start = allocated.get(); if let Some((bank, total_staked)) = vote_bank { info!("voting: {}", bank.slot()); subscriptions.notify_subscribers(bank.slot(), &bank_forks); @@ -271,6 +293,11 @@ &snapshot_package_sender, )?; } + datapoint_debug!( + "replay_stage-memory", + ("votable_bank", (allocated.get() - start) as i64, i64), + ); + let start = allocated.get(); if let Some(bank) = reset_bank
{ if last_reset != bank.last_blockhash() { Self::reset_poh_recorder( @@ -286,6 +313,7 @@ if !partition && vote_bank_slot != bank.slot() { warn!("PARTITION DETECTED waiting to join fork: {} last vote: {:?}", bank.slot(), tower.last_vote()); inc_new_counter_info!("replay_stage-partition_detected", 1); + datapoint_info!("replay_stage-partition", ("slot", bank.slot() as i64, i64)); partition = true; } else if partition && vote_bank_slot == bank.slot() { warn!("PARTITION resolved fork: {} last vote: {:?}", bank.slot(), tower.last_vote()); @@ -294,11 +322,16 @@ } } } + datapoint_debug!( + "replay_stage-memory", + ("reset_bank", (allocated.get() - start) as i64, i64), + ); if done { break; } } + let start = allocated.get(); if !tpu_has_bank { Self::maybe_start_leader( &my_pubkey, @@ -316,11 +349,11 @@ ); } } - - inc_new_counter_info!( - "replay_stage-duration", - duration_as_ms(&now.elapsed()) as usize - ); + datapoint_debug!( + "replay_stage-memory", + ("start_leader", (allocated.get() - start) as i64, i64), + ); + datapoint_debug!("replay_stage", ("duration", duration_as_ms(&now.elapsed()) as i64, i64)); if did_complete_bank { //just processed a bank, skip the signal; maybe there's more slots available continue; } @@ -691,6 +724,7 @@ impl ReplayStage { progress: &mut HashMap<u64, ForkProgress>, ) -> VoteAndPoHBank { let tower_start = Instant::now(); + let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() .frozen_banks() .values() .cloned() .collect(); frozen_banks.sort_by_key(|bank| bank.slot()); + let num_frozen_banks = frozen_banks.len(); trace!("frozen_banks {}", frozen_banks.len()); + let num_old_banks = frozen_banks + .iter() + .filter(|b| b.slot() < tower.root().unwrap_or(0)) + .count(); + let stats: Vec<ForkStats> = frozen_banks .iter() .map(|bank| { @@ -733,7 +773,7 @@ ); if !stats.computed { if !stats.vote_threshold { - info!("vote threshold check failed: {}", bank.slot()); + debug!("vote threshold check failed: {}", bank.slot()); } stats.computed = true; } @@ -747,6 +787,15 @@ stats }) .collect(); + let num_not_recent = stats.iter().filter(|s| !s.is_recent).count(); + let num_has_voted = stats.iter().filter(|s| s.has_voted).count(); + let num_empty = stats.iter().filter(|s| s.is_empty).count(); + let num_threshold_failure = stats.iter().filter(|s| !s.vote_threshold).count(); + let num_votable_threshold_failure = stats + .iter() + .filter(|s| s.is_recent && !s.has_voted && !s.vote_threshold) + .count(); + let mut candidates: Vec<_> = frozen_banks .iter() .zip(stats.iter()) @@ -780,7 +829,21 @@ weights, rv.0.is_some() ); - inc_new_counter_info!("replay_stage-tower_duration", ms as usize); + datapoint_debug!( + "replay_stage-select_fork", + ("frozen_banks", num_frozen_banks as i64, i64), + ("not_recent", num_not_recent as i64, i64), + ("has_voted", num_has_voted as i64, i64), + ("old_banks", num_old_banks as i64, i64), + ("empty_banks", num_empty as i64, i64), + ("threshold_failure", num_threshold_failure as i64, i64), + ( + "votable_threshold_failure", + num_votable_threshold_failure as i64, + i64 + ), + ("tower_duration", ms as i64, i64), + ); rv } diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 579b90dc2..119603b98 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -4,6 +4,7 @@ use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}; use crate::recvmmsg::NUM_RCVMMSGS; use crate::result::{Error, Result}; +use crate::thread_mem_usage;
use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -75,6 +76,7 @@ pub fn receiver( Builder::new() .name("solana-receiver".to_string()) .spawn(move || { + thread_mem_usage::datapoint(name); let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name); }) .unwrap() } @@ -111,6 +113,7 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) -> JoinHandle<()> { Builder::new() .name(format!("solana-responder-{}", name)) .spawn(move || loop { + thread_mem_usage::datapoint(name); if let Err(e) = recv_send(&sock, &r) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, diff --git a/core/src/thread_mem_usage.rs b/core/src/thread_mem_usage.rs new file mode 100644 index 000000000..c1f57e6c1 --- /dev/null +++ b/core/src/thread_mem_usage.rs @@ -0,0 +1,9 @@ +use jemalloc_ctl::thread; +use solana_metrics::datapoint_debug; + +pub fn datapoint(name: &'static str) { + let allocated = thread::allocatedp::mib().unwrap(); + let allocated = allocated.read().unwrap(); + let mem = allocated.get(); + datapoint_debug!("thread-memory", (name, mem as i64, i64)); +} diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0342c49b2..9f67d16ff 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -283,7 +283,8 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, let requests: Vec<_> = network_values .par_iter() .map(|node| { - node.lock().unwrap().purge(now); + let timeouts = node.lock().unwrap().make_timeouts_test(); + node.lock().unwrap().purge(now, &timeouts); node.lock().unwrap().new_push_messages(now) }) .collect(); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index a0c592c3e..1fa3bd177 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -245,14 +245,27 @@ fn run_network_partition(partitions: &[usize]) { sleep(Duration::from_millis(timeout as u64)); } info!("PARTITION_TEST done sleeping until partition end timeout"); - info!("PARTITION_TEST spending on all ndoes"); - cluster_tests::spend_and_verify_all_nodes( - &cluster.entry_point_info, - &cluster.funding_keypair, - num_nodes, - HashSet::new(), - ); - info!("PARTITION_TEST done spending on all ndoes"); + info!("PARTITION_TEST discovering nodes"); + let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap(); + info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len()); + info!("PARTITION_TEST looking for new roots on all nodes"); + let mut roots = vec![HashSet::new(); cluster_nodes.len()]; + let mut done = false; + while !done { + for (i, ingress_node) in cluster_nodes.iter().enumerate() { + let client = create_client( + ingress_node.client_facing_addr(), + solana_core::cluster_info::VALIDATOR_PORT_RANGE, + ); + let slot = client.get_slot().unwrap_or(0); + roots[i].insert(slot); + let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); + info!("PARTITION_TEST min observed roots {}/16", min_node); + done = min_node >= 16; + } + sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2)); + } + info!("PARTITION_TEST done waiting for roots on all nodes"); } #[allow(unused_attributes)] diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 18daa1a6e..1981e452b 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -70,6 +70,15 @@ impl Packets { recycler: Some(recycler), } } + pub fn new_with_recycler_data( + recycler: &PacketsRecycler, + name: &'static str, + mut packets:
Vec<Packet>, + ) -> Self { + let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name); + vec.packets.append(&mut packets); + vec + } pub fn set_addr(&mut self, addr: &SocketAddr) { for m in self.packets.iter_mut() { @@ -99,8 +108,15 @@ pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> { to_packets_chunked(xs, NUM_PACKETS) } -pub fn to_packets_with_destination<T: Serialize>(dests_and_data: &[(SocketAddr, T)]) -> Packets { - let mut out = Packets::default(); +pub fn to_packets_with_destination<T: Serialize>( + recycler: PacketsRecycler, + dests_and_data: &[(SocketAddr, T)], +) -> Packets { + let mut out = Packets::new_with_recycler( + recycler, + dests_and_data.len(), + "to_packets_with_destination", + ); out.packets.resize(dests_and_data.len(), Packet::default()); for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) { diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 420e12b1c..40e45a794 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; #[derive(Debug, Default)] struct RecyclerStats { total: AtomicUsize, + freed: AtomicUsize, reuse: AtomicUsize, max_gc: AtomicUsize, } @@ -85,9 +86,10 @@ impl Recycler { return x; } + let total = self.stats.total.fetch_add(1, Ordering::Relaxed); trace!( "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", - self.stats.total.fetch_add(1, Ordering::Relaxed), + total, name, self.id, self.stats.reuse.load(Ordering::Relaxed), @@ -111,6 +113,16 @@ .max_gc .compare_and_swap(max_gc, len, Ordering::Relaxed); } + let total = self.stats.total.load(Ordering::Relaxed); + let reuse = self.stats.reuse.load(Ordering::Relaxed); + let freed = self.stats.freed.fetch_add(1, Ordering::Relaxed); + datapoint_debug!( + "recycler", + ("gc_len", len as i64, i64), + ("total", total as i64, i64), + ("freed", freed as i64, i64), + ("reuse", reuse as i64, i64), + ); } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b442f93ce..4f75315d0 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -561,6 +561,7 @@ impl AccountsDB { } pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<Slot, usize>) { + self.report_store_stats(); let accounts_index = self.accounts_index.read().unwrap(); let mut purges = Vec::new(); accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| { @@ -841,6 +842,39 @@ impl AccountsDB { infos } + fn report_store_stats(&self) { + let mut total_count = 0; + let mut min = std::usize::MAX; + let mut min_slot = 0; + let mut max = 0; + let mut max_slot = 0; + let mut newest_slot = 0; + let mut oldest_slot = std::u64::MAX; + let stores = self.storage.read().unwrap(); + for (slot, slot_stores) in &stores.0 { + total_count += slot_stores.len(); + if slot_stores.len() < min { + min = slot_stores.len(); + min_slot = *slot; + } + + if slot_stores.len() > max { + max = slot_stores.len(); + max_slot = *slot; + } + if *slot > newest_slot { + newest_slot = *slot; + } + + if *slot < oldest_slot { + oldest_slot = *slot; + } + } + info!("accounts_db: total_stores: {} newest_slot: {} oldest_slot: {} max_slot: {} (num={}) min_slot: {} (num={})", + total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min); + datapoint_info!("accounts_db-stores", ("total_count", total_count, i64)); + } + pub fn verify_hash_internal_state(&self, slot: Slot, ancestors: &HashMap<Slot, usize>) -> bool { let mut hash_state = BankHash::default(); let hashes: Vec<_> = self.scan_accounts( diff 
--git a/runtime/src/bank.rs b/runtime/src/bank.rs index 49252a66a..8f98ca45c 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1385,6 +1385,7 @@ impl Bank { /// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash /// calculation and could shield other real accounts. pub fn verify_snapshot_bank(&self) -> bool { + self.purge_zero_lamport_accounts(); self.rc .accounts .verify_hash_internal_state(self.slot(), &self.ancestors) diff --git a/sdk/src/clock.rs b/sdk/src/clock.rs index 8e80cf169..f3667f126 100644 --- a/sdk/src/clock.rs +++ b/sdk/src/clock.rs @@ -31,6 +31,8 @@ pub const DEFAULT_SLOTS_PER_TURN: u64 = 32 * 4; // leader schedule is governed by this pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4; +pub const DEFAULT_MS_PER_SLOT: u64 = 1_000 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND; + /// The time window of recent block hash values that the bank will track the signatures /// of over. Once the bank discards a block hash, it will reject any transactions that use /// that `recent_blockhash` in a transaction. Lowering this value reduces memory consumption,
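
Note (editor's sketch, not part of the patch): every thread_mem_usage::datapoint call above samples jemalloc's per-thread allocation counter through the jemalloc-ctl crate, and the replay/gossip loops diff two samples of that counter to attribute allocations to a section of code. A minimal self-contained version of the same pattern, assuming the jemallocator 0.3 and jemalloc-ctl 0.3 dependencies this change adds; measure_alloc is a hypothetical helper, not something the patch defines:

use jemalloc_ctl::thread;

// Route all allocations through jemalloc, as core/src/lib.rs now does.
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

/// Bytes allocated on the current thread while `f` runs, read from the same
/// thread::allocatedp MIB the patch samples. The counter is monotonically
/// increasing: it counts allocations only and never subtracts frees.
fn measure_alloc<T>(f: impl FnOnce() -> T) -> (T, u64) {
    let mib = thread::allocatedp::mib().unwrap();
    let counter = mib.read().unwrap();
    let before = counter.get();
    let out = f();
    (out, counter.get() - before)
}

fn main() {
    let (v, bytes) = measure_alloc(|| vec![0u8; 1 << 20]);
    println!("allocated ~{} bytes building a {}-byte vec", bytes, v.len());
}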
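
A second sketch, for the new purge path: gossip now builds a per-pubkey timeout map (make_timeouts) and Crds::find_old_labels consults it instead of a single min_ts. Below is that expiry rule extracted into a hypothetical free function, with a plain integer key standing in for Pubkey; per the patch, Pubkey::default() holds the fallback timeout and the local node is inserted with u64::MAX so it never expires:

use std::collections::HashMap;

type Key = u64; // stand-in for solana_sdk::pubkey::Pubkey
const DEFAULT_KEY: Key = 0; // stand-in for Pubkey::default()

/// Mirrors the filter in Crds::find_old_labels above: an entry with an
/// explicit timeout (staked nodes, self) survives until that timeout elapses,
/// and every entry must also outlive the default timeout under DEFAULT_KEY.
fn is_expired(now: u64, local_timestamp: u64, key: Key, timeouts: &HashMap<Key, u64>) -> bool {
    let min_ts = timeouts[&DEFAULT_KEY]; // panics if absent, like expect() above
    if now < local_timestamp {
        return false;
    }
    let elapsed = now - local_timestamp;
    if let Some(&t) = timeouts.get(&key) {
        if elapsed < t {
            return false;
        }
    }
    elapsed >= min_ts
}

fn main() {
    let mut timeouts = HashMap::new();
    timeouts.insert(DEFAULT_KEY, 15_000); // default pull timeout in ms
    timeouts.insert(1, u64::MAX); // self: never purged
    timeouts.insert(2, 400_000); // staked node: roughly an epoch in ms
    assert!(is_expired(20_000, 0, 99, &timeouts)); // unknown node times out quickly
    assert!(!is_expired(20_000, 0, 2, &timeouts)); // staked node still fresh
    assert!(!is_expired(20_000, 0, 1, &timeouts)); // self survives
}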