Fixing the deadlock

This commit is contained in:
godmodegalactus 2024-01-26 11:43:19 +01:00
parent ddf719cdcf
commit 2b16651f2b
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 49 additions and 32 deletions

View File

@ -1,12 +1,14 @@
use dashmap::{DashMap, DashSet};
use dashmap::DashMap;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::{Deserialize, Serialize};
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::{sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};
use crate::block_info::TransactionAccount;
use tokio::sync::RwLock;
lazy_static::lazy_static! {
static ref ALTS_IN_STORE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap();
@ -16,7 +18,7 @@ lazy_static::lazy_static! {
pub struct ALTStore {
rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
is_loading: Arc<DashSet<Pubkey>>,
under_loading: Arc<RwLock<HashSet<Pubkey>>>,
}
impl ALTStore {
@ -24,29 +26,30 @@ impl ALTStore {
Self {
rpc_client,
map: Arc::new(DashMap::new()),
is_loading: Arc::new(DashSet::new()),
under_loading: Arc::new(RwLock::new(HashSet::new())),
}
}
pub async fn load_all_alts(&self, alts_list: Vec<Pubkey>) {
let alts_list = alts_list
let alts_list = {
let lk = self.under_loading.read().await;
alts_list
.iter()
.filter(|x| !self.map.contains_key(x) && !self.is_loading.contains(x))
.filter(|x| !self.map.contains_key(x) && !lk.contains(x))
.cloned()
.collect_vec();
.collect_vec()
};
if alts_list.is_empty() {
return;
}
// add in loading list
alts_list.iter().for_each(|x| {
self.is_loading.insert(x.clone());
});
self.is_loading(&alts_list).await;
log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
let tasks = batches.chunks(10).map(|batch| {
let tasks = batches.chunks(100).map(|batch| {
let batch = batch.to_vec();
let rpc_client = self.rpc_client.clone();
let this = self.clone();
@ -74,10 +77,9 @@ impl ALTStore {
}
}
alts_list.iter().for_each(|x| {
self.is_loading.remove(x);
});
log::info!("Finished Loading {} ALTs", alts_list.len());
self.finished_loading(&alts_list).await;
ALTS_IN_STORE.set(alts_list.len() as i64);
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
@ -93,7 +95,7 @@ impl ALTStore {
}
pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) {
self.is_loading.insert(alt.clone());
self.is_loading(&vec![*alt]).await;
let response = self
.rpc_client
.get_account_with_commitment(alt, CommitmentConfig::processed())
@ -103,7 +105,7 @@ impl ALTStore {
self.save_account(alt, account.data());
}
}
self.is_loading.remove(alt);
self.finished_loading(&vec![*alt]).await;
}
pub async fn load_accounts(
@ -162,21 +164,17 @@ impl ALTStore {
write_accounts: &Vec<u8>,
read_account: &Vec<u8>,
) -> Vec<TransactionAccount> {
match self.is_loading.get(&alt) {
Some(_) => {
// if there is a lock we wait until it is fullfilled
let mut times = 0;
while times < 10 && self.is_loading.contains(alt) {
times += 1;
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
None => {
// not loading
}
}
self.is_loading.remove(&alt);
let mut times = 0;
const MAX_TIMES_RETRY : usize = 100;
while self.is_loading_contains(alt).await {
if times > MAX_TIMES_RETRY {
break;
}
times += 1;
tokio::time::sleep(Duration::from_millis(10)).await;
}
match self.load_accounts(alt, write_accounts, read_account).await {
Some(x) => x,
None => {
@ -204,6 +202,25 @@ impl ALTStore {
self.map.insert(alt.clone(), accounts.clone());
}
}
async fn is_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.insert(alt.clone());
}
}
async fn finished_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.remove(alt);
}
}
async fn is_loading_contains(&self, alt: &Pubkey) -> bool {
let read = self.under_loading.read().await;
read.contains(alt)
}
}
#[derive(Serialize, Deserialize)]

View File

@ -325,7 +325,7 @@ async fn start_tracking_blocks(
}
}
#[tokio::main()]
#[tokio::main(worker_threads=1)]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();