diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 234d0ef3..33eacd2f 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -253,63 +253,67 @@ pub fn create_block_processing_task( block_sx: Sender, commitment_level: CommitmentLevel, ) -> AnyhowJoinHandle { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - let commitment_config = match commitment_level { - CommitmentLevel::Confirmed => CommitmentConfig::confirmed(), - CommitmentLevel::Finalized => CommitmentConfig::finalized(), - CommitmentLevel::Processed => CommitmentConfig::processed(), - }; - tokio::spawn(async move { - // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?; - let mut stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - None, - ) - .await?; + loop { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); - while let Some(message) = stream.next().await { - let message = message?; - - let Some(update) = message.update_oneof else { - continue; + let commitment_config = match commitment_level { + CommitmentLevel::Confirmed => CommitmentConfig::confirmed(), + CommitmentLevel::Finalized => CommitmentConfig::finalized(), + CommitmentLevel::Processed => CommitmentConfig::processed(), }; - match update { - UpdateOneof::Block(block) => { - let block = process_block(block, commitment_config); - block_sx - .send(block) - .context("Grpc failed to send a block")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - u => { - bail!("Unexpected update: {u:?}"); - } - }; + // connect to grpc + let mut client = + GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?; + let mut stream = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + Some(commitment_level), + Default::default(), + None, + ) + .await?; + + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Block(block) => { + let block = process_block(block, commitment_config); + block_sx + .send(block) + .context("Grpc failed to send a block")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + _ => { + log::trace!("unknown GRPC notification"); + } + }; + } + log::error!("Grpc block subscription broken (resubscribing)"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - bail!("geyser slot stream ended"); }) } @@ -324,68 +328,71 @@ pub fn create_grpc_subscription( let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); - let mut slots = HashMap::new(); - slots.insert( - "client".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - }, - ); - let slot_task: AnyhowJoinHandle = { let grpc_x_token = grpc_x_token.clone(); let grpc_addr = grpc_addr.clone(); tokio::spawn(async move { - // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?; - - let version = client.get_version().await?.version; - if version != expected_grpc_version { - log::warn!( - "Expected grpc version {:?}, got {:?}, continue", - expected_grpc_version, - version + loop { + let mut slots = HashMap::new(); + slots.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, ); - } - let mut stream = client - .subscribe_once( - slots, - Default::default(), - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - Some(CommitmentLevel::Processed), - Default::default(), - None, - ) - .await?; + // connect to grpc + let mut client = + GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?; - while let Some(message) = stream.next().await { - let message = message?; + let version = client.get_version().await?.version; + if version != expected_grpc_version { + log::warn!( + "Expected grpc version {:?}, got {:?}, continue", + expected_grpc_version, + version + ); + } + let mut stream = client + .subscribe_once( + slots, + Default::default(), + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Some(CommitmentLevel::Processed), + Default::default(), + None, + ) + .await?; - let Some(update) = message.update_oneof else { + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { continue; }; - match update { - UpdateOneof::Slot(slot) => { - slot_sx - .send(SlotNotification { - estimated_processed_slot: slot.slot, - processed_slot: slot.slot, - }) - .context("Error sending slot notification")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - k => { - bail!("Unexpected update: {k:?}"); - } - }; + match update { + UpdateOneof::Slot(slot) => { + slot_sx + .send(SlotNotification { + estimated_processed_slot: slot.slot, + processed_slot: slot.slot, + }) + .context("Error sending slot notification")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + k => { + bail!("Unexpected update: {k:?}"); + } + }; + } + log::error!("Grpc slot subscription broken (resubscribing)"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - bail!("geyser slot stream ended"); }) }; diff --git a/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs b/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs index d7001cc4..e9fb8ca3 100644 --- a/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs +++ b/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs @@ -22,7 +22,7 @@ pub fn poll_vote_accounts_and_cluster_info( .send(vote_accounts) .context("Should be able to send vote accounts")?; } - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(600)).await; } }) }