diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index a9cf89e6d5..2e79c2e8d2 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -646,6 +646,7 @@ impl ClusterInfo {
         obj: &Arc<RwLock<Self>>,
         peers: &[ContactInfo],
         blob: &SharedBlob,
+        slot_leader_id: Option<Pubkey>,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
@@ -661,6 +662,7 @@ impl ClusterInfo {
         trace!("retransmit orders {}", orders.len());
         let errs: Vec<_> = orders
             .par_iter()
+            .filter(|v| v.id != slot_leader_id.unwrap_or_default())
             .map(|v| {
                 debug!(
                     "{}: retransmit blob {} to {} {}",
@@ -686,19 +688,6 @@ impl ClusterInfo {
         Ok(())
     }
 
-    /// retransmit messages from the leader to layer 1 nodes
-    /// # Remarks
-    /// We need to avoid having obj locked while doing any io, such as the `send_to`
-    pub fn retransmit(
-        obj: &Arc<RwLock<Self>>,
-        blob: &SharedBlob,
-        s: &UdpSocket,
-        forwarded: bool,
-    ) -> Result<()> {
-        let peers = obj.read().unwrap().retransmit_peers();
-        ClusterInfo::retransmit_to(obj, &peers, blob, s, forwarded)
-    }
-
     fn send_orders(
         id: &Pubkey,
         s: &UdpSocket,
diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index d9e98c186e..91ef9a9323 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -235,6 +235,7 @@ impl Replicator {
         let window_service = WindowService::new(
             None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
+            None, //TODO: see above ^
             blocktree.clone(),
             cluster_info.clone(),
             blob_fetch_receiver,
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 99103311df..493f989bf6 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -5,6 +5,7 @@ use crate::blocktree::Blocktree;
 use crate::cluster_info::{
     compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
 };
+use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
@@ -22,19 +23,20 @@ use std::time::Duration;
 
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     r: &BlobReceiver,
     sock: &UdpSocket,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
-    let mut dq = r.recv_timeout(timer)?;
+    let mut blobs = r.recv_timeout(timer)?;
     while let Ok(mut nq) = r.try_recv() {
-        dq.append(&mut nq);
+        blobs.append(&mut nq);
     }
     submit(
         influxdb::Point::new("retransmit-stage")
-            .add_field("count", influxdb::Value::Integer(dq.len() as i64))
+            .add_field("count", influxdb::Value::Integer(blobs.len() as i64))
             .to_owned(),
     );
     let r_bank = bank_forks.read().unwrap().working_bank();
@@ -46,12 +48,14 @@ fn retransmit(
         NEIGHBORHOOD_SIZE,
         GROW_LAYER_CAPACITY,
     );
-    for b in &dq {
-        if b.read().unwrap().meta.forward {
-            ClusterInfo::retransmit_to(&cluster_info, &neighbors, b, sock, true)?;
-            ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, false)?;
+    for blob in &blobs {
+        let leader = leader_schedule_cache
+            .slot_leader_at_else_compute(blob.read().unwrap().slot(), r_bank.as_ref());
+        if blob.read().unwrap().meta.forward {
+            ClusterInfo::retransmit_to(&cluster_info, &neighbors, blob, leader, sock, true)?;
+            ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, false)?;
         } else {
-            ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, true)?;
+            ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, true)?;
         }
     }
     Ok(())
@@ -68,16 +72,24 @@ fn retransmit(
 fn retransmitter(
     sock: Arc<UdpSocket>,
     bank_forks: Arc<RwLock<BankForks>>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<RwLock<ClusterInfo>>,
     r: BlobReceiver,
 ) -> JoinHandle<()> {
     let bank_forks = bank_forks.clone();
+    let leader_schedule_cache = leader_schedule_cache.clone();
     Builder::new()
         .name("solana-retransmitter".to_string())
         .spawn(move || {
             trace!("retransmitter started");
             loop {
-                if let Err(e) = retransmit(&bank_forks, &cluster_info, &r, &sock) {
+                if let Err(e) = retransmit(
+                    &bank_forks,
+                    &leader_schedule_cache,
+                    &cluster_info,
+                    &r,
+                    &sock,
+                ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -101,6 +113,7 @@ impl RetransmitStage {
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
         bank_forks: Arc<RwLock<BankForks>>,
+        leader_schedule_cache: &Arc<LeaderScheduleCache>,
         blocktree: Arc<Blocktree>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         retransmit_socket: Arc<UdpSocket>,
@@ -113,11 +126,13 @@
         let t_retransmit = retransmitter(
             retransmit_socket,
             bank_forks.clone(),
+            leader_schedule_cache,
             cluster_info.clone(),
             retransmit_receiver,
         );
         let window_service = WindowService::new(
             Some(bank_forks),
+            Some(leader_schedule_cache.clone()),
             blocktree,
             cluster_info.clone(),
             fetch_stage_receiver,
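The behavioral heart of the patch so far is the new `filter` in `retransmit_to`: a node now skips the slot leader when fanning a blob back out, since the leader is the original broadcaster and resending its own blob to it only wastes bandwidth. Below is a minimal standalone sketch of just that predicate, with stand-in types (`PeerId` and `Peer` are illustrative substitutes for the real `Pubkey` and `ContactInfo`); the real code applies the same filter inside a rayon `par_iter()` before sending.

```rust
/// Stand-in for solana_sdk's Pubkey; `Default` models the all-zeros key.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Default)]
struct PeerId(u64);

/// Stand-in for ContactInfo: only the id matters to the filter.
struct Peer {
    id: PeerId,
}

/// Peers a blob should be retransmitted to: everyone except the slot
/// leader, when the leader is known.
fn retransmit_targets(peers: &[Peer], slot_leader_id: Option<PeerId>) -> Vec<&Peer> {
    peers
        .iter()
        // Mirrors the diff's `v.id != slot_leader_id.unwrap_or_default()`:
        // with an unknown leader we compare against the default id, which
        // no real peer carries, so nothing is filtered out.
        .filter(|p| p.id != slot_leader_id.unwrap_or_default())
        .collect()
}

fn main() {
    let peers = vec![
        Peer { id: PeerId(1) },
        Peer { id: PeerId(2) },
        Peer { id: PeerId(3) },
    ];

    // Leader known: it is dropped from the fan-out.
    assert_eq!(retransmit_targets(&peers, Some(PeerId(2))).len(), 2);

    // Leader unknown: every peer remains a target.
    assert_eq!(retransmit_targets(&peers, None).len(), 3);
}
```

One subtlety: `unwrap_or_default()` quietly treats "leader unknown" and "leader is the default pubkey" identically, which is harmless only because no real node should ever hold the default key.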
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 2bf0e30427..186b41dbe6 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -103,6 +103,7 @@ impl Tvu {
         //then sent to the window, which does the erasure coding reconstruction
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
+            leader_schedule_cache,
             blocktree.clone(),
             &cluster_info,
             Arc::new(retransmit_socket),
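`Tvu` itself only threads the cache one level down. The two lookups the rest of the diff leans on are `slot_leader_at` (consult the cache only) and `slot_leader_at_else_compute` (fall back to deriving the leader from a bank, memoizing the answer). The real implementation lives in `core/src/leader_schedule_cache.rs` and is not shown in this diff; the sketch below is a toy model of just that contract, with a trivial stand-in `Bank`.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

type Slot = u64;
type Leader = &'static str; // stand-in for Pubkey

/// Stand-in Bank: only knows how to derive a leader for a slot.
struct Bank;

impl Bank {
    fn compute_leader(&self, slot: Slot) -> Leader {
        // The real code derives this from stakes and the epoch schedule.
        if slot % 2 == 0 { "alice" } else { "bob" }
    }
}

#[derive(Default)]
struct LeaderScheduleCache {
    cached: RwLock<HashMap<Slot, Leader>>,
}

impl LeaderScheduleCache {
    /// Cache-only lookup: None if this slot was never computed.
    fn slot_leader_at(&self, slot: Slot) -> Option<Leader> {
        self.cached.read().unwrap().get(&slot).copied()
    }

    /// Lookup that falls back to the bank and memoizes the result.
    fn slot_leader_at_else_compute(&self, slot: Slot, bank: &Bank) -> Option<Leader> {
        if let Some(leader) = self.slot_leader_at(slot) {
            return Some(leader);
        }
        let leader = bank.compute_leader(slot);
        self.cached.write().unwrap().insert(slot, leader);
        Some(leader)
    }
}

fn main() {
    let cache = LeaderScheduleCache::default();
    let bank = Bank;

    assert_eq!(cache.slot_leader_at(7), None); // nothing cached yet
    assert_eq!(cache.slot_leader_at_else_compute(7, &bank), Some("bob"));
    assert_eq!(cache.slot_leader_at(7), Some("bob")); // now memoized
}
```

Memoization is what keeps the per-blob lookup in `retransmit` cheap: a read-locked map probe rather than recomputing a schedule for every blob on the hot path.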
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index a7557a73fc..4e22f590ff 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -4,6 +4,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
+use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::leader_schedule_utils::slot_leader_at;
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::repair_service::{RepairService, RepairSlotRange};
@@ -71,8 +72,19 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
 
 /// drop blobs that are from myself or not from the correct leader for the
 /// blob's slot
-fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &Pubkey) -> bool {
-    let slot_leader_id = bank.and_then(|bank| slot_leader_at(blob.slot(), &bank));
+fn should_retransmit_and_persist(
+    blob: &Blob,
+    bank: Option<&Arc<Bank>>,
+    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
+    my_id: &Pubkey,
+) -> bool {
+    let slot_leader_id = match bank {
+        None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot())),
+        Some(bank) => match leader_schedule_cache {
+            None => slot_leader_at(blob.slot(), &bank),
+            Some(cache) => cache.slot_leader_at_else_compute(blob.slot(), bank),
+        },
+    };
 
     if blob.id() == *my_id {
         inc_new_counter_info!("streamer-recv_window-circular_transmission", 1);
@@ -90,6 +102,7 @@ fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &Pubkey) -> bool {
 
 fn recv_window(
     bank_forks: Option<&Arc<RwLock<BankForks>>>,
+    leader_schedule_cache: Option<&Arc<LeaderScheduleCache>>,
     blocktree: &Arc<Blocktree>,
     my_id: &Pubkey,
     r: &BlobReceiver,
@@ -110,6 +123,7 @@ fn recv_window(
             bank_forks
                 .map(|bank_forks| bank_forks.read().unwrap().working_bank())
                 .as_ref(),
+            leader_schedule_cache,
             my_id,
         )
     });
@@ -154,6 +168,7 @@ pub struct WindowService {
 impl WindowService {
     pub fn new(
         bank_forks: Option<Arc<RwLock<BankForks>>>,
+        leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
         blocktree: Arc<Blocktree>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         r: BlobReceiver,
@@ -171,6 +186,7 @@ impl WindowService {
         );
         let exit = exit.clone();
         let bank_forks = bank_forks.clone();
+        let leader_schedule_cache = leader_schedule_cache.clone();
         let t_window = Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -181,9 +197,14 @@ impl WindowService {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
-                if let Err(e) =
-                    recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit)
-                {
+                if let Err(e) = recv_window(
+                    bank_forks.as_ref(),
+                    leader_schedule_cache.as_ref(),
+                    &blocktree,
+                    &id,
+                    &r,
+                    &retransmit,
+                ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@@ -263,23 +284,27 @@ mod test {
         let bank = Arc::new(Bank::new(
             &GenesisBlock::new_with_leader(100, &leader_id, 10).0,
         ));
+        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let mut blob = Blob::default();
         blob.set_id(&leader_id);
 
         // without a Bank and blobs not from me, blob continues
-        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), true);
+        assert_eq!(
+            should_retransmit_and_persist(&blob, None, None, &me_id),
+            true
+        );
 
         // with a Bank for slot 0, blob continues
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             true
         );
 
         // set the blob to have come from the wrong leader
         blob.set_id(&Pubkey::new_rand());
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             false
         );
@@ -287,13 +312,16 @@
         // TODO: persistr in blocktree that we didn't know who the leader was at the time?
         blob.set_slot(100);
         assert_eq!(
-            should_retransmit_and_persist(&blob, Some(&bank), &me_id),
+            should_retransmit_and_persist(&blob, Some(&bank), Some(&cache), &me_id),
             true
         );
 
         // if the blob came back from me, it doesn't continue, whether or not I have a bank
         blob.set_id(&me_id);
-        assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), false);
+        assert_eq!(
+            should_retransmit_and_persist(&blob, None, None, &me_id),
+            false
+        );
     }
 
     #[test]
@@ -315,11 +343,13 @@
         let blocktree = Arc::new(
             Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
         );
+
+        let bank = Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
         let t_window = WindowService::new(
-            Some(Arc::new(RwLock::new(BankForks::new(
-                0,
-                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
-            )))),
+            bank_forks,
+            Some(leader_schedule_cache),
             blocktree,
             subs,
             r_reader,
@@ -391,11 +421,12 @@
         let blocktree = Arc::new(
             Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
         );
+        let bank = Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
         let t_window = WindowService::new(
-            Some(Arc::new(RwLock::new(BankForks::new(
-                0,
-                Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
-            )))),
+            bank_forks,
+            Some(leader_schedule_cache),
             blocktree,
             subs.clone(),
             r_reader,
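The window_service changes make `should_retransmit_and_persist` work with any combination of bank and cache. Once the leader lookup is resolved to an `Option`, the decision it feeds is exactly the one pinned down by the test hunk above: drop blobs that came from this node, keep blobs from the expected leader, keep blobs whose leader is unknown, drop the rest. A compact model of that decision (stand-in `Pubkey`; the real function also bumps metrics counters such as `streamer-recv_window-circular_transmission` on the reject paths):

```rust
type Pubkey = u64; // stand-in for the real 32-byte Pubkey

/// Models the accept/reject logic once `slot_leader_id` has been looked up.
fn should_retransmit_and_persist(
    blob_id: Pubkey,
    slot_leader_id: Option<Pubkey>,
    my_id: Pubkey,
) -> bool {
    if blob_id == my_id {
        false // circular transmission: the blob came back to its sender
    } else if slot_leader_id.is_none() {
        true // unknown leader: keep the blob rather than risk dropping data
    } else if slot_leader_id != Some(blob_id) {
        false // known leader, but the blob claims someone else: drop it
    } else {
        true // signed by the expected leader for this slot
    }
}

fn main() {
    let (me, leader, other) = (1, 2, 3);
    assert!(!should_retransmit_and_persist(me, Some(leader), me)); // from myself
    assert!(should_retransmit_and_persist(leader, Some(leader), me)); // from leader
    assert!(!should_retransmit_and_persist(other, Some(leader), me)); // wrong leader
    assert!(should_retransmit_and_persist(other, None, me)); // leader unknown
}
```

The four `main` assertions correspond one-to-one with the four `assert_eq!` cases in the test above, so the sketch can be checked against the diff directly.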
diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs
index dd90ac505e..c0cec2cc74 100644
--- a/core/tests/gossip.rs
+++ b/core/tests/gossip.rs
@@ -176,7 +176,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
     assert!(done);
     let b = SharedBlob::default();
     b.write().unwrap().meta.size = 10;
-    ClusterInfo::retransmit(&c1, &b, &tn1, false)?;
+    let peers = c1.read().unwrap().retransmit_peers();
+    ClusterInfo::retransmit_to(&c1, &peers, &b, None, &tn1, false)?;
     let res: Vec<_> = [tn1, tn2, tn3]
         .into_par_iter()
        .map(|s| {
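The gossip test shows the migration pattern for callers of the removed `ClusterInfo::retransmit` wrapper: snapshot `retransmit_peers()` under a short read lock, then call `retransmit_to` with the explicit peer list and an explicit leader (`None` here, since the test has no leader schedule). The removed doc comment names the constraint this preserves: avoid holding the lock "while doing any io, such as the `send_to`". A standalone sketch of that pattern, with a stub cluster-info type in place of the real one:

```rust
use std::sync::{Arc, RwLock};

/// Stub standing in for ClusterInfo: just owns a peer list.
struct ClusterInfoStub {
    peers: Vec<String>,
}

impl ClusterInfoStub {
    /// Snapshot taken while the caller holds the read lock.
    fn retransmit_peers(&self) -> Vec<String> {
        self.peers.clone()
    }
}

/// Stand-in for the UdpSocket::send_to call in the real code.
fn send_to_peer(peer: &str, payload: &[u8]) {
    println!("sending {} bytes to {}", payload.len(), peer);
}

fn main() {
    let cluster_info = Arc::new(RwLock::new(ClusterInfoStub {
        peers: vec!["node-a".to_string(), "node-b".to_string()],
    }));

    // Step 1: snapshot the peer list; the read guard is dropped at the
    // end of this statement, before any I/O happens.
    let peers = cluster_info.read().unwrap().retransmit_peers();

    // Step 2: do the (potentially slow) sends without holding the lock.
    for peer in &peers {
        send_to_peer(peer, b"blob bytes");
    }
}
```

With the peer list now supplied by every caller, the one-line wrapper bought nothing, which is presumably why this diff inlines it at its sole remaining call site and deletes it.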