From c823c56773629bb0a50d996d259b438821498fd8 Mon Sep 17 00:00:00 2001 From: Jane Lusby Date: Fri, 11 Dec 2020 14:24:53 -0800 Subject: [PATCH] start working on panic propagation --- tower/Cargo.toml | 2 +- tower/src/buffer/service.rs | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 52b3a66..b4d9ed5 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -24,7 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"] edition = "2018" [features] -default = ["log"] +default = ["log", "buffer"] log = ["tracing/log"] balance = ["discover", "load", "ready-cache", "make", "rand", "slab", "tokio/stream"] buffer = ["tokio/sync", "tokio/rt", "tokio/stream"] diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs index e3a77fb..3a9fa56 100644 --- a/tower/src/buffer/service.rs +++ b/tower/src/buffer/service.rs @@ -7,7 +7,10 @@ use super::{ use crate::semaphore::Semaphore; use futures_core::ready; use std::task::{Context, Poll}; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; use tower_service::Service; /// Adds an mpsc buffer in front of an inner service. @@ -32,6 +35,7 @@ where // limit how many items are in the channel. semaphore: Semaphore, handle: Handle, + task_handle: Option>, } impl Buffer @@ -63,8 +67,9 @@ where T::Error: Send + Sync, Request: Send + 'static, { - let (service, worker) = Self::pair(service, bound); - tokio::spawn(worker); + let (mut service, worker) = Self::pair(service, bound); + let task_handle = tokio::spawn(worker); + service.task_handle = Some(task_handle); service } @@ -87,6 +92,7 @@ where tx, handle, semaphore, + task_handle: None, }, worker, ) @@ -158,6 +164,7 @@ where tx: self.tx.clone(), handle: self.handle.clone(), semaphore: self.semaphore.clone(), + task_handle: None, } } }