resubsribing to banking errors notification on failure

This commit is contained in:
godmodegalactus 2023-11-30 17:38:40 +01:00
parent 024f341c77
commit c306d5e508
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
1 changed files with 48 additions and 43 deletions

View File

@ -109,59 +109,64 @@ pub async fn start_tracking_banking_stage_errors(
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
) {
let token: Option<String> = None;
let mut client =
yellowstone_grpc_client::GeyserGrpcClient::connect(grpc_address, token, None).unwrap();
let mut geyser_stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Default::default(),
Default::default(),
Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed),
Default::default(),
true,
loop {
let token: Option<String> = None;
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_address.clone(),
token.clone(),
None,
)
.await
.unwrap();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
let mut geyser_stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Default::default(),
Default::default(),
Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed),
Default::default(),
true,
)
.await
.unwrap();
let Some(update) = message.update_oneof else {
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
};
match update {
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => {
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value += 1;
let Some(update) = message.update_oneof else {
continue;
};
if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update {
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value += 1;
}
None => {
slot_by_errors.insert(transaction.slot, 1);
}
}
None => {
slot_by_errors.insert(transaction.slot, 1);
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
}
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert(sig, x);
}
}
}
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
}
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert(sig, x);
}
}
}
_ => {}
}
error!("geyser banking stage connection failed {}", grpc_address);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}