Improved streamer debug messages
distinguish between threads
This commit is contained in:
parent
6f991b3c11
commit
bed5438831
|
@ -38,8 +38,12 @@ impl Ncp {
|
||||||
request_sender,
|
request_sender,
|
||||||
)?;
|
)?;
|
||||||
let (response_sender, response_receiver) = channel();
|
let (response_sender, response_receiver) = channel();
|
||||||
let t_responder =
|
let t_responder = streamer::responder(
|
||||||
streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver);
|
"ncp",
|
||||||
|
gossip_send_socket,
|
||||||
|
blob_recycler.clone(),
|
||||||
|
response_receiver,
|
||||||
|
);
|
||||||
let t_listen = Crdt::listen(
|
let t_listen = Crdt::listen(
|
||||||
crdt.clone(),
|
crdt.clone(),
|
||||||
window,
|
window,
|
||||||
|
|
|
@ -422,7 +422,13 @@ impl Blob {
|
||||||
{
|
{
|
||||||
let p = r.read().expect("'r' read lock in pub fn send_to");
|
let p = r.read().expect("'r' read lock in pub fn send_to");
|
||||||
let a = p.meta.addr();
|
let a = p.meta.addr();
|
||||||
socket.send_to(&p.data[..p.meta.size], &a)?;
|
if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
|
||||||
|
info!(
|
||||||
|
"error sending {} byte packet to {:?}: {:?}",
|
||||||
|
p.meta.size, a, e
|
||||||
|
);
|
||||||
|
Err(e)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
re.recycle(r);
|
re.recycle(r);
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,12 @@ impl ReplicateStage {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (vote_blob_sender, vote_blob_receiver) = channel();
|
let (vote_blob_sender, vote_blob_receiver) = channel();
|
||||||
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
||||||
let t_responder = responder(send, blob_recycler.clone(), vote_blob_receiver);
|
let t_responder = responder(
|
||||||
|
"replicate_stage",
|
||||||
|
send,
|
||||||
|
blob_recycler.clone(),
|
||||||
|
vote_blob_receiver,
|
||||||
|
);
|
||||||
let skeypair = Arc::new(keypair);
|
let skeypair = Arc::new(keypair);
|
||||||
|
|
||||||
let t_replicate = Builder::new()
|
let t_replicate = Builder::new()
|
||||||
|
|
|
@ -64,7 +64,8 @@ impl Rpu {
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver);
|
let t_responder =
|
||||||
|
streamer::responder("rpu", respond_socket, blob_recycler.clone(), blob_receiver);
|
||||||
|
|
||||||
let mut thread_hdls = vec![t_receiver, t_responder];
|
let mut thread_hdls = vec![t_receiver, t_responder];
|
||||||
thread_hdls.extend(request_stage.thread_hdls().into_iter());
|
thread_hdls.extend(request_stage.thread_hdls().into_iter());
|
||||||
|
|
|
@ -104,15 +104,20 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)>
|
||||||
Ok((batch, len))
|
Ok((batch, len))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn responder(sock: UdpSocket, recycler: BlobRecycler, r: BlobReceiver) -> JoinHandle<()> {
|
pub fn responder(
|
||||||
|
name: &'static str,
|
||||||
|
sock: UdpSocket,
|
||||||
|
recycler: BlobRecycler,
|
||||||
|
r: BlobReceiver,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-responder".to_string())
|
.name(format!("solana-responder-{}", name))
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
if let Err(e) = recv_send(&sock, &recycler, &r) {
|
if let Err(e) = recv_send(&sock, &recycler, &r) {
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
_ => error!("{:?}", e),
|
_ => error!("{} responder error: {:?}", name, e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -549,7 +554,7 @@ pub fn window(
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
_ => error!("{:?}", e),
|
_ => error!("window error: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = repair_window(
|
let _ = repair_window(
|
||||||
|
@ -697,7 +702,7 @@ pub fn broadcaster(
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
||||||
_ => error!("{:?}", e),
|
_ => error!("broadcaster error: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -750,7 +755,7 @@ pub fn retransmitter(
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
_ => error!("{:?}", e),
|
_ => error!("retransmitter error: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -912,7 +917,12 @@ mod test {
|
||||||
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
|
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
|
||||||
let t_responder = {
|
let t_responder = {
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(send, resp_recycler.clone(), r_responder);
|
let t_responder = responder(
|
||||||
|
"streamer_send_test",
|
||||||
|
send,
|
||||||
|
resp_recycler.clone(),
|
||||||
|
r_responder,
|
||||||
|
);
|
||||||
let mut msgs = VecDeque::new();
|
let mut msgs = VecDeque::new();
|
||||||
for i in 0..10 {
|
for i in 0..10 {
|
||||||
let b = resp_recycler.allocate();
|
let b = resp_recycler.allocate();
|
||||||
|
@ -986,7 +996,12 @@ mod test {
|
||||||
);
|
);
|
||||||
let t_responder = {
|
let t_responder = {
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder);
|
let t_responder = responder(
|
||||||
|
"window_send_test",
|
||||||
|
tn.sockets.replicate,
|
||||||
|
resp_recycler.clone(),
|
||||||
|
r_responder,
|
||||||
|
);
|
||||||
let mut msgs = VecDeque::new();
|
let mut msgs = VecDeque::new();
|
||||||
for v in 0..10 {
|
for v in 0..10 {
|
||||||
let i = 9 - v;
|
let i = 9 - v;
|
||||||
|
|
|
@ -204,8 +204,12 @@ pub mod tests {
|
||||||
|
|
||||||
// simulate leader sending messages
|
// simulate leader sending messages
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder =
|
let t_responder = streamer::responder(
|
||||||
streamer::responder(leader.sockets.requests, resp_recycler.clone(), r_responder);
|
"test_replicate",
|
||||||
|
leader.sockets.requests,
|
||||||
|
resp_recycler.clone(),
|
||||||
|
r_responder,
|
||||||
|
);
|
||||||
|
|
||||||
let starting_balance = 10_000;
|
let starting_balance = 10_000;
|
||||||
let mint = Mint::new(starting_balance);
|
let mint = Mint::new(starting_balance);
|
||||||
|
|
Loading…
Reference in New Issue