Run pubsub test poller in tokio runtime (#8494)
This commit is contained in:
parent
8cf3ef895d
commit
8ec8204a30
|
@ -534,33 +534,37 @@ impl RpcSubscriptions {
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
|
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
|
||||||
use jsonrpc_core::futures;
|
use jsonrpc_core::futures::{self, stream::Stream};
|
||||||
use jsonrpc_pubsub::typed::Subscriber;
|
use jsonrpc_pubsub::typed::Subscriber;
|
||||||
use solana_budget_program;
|
use solana_budget_program;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
signature::{Keypair, Signer},
|
signature::{Keypair, Signer},
|
||||||
system_transaction,
|
system_transaction,
|
||||||
};
|
};
|
||||||
use tokio::prelude::{Async, Stream};
|
use std::{fmt::Debug, sync::mpsc::channel, time::Instant};
|
||||||
|
use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay};
|
||||||
|
|
||||||
|
pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>(
|
||||||
|
receiver: futures::sync::mpsc::Receiver<T>,
|
||||||
|
) -> T {
|
||||||
|
let (inner_sender, inner_receiver) = channel();
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
rt.spawn(futures::lazy(|| {
|
||||||
|
let recv_timeout = receiver
|
||||||
|
.into_future()
|
||||||
|
.timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS))
|
||||||
|
.map(move |result| match result {
|
||||||
|
(Some(value), _) => inner_sender.send(value).expect("send error"),
|
||||||
|
(None, _) => panic!("unexpected end of stream"),
|
||||||
|
})
|
||||||
|
.map_err(|err| panic!("stream error {:?}", err));
|
||||||
|
|
||||||
pub(crate) fn robust_poll<T>(
|
|
||||||
mut receiver: futures::sync::mpsc::Receiver<T>,
|
|
||||||
) -> Result<T, RecvTimeoutError> {
|
|
||||||
const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2;
|
const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2;
|
||||||
|
Delay::new(Instant::now() + Duration::from_millis(INITIAL_DELAY_MS))
|
||||||
std::thread::sleep(Duration::from_millis(INITIAL_DELAY_MS));
|
.and_then(|_| recv_timeout)
|
||||||
for _i in 0..5 {
|
.map_err(|err| panic!("timer error {:?}", err))
|
||||||
let found = receiver.poll();
|
}));
|
||||||
if let Ok(Async::Ready(Some(result))) = found {
|
inner_receiver.recv().expect("recv error")
|
||||||
return Ok(result);
|
|
||||||
}
|
|
||||||
std::thread::sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS));
|
|
||||||
}
|
|
||||||
Err(RecvTimeoutError::Timeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn robust_poll_or_panic<T>(receiver: futures::sync::mpsc::Receiver<T>) -> T {
|
|
||||||
robust_poll(receiver).unwrap_or_else(|err| panic!("expected response! {}", err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue