Remove tower/examples (#261)
The examples create a cyclical dependency between this repository and tower-hyper. Examples are moved to https://github.com/tower-rs/examples.
This commit is contained in:
parent
c339f4bf13
commit
91c8357db1
|
@ -32,12 +32,7 @@ tower-timeout = { version = "0.1", path = "../tower-timeout" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
tower-hyper = { git = "https://github.com/tower-rs/tower-hyper", rev = "9dfe56f" }
|
|
||||||
tokio-tcp = "0.1"
|
|
||||||
hyper = "0.12"
|
|
||||||
log = "0.4.1"
|
log = "0.4.1"
|
||||||
tokio = "0.1"
|
tokio = "0.1"
|
||||||
env_logger = { version = "0.5.3", default-features = false }
|
env_logger = { version = "0.5.3", default-features = false }
|
||||||
tokio-timer = "0.1"
|
|
||||||
futures-cpupool = "0.1"
|
|
||||||
void = "1"
|
void = "1"
|
||||||
|
|
|
@ -1,160 +0,0 @@
|
||||||
//! Spawns a task to respond to `Service` requests.
|
|
||||||
//!
|
|
||||||
//! The example demonstrates how to implement a service to handle backpressure
|
|
||||||
//! as well as how to use a service that can reach capacity.
|
|
||||||
//!
|
|
||||||
//! A task is used to handle service requests. The requests are dispatched to
|
|
||||||
//! the task using a channel. The task is implemented such requests can pile up
|
|
||||||
//! (responses are sent back after a fixed timeout).
|
|
||||||
|
|
||||||
#![deny(warnings)]
|
|
||||||
|
|
||||||
use env_logger;
|
|
||||||
use futures::{
|
|
||||||
future::{Executor, FutureResult},
|
|
||||||
sync::{mpsc, oneshot},
|
|
||||||
Async, Future, IntoFuture, Poll, Stream,
|
|
||||||
};
|
|
||||||
use futures_cpupool::CpuPool;
|
|
||||||
use log::info;
|
|
||||||
use tokio_timer::Timer;
|
|
||||||
use tower::{MakeService, Service, ServiceExt};
|
|
||||||
|
|
||||||
use std::{io, time::Duration};
|
|
||||||
|
|
||||||
/// Service that dispatches requests to a side task using a channel.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ChannelService {
|
|
||||||
// Send the request and a oneshot Sender to push the response into.
|
|
||||||
tx: Sender,
|
|
||||||
}
|
|
||||||
|
|
||||||
type Sender = mpsc::Sender<(String, oneshot::Sender<String>)>;
|
|
||||||
|
|
||||||
/// Creates new `ChannelService` services.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct NewChannelService {
|
|
||||||
// The number of requests to buffer
|
|
||||||
buffer: usize,
|
|
||||||
|
|
||||||
// The timer
|
|
||||||
timer: Timer,
|
|
||||||
|
|
||||||
// Executor to spawn the task on
|
|
||||||
pool: CpuPool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Response backed by a oneshot.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ResponseFuture {
|
|
||||||
rx: Option<oneshot::Receiver<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The service error
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Error {
|
|
||||||
AtCapacity,
|
|
||||||
Failed,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NewChannelService {
|
|
||||||
pub fn new(buffer: usize, pool: CpuPool) -> Self {
|
|
||||||
let timer = Timer::default();
|
|
||||||
|
|
||||||
NewChannelService {
|
|
||||||
buffer,
|
|
||||||
timer,
|
|
||||||
pool,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Service<()> for NewChannelService {
|
|
||||||
type Response = ChannelService;
|
|
||||||
type Error = io::Error;
|
|
||||||
type Future = FutureResult<Self::Response, Self::Error>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
|
||||||
Ok(().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, _target: ()) -> Self::Future {
|
|
||||||
let (tx, rx) = mpsc::channel::<(String, oneshot::Sender<String>)>(self.buffer);
|
|
||||||
let timer = self.timer.clone();
|
|
||||||
|
|
||||||
// Create the task that proceses the request
|
|
||||||
self.pool
|
|
||||||
.execute(rx.for_each(move |(msg, tx)| {
|
|
||||||
timer.sleep(Duration::from_millis(500)).then(move |res| {
|
|
||||||
res.unwrap();
|
|
||||||
let _ = tx.send(msg);
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}))
|
|
||||||
.map(|_| ChannelService { tx })
|
|
||||||
.map_err(|_| io::ErrorKind::Other.into())
|
|
||||||
.into_future()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Service<String> for ChannelService {
|
|
||||||
type Response = String;
|
|
||||||
type Error = Error;
|
|
||||||
type Future = ResponseFuture;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Error> {
|
|
||||||
self.tx.poll_ready().map_err(|_| Error::Failed)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, request: String) -> ResponseFuture {
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
|
|
||||||
match self.tx.try_send((request, tx)) {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(_) => {
|
|
||||||
return ResponseFuture { rx: None };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ResponseFuture { rx: Some(rx) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for ResponseFuture {
|
|
||||||
type Item = String;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<String, Error> {
|
|
||||||
match self.rx {
|
|
||||||
Some(ref mut rx) => match rx.poll() {
|
|
||||||
Ok(Async::Ready(v)) => Ok(v.into()),
|
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
Err(_) => Err(Error::Failed),
|
|
||||||
},
|
|
||||||
None => Err(Error::AtCapacity),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn main() {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let mut new_service = NewChannelService::new(5, CpuPool::new(1));
|
|
||||||
|
|
||||||
// Get the service
|
|
||||||
let mut service = new_service.make_service(()).wait().unwrap();
|
|
||||||
let mut responses = vec![];
|
|
||||||
|
|
||||||
for i in 0..10 {
|
|
||||||
service = service.ready().wait().unwrap();
|
|
||||||
|
|
||||||
info!("sending request; i={}", i);
|
|
||||||
|
|
||||||
let request = format!("request={}", i);
|
|
||||||
responses.push(service.call(request));
|
|
||||||
}
|
|
||||||
|
|
||||||
for response in responses {
|
|
||||||
println!("response={:?}", response.wait());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,58 +0,0 @@
|
||||||
use futures::Future;
|
|
||||||
use hyper::{
|
|
||||||
client::{connect::Destination, HttpConnector},
|
|
||||||
Request, Response, Uri,
|
|
||||||
};
|
|
||||||
use std::time::Duration;
|
|
||||||
use tower::{builder::ServiceBuilder, ServiceExt};
|
|
||||||
use tower_hyper::{
|
|
||||||
client::{Builder, Connect},
|
|
||||||
retry::{Body, RetryPolicy},
|
|
||||||
util::Connector,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let fut = futures::lazy(|| {
|
|
||||||
request().map(|resp| {
|
|
||||||
dbg!(resp);
|
|
||||||
})
|
|
||||||
});
|
|
||||||
hyper::rt::run(fut)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn request() -> impl Future<Item = Response<hyper::Body>, Error = ()> {
|
|
||||||
let connector = Connector::new(HttpConnector::new(1));
|
|
||||||
let hyper = Connect::new(connector, Builder::new());
|
|
||||||
|
|
||||||
// RetryPolicy is a very simple policy that retries `n` times
|
|
||||||
// if the response has a 500 status code. Here, `n` is 5.
|
|
||||||
let policy = RetryPolicy::new(5);
|
|
||||||
// We're calling the tower/examples/server.rs.
|
|
||||||
let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap();
|
|
||||||
|
|
||||||
// Now, to build the service! We use two BufferLayers in order to:
|
|
||||||
// - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer
|
|
||||||
// - meet `RetryLayer`'s requirement that our service implement `Service + Clone`
|
|
||||||
// - ..and to provide cheap clones on the service.
|
|
||||||
let client = ServiceBuilder::new()
|
|
||||||
.buffer(5)
|
|
||||||
.rate_limit(5, Duration::from_secs(1))
|
|
||||||
.concurrency_limit(5)
|
|
||||||
.retry(policy)
|
|
||||||
.buffer(5)
|
|
||||||
.make_service(hyper);
|
|
||||||
|
|
||||||
client
|
|
||||||
// Make a `Service` to `dst`...
|
|
||||||
.oneshot(dst)
|
|
||||||
.map_err(|e| panic!("connect error: {:?}", e))
|
|
||||||
.and_then(|svc| {
|
|
||||||
let req = Request::builder()
|
|
||||||
.method("GET")
|
|
||||||
.body(Body::from(Vec::new()))
|
|
||||||
.unwrap();
|
|
||||||
// Send the request when `svc` is ready...
|
|
||||||
svc.oneshot(req)
|
|
||||||
})
|
|
||||||
.map_err(|e| panic!("ruh roh: {:?}", e))
|
|
||||||
}
|
|
|
@ -1,68 +0,0 @@
|
||||||
use futures::{future, Future, Poll, Stream};
|
|
||||||
use hyper::{self, Body, Request, Response};
|
|
||||||
use tokio_tcp::TcpListener;
|
|
||||||
use tower::{builder::ServiceBuilder, Service};
|
|
||||||
use tower_hyper::{body::LiftBody, server::Server};
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
hyper::rt::run(future::lazy(|| {
|
|
||||||
let addr = "127.0.0.1:3000".parse().unwrap();
|
|
||||||
let bind = TcpListener::bind(&addr).expect("bind");
|
|
||||||
|
|
||||||
println!("Listening on http://{}", addr);
|
|
||||||
|
|
||||||
let maker = ServiceBuilder::new()
|
|
||||||
.concurrency_limit(5)
|
|
||||||
.make_service(MakeSvc);
|
|
||||||
|
|
||||||
let server = Server::new(maker);
|
|
||||||
|
|
||||||
bind.incoming()
|
|
||||||
.fold(server, |mut server, stream| {
|
|
||||||
if let Err(e) = stream.set_nodelay(true) {
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
hyper::rt::spawn(
|
|
||||||
server
|
|
||||||
.serve(stream)
|
|
||||||
.map_err(|e| panic!("Server error {:?}", e)),
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(server)
|
|
||||||
})
|
|
||||||
.map_err(|e| panic!("serve error: {:?}", e))
|
|
||||||
.map(|_| {})
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Svc;
|
|
||||||
impl Service<Request<Body>> for Svc {
|
|
||||||
type Response = Response<LiftBody<Body>>;
|
|
||||||
type Error = hyper::Error;
|
|
||||||
type Future = future::FutureResult<Self::Response, Self::Error>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
|
||||||
Ok(().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, _req: Request<Body>) -> Self::Future {
|
|
||||||
let res = Response::new(LiftBody::new(Body::from("Hello World!")));
|
|
||||||
future::ok(res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct MakeSvc;
|
|
||||||
impl Service<()> for MakeSvc {
|
|
||||||
type Response = Svc;
|
|
||||||
type Error = hyper::Error;
|
|
||||||
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error> + Send + 'static>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
|
||||||
Ok(().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, _: ()) -> Self::Future {
|
|
||||||
Box::new(future::ok(Svc))
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue