Bugfix account on demand (#342)

* Fixing bug in accounts on demand

* cargo fmt

* replacing atomicbool with notify
This commit is contained in:
galactus 2024-02-29 10:28:23 +01:00 committed by GitHub
parent 1dcd3abcf2
commit 3a032920e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 21 deletions

View File

@ -1,8 +1,4 @@
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use futures::StreamExt;
use itertools::Itertools;
@ -20,7 +16,7 @@ use solana_lite_rpc_core::{
use solana_sdk::{account::Account, pubkey::Pubkey};
use tokio::sync::{
broadcast::{self, Sender},
watch,
watch, Notify,
};
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter::Filter,
@ -92,7 +88,7 @@ pub fn start_account_streaming_task(
grpc_config: GrpcSourceConfig,
accounts_filters: AccountFilters,
account_stream_sx: broadcast::Sender<AccountNotificationMessage>,
has_started: Arc<AtomicBool>,
has_started: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: loop {
@ -200,12 +196,18 @@ pub fn start_account_streaming_task(
};
while let Some(message) = account_stream.next().await {
let message = message.unwrap();
let message = match message {
Ok(message) => message,
Err(status) => {
log::error!("Account on demand grpc error : {}", status.message());
continue;
}
};
let Some(update) = message.update_oneof else {
continue;
};
has_started.store(true, std::sync::atomic::Ordering::Relaxed);
has_started.notify_one();
match update {
UpdateOneof::Account(account) => {
@ -271,7 +273,7 @@ pub fn create_grpc_account_streaming_tasks(
}
let accounts_filters = account_filter_watch.borrow_and_update().clone();
let has_started = Arc::new(AtomicBool::new(false));
let has_started = Arc::new(tokio::sync::Notify::new());
let mut current_tasks = grpc_sources
.iter()
@ -285,14 +287,13 @@ pub fn create_grpc_account_streaming_tasks(
})
.collect_vec();
'check_watch: while account_filter_watch.changed().await.is_ok() {
while account_filter_watch.changed().await.is_ok() {
ON_DEMAND_SUBSCRIPTION_RESTARTED.inc();
// wait for a second to get all the accounts to update
tokio::time::sleep(Duration::from_secs(1)).await;
let accounts_filters = account_filter_watch.borrow_and_update().clone();
let has_started_new = Arc::new(AtomicBool::new(false));
let elapsed_restart = tokio::time::Instant::now();
let has_started = Arc::new(tokio::sync::Notify::new());
let new_tasks = grpc_sources
.iter()
@ -301,18 +302,18 @@ pub fn create_grpc_account_streaming_tasks(
grpc_config.clone(),
accounts_filters.clone(),
account_sender.clone(),
has_started_new.clone(),
has_started.clone(),
)
})
.collect_vec();
while !has_started_new.load(std::sync::atomic::Ordering::Relaxed) {
if elapsed_restart.elapsed() > Duration::from_secs(60) {
// check if time elapsed during restart is greater than 60ms
log::error!("Tried to restart the accounts on demand task but failed");
new_tasks.iter().for_each(|x| x.abort());
continue 'check_watch;
}
if let Err(_elapsed) =
tokio::time::timeout(Duration::from_secs(60), has_started.notified()).await
{
// check if time elapsed during restart is greater than 60ms
log::error!("Tried to restart the accounts on demand task but failed");
new_tasks.iter().for_each(|x| x.abort());
continue;
}
// abort previous tasks