testcase lagging

This commit is contained in:
GroovieGermanikus 2024-01-19 09:23:55 +01:00
parent 0200e9d469
commit 11b24bc537
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 29 additions and 8 deletions

View File

@ -72,9 +72,12 @@ impl FromYellowstoneExtractor for BlockMiniExtractor {
enum TestCases { enum TestCases {
Basic, Basic,
SlowReceiver, SlowReceiverStartup,
TemporaryLaggingReceiver,
CloseAfterReceiving, CloseAfterReceiving,
AbortTaskFromOutside,
} }
const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver;
#[tokio::main] #[tokio::main]
@ -83,7 +86,6 @@ pub async fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// console_subscriber::init(); // console_subscriber::init();
let test_case = TestCases::CloseAfterReceiving;
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
@ -112,20 +114,25 @@ pub async fn main() {
tokio::spawn(async move { tokio::spawn(async move {
if let TestCases::SlowReceiver = test_case { if let TestCases::SlowReceiverStartup = TEST_CASE {
sleep(Duration::from_secs(5)).await; sleep(Duration::from_secs(5)).await;
} }
let mut message_count = 0;
while let Some(message) = green_stream.recv().await { while let Some(message) = green_stream.recv().await {
if let TestCases::AbortTaskFromOutside = TEST_CASE {
if message_count > 5 {
info!("(testcase) aborting task from outside");
jh_geyser_task.abort();
}
}
match message { match message {
Message::GeyserSubscribeUpdate(subscriber_update) => { Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
// info!("got update: {:?}", subscriber_update.update_oneof.); // info!("got update: {:?}", subscriber_update.update_oneof.);
info!("got update!!!"); info!("got update!!!");
if let TestCases::CloseAfterReceiving = test_case { if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving"); info!("(testcase) closing stream after receiving");
return; return;
} }
@ -134,6 +141,14 @@ pub async fn main() {
warn!("Connection attempt: {}", attempt); warn!("Connection attempt: {}", attempt);
} }
} }
if let TestCases::TemporaryLaggingReceiver = TEST_CASE {
if message_count % 3 == 1 {
info!("(testcase) lagging a bit");
sleep(Duration::from_millis(1500)).await;
}
}
} }
warn!("Stream aborted"); warn!("Stream aborted");
}); });

View File

@ -69,7 +69,13 @@ enum State<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
WaitReconnect(Attempt), WaitReconnect(Attempt),
} }
/// return handler will exit on fatal error /// connect to grpc source performing autoconect if required,
/// returns mpsc channel; task will abort on fatal error
///
/// implementation hints:
/// * no panic/unwrap
/// * do not use "?"
/// * do not "return" unless you really want to abort the task
pub fn create_geyser_autoconnection_task( pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig, grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest, subscribe_filter: SubscribeRequest,