skips shreds deserialization before retransmit (#26230)

Fully deserializing shreds in window-service before sending them to
the retransmit stage adds latency to shred propagation.
This commit instead passes the raw payload through the channel and relies on
partial deserialization of only a few required fields: slot, shred index,
and shred type.
behzad nouri 2022-06-30 12:13:00 +00:00 committed by GitHub
parent 24c6f820ce
commit 88599fd760
10 changed files with 119 additions and 62 deletions
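
For context, here is a minimal, self-contained sketch of the partial-deserialization idea: instead of constructing a full `Shred`, only the fixed-offset header fields needed for retransmit are read straight out of the raw payload bytes. The helper names mirror `get_slot`/`get_index` from the `shred::layout` hunk below, but the offset constants and the `main` harness here are illustrative assumptions, not the authoritative values from `solana_ledger::shred`.

```rust
// Illustrative header offsets, assuming the common shred wire layout
// (64-byte signature, 1-byte shred variant, then slot and index).
// The authoritative constants live in solana_ledger::shred.
const OFFSET_OF_SHRED_SLOT: usize = 65;
const OFFSET_OF_SHRED_INDEX: usize = 73;

type Slot = u64;

// Read only the 8 bytes holding the slot, skipping full deserialization.
fn get_slot(shred: &[u8]) -> Option<Slot> {
    let bytes = <[u8; 8]>::try_from(shred.get(OFFSET_OF_SHRED_SLOT..)?.get(..8)?).ok()?;
    Some(Slot::from_le_bytes(bytes))
}

// Read only the 4 bytes holding the shred index.
fn get_index(shred: &[u8]) -> Option<u32> {
    let bytes = <[u8; 4]>::try_from(shred.get(OFFSET_OF_SHRED_INDEX..)?.get(..4)?).ok()?;
    Some(u32::from_le_bytes(bytes))
}

fn main() {
    // A dummy payload, large enough to cover the header fields.
    let mut payload = vec![0u8; 1228];
    payload[OFFSET_OF_SHRED_SLOT..OFFSET_OF_SHRED_SLOT + 8].copy_from_slice(&42u64.to_le_bytes());
    payload[OFFSET_OF_SHRED_INDEX..OFFSET_OF_SHRED_INDEX + 4].copy_from_slice(&7u32.to_le_bytes());
    assert_eq!(get_slot(&payload), Some(42));
    assert_eq!(get_index(&payload), Some(7));
}
```

The new `shred::layout::get_shred_id` added in the last file of this diff composes exactly these kinds of reads (slot, index, shred type), so the retransmit stage can key and route a shred without ever building a `Shred`.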

View File

@ -51,7 +51,7 @@ fn get_retransmit_peers_deterministic(
);
let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers(
slot_leader,
&shred,
&shred.id(),
root_bank,
solana_gossip::cluster_info::DATA_PLANE_FANOUT,
);

View File

@ -155,7 +155,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
shred.set_index(index);
index += 1;
index %= 200;
let _ = shreds_sender.send(vec![shred.clone()]);
let shred = shred.payload().clone();
let _ = shreds_sender.send(vec![shred]);
}
slot += 1;

View File

@ -419,7 +419,7 @@ pub fn broadcast_shreds(
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
shred,
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,

View File

@ -303,7 +303,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.iter()
.filter_map(|shred| {
let addr = cluster_nodes
.get_broadcast_addrs(shred, &root_bank, DATA_PLANE_FANOUT, socket_addr_space)
.get_broadcast_addrs(
&shred.id(),
&root_bank,
DATA_PLANE_FANOUT,
socket_addr_space,
)
.first()
.copied()?;
let node = nodes.iter().find(|node| node.tvu == addr)?;

View File

@ -12,7 +12,7 @@ use {
crds_value::{CrdsData, CrdsValue},
weighted_shuffle::WeightedShuffle,
},
solana_ledger::shred::Shred,
solana_ledger::shred::ShredId,
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Epoch, Slot},
@ -116,13 +116,13 @@ impl ClusterNodes<BroadcastStage> {
pub(crate) fn get_broadcast_addrs(
&self,
shred: &Shred,
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
socket_addr_space: &SocketAddrSpace,
) -> Vec<SocketAddr> {
const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
let shred_seed = shred.id().seed(&self.pubkey);
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let index = match self.weighted_shuffle.first(&mut rng) {
None => return Vec::default(),
@ -177,7 +177,7 @@ impl ClusterNodes<RetransmitStage> {
pub(crate) fn get_retransmit_addrs(
&self,
slot_leader: &Pubkey,
shred: &Shred,
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
@ -213,7 +213,7 @@ impl ClusterNodes<RetransmitStage> {
pub fn get_retransmit_peers(
&self,
slot_leader: &Pubkey,
shred: &Shred,
shred: &ShredId,
root_bank: &Bank,
fanout: usize,
) -> (
@ -221,7 +221,7 @@ impl ClusterNodes<RetransmitStage> {
Vec<&Node>, // neighbors
Vec<&Node>, // children
) {
let shred_seed = shred.id().seed(slot_leader);
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {

View File

@ -4,7 +4,6 @@
use {
ahash::AHasher,
rand::{thread_rng, Rng},
solana_ledger::shred::Shred,
solana_perf::packet::Packet,
std::hash::Hasher,
};
@ -29,8 +28,8 @@ impl PacketHasher {
self.hash_data(packet.data(..).unwrap_or_default())
}
pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(shred.payload())
pub(crate) fn hash_shred(&self, shred: &[u8]) -> u64 {
self.hash_data(shred)
}
fn hash_data(&self, data: &[u8]) -> u64 {

View File

@ -25,7 +25,7 @@ use {
solana_ledger::{
blockstore::Blockstore,
leader_schedule_cache::LeaderScheduleCache,
shred::{Shred, ShredId},
shred::{self, ShredId},
},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
@ -136,11 +136,11 @@ type ShredFilter = LruCache<ShredId, Vec<u64>>;
// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(
shred: &Shred,
key: ShredId,
shred: &[u8],
shreds_received: &mut ShredFilter,
packet_hasher: &PacketHasher,
) -> bool {
let key = shred.id();
match shreds_received.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
@ -179,7 +179,7 @@ fn retransmit(
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
cluster_info: &ClusterInfo,
shreds_receiver: &Receiver<Vec<Shred>>,
shreds_receiver: &Receiver<Vec</*shred:*/ Vec<u8>>>,
sockets: &[UdpSocket],
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
@ -211,15 +211,16 @@ fn retransmit(
// Lookup slot leader and cluster nodes for each slot.
let shreds: Vec<_> = shreds
.into_iter()
.filter(|shred| {
if should_skip_retransmit(shred, shreds_received, packet_hasher) {
.filter_map(|shred| {
let key = shred::layout::get_shred_id(&shred)?;
if should_skip_retransmit(key, &shred, shreds_received, packet_hasher) {
stats.num_shreds_skipped += 1;
false
None
} else {
true
Some((key, shred))
}
})
.into_group_map_by(Shred::slot)
.into_group_map_by(|(key, _shred)| key.slot())
.into_iter()
.filter_map(|(slot, shreds)| {
max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
@ -255,8 +256,9 @@ fn retransmit(
shreds
.into_iter()
.enumerate()
.map(|(index, (shred, slot_leader, cluster_nodes))| {
.map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
let (root_distance, num_nodes) = retransmit_shred(
&key,
&shred,
&slot_leader,
&root_bank,
@ -265,16 +267,17 @@ fn retransmit(
&sockets[index % sockets.len()],
stats,
);
(shred.slot(), root_distance, num_nodes)
(key.slot(), root_distance, num_nodes)
})
.fold(HashMap::new(), record)
} else {
thread_pool.install(|| {
shreds
.into_par_iter()
.map(|(shred, slot_leader, cluster_nodes)| {
.map(|((key, shred), slot_leader, cluster_nodes)| {
let index = thread_pool.current_thread_index().unwrap();
let (root_distance, num_nodes) = retransmit_shred(
&key,
&shred,
&slot_leader,
&root_bank,
@ -283,7 +286,7 @@ fn retransmit(
&sockets[index % sockets.len()],
stats,
);
(shred.slot(), root_distance, num_nodes)
(key.slot(), root_distance, num_nodes)
})
.fold(HashMap::new, record)
.reduce(HashMap::new, RetransmitSlotStats::merge)
@ -297,7 +300,8 @@ fn retransmit(
}
fn retransmit_shred(
shred: &Shred,
key: &ShredId,
shred: &[u8],
slot_leader: &Pubkey,
root_bank: &Bank,
cluster_nodes: &ClusterNodes<RetransmitStage>,
@ -307,7 +311,7 @@ fn retransmit_shred(
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
let mut compute_turbine_peers = Measure::start("turbine_start");
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, shred, root_bank, DATA_PLANE_FANOUT);
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
@ -318,7 +322,7 @@ fn retransmit_shred(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
let num_nodes = match multi_target_send(socket, shred, &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
@ -354,7 +358,7 @@ pub fn retransmitter(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
shreds_receiver: Receiver<Vec<Shred>>,
shreds_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> JoinHandle<()> {
@ -630,7 +634,10 @@ impl RetransmitSlotStats {
#[cfg(test)]
mod tests {
use {super::*, solana_ledger::shred::ShredFlags};
use {
super::*,
solana_ledger::shred::{Shred, ShredFlags},
};
#[test]
fn test_already_received() {
@ -651,13 +658,15 @@ mod tests {
let packet_hasher = PacketHasher::default();
// unique shred for (1, 5) should pass
assert!(!should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
// duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
@ -674,13 +683,15 @@ mod tests {
);
// first duplicate shred for (1, 5) passed
assert!(!should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
@ -697,12 +708,14 @@ mod tests {
);
// 2nd duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
@ -710,13 +723,15 @@ mod tests {
let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
// Coding at (1, 5) passes
assert!(!should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
@ -724,13 +739,15 @@ mod tests {
let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
// 2nd unique coding at (1, 5) passes
assert!(!should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
// same again is blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
@ -738,12 +755,14 @@ mod tests {
let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
// Another unique coding at (1, 5) always blocked
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));

View File

@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{Nonce, Shred},
shred::{self, Nonce, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
@ -36,6 +36,7 @@ use {
},
};
type ShredPayload = Vec<u8>;
type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
@ -108,8 +109,8 @@ impl WindowServiceMetrics {
struct ReceiveWindowStats {
num_iters: usize,
num_packets: usize,
num_shreds: usize, // num_discards: num_packets - num_shreds
num_repairs: usize,
num_shreds: usize, // num_discards: num_packets - num_shreds
elapsed: Duration, // excludes waiting time on the receiver channel.
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
since: Option<Instant>,
@ -229,14 +230,14 @@ fn prune_shreds_invalid_repair(
}
fn run_insert<F>(
shred_receiver: &Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
shred_receiver: &Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
handle_duplicate: F,
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<Shred>>,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
) -> Result<()>
where
@ -253,7 +254,15 @@ where
shred_receiver_elapsed.stop();
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
ws_metrics.num_shreds_received += shreds.len() as u64;
// TODO: Consider using thread-pool here instead of recv_window.
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = shreds
.into_iter()
.zip(repair_infos)
.filter_map(|(shred, repair_info)| {
let shred = Shred::new_from_serialized_shred(shred).ok()?;
Some((shred, repair_info))
})
.unzip();
let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
@ -285,9 +294,9 @@ where
}
fn recv_window(
insert_shred_sender: &Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
insert_shred_sender: &Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
verified_receiver: &Receiver<Vec<PacketBatch>>,
retransmit_sender: &Sender<Vec<Shred>>,
retransmit_sender: &Sender<Vec<ShredPayload>>,
turbine_disabled: &AtomicBool,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
@ -301,17 +310,16 @@ fn recv_window(
if turbine_disabled || packet.meta.discard() {
return None;
}
let serialized_shred = packet.data(..)?.to_vec();
let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
let shred = shred::layout::get_shred(packet)?;
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.socket_addr(),
// If can't parse the nonce, dump the packet.
nonce: repair_response::nonce(packet)?,
};
Some((shred, Some(repair_info)))
Some((shred.to_vec(), Some(repair_info)))
} else {
Some((shred, None))
Some((shred.to_vec(), None))
}
};
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
@ -379,7 +387,7 @@ impl WindowService {
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<Shred>>,
retransmit_sender: Sender<Vec<ShredPayload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
@ -482,10 +490,10 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
insert_receiver: Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
insert_receiver: Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
check_duplicate_sender: Sender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<Shred>>,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> JoinHandle<()> {
let mut handle_timeout = || {};
@ -540,10 +548,10 @@ impl WindowService {
fn start_recv_window_thread(
id: Pubkey,
exit: Arc<AtomicBool>,
insert_sender: Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
insert_sender: Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
verified_receiver: Receiver<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
retransmit_sender: Sender<Vec<Shred>>,
retransmit_sender: Sender<Vec<ShredPayload>>,
) -> JoinHandle<()> {
let mut stats = ReceiveWindowStats::default();
Builder::new()

View File

@ -798,7 +798,7 @@ impl Blockstore {
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec<Shred>>>,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
handle_duplicate: &F,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@ -937,7 +937,12 @@ impl Blockstore {
.collect();
if !recovered_data_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(recovered_data_shreds);
let _ = retransmit_sender.send(
recovered_data_shreds
.into_iter()
.map(Shred::into_payload)
.collect(),
);
}
}
}

View File

@ -215,7 +215,7 @@ pub enum Shred {
}
/// Tuple which uniquely identifies a shred should it exists.
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
#[derive(Clone, Copy, Eq, Debug, Hash, PartialEq)]
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
impl ShredId {
@ -223,6 +223,10 @@ impl ShredId {
ShredId(slot, index, shred_type)
}
pub fn slot(&self) -> Slot {
self.0
}
pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
(self.0, self.1, self.2)
}
@ -549,17 +553,20 @@ pub mod layout {
ShredVariant::try_from(shred_variant).map_err(|_| Error::InvalidShredVariant)
}
#[inline]
pub(super) fn get_shred_type(shred: &[u8]) -> Result<ShredType, Error> {
let shred_variant = get_shred_variant(shred)?;
Ok(ShredType::from(shred_variant))
}
#[inline]
pub fn get_slot(shred: &[u8]) -> Option<Slot> {
<[u8; 8]>::try_from(shred.get(OFFSET_OF_SHRED_SLOT..)?.get(..8)?)
.map(Slot::from_le_bytes)
.ok()
}
#[inline]
pub(super) fn get_index(shred: &[u8]) -> Option<u32> {
<[u8; 4]>::try_from(shred.get(OFFSET_OF_SHRED_INDEX..)?.get(..4)?)
.map(u32::from_le_bytes)
@ -582,6 +589,15 @@ pub mod layout {
.ok()
}
#[inline]
pub fn get_shred_id(shred: &[u8]) -> Option<ShredId> {
Some(ShredId(
get_slot(shred)?,
get_index(shred)?,
get_shred_type(shred).ok()?,
))
}
// Returns slice range of the shred payload which is signed.
pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option<Range<usize>> {
let range = match get_shred_variant(shred).ok()? {
@ -1219,9 +1235,13 @@ mod tests {
fn verify_shred_layout(shred: &Shred, packet: &Packet) {
let data = layout::get_shred(packet).unwrap();
assert_eq!(data, packet.data(..).unwrap());
assert_eq!(layout::get_slot(data), Some(shred.slot()));
assert_eq!(layout::get_index(data), Some(shred.index()));
assert_eq!(layout::get_version(data), Some(shred.version()));
assert_eq!(layout::get_shred_id(data), Some(shred.id()));
assert_eq!(layout::get_signature(data), Some(shred.signature()));
assert_eq!(layout::get_shred_type(data).unwrap(), shred.shred_type());
match shred.shred_type() {
ShredType::Code => {
assert_matches!(