Revert "accidental drop on mustusesender"

This reverts commit 5ec8d09e0d.
This commit is contained in:
teor 2021-02-23 10:54:48 +10:00 committed by Jane Lusby
parent d60226a3cf
commit a5e89f4f2b
1 changed files with 17 additions and 29 deletions

View File

@ -18,7 +18,7 @@ use futures::{
prelude::*, prelude::*,
stream::Stream, stream::Stream,
}; };
use tokio::time::{sleep, Sleep}; use tokio::time::Sleep;
use tower::Service; use tower::Service;
use tracing_futures::Instrument; use tracing_futures::Instrument;
@ -29,7 +29,6 @@ use zebra_chain::{
}; };
use crate::{ use crate::{
constants,
protocol::{ protocol::{
external::{types::Nonce, InventoryHash, Message}, external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response}, internal::{Request, Response},
@ -378,15 +377,11 @@ impl State {
.instrument(span.clone()) .instrument(span.clone())
.await .await
{ {
Either::Left((None, _)) => { Either::Left((None, _)) => Transition::ExitResponse {
Transition::ExitResponse { e: PeerError::ConnectionClosed.into(),
e: PeerError::ConnectionClosed.into(), tx,
tx, },
} Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx },
}
Either::Left((Some(Err(e)), _)) => {
Transition::ExitResponse { e: e.into(), tx }
}
Either::Left((Some(Ok(peer_msg)), _cancel)) => { Either::Left((Some(Ok(peer_msg)), _cancel)) => {
let request_msg = span.in_scope(|| handler.process_message(peer_msg)); let request_msg = span.in_scope(|| handler.process_message(peer_msg));
// If the message was not consumed, check whether it // If the message was not consumed, check whether it
@ -395,11 +390,8 @@ impl State {
// do NOT instrument with the request span, this is // do NOT instrument with the request span, this is
// independent work // independent work
match conn.handle_message_as_request(msg).await { match conn.handle_message_as_request(msg).await {
Ok(()) => { Ok(()) => Transition::AwaitRequest,
Transition::AwaitResponse { tx, handler, span } Err(e) => Transition::Exit(e.into()),
// Transition::AwaitRequest
}
Err(e) => Transition::ExitResponse { e: e.into(), tx },
} }
} else { } else {
// Otherwise, check whether the handler is finished // Otherwise, check whether the handler is finished
@ -523,22 +515,18 @@ where
.step(&mut self, &mut peer_rx) .step(&mut self, &mut peer_rx)
.await; .await;
if matches!(transition, Transition::AwaitResponse { .. }) {
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
self.state = match transition.try_into() { self.state = match transition.try_into() {
Ok(state) => Some(state), Ok(state) => Some(state),
Err(e) => { Err(e) => {
// while let Some(InProgressClientRequest { tx, span, .. }) = while let Some(InProgressClientRequest { tx, span, .. }) =
// self.client_rx.next().await self.client_rx.next().await
// { {
// trace!( trace!(
// parent: &span, parent: &span,
// "sending an error response to a pending request on a failed connection" "sending an error response to a pending request on a failed connection"
// ); );
// let _ = tx.send(Err(e.clone())); let _ = tx.send(Err(e.clone()));
// } }
return; return;
} }
} }