2019-10-22 12:44:08 -07:00
|
|
|
use std::{
|
2019-12-13 14:25:14 -08:00
|
|
|
future::Future,
|
2019-10-22 12:44:08 -07:00
|
|
|
net::SocketAddr,
|
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
|
|
|
|
2019-12-13 14:25:14 -08:00
|
|
|
use futures::prelude::*;
|
|
|
|
use tokio::net::TcpStream;
|
2019-10-22 12:44:08 -07:00
|
|
|
use tower::{discover::Change, Service, ServiceExt};
|
|
|
|
|
|
|
|
use crate::{BoxedStdError, Request, Response};
|
|
|
|
|
2019-11-27 11:48:41 -08:00
|
|
|
use super::{Client, Handshake, HandshakeError};
|
2019-10-22 12:44:08 -07:00
|
|
|
|
2019-11-27 11:42:59 -08:00
|
|
|
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
|
2019-10-22 12:44:08 -07:00
|
|
|
/// forwarding to the inner handshake service. Writing this as its own
|
|
|
|
/// [`tower::Service`] lets us apply unified timeout policies, etc.
|
2019-11-27 11:43:59 -08:00
|
|
|
pub struct Connector<S> {
|
2019-11-27 11:42:59 -08:00
|
|
|
handshaker: Handshake<S>,
|
2019-10-22 12:44:08 -07:00
|
|
|
}
|
|
|
|
|
2019-11-27 11:43:59 -08:00
|
|
|
impl<S: Clone> Clone for Connector<S> {
|
2019-10-22 12:44:08 -07:00
|
|
|
fn clone(&self) -> Self {
|
2019-11-27 11:43:59 -08:00
|
|
|
Connector {
|
2019-10-22 12:44:08 -07:00
|
|
|
handshaker: self.handshaker.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-27 11:43:59 -08:00
|
|
|
impl<S> Connector<S> {
|
2019-11-27 11:42:59 -08:00
|
|
|
pub fn new(handshaker: Handshake<S>) -> Self {
|
2019-11-27 11:43:59 -08:00
|
|
|
Connector { handshaker }
|
2019-10-22 12:44:08 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-27 11:43:59 -08:00
|
|
|
impl<S> Service<SocketAddr> for Connector<S>
|
2019-10-22 12:44:08 -07:00
|
|
|
where
|
|
|
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
|
|
|
S::Future: Send,
|
|
|
|
{
|
2019-11-27 11:27:17 -08:00
|
|
|
type Response = Change<SocketAddr, Client>;
|
2019-10-22 12:44:08 -07:00
|
|
|
type Error = HandshakeError;
|
|
|
|
type Future =
|
|
|
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, addr: SocketAddr) -> Self::Future {
|
|
|
|
let mut hs = self.handshaker.clone();
|
|
|
|
async move {
|
|
|
|
let stream = TcpStream::connect(addr).await?;
|
|
|
|
hs.ready().await?;
|
|
|
|
let client = hs.call((stream, addr)).await?;
|
|
|
|
Ok(Change::Insert(addr, client))
|
|
|
|
}
|
2019-11-26 22:42:42 -08:00
|
|
|
.boxed()
|
2019-10-22 12:44:08 -07:00
|
|
|
}
|
|
|
|
}
|