Use epoch as the gossip purge timeout for staked nodes. (#7005)
automerge

parent ba9aaee7cd
commit b150da837a
@ -1436,6 +1436,35 @@ name = "itoa"
|
|||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc-ctl"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"jemalloc-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jemalloc-sys"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"cc 1.0.47 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jemallocator"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"jemalloc-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.16"
|
||||
|
@ -2069,6 +2098,26 @@ dependencies = [
|
|||
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "paste"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"paste-impl 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "paste-impl"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"syn 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pbkdf2"
|
||||
version = "0.3.0"
|
||||
|
@ -3294,6 +3343,8 @@ dependencies = [
|
|||
"hex-literal 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"indexmap 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"itertools 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jemalloc-ctl 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jsonrpc-core 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jsonrpc-derive 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jsonrpc-http-server 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -5536,6 +5587,9 @@ dependencies = [
|
|||
"checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d"
|
||||
"checksum itertools 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "87fa75c9dea7b07be3138c49abbb83fd4bea199b5cdc76f9804458edc5da0d6e"
|
||||
"checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"
|
||||
"checksum jemalloc-ctl 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c502a5ff9dd2924f1ed32ba96e3b65735d837b4bfd978d3161b1702e66aca4b7"
|
||||
"checksum jemalloc-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45"
|
||||
"checksum jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69"
|
||||
"checksum jobserver 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "f74e73053eaf95399bf926e48fc7a2a3ce50bd0eaaa2357d391e95b2dcdd4f10"
|
||||
"checksum js-sys 0.3.27 (registry+https://github.com/rust-lang/crates.io-index)" = "1efc4f2a556c58e79c5500912e221dd826bec64ff4aabd8ce71ccef6da02d7d4"
|
||||
"checksum jsonrpc-core 14.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "34651edf3417637cc45e70ed0182ecfa9ced0b7e8131805fccf7400d989845ca"
|
||||
|
@ -5605,6 +5659,8 @@ dependencies = [
|
|||
"checksum parking_lot_core 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa"
|
||||
"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9"
|
||||
"checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b"
|
||||
"checksum paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "423a519e1c6e828f1e73b720f9d9ed2fa643dce8a7737fb43235ce0b41eeaa49"
|
||||
"checksum paste-impl 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "4214c9e912ef61bf42b81ba9a47e8aad1b2ffaf739ab162bf96d1e011f54e6c5"
|
||||
"checksum pbkdf2 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "006c038a43a45995a9670da19e67600114740e8511d4333bf97a56e66a7542d9"
|
||||
"checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
|
||||
|
|
|
@@ -72,6 +72,8 @@ tokio-io = "0.1"
untrusted = "0.7.0"
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" }
reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] }
jemallocator = "0.3.2"
jemalloc-ctl = "0.3.2"

[dev-dependencies]
hex-literal = "0.2.1"

@ -7,6 +7,7 @@ use crate::{
|
|||
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
|
||||
poh_service::PohService,
|
||||
result::{Error, Result},
|
||||
thread_mem_usage,
|
||||
};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
|
||||
use itertools::Itertools;
|
||||
|
@ -110,6 +111,7 @@ impl BankingStage {
|
|||
Builder::new()
|
||||
.name("solana-banking-stage-tx".to_string())
|
||||
.spawn(move || {
|
||||
thread_mem_usage::datapoint("solana-banking-stage-tx");
|
||||
Self::process_loop(
|
||||
my_pubkey,
|
||||
&verified_receiver,
|
||||
|
|
|
@@ -24,11 +24,13 @@ use crate::{
    repair_service::RepairType,
    result::{Error, Result},
    sendmmsg::{multicast, send_mmsg},
    thread_mem_usage,
    weighted_shuffle::{weighted_best, weighted_shuffle},
};
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use jemalloc_ctl::thread::allocatedp;
use rand::{thread_rng, Rng};
use solana_ledger::{bank_forks::BankForks, blocktree::Blocktree, staking_utils};
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
@@ -36,9 +38,9 @@ use solana_net_utils::{
    bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
    multi_bind_in_range, PortRange,
};
use solana_perf::packet::{to_packets_with_destination, Packets};
use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler};
use solana_sdk::{
    clock::Slot,
    clock::{Slot, DEFAULT_MS_PER_SLOT},
    pubkey::Pubkey,
    signature::{Keypair, KeypairUtil, Signable, Signature},
    timing::{duration_as_ms, timestamp},
@ -362,6 +364,7 @@ impl ClusterInfo {
|
|||
.collect();
|
||||
let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since);
|
||||
let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).collect();
|
||||
inc_new_counter_info!("cluster_info-get_votes-count", txs.len());
|
||||
(txs, max_ts)
|
||||
}
|
||||
|
||||
|
@ -403,10 +406,6 @@ impl ClusterInfo {
|
|||
.map(|x| x.value.contact_info().unwrap())
|
||||
}
|
||||
|
||||
pub fn purge(&mut self, now: u64) {
|
||||
self.gossip.purge(now);
|
||||
}
|
||||
|
||||
pub fn rpc_peers(&self) -> Vec<ContactInfo> {
|
||||
let me = self.my_data().id;
|
||||
self.gossip
|
||||
|
@@ -1010,12 +1009,15 @@ impl ClusterInfo {
    /// At random pick a node and try to get updated changes from them
    fn run_gossip(
        obj: &Arc<RwLock<Self>>,
        recycler: &PacketsRecycler,
        stakes: &HashMap<Pubkey, u64>,
        sender: &PacketSender,
    ) -> Result<()> {
        let reqs = obj.write().unwrap().gossip_request(&stakes);
        let packets = to_packets_with_destination(&reqs);
        sender.send(packets)?;
        if !reqs.is_empty() {
            let packets = to_packets_with_destination(recycler.clone(), &reqs);
            sender.send(packets)?;
        }
        Ok(())
    }

@@ -1032,8 +1034,10 @@ impl ClusterInfo {
            .spawn(move || {
                let mut last_push = timestamp();
                let mut last_contact_info_trace = timestamp();
                let recycler = PacketsRecycler::default();
                loop {
                    let start = timestamp();
                    thread_mem_usage::datapoint("solana-gossip");
                    if start - last_contact_info_trace > 10000 {
                        // Log contact info every 10 seconds
                        info!("\n{}", obj.read().unwrap().contact_info_trace());
@@ -1046,11 +1050,30 @@ impl ClusterInfo {
                        }
                        None => HashMap::new(),
                    };
                    let _ = Self::run_gossip(&obj, &stakes, &sender);
                    let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender);
                    if exit.load(Ordering::Relaxed) {
                        return;
                    }
                    obj.write().unwrap().purge(timestamp());
                    let timeout = {
                        if let Some(ref bank_forks) = bank_forks {
                            let bank = bank_forks.read().unwrap().working_bank();
                            let epoch = bank.epoch();
                            let epoch_schedule = bank.epoch_schedule();
                            epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
                        } else {
                            inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
                            CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
                        }
                    };
                    let timeouts = obj.read().unwrap().gossip.make_timeouts(&stakes, timeout);
                    let num_purged = obj.write().unwrap().gossip.purge(timestamp(), &timeouts);
                    inc_new_counter_info!("cluster_info-purge-count", num_purged);
                    let table_size = obj.read().unwrap().gossip.crds.table.len();
                    datapoint_debug!(
                        "cluster_info-purge",
                        ("tabel_size", table_size as i64, i64),
                        ("purge_stake_timeout", timeout as i64, i64)
                    );
                    //TODO: possibly tune this parameter
                    //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
                    if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
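The loop above is the heart of the change: when a working bank is available, the gossip purge timeout for staked nodes becomes a full epoch of wall-clock time (slots in the epoch times DEFAULT_MS_PER_SLOT); without a bank, the old fixed CRDS pull timeout is used. A minimal sketch of that decision follows; the slot time, the fallback constant, and the epoch length here are illustrative assumptions, not values taken from this diff.

// Sketch only: how the epoch-length purge timeout is derived.
// The real slot count comes from bank.epoch_schedule().get_slots_in_epoch(bank.epoch()).
const DEFAULT_MS_PER_SLOT: u64 = 400; // assumed slot time, for illustration
const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15_000; // assumed fallback when no bank is available

fn purge_timeout_ms(slots_in_epoch: Option<u64>) -> u64 {
    match slots_in_epoch {
        // Staked-node entries survive roughly one epoch of wall-clock time.
        Some(slots) => slots * DEFAULT_MS_PER_SLOT,
        // Without a working bank, fall back to the short CRDS pull timeout.
        None => CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
    }
}

fn main() {
    // e.g. an 8_192-slot epoch at 400 ms/slot keeps staked entries ~55 minutes.
    assert_eq!(purge_timeout_ms(Some(8_192)), 3_276_800);
    assert_eq!(purge_timeout_ms(None), 15_000);
}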
@ -1084,20 +1107,25 @@ impl ClusterInfo {
|
|||
}
|
||||
|
||||
fn run_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
from: &ContactInfo,
|
||||
from_addr: &SocketAddr,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
me: &ContactInfo,
|
||||
slot: Slot,
|
||||
shred_index: u64,
|
||||
) -> Packets {
|
||||
) -> Option<Packets> {
|
||||
if let Some(blocktree) = blocktree {
|
||||
// Try to find the requested index in one of the slots
|
||||
let packet = Self::get_data_shred_as_packet(blocktree, slot, shred_index, from_addr);
|
||||
|
||||
if let Ok(Some(packet)) = packet {
|
||||
inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
|
||||
return Packets::new(vec![packet]);
|
||||
return Some(Packets::new_with_recycler_data(
|
||||
recycler,
|
||||
"run_window_request",
|
||||
vec![packet],
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1110,46 +1138,41 @@ impl ClusterInfo {
|
|||
shred_index,
|
||||
);
|
||||
|
||||
Packets::default()
|
||||
None
|
||||
}
|
||||
|
||||
fn run_highest_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
slot: Slot,
|
||||
highest_index: u64,
|
||||
) -> Packets {
|
||||
if let Some(blocktree) = blocktree {
|
||||
// Try to find the requested index in one of the slots
|
||||
let meta = blocktree.meta(slot);
|
||||
|
||||
if let Ok(Some(meta)) = meta {
|
||||
if meta.received > highest_index {
|
||||
// meta.received must be at least 1 by this point
|
||||
let packet = Self::get_data_shred_as_packet(
|
||||
blocktree,
|
||||
slot,
|
||||
meta.received - 1,
|
||||
from_addr,
|
||||
);
|
||||
|
||||
if let Ok(Some(packet)) = packet {
|
||||
return Packets::new(vec![packet]);
|
||||
}
|
||||
}
|
||||
}
|
||||
) -> Option<Packets> {
|
||||
let blocktree = blocktree?;
|
||||
// Try to find the requested index in one of the slots
|
||||
let meta = blocktree.meta(slot).ok()??;
|
||||
if meta.received > highest_index {
|
||||
// meta.received must be at least 1 by this point
|
||||
let packet =
|
||||
Self::get_data_shred_as_packet(blocktree, slot, meta.received - 1, from_addr)
|
||||
.ok()??;
|
||||
return Some(Packets::new_with_recycler_data(
|
||||
recycler,
|
||||
"run_highest_window_request",
|
||||
vec![packet],
|
||||
));
|
||||
}
|
||||
|
||||
Packets::default()
|
||||
None
|
||||
}
|
||||
|
||||
fn run_orphan(
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
mut slot: Slot,
|
||||
max_responses: usize,
|
||||
) -> Packets {
|
||||
let mut res = Packets::default();
|
||||
) -> Option<Packets> {
|
||||
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
|
||||
if let Some(blocktree) = blocktree {
|
||||
// Try to find the next "n" parent slots of the input slot
|
||||
while let Ok(Some(meta)) = blocktree.meta(slot) {
|
||||
|
@ -1168,18 +1191,23 @@ impl ClusterInfo {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
if res.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(res)
|
||||
}
|
||||
|
||||
fn handle_packets(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
packets: Packets,
|
||||
response_sender: &PacketSender,
|
||||
) {
|
||||
// iter over the packets, collect pulls separately and process everything else
|
||||
let allocated = allocatedp::mib().unwrap();
|
||||
let allocated = allocated.read().unwrap();
|
||||
let mut gossip_pull_data: Vec<PullData> = vec![];
|
||||
packets.packets.iter().for_each(|packet| {
|
||||
let from_addr = packet.meta.addr();
|
||||
|
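handle_packets (above) reads the per-thread jemalloc "bytes allocated" counter once per batch and later emits the delta around each protocol message as a datapoint. A self-contained sketch of that bracketing pattern, assuming the jemalloc-ctl and jemallocator crates added in Cargo.toml; the workload and the printed message are invented for illustration.

use jemalloc_ctl::thread::allocatedp;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {
    // A MIB is a pre-resolved handle to this thread's cumulative allocation counter.
    let allocated = allocatedp::mib().unwrap();
    let allocated = allocated.read().unwrap();

    let start = allocated.get();
    let buf: Vec<u8> = vec![0u8; 1 << 20]; // stand-in for handling one gossip message
    let delta = allocated.get() - start;

    // The real code feeds this delta into datapoint_debug!("solana-gossip-listen-memory", ...).
    println!("allocated {} bytes while handling a {}-byte buffer", delta, buf.len());
}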
@ -1187,6 +1215,7 @@ impl ClusterInfo {
|
|||
.into_iter()
|
||||
.for_each(|request| match request {
|
||||
Protocol::PullRequest(filter, caller) => {
|
||||
let start = allocated.get();
|
||||
if !caller.verify() {
|
||||
inc_new_counter_error!(
|
||||
"cluster_info-gossip_pull_request_verify_fail",
|
||||
|
@ -1204,8 +1233,13 @@ impl ClusterInfo {
|
|||
});
|
||||
}
|
||||
}
|
||||
datapoint_debug!(
|
||||
"solana-gossip-listen-memory",
|
||||
("pull_request", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
}
|
||||
Protocol::PullResponse(from, mut data) => {
|
||||
let start = allocated.get();
|
||||
data.retain(|v| {
|
||||
let ret = v.verify();
|
||||
if !ret {
|
||||
|
@ -1217,8 +1251,13 @@ impl ClusterInfo {
|
|||
ret
|
||||
});
|
||||
Self::handle_pull_response(me, &from, data);
|
||||
datapoint_debug!(
|
||||
"solana-gossip-listen-memory",
|
||||
("pull_response", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
}
|
||||
Protocol::PushMessage(from, mut data) => {
|
||||
let start = allocated.get();
|
||||
data.retain(|v| {
|
||||
let ret = v.verify();
|
||||
if !ret {
|
||||
|
@ -1229,10 +1268,17 @@ impl ClusterInfo {
|
|||
}
|
||||
ret
|
||||
});
|
||||
let _ignore_disconnect = response_sender
|
||||
.send(Self::handle_push_message(me, &from, data, stakes));
|
||||
let rsp = Self::handle_push_message(me, recycler, &from, data, stakes);
|
||||
if let Some(rsp) = rsp {
|
||||
let _ignore_disconnect = response_sender.send(rsp);
|
||||
}
|
||||
datapoint_debug!(
|
||||
"solana-gossip-listen-memory",
|
||||
("push_message", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
}
|
||||
Protocol::PruneMessage(from, data) => {
|
||||
let start = allocated.get();
|
||||
if data.verify() {
|
||||
inc_new_counter_debug!("cluster_info-prune_message", 1);
|
||||
inc_new_counter_debug!(
|
||||
|
@ -1257,19 +1303,31 @@ impl ClusterInfo {
|
|||
} else {
|
||||
inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
|
||||
}
|
||||
datapoint_debug!(
|
||||
"solana-gossip-listen-memory",
|
||||
("prune_message", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
let _ignore_disconnect = response_sender
|
||||
.send(Self::handle_repair(me, &from_addr, blocktree, request));
|
||||
let rsp = Self::handle_repair(me, recycler, &from_addr, blocktree, request);
|
||||
if let Some(rsp) = rsp {
|
||||
let _ignore_disconnect = response_sender.send(rsp);
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
// process the collected pulls together
|
||||
let _ignore_disconnect =
|
||||
response_sender.send(Self::handle_pull_requests(me, gossip_pull_data));
|
||||
let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data);
|
||||
if let Some(rsp) = rsp {
|
||||
let _ignore_disconnect = response_sender.send(rsp);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Packets {
|
||||
fn handle_pull_requests(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
requests: Vec<PullData>,
|
||||
) -> Option<Packets> {
|
||||
// split the requests into addrs and filters
|
||||
let mut caller_and_filters = vec![];
|
||||
let mut addrs = vec![];
|
||||
|
@ -1284,7 +1342,7 @@ impl ClusterInfo {
|
|||
.unwrap()
|
||||
.gossip
|
||||
.process_pull_requests(caller_and_filters, now);
|
||||
let mut packets = Packets::default();
|
||||
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
|
||||
pull_responses
|
||||
.into_iter()
|
||||
.zip(addrs.into_iter())
|
||||
|
@ -1304,7 +1362,10 @@ impl ClusterInfo {
|
|||
.push(Packet::from_data(&from_addr, protocol))
|
||||
})
|
||||
});
|
||||
packets
|
||||
if packets.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(packets)
|
||||
}
|
||||
|
||||
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) {
|
||||
|
@ -1324,10 +1385,11 @@ impl ClusterInfo {
|
|||
|
||||
fn handle_push_message(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
from: &Pubkey,
|
||||
data: Vec<CrdsValue>,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
) -> Packets {
|
||||
) -> Option<Packets> {
|
||||
let self_id = me.read().unwrap().gossip.id;
|
||||
inc_new_counter_debug!("cluster_info-push_message", 1);
|
||||
|
||||
|
@ -1362,8 +1424,10 @@ impl ClusterInfo {
|
|||
})
|
||||
})
|
||||
.collect();
|
||||
let mut packets = to_packets_with_destination(&rsp);
|
||||
|
||||
if rsp.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
|
||||
if !packets.is_empty() {
|
||||
let pushes: Vec<_> = me.write().unwrap().new_push_requests();
|
||||
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
|
||||
|
@ -1371,9 +1435,9 @@ impl ClusterInfo {
|
|||
let p = Packet::from_data(&remote_gossip_addr, &req);
|
||||
packets.packets.push(p);
|
||||
});
|
||||
packets
|
||||
Some(packets)
|
||||
} else {
|
||||
Packets::default()
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1388,10 +1452,11 @@ impl ClusterInfo {
|
|||
|
||||
fn handle_repair(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
request: Protocol,
|
||||
) -> Packets {
|
||||
) -> Option<Packets> {
|
||||
let now = Instant::now();
|
||||
|
||||
//TODO this doesn't depend on cluster_info module, could be moved
|
||||
|
@ -1406,7 +1471,7 @@ impl ClusterInfo {
|
|||
self_id, from.id,
|
||||
);
|
||||
inc_new_counter_debug!("cluster_info-handle-repair--eq", 1);
|
||||
return Packets::default();
|
||||
return None;
|
||||
}
|
||||
|
||||
me.write()
|
||||
|
@ -1422,6 +1487,7 @@ impl ClusterInfo {
|
|||
inc_new_counter_debug!("cluster_info-request-window-index", 1);
|
||||
(
|
||||
Self::run_window_request(
|
||||
recycler,
|
||||
from,
|
||||
&from_addr,
|
||||
blocktree,
|
||||
|
@ -1437,6 +1503,7 @@ impl ClusterInfo {
|
|||
inc_new_counter_debug!("cluster_info-request-highest-window-index", 1);
|
||||
(
|
||||
Self::run_highest_window_request(
|
||||
recycler,
|
||||
&from_addr,
|
||||
blocktree,
|
||||
*slot,
|
||||
|
@ -1448,7 +1515,13 @@ impl ClusterInfo {
|
|||
Protocol::RequestOrphan(_, slot) => {
|
||||
inc_new_counter_debug!("cluster_info-request-orphan", 1);
|
||||
(
|
||||
Self::run_orphan(&from_addr, blocktree, *slot, MAX_ORPHAN_REPAIR_RESPONSES),
|
||||
Self::run_orphan(
|
||||
recycler,
|
||||
&from_addr,
|
||||
blocktree,
|
||||
*slot,
|
||||
MAX_ORPHAN_REPAIR_RESPONSES,
|
||||
),
|
||||
"RequestOrphan",
|
||||
)
|
||||
}
|
||||
|
@ -1464,6 +1537,7 @@ impl ClusterInfo {
|
|||
/// Process messages from the network
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
blocktree: Option<&Arc<Blocktree>>,
|
||||
bank_forks: Option<&Arc<RwLock<BankForks>>>,
|
||||
requests_receiver: &PacketReceiver,
|
||||
|
@ -1472,7 +1546,6 @@ impl ClusterInfo {
|
|||
//TODO cache connections
|
||||
let timeout = Duration::new(1, 0);
|
||||
let reqs = requests_receiver.recv_timeout(timeout)?;
|
||||
|
||||
let stakes: HashMap<_, _> = match bank_forks {
|
||||
Some(ref bank_forks) => {
|
||||
staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank())
|
||||
|
@ -1480,7 +1553,7 @@ impl ClusterInfo {
|
|||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
Self::handle_packets(obj, blocktree, &stakes, reqs, response_sender);
|
||||
Self::handle_packets(obj, &recycler, blocktree, &stakes, reqs, response_sender);
|
||||
Ok(())
|
||||
}
|
||||
pub fn listen(
|
||||
|
@ -1492,11 +1565,13 @@ impl ClusterInfo {
|
|||
exit: &Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let recycler = PacketsRecycler::default();
|
||||
Builder::new()
|
||||
.name("solana-listen".to_string())
|
||||
.spawn(move || loop {
|
||||
let e = Self::run_listen(
|
||||
&me,
|
||||
&recycler,
|
||||
blocktree.as_ref(),
|
||||
bank_forks.as_ref(),
|
||||
&requests_receiver,
|
||||
|
@ -1513,6 +1588,7 @@ impl ClusterInfo {
|
|||
me.gossip.crds.table.len()
|
||||
);
|
||||
}
|
||||
thread_mem_usage::datapoint("solana-listen");
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
@ -1961,6 +2037,7 @@ mod tests {
|
|||
/// test window requests respond with the right shred, and do not overrun
|
||||
#[test]
|
||||
fn run_window_request() {
|
||||
let recycler = PacketsRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
|
@ -1979,6 +2056,7 @@ mod tests {
|
|||
0,
|
||||
);
|
||||
let rv = ClusterInfo::run_window_request(
|
||||
&recycler,
|
||||
&me,
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
|
@ -1986,7 +2064,7 @@ mod tests {
|
|||
0,
|
||||
0,
|
||||
);
|
||||
assert!(rv.is_empty());
|
||||
assert!(rv.is_none());
|
||||
let mut common_header = ShredCommonHeader::default();
|
||||
common_header.slot = 2;
|
||||
common_header.index = 1;
|
||||
|
@ -2003,6 +2081,7 @@ mod tests {
|
|||
.expect("Expect successful ledger write");
|
||||
|
||||
let rv = ClusterInfo::run_window_request(
|
||||
&recycler,
|
||||
&me,
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
|
@ -2010,8 +2089,9 @@ mod tests {
|
|||
2,
|
||||
1,
|
||||
);
|
||||
assert!(!rv.is_empty());
|
||||
assert!(!rv.is_none());
|
||||
let rv: Vec<Shred> = rv
|
||||
.expect("packets")
|
||||
.packets
|
||||
.into_iter()
|
||||
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
|
||||
|
@ -2026,13 +2106,19 @@ mod tests {
|
|||
/// test run_window_requestwindow requests respond with the right shred, and do not overrun
|
||||
#[test]
|
||||
fn run_highest_window_request() {
|
||||
let recycler = PacketsRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||
let rv =
|
||||
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 0, 0);
|
||||
assert!(rv.is_empty());
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
0,
|
||||
0,
|
||||
);
|
||||
assert!(rv.is_none());
|
||||
|
||||
let _ = fill_blocktree_slot_with_ticks(
|
||||
&blocktree,
|
||||
|
@ -2042,9 +2128,15 @@ mod tests {
|
|||
Hash::default(),
|
||||
);
|
||||
|
||||
let rv =
|
||||
ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1);
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
2,
|
||||
1,
|
||||
);
|
||||
let rv: Vec<Shred> = rv
|
||||
.expect("packets")
|
||||
.packets
|
||||
.into_iter()
|
||||
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
|
||||
|
@ -2055,12 +2147,13 @@ mod tests {
|
|||
assert_eq!(rv[0].slot(), 2);
|
||||
|
||||
let rv = ClusterInfo::run_highest_window_request(
|
||||
&recycler,
|
||||
&socketaddr_any!(),
|
||||
Some(&blocktree),
|
||||
2,
|
||||
index + 1,
|
||||
);
|
||||
assert!(rv.is_empty());
|
||||
assert!(rv.is_none());
|
||||
}
|
||||
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
|
@ -2069,11 +2162,12 @@ mod tests {
|
|||
#[test]
|
||||
fn run_orphan() {
|
||||
solana_logger::setup();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 2, 0);
|
||||
assert!(rv.is_empty());
|
||||
let rv = ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 2, 0);
|
||||
assert!(rv.is_none());
|
||||
|
||||
// Create slots 1, 2, 3 with 5 shreds apiece
|
||||
let (shreds, _) = make_many_slot_entries(1, 3, 5);
|
||||
|
@ -2083,16 +2177,18 @@ mod tests {
|
|||
.expect("Expect successful ledger write");
|
||||
|
||||
// We don't have slot 4, so we don't know how to service this requeset
|
||||
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 4, 5);
|
||||
assert!(rv.is_empty());
|
||||
let rv = ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 4, 5);
|
||||
assert!(rv.is_none());
|
||||
|
||||
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
|
||||
// for this request
|
||||
let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5)
|
||||
.packets
|
||||
.iter()
|
||||
.map(|b| b.clone())
|
||||
.collect();
|
||||
let rv: Vec<_> =
|
||||
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blocktree), 3, 5)
|
||||
.expect("run_orphan packets")
|
||||
.packets
|
||||
.iter()
|
||||
.map(|b| b.clone())
|
||||
.collect();
|
||||
let expected: Vec<_> = (1..=3)
|
||||
.rev()
|
||||
.map(|slot| {
|
||||
|
|
|
@ -30,6 +30,7 @@ use indexmap::map::IndexMap;
|
|||
use solana_sdk::hash::{hash, Hash};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Crds {
|
||||
|
@@ -141,12 +142,25 @@ impl Crds {
        }
    }

    /// find all the keys that are older or equal to min_ts
    pub fn find_old_labels(&self, min_ts: u64) -> Vec<CrdsValueLabel> {
    /// Find all the keys that are older or equal to the timeout.
    /// * timeouts - Pubkey specific timeouts with Pubkey::default() as the default timeout.
    pub fn find_old_labels(
        &self,
        now: u64,
        timeouts: &HashMap<Pubkey, u64>,
    ) -> Vec<CrdsValueLabel> {
        let min_ts = *timeouts
            .get(&Pubkey::default())
            .expect("must have default timeout");
        self.table
            .iter()
            .filter_map(|(k, v)| {
                if v.local_timestamp <= min_ts {
                if now < v.local_timestamp
                    || (timeouts.get(&k.pubkey()).is_some()
                        && now - v.local_timestamp < timeouts[&k.pubkey()])
                {
                    None
                } else if now - v.local_timestamp >= min_ts {
                    Some(k)
                } else {
                    None
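Expiry is now decided per entry owner: a pubkey with an explicit timeout keeps its values while their age is under that timeout, everything else falls back to the default entry, and values stamped in the future are never purged. A small sketch of the same rule, with strings standing in for Pubkeys and invented numbers.

use std::collections::HashMap;

// Sketch of the expiry rule in Crds::find_old_labels above.
fn is_expired(now: u64, inserted_at: u64, key: &str, timeouts: &HashMap<&str, u64>) -> bool {
    let default = timeouts["default"]; // the Pubkey::default() slot in the real map
    if now < inserted_at {
        return false; // clock skew: never expire values from the "future"
    }
    let age = now - inserted_at;
    if let Some(&t) = timeouts.get(key) {
        if age < t {
            return false; // staked node still inside its (epoch-length) timeout
        }
    }
    age >= default // everyone else is held to the default CRDS timeout
}

fn main() {
    let mut timeouts = HashMap::new();
    timeouts.insert("default", 15_000u64); // assumed CRDS pull timeout
    timeouts.insert("staked-node", 3_276_800); // assumed epoch-length timeout
    timeouts.insert("self", u64::MAX); // the local node never expires

    assert!(is_expired(20_000, 0, "unknown-node", &timeouts));
    assert!(!is_expired(20_000, 0, "staked-node", &timeouts));
    assert!(!is_expired(20_000, 0, "self", &timeouts));
}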
@ -237,25 +251,69 @@ mod test {
|
|||
assert_eq!(crds.table[&val2.label()].insert_timestamp, 3);
|
||||
}
|
||||
#[test]
|
||||
fn test_find_old_records() {
|
||||
fn test_find_old_records_default() {
|
||||
let mut crds = Crds::default();
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
|
||||
|
||||
assert!(crds.find_old_labels(0).is_empty());
|
||||
assert_eq!(crds.find_old_labels(1), vec![val.label()]);
|
||||
assert_eq!(crds.find_old_labels(2), vec![val.label()]);
|
||||
let mut set = HashMap::new();
|
||||
set.insert(Pubkey::default(), 0);
|
||||
assert!(crds.find_old_labels(0, &set).is_empty());
|
||||
set.insert(Pubkey::default(), 1);
|
||||
assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
|
||||
set.insert(Pubkey::default(), 2);
|
||||
assert_eq!(crds.find_old_labels(4, &set), vec![val.label()]);
|
||||
}
|
||||
#[test]
|
||||
fn test_remove() {
|
||||
fn test_remove_default() {
|
||||
let mut crds = Crds::default();
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
|
||||
|
||||
assert_eq!(crds.find_old_labels(1), vec![val.label()]);
|
||||
let mut set = HashMap::new();
|
||||
set.insert(Pubkey::default(), 1);
|
||||
assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
|
||||
crds.remove(&val.label());
|
||||
assert!(crds.find_old_labels(1).is_empty());
|
||||
assert!(crds.find_old_labels(2, &set).is_empty());
|
||||
}
|
||||
#[test]
|
||||
fn test_find_old_records_staked() {
|
||||
let mut crds = Crds::default();
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
|
||||
let mut set = HashMap::new();
|
||||
//now < timestamp
|
||||
set.insert(Pubkey::default(), 0);
|
||||
set.insert(val.pubkey(), 0);
|
||||
assert!(crds.find_old_labels(0, &set).is_empty());
|
||||
|
||||
//pubkey shouldn't expire since its timeout is MAX
|
||||
set.insert(val.pubkey(), std::u64::MAX);
|
||||
assert!(crds.find_old_labels(2, &set).is_empty());
|
||||
|
||||
//default has max timeout, but pubkey should still expire
|
||||
set.insert(Pubkey::default(), std::u64::MAX);
|
||||
set.insert(val.pubkey(), 1);
|
||||
assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
|
||||
|
||||
set.insert(val.pubkey(), 2);
|
||||
assert!(crds.find_old_labels(2, &set).is_empty());
|
||||
assert_eq!(crds.find_old_labels(3, &set), vec![val.label()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_staked() {
|
||||
let mut crds = Crds::default();
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
|
||||
let mut set = HashMap::new();
|
||||
|
||||
//default has max timeout, but pubkey should still expire
|
||||
set.insert(Pubkey::default(), std::u64::MAX);
|
||||
set.insert(val.pubkey(), 1);
|
||||
assert_eq!(crds.find_old_labels(2, &set), vec![val.label()]);
|
||||
crds.remove(&val.label());
|
||||
assert!(crds.find_old_labels(2, &set).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_equal() {
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
|
|
|
@@ -162,7 +162,21 @@ impl CrdsGossip {
        self.pull
            .process_pull_response(&mut self.crds, from, response, now)
    }
    pub fn purge(&mut self, now: u64) {

    pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
        self.make_timeouts(&HashMap::new(), self.pull.crds_timeout)
    }

    pub fn make_timeouts(
        &self,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
    ) -> HashMap<Pubkey, u64> {
        self.pull.make_timeouts(&self.id, stakes, epoch_ms)
    }

    pub fn purge(&mut self, now: u64, timeouts: &HashMap<Pubkey, u64>) -> usize {
        let mut rv = 0;
        if now > self.push.msg_timeout {
            let min = now - self.push.msg_timeout;
            self.push.purge_old_pending_push_messages(&self.crds, min);
@@ -172,13 +186,17 @@ impl CrdsGossip {
            self.push.purge_old_received_cache(min);
        }
        if now > self.pull.crds_timeout {
            let min = now - self.pull.crds_timeout;
            self.pull.purge_active(&mut self.crds, &self.id, min);
            //sanity check
            let min = self.pull.crds_timeout;
            assert_eq!(timeouts[&self.id], std::u64::MAX);
            assert_eq!(timeouts[&Pubkey::default()], min);
            rv = self.pull.purge_active(&mut self.crds, now, &timeouts);
        }
        if now > 5 * self.pull.crds_timeout {
            let min = now - 5 * self.pull.crds_timeout;
            self.pull.purge_purged(min);
        }
        rv
    }
}
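CrdsGossip::purge now runs three staged cleanups and returns how many active values it dropped, after asserting that the timeout map it was handed marks the local node as never-expiring and carries the plain CRDS timeout as its default. A small worked sketch of the schedule; the constants are illustrative stand-ins for msg_timeout and crds_timeout.

// Sketch of the purge schedule in CrdsGossip::purge above.
fn main() {
    let now: u64 = 100_000;
    let msg_timeout: u64 = 30_000;  // assumed push-message timeout
    let crds_timeout: u64 = 15_000; // assumed CRDS pull timeout

    // Stage 1: pending push messages older than msg_timeout are dropped.
    let push_cutoff = now.saturating_sub(msg_timeout);
    // Stage 2: active CRDS values are purged per pubkey via the timeouts map
    //          (staked nodes get an epoch, the local node u64::MAX).
    // Stage 3: the purged_values queue itself is trimmed after 5 * crds_timeout.
    let purged_queue_cutoff = now.saturating_sub(5 * crds_timeout);

    assert_eq!(push_cutoff, 70_000);
    assert_eq!(purged_queue_cutoff, 25_000);
}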
@@ -263,13 +263,39 @@ impl CrdsGossipPull {
        }
        ret
    }
    pub fn make_timeouts_def(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
        min_ts: u64,
    ) -> HashMap<Pubkey, u64> {
        let mut timeouts: HashMap<Pubkey, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
        timeouts.insert(*self_id, std::u64::MAX);
        timeouts.insert(Pubkey::default(), min_ts);
        timeouts
    }

    pub fn make_timeouts(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
    ) -> HashMap<Pubkey, u64> {
        self.make_timeouts_def(self_id, stakes, epoch_ms, self.crds_timeout)
    }

    /// Purge values from the crds that are older then `active_timeout`
    /// The value_hash of an active item is put into self.purged_values queue
    pub fn purge_active(&mut self, crds: &mut Crds, self_id: &Pubkey, min_ts: u64) {
        let old = crds.find_old_labels(min_ts);
    pub fn purge_active(
        &mut self,
        crds: &mut Crds,
        now: u64,
        timeouts: &HashMap<Pubkey, u64>,
    ) -> usize {
        let old = crds.find_old_labels(now, timeouts);
        let mut purged: VecDeque<_> = old
            .iter()
            .filter(|label| label.pubkey() != *self_id)
            .filter_map(|label| {
                let rv = crds
                    .lookup_versioned(label)
@@ -278,7 +304,9 @@ impl CrdsGossipPull {
                rv
            })
            .collect();
        let ret = purged.len();
        self.purged_values.append(&mut purged);
        ret
    }
    /// Purge values from the `self.purged_values` queue that are older then purge_timeout
    pub fn purge_purged(&mut self, min_ts: u64) {
@@ -551,7 +579,8 @@ mod test {
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

        // purge
        node.purge_active(&mut node_crds, &node_pubkey, 1);
        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
        node.purge_active(&mut node_crds, 2, &timeouts);

        //verify self is still valid after purge
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);
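The test above shows the intended call pattern: build a timeout map with make_timeouts_def (stakes, epoch length, self id, default floor), then hand it to purge_active together with the current timestamp. A standalone sketch of just the map-building step, with strings in place of Pubkeys and invented stake and timeout values.

use std::collections::HashMap;

// Sketch of CrdsGossipPull::make_timeouts_def; names and numbers are illustrative.
fn make_timeouts_def<'a>(
    self_id: &'a str,
    stakes: &HashMap<&'a str, u64>,
    epoch_ms: u64,
    min_ts: u64,
) -> HashMap<&'a str, u64> {
    // Every staked pubkey gets the epoch-length timeout...
    let mut timeouts: HashMap<&'a str, u64> = stakes.keys().map(|s| (*s, epoch_ms)).collect();
    // ...the local node never expires, and everyone else gets the default.
    timeouts.insert(self_id, u64::MAX);
    timeouts.insert("default", min_ts);
    timeouts
}

fn main() {
    let mut stakes = HashMap::new();
    stakes.insert("validator-a", 42u64);

    let timeouts = make_timeouts_def("me", &stakes, 3_276_800, 15_000);
    assert_eq!(timeouts["validator-a"], 3_276_800);
    assert_eq!(timeouts["me"], u64::MAX);
    assert_eq!(timeouts["default"], 15_000);
}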
@ -5,6 +5,7 @@ use crate::packet::PacketsRecycler;
|
|||
use crate::poh_recorder::PohRecorder;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::streamer::{self, PacketReceiver, PacketSender};
|
||||
use crate::thread_mem_usage;
|
||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
|
||||
use solana_perf::recycler::Recycler;
|
||||
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
|
||||
|
@ -121,6 +122,7 @@ impl FetchStage {
|
|||
let fwd_thread_hdl = Builder::new()
|
||||
.name("solana-fetch-stage-fwd-rcvr".to_string())
|
||||
.spawn(move || loop {
|
||||
thread_mem_usage::datapoint("solana-fetch-stage-fwd-rcvr");
|
||||
if let Err(e) =
|
||||
Self::handle_forwarded_packets(&forward_receiver, &sender, &poh_recorder)
|
||||
{
|
||||
|
|
|
@@ -12,6 +12,7 @@ pub mod chacha_cuda;
pub mod cluster_info_vote_listener;
pub mod commitment;
pub mod shred_fetch_stage;
pub mod thread_mem_usage;
#[macro_use]
pub mod contact_info;
pub mod archiver;
@@ -82,3 +83,8 @@ extern crate solana_metrics;
#[cfg(test)]
#[macro_use]
extern crate matches;

extern crate jemallocator;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
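Switching the crate's global allocator to jemalloc is what makes the jemalloc-ctl counters meaningful: with it, every Rust allocation in the process is visible to the per-thread statistics used by the new memory datapoints. A minimal standalone equivalent, assuming the jemallocator crate from the Cargo.toml hunk above; the busy-work in main is illustrative.

// Sketch: opt the whole process into jemalloc. Without this, the jemalloc-ctl
// thread counters would not reflect allocations made through the default allocator.
extern crate jemallocator;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {
    // All allocations below (and in every other thread) now go through jemalloc.
    let _v: Vec<u64> = (0..1_000).collect();
}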
@ -8,6 +8,8 @@ use crate::consensus::{StakeLockout, Tower};
|
|||
use crate::poh_recorder::PohRecorder;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::rpc_subscriptions::RpcSubscriptions;
|
||||
use crate::thread_mem_usage;
|
||||
use jemalloc_ctl::thread::allocatedp;
|
||||
use solana_ledger::{
|
||||
bank_forks::BankForks,
|
||||
block_error::BlockError,
|
||||
|
@ -86,6 +88,7 @@ struct ForkStats {
|
|||
block_height: u64,
|
||||
has_voted: bool,
|
||||
is_recent: bool,
|
||||
is_empty: bool,
|
||||
vote_threshold: bool,
|
||||
is_locked_out: bool,
|
||||
stake_lockouts: HashMap<u64, StakeLockout>,
|
||||
|
@ -212,20 +215,30 @@ impl ReplayStage {
|
|||
let mut last_reset = Hash::default();
|
||||
let mut partition = false;
|
||||
loop {
|
||||
let allocated = allocatedp::mib().unwrap();
|
||||
let allocated = allocated.read().unwrap();
|
||||
|
||||
thread_mem_usage::datapoint("solana-replay-stage");
|
||||
let now = Instant::now();
|
||||
// Stop getting entries if we get exit signal
|
||||
if exit_.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
let start = allocated.get();
|
||||
Self::generate_new_bank_forks(
|
||||
&blocktree,
|
||||
&mut bank_forks.write().unwrap(),
|
||||
&leader_schedule_cache,
|
||||
);
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("generate_new_bank_forks", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
|
||||
let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank();
|
||||
|
||||
let start = allocated.get();
|
||||
let did_complete_bank = Self::replay_active_banks(
|
||||
&blocktree,
|
||||
&bank_forks,
|
||||
|
@ -233,14 +246,23 @@ impl ReplayStage {
|
|||
&mut progress,
|
||||
&slot_full_senders,
|
||||
);
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("replay_active_banks", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
|
||||
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
||||
loop {
|
||||
let (vote_bank, heaviest) =
|
||||
Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress);
|
||||
let start = allocated.get();
|
||||
let (vote_bank, heaviest) = Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress);
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("select_fork", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
let done = vote_bank.is_none();
|
||||
let mut vote_bank_slot = 0;
|
||||
let reset_bank = vote_bank.as_ref().map(|b| b.0.clone()).or(heaviest);
|
||||
let start = allocated.get();
|
||||
if let Some((bank, total_staked)) = vote_bank {
|
||||
info!("voting: {}", bank.slot());
|
||||
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
|
||||
|
@ -271,6 +293,11 @@ impl ReplayStage {
|
|||
&snapshot_package_sender,
|
||||
)?;
|
||||
}
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("votable_bank", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
let start = allocated.get();
|
||||
if let Some(bank) = reset_bank {
|
||||
if last_reset != bank.last_blockhash() {
|
||||
Self::reset_poh_recorder(
|
||||
|
@ -286,6 +313,7 @@ impl ReplayStage {
|
|||
if !partition && vote_bank_slot != bank.slot() {
|
||||
warn!("PARTITION DETECTED waiting to join fork: {} last vote: {:?}", bank.slot(), tower.last_vote());
|
||||
inc_new_counter_info!("replay_stage-partition_detected", 1);
|
||||
datapoint_info!("replay_stage-partition", ("slot", bank.slot() as i64, i64));
|
||||
partition = true;
|
||||
} else if partition && vote_bank_slot == bank.slot() {
|
||||
warn!("PARTITION resolved fork: {} last vote: {:?}", bank.slot(), tower.last_vote());
|
||||
|
@ -294,11 +322,16 @@ impl ReplayStage {
|
|||
}
|
||||
}
|
||||
}
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("reset_bank", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
if done {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let start = allocated.get();
|
||||
if !tpu_has_bank {
|
||||
Self::maybe_start_leader(
|
||||
&my_pubkey,
|
||||
|
@ -316,11 +349,11 @@ impl ReplayStage {
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
inc_new_counter_info!(
|
||||
"replay_stage-duration",
|
||||
duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
datapoint_debug!(
|
||||
"replay_stage-memory",
|
||||
("start_leader", (allocated.get() - start) as i64, i64),
|
||||
);
|
||||
datapoint_debug!("replay_stage", ("duration", duration_as_ms(&now.elapsed()) as i64, i64));
|
||||
if did_complete_bank {
|
||||
//just processed a bank, skip the signal; maybe there's more slots available
|
||||
continue;
|
||||
|
@ -691,6 +724,7 @@ impl ReplayStage {
|
|||
progress: &mut HashMap<u64, ForkProgress>,
|
||||
) -> VoteAndPoHBank {
|
||||
let tower_start = Instant::now();
|
||||
|
||||
let mut frozen_banks: Vec<_> = bank_forks
|
||||
.read()
|
||||
.unwrap()
|
||||
|
@ -699,8 +733,14 @@ impl ReplayStage {
|
|||
.cloned()
|
||||
.collect();
|
||||
frozen_banks.sort_by_key(|bank| bank.slot());
|
||||
let num_frozen_banks = frozen_banks.len();
|
||||
|
||||
trace!("frozen_banks {}", frozen_banks.len());
|
||||
let num_old_banks = frozen_banks
|
||||
.iter()
|
||||
.filter(|b| b.slot() < tower.root().unwrap_or(0))
|
||||
.count();
|
||||
|
||||
let stats: Vec<ForkStats> = frozen_banks
|
||||
.iter()
|
||||
.map(|bank| {
|
||||
|
@ -733,7 +773,7 @@ impl ReplayStage {
|
|||
);
|
||||
if !stats.computed {
|
||||
if !stats.vote_threshold {
|
||||
info!("vote threshold check failed: {}", bank.slot());
|
||||
debug!("vote threshold check failed: {}", bank.slot());
|
||||
}
|
||||
stats.computed = true;
|
||||
}
|
||||
|
@ -747,6 +787,15 @@ impl ReplayStage {
|
|||
stats
|
||||
})
|
||||
.collect();
|
||||
let num_not_recent = stats.iter().filter(|s| !s.is_recent).count();
|
||||
let num_has_voted = stats.iter().filter(|s| s.has_voted).count();
|
||||
let num_empty = stats.iter().filter(|s| s.is_empty).count();
|
||||
let num_threshold_failure = stats.iter().filter(|s| !s.vote_threshold).count();
|
||||
let num_votable_threshold_failure = stats
|
||||
.iter()
|
||||
.filter(|s| s.is_recent && !s.has_voted && !s.vote_threshold)
|
||||
.count();
|
||||
|
||||
let mut candidates: Vec<_> = frozen_banks
|
||||
.iter()
|
||||
.zip(stats.iter())
|
||||
|
@ -780,7 +829,21 @@ impl ReplayStage {
|
|||
weights,
|
||||
rv.0.is_some()
|
||||
);
|
||||
inc_new_counter_info!("replay_stage-tower_duration", ms as usize);
|
||||
datapoint_debug!(
|
||||
"replay_stage-select_fork",
|
||||
("frozen_banks", num_frozen_banks as i64, i64),
|
||||
("not_recent", num_not_recent as i64, i64),
|
||||
("has_voted", num_has_voted as i64, i64),
|
||||
("old_banks", num_old_banks as i64, i64),
|
||||
("empty_banks", num_empty as i64, i64),
|
||||
("threshold_failure", num_threshold_failure as i64, i64),
|
||||
(
|
||||
"votable_threshold_failure",
|
||||
num_votable_threshold_failure as i64,
|
||||
i64
|
||||
),
|
||||
("tower_duration", ms as i64, i64),
|
||||
);
|
||||
rv
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH};
|
||||
use crate::recvmmsg::NUM_RCVMMSGS;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::thread_mem_usage;
|
||||
use solana_sdk::timing::duration_as_ms;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
@ -75,6 +76,7 @@ pub fn receiver(
|
|||
Builder::new()
|
||||
.name("solana-receiver".to_string())
|
||||
.spawn(move || {
|
||||
thread_mem_usage::datapoint(name);
|
||||
let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name);
|
||||
})
|
||||
.unwrap()
|
||||
|
@ -111,6 +113,7 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) ->
|
|||
Builder::new()
|
||||
.name(format!("solana-responder-{}", name))
|
||||
.spawn(move || loop {
|
||||
thread_mem_usage::datapoint(name);
|
||||
if let Err(e) = recv_send(&sock, &r) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
|
|
|
@@ -0,0 +1,9 @@
use jemalloc_ctl::thread;
use solana_metrics::datapoint_debug;

pub fn datapoint(name: &'static str) {
    let allocated = thread::allocatedp::mib().unwrap();
    let allocated = allocated.read().unwrap();
    let mem = allocated.get();
    datapoint_debug!("thread-memory", (name, mem as i64, i64));
}
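Each long-lived thread touched by this change (gossip, listen, banking, fetch, replay, streamer) calls this new thread_mem_usage helper once per loop iteration, so the thread-memory series tracks cumulative bytes allocated per named thread. A hedged sketch of that call pattern; the thread name and workload are invented, and the helper call is shown as a comment because the module is not available in a standalone program.

use std::thread::Builder;

fn main() {
    let handle = Builder::new()
        .name("solana-example-worker".to_string())
        .spawn(move || {
            for _ in 0..3 {
                // Emit this thread's cumulative allocated-bytes counter each pass:
                // thread_mem_usage::datapoint("solana-example-worker");
                let _work: Vec<u8> = vec![0; 64 * 1024]; // stand-in for real work
            }
        })
        .unwrap();
    handle.join().unwrap();
}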
@ -283,7 +283,8 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
|
|||
let requests: Vec<_> = network_values
|
||||
.par_iter()
|
||||
.map(|node| {
|
||||
node.lock().unwrap().purge(now);
|
||||
let timeouts = node.lock().unwrap().make_timeouts_test();
|
||||
node.lock().unwrap().purge(now, &timeouts);
|
||||
node.lock().unwrap().new_push_messages(now)
|
||||
})
|
||||
.collect();
|
||||
|
|
|
@ -245,14 +245,27 @@ fn run_network_partition(partitions: &[usize]) {
|
|||
sleep(Duration::from_millis(timeout as u64));
|
||||
}
|
||||
info!("PARTITION_TEST done sleeping until partition end timeout");
|
||||
info!("PARTITION_TEST spending on all ndoes");
|
||||
cluster_tests::spend_and_verify_all_nodes(
|
||||
&cluster.entry_point_info,
|
||||
&cluster.funding_keypair,
|
||||
num_nodes,
|
||||
HashSet::new(),
|
||||
);
|
||||
info!("PARTITION_TEST done spending on all ndoes");
|
||||
info!("PARTITION_TEST discovering nodes");
|
||||
let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();
|
||||
info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
|
||||
info!("PARTITION_TEST looking for new roots on all nodes");
|
||||
let mut roots = vec![HashSet::new(); cluster_nodes.len()];
|
||||
let mut done = false;
|
||||
while !done {
|
||||
for (i, ingress_node) in cluster_nodes.iter().enumerate() {
|
||||
let client = create_client(
|
||||
ingress_node.client_facing_addr(),
|
||||
solana_core::cluster_info::VALIDATOR_PORT_RANGE,
|
||||
);
|
||||
let slot = client.get_slot().unwrap_or(0);
|
||||
roots[i].insert(slot);
|
||||
let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
|
||||
info!("PARTITION_TEST min observed roots {}/16", min_node);
|
||||
done = min_node >= 16;
|
||||
}
|
||||
sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
|
||||
}
|
||||
info!("PARTITION_TEST done spending on all node");
|
||||
}
|
||||
|
||||
#[allow(unused_attributes)]
|
||||
|
|
|
@ -70,6 +70,15 @@ impl Packets {
|
|||
recycler: Some(recycler),
|
||||
}
|
||||
}
|
||||
pub fn new_with_recycler_data(
|
||||
recycler: &PacketsRecycler,
|
||||
name: &'static str,
|
||||
mut packets: Vec<Packet>,
|
||||
) -> Self {
|
||||
let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name);
|
||||
vec.packets.append(&mut packets);
|
||||
vec
|
||||
}
|
||||
|
||||
pub fn set_addr(&mut self, addr: &SocketAddr) {
|
||||
for m in self.packets.iter_mut() {
|
||||
|
@@ -99,8 +108,15 @@ pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> {
    to_packets_chunked(xs, NUM_PACKETS)
}

pub fn to_packets_with_destination<T: Serialize>(dests_and_data: &[(SocketAddr, T)]) -> Packets {
    let mut out = Packets::default();
pub fn to_packets_with_destination<T: Serialize>(
    recycler: PacketsRecycler,
    dests_and_data: &[(SocketAddr, T)],
) -> Packets {
    let mut out = Packets::new_with_recycler(
        recycler,
        dests_and_data.len(),
        "to_packets_with_destination",
    );
    out.packets.resize(dests_and_data.len(), Packet::default());
    for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
        if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) {
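Responses are now built on recycled packet buffers instead of fresh allocations, and handlers return Option<Packets> so that "nothing to send" no longer allocates an empty Packets at all. A toy sketch of the recycle-or-allocate idea behind these recyclers, independent of the Solana types; the struct and its methods here are illustrative, not the PacketsRecycler API.

use std::sync::{Arc, Mutex};

// Toy recycler: hand out previously freed buffers before allocating new ones.
#[derive(Clone, Default)]
struct Recycler {
    pool: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl Recycler {
    fn allocate(&self, capacity: usize) -> Vec<u8> {
        let mut pool = self.pool.lock().unwrap();
        pool.pop().unwrap_or_else(|| Vec::with_capacity(capacity))
    }
    fn recycle(&self, mut buf: Vec<u8>) {
        buf.clear(); // reset contents, keep the backing allocation
        self.pool.lock().unwrap().push(buf);
    }
}

fn main() {
    let recycler = Recycler::default();
    let response = recycler.allocate(1024);
    // ... fill and send the response, then return the buffer to the pool ...
    recycler.recycle(response);
    assert_eq!(recycler.pool.lock().unwrap().len(), 1);
}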
@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
|
|||
#[derive(Debug, Default)]
|
||||
struct RecyclerStats {
|
||||
total: AtomicUsize,
|
||||
freed: AtomicUsize,
|
||||
reuse: AtomicUsize,
|
||||
max_gc: AtomicUsize,
|
||||
}
|
||||
|
@ -85,9 +86,10 @@ impl<T: Default + Reset> Recycler<T> {
|
|||
return x;
|
||||
}
|
||||
|
||||
let total = self.stats.total.fetch_add(1, Ordering::Relaxed);
|
||||
trace!(
|
||||
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
|
||||
self.stats.total.fetch_add(1, Ordering::Relaxed),
|
||||
total,
|
||||
name,
|
||||
self.id,
|
||||
self.stats.reuse.load(Ordering::Relaxed),
|
||||
|
@ -111,6 +113,16 @@ impl<T: Default + Reset> Recycler<T> {
|
|||
.max_gc
|
||||
.compare_and_swap(max_gc, len, Ordering::Relaxed);
|
||||
}
|
||||
let total = self.stats.total.load(Ordering::Relaxed);
|
||||
let reuse = self.stats.reuse.load(Ordering::Relaxed);
|
||||
let freed = self.stats.total.fetch_add(1, Ordering::Relaxed);
|
||||
datapoint_debug!(
|
||||
"recycler",
|
||||
("gc_len", len as i64, i64),
|
||||
("total", total as i64, i64),
|
||||
("freed", freed as i64, i64),
|
||||
("reuse", reuse as i64, i64),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -561,6 +561,7 @@ impl AccountsDB {
|
|||
}
|
||||
|
||||
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) {
|
||||
self.report_store_stats();
|
||||
let accounts_index = self.accounts_index.read().unwrap();
|
||||
let mut purges = Vec::new();
|
||||
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
|
||||
|
@ -841,6 +842,39 @@ impl AccountsDB {
|
|||
infos
|
||||
}
|
||||
|
||||
fn report_store_stats(&self) {
|
||||
let mut total_count = 0;
|
||||
let mut min = std::usize::MAX;
|
||||
let mut min_slot = 0;
|
||||
let mut max = 0;
|
||||
let mut max_slot = 0;
|
||||
let mut newest_slot = 0;
|
||||
let mut oldest_slot = std::u64::MAX;
|
||||
let stores = self.storage.read().unwrap();
|
||||
for (slot, slot_stores) in &stores.0 {
|
||||
total_count += slot_stores.len();
|
||||
if slot_stores.len() < min {
|
||||
min = slot_stores.len();
|
||||
min_slot = *slot;
|
||||
}
|
||||
|
||||
if slot_stores.len() > max {
|
||||
max = slot_stores.len();
|
||||
max_slot = *slot;
|
||||
}
|
||||
if *slot > newest_slot {
|
||||
newest_slot = *slot;
|
||||
}
|
||||
|
||||
if *slot < oldest_slot {
|
||||
oldest_slot = *slot;
|
||||
}
|
||||
}
|
||||
info!("accounts_db: total_stores: {} newest_slot: {} oldest_slot: {} max_slot: {} (num={}) min_slot: {} (num={})",
|
||||
total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
|
||||
datapoint_info!("accounts_db-stores", ("total_count", total_count, i64));
|
||||
}
|
||||
|
||||
pub fn verify_hash_internal_state(&self, slot: Slot, ancestors: &HashMap<Slot, usize>) -> bool {
|
||||
let mut hash_state = BankHash::default();
|
||||
let hashes: Vec<_> = self.scan_accounts(
|
||||
|
|
|
@ -1385,6 +1385,7 @@ impl Bank {
|
|||
/// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash
|
||||
/// calculation and could shield other real accounts.
|
||||
pub fn verify_snapshot_bank(&self) -> bool {
|
||||
self.purge_zero_lamport_accounts();
|
||||
self.rc
|
||||
.accounts
|
||||
.verify_hash_internal_state(self.slot(), &self.ancestors)
|
||||
|
|
|
@@ -31,6 +31,8 @@ pub const DEFAULT_SLOTS_PER_TURN: u64 = 32 * 4;
// leader schedule is governed by this
pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

pub const DEFAULT_MS_PER_SLOT: u64 = 1_000 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND;

/// The time window of recent block hash values that the bank will track the signatures
/// of over. Once the bank discards a block hash, it will reject any transactions that use
/// that `recent_blockhash` in a transaction. Lowering this value reduces memory consumption,
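The new constant converts the tick-based slot definition into wall-clock milliseconds, which is the unit the epoch-length purge timeout above is expressed in. A worked example with illustrative tick constants; the actual values live elsewhere in clock.rs and are not shown in this diff.

// Illustrative only: these two constants are assumptions for the example.
const DEFAULT_TICKS_PER_SECOND: u64 = 160;
const DEFAULT_TICKS_PER_SLOT: u64 = 64;

// Same formula as the diff: milliseconds per slot.
const DEFAULT_MS_PER_SLOT: u64 = 1_000 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND;

fn main() {
    // 1_000 * 64 / 160 = 400 ms per slot; an 8_192-slot epoch would then give
    // 8_192 * 400 ms, roughly 55 minutes of purge timeout for staked nodes.
    assert_eq!(DEFAULT_MS_PER_SLOT, 400);
    println!("epoch purge timeout example: {} ms", 8_192 * DEFAULT_MS_PER_SLOT);
}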