Use `broadcast::Receiver::recv` instead of `next` (#2933)
On newer versions of Tokio the `Receiver` doesn't implement `Stream`.
This commit is contained in:
parent
0db35fbee0
commit
6905c79fd6
|
@ -6,7 +6,6 @@ use std::{
|
||||||
use futures::future::TryFutureExt;
|
use futures::future::TryFutureExt;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
stream::StreamExt,
|
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
time::{sleep, Sleep},
|
time::{sleep, Sleep},
|
||||||
};
|
};
|
||||||
|
@ -127,7 +126,7 @@ where
|
||||||
let mut pending_items = 0usize;
|
let mut pending_items = 0usize;
|
||||||
loop {
|
loop {
|
||||||
match timer.as_mut() {
|
match timer.as_mut() {
|
||||||
None => match self.rx.next().await {
|
None => match self.rx.recv().await {
|
||||||
// The first message in a new batch.
|
// The first message in a new batch.
|
||||||
Some(msg) => {
|
Some(msg) => {
|
||||||
let span = msg.span;
|
let span = msg.span;
|
||||||
|
|
Loading…
Reference in New Issue