factors out common retransmit work for shreds of the same slot (#26218)

Shreds arriving at a node for retransmit tend to belong to the same slot
(or a just a couple of different slots). Slot leader and cluster nodes
are common for the shreds of the same slot, and so the common work to
look up these values can be factored out.
This commit first group-bys shreds by slot to factor out that common
lookup work.
This commit is contained in:
behzad nouri 2022-06-25 15:49:05 +00:00 committed by GitHub
parent 2efdb965dd
commit f1b82ec44d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 120 additions and 95 deletions

View File

@ -5,7 +5,7 @@ use {
crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::ClusterNodesCache,
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
@ -14,6 +14,7 @@ use {
window_service::{should_retransmit_and_persist, WindowService},
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
itertools::{izip, Itertools},
lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::rpc_response::SlotUpdate,
@ -32,14 +33,18 @@ use {
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
solana_streamer::{
sendmmsg::{multi_target_send, SendPktsError},
socket::SocketAddrSpace,
},
std::{
collections::{HashMap, HashSet},
iter::repeat,
net::UdpSocket,
ops::AddAssign,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -67,7 +72,7 @@ struct RetransmitStats {
num_nodes: AtomicUsize,
num_addrs_failed: AtomicUsize,
num_shreds: usize,
num_shreds_skipped: AtomicUsize,
num_shreds_skipped: usize,
total_batches: usize,
total_time: u64,
epoch_fetch: u64,
@ -75,7 +80,7 @@ struct RetransmitStats {
retransmit_total: AtomicU64,
compute_turbine_peers_total: AtomicU64,
slot_stats: LruCache<Slot, RetransmitSlotStats>,
unknown_shred_slot_leader: AtomicUsize,
unknown_shred_slot_leader: usize,
}
impl RetransmitStats {
@ -104,11 +109,7 @@ impl RetransmitStats {
("num_nodes", stats.num_nodes.into_inner(), i64),
("num_addrs_failed", stats.num_addrs_failed.into_inner(), i64),
("num_shreds", stats.num_shreds, i64),
(
"num_shreds_skipped",
stats.num_shreds_skipped.into_inner(),
i64
),
("num_shreds_skipped", stats.num_shreds_skipped, i64),
("retransmit_total", stats.retransmit_total.into_inner(), i64),
(
"compute_turbine",
@ -117,7 +118,7 @@ impl RetransmitStats {
),
(
"unknown_shred_slot_leader",
stats.unknown_shred_slot_leader.into_inner(),
stats.unknown_shred_slot_leader,
i64
),
);
@ -130,11 +131,10 @@ type ShredFilter = LruCache<ShredId, Vec<u64>>;
// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(
shred: &Shred,
shreds_received: &Mutex<ShredFilter>,
shreds_received: &mut ShredFilter,
packet_hasher: &PacketHasher,
) -> bool {
let key = shred.id();
let mut shreds_received = shreds_received.lock().unwrap();
match shreds_received.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
@ -155,14 +155,14 @@ fn should_skip_retransmit(
}
fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilter>,
shreds_received: &mut ShredFilter,
packet_hasher: &mut PacketHasher,
hasher_reset_ts: &mut Instant,
) {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
*hasher_reset_ts = Instant::now();
shreds_received.lock().unwrap().clear();
shreds_received.clear();
packet_hasher.reset();
}
}
@ -178,7 +178,7 @@ fn retransmit(
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &mut Instant,
shreds_received: &Mutex<ShredFilter>,
shreds_received: &mut ShredFilter,
packet_hasher: &mut PacketHasher,
max_slots: &MaxSlots,
rpc_subscriptions: Option<&RpcSubscriptions>,
@ -202,81 +202,61 @@ fn retransmit(
maybe_reset_shreds_received_cache(shreds_received, packet_hasher, hasher_reset_ts);
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
let socket_addr_space = cluster_info.socket_addr_space();
let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
if should_skip_retransmit(shred, shreds_received, packet_hasher) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return None;
}
let shred_slot = shred.slot();
max_slots
.retransmit
.fetch_max(shred_slot, Ordering::Relaxed);
let mut compute_turbine_peers = Measure::start("turbine_start");
// TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here, and if
// the leader is unknown they should fail signature check. So here we
// should expect to know the slot leader and otherwise skip the shred.
let slot_leader =
match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
// Lookup slot leader and cluster nodes for each slot.
let shreds: Vec<_> = shreds
.into_iter()
.filter(|shred| {
if should_skip_retransmit(shred, shreds_received, packet_hasher) {
stats.num_shreds_skipped += 1;
false
} else {
true
}
})
.into_group_map_by(Shred::slot)
.into_iter()
.filter_map(|(slot, shreds)| {
max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
// TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here,
// and if the leader is unknown they should fail signature check.
// So here we should expect to know the slot leader and otherwise
// skip the shred.
let slot_leader = match leader_schedule_cache.slot_leader_at(slot, Some(&working_bank))
{
Some(pubkey) => pubkey,
None => {
stats
.unknown_shred_slot_leader
.fetch_add(1, Ordering::Relaxed);
stats.unknown_shred_slot_leader += shreds.len();
return None;
}
};
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
compute_turbine_peers.stop();
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
.num_addrs_failed
.fetch_add(num_failed, Ordering::Relaxed);
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
addrs.len(),
);
addrs.len() - num_failed
}
};
retransmit_time.stop();
stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
Some((root_distance, num_nodes))
};
let cluster_nodes =
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
})
.flatten()
.collect();
let socket_addr_space = cluster_info.socket_addr_space();
let slot_stats = thread_pool.install(|| {
shreds
.into_par_iter()
.with_min_len(4)
.filter_map(|shred| {
.map(|(shred, slot_leader, cluster_nodes)| {
let index = thread_pool.current_thread_index().unwrap();
let socket = &sockets[index % sockets.len()];
Some((shred.slot(), retransmit_shred(&shred, socket)?))
let (root_distance, num_nodes) = retransmit_shred(
&shred,
slot_leader,
&root_bank,
&cluster_nodes,
socket_addr_space,
&sockets[index % sockets.len()],
stats,
);
(shred.slot(), root_distance, num_nodes)
})
.fold(
HashMap::<Slot, RetransmitSlotStats>::new,
|mut acc, (slot, (root_distance, num_nodes))| {
|mut acc, (slot, root_distance, num_nodes)| {
let now = timestamp();
let slot_stats = acc.entry(slot).or_default();
slot_stats.record(now, root_distance, num_nodes);
@ -292,6 +272,51 @@ fn retransmit(
Ok(())
}
fn retransmit_shred(
shred: &Shred,
slot_leader: Pubkey,
root_bank: &Bank,
cluster_nodes: &ClusterNodes<RetransmitStage>,
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
stats: &RetransmitStats,
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
let mut compute_turbine_peers = Measure::start("turbine_start");
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, shred, root_bank, DATA_PLANE_FANOUT);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect();
compute_turbine_peers.stop();
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
.num_addrs_failed
.fetch_add(num_failed, Ordering::Relaxed);
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
addrs.len(),
);
addrs.len() - num_failed
}
};
retransmit_time.stop();
stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
(root_distance, num_nodes)
}
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
/// See `cluster_info` for network layer definitions.
/// # Arguments
@ -315,7 +340,7 @@ pub fn retransmitter(
);
let mut hasher_reset_ts = Instant::now();
let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new(LruCache::new(DEFAULT_LRU_SIZE));
let mut shreds_received = LruCache::<ShredId, _>::new(DEFAULT_LRU_SIZE);
let mut packet_hasher = PacketHasher::default();
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
@ -338,7 +363,7 @@ pub fn retransmitter(
&mut stats,
&cluster_nodes_cache,
&mut hasher_reset_ts,
&shreds_received,
&mut shreds_received,
&mut packet_hasher,
&max_slots,
rpc_subscriptions.as_deref(),
@ -488,7 +513,7 @@ impl RetransmitStats {
num_nodes: AtomicUsize::default(),
num_addrs_failed: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: AtomicUsize::default(),
num_shreds_skipped: 0usize,
total_batches: 0usize,
total_time: 0u64,
epoch_fetch: 0u64,
@ -497,7 +522,7 @@ impl RetransmitStats {
compute_turbine_peers_total: AtomicU64::default(),
// Cache capacity is manually enforced.
slot_stats: LruCache::<Slot, RetransmitSlotStats>::unbounded(),
unknown_shred_slot_leader: AtomicUsize::default(),
unknown_shred_slot_leader: 0usize,
}
}
@ -611,18 +636,18 @@ mod tests {
version,
0,
);
let shreds_received = Mutex::new(LruCache::new(100));
let mut shreds_received = LruCache::new(100);
let packet_hasher = PacketHasher::default();
// unique shred for (1, 5) should pass
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
// duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
@ -639,13 +664,13 @@ mod tests {
// first duplicate shred for (1, 5) passed
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
@ -662,12 +687,12 @@ mod tests {
// 2nd duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
@ -675,13 +700,13 @@ mod tests {
// Coding at (1, 5) passes
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
@ -689,13 +714,13 @@ mod tests {
// 2nd unique coding at (1, 5) passes
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
// same again is blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
@ -703,12 +728,12 @@ mod tests {
// Another unique coding at (1, 5) always blocked
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&mut shreds_received,
&packet_hasher
));
}