From 3a032920e5000144e104501d51d52ff570271cb3 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Thu, 29 Feb 2024 10:28:23 +0100 Subject: [PATCH] Bugfix account on demand (#342) * Fixing bug in accounts on demand * cargo fmt * replacing atomicbool with notify --- .../src/subscription_manager.rs | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/accounts-on-demand/src/subscription_manager.rs b/accounts-on-demand/src/subscription_manager.rs index 816426a5..ca514844 100644 --- a/accounts-on-demand/src/subscription_manager.rs +++ b/accounts-on-demand/src/subscription_manager.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - sync::{atomic::AtomicBool, Arc}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::StreamExt; use itertools::Itertools; @@ -20,7 +16,7 @@ use solana_lite_rpc_core::{ use solana_sdk::{account::Account, pubkey::Pubkey}; use tokio::sync::{ broadcast::{self, Sender}, - watch, + watch, Notify, }; use yellowstone_grpc_proto::geyser::{ subscribe_request_filter_accounts_filter::Filter, @@ -92,7 +88,7 @@ pub fn start_account_streaming_task( grpc_config: GrpcSourceConfig, accounts_filters: AccountFilters, account_stream_sx: broadcast::Sender, - has_started: Arc, + has_started: Arc, ) -> AnyhowJoinHandle { tokio::spawn(async move { 'main_loop: loop { @@ -200,12 +196,18 @@ pub fn start_account_streaming_task( }; while let Some(message) = account_stream.next().await { - let message = message.unwrap(); + let message = match message { + Ok(message) => message, + Err(status) => { + log::error!("Account on demand grpc error : {}", status.message()); + continue; + } + }; let Some(update) = message.update_oneof else { continue; }; - has_started.store(true, std::sync::atomic::Ordering::Relaxed); + has_started.notify_one(); match update { UpdateOneof::Account(account) => { @@ -271,7 +273,7 @@ pub fn create_grpc_account_streaming_tasks( } let accounts_filters = account_filter_watch.borrow_and_update().clone(); - let has_started = Arc::new(AtomicBool::new(false)); + let has_started = Arc::new(tokio::sync::Notify::new()); let mut current_tasks = grpc_sources .iter() @@ -285,14 +287,13 @@ pub fn create_grpc_account_streaming_tasks( }) .collect_vec(); - 'check_watch: while account_filter_watch.changed().await.is_ok() { + while account_filter_watch.changed().await.is_ok() { ON_DEMAND_SUBSCRIPTION_RESTARTED.inc(); // wait for a second to get all the accounts to update tokio::time::sleep(Duration::from_secs(1)).await; let accounts_filters = account_filter_watch.borrow_and_update().clone(); - let has_started_new = Arc::new(AtomicBool::new(false)); - let elapsed_restart = tokio::time::Instant::now(); + let has_started = Arc::new(tokio::sync::Notify::new()); let new_tasks = grpc_sources .iter() @@ -301,18 +302,18 @@ pub fn create_grpc_account_streaming_tasks( grpc_config.clone(), accounts_filters.clone(), account_sender.clone(), - has_started_new.clone(), + has_started.clone(), ) }) .collect_vec(); - while !has_started_new.load(std::sync::atomic::Ordering::Relaxed) { - if elapsed_restart.elapsed() > Duration::from_secs(60) { - // check if time elapsed during restart is greater than 60ms - log::error!("Tried to restart the accounts on demand task but failed"); - new_tasks.iter().for_each(|x| x.abort()); - continue 'check_watch; - } + if let Err(_elapsed) = + tokio::time::timeout(Duration::from_secs(60), has_started.notified()).await + { + // check if time elapsed during restart is greater than 60ms + log::error!("Tried to restart the accounts on demand task but failed"); + new_tasks.iter().for_each(|x| x.abort()); + continue; } // abort previous tasks