adds fallback logic if retransmit multicast fails (#17714)
In retransmit-stage, based on packet.meta.seed and the resulting children/neighbors, each packet is sent to a different set of peers: https://github.com/solana-labs/solana/blob/708bbcb00/core/src/retransmit_stage.rs#L421-L457 However, the current code errors out as soon as a multicast call fails, which skips all the remaining packets: https://github.com/solana-labs/solana/blob/708bbcb00/core/src/retransmit_stage.rs#L467-L470 This can exacerbate packet loss in turbine. This commit: * keeps iterating over the retransmit-packets loop even if some intermediate sends fail. * adds a fallback to UdpSocket::send_to if multicast fails. Recent discord chat: https://discord.com/channels/428295358100013066/689412830075551748/849530845052403733
This commit is contained in:
parent
bea7ce717c
commit
be957f25c9
|
@ -465,9 +465,9 @@ fn retransmit(
|
|||
|
||||
let mut retransmit_time = Measure::start("retransmit_to");
|
||||
if !packet.meta.forward {
|
||||
ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
|
||||
ClusterInfo::retransmit_to(&neighbors, packet, sock, true);
|
||||
}
|
||||
ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
|
||||
ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward);
|
||||
retransmit_time.stop();
|
||||
retransmit_total += retransmit_time.as_us();
|
||||
}
|
||||
|
|
|
@ -1382,12 +1382,7 @@ impl ClusterInfo {
|
|||
/// retransmit messages to a list of nodes
|
||||
/// # Remarks
|
||||
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
||||
pub fn retransmit_to(
|
||||
peers: &[&ContactInfo],
|
||||
packet: &Packet,
|
||||
s: &UdpSocket,
|
||||
forwarded: bool,
|
||||
) -> Result<(), GossipError> {
|
||||
pub fn retransmit_to(peers: &[&ContactInfo], packet: &Packet, s: &UdpSocket, forwarded: bool) {
|
||||
trace!("retransmit orders {}", peers.len());
|
||||
let dests: Vec<_> = if forwarded {
|
||||
peers
|
||||
|
@ -1398,22 +1393,28 @@ impl ClusterInfo {
|
|||
} else {
|
||||
peers.iter().map(|peer| &peer.tvu).collect()
|
||||
};
|
||||
let mut sent = 0;
|
||||
while sent < dests.len() {
|
||||
match multicast(s, &packet.data[..packet.meta.size], &dests[sent..]) {
|
||||
Ok(n) => sent += n,
|
||||
Err(e) => {
|
||||
inc_new_counter_error!(
|
||||
"cluster_info-retransmit-send_to_error",
|
||||
dests.len() - sent,
|
||||
1
|
||||
);
|
||||
error!("retransmit result {:?}", e);
|
||||
return Err(GossipError::Io(e));
|
||||
let mut dests = &dests[..];
|
||||
let data = &packet.data[..packet.meta.size];
|
||||
while !dests.is_empty() {
|
||||
match multicast(s, data, dests) {
|
||||
Ok(n) => dests = &dests[n..],
|
||||
Err(err) => {
|
||||
inc_new_counter_error!("cluster_info-retransmit-send_to_error", dests.len(), 1);
|
||||
error!("retransmit multicast: {:?}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
let mut errs = 0;
|
||||
for dest in dests {
|
||||
if let Err(err) = s.send_to(data, dest) {
|
||||
error!("retransmit send: {}, {:?}", dest, err);
|
||||
errs += 1;
|
||||
}
|
||||
}
|
||||
if errs != 0 {
|
||||
inc_new_counter_error!("cluster_info-retransmit-error", errs, 1);
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_self(&self) {
|
||||
|
|
|
@ -209,7 +209,7 @@ pub fn cluster_info_retransmit() {
|
|||
p.meta.size = 10;
|
||||
let peers = c1.tvu_peers();
|
||||
let retransmit_peers: Vec<_> = peers.iter().collect();
|
||||
ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap();
|
||||
ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false);
|
||||
let res: Vec<_> = [tn1, tn2, tn3]
|
||||
.into_par_iter()
|
||||
.map(|s| {
|
||||
|
|
Loading…
Reference in New Issue