Switch tower-buffer to use tokio-sync (#152)
This commit is contained in:
parent
e6a3f76707
commit
2d72bc8660
|
@ -10,6 +10,7 @@ tower-service = { version = "0.2", path = "../tower-service" }
|
|||
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
|
||||
tokio-executor = "0.1"
|
||||
lazycell = "1.2"
|
||||
tokio-sync = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
tower-mock = { version = "0.1", path = "../tower-mock" }
|
||||
|
|
|
@ -9,12 +9,13 @@
|
|||
extern crate futures;
|
||||
extern crate lazycell;
|
||||
extern crate tokio_executor;
|
||||
extern crate tokio_sync;
|
||||
extern crate tower_direct_service;
|
||||
extern crate tower_service;
|
||||
|
||||
use futures::future::Executor;
|
||||
use futures::sync::mpsc;
|
||||
use futures::sync::oneshot;
|
||||
use tokio_sync::mpsc;
|
||||
use tokio_sync::oneshot;
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
@ -290,7 +291,7 @@ where
|
|||
|
||||
match self.tx.try_send(Message { request, tx }) {
|
||||
Err(e) => {
|
||||
if e.is_disconnected() {
|
||||
if e.is_closed() {
|
||||
ResponseFuture {
|
||||
state: ResponseState::Failed(self.get_error_on_closed()),
|
||||
}
|
||||
|
@ -345,7 +346,7 @@ where
|
|||
Ok(Async::Ready(Ok(f))) => fut = f,
|
||||
Ok(Async::Ready(Err(e))) => return Err(Error::Closed(e)),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(futures::Canceled) => unreachable!(
|
||||
Err(_) => unreachable!(
|
||||
"Worker exited without sending error to all outstanding requests."
|
||||
),
|
||||
},
|
||||
|
@ -405,14 +406,14 @@ where
|
|||
// poll_cancel returns Async::Ready is the receiver is dropped.
|
||||
// Returning NotReady means it is still alive, so we should still
|
||||
// use it.
|
||||
if msg.tx.poll_cancel()?.is_not_ready() {
|
||||
if msg.tx.poll_close()?.is_not_ready() {
|
||||
return Ok(Async::Ready(Some(msg)));
|
||||
}
|
||||
}
|
||||
|
||||
// Get the next request
|
||||
while let Some(mut msg) = try_ready!(self.rx.poll()) {
|
||||
if msg.tx.poll_cancel()?.is_not_ready() {
|
||||
while let Some(mut msg) = try_ready!(self.rx.poll().map_err(|_| ())) {
|
||||
if msg.tx.poll_close()?.is_not_ready() {
|
||||
return Ok(Async::Ready(Some(msg)));
|
||||
}
|
||||
// Otherwise, request is canceled, so pop the next one.
|
||||
|
|
Loading…
Reference in New Issue