fix(network): Add a send timeout to outbound peer messages (#3417)

* fix(network): add a send timeout to outbound peer messages

* test(network): test peer send and receive timeouts

And the equivalent success cases:
- spawn the run loop with no messages
- spawn the run loop and send and receive a message

* test(network): check for specific error types in the tests

And add an outbound error test that doesn't expect a response.

* test(network): use bounded fake peer connection channels

This lets us actually trigger send timeouts in the tests.

* refactor(network): rename some confusing types and variables

fastmod peer_inbound_tx peer_tx zebra*
fastmod peer_inbound_rx peer_rx zebra*

fastmod ClientSendTimeout ConnectionSendTimeout zebra*
fastmod ClientReceiveTimeout ConnectionReceiveTimeout zebra*

* doc(network test): explain the purpose of each peer connection test vector
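
The whole change reduces to one pattern: wrap the outbound sink send in tokio::time::timeout, and map the elapsed error to a connection-level error. A minimal standalone sketch of that pattern, with simplified, hypothetical names and error strings (the real implementation is the PeerTx wrapper in the diff below):

use std::time::Duration;

use futures::{channel::mpsc, SinkExt};

/// Sketch: race the sink send against a timer, and convert "timer won"
/// into a send-timeout error.
async fn send_with_timeout<T>(
    tx: &mut mpsc::Sender<T>,
    msg: T,
    timeout: Duration,
) -> Result<(), &'static str> {
    tokio::time::timeout(timeout, tx.send(msg))
        .await
        // Outer error: the send future did not complete before the timer fired.
        .map_err(|_elapsed| "connection send timeout")?
        // Inner error: the sink itself failed (for example, the channel closed).
        .map_err(|_send_error| "connection closed")
}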
teor 2022-02-01 04:22:00 +10:00 committed by GitHub
parent f270fd2de6
commit c76ff56cd1
8 changed files with 566 additions and 116 deletions

View File

@@ -43,7 +43,8 @@ pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;
 /// buffer adds up to 6 seconds worth of blocks to the queue.
 pub const PEERSET_BUFFER_SIZE: usize = 3;
 
-/// The timeout for requests made to a remote peer.
+/// The timeout for sending a message to a remote peer,
+/// and receiving a response from a remote peer.
 pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
 
 /// The timeout for handshakes when connecting to new peers.

View File

@@ -28,8 +28,9 @@ use crate::{
     constants,
     meta_addr::MetaAddr,
     peer::{
-        error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
-        MustUseOneshotSender, PeerError, SharedPeerError,
+        connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
+        ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
+        SharedPeerError,
     },
     peer_set::ConnectionTracker,
     protocol::{
@@ -39,6 +40,8 @@
     BoxError,
 };
 
+mod peer_tx;
+
 #[cfg(test)]
 mod tests;
@@ -437,7 +440,7 @@ impl From<Request> for InboundMessage {
     }
 }
 
-/// The state associated with a peer connection.
+/// The channels, services, and associated state for a peer connection.
 pub struct Connection<S, Tx> {
     /// The state of this connection's current request or response.
     pub(super) state: State,
@@ -474,9 +477,7 @@ pub struct Connection<S, Tx> {
     /// This channel accepts [`Message`]s.
     ///
     /// The corresponding peer message receiver is passed to [`Connection::run`].
-    ///
-    /// TODO: add a timeout when sending messages to the remote peer (#3234)
-    pub(super) peer_tx: Tx,
+    pub(super) peer_tx: PeerTx<Tx>,
 
     /// A connection tracker that reduces the open connection count when dropped.
     /// Used to limit the number of open connections in Zebra.
@@ -498,6 +499,31 @@
     pub(super) last_metrics_state: Option<Cow<'static, str>>,
 }
 
+impl<S, Tx> Connection<S, Tx> {
+    /// Return a new connection from its channels, services, and shared state.
+    pub(crate) fn new(
+        inbound_service: S,
+        client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
+        error_slot: ErrorSlot,
+        peer_tx: Tx,
+        connection_tracker: ConnectionTracker,
+        connected_addr: ConnectedAddr,
+    ) -> Self {
+        Connection {
+            state: State::AwaitingRequest,
+            request_timer: None,
+            cached_addrs: Vec::new(),
+            svc: inbound_service,
+            client_rx: client_rx.into(),
+            error_slot,
+            peer_tx: peer_tx.into(),
+            connection_tracker,
+            metrics_label: connected_addr.get_transient_addr_label(),
+            last_metrics_state: None,
+        }
+    }
+}
+
 impl<S, Tx> Connection<S, Tx>
 where
     S: Service<Request, Response = Response, Error = BoxError>,
@@ -702,7 +728,7 @@
             }
             Either::Left((Either::Right(_), _peer_fut)) => {
                 trace!(parent: &span, "client request timed out");
-                let e = PeerError::ClientRequestTimeout;
+                let e = PeerError::ConnectionReceiveTimeout;
 
                 // Replace the state with a temporary value,
                 // so we can take ownership of the response sender.
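
For contrast with the send timeout, the ConnectionReceiveTimeout branch above comes from the run loop racing the pending peer response against a request timer. A rough, self-contained sketch of that select-based race, using hypothetical names rather than Zebra's actual types:

use std::time::Duration;

use futures::future::{self, Either};

/// Sketch: a peer that never replies loses the race against the timer.
async fn race_response_against_timer() -> Result<&'static str, &'static str> {
    // The pending peer response.
    let response = Box::pin(future::pending::<&'static str>());
    // The request timer (tokio's Sleep is !Unpin, so pin it for select).
    let timer = Box::pin(tokio::time::sleep(Duration::from_millis(10)));

    match future::select(response, timer).await {
        Either::Left((response, _timer)) => Ok(response),
        Either::Right(((), _response)) => Err("connection receive timeout"),
    }
}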

View File

@@ -0,0 +1,37 @@
+//! The peer message sender channel.
+
+use futures::{Sink, SinkExt};
+
+use zebra_chain::serialization::SerializationError;
+
+use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};
+
+/// A wrapper type for a peer connection message sender.
+///
+/// Used to apply a timeout to sent messages.
+#[derive(Clone, Debug)]
+pub struct PeerTx<Tx> {
+    /// A channel for sending Zcash messages to the connected peer.
+    ///
+    /// This channel accepts [`Message`]s.
+    inner: Tx,
+}
+
+impl<Tx> PeerTx<Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
+    /// Sends `msg` on `self.inner`, returning a timeout error if it takes too long.
+    pub async fn send(&mut self, msg: Message) -> Result<(), PeerError> {
+        tokio::time::timeout(REQUEST_TIMEOUT, self.inner.send(msg))
+            .await
+            .map_err(|_| PeerError::ConnectionSendTimeout)?
+            .map_err(Into::into)
+    }
+}
+
+impl<Tx> From<Tx> for PeerTx<Tx> {
+    fn from(tx: Tx) -> Self {
+        PeerTx { inner: tx }
+    }
+}
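
A note on the design above, and on why the tests switch to bounded channels: tokio::time::timeout wraps the whole SinkExt::send future, so it covers both waiting for channel capacity and the final flush. A futures bounded channel guarantees each sender only one in-flight message, so a zero-capacity channel with an idle receiver stalls the second send, which is what lets a test drive the timeout deterministically. A hedged sketch of that setup (hypothetical test; start_paused assumes tokio's test-util feature, under which idle time auto-advances so the timeout elapses instantly):

use std::time::Duration;

use futures::{channel::mpsc, SinkExt};

#[tokio::test(start_paused = true)]
async fn zero_capacity_channel_triggers_send_timeout() {
    // Capacity 0: each sender still gets a single implicit slot.
    let (mut tx, _rx) = mpsc::channel::<u32>(0);

    // The first send fills the sender's implicit slot.
    tx.send(1).await.expect("first send uses the implicit slot");

    // The receiver never drains the channel, so the second send stalls.
    // With the clock paused, tokio auto-advances time and the timer fires.
    let result = tokio::time::timeout(Duration::from_secs(20), tx.send(2)).await;
    assert!(result.is_err(), "second send should time out");
}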

View File

@@ -8,9 +8,7 @@ use zebra_chain::serialization::SerializationError;
 use zebra_test::mock_service::MockService;
 
 use crate::{
-    peer::{
-        client::ClientRequestReceiver, connection::State, ClientRequest, Connection, ErrorSlot,
-    },
+    peer::{ClientRequest, ConnectedAddr, Connection, ErrorSlot},
     peer_set::ActiveConnectionCounter,
     protocol::external::Message,
     Request, Response,
@@ -23,17 +21,20 @@ mod vectors;
 fn new_test_connection<A>() -> (
     Connection<
         MockService<Request, Response, A>,
-        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
+        SinkMapErr<mpsc::Sender<Message>, fn(mpsc::SendError) -> SerializationError>,
     >,
     mpsc::Sender<ClientRequest>,
     MockService<Request, Response, A>,
-    mpsc::UnboundedReceiver<Message>,
+    mpsc::Receiver<Message>,
     ErrorSlot,
 ) {
     let mock_inbound_service = MockService::build().finish();
-    let (client_tx, client_rx) = mpsc::channel(1);
+    let (client_tx, client_rx) = mpsc::channel(0);
     let shared_error_slot = ErrorSlot::default();
-    let (peer_outbound_tx, peer_outbound_rx) = mpsc::unbounded();
+
+    // Normally the network has more capacity than the sender's single implicit slot,
+    // but the smaller capacity makes some tests easier.
+    let (peer_tx, peer_rx) = mpsc::channel(0);
 
     let error_converter: fn(mpsc::SendError) -> SerializationError = |_| {
         io::Error::new(
@@ -42,26 +43,22 @@ fn new_test_connection<A>() -> (
         )
         .into()
     };
-    let peer_tx = peer_outbound_tx.sink_map_err(error_converter);
+    let peer_tx = peer_tx.sink_map_err(error_converter);
 
-    let connection = Connection {
-        state: State::AwaitingRequest,
-        request_timer: None,
-        cached_addrs: Vec::new(),
-        svc: mock_inbound_service.clone(),
-        client_rx: ClientRequestReceiver::from(client_rx),
-        error_slot: shared_error_slot.clone(),
+    let connection = Connection::new(
+        mock_inbound_service.clone(),
+        client_rx,
+        shared_error_slot.clone(),
         peer_tx,
-        connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
-        metrics_label: "test".to_string(),
-        last_metrics_state: None,
-    };
+        ActiveConnectionCounter::new_counter().track_connection(),
+        ConnectedAddr::Isolated,
+    );
 
     (
         connection,
        client_tx,
         mock_inbound_service,
-        peer_outbound_rx,
+        peer_rx,
         shared_error_slot,
     )
 }

View File

@@ -41,7 +41,7 @@ proptest! {
         runtime.block_on(async move {
            // The real stream and sink are from a split TCP connection,
            // but that doesn't change how the state machine behaves.
-           let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+           let (mut peer_tx, peer_rx) = mpsc::channel(1);

            let (
                connection,
@@ -51,7 +51,7 @@ proptest! {
                shared_error_slot,
            ) = new_test_connection();

-           let connection_task = tokio::spawn(connection.run(peer_inbound_rx));
+           let connection_task = tokio::spawn(connection.run(peer_rx));

            let response_to_first_request = send_block_request(
                first_block.hash(),
@@ -71,13 +71,13 @@ proptest! {
                .await;

            // Reply to first request
-           peer_inbound_tx
+           peer_tx
                .send(Ok(Message::Block(first_block)))
                .await
                .expect("Failed to send response to first block request");

            // Reply to second request
-           peer_inbound_tx
+           peer_tx
                .send(Ok(Message::Block(second_block.clone())))
                .await
                .expect("Failed to send response to second block request");
@@ -100,7 +100,7 @@ proptest! {
            inbound_service.expect_no_requests().await?;

            // Stop the connection thread
-           mem::drop(peer_inbound_tx);
+           mem::drop(peer_tx);

            let connection_task_result = connection_task.await;
            prop_assert!(connection_task_result.is_ok());
@@ -114,11 +114,11 @@
 fn new_test_connection() -> (
     Connection<
         MockService<Request, Response, PropTestAssertion>,
-        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
+        SinkMapErr<mpsc::Sender<Message>, fn(mpsc::SendError) -> SerializationError>,
     >,
     mpsc::Sender<ClientRequest>,
     MockService<Request, Response, PropTestAssertion>,
-    mpsc::UnboundedReceiver<Message>,
+    mpsc::Receiver<Message>,
     ErrorSlot,
 ) {
     super::new_test_connection()
@@ -127,7 +127,7 @@ fn new_test_connection() -> (
 async fn send_block_request(
     block: block::Hash,
     client_requests: &mut mpsc::Sender<ClientRequest>,
-    outbound_messages: &mut mpsc::UnboundedReceiver<Message>,
+    outbound_messages: &mut mpsc::Receiver<Message>,
 ) -> oneshot::Receiver<Result<Response, SharedPeerError>> {
     let (response_sender, response_receiver) = oneshot::channel();
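
The plumbing these tests rely on pairs each ClientRequest with a futures oneshot channel, and oneshot::Receiver::try_recv lets a test inspect the response without blocking: Ok(None) means no response yet, Ok(Some(_)) is a response, and Err(Canceled) means the sender was dropped. A small illustrative sketch of those semantics:

use futures::channel::oneshot;

fn oneshot_try_recv_semantics() {
    let (tx, mut rx) = oneshot::channel::<&'static str>();

    // No response yet: Ok(None), not an error.
    assert_eq!(rx.try_recv(), Ok(None));

    // After the sender replies, try_recv yields the response.
    tx.send("response").expect("receiver is still alive");
    assert_eq!(rx.try_recv(), Ok(Some("response")));

    // A dropped sender shows up as Err(Canceled).
    let (tx2, mut rx2) = oneshot::channel::<&'static str>();
    drop(tx2);
    assert!(rx2.try_recv().is_err());
}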

View File

@@ -1,15 +1,23 @@
 //! Fixed test vectors for peer connections.
 //!
-//! TODO:
-//! - connection tests when awaiting requests (#3232)
-//! - connection tests with closed/dropped peer_outbound_tx (#3233)
+//! TODO: add tests for:
+//! - inbound message as request
+//! - inbound message, but not a request (or a response)
 
-use futures::{channel::mpsc, sink::SinkMapErr, FutureExt, StreamExt};
+use std::{collections::HashSet, task::Poll, time::Duration};
+
+use futures::{
+    channel::{mpsc, oneshot},
+    sink::SinkMapErr,
+    FutureExt, StreamExt,
+};
 use tracing::Span;
 
 use zebra_chain::serialization::SerializationError;
 use zebra_test::mock_service::{MockService, PanicAssertion};
 
 use crate::{
+    constants::REQUEST_TIMEOUT,
     peer::{
         connection::{Connection, State},
         ClientRequest, ErrorSlot,
@@ -18,18 +26,19 @@
     PeerError, Request, Response,
 };
 
+/// Test that the connection run loop works as a future
 #[tokio::test]
 async fn connection_run_loop_ok() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (peer_tx, peer_rx) = mpsc::channel(1);
 
     let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
         new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
+    let connection = connection.run(peer_rx);
 
     // The run loop will wait forever for a request from Zebra or the peer,
     // without any errors, channel closes, or bytes written.
@@ -41,57 +50,71 @@
     assert_eq!(result, None);
 
     let error = shared_error_slot.try_get_error();
-    assert!(
-        matches!(error, None),
-        "unexpected connection error: {:?}",
-        error
-    );
+    assert!(error.is_none(), "unexpected error: {:?}", error);
 
     assert!(!client_tx.is_closed());
-    assert!(!peer_inbound_tx.is_closed());
+    assert!(!peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
-    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop works as a spawned task
 #[tokio::test]
-async fn connection_run_loop_future_drop() {
+async fn connection_run_loop_spawn_ok() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (peer_tx, peer_rx) = mpsc::channel(1);
 
     let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
         new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
-
-    // now_or_never implicitly drops the connection future.
-    let result = connection.now_or_never();
-    assert_eq!(result, None);
+    // Spawn the connection run loop
+    let mut connection_join_handle = tokio::spawn(connection.run(peer_rx));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert!(error.is_none(), "unexpected error: {:?}", error);
 
-    assert!(client_tx.is_closed());
-    assert!(peer_inbound_tx.is_closed());
-
-    assert!(peer_outbound_messages.next().await.is_none());
+    assert!(!client_tx.is_closed());
+    assert!(!peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
+    // Make sure that the connection did not:
+    // - panic, or
+    // - return.
+    //
+    // This test doesn't cause any fatal errors,
+    // so returning would be incorrect behaviour.
+    let connection_result = futures::poll!(&mut connection_join_handle);
+    assert!(
+        matches!(connection_result, Poll::Pending),
+        "unexpected run loop termination: {:?}",
+        connection_result,
+    );
+
+    // We need to abort the connection, because it holds a lock on the outbound channel.
+    connection_join_handle.abort();
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop works as a spawned task with messages in and out
 #[tokio::test]
-async fn connection_run_loop_client_close() {
+async fn connection_run_loop_message_ok() {
     zebra_test::init();
+    tokio::time::pause();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (mut peer_tx, peer_rx) = mpsc::channel(1);
 
     let (
         connection,
@@ -101,7 +124,116 @@
         shared_error_slot,
     ) = new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
+    // Spawn the connection run loop
+    let mut connection_join_handle = tokio::spawn(connection.run(peer_rx));
+
+    // Simulate a message send and receive
+    let (request_tx, mut request_rx) = oneshot::channel();
+    let request = ClientRequest {
+        request: Request::Peers,
+        tx: request_tx,
+        span: Span::current(),
+    };
+
+    client_tx
+        .try_send(request)
+        .expect("internal request channel is valid");
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, Some(Message::GetAddr));
+
+    peer_tx
+        .try_send(Ok(Message::Addr(Vec::new())))
+        .expect("peer inbound response channel is valid");
+
+    // give the event loop time to run
+    tokio::task::yield_now().await;
+    let peer_response = request_rx.try_recv();
+    assert_eq!(
+        peer_response
+            .expect("peer internal response channel is valid")
+            .expect("response is present")
+            .expect("response is a message (not an error)"),
+        Response::Peers(Vec::new()),
+    );
+
+    let error = shared_error_slot.try_get_error();
+    assert!(error.is_none(), "unexpected error: {:?}", error);
+
+    assert!(!client_tx.is_closed());
+    assert!(!peer_tx.is_closed());
+
+    inbound_service.expect_no_requests().await;
+
+    // Make sure that the connection did not:
+    // - panic, or
+    // - return.
+    //
+    // This test doesn't cause any fatal errors,
+    // so returning would be incorrect behaviour.
+    let connection_result = futures::poll!(&mut connection_join_handle);
+    assert!(
+        matches!(connection_result, Poll::Pending),
+        "unexpected run loop termination: {:?}",
+        connection_result,
+    );
+
+    // We need to abort the connection, because it holds a lock on the outbound channel.
+    connection_join_handle.abort();
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
+}
+
+/// Test that the connection run loop fails correctly when dropped
+#[tokio::test]
+async fn connection_run_loop_future_drop() {
+    zebra_test::init();
+
+    // The real stream and sink are from a split TCP connection,
+    // but that doesn't change how the state machine behaves.
+    let (peer_tx, peer_rx) = mpsc::channel(1);
+
+    let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
+        new_test_connection();
+
+    let connection = connection.run(peer_rx);
+
+    // now_or_never implicitly drops the connection future.
+    let result = connection.now_or_never();
+    assert_eq!(result, None);
+
+    let error = shared_error_slot.try_get_error();
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ConnectionDropped",
+    );
+
+    assert!(client_tx.is_closed());
+    assert!(peer_tx.is_closed());
+
+    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
+}
+
+/// Test that the connection run loop fails correctly when the internal client closes the connection channel
+#[tokio::test]
+async fn connection_run_loop_client_close() {
+    zebra_test::init();
+
+    // The real stream and sink are from a split TCP connection,
+    // but that doesn't change how the state machine behaves.
+    let (peer_tx, peer_rx) = mpsc::channel(1);
+
+    let (
+        connection,
+        mut client_tx,
+        mut inbound_service,
+        mut peer_outbound_messages,
+        shared_error_slot,
+    ) = new_test_connection();
+
+    let connection = connection.run(peer_rx);
 
     // Explicitly close the client channel.
     client_tx.close_channel();
@@ -113,30 +245,35 @@
     assert_eq!(result, Some(()));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ClientDropped",
+    );
 
     assert!(client_tx.is_closed());
-    assert!(peer_inbound_tx.is_closed());
+    assert!(peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
-    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop fails correctly when the internal client drops the connection channel
 #[tokio::test]
 async fn connection_run_loop_client_drop() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (peer_tx, peer_rx) = mpsc::channel(1);
 
     let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
         new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
+    let connection = connection.run(peer_rx);
 
     // Drop the client channel.
     std::mem::drop(client_tx);
@@ -148,32 +285,38 @@
     assert_eq!(result, Some(()));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ClientDropped",
+    );
 
-    assert!(peer_inbound_tx.is_closed());
+    assert!(peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
-    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop fails correctly when the peer channel is closed.
+/// (We're not sure if tokio closes or drops the TcpStream when the TCP connection closes.)
 #[tokio::test]
 async fn connection_run_loop_inbound_close() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (mut peer_tx, peer_rx) = mpsc::channel(1);
 
     let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
         new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
+    let connection = connection.run(peer_rx);
 
     // Explicitly close the inbound peer channel.
-    peer_inbound_tx.close_channel();
+    peer_tx.close_channel();
 
     // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it.
     let connection = connection.shared();
@@ -182,33 +325,39 @@
     assert_eq!(result, Some(()));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ConnectionClosed",
+    );
 
     assert!(client_tx.is_closed());
-    assert!(peer_inbound_tx.is_closed());
+    assert!(peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
-    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop fails correctly when the peer channel is dropped
+/// (We're not sure if tokio closes or drops the TcpStream when the TCP connection closes.)
 #[tokio::test]
 async fn connection_run_loop_inbound_drop() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (peer_tx, peer_rx) = mpsc::channel(1);
 
     let (connection, client_tx, mut inbound_service, mut peer_outbound_messages, shared_error_slot) =
         new_test_connection();
 
-    let connection = connection.run(peer_inbound_rx);
+    let connection = connection.run(peer_rx);
 
     // Drop the inbound peer channel.
-    std::mem::drop(peer_inbound_tx);
+    std::mem::drop(peer_tx);
 
     // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it.
     let connection = connection.shared();
@@ -217,24 +366,29 @@
     assert_eq!(result, Some(()));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ConnectionClosed",
+    );
 
     assert!(client_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
-    inbound_service.expect_no_requests().await;
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop fails correctly on internal connection errors.
 #[tokio::test]
 async fn connection_run_loop_failed() {
     zebra_test::init();
 
     // The real stream and sink are from a split TCP connection,
     // but that doesn't change how the state machine behaves.
-    let (peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
+    let (peer_tx, peer_rx) = mpsc::channel(1);
 
     let (
         mut connection,
@@ -247,10 +401,10 @@
     // Simulate an internal connection error.
     connection.state = State::Failed;
     shared_error_slot
-        .try_update_error(PeerError::ClientRequestTimeout.into())
+        .try_update_error(PeerError::Overloaded.into())
         .expect("unexpected previous error in tests");
 
-    let connection = connection.run(peer_inbound_rx);
+    let connection = connection.run(peer_rx);
 
     // If we drop the future, the connection will close anyway, so we avoid the drop by cloning it.
     let connection = connection.shared();
@@ -261,27 +415,253 @@
     assert_eq!(result, Some(()));
 
     let error = shared_error_slot.try_get_error();
-    assert!(matches!(error, Some(_)));
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "Overloaded",
+    );
 
     assert!(client_tx.is_closed());
-    assert!(peer_inbound_tx.is_closed());
+    assert!(peer_tx.is_closed());
 
     inbound_service.expect_no_requests().await;
 
     // We need to drop the future, because it holds a mutable reference to the bytes.
     std::mem::drop(connection_guard);
-    assert!(peer_outbound_messages.next().await.is_none());
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
 }
 
+/// Test that the connection run loop fails correctly when sending a message to a peer times out,
+/// but we are not expecting a response message from the peer.
+#[tokio::test]
+async fn connection_run_loop_send_timeout_nil_response() {
+    zebra_test::init();
+    tokio::time::pause();
+
+    // The real stream and sink are from a split TCP connection,
+    // but that doesn't change how the state machine behaves.
+    let (peer_tx, peer_rx) = mpsc::channel(1);
+
+    let (
+        connection,
+        mut client_tx,
+        mut inbound_service,
+        mut peer_outbound_messages,
+        shared_error_slot,
+    ) = new_test_connection();
+
+    // Spawn the connection run loop
+    let mut connection_join_handle = tokio::spawn(connection.run(peer_rx));
+
+    // Simulate a message send timeout
+    let (request_tx, mut request_rx) = oneshot::channel();
+    let request = ClientRequest {
+        request: Request::AdvertiseTransactionIds(HashSet::new()),
+        tx: request_tx,
+        span: Span::current(),
+    };
+
+    client_tx.try_send(request).expect("channel is valid");
+
+    // Make the send timeout
+    tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await;
+
+    // Send timeouts close the connection
+    let error = shared_error_slot.try_get_error();
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ConnectionSendTimeout",
+    );
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, Some(Message::Inv(Vec::new())));
+
+    let peer_response = request_rx.try_recv();
+    assert_eq!(
+        peer_response
+            .expect("peer internal response channel is valid")
+            .expect("response is present")
+            .expect_err("response is an error (not a message)")
+            .inner_debug(),
+        "ConnectionSendTimeout",
+    );
+
+    assert!(client_tx.is_closed());
+    assert!(peer_tx.is_closed());
+
+    inbound_service.expect_no_requests().await;
+
+    // Make sure that the connection finished, but did not panic.
+    let connection_result = futures::poll!(&mut connection_join_handle);
+    assert!(
+        matches!(connection_result, Poll::Ready(Ok(()))),
+        "expected run loop termination, but run loop continued: {:?}",
+        connection_result,
+    );
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
+}
+
+/// Test that the connection run loop fails correctly when sending a message to a peer times out,
+/// and we are expecting a response message from the peer.
+#[tokio::test]
+async fn connection_run_loop_send_timeout_expect_response() {
+    zebra_test::init();
+    tokio::time::pause();
+
+    // The real stream and sink are from a split TCP connection,
+    // but that doesn't change how the state machine behaves.
+    let (peer_tx, peer_rx) = mpsc::channel(1);
+
+    let (
+        connection,
+        mut client_tx,
+        mut inbound_service,
+        mut peer_outbound_messages,
+        shared_error_slot,
+    ) = new_test_connection();
+
+    // Spawn the connection run loop
+    let mut connection_join_handle = tokio::spawn(connection.run(peer_rx));
+
+    // Simulate a message send timeout
+    let (request_tx, mut request_rx) = oneshot::channel();
+    let request = ClientRequest {
+        request: Request::Peers,
+        tx: request_tx,
+        span: Span::current(),
+    };
+
+    client_tx.try_send(request).expect("channel is valid");
+
+    // Make the send timeout
+    tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await;
+
+    // Send timeouts close the connection
+    let error = shared_error_slot.try_get_error();
+    assert_eq!(
+        error.expect("missing expected error").inner_debug(),
+        "ConnectionSendTimeout",
+    );
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, Some(Message::GetAddr));
+
+    let peer_response = request_rx.try_recv();
+    assert_eq!(
+        peer_response
+            .expect("peer internal response channel is valid")
+            .expect("response is present")
+            .expect_err("response is an error (not a message)")
+            .inner_debug(),
+        "ConnectionSendTimeout",
+    );
+
+    assert!(client_tx.is_closed());
+    assert!(peer_tx.is_closed());
+
+    inbound_service.expect_no_requests().await;
+
+    // Make sure that the connection finished, but did not panic.
+    let connection_result = futures::poll!(&mut connection_join_handle);
+    assert!(
+        matches!(connection_result, Poll::Ready(Ok(()))),
+        "expected run loop termination, but run loop continued: {:?}",
+        connection_result,
+    );
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
+}
+
+/// Test that the connection run loop continues but returns an error to the client,
+/// when a peer accepts a message, but does not send an expected response.
+#[tokio::test]
+async fn connection_run_loop_receive_timeout() {
+    zebra_test::init();
+    tokio::time::pause();
+
+    // The real stream and sink are from a split TCP connection,
+    // but that doesn't change how the state machine behaves.
+    let (peer_tx, peer_rx) = mpsc::channel(1);
+
+    let (
+        connection,
+        mut client_tx,
+        mut inbound_service,
+        mut peer_outbound_messages,
+        shared_error_slot,
+    ) = new_test_connection();
+
+    // Spawn the connection run loop
+    let mut connection_join_handle = tokio::spawn(connection.run(peer_rx));
+
+    // Simulate a message receive timeout
+    let (request_tx, mut request_rx) = oneshot::channel();
+    let request = ClientRequest {
+        request: Request::Peers,
+        tx: request_tx,
+        span: Span::current(),
+    };
+
+    client_tx.try_send(request).expect("channel is valid");
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, Some(Message::GetAddr));
+
+    // Make the receive timeout
+    tokio::time::sleep(REQUEST_TIMEOUT + Duration::from_secs(1)).await;
+
+    // Receive timeouts don't close the connection
+    let error = shared_error_slot.try_get_error();
+    assert!(error.is_none(), "unexpected error: {:?}", error);
+
+    assert!(!client_tx.is_closed());
+    assert!(!peer_tx.is_closed());
+
+    let peer_response = request_rx.try_recv();
+    assert_eq!(
+        peer_response
+            .expect("peer internal response channel is valid")
+            .expect("response is present")
+            .expect_err("response is an error (not a message)")
+            .inner_debug(),
+        "ConnectionReceiveTimeout",
+    );
+
+    inbound_service.expect_no_requests().await;
+
+    // Make sure that the connection did not:
+    // - panic, or
+    // - return.
+    //
+    // This test doesn't cause any fatal errors,
+    // so returning would be incorrect behaviour.
+    let connection_result = futures::poll!(&mut connection_join_handle);
+    assert!(
+        matches!(connection_result, Poll::Pending),
+        "unexpected run loop termination: {:?}",
+        connection_result,
+    );
+
+    // We need to abort the connection, because it holds a lock on the outbound channel.
+    connection_join_handle.abort();
+
+    let outbound_message = peer_outbound_messages.next().await;
+    assert_eq!(outbound_message, None);
+}
+
 /// Creates a new [`Connection`] instance for unit tests.
 fn new_test_connection() -> (
     Connection<
         MockService<Request, Response, PanicAssertion>,
-        SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
+        SinkMapErr<mpsc::Sender<Message>, fn(mpsc::SendError) -> SerializationError>,
     >,
     mpsc::Sender<ClientRequest>,
     MockService<Request, Response, PanicAssertion>,
-    mpsc::UnboundedReceiver<Message>,
+    mpsc::Receiver<Message>,
     ErrorSlot,
 ) {
     super::new_test_connection()
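
One pattern worth calling out in the new tests above: polling the spawned run loop's JoinHandle exactly once distinguishes a still-running task (Poll::Pending) from one that returned or panicked (Poll::Ready), without awaiting it. A minimal standalone sketch of the same check:

use std::task::Poll;

#[tokio::test]
async fn poll_join_handle_without_awaiting() {
    // A task that runs forever, like a healthy connection run loop.
    let mut handle = tokio::spawn(std::future::pending::<()>());

    // Let the spawned task get scheduled once.
    tokio::task::yield_now().await;

    // Pending means the task neither returned nor panicked.
    let status = futures::poll!(&mut handle);
    assert!(matches!(status, Poll::Pending));

    // Abort so the runtime does not keep the task alive after the test.
    handle.abort();
}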

View File

@@ -21,6 +21,15 @@ where
     }
 }
 
+impl SharedPeerError {
+    /// Returns a debug-formatted string describing the inner [`PeerError`].
+    ///
+    /// Unfortunately, [`TracedError`] makes it impossible to get a reference to the original error.
+    pub fn inner_debug(&self) -> String {
+        format!("{:?}", self.0.as_ref())
+    }
+}
+
 /// An error related to peer connection handling.
 #[derive(Error, Debug)]
 #[allow(dead_code)]
@@ -49,9 +58,13 @@ pub enum PeerError {
     #[error("Internal heartbeat task exited")]
     HeartbeatTaskExited,
 
-    /// The remote peer did not respond to a [`peer::Client`] request in time.
-    #[error("Client request timed out")]
-    ClientRequestTimeout,
+    /// Sending a message to a remote peer took too long.
+    #[error("Sending Client request timed out")]
+    ConnectionSendTimeout,
+
+    /// Receiving a response to a [`peer::Client`] request took too long.
+    #[error("Receiving client response timed out")]
+    ConnectionReceiveTimeout,
 
     /// A serialization error occurred while reading or writing a message.
     #[error("Serialization error: {0}")]
@@ -82,7 +95,8 @@ impl PeerError {
             PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(),
             PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(),
             PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(),
-            PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(),
+            PeerError::ConnectionSendTimeout => "ConnectionSendTimeout".into(),
+            PeerError::ConnectionReceiveTimeout => "ConnectionReceiveTimeout".into(),
             // TODO: add error kinds or summaries to `SerializationError`
             PeerError::Serialization(inner) => format!("Serialization({})", inner).into(),
             PeerError::DuplicateHandshake => "DuplicateHandshake".into(),

View File

@@ -958,19 +958,14 @@ where
         })
         .boxed();
 
-    use super::connection;
-    let server = Connection {
-        state: connection::State::AwaitingRequest,
-        request_timer: None,
-        cached_addrs: Vec::new(),
-        svc: inbound_service,
-        client_rx: server_rx.into(),
-        error_slot: error_slot.clone(),
+    let server = Connection::new(
+        inbound_service,
+        server_rx,
+        error_slot.clone(),
         peer_tx,
         connection_tracker,
-        metrics_label: connected_addr.get_transient_addr_label(),
-        last_metrics_state: None,
-    };
+        connected_addr,
+    );
 
     let connection_task = tokio::spawn(
         server