Give streamer::receiver() threads unique names (#35369)
The name was previously hard-coded to solReceiver. The use of the same name makes it hard to figure out which thread is which when these threads are handling many services (Gossip, Tvu, etc).
This commit is contained in:
parent
564a9f78a0
commit
7d6f1d5911
|
@ -108,6 +108,7 @@ fn main() -> Result<()> {
|
||||||
let (s_reader, r_reader) = unbounded();
|
let (s_reader, r_reader) = unbounded();
|
||||||
read_channels.push(r_reader);
|
read_channels.push(r_reader);
|
||||||
read_threads.push(receiver(
|
read_threads.push(receiver(
|
||||||
|
"solRcvrBenStrmr".to_string(),
|
||||||
Arc::new(read),
|
Arc::new(read),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
|
|
|
@ -159,8 +159,10 @@ impl FetchStage {
|
||||||
let tpu_threads: Vec<_> = if tpu_enable_udp {
|
let tpu_threads: Vec<_> = if tpu_enable_udp {
|
||||||
tpu_sockets
|
tpu_sockets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|socket| {
|
.enumerate()
|
||||||
|
.map(|(i, socket)| {
|
||||||
streamer::receiver(
|
streamer::receiver(
|
||||||
|
format!("solRcvrTpu{i:02}"),
|
||||||
socket,
|
socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
|
@ -180,8 +182,10 @@ impl FetchStage {
|
||||||
let tpu_forwards_threads: Vec<_> = if tpu_enable_udp {
|
let tpu_forwards_threads: Vec<_> = if tpu_enable_udp {
|
||||||
tpu_forwards_sockets
|
tpu_forwards_sockets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|socket| {
|
.enumerate()
|
||||||
|
.map(|(i, socket)| {
|
||||||
streamer::receiver(
|
streamer::receiver(
|
||||||
|
format!("solRcvrTpuFwd{i:02}"),
|
||||||
socket,
|
socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
forward_sender.clone(),
|
forward_sender.clone(),
|
||||||
|
@ -200,8 +204,10 @@ impl FetchStage {
|
||||||
let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
|
let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
|
||||||
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
|
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|socket| {
|
.enumerate()
|
||||||
|
.map(|(i, socket)| {
|
||||||
streamer::receiver(
|
streamer::receiver(
|
||||||
|
format!("solRcvrTpuVot{i:02}"),
|
||||||
socket,
|
socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
vote_sender.clone(),
|
vote_sender.clone(),
|
||||||
|
|
|
@ -160,6 +160,7 @@ impl AncestorHashesService {
|
||||||
let outstanding_requests = Arc::<RwLock<OutstandingAncestorHashesRepairs>>::default();
|
let outstanding_requests = Arc::<RwLock<OutstandingAncestorHashesRepairs>>::default();
|
||||||
let (response_sender, response_receiver) = unbounded();
|
let (response_sender, response_receiver) = unbounded();
|
||||||
let t_receiver = streamer::receiver(
|
let t_receiver = streamer::receiver(
|
||||||
|
"solRcvrAncHash".to_string(),
|
||||||
ancestor_hashes_request_socket.clone(),
|
ancestor_hashes_request_socket.clone(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
response_sender.clone(),
|
response_sender.clone(),
|
||||||
|
@ -1294,6 +1295,7 @@ mod test {
|
||||||
|
|
||||||
// Set up repair request receiver threads
|
// Set up repair request receiver threads
|
||||||
let t_request_receiver = streamer::receiver(
|
let t_request_receiver = streamer::receiver(
|
||||||
|
"solRcvrTest".to_string(),
|
||||||
Arc::new(responder_node.sockets.serve_repair),
|
Arc::new(responder_node.sockets.serve_repair),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
requests_sender,
|
requests_sender,
|
||||||
|
|
|
@ -38,6 +38,7 @@ impl ServeRepairService {
|
||||||
serve_repair_socket.local_addr().unwrap()
|
serve_repair_socket.local_addr().unwrap()
|
||||||
);
|
);
|
||||||
let t_receiver = streamer::receiver(
|
let t_receiver = streamer::receiver(
|
||||||
|
"solRcvrServeRep".to_string(),
|
||||||
serve_repair_socket.clone(),
|
serve_repair_socket.clone(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
request_sender,
|
request_sender,
|
||||||
|
|
|
@ -147,6 +147,7 @@ impl ShredFetchStage {
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn packet_modifier(
|
fn packet_modifier(
|
||||||
|
receiver_thread_name: &'static str,
|
||||||
sockets: Vec<Arc<UdpSocket>>,
|
sockets: Vec<Arc<UdpSocket>>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
sender: Sender<PacketBatch>,
|
sender: Sender<PacketBatch>,
|
||||||
|
@ -161,9 +162,11 @@ impl ShredFetchStage {
|
||||||
let (packet_sender, packet_receiver) = unbounded();
|
let (packet_sender, packet_receiver) = unbounded();
|
||||||
let streamers = sockets
|
let streamers = sockets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|s| {
|
.enumerate()
|
||||||
|
.map(|(i, socket)| {
|
||||||
streamer::receiver(
|
streamer::receiver(
|
||||||
s,
|
format!("{receiver_thread_name}{i:02}"),
|
||||||
|
socket,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
packet_sender.clone(),
|
packet_sender.clone(),
|
||||||
recycler.clone(),
|
recycler.clone(),
|
||||||
|
@ -211,6 +214,7 @@ impl ShredFetchStage {
|
||||||
let recycler = PacketBatchRecycler::warmed(100, 1024);
|
let recycler = PacketBatchRecycler::warmed(100, 1024);
|
||||||
|
|
||||||
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
|
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
|
||||||
|
"solRcvrShred",
|
||||||
sockets,
|
sockets,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
|
@ -224,6 +228,7 @@ impl ShredFetchStage {
|
||||||
);
|
);
|
||||||
|
|
||||||
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
||||||
|
"solRcvrShredRep",
|
||||||
vec![repair_socket.clone()],
|
vec![repair_socket.clone()],
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
|
|
|
@ -50,6 +50,7 @@ impl GossipService {
|
||||||
);
|
);
|
||||||
let socket_addr_space = *cluster_info.socket_addr_space();
|
let socket_addr_space = *cluster_info.socket_addr_space();
|
||||||
let t_receiver = streamer::receiver(
|
let t_receiver = streamer::receiver(
|
||||||
|
"solRcvrGossip".to_string(),
|
||||||
gossip_socket.clone(),
|
gossip_socket.clone(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
request_sender,
|
request_sender,
|
||||||
|
|
|
@ -157,6 +157,7 @@ fn recv_loop(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn receiver(
|
pub fn receiver(
|
||||||
|
thread_name: String,
|
||||||
socket: Arc<UdpSocket>,
|
socket: Arc<UdpSocket>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
packet_batch_sender: PacketBatchSender,
|
packet_batch_sender: PacketBatchSender,
|
||||||
|
@ -169,7 +170,7 @@ pub fn receiver(
|
||||||
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
|
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
|
||||||
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
|
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solReceiver".to_string())
|
.name(thread_name)
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let _ = recv_loop(
|
let _ = recv_loop(
|
||||||
&socket,
|
&socket,
|
||||||
|
@ -480,6 +481,7 @@ mod test {
|
||||||
let (s_reader, r_reader) = unbounded();
|
let (s_reader, r_reader) = unbounded();
|
||||||
let stats = Arc::new(StreamerReceiveStats::new("test"));
|
let stats = Arc::new(StreamerReceiveStats::new("test"));
|
||||||
let t_receiver = receiver(
|
let t_receiver = receiver(
|
||||||
|
"solRcvrTest".to_string(),
|
||||||
Arc::new(read),
|
Arc::new(read),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
s_reader,
|
s_reader,
|
||||||
|
|
Loading…
Reference in New Issue