Use multiple retransmit stage threads/sockets (#6279)

Authored by sakridge on 2019-10-10 13:24:03 -07:00, committed via GitHub
parent 570b98c7bc
commit 1b775044f7
6 changed files with 158 additions and 74 deletions

core/benches/retransmit_stage.rs

@@ -10,26 +10,41 @@ use solana_core::contact_info::ContactInfo;
 use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo};
 use solana_core::leader_schedule_cache::LeaderScheduleCache;
 use solana_core::packet::to_packets_chunked;
-use solana_core::retransmit_stage::retransmit;
+use solana_core::retransmit_stage::retransmitter;
 use solana_core::test_tx::test_tx;
+use solana_measure::measure::Measure;
 use solana_runtime::bank::Bank;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::channel;
+use std::sync::Mutex;
 use std::sync::{Arc, RwLock};
+use std::thread::sleep;
+use std::thread::Builder;
+use std::time::Duration;
 use test::Bencher;
 
 #[bench]
-fn bench_retransmit(bencher: &mut Bencher) {
+fn bench_retransmitter(bencher: &mut Bencher) {
     solana_logger::setup();
     let mut cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-    const NUM_PEERS: usize = 2;
+    const NUM_PEERS: usize = 4;
+    let mut peer_sockets = Vec::new();
     for _ in 0..NUM_PEERS {
         let id = Pubkey::new_rand();
-        let contact_info = ContactInfo::new_localhost(&id, timestamp());
+        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
+        contact_info.tvu = socket.local_addr().unwrap();
+        contact_info.tvu.set_ip("127.0.0.1".parse().unwrap());
+        contact_info.tvu_forwards = contact_info.tvu;
+        info!("local: {:?}", contact_info.tvu);
         cluster_info.insert_info(contact_info);
+        socket.set_nonblocking(true).unwrap();
+        peer_sockets.push(socket);
     }
+    let peer_sockets = Arc::new(peer_sockets);
     let cluster_info = Arc::new(RwLock::new(cluster_info));
 
     let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(100_000);
@@ -38,28 +53,73 @@ fn bench_retransmit(bencher: &mut Bencher) {
     let bank = bank_forks.working_bank();
     let bank_forks = Arc::new(RwLock::new(bank_forks));
     let (packet_sender, packet_receiver) = channel();
-    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
-    socket.set_nonblocking(true).unwrap();
+    let packet_receiver = Arc::new(Mutex::new(packet_receiver));
+    const NUM_THREADS: usize = 2;
+    let sockets = (0..NUM_THREADS)
+        .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
+        .collect();
 
     let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
 
+    // To work reliably with higher values, this needs larger udp rmem size
     let tx = test_tx();
-    let len = 4096;
-    let chunk_size = 1024;
-    let batches = to_packets_chunked(&vec![tx; len], chunk_size);
+    const NUM_PACKETS: usize = 50;
+    let chunk_size = NUM_PACKETS / (4 * NUM_THREADS);
+    let batches = to_packets_chunked(&vec![tx; NUM_PACKETS], chunk_size);
+    info!("batches: {}", batches.len());
+
+    let retransmitter_handles = retransmitter(
+        Arc::new(sockets),
+        bank_forks,
+        &leader_schedule_cache,
+        cluster_info,
+        packet_receiver,
+    );
+
+    let total = Arc::new(AtomicUsize::new(0));
     bencher.iter(move || {
+        let peer_sockets1 = peer_sockets.clone();
+        let handles: Vec<_> = (0..NUM_PEERS)
+            .into_iter()
+            .map(|p| {
+                let peer_sockets2 = peer_sockets1.clone();
+                let total2 = total.clone();
+                Builder::new()
+                    .name("recv".to_string())
+                    .spawn(move || {
+                        info!("{} waiting on {:?}", p, peer_sockets2[p]);
+                        let mut buf = [0u8; 1024];
+                        loop {
+                            while peer_sockets2[p].recv(&mut buf).is_ok() {
+                                total2.fetch_add(1, Ordering::Relaxed);
+                            }
+                            if total2.load(Ordering::Relaxed) >= NUM_PACKETS {
+                                break;
+                            }
+                            info!("{} recv", total2.load(Ordering::Relaxed));
+                            sleep(Duration::from_millis(1));
+                        }
+                    })
+                    .unwrap()
+            })
+            .collect();
+
         for packets in batches.clone() {
             packet_sender.send(packets).unwrap();
         }
         info!("sent...");
-        retransmit(
-            &bank_forks,
-            &leader_schedule_cache,
-            &cluster_info,
-            &packet_receiver,
-            &socket,
-        )
-        .unwrap();
+
+        let mut join_time = Measure::start("join");
+        for h in handles {
+            h.join().unwrap();
+        }
+        join_time.stop();
+        info!("took: {}ms", join_time.as_ms());
+
+        total.store(0, Ordering::Relaxed);
     });
+
+    for t in retransmitter_handles {
+        t.join().unwrap();
+    }
 }
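The reworked bench pushes NUM_PACKETS through the retransmitter threads and spins up one receiver thread per peer socket, counting arrivals via a shared AtomicUsize. Below is a stripped-down, self-contained sketch of that measurement pattern, std-only, with a plain sender standing in for the retransmitter; it assumes no packets are dropped on loopback (the udp rmem caveat in the diff applies here too).

    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    const NUM_PEERS: usize = 4;
    const NUM_PACKETS: usize = 50;

    fn main() {
        // One nonblocking "peer" socket per receiver thread, as in the bench.
        let peers: Vec<UdpSocket> = (0..NUM_PEERS)
            .map(|_| {
                let s = UdpSocket::bind("127.0.0.1:0").unwrap();
                s.set_nonblocking(true).unwrap();
                s
            })
            .collect();
        let addrs: Vec<_> = peers.iter().map(|s| s.local_addr().unwrap()).collect();
        let total = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = peers
            .into_iter()
            .map(|sock| {
                let total = total.clone();
                thread::spawn(move || {
                    let mut buf = [0u8; 1024];
                    // Poll until every packet (across all peers) is accounted for.
                    while total.load(Ordering::Relaxed) < NUM_PACKETS {
                        while sock.recv(&mut buf).is_ok() {
                            total.fetch_add(1, Ordering::Relaxed);
                        }
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();

        // Stand-in for the retransmitter: spray packets across the peers.
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..NUM_PACKETS {
            sender.send_to(b"shred", addrs[i % NUM_PEERS]).unwrap();
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(total.load(Ordering::Relaxed), NUM_PACKETS);
    }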

core/src/cluster_info.rs

@@ -31,7 +31,6 @@ use itertools::Itertools;
 use rand::SeedableRng;
 use rand::{thread_rng, Rng};
 use rand_chacha::ChaChaRng;
-use rayon::prelude::*;
 use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
 use solana_netutil::{
     bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range,
@@ -747,7 +746,7 @@ impl ClusterInfo {
     ) -> Result<()> {
         trace!("retransmit orders {}", peers.len());
         let errs: Vec<_> = peers
-            .par_iter()
+            .iter()
             .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default())
             .map(|v| {
                 let dest = if forwarded { &v.tvu_forwards } else { &v.tvu };
@@ -1565,7 +1564,7 @@ pub struct Sockets {
     pub tpu_forwards: Vec<UdpSocket>,
     pub broadcast: UdpSocket,
     pub repair: UdpSocket,
-    pub retransmit: UdpSocket,
+    pub retransmit_sockets: Vec<UdpSocket>,
     pub storage: Option<UdpSocket>,
 }
@@ -1613,7 +1612,7 @@ impl Node {
                 tpu_forwards: vec![],
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets: vec![retransmit],
                 storage: Some(storage),
                 ip_echo: None,
             },
@@ -1634,7 +1633,7 @@ impl Node {
             SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port);
         let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
-        let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
         let info = ContactInfo::new(
             pubkey,
@@ -1659,7 +1658,7 @@ impl Node {
                 tpu_forwards: vec![tpu_forwards],
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets: vec![retransmit_socket],
                 storage: None,
             },
         }
@@ -1692,16 +1691,18 @@ impl Node {
         let (tvu_port, tvu_sockets) = multi_bind_in_range(port_range, 8).expect("tvu multi_bind");
         let (tvu_forwards_port, tvu_forwards_sockets) =
-            multi_bind_in_range(port_range, 8).expect("tpu multi_bind");
+            multi_bind_in_range(port_range, 8).expect("tvu_forwards multi_bind");
         let (tpu_port, tpu_sockets) = multi_bind_in_range(port_range, 32).expect("tpu multi_bind");
         let (tpu_forwards_port, tpu_forwards_sockets) =
-            multi_bind_in_range(port_range, 8).expect("tpu multi_bind");
+            multi_bind_in_range(port_range, 8).expect("tpu_forwards multi_bind");
+        let (_, retransmit_sockets) =
+            multi_bind_in_range(port_range, 8).expect("retransmit multi_bind");
         let (_, repair) = Self::bind(port_range);
         let (_, broadcast) = Self::bind(port_range);
-        let (_, retransmit) = Self::bind(port_range);
         let info = ContactInfo::new(
             pubkey,
@@ -1727,7 +1728,7 @@ impl Node {
                 tpu_forwards: tpu_forwards_sockets,
                 broadcast,
                 repair,
-                retransmit,
+                retransmit_sockets,
                 storage: None,
                 ip_echo: Some(ip_echo),
             },
@@ -1774,6 +1775,7 @@ mod tests {
     use crate::shred::max_ticks_per_n_shreds;
     use crate::shred::{DataShredHeader, Shred};
     use crate::test_tx::test_tx;
+    use rayon::prelude::*;
     use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::collections::HashSet;
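With several retransmit threads each owning its own socket, the per-call rayon fan-out inside retransmit is no longer needed, so par_iter() becomes a plain iter(): parallelism moves up a level, from data-parallel sends within one call to whole calls running on separate threads. A minimal sketch of the resulting serial shape (signature simplified and hypothetical, not the actual ClusterInfo method):

    use std::io;
    use std::net::{SocketAddr, UdpSocket};

    // Serial fan-out over one socket; concurrency now comes from running
    // several of these loops on separate threads, each with its own socket.
    fn retransmit_to(sock: &UdpSocket, peers: &[SocketAddr], payload: &[u8]) -> io::Result<()> {
        for dest in peers {
            sock.send_to(payload, dest)?;
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        let sock = UdpSocket::bind("127.0.0.1:0")?;
        let peer = UdpSocket::bind("127.0.0.1:0")?;
        retransmit_to(&sock, &[peer.local_addr()?], b"shred")?;
        Ok(())
    }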

core/src/retransmit_stage.rs

@@ -23,27 +23,38 @@ use std::{
     sync::atomic::AtomicBool,
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
+    sync::Mutex,
     sync::{Arc, RwLock},
     thread::{self, Builder, JoinHandle},
     time::Duration,
 };
 
-pub fn retransmit(
+// Limit a given thread to consume about this many packets so that
+// it doesn't pull up too much work.
+const MAX_PACKET_BATCH_SIZE: usize = 100;
+
+fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
-    r: &PacketReceiver,
+    r: &Arc<Mutex<PacketReceiver>>,
     sock: &UdpSocket,
+    id: u32,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
-    let packets = r.recv_timeout(timer)?;
+    let r_lock = r.lock().unwrap();
+    let packets = r_lock.recv_timeout(timer)?;
     let mut timer_start = Measure::start("retransmit");
     let mut total_packets = packets.packets.len();
     let mut packet_v = vec![packets];
-    while let Ok(nq) = r.try_recv() {
+    while let Ok(nq) = r_lock.try_recv() {
         total_packets += nq.packets.len();
         packet_v.push(nq);
+        if total_packets >= MAX_PACKET_BATCH_SIZE {
+            break;
+        }
     }
+    drop(r_lock);
 
     let r_bank = bank_forks.read().unwrap().working_bank();
     let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
@@ -100,10 +111,11 @@ pub fn retransmit(
     }
     timer_start.stop();
     debug!(
-        "retransmitted {} packets in {}ms retransmit_time: {}ms",
+        "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}",
         total_packets,
         timer_start.as_ms(),
-        retransmit_total
+        retransmit_total,
+        id,
     );
     datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
     datapoint_debug!(
@@ -124,39 +136,48 @@ pub fn retransmit(
 /// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
 /// * `recycler` - Blob recycler.
 /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
-fn retransmitter(
-    sock: Arc<UdpSocket>,
+pub fn retransmitter(
+    sockets: Arc<Vec<UdpSocket>>,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<RwLock<ClusterInfo>>,
-    r: PacketReceiver,
-) -> JoinHandle<()> {
-    let bank_forks = bank_forks.clone();
-    let leader_schedule_cache = leader_schedule_cache.clone();
-    Builder::new()
-        .name("solana-retransmitter".to_string())
-        .spawn(move || {
-            trace!("retransmitter started");
-            loop {
-                if let Err(e) = retransmit(
-                    &bank_forks,
-                    &leader_schedule_cache,
-                    &cluster_info,
-                    &r,
-                    &sock,
-                ) {
-                    match e {
-                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
-                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                        _ => {
-                            inc_new_counter_error!("streamer-retransmit-error", 1, 1);
-                        }
-                    }
-                }
-            }
-            trace!("exiting retransmitter");
-        })
-        .unwrap()
+    r: Arc<Mutex<PacketReceiver>>,
+) -> Vec<JoinHandle<()>> {
+    (0..sockets.len())
+        .map(|s| {
+            let sockets = sockets.clone();
+            let bank_forks = bank_forks.clone();
+            let leader_schedule_cache = leader_schedule_cache.clone();
+            let r = r.clone();
+            let cluster_info = cluster_info.clone();
+
+            Builder::new()
+                .name("solana-retransmitter".to_string())
+                .spawn(move || {
+                    trace!("retransmitter started");
+                    loop {
+                        if let Err(e) = retransmit(
+                            &bank_forks,
+                            &leader_schedule_cache,
+                            &cluster_info,
+                            &r,
+                            &sockets[s],
+                            s as u32,
+                        ) {
+                            match e {
+                                Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
+                                Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                                _ => {
+                                    inc_new_counter_error!("streamer-retransmit-error", 1, 1);
+                                }
+                            }
+                        }
+                    }
+                    trace!("exiting retransmitter");
+                })
+                .unwrap()
+        })
+        .collect()
 }
 
 pub struct RetransmitStage {
@@ -172,7 +193,7 @@ impl RetransmitStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         blocktree: Arc<Blocktree>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        retransmit_socket: Arc<UdpSocket>,
+        retransmit_sockets: Arc<Vec<UdpSocket>>,
         repair_socket: Arc<UdpSocket>,
         fetch_stage_receiver: PacketReceiver,
         exit: &Arc<AtomicBool>,
@@ -181,8 +202,9 @@ impl RetransmitStage {
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
+        let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
         let t_retransmit = retransmitter(
-            retransmit_socket,
+            retransmit_sockets,
             bank_forks.clone(),
             leader_schedule_cache,
             cluster_info.clone(),
@@ -215,7 +237,7 @@ impl RetransmitStage {
             },
         );
 
-        let thread_hdls = vec![t_retransmit];
+        let thread_hdls = t_retransmit;
         Self {
             thread_hdls,
             window_service,
@@ -275,7 +297,7 @@ mod tests {
         let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
         cluster_info.insert_info(me);
 
-        let retransmit_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
+        let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
         let cluster_info = Arc::new(RwLock::new(cluster_info));
 
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -284,7 +306,7 @@ mod tests {
             bank_forks,
             &leader_schedule_cache,
             cluster_info,
-            retransmit_receiver,
+            Arc::new(Mutex::new(retransmit_receiver)),
         );
         let _thread_hdls = vec![t_retransmit];
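The core pattern here is a small worker pool draining one shared channel: the receiver sits behind Arc<Mutex<...>>, whichever thread wins the lock pulls at most MAX_PACKET_BATCH_SIZE packets, then releases the lock before doing the slow socket writes so its siblings can grab the next batch. Capping the drain keeps one thread from monopolizing the queue. A self-contained sketch of that pattern, with strings standing in for packet batches (names are illustrative, not the code above):

    use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    const MAX_BATCH: usize = 100; // mirrors MAX_PACKET_BATCH_SIZE

    fn worker(id: usize, r: Arc<Mutex<Receiver<String>>>) {
        loop {
            // Hold the lock only while pulling work off the channel.
            let batch = {
                let r = r.lock().unwrap();
                let first = match r.recv_timeout(Duration::from_secs(1)) {
                    Ok(item) => item,
                    Err(RecvTimeoutError::Timeout) => continue,
                    Err(RecvTimeoutError::Disconnected) => break,
                };
                let mut batch = vec![first];
                while batch.len() < MAX_BATCH {
                    match r.try_recv() {
                        Ok(item) => batch.push(item),
                        Err(_) => break,
                    }
                }
                batch
            }; // lock released here, before the slow part
            for item in batch {
                println!("worker {} retransmits {}", id, item);
            }
        }
    }

    fn main() {
        let (tx, rx) = channel();
        let rx = Arc::new(Mutex::new(rx));
        let handles: Vec<_> = (0..2)
            .map(|id| {
                let rx = rx.clone();
                thread::spawn(move || worker(id, rx))
            })
            .collect();
        for i in 0..10 {
            tx.send(format!("packet batch {}", i)).unwrap();
        }
        drop(tx); // disconnect the channel so workers exit
        for h in handles {
            h.join().unwrap();
        }
    }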

core/src/tvu.rs

@@ -49,7 +49,7 @@ pub struct Tvu {
 pub struct Sockets {
     pub fetch: Vec<UdpSocket>,
     pub repair: UdpSocket,
-    pub retransmit: UdpSocket,
+    pub retransmit: Vec<UdpSocket>,
     pub forwards: Vec<UdpSocket>,
 }
@@ -92,7 +92,7 @@ impl Tvu {
         let Sockets {
             repair: repair_socket,
             fetch: fetch_sockets,
-            retransmit: retransmit_socket,
+            retransmit: retransmit_sockets,
             forwards: tvu_forward_sockets,
         } = sockets;
@@ -118,7 +118,7 @@ impl Tvu {
             leader_schedule_cache,
             blocktree.clone(),
             &cluster_info,
-            Arc::new(retransmit_socket),
+            Arc::new(retransmit_sockets),
             repair_socket,
             fetch_receiver,
             &exit,
@@ -270,7 +270,7 @@ pub mod tests {
         {
             Sockets {
                 repair: target1.sockets.repair,
-                retransmit: target1.sockets.retransmit,
+                retransmit: target1.sockets.retransmit_sockets,
                 fetch: target1.sockets.tvu,
                 forwards: target1.sockets.tvu_forwards,
             }

core/src/validator.rs

@@ -274,9 +274,10 @@ impl Validator {
                 .expect("Failed to clone repair socket"),
             retransmit: node
                 .sockets
-                .retransmit
-                .try_clone()
-                .expect("Failed to clone retransmit socket"),
+                .retransmit_sockets
+                .iter()
+                .map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
+                .collect(),
             fetch: node
                 .sockets
                 .tvu
@@ -378,7 +379,7 @@ impl Validator {
         );
         info!(
             "local retransmit address: {}",
-            node.sockets.retransmit.local_addr().unwrap()
+            node.sockets.retransmit_sockets[0].local_addr().unwrap()
         );
     }
 }
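Since the retransmit field is now a Vec, the validator clones each socket individually. UdpSocket::try_clone() duplicates the underlying OS descriptor, so every clone stays bound to the same port as its original. A sketch of that mapping (helper name hypothetical):

    use std::net::UdpSocket;

    // Clone a vector of sockets; each clone shares its original's bound port.
    fn clone_sockets(sockets: &[UdpSocket]) -> Vec<UdpSocket> {
        sockets
            .iter()
            .map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
            .collect()
    }

    fn main() {
        let sockets: Vec<UdpSocket> = (0..2)
            .map(|_| UdpSocket::bind("127.0.0.1:0").unwrap())
            .collect();
        let clones = clone_sockets(&sockets);
        assert_eq!(
            sockets[0].local_addr().unwrap(),
            clones[0].local_addr().unwrap()
        );
    }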

validator/src/main.rs

@@ -575,7 +575,6 @@ pub fn main() {
         &node.sockets.gossip,
         &node.sockets.broadcast,
         &node.sockets.repair,
-        &node.sockets.retransmit,
     ];
 
     let mut tcp_listeners: Vec<(_, _)> = tcp_ports