diff --git a/src/main.rs b/src/main.rs index 1111e3e..699d499 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,59 +109,64 @@ pub async fn start_tracking_banking_stage_errors( map_of_infos: Arc>, slot_by_errors: Arc>, ) { - let token: Option = None; - let mut client = - yellowstone_grpc_client::GeyserGrpcClient::connect(grpc_address, token, None).unwrap(); - - let mut geyser_stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - Default::default(), - Default::default(), - Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed), - Default::default(), - true, + loop { + let token: Option = None; + let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect( + grpc_address.clone(), + token.clone(), + None, ) - .await .unwrap(); - while let Some(message) = geyser_stream.next().await { - let Ok(message) = message else { - continue; - }; + let mut geyser_stream = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Default::default(), + Default::default(), + Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed), + Default::default(), + true, + ) + .await + .unwrap(); - let Some(update) = message.update_oneof else { + while let Some(message) = geyser_stream.next().await { + let Ok(message) = message else { continue; - }; + }; - match update { - yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => { - BANKING_STAGE_ERROR_EVENT_COUNT.inc(); - let sig = transaction.signature.to_string(); - match slot_by_errors.get_mut(&transaction.slot) { - Some(mut value) => { - *value += 1; + let Some(update) = message.update_oneof else { + continue; + }; + + if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update { + BANKING_STAGE_ERROR_EVENT_COUNT.inc(); + let sig = transaction.signature.to_string(); + match slot_by_errors.get_mut(&transaction.slot) { + Some(mut value) => { + *value += 1; + } + None => { + slot_by_errors.insert(transaction.slot, 1); + } } - None => { - slot_by_errors.insert(transaction.slot, 1); + match map_of_infos.get_mut(&sig) { + Some(mut x) => { + x.add_notification(&transaction); + } + None => { + let mut x = TransactionInfo::new(&transaction); + x.add_notification(&transaction); + map_of_infos.insert(sig, x); + } } } - match map_of_infos.get_mut(&sig) { - Some(mut x) => { - x.add_notification(&transaction); - } - None => { - let mut x = TransactionInfo::new(&transaction); - x.add_notification(&transaction); - map_of_infos.insert(sig, x); - } - } - } - _ => {} } + error!("geyser banking stage connection failed {}", grpc_address); + tokio::time::sleep(Duration::from_secs(1)).await; } }