From 11b24bc537d2e8c0a8fd297e5415372009c634f0 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 19 Jan 2024 09:23:55 +0100 Subject: [PATCH] testcase lagging --- examples/stream_blocks_autoconnect.rs | 29 +++++++++++++++----- src/grpc_subscription_autoreconnect_tasks.rs | 8 +++++- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 99884fb..0b019b6 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -72,9 +72,12 @@ impl FromYellowstoneExtractor for BlockMiniExtractor { enum TestCases { Basic, - SlowReceiver, + SlowReceiverStartup, + TemporaryLaggingReceiver, CloseAfterReceiving, + AbortTaskFromOutside, } +const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver; #[tokio::main] @@ -83,7 +86,6 @@ pub async fn main() { tracing_subscriber::fmt::init(); // console_subscriber::init(); - let test_case = TestCases::CloseAfterReceiving; let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); @@ -112,20 +114,25 @@ pub async fn main() { tokio::spawn(async move { - if let TestCases::SlowReceiver = test_case { + if let TestCases::SlowReceiverStartup = TEST_CASE { sleep(Duration::from_secs(5)).await; } + let mut message_count = 0; while let Some(message) = green_stream.recv().await { - - - + if let TestCases::AbortTaskFromOutside = TEST_CASE { + if message_count > 5 { + info!("(testcase) aborting task from outside"); + jh_geyser_task.abort(); + } + } match message { Message::GeyserSubscribeUpdate(subscriber_update) => { + message_count += 1; // info!("got update: {:?}", subscriber_update.update_oneof.); info!("got update!!!"); - if let TestCases::CloseAfterReceiving = test_case { + if let TestCases::CloseAfterReceiving = TEST_CASE { info!("(testcase) closing stream after receiving"); return; } @@ -134,6 +141,14 @@ pub async fn main() { warn!("Connection attempt: {}", attempt); } } + + if let TestCases::TemporaryLaggingReceiver = TEST_CASE { + if message_count % 3 == 1 { + info!("(testcase) lagging a bit"); + sleep(Duration::from_millis(1500)).await; + } + } + } warn!("Stream aborted"); }); diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index a8a776d..d85db6e 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -69,7 +69,13 @@ enum State>, F: Interceptor> { WaitReconnect(Attempt), } -/// return handler will exit on fatal error +/// connect to grpc source performing autoconect if required, +/// returns mpsc channel; task will abort on fatal error +/// +/// implementation hints: +/// * no panic/unwrap +/// * do not use "?" +/// * do not "return" unless you really want to abort the task pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest,