Inside tokio::spawn, loop over Iterator stream and send ClientRequest

msgs on the channel instead

Related to #49
This commit is contained in:
Deirdre Connolly 2019-10-21 14:05:09 -04:00 committed by Deirdre Connolly
parent e65f5a05ea
commit 61a07c67ef
2 changed files with 28 additions and 8 deletions

View File

@ -162,12 +162,12 @@ where
debug!("constructing PeerClient, spawning PeerServer"); debug!("constructing PeerClient, spawning PeerServer");
let (tx, rx) = mpsc::channel(0); let (server_tx, server_rx) = mpsc::channel(0);
let slot = ErrorSlot::default(); let slot = ErrorSlot::default();
let client = PeerClient { let client = PeerClient {
span: connection_span.clone(), span: connection_span.clone(),
server_tx: tx, server_tx: server_tx.clone(),
error_slot: slot.clone(), error_slot: slot.clone(),
}; };
@ -176,7 +176,7 @@ where
let server = PeerServer { let server = PeerServer {
state: ServerState::AwaitingRequest, state: ServerState::AwaitingRequest,
svc: internal_service, svc: internal_service,
client_rx: rx, client_rx: server_rx,
error_slot: slot, error_slot: slot,
peer_tx, peer_tx,
request_timer: None, request_timer: None,
@ -208,12 +208,29 @@ where
.boxed(), .boxed(),
); );
// client.ready().await?; tokio::spawn(async move {
use futures::channel::oneshot;
tokio::spawn( use super::client::ClientRequest;
Interval::new_interval(Duration::from_secs(60))
.for_each(|_| client.call(Request::Ping(Nonce::default()))), let mut server_tx = server_tx;
);
let mut interval_stream = Interval::new_interval(Duration::from_secs(60));
loop {
interval_stream.next().await;
// We discard the server handle because our
// heartbeat `Ping`s are a special case, and we
// don't actually care about the response here.
let (request_tx, _) = oneshot::channel();
let msg = ClientRequest(Request::Ping(Nonce::default()), request_tx);
if server_tx.send(msg).await.is_err() {
return;
}
}
});
Ok(client) Ok(client)
}; };

View File

@ -16,6 +16,9 @@ pub enum Request {
/// Advertises peers to the remote server. /// Advertises peers to the remote server.
PushPeers(Vec<MetaAddr>), PushPeers(Vec<MetaAddr>),
/// Heartbeats triggered on peer connection start. /// Heartbeats triggered on peer connection start.
// This is included as a bit of a hack, it should only be used
// internally for connection management. You should not expect to
// be firing or handling `Ping` requests or `Pong` responses.
Ping(Nonce), Ping(Nonce),
} }