diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index cc3344429..1051173fc 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -534,33 +534,37 @@ impl RpcSubscriptions { pub(crate) mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use jsonrpc_core::futures; + use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_pubsub::typed::Subscriber; use solana_budget_program; use solana_sdk::{ signature::{Keypair, Signer}, system_transaction, }; - use tokio::prelude::{Async, Stream}; + use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; + use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay}; - pub(crate) fn robust_poll( - mut receiver: futures::sync::mpsc::Receiver, - ) -> Result { - const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; + pub(crate) fn robust_poll_or_panic( + receiver: futures::sync::mpsc::Receiver, + ) -> T { + let (inner_sender, inner_receiver) = channel(); + let mut rt = Runtime::new().unwrap(); + rt.spawn(futures::lazy(|| { + let recv_timeout = receiver + .into_future() + .timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) + .map(move |result| match result { + (Some(value), _) => inner_sender.send(value).expect("send error"), + (None, _) => panic!("unexpected end of stream"), + }) + .map_err(|err| panic!("stream error {:?}", err)); - std::thread::sleep(Duration::from_millis(INITIAL_DELAY_MS)); - for _i in 0..5 { - let found = receiver.poll(); - if let Ok(Async::Ready(Some(result))) = found { - return Ok(result); - } - std::thread::sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS)); - } - Err(RecvTimeoutError::Timeout) - } - - pub(crate) fn robust_poll_or_panic(receiver: futures::sync::mpsc::Receiver) -> T { - robust_poll(receiver).unwrap_or_else(|err| panic!("expected response! {}", err)) + const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2; + Delay::new(Instant::now() + Duration::from_millis(INITIAL_DELAY_MS)) + .and_then(|_| recv_timeout) + .map_err(|err| panic!("timer error {:?}", err)) + })); + inner_receiver.recv().expect("recv error") } #[test]