From 40309d8f5811fceb023ac7325a43929e95df23b0 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Mon, 12 Jun 2023 11:03:47 +0200 Subject: [PATCH] Finish implementing exit on tpu service --- services/src/block_listenser.rs | 2 -- services/src/tpu_utils/tpu_service.rs | 12 +++++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/services/src/block_listenser.rs b/services/src/block_listenser.rs index a92393d2..8d59503a 100644 --- a/services/src/block_listenser.rs +++ b/services/src/block_listenser.rs @@ -8,8 +8,6 @@ use std::{ use anyhow::{bail, Context}; use chrono::{TimeZone, Utc}; - -use jsonrpsee::SubscriptionSink; use log::{error, info, trace}; use prometheus::{ core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 1e063a5d..d36132a5 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -174,7 +174,7 @@ impl TpuService { async fn update_current_slot( &self, update_notifier: tokio::sync::mpsc::UnboundedSender, - ) -> anyhow::Result<()> { + ) { let current_slot = self.current_slot.clone(); let update_slot = |slot: u64| { if slot > current_slot.load(Ordering::Relaxed) { @@ -185,6 +185,9 @@ impl TpuService { }; loop { + if self.check_exit_signal() { + break; + } // always loop update the current slots as it is central to working of TPU let _ = SolanaUtils::poll_slots( self.rpc_client.clone(), @@ -230,14 +233,13 @@ impl TpuService { } } } - Ok(()) }); let this = self.clone(); let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::(); let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move { - this.update_current_slot(slot_sender).await?; + this.update_current_slot(slot_sender).await; Ok(()) }); @@ -284,4 +286,8 @@ impl TpuService { pub fn get_estimated_slot_holder(&self) -> Arc { self.estimated_slot.clone() } + + pub fn exit(&self) { + self.exit_signal.store(true, Ordering::Relaxed); + } }