diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index afdedbf239..a14e967aec 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -10,26 +10,41 @@ use solana_core::contact_info::ContactInfo;
 use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo};
 use solana_core::leader_schedule_cache::LeaderScheduleCache;
 use solana_core::packet::to_packets_chunked;
-use solana_core::retransmit_stage::retransmit;
+use solana_core::retransmit_stage::retransmitter;
 use solana_core::test_tx::test_tx;
+use solana_measure::measure::Measure;
 use solana_runtime::bank::Bank;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::channel;
+use std::sync::Mutex;
 use std::sync::{Arc, RwLock};
+use std::thread::sleep;
+use std::thread::Builder;
+use std::time::Duration;
 use test::Bencher;
 
 #[bench]
-fn bench_retransmit(bencher: &mut Bencher) {
+fn bench_retransmitter(bencher: &mut Bencher) {
     solana_logger::setup();
     let mut cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-    const NUM_PEERS: usize = 2;
+    const NUM_PEERS: usize = 4;
+    let mut peer_sockets = Vec::new();
     for _ in 0..NUM_PEERS {
         let id = Pubkey::new_rand();
-        let contact_info = ContactInfo::new_localhost(&id, timestamp());
+        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
+        contact_info.tvu = socket.local_addr().unwrap();
+        contact_info.tvu.set_ip("127.0.0.1".parse().unwrap());
+        contact_info.tvu_forwards = contact_info.tvu;
+        info!("local: {:?}", contact_info.tvu);
         cluster_info.insert_info(contact_info);
+        socket.set_nonblocking(true).unwrap();
+        peer_sockets.push(socket);
     }
+    let peer_sockets = Arc::new(peer_sockets);
     let cluster_info = Arc::new(RwLock::new(cluster_info));
 
     let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(100_000);
@@ -38,28 +53,73 @@ fn bench_retransmit(bencher: &mut Bencher) {
     let bank = bank_forks.working_bank();
     let bank_forks = Arc::new(RwLock::new(bank_forks));
     let (packet_sender, packet_receiver) = channel();
-    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
-    socket.set_nonblocking(true).unwrap();
+    let packet_receiver = Arc::new(Mutex::new(packet_receiver));
+    const NUM_THREADS: usize = 2;
+    let sockets = (0..NUM_THREADS)
+        .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
+        .collect();
 
     let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
 
+    // To work reliably with higher values, this needs larger udp rmem size
     let tx = test_tx();
-    let len = 4096;
-    let chunk_size = 1024;
-    let batches = to_packets_chunked(&vec![tx; len], chunk_size);
+    const NUM_PACKETS: usize = 50;
+    let chunk_size = NUM_PACKETS / (4 * NUM_THREADS);
+    let batches = to_packets_chunked(&vec![tx; NUM_PACKETS], chunk_size);
+    info!("batches: {}", batches.len());
 
+    let retransmitter_handles = retransmitter(
+        Arc::new(sockets),
+        bank_forks,
+        &leader_schedule_cache,
+        cluster_info,
+        packet_receiver,
+    );
+
+    let total = Arc::new(AtomicUsize::new(0));
     bencher.iter(move || {
+        let peer_sockets1 = peer_sockets.clone();
+        let handles: Vec<_> = (0..NUM_PEERS)
+            .into_iter()
+            .map(|p| {
+                let peer_sockets2 = peer_sockets1.clone();
+                let total2 = total.clone();
+                Builder::new()
+                    .name("recv".to_string())
+                    .spawn(move || {
+                        info!("{} waiting on {:?}", p, peer_sockets2[p]);
+                        let mut buf = [0u8; 1024];
+                        loop {
+                            while peer_sockets2[p].recv(&mut buf).is_ok() {
+                                total2.fetch_add(1, Ordering::Relaxed);
+                            }
+                            if total2.load(Ordering::Relaxed) >= NUM_PACKETS {
+                                break;
+                            }
+                            info!("{} recv", total2.load(Ordering::Relaxed));
+                            sleep(Duration::from_millis(1));
+                        }
+                    })
+                    .unwrap()
+            })
+            .collect();
+
         for packets in batches.clone() {
             packet_sender.send(packets).unwrap();
         }
         info!("sent...");
-        retransmit(
-            &bank_forks,
-            &leader_schedule_cache,
-            &cluster_info,
-            &packet_receiver,
-            &socket,
-        )
-        .unwrap();
+
+        let mut join_time = Measure::start("join");
+        for h in handles {
+            h.join().unwrap();
+        }
+        join_time.stop();
+        info!("took: {}ms", join_time.as_ms());
+
+        total.store(0, Ordering::Relaxed);
     });
+
+    for t in retransmitter_handles {
+        t.join().unwrap();
+    }
 }
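Reviewer note on the `udp rmem` comment in the bench above: `NUM_PACKETS` stays small because the kernel's default UDP receive buffer drops datagrams under burst. A minimal sketch of requesting a bigger per-socket buffer, assuming the socket2 crate (which this patch does not use) and a hypothetical helper name; on Linux the kernel still clamps the request to `net.core.rmem_max`, so the sysctl may need raising as well:

```rust
// Sketch only, assuming socket2 (0.4+ API): bind a UDP socket with a
// larger kernel receive buffer so bursty benches drop fewer packets.
use socket2::{Domain, Protocol, Socket, Type};
use std::net::{SocketAddr, UdpSocket};

fn bind_with_large_rmem(rmem_bytes: usize) -> std::io::Result<UdpSocket> {
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    // Ask for a larger receive buffer before binding; Linux clamps this
    // to net.core.rmem_max.
    socket.set_recv_buffer_size(rmem_bytes)?;
    let addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
    socket.bind(&addr.into())?;
    Ok(socket.into())
}
```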
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 30ce606f72..95555153b9 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -31,7 +31,6 @@ use itertools::Itertools;
 use rand::SeedableRng;
 use rand::{thread_rng, Rng};
 use rand_chacha::ChaChaRng;
-use rayon::prelude::*;
 use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
 use solana_netutil::{
     bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
@@ -747,7 +746,7 @@ impl ClusterInfo {
     ) -> Result<()> {
         trace!("retransmit orders {}", peers.len());
         let errs: Vec<_> = peers
-            .par_iter()
+            .iter()
             .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
             .map(|v| {
                 let dest = if forwarded { &v.tvu_forwards } else { &v.tvu };
@@ -1565,7 +1564,7 @@ pub struct Sockets {
     pub tpu_forwards: Vec<UdpSocket>,
     pub broadcast: UdpSocket,
     pub repair: UdpSocket,
-    pub retransmit: UdpSocket,
+    pub retransmit_sockets: Vec<UdpSocket>,
     pub storage: Option<UdpSocket>,
 }
@@ -1613,7 +1612,7 @@ impl Node {
                 tpu_forwards: vec![],
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets: vec![retransmit],
                 storage: Some(storage),
                 ip_echo: None,
             },
@@ -1634,7 +1633,7 @@ impl Node {
             SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port);
 
         let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
         let info = ContactInfo::new(
             pubkey,
@@ -1659,7 +1658,7 @@ impl Node {
                 tpu_forwards: vec![tpu_forwards],
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets: vec![retransmit_socket],
                 storage: None,
             },
         }
@@ -1692,16 +1691,18 @@ impl Node {
         let (tvu_port, tvu_sockets) = multi_bind_in_range(port_range, 8).expect("tvu multi_bind");
 
         let (tvu_forwards_port, tvu_forwards_sockets) =
-            multi_bind_in_range(port_range, 8).expect("tpu multi_bind");
+            multi_bind_in_range(port_range, 8).expect("tvu_forwards multi_bind");
 
         let (tpu_port, tpu_sockets) = multi_bind_in_range(port_range, 32).expect("tpu multi_bind");
 
         let (tpu_forwards_port, tpu_forwards_sockets) =
-            multi_bind_in_range(port_range, 8).expect("tpu multi_bind");
+            multi_bind_in_range(port_range, 8).expect("tpu_forwards multi_bind");
+
+        let (_, retransmit_sockets) =
+            multi_bind_in_range(port_range, 8).expect("retransmit multi_bind");
 
         let (_, repair) = Self::bind(port_range);
         let (_, broadcast) = Self::bind(port_range);
-        let (_, retransmit) = Self::bind(port_range);
 
         let info = ContactInfo::new(
             pubkey,
@@ -1727,7 +1728,7 @@ impl Node {
                 tpu_forwards: tpu_forwards_sockets,
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets,
                 storage: None,
                 ip_echo: Some(ip_echo),
             },
@@ -1774,6 +1775,7 @@ mod tests {
     use crate::shred::max_ticks_per_n_shreds;
     use crate::shred::{DataShredHeader, Shred};
     use crate::test_tx::test_tx;
+    use rayon::prelude::*;
     use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::collections::HashSet;
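For context on `multi_bind_in_range(port_range, 8).expect("retransmit multi_bind")` above: solana_netutil hands back several UDP sockets sharing one port, so each of the new retransmit threads gets its own socket. A minimal sketch of the same multi-bind idea, assuming the socket2 crate and Unix `SO_REUSEPORT` (`multi_bind` is illustrative, not the actual solana_netutil implementation):

```rust
// Sketch only: N UDP sockets bound to the same port via SO_REUSEPORT,
// so the kernel fans incoming datagrams out across them. Unix-only.
use socket2::{Domain, Protocol, Socket, Type};
use std::net::{SocketAddr, UdpSocket};

fn multi_bind(addr: SocketAddr, num: usize) -> std::io::Result<Vec<UdpSocket>> {
    (0..num)
        .map(|_| {
            let sock = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
            sock.set_reuse_port(true)?; // lets every socket bind the same port
            sock.bind(&addr.into())?;
            Ok(sock.into())
        })
        .collect()
}
```

Note the caller must pass a concrete port; binding port 0 would give each socket a different ephemeral port.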
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 32c55392b7..550b09a5a6 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -23,27 +23,38 @@ use std::{
     sync::atomic::AtomicBool,
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
+    sync::Mutex,
     sync::{Arc, RwLock},
     thread::{self, Builder, JoinHandle},
     time::Duration,
 };
 
-pub fn retransmit(
+// Limit a given thread to consume about this many packets so that
+// it doesn't pull up too much work.
+const MAX_PACKET_BATCH_SIZE: usize = 100;
+
+fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
-    r: &PacketReceiver,
+    r: &Arc<Mutex<PacketReceiver>>,
     sock: &UdpSocket,
+    id: u32,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
-    let packets = r.recv_timeout(timer)?;
+    let r_lock = r.lock().unwrap();
+    let packets = r_lock.recv_timeout(timer)?;
     let mut timer_start = Measure::start("retransmit");
     let mut total_packets = packets.packets.len();
     let mut packet_v = vec![packets];
-    while let Ok(nq) = r.try_recv() {
+    while let Ok(nq) = r_lock.try_recv() {
         total_packets += nq.packets.len();
         packet_v.push(nq);
+        if total_packets >= MAX_PACKET_BATCH_SIZE {
+            break;
+        }
     }
+    drop(r_lock);
 
     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
@@ -100,10 +111,11 @@ pub fn retransmit(
     }
     timer_start.stop();
     debug!(
-        "retransmitted {} packets in {}ms retransmit_time: {}ms",
+        "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}",
         total_packets,
         timer_start.as_ms(),
-        retransmit_total
+        retransmit_total,
+        id,
     );
     datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
     datapoint_debug!(
@@ -124,39 +136,48 @@ pub fn retransmit(
 /// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
 /// * `recycler` - Blob recycler.
 /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
-fn retransmitter(
-    sock: Arc<UdpSocket>,
+pub fn retransmitter(
+    sockets: Arc<Vec<UdpSocket>>,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<RwLock<ClusterInfo>>,
-    r: PacketReceiver,
-) -> JoinHandle<()> {
-    let bank_forks = bank_forks.clone();
-    let leader_schedule_cache = leader_schedule_cache.clone();
-    Builder::new()
-        .name("solana-retransmitter".to_string())
-        .spawn(move || {
-            trace!("retransmitter started");
-            loop {
-                if let Err(e) = retransmit(
-                    &bank_forks,
-                    &leader_schedule_cache,
-                    &cluster_info,
-                    &r,
-                    &sock,
-                ) {
-                    match e {
-                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
-                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                        _ => {
-                            inc_new_counter_error!("streamer-retransmit-error", 1, 1);
+    r: Arc<Mutex<PacketReceiver>>,
+) -> Vec<JoinHandle<()>> {
+    (0..sockets.len())
+        .map(|s| {
+            let sockets = sockets.clone();
+            let bank_forks = bank_forks.clone();
+            let leader_schedule_cache = leader_schedule_cache.clone();
+            let r = r.clone();
+            let cluster_info = cluster_info.clone();
+
+            Builder::new()
+                .name("solana-retransmitter".to_string())
+                .spawn(move || {
+                    trace!("retransmitter started");
+                    loop {
+                        if let Err(e) = retransmit(
+                            &bank_forks,
+                            &leader_schedule_cache,
+                            &cluster_info,
+                            &r,
+                            &sockets[s],
+                            s as u32,
+                        ) {
+                            match e {
+                                Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                                Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                                _ => {
+                                    inc_new_counter_error!("streamer-retransmit-error", 1, 1);
+                                }
+                            }
                         }
                     }
-                }
-            }
-            trace!("exiting retransmitter");
+                    trace!("exiting retransmitter");
+                })
+                .unwrap()
         })
-        .unwrap()
+        .collect()
 }
 
 pub struct RetransmitStage {
@@ -172,7 +193,7 @@ impl RetransmitStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         blocktree: Arc<Blocktree>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        retransmit_socket: Arc<UdpSocket>,
+        retransmit_sockets: Arc<Vec<UdpSocket>>,
         repair_socket: Arc<UdpSocket>,
         fetch_stage_receiver: PacketReceiver,
         exit: &Arc<AtomicBool>,
@@ -181,8 +202,9 @@ impl RetransmitStage {
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
+        let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
         let t_retransmit = retransmitter(
-            retransmit_socket,
+            retransmit_sockets,
             bank_forks.clone(),
             leader_schedule_cache,
             cluster_info.clone(),
@@ -215,7 +237,7 @@ impl RetransmitStage {
             },
         );
 
-        let thread_hdls = vec![t_retransmit];
+        let thread_hdls = t_retransmit;
         Self {
             thread_hdls,
             window_service,
@@ -275,7 +297,7 @@ mod tests {
         let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
         cluster_info.insert_info(me);
 
-        let retransmit_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
+        let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
         let cluster_info = Arc::new(RwLock::new(cluster_info));
 
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -284,7 +306,7 @@ mod tests {
             bank_forks,
             &leader_schedule_cache,
             cluster_info,
-            retransmit_receiver,
+            Arc::new(Mutex::new(retransmit_receiver)),
         );
         let _thread_hdls = vec![t_retransmit];
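The heart of this file's change is the pattern above: N threads share one `mpsc` receiver behind an `Arc<Mutex<...>>`, and each thread blocks for a first batch, then drains at most `MAX_PACKET_BATCH_SIZE` packets before releasing the lock, so no single thread hoards the queue while its siblings idle. A self-contained sketch of just that pattern, with illustrative names and a plain `Vec<u8>` standing in for packet batches:

```rust
// Sketch only: N workers draining one shared channel with a batch cap.
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

const MAX_BATCH: usize = 100;

fn spawn_workers(
    receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
    num: usize,
) -> Vec<thread::JoinHandle<()>> {
    (0..num)
        .map(|id| {
            let receiver = receiver.clone();
            thread::spawn(move || loop {
                let mut batch = Vec::new();
                {
                    // Hold the lock only while draining: block briefly for
                    // the first item, then opportunistically take more.
                    let r = receiver.lock().unwrap();
                    match r.recv_timeout(Duration::from_secs(1)) {
                        Ok(item) => batch.push(item),
                        Err(RecvTimeoutError::Timeout) => continue,
                        Err(RecvTimeoutError::Disconnected) => break,
                    }
                    while batch.len() < MAX_BATCH {
                        match r.try_recv() {
                            Ok(item) => batch.push(item),
                            Err(_) => break,
                        }
                    }
                } // lock released here, before the (slow) processing step
                println!("worker {} got {} items", id, batch.len());
            })
        })
        .collect()
}

fn main() {
    let (tx, rx) = channel();
    let handles = spawn_workers(Arc::new(Mutex::new(rx)), 2);
    for i in 0..250u32 {
        tx.send(i.to_le_bytes().to_vec()).unwrap();
    }
    drop(tx); // disconnect so the workers exit
    for h in handles {
        h.join().unwrap();
    }
}
```

Dropping the sender disconnects the channel, which is what lets these workers, like the real retransmitter threads, break out of their loops.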
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 01e2a8426e..19eb7d3d72 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -49,7 +49,7 @@ pub struct Tvu {
 pub struct Sockets {
     pub fetch: Vec<UdpSocket>,
     pub repair: UdpSocket,
-    pub retransmit: UdpSocket,
+    pub retransmit: Vec<UdpSocket>,
     pub forwards: Vec<UdpSocket>,
 }
@@ -92,7 +92,7 @@ impl Tvu {
         let Sockets {
             repair: repair_socket,
             fetch: fetch_sockets,
-            retransmit: retransmit_socket,
+            retransmit: retransmit_sockets,
             forwards: tvu_forward_sockets,
         } = sockets;
@@ -118,7 +118,7 @@ impl Tvu {
             leader_schedule_cache,
             blocktree.clone(),
             &cluster_info,
-            Arc::new(retransmit_socket),
+            Arc::new(retransmit_sockets),
             repair_socket,
             fetch_receiver,
             &exit,
@@ -270,7 +270,7 @@ pub mod tests {
         {
             Sockets {
                 repair: target1.sockets.repair,
-                retransmit: target1.sockets.retransmit,
+                retransmit: target1.sockets.retransmit_sockets,
                 fetch: target1.sockets.tvu,
                 forwards: target1.sockets.tvu_forwards,
             }
diff --git a/core/src/validator.rs b/core/src/validator.rs
index e83da6879c..70fed44e6a 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -274,9 +274,10 @@ impl Validator {
                 .expect("Failed to clone repair socket"),
             retransmit: node
                 .sockets
-                .retransmit
-                .try_clone()
-                .expect("Failed to clone retransmit socket"),
+                .retransmit_sockets
+                .iter()
+                .map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
+                .collect(),
             fetch: node
                 .sockets
                 .tvu
@@ -378,7 +379,7 @@ impl Validator {
         );
         info!(
             "local retransmit address: {}",
-            node.sockets.retransmit.local_addr().unwrap()
+            node.sockets.retransmit_sockets[0].local_addr().unwrap()
         );
     }
 }
diff --git a/validator/src/lib.rs b/validator/src/lib.rs
index 5d9442489a..1e080cc032 100644
--- a/validator/src/lib.rs
+++ b/validator/src/lib.rs
@@ -575,7 +575,6 @@ pub fn main() {
         &node.sockets.gossip,
         &node.sockets.broadcast,
         &node.sockets.repair,
-        &node.sockets.retransmit,
     ];
 
     let mut tcp_listeners: Vec<(_, _)> = tcp_ports