tower-buffer: fix Worker closing when service.poll_ready was NotReady

This commit is contained in:
Sean McArthur 2018-11-30 16:05:13 -08:00
parent 6168ba27b5
commit 801adb18db
2 changed files with 31 additions and 3 deletions

View File

@ -398,9 +398,11 @@ where
Ok(Async::NotReady) => {
// Put out current message back in its slot.
self.current_message = Some(msg);
// We don't want to return quite yet
// We want to also make progress on current requests
break;
if !any_outstanding {
return Ok(Async::NotReady);
}
// We may want to also make progress on current requests
}
Err(_) => {
self.state.open.store(false, Ordering::Release);
@ -428,6 +430,7 @@ where
// We are all done!
break;
} else {
debug_assert!(any_outstanding);
if let Async::Ready(()) = self.service.poll_service().map_err(|_| ())? {
// Note to future iterations that there's no reason to call poll_service.
any_outstanding = false;

View File

@ -56,6 +56,31 @@ fn clears_canceled_requests() {
assert_eq!(res3.wait().unwrap(), "world3");
}
#[test]
fn when_inner_is_not_ready() {
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
let mut res1 = service.call("hello");
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
with_task(|| {
assert!(res1.poll().expect("res1.poll").is_not_ready());
assert!(handle.poll_request().expect("poll_request").is_not_ready());
});
handle.allow(1);
let req1 = handle.next_request().expect("next_request1");
assert_eq!(*req1, "hello");
req1.respond("world");
assert_eq!(res1.wait().expect("res1.wait"), "world");
}
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;