Banking stage subscribe also to have slots notifications

This commit is contained in:
godmodegalactus 2023-12-15 19:33:32 +01:00
parent 3449190818
commit 3384f853ec
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
1 changed files with 7 additions and 20 deletions

View File

@ -43,7 +43,6 @@ pub async fn start_tracking_banking_stage_errors(
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
slot: Arc<AtomicU64>,
subscribe_to_slots: bool,
) {
loop {
let token: Option<String> = None;
@ -54,20 +53,11 @@ pub async fn start_tracking_banking_stage_errors(
)
.unwrap();
let slot_subscription: HashMap<
String,
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
> = if subscribe_to_slots {
log::info!("subscribing to slots on grpc banking errors");
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
);
slot_sub
} else {
HashMap::new()
};
let mut slot_subscription = HashMap::new();
slot_subscription.insert(
"slot_subscription".to_string(),
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
);
let mut geyser_stream = client
.subscribe_once(
@ -84,7 +74,7 @@ pub async fn start_tracking_banking_stage_errors(
.await
.unwrap();
log::info!("started geyser banking stage subscription");
while let Some(message) = geyser_stream.next().await {
while let Ok(Some(message)) = tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await {
let Ok(message) = message else {
continue;
};
@ -139,8 +129,7 @@ pub async fn start_tracking_banking_stage_errors(
_=>{}
}
}
error!("geyser banking stage connection failed {}", grpc_address);
tokio::time::sleep(Duration::from_secs(1)).await;
error!("geyser banking stage connection failed {} restarting", grpc_address);
}
}
@ -247,7 +236,6 @@ async fn main() {
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
.banking_grpc_addresses
@ -263,7 +251,6 @@ async fn main() {
map_of_infos,
slot_by_errors,
slot,
no_block_subscription,
)
.await;
})