tracks number of shreds sent and received at different distances from the root (#25989)

behzad nouri 2022-06-17 21:33:23 +00:00 committed by GitHub
parent 7d54807da0
commit b3d1f8d1ac
3 changed files with 173 additions and 71 deletions
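The change boils down to bucketing per-slot retransmit counters by the node's distance from the turbine broadcast root (0 = root, 1 = first layer, 2 = second layer or beyond). Below is a minimal, standalone sketch of that bookkeeping; the field names mirror the committed struct, but the snippet is illustrative rather than the committed code.

#[derive(Default)]
struct SlotShredCounts {
    // Shreds received by this node, indexed by its distance from the root.
    num_shreds_received: [usize; 3],
    // Retransmissions performed (one per downstream peer), same indexing.
    num_shreds_sent: [usize; 3],
}

impl SlotShredCounts {
    fn record(&mut self, root_distance: usize, num_nodes: usize) {
        self.num_shreds_received[root_distance] += 1;
        self.num_shreds_sent[root_distance] += num_nodes;
    }
}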

View File

@@ -49,7 +49,7 @@ fn get_retransmit_peers_deterministic(
0,
0,
);
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers(
*slot_leader,
&shred,
root_bank,

View File

@@ -180,33 +180,34 @@ impl ClusterNodes<RetransmitStage> {
shred: &Shred,
root_bank: &Bank,
fanout: usize,
) -> Vec<SocketAddr> {
let (neighbors, children) =
) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
let (root_distance, neighbors, children) =
self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
if neighbors.is_empty() {
let peers = children.into_iter().filter_map(Node::contact_info);
return peers.map(|peer| peer.tvu).collect();
let addrs = peers.map(|peer| peer.tvu).collect();
return (root_distance, addrs);
}
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forward socket of its neighbors. Otherwise it
// should only forward to tvu_forwards socket of its children.
if neighbors[0].pubkey() != self.pubkey {
return children
let addrs = children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards))
.collect();
.filter_map(|node| Some(node.contact_info()?.tvu_forwards));
return (root_distance, addrs.collect());
}
// First neighbor is this node itself, so skip it.
neighbors[1..]
let addrs = neighbors[1..]
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu_forwards))
.chain(
children
.iter()
.filter_map(|node| Some(node.contact_info()?.tvu)),
)
.collect()
);
(root_distance, addrs.collect())
}
pub fn get_retransmit_peers(
@@ -216,6 +217,7 @@ impl ClusterNodes<RetransmitStage> {
root_bank: &Bank,
fanout: usize,
) -> (
usize, // distance from the root node
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
@@ -237,14 +239,28 @@ impl ClusterNodes<RetransmitStage> {
.position(|node| node.pubkey() == self.pubkey)
.unwrap();
if drop_redundant_turbine_path(shred.slot(), root_bank) {
let root_distance = if self_index == 0 {
0
} else if self_index <= fanout {
1
} else {
2
};
let peers = get_retransmit_peers(fanout, self_index, &nodes);
return (Vec::default(), peers.collect());
return (root_distance, Vec::default(), peers.collect());
}
let root_distance = if self_index == 0 {
0
} else if self_index < fanout {
1
} else {
2
};
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
(neighbors, children)
(root_distance, neighbors, children)
}
}
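For reference, the root_distance computed above comes straight from the node's position in the weighted shuffle for the shred: index 0 is the broadcast root, the next fanout indices form the first layer, and everything past that counts as the second layer. The two branches differ only in the boundary of the first layer (`self_index <= fanout` when drop_redundant_turbine_path is active, `self_index < fanout` otherwise). A small standalone sketch of the mapping, assuming the drop_redundant_turbine_path branch:

// Standalone sketch of the mapping above; self_index is the node's position
// in the shred's weighted shuffle of cluster nodes.
fn root_distance(self_index: usize, fanout: usize) -> usize {
    if self_index == 0 {
        0 // this node is the broadcast root for the shred
    } else if self_index <= fanout {
        1 // first layer: direct children of the root
    } else {
        2 // second layer and beyond
    }
}

// e.g. with fanout = 200: index 0 -> 0, indices 1..=200 -> 1, 201 and above -> 2.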

View File

@@ -54,22 +54,16 @@ const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
#[derive(Default)]
struct RetransmitSlotStats {
num_shreds: usize,
num_nodes: usize,
asof: u64, // Latest timestamp struct was updated.
outset: u64, // 1st shred retransmit timestamp.
// Number of shreds sent and received at different
// distances from the turbine broadcast root.
num_shreds_received: [usize; 3],
num_shreds_sent: [usize; 3],
}
impl AddAssign for RetransmitSlotStats {
fn add_assign(&mut self, other: Self) {
*self = Self {
num_shreds: self.num_shreds + other.num_shreds,
num_nodes: self.num_nodes + other.num_nodes,
}
}
}
#[derive(Default)]
struct RetransmitStats {
since: Option<Instant>,
since: Instant,
num_nodes: AtomicUsize,
num_addrs_failed: AtomicUsize,
num_shreds: usize,
@@ -80,7 +74,7 @@ struct RetransmitStats {
epoch_cache_update: u64,
retransmit_total: AtomicU64,
compute_turbine_peers_total: AtomicU64,
slot_stats: HashMap<Slot, RetransmitSlotStats>,
slot_stats: LruCache<Slot, RetransmitSlotStats>,
unknown_shred_slot_leader: AtomicUsize,
}
@@ -93,20 +87,13 @@ impl RetransmitStats {
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
) {
const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
let elapsed = self.since.as_ref().map(Instant::elapsed);
if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
if self.since.elapsed() < SUBMIT_CADENCE {
return;
}
let num_peers = cluster_nodes_cache
.get(root_bank.slot(), root_bank, working_bank, cluster_info)
.num_peers();
let stats = std::mem::replace(
self,
Self {
since: Some(Instant::now()),
..Self::default()
},
);
let stats = std::mem::replace(self, Self::new(Instant::now()));
datapoint_info!("retransmit-num_nodes", ("count", num_peers, i64));
datapoint_info!(
"retransmit-stage",
@@ -134,14 +121,6 @@ impl RetransmitStats {
i64
),
);
for (slot, stats) in stats.slot_stats {
datapoint_info!(
"retransmit-stage-slot-stats",
("slot", slot, i64),
("num_shreds", stats.num_shreds, i64),
("num_nodes", stats.num_nodes, i64),
);
}
}
}
@@ -251,7 +230,7 @@ fn retransmit(
let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
if should_skip_retransmit(shred, shreds_received) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return 0;
return None;
}
let shred_slot = shred.slot();
max_slots
@@ -279,13 +258,14 @@ fn retransmit(
stats
.unknown_shred_slot_leader
.fetch_add(1, Ordering::Relaxed);
return 0;
return None;
}
};
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let addrs: Vec<_> = cluster_nodes
.get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT)
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
@@ -315,43 +295,29 @@ fn retransmit(
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
num_nodes
Some((root_distance, num_nodes))
};
fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
where
K: Eq + std::hash::Hash,
V: Default + AddAssign,
{
if acc.len() < other.len() {
return merge(other, acc);
}
for (key, value) in other {
*acc.entry(key).or_default() += value;
}
acc
}
let slot_stats = thread_pool.install(|| {
shreds
.into_par_iter()
.with_min_len(4)
.map(|shred| {
.filter_map(|shred| {
let index = thread_pool.current_thread_index().unwrap();
let socket = &sockets[index % sockets.len()];
let num_nodes = retransmit_shred(&shred, socket);
(shred.slot(), num_nodes)
Some((shred.slot(), retransmit_shred(&shred, socket)?))
})
.fold(
HashMap::<Slot, RetransmitSlotStats>::new,
|mut acc, (slot, num_nodes)| {
let stats = acc.entry(slot).or_default();
stats.num_nodes += num_nodes;
stats.num_shreds += 1;
|mut acc, (slot, (root_distance, num_nodes))| {
let now = timestamp();
let slot_stats = acc.entry(slot).or_default();
slot_stats.record(now, root_distance, num_nodes);
acc
},
)
.reduce(HashMap::new, merge)
.reduce(HashMap::new, RetransmitSlotStats::merge)
});
stats.slot_stats = merge(std::mem::take(&mut stats.slot_stats), slot_stats);
stats.upsert_slot_stats(slot_stats);
timer_start.stop();
stats.total_time += timer_start.as_us();
stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
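The aggregation above (rayon filter_map, fold into per-thread HashMap<Slot, RetransmitSlotStats>, then reduce) relies on a size-aware map merge that always drains the smaller map into the larger one. The helper below is a standalone restatement of that merge, matching RetransmitSlotStats::merge further down; it is a sketch, not the committed code.

use std::collections::HashMap;
use std::ops::AddAssign;

// Merge two maps by draining the smaller one into the larger one, so the
// number of re-inserted entries stays minimal.
fn merge_maps<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
where
    K: Eq + std::hash::Hash,
    V: Default + AddAssign,
{
    if acc.len() < other.len() {
        return merge_maps(other, acc);
    }
    for (key, value) in other {
        *acc.entry(key).or_default() += value;
    }
    acc
}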
@@ -380,7 +346,7 @@ pub fn retransmitter(
CLUSTER_NODES_CACHE_TTL,
);
let mut hasher_reset_ts = Instant::now();
let mut stats = RetransmitStats::default();
let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
let num_threads = get_thread_count().min(8).max(sockets.len());
@@ -526,6 +492,126 @@ impl RetransmitStage {
}
}
impl AddAssign for RetransmitSlotStats {
fn add_assign(&mut self, other: Self) {
let Self {
asof,
outset,
num_shreds_received,
num_shreds_sent,
} = other;
self.asof = self.asof.max(asof);
self.outset = if self.outset == 0 {
outset
} else {
self.outset.min(outset)
};
for k in 0..3 {
self.num_shreds_received[k] += num_shreds_received[k];
self.num_shreds_sent[k] += num_shreds_sent[k];
}
}
}
impl RetransmitStats {
const SLOT_STATS_CACHE_CAPACITY: usize = 750;
fn new(now: Instant) -> Self {
Self {
since: now,
num_nodes: AtomicUsize::default(),
num_addrs_failed: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: AtomicUsize::default(),
total_batches: 0usize,
total_time: 0u64,
epoch_fetch: 0u64,
epoch_cache_update: 0u64,
retransmit_total: AtomicU64::default(),
compute_turbine_peers_total: AtomicU64::default(),
// Cache capacity is manually enforced.
slot_stats: LruCache::<Slot, RetransmitSlotStats>::unbounded(),
unknown_shred_slot_leader: AtomicUsize::default(),
}
}
fn upsert_slot_stats<I>(&mut self, feed: I)
where
I: IntoIterator<Item = (Slot, RetransmitSlotStats)>,
{
for (slot, slot_stats) in feed {
match self.slot_stats.get_mut(&slot) {
None => {
self.slot_stats.put(slot, slot_stats);
}
Some(entry) => {
*entry += slot_stats;
}
}
}
while self.slot_stats.len() > Self::SLOT_STATS_CACHE_CAPACITY {
// Pop and submit metrics for the slot which was updated least
// recently. At this point the node most likely will not receive
// and retransmit any more shreds for this slot.
match self.slot_stats.pop_lru() {
Some((slot, stats)) => stats.submit(slot),
None => break,
}
}
}
}
impl RetransmitSlotStats {
fn record(&mut self, now: u64, root_distance: usize, num_nodes: usize) {
self.outset = if self.outset == 0 {
now
} else {
self.outset.min(now)
};
self.asof = self.asof.max(now);
self.num_shreds_received[root_distance] += 1;
self.num_shreds_sent[root_distance] += num_nodes;
}
fn merge(mut acc: HashMap<Slot, Self>, other: HashMap<Slot, Self>) -> HashMap<Slot, Self> {
if acc.len() < other.len() {
return Self::merge(other, acc);
}
for (key, value) in other {
*acc.entry(key).or_default() += value;
}
acc
}
fn submit(&self, slot: Slot) {
let num_shreds: usize = self.num_shreds_received.iter().sum();
let num_nodes: usize = self.num_shreds_sent.iter().sum();
let elapsed_millis = self.asof.saturating_sub(self.outset);
datapoint_info!(
"retransmit-stage-slot-stats",
("slot", slot, i64),
("outset_timestamp", self.outset, i64),
("elapsed_millis", elapsed_millis, i64),
("num_shreds", num_shreds, i64),
("num_nodes", num_nodes, i64),
("num_shreds_received_root", self.num_shreds_received[0], i64),
(
"num_shreds_received_1st_layer",
self.num_shreds_received[1],
i64
),
(
"num_shreds_received_2nd_layer",
self.num_shreds_received[2],
i64
),
("num_shreds_sent_root", self.num_shreds_sent[0], i64),
("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64),
("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64),
);
}
}
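To make the timing fields concrete, here is a hypothetical walk-through of record() (assuming module-level access to the fields): outset keeps the timestamp of the first retransmitted shred for the slot, asof the most recent update, so the reported elapsed_millis is the span between them. The values are illustrative.

let mut stats = RetransmitSlotStats::default();
stats.record(/*now:*/ 1_000, /*root_distance:*/ 1, /*num_nodes:*/ 200);
stats.record(/*now:*/ 1_250, /*root_distance:*/ 2, /*num_nodes:*/ 0);
assert_eq!(stats.outset, 1_000); // first shred's timestamp
assert_eq!(stats.asof, 1_250);   // latest update
assert_eq!(stats.num_shreds_received, [0, 1, 1]);
assert_eq!(stats.num_shreds_sent, [0, 200, 0]);
// submit(slot) would then report elapsed_millis = 250 for this slot.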
#[cfg(test)]
mod tests {
use {super::*, solana_ledger::shred::ShredFlags};