adding proper way to exit for each service

This commit is contained in:
Godmode Galactus 2023-06-12 22:29:36 +02:00
parent 0eeea15b1a
commit 24fa79ba63
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
5 changed files with 80 additions and 30 deletions

View File

@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
@ -253,6 +253,7 @@ impl BlockListener {
commitment_config: CommitmentConfig,
notifier: Option<NotificationSender>,
estimated_slot: Arc<AtomicU64>,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
@ -260,14 +261,18 @@ impl BlockListener {
// task to fetch blocks
//
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let slot_indexer_tasks = (0..8).map(move |_| {
let this = this.clone();
let notifier = notifier.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let exit_signal_l = exit_signal_l.clone();
let task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
loop {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
match block_schedule_queue_rx.recv().await {
Ok(slot) => {
if commitment_config.is_finalized() {
@ -299,7 +304,7 @@ impl BlockListener {
}
}
}
//bail!("Block Slot channel closed")
bail!("Block Slot channel closed")
});
task
@ -311,9 +316,13 @@ impl BlockListener {
let slot_retry_task: JoinHandle<anyhow::Result<()>> = {
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
let exit_signal_l = exit_signal.clone();
tokio::spawn(async move {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
@ -363,6 +372,9 @@ impl BlockListener {
continue;
}
if exit_signal.load(Ordering::Relaxed) {
break;
}
// filter already processed slots
let new_block_slots: Vec<u64> = (last_latest_slot..new_slot).collect();
// context for lock
@ -384,6 +396,7 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
});
tokio::select! {
@ -400,13 +413,17 @@ impl BlockListener {
}
// continuosly poll processed blocks and feed into blockstore
pub fn listen_processed(self) -> JoinHandle<anyhow::Result<()>> {
pub fn listen_processed(self, exit_signal: Arc<AtomicBool>) -> JoinHandle<anyhow::Result<()>> {
let block_processor = self.block_processor;
tokio::spawn(async move {
info!("processed block listner started");
loop {
if exit_signal.load(Ordering::Relaxed) {
break;
}
if let Err(err) = block_processor
.poll_latest_block(CommitmentConfig::processed())
.await
@ -417,6 +434,7 @@ impl BlockListener {
// sleep
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
Ok(())
})
}

View File

@ -1,3 +1,5 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use crate::block_listenser::BlockListener;
@ -47,19 +49,27 @@ impl Cleaner {
BLOCKS_IN_BLOCKSTORE.set(self.block_store.number_of_blocks_in_store() as i64);
}
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
pub fn start(
self,
ttl_duration: Duration,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration);
tokio::spawn(async move {
info!("Cleaning memory");
loop {
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
ttl.tick().await;
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
}
Ok(())
})
}
}

View File

@ -58,7 +58,6 @@ pub struct TpuService {
identity_stakes: Arc<RwLock<IdentityStakes>>,
txs_sent_store: TxStore,
leader_schedule: Arc<LeaderSchedule>,
exit_signal: Arc<AtomicBool>,
}
impl TpuService {
@ -92,7 +91,6 @@ impl TpuService {
identity,
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
txs_sent_store,
exit_signal: Arc::new(AtomicBool::new(false)),
})
}
@ -126,10 +124,6 @@ impl TpuService {
Ok(())
}
fn check_exit_signal(&self) -> bool {
self.exit_signal.load(Ordering::Relaxed)
}
async fn update_quic_connections(&self) {
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
let current_slot = self.current_slot.load(Ordering::Relaxed);
@ -168,7 +162,15 @@ impl TpuService {
.await;
}
async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
fn check_exit_signal(exit_signal: &Arc<AtomicBool>) -> bool {
exit_signal.load(Ordering::Relaxed)
}
async fn update_current_slot(
&self,
update_notifier: tokio::sync::mpsc::UnboundedSender<u64>,
exit_signal: Arc<AtomicBool>,
) {
let current_slot = self.current_slot.clone();
let update_slot = |slot: u64| {
if slot > current_slot.load(Ordering::Relaxed) {
@ -179,7 +181,7 @@ impl TpuService {
};
loop {
if self.check_exit_signal() {
if Self::check_exit_signal(&exit_signal) {
break;
}
// always loop update the current slots as it is central to working of TPU
@ -189,7 +191,7 @@ impl TpuService {
}
}
pub async fn start(&self) -> anyhow::Result<()> {
pub async fn start(&self, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
self.leader_schedule
.load_cluster_info(self.rpc_client.clone())
.await?;
@ -198,17 +200,18 @@ impl TpuService {
self.update_quic_connections().await;
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let jh_update_leaders = tokio::spawn(async move {
let mut last_cluster_info_update = Instant::now();
let leader_schedule_update_interval =
Duration::from_secs(LEADER_SCHEDULE_UPDATE_INTERVAL);
let cluster_info_update_interval = Duration::from_secs(CLUSTERINFO_REFRESH_TIME);
loop {
if this.check_exit_signal() {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
tokio::time::sleep(leader_schedule_update_interval).await;
if this.check_exit_signal() {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
@ -228,19 +231,20 @@ impl TpuService {
let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let exit_signal_l = exit_signal.clone();
let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move {
this.update_current_slot(slot_sender).await;
this.update_current_slot(slot_sender, exit_signal_l).await;
Ok(())
});
let estimated_slot = self.estimated_slot.clone();
let current_slot = self.current_slot.clone();
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let estimated_slot_calculation = tokio::spawn(async move {
let mut slot_update_notifier = slot_reciever;
loop {
if this.check_exit_signal() {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
@ -277,8 +281,4 @@ impl TpuService {
pub fn get_estimated_slot_holder(&self) -> Arc<AtomicU64> {
self.estimated_slot.clone()
}
pub fn exit(&self) {
self.exit_signal.store(true, Ordering::Relaxed);
}
}

View File

@ -2,7 +2,10 @@
// It will send, replay if necessary and confirm by listening to blocks
use std::{
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
@ -71,9 +74,12 @@ impl TransactionServiceBuilder {
let exit_signal_t = exit_signal.clone();
let block_store_t = block_store.clone();
let jh_services: JoinHandle<String> = tokio::spawn(async move {
let tpu_service_fx = tpu_service.start();
let tpu_service_fx = tpu_service.start(exit_signal_t.clone());
let tx_sender_jh = tx_sender.clone().execute(tx_recv, notifier.clone());
let tx_sender_jh =
tx_sender
.clone()
.execute(tx_recv, notifier.clone(), exit_signal_t.clone());
let replay_service = tx_replayer.start_service(
replay_channel_task,
@ -85,18 +91,22 @@ impl TransactionServiceBuilder {
CommitmentConfig::finalized(),
notifier.clone(),
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let confirmed_block_listener = block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let processed_block_listener = block_listner.clone().listen_processed();
let processed_block_listener = block_listner
.clone()
.listen_processed(exit_signal_t.clone());
let cleaner = Cleaner::new(tx_sender.clone(), block_listner.clone(), block_store_t)
.start(clean_interval);
.start(clean_interval, exit_signal_t);
tokio::select! {
res = tpu_service_fx => {
@ -199,4 +209,8 @@ impl TransactionService {
}
Ok(signature.to_string())
}
pub fn stop(&self) {
self.exit_signal.store(true, Ordering::Relaxed)
}
}

View File

@ -1,4 +1,7 @@
use std::time::{Duration, Instant};
use std::{
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
use anyhow::bail;
use chrono::Utc;
@ -130,6 +133,7 @@ impl TxSender {
self,
mut recv: Receiver<(String, WireTransaction, u64)>,
notifier: Option<NotificationSender>,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let tx_sender = self.clone();
@ -137,6 +141,9 @@ impl TxSender {
let mut sigs_and_slots = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut txs = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// In solana there in sig verify stage rate is limited to 2000 txs in 50ms
// taking this as reference
@ -180,6 +187,7 @@ impl TxSender {
.forward_txs(sigs_and_slots, txs, notifier.clone())
.await;
}
Ok(())
})
}