Implement graceful shutdown for the peer set (#3071)
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
parent
c4118dcc2c
commit
3fc049e2eb
|
@ -44,6 +44,7 @@
|
|||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert,
|
||||
fmt::Debug,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
|
@ -147,7 +148,18 @@ where
|
|||
<D::Service as Service<Request>>::Future: Send + 'static,
|
||||
<D::Service as Load>::Metric: Debug,
|
||||
{
|
||||
/// Construct a peerset which uses `discover` internally.
|
||||
/// Construct a peerset which uses `discover` to manage peer connections.
|
||||
///
|
||||
/// Arguments:
|
||||
/// - `config`: configures the peer set connection limit;
|
||||
/// - `discover`: handles peer connects and disconnects;
|
||||
/// - `demand_signal`: requests more peers when all peers are busy (unready);
|
||||
/// - `handle_rx`: receives background task handles,
|
||||
/// monitors them to make sure they're still running,
|
||||
/// and shuts down all the tasks as soon as one task exits;
|
||||
/// - `inv_stream`: receives inventory advertisements for peers,
|
||||
/// allowing the peer set to direct inventory requests;
|
||||
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
|
||||
pub fn new(
|
||||
config: &Config,
|
||||
discover: D,
|
||||
|
@ -172,6 +184,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Check background task handles to make sure they're still running.
|
||||
///
|
||||
/// If any background task exits, shuts down all other background tasks,
|
||||
/// and returns an error.
|
||||
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
|
||||
if self.guards.is_empty() {
|
||||
match self.handle_rx.try_recv() {
|
||||
|
@ -187,13 +203,28 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
match Pin::new(&mut self.guards).poll_next(cx) {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Some(res)) => res??,
|
||||
Poll::Ready(None) => Err("all background tasks have exited")?,
|
||||
let exit_error = match Pin::new(&mut self.guards).poll_next(cx) {
|
||||
Poll::Pending => return Ok(()),
|
||||
Poll::Ready(Some(res)) => {
|
||||
info!(
|
||||
background_tasks = %self.guards.len(),
|
||||
"a peer set background task exited, shutting down other peer set tasks"
|
||||
);
|
||||
|
||||
// Flatten the join result and inner result,
|
||||
// then turn Ok() task exits into errors.
|
||||
res.map_err(Into::into)
|
||||
.and_then(convert::identity)
|
||||
.and(Err("a peer set background task exited".into()))
|
||||
}
|
||||
Poll::Ready(None) => Err("all peer set background tasks have exited".into()),
|
||||
};
|
||||
|
||||
for guard in self.guards.iter() {
|
||||
guard.abort();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
exit_error
|
||||
}
|
||||
|
||||
fn poll_unready(&mut self, cx: &mut Context<'_>) {
|
||||
|
|
Loading…
Reference in New Issue