Banking stage subscribe also to have slots notifications (#25)
This commit is contained in:
parent
3449190818
commit
a11b235010
27
src/main.rs
27
src/main.rs
|
@ -43,7 +43,6 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
subscribe_to_slots: bool,
|
||||
) {
|
||||
loop {
|
||||
let token: Option<String> = None;
|
||||
|
@ -54,20 +53,11 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let slot_subscription: HashMap<
|
||||
String,
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
|
||||
> = if subscribe_to_slots {
|
||||
log::info!("subscribing to slots on grpc banking errors");
|
||||
let mut slot_sub = HashMap::new();
|
||||
slot_sub.insert(
|
||||
"slot_sub".to_string(),
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
|
||||
);
|
||||
slot_sub
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
let mut slot_subscription = HashMap::new();
|
||||
slot_subscription.insert(
|
||||
"slot_subscription".to_string(),
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
|
||||
);
|
||||
|
||||
let mut geyser_stream = client
|
||||
.subscribe_once(
|
||||
|
@ -84,7 +74,7 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
.await
|
||||
.unwrap();
|
||||
log::info!("started geyser banking stage subscription");
|
||||
while let Some(message) = geyser_stream.next().await {
|
||||
while let Ok(Some(message)) = tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await {
|
||||
let Ok(message) = message else {
|
||||
continue;
|
||||
};
|
||||
|
@ -139,8 +129,7 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
_=>{}
|
||||
}
|
||||
}
|
||||
error!("geyser banking stage connection failed {}", grpc_address);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
error!("geyser banking stage connection failed {} restarting", grpc_address);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,7 +236,6 @@ async fn main() {
|
|||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
let slot = Arc::new(AtomicU64::new(0));
|
||||
let no_block_subscription = grpc_block_addr.is_none();
|
||||
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
let jhs = args
|
||||
.banking_grpc_addresses
|
||||
|
@ -263,7 +251,6 @@ async fn main() {
|
|||
map_of_infos,
|
||||
slot_by_errors,
|
||||
slot,
|
||||
no_block_subscription,
|
||||
)
|
||||
.await;
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue