Give SigVerify and ShredFetch threads unique names (#98)
- solTvuFetchPmod ==> solTvuPktMod + solTvuRepPktMod - solSigVerifier ==> solSigVerTpu + solSigVerTpuVot
This commit is contained in:
parent
4753dcae71
commit
f3c6c08752
|
@ -160,7 +160,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) {
|
||||||
let (packet_s, packet_r) = unbounded();
|
let (packet_s, packet_r) = unbounded();
|
||||||
let (verified_s, verified_r) = BankingTracer::channel_for_test();
|
let (verified_s, verified_r) = BankingTracer::channel_for_test();
|
||||||
let verifier = TransactionSigVerifier::new(verified_s);
|
let verifier = TransactionSigVerifier::new(verified_s);
|
||||||
let stage = SigVerifyStage::new(packet_r, verifier, "bench");
|
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerBench", "bench");
|
||||||
|
|
||||||
bencher.iter(move || {
|
bencher.iter(move || {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
|
@ -148,6 +148,7 @@ impl ShredFetchStage {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn packet_modifier(
|
fn packet_modifier(
|
||||||
receiver_thread_name: &'static str,
|
receiver_thread_name: &'static str,
|
||||||
|
modifier_thread_name: &'static str,
|
||||||
sockets: Vec<Arc<UdpSocket>>,
|
sockets: Vec<Arc<UdpSocket>>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
sender: Sender<PacketBatch>,
|
sender: Sender<PacketBatch>,
|
||||||
|
@ -178,7 +179,7 @@ impl ShredFetchStage {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let modifier_hdl = Builder::new()
|
let modifier_hdl = Builder::new()
|
||||||
.name("solTvuFetchPMod".to_string())
|
.name(modifier_thread_name.to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let repair_context = repair_context
|
let repair_context = repair_context
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -215,6 +216,7 @@ impl ShredFetchStage {
|
||||||
|
|
||||||
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
|
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
|
||||||
"solRcvrShred",
|
"solRcvrShred",
|
||||||
|
"solTvuPktMod",
|
||||||
sockets,
|
sockets,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
|
@ -229,6 +231,7 @@ impl ShredFetchStage {
|
||||||
|
|
||||||
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
||||||
"solRcvrShredRep",
|
"solRcvrShredRep",
|
||||||
|
"solTvuRepPktMod",
|
||||||
vec![repair_socket.clone()],
|
vec![repair_socket.clone()],
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
|
|
|
@ -238,9 +238,11 @@ impl SigVerifyStage {
|
||||||
pub fn new<T: SigVerifier + 'static + Send>(
|
pub fn new<T: SigVerifier + 'static + Send>(
|
||||||
packet_receiver: Receiver<PacketBatch>,
|
packet_receiver: Receiver<PacketBatch>,
|
||||||
verifier: T,
|
verifier: T,
|
||||||
name: &'static str,
|
thread_name: &'static str,
|
||||||
|
metrics_name: &'static str,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let thread_hdl = Self::verifier_services(packet_receiver, verifier, name);
|
let thread_hdl =
|
||||||
|
Self::verifier_service(packet_receiver, verifier, thread_name, metrics_name);
|
||||||
Self { thread_hdl }
|
Self { thread_hdl }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,7 +409,8 @@ impl SigVerifyStage {
|
||||||
fn verifier_service<T: SigVerifier + 'static + Send>(
|
fn verifier_service<T: SigVerifier + 'static + Send>(
|
||||||
packet_receiver: Receiver<PacketBatch>,
|
packet_receiver: Receiver<PacketBatch>,
|
||||||
mut verifier: T,
|
mut verifier: T,
|
||||||
name: &'static str,
|
thread_name: &'static str,
|
||||||
|
metrics_name: &'static str,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
let mut stats = SigVerifierStats::default();
|
let mut stats = SigVerifierStats::default();
|
||||||
let mut last_print = Instant::now();
|
let mut last_print = Instant::now();
|
||||||
|
@ -415,7 +418,7 @@ impl SigVerifyStage {
|
||||||
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
|
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
|
||||||
const DEDUPER_NUM_BITS: u64 = 63_999_979;
|
const DEDUPER_NUM_BITS: u64 = 63_999_979;
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solSigVerifier".to_string())
|
.name(thread_name.to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
|
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
|
||||||
|
@ -440,7 +443,7 @@ impl SigVerifyStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if last_print.elapsed().as_secs() > 2 {
|
if last_print.elapsed().as_secs() > 2 {
|
||||||
stats.report(name);
|
stats.report(metrics_name);
|
||||||
stats = SigVerifierStats::default();
|
stats = SigVerifierStats::default();
|
||||||
last_print = Instant::now();
|
last_print = Instant::now();
|
||||||
}
|
}
|
||||||
|
@ -449,14 +452,6 @@ impl SigVerifyStage {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verifier_services<T: SigVerifier + 'static + Send>(
|
|
||||||
packet_receiver: Receiver<PacketBatch>,
|
|
||||||
verifier: T,
|
|
||||||
name: &'static str,
|
|
||||||
) -> JoinHandle<()> {
|
|
||||||
Self::verifier_service(packet_receiver, verifier, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
self.thread_hdl.join()
|
self.thread_hdl.join()
|
||||||
}
|
}
|
||||||
|
@ -552,7 +547,7 @@ mod tests {
|
||||||
let (packet_s, packet_r) = unbounded();
|
let (packet_s, packet_r) = unbounded();
|
||||||
let (verified_s, verified_r) = BankingTracer::channel_for_test();
|
let (verified_s, verified_r) = BankingTracer::channel_for_test();
|
||||||
let verifier = TransactionSigVerifier::new(verified_s);
|
let verifier = TransactionSigVerifier::new(verified_s);
|
||||||
let stage = SigVerifyStage::new(packet_r, verifier, "test");
|
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerTest", "test");
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let packets_per_batch = 128;
|
let packets_per_batch = 128;
|
||||||
|
|
|
@ -190,14 +190,19 @@ impl Tpu {
|
||||||
|
|
||||||
let sigverify_stage = {
|
let sigverify_stage = {
|
||||||
let verifier = TransactionSigVerifier::new(non_vote_sender);
|
let verifier = TransactionSigVerifier::new(non_vote_sender);
|
||||||
SigVerifyStage::new(packet_receiver, verifier, "tpu-verifier")
|
SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
|
||||||
};
|
};
|
||||||
|
|
||||||
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
|
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
|
||||||
|
|
||||||
let vote_sigverify_stage = {
|
let vote_sigverify_stage = {
|
||||||
let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
|
let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
|
||||||
SigVerifyStage::new(vote_packet_receiver, verifier, "tpu-vote-verifier")
|
SigVerifyStage::new(
|
||||||
|
vote_packet_receiver,
|
||||||
|
verifier,
|
||||||
|
"solSigVerTpuVot",
|
||||||
|
"tpu-vote-verifier",
|
||||||
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (gossip_vote_sender, gossip_vote_receiver) =
|
let (gossip_vote_sender, gossip_vote_receiver) =
|
||||||
|
|
Loading…
Reference in New Issue