Fix Retransmit slamming the leader with its own blobs (#3938)
parent 69e67d06a7
commit 43f7cd8149
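Summary: `ClusterInfo::retransmit_to` gains a `slot_leader_id: Option<Pubkey>` parameter and filters that id out of the retransmit fan-out, so the slot leader never gets its own blobs echoed back at it. The retransmit stage resolves the leader per blob through a new `LeaderScheduleCache` parameter that is threaded from `Tvu` through `RetransmitStage`, `WindowService`, and the replicator. The thin `ClusterInfo::retransmit` wrapper is removed; its callers now fetch `retransmit_peers()` and call `retransmit_to` directly.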
@@ -646,6 +646,7 @@ impl ClusterInfo {
         obj: &Arc<RwLock<Self>>,
         peers: &[ContactInfo],
         blob: &SharedBlob,
+        slot_leader_id: Option<Pubkey>,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
@@ -661,6 +662,7 @@ impl ClusterInfo {
         trace!("retransmit orders {}", orders.len());
         let errs: Vec<_> = orders
             .par_iter()
+            .filter(|v| v.id != slot_leader_id.unwrap_or_default())
             .map(|v| {
                 debug!(
                     "{}: retransmit blob {} to {} {}",
@@ -686,19 +688,6 @@ impl ClusterInfo {
         Ok(())
     }
 
-    /// retransmit messages from the leader to layer 1 nodes
-    /// # Remarks
-    /// We need to avoid having obj locked while doing any io, such as the `send_to`
-    pub fn retransmit(
-        obj: &Arc<RwLock<Self>>,
-        blob: &SharedBlob,
-        s: &UdpSocket,
-        forwarded: bool,
-    ) -> Result<()> {
-        let peers = obj.read().unwrap().retransmit_peers();
-        ClusterInfo::retransmit_to(obj, &peers, blob, s, forwarded)
-    }
-
     fn send_orders(
         id: &Pubkey,
         s: &UdpSocket,
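The `.filter()` added above is the heart of the fix. Below is a minimal, self-contained sketch of its semantics, using stand-in `Id` and `Peer` types in place of the real `Pubkey` and `ContactInfo` so it runs without the Solana crates:

    // Stand-ins for Pubkey (Id) and ContactInfo (Peer).
    #[derive(Default, Clone, Copy, PartialEq, Debug)]
    struct Id(u64);

    struct Peer {
        id: Id,
    }

    fn peers_to_send(peers: &[Peer], slot_leader_id: Option<Id>) -> Vec<&Peer> {
        peers
            .iter()
            // unwrap_or_default(): an unknown (None) leader only excludes a peer
            // carrying the default all-zero id, which no real validator has.
            .filter(|p| p.id != slot_leader_id.unwrap_or_default())
            .collect()
    }

    fn main() {
        let peers = vec![Peer { id: Id(1) }, Peer { id: Id(2) }, Peer { id: Id(3) }];
        // The leader Id(2) is dropped from the fan-out.
        assert_eq!(peers_to_send(&peers, Some(Id(2))).len(), 2);
        // With no known leader, every peer is retained.
        assert_eq!(peers_to_send(&peers, None).len(), 3);
    }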
@@ -235,6 +235,7 @@ impl Replicator {
 
         let window_service = WindowService::new(
             None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
+            None, //TODO: see above ^
             blocktree.clone(),
             cluster_info.clone(),
             blob_fetch_receiver,
@@ -5,6 +5,7 @@ use crate::blocktree::Blocktree;
 use crate::cluster_info::{
     compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
 };
+use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
@@ -22,19 +23,20 @@ use std::time::Duration;
 
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     r: &BlobReceiver,
     sock: &UdpSocket,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
-    let mut dq = r.recv_timeout(timer)?;
+    let mut blobs = r.recv_timeout(timer)?;
     while let Ok(mut nq) = r.try_recv() {
-        dq.append(&mut nq);
+        blobs.append(&mut nq);
     }
 
     submit(
         influxdb::Point::new("retransmit-stage")
-            .add_field("count", influxdb::Value::Integer(dq.len() as i64))
+            .add_field("count", influxdb::Value::Integer(blobs.len() as i64))
             .to_owned(),
     );
     let r_bank = bank_forks.read().unwrap().working_bank();
@@ -46,12 +48,14 @@ fn retransmit(
         NEIGHBORHOOD_SIZE,
         GROW_LAYER_CAPACITY,
     );
-    for b in &dq {
-        if b.read().unwrap().meta.forward {
-            ClusterInfo::retransmit_to(&cluster_info, &neighbors, b, sock, true)?;
-            ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, false)?;
+    for blob in &blobs {
+        let leader = leader_schedule_cache
+            .slot_leader_at_else_compute(blob.read().unwrap().slot(), r_bank.as_ref());
+        if blob.read().unwrap().meta.forward {
+            ClusterInfo::retransmit_to(&cluster_info, &neighbors, blob, leader, sock, true)?;
+            ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, false)?;
         } else {
-            ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, true)?;
+            ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, true)?;
         }
     }
     Ok(())
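Design note: the leader is looked up once per blob in the loop above, via `slot_leader_at_else_compute` against the current working bank, and handed down to `retransmit_to` as a plain `Option<Pubkey>`. This keeps `cluster_info` free of any leader-schedule dependency; it only needs to know which single peer to skip.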
@@ -68,16 +72,24 @@ fn retransmit(
 fn retransmitter(
     sock: Arc<UdpSocket>,
     bank_forks: Arc<RwLock<BankForks>>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<RwLock<ClusterInfo>>,
     r: BlobReceiver,
 ) -> JoinHandle<()> {
     let bank_forks = bank_forks.clone();
+    let leader_schedule_cache = leader_schedule_cache.clone();
     Builder::new()
         .name("solana-retransmitter".to_string())
         .spawn(move || {
             trace!("retransmitter started");
             loop {
-                if let Err(e) = retransmit(&bank_forks, &cluster_info, &r, &sock) {
+                if let Err(e) = retransmit(
+                    &bank_forks,
+                    &leader_schedule_cache,
+                    &cluster_info,
+                    &r,
+                    &sock,
+                ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -101,6 +113,7 @@ impl RetransmitStage {
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
         bank_forks: Arc<RwLock<BankForks>>,
+        leader_schedule_cache: &Arc<LeaderScheduleCache>,
         blocktree: Arc<Blocktree>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         retransmit_socket: Arc<UdpSocket>,
@@ -113,11 +126,13 @@ impl RetransmitStage {
         let t_retransmit = retransmitter(
             retransmit_socket,
             bank_forks.clone(),
+            leader_schedule_cache,
             cluster_info.clone(),
             retransmit_receiver,
         );
         let window_service = WindowService::new(
             Some(bank_forks),
+            Some(leader_schedule_cache.clone()),
             blocktree,
             cluster_info.clone(),
             fetch_stage_receiver,
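Plumbing note: `retransmitter` borrows the cache as `&Arc<LeaderScheduleCache>` and clones the `Arc` before moving it into its thread, while `WindowService::new` takes an `Option<Arc<LeaderScheduleCache>>` so callers without a schedule, such as the replicator above, can pass `None`.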
@@ -103,6 +103,7 @@ impl Tvu {
         //then sent to the window, which does the erasure coding reconstruction
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
+            leader_schedule_cache,
             blocktree.clone(),
             &cluster_info,
             Arc::new(retransmit_socket),
@@ -4,6 +4,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
+use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::leader_schedule_utils::slot_leader_at;
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::repair_service::{RepairService, RepairSlotRange};
@@ -71,8 +72,19 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
 
 /// drop blobs that are from myself or not from the correct leader for the
 /// blob's slot
-fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &Pubkey) -> bool {
-    let slot_leader_id = bank.and_then(|bank| slot_leader_at(blob.slot(), &bank));
+fn should_retransmit_and_persist(
+    blob: &Blob,
+    bank: Option<&Arc<Bank>>,
+    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
+    my_id: &Pubkey,
+) -> bool {
+    let slot_leader_id = match bank {
+        None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot())),
+        Some(bank) => match leader_schedule_cache {
+            None => slot_leader_at(blob.slot(), &bank),
+            Some(cache) => cache.slot_leader_at_else_compute(blob.slot(), bank),
+        },
+    };
 
     if blob.id() == *my_id {
         inc_new_counter_info!("streamer-recv_window-circular_transmission", 1);
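The `match` above picks the best available source for the slot leader: cache plus bank prefers the cache and falls back to computing from the bank, a bare bank computes directly, a bare cache answers from cached schedules only, and with neither the leader stays unknown. A standalone sketch of the same precedence, with hypothetical `Bank`/`Cache` stand-ins rather than the real types:

    type Pubkey = u64;
    type Slot = u64;

    struct Bank;
    struct Cache;

    // Stand-in: derive the leader from this bank's epoch schedule.
    fn slot_leader_at(_slot: Slot, _bank: &Bank) -> Option<Pubkey> {
        Some(1)
    }

    impl Cache {
        // Stand-in: answer from cached schedules only, never recompute.
        fn slot_leader_at(&self, _slot: Slot) -> Option<Pubkey> {
            Some(1)
        }
        // Stand-in: answer from the cache, else compute from the bank.
        fn slot_leader_at_else_compute(&self, _slot: Slot, _bank: &Bank) -> Option<Pubkey> {
            Some(1)
        }
    }

    fn lookup(slot: Slot, bank: Option<&Bank>, cache: Option<&Cache>) -> Option<Pubkey> {
        match bank {
            // No bank: the cache is the only possible source.
            None => cache.and_then(|cache| cache.slot_leader_at(slot)),
            Some(bank) => match cache {
                // Bank but no cache: compute directly from the bank.
                None => slot_leader_at(slot, bank),
                // Both: prefer the cache, fall back to the bank.
                Some(cache) => cache.slot_leader_at_else_compute(slot, bank),
            },
        }
    }

    fn main() {
        assert_eq!(lookup(7, Some(&Bank), Some(&Cache)), Some(1));
        assert_eq!(lookup(7, None, None), None);
    }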
@@ -90,6 +102,7 @@ fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &
 
 fn recv_window(
     bank_forks: Option<&Arc<RwLock<BankForks>>>,
+    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
     blocktree: &Arc<Blocktree>,
     my_id: &Pubkey,
     r: &BlobReceiver,
@@ -110,6 +123,7 @@ fn recv_window(
             bank_forks
                 .map(|bank_forks| bank_forks.read().unwrap().working_bank())
                 .as_ref(),
+            leader_schedule_cache,
             my_id,
         )
     });
@@ -154,6 +168,7 @@ pub struct WindowService {
 impl WindowService {
     pub fn new(
         bank_forks: Option<Arc<RwLock<BankForks>>>,
+        leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
         blocktree: Arc<Blocktree>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         r: BlobReceiver,
@@ -171,6 +186,7 @@ impl WindowService {
         );
         let exit = exit.clone();
         let bank_forks = bank_forks.clone();
+        let leader_schedule_cache = leader_schedule_cache.clone();
         let t_window = Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -181,9 +197,14 @@ impl WindowService {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
-                if let Err(e) =
-                    recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit)
-                {
+                if let Err(e) = recv_window(
+                    bank_forks.as_ref(),
+                    leader_schedule_cache.as_ref(),
+                    &blocktree,
+                    &id,
+                    &r,
+                    &retransmit,
+                ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -263,23 +284,27 @@ mod test {
         let bank = Arc::new(Bank::new(
             &GenesisBlock::new_with_leader(100, &leader_id, 10).0,
         ));
+        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
 
         let mut blob = Blob::default();
         blob.set_id(&leader_id);
 
         // without a Bank and blobs not from me, blob continues
-        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), true);
+        assert_eq!(
+            should_retransmit_and_persist(&blob, None, None, &me_id),
+            true
+        );
 
         // with a Bank for slot 0, blob continues
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             true
         );
 
         // set the blob to have come from the wrong leader
         blob.set_id(&Pubkey::new_rand());
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             false
         );
 
@@ -287,13 +312,16 @@ mod test {
         // TODO: persist in blocktree that we didn't know who the leader was at the time?
         blob.set_slot(100);
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             true
         );
 
         // if the blob came back from me, it doesn't continue, whether or not I have a bank
         blob.set_id(&me_id);
-        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), false);
+        assert_eq!(
+            should_retransmit_and_persist(&blob, None, None, &me_id),
+            false
+        );
     }
 
     #[test]
@@ -315,11 +343,13 @@ mod test {
         let blocktree = Arc::new(
             Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
         );
 
+        let bank = Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
         let t_window = WindowService::new(
-            Some(Arc::new(RwLock::new(BankForks::new(
-                0,
-                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
-            )))),
+            bank_forks,
+            Some(leader_schedule_cache),
             blocktree,
             subs,
             r_reader,
@@ -391,11 +421,12 @@ mod test {
         let blocktree = Arc::new(
             Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
        );
+        let bank = Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
         let t_window = WindowService::new(
-            Some(Arc::new(RwLock::new(BankForks::new(
-                0,
-                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
-            )))),
+            bank_forks,
+            Some(leader_schedule_cache),
             blocktree,
             subs.clone(),
             r_reader,
@@ -176,7 +176,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
     assert!(done);
     let b = SharedBlob::default();
     b.write().unwrap().meta.size = 10;
-    ClusterInfo::retransmit(&c1, &b, &tn1, false)?;
+    let peers = c1.read().unwrap().retransmit_peers();
+    ClusterInfo::retransmit_to(&c1, &peers, &b, None, &tn1, false)?;
     let res: Vec<_> = [tn1, tn2, tn3]
         .into_par_iter()
         .map(|s| {
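Note: with the `ClusterInfo::retransmit` wrapper gone, the test does what every caller now does: read `retransmit_peers()` under the lock, then call `retransmit_to` directly, passing `None` for the slot leader since this test cluster has no leader schedule to consult.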