Banking stage sidecar deadlock (#48)

* Properly loading the ALTs

* Adding a timeout for ALTs task list

* More optimization for ALTs

* Adding timeout for getMA

* Avoid RwLocks because they cause deadlocks

* Fixing the deadlock

* cargo fmt and minor changes
This commit is contained in:
galactus 2024-01-26 11:47:00 +01:00 committed by GitHub
parent 64d607155c
commit 37bb07a88b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 216 additions and 112 deletions

1
Cargo.lock generated
View File

@ -1476,6 +1476,7 @@ dependencies = [
"rustls 0.20.8",
"serde",
"serde_json",
"solana-account-decoder",
"solana-address-lookup-table-program 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-rpc-client",
"solana-rpc-client-api",

View File

@ -10,6 +10,7 @@ solana-rpc-client = "~1.16.17"
solana-rpc-client-api = "~1.16.17"
solana-transaction-status = "~1.16.17"
solana-address-lookup-table-program = "~1.16.17"
solana-account-decoder = "~1.16.17"
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }

View File

@ -1,13 +1,14 @@
use crate::block_info::TransactionAccount;
use dashmap::DashMap;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::{Deserialize, Serialize};
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use crate::block_info::TransactionAccount;
lazy_static::lazy_static! {
static ref ALTS_IN_STORE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap();
@ -17,7 +18,7 @@ lazy_static::lazy_static! {
pub struct ALTStore {
rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
is_loading: Arc<DashMap<Pubkey, Arc<tokio::sync::RwLock<()>>>>,
under_loading: Arc<RwLock<HashSet<Pubkey>>>,
}
impl ALTStore {
@ -25,41 +26,42 @@ impl ALTStore {
Self {
rpc_client,
map: Arc::new(DashMap::new()),
is_loading: Arc::new(DashMap::new()),
under_loading: Arc::new(RwLock::new(HashSet::new())),
}
}
pub async fn load_all_alts(&self, alts_list: Vec<Pubkey>) {
let alts_list = alts_list
.iter()
.filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x))
.cloned()
.collect_vec();
let alts_list = {
let lk = self.under_loading.read().await;
alts_list
.iter()
.filter(|x| !self.map.contains_key(x) && !lk.contains(x))
.cloned()
.collect_vec()
};
if alts_list.is_empty() {
return;
}
// add in loading list
self.is_loading(&alts_list).await;
log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
let lock = Arc::new(RwLock::new(()));
// add in loading list
batches.iter().for_each(|x| {
self.is_loading.insert(x.clone(), lock.clone());
});
let _context = lock.write().await;
let tasks = batches.chunks(10).map(|batch| {
let tasks = batches.chunks(100).map(|batch| {
let batch = batch.to_vec();
let rpc_client = self.rpc_client.clone();
let this = self.clone();
tokio::spawn(async move {
if let Ok(multiple_accounts) = rpc_client
.get_multiple_accounts_with_commitment(
if let Ok(Ok(multiple_accounts)) = tokio::time::timeout(
Duration::from_secs(30),
rpc_client.get_multiple_accounts_with_commitment(
&batch,
CommitmentConfig::processed(),
)
.await
),
)
.await
{
for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc {
@ -69,9 +71,16 @@ impl ALTStore {
}
})
});
futures::future::join_all(tasks).await;
if let Err(_) =
tokio::time::timeout(Duration::from_secs(20), futures::future::join_all(tasks))
.await
{
log::warn!("timedout getting {} alts", alts_list.len());
}
}
log::info!("Finished Loading {} ALTs", alts_list.len());
self.finished_loading(&alts_list).await;
ALTS_IN_STORE.set(alts_list.len() as i64);
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
@ -87,9 +96,7 @@ impl ALTStore {
}
pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) {
let lock = Arc::new(RwLock::new(()));
let _ = lock.write().await;
self.is_loading.insert(alt.clone(), lock.clone());
self.is_loading(&vec![*alt]).await;
let response = self
.rpc_client
.get_account_with_commitment(alt, CommitmentConfig::processed())
@ -99,6 +106,7 @@ impl ALTStore {
self.save_account(alt, account.data());
}
}
self.finished_loading(&vec![*alt]).await;
}
pub async fn load_accounts(
@ -157,17 +165,16 @@ impl ALTStore {
write_accounts: &Vec<u8>,
read_account: &Vec<u8>,
) -> Vec<TransactionAccount> {
match self.is_loading.get(&alt) {
Some(x) => {
// if there is a lock we wait until it is fullfilled
let x = x.value().clone();
log::debug!("waiting for alt {}", alt.to_string());
let _ = x.read().await;
}
None => {
// not loading
let mut times = 0;
const MAX_TIMES_RETRY: usize = 100;
while self.is_loading_contains(alt).await {
if times > MAX_TIMES_RETRY {
break;
}
times += 1;
tokio::time::sleep(Duration::from_millis(10)).await;
}
match self.load_accounts(alt, write_accounts, read_account).await {
Some(x) => x,
None => {
@ -184,4 +191,49 @@ impl ALTStore {
}
}
}
pub fn serialize(&self) -> Vec<u8> {
bincode::serialize::<BinaryALTData>(&BinaryALTData::new(&self.map)).unwrap()
}
pub fn load_binary(&self, binary_data: Vec<u8>) {
let binary_alt_data = bincode::deserialize::<BinaryALTData>(&binary_data).unwrap();
for (alt, accounts) in binary_alt_data.data.iter() {
self.map.insert(alt.clone(), accounts.clone());
}
}
async fn is_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.insert(alt.clone());
}
}
async fn finished_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.remove(alt);
}
}
async fn is_loading_contains(&self, alt: &Pubkey) -> bool {
let read = self.under_loading.read().await;
read.contains(alt)
}
}
#[derive(Serialize, Deserialize)]
pub struct BinaryALTData {
pub data: Vec<(Pubkey, Vec<Pubkey>)>,
}
impl BinaryALTData {
pub fn new(map: &Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
let data = map
.iter()
.map(|x| (x.key().clone(), x.value().clone()))
.collect_vec();
Self { data }
}
}

View File

@ -120,8 +120,8 @@ pub struct BlockInfo {
impl BlockInfo {
pub async fn process_versioned_message(
atl_store: Arc<ALTStore>,
signature: String,
atl_store: &Arc<ALTStore>,
signature: &String,
slot: Slot,
message: &VersionedMessage,
prio_fees_in_block: &mut Vec<(u64, u64)>,
@ -257,7 +257,7 @@ impl BlockInfo {
}
Some(BlockTransactionInfo {
signature,
signature: signature.to_string(),
processed_slot: slot as i64,
is_successful,
cu_requested: cu_requested as i64,
@ -378,82 +378,89 @@ impl BlockInfo {
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
let mut block_transactions = vec![];
let mut lookup_tables = HashSet::new();
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let sigs_and_messages = block
.transactions
.iter()
.filter_map(|transaction| {
let Some(tx) = &transaction.transaction else {
return None;
};
let Some(message) = &tx.message else {
continue;
};
let Some(message) = &tx.message else {
return None;
};
let Some(header) = &message.header else {
continue;
};
let Some(header) = &message.header else {
return None;
};
let Some(meta) = &transaction.meta else {
continue;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let Some(meta) = &transaction.meta else {
return None;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
let atl_store = atl_store.clone();
atl_store
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
.await;
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
Some((signature, message, meta, transaction.is_vote))
})
.collect_vec();
let transaction = Self::process_versioned_message(
atl_store,
signature,
atl_store
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
.await;
let mut block_transactions = vec![];
for (signature, message, meta, is_vote) in sigs_and_messages {
let tx = Self::process_versioned_message(
&atl_store,
&signature,
slot,
&message,
&mut prio_fees_in_block,
@ -461,12 +468,12 @@ impl BlockInfo {
&mut readlocked_accounts,
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
transaction.is_vote,
is_vote,
meta.err.is_none(),
)
.await;
if let Some(transaction) = transaction {
block_transactions.push(transaction);
if let Some(tx) = tx {
block_transactions.push(tx);
}
}

View File

@ -1,6 +1,8 @@
use clap::Parser;
use itertools::Itertools;
use solana_account_decoder::UiDataSliceConfig;
use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient};
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::HashMap,
@ -11,7 +13,10 @@ use std::{
},
time::Duration,
};
use tokio::{io::AsyncReadExt, time::Instant};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time::Instant,
};
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -179,9 +184,48 @@ async fn start_tracking_blocks(
None,
)
.unwrap();
// Load all ALTs stores.
// let alts_list = rpc_client
// .get_program_accounts_with_config(
// &solana_address_lookup_table_program::id(),
// RpcProgramAccountsConfig {
// filters: None,
// account_config: RpcAccountInfoConfig {
// encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
// data_slice: Some(UiDataSliceConfig {
// offset: 0,
// length: 0,
// }),
// commitment: None,
// min_context_slot: None,
// },
// with_context: None,
// },
// )
// .await
// .unwrap()
// .iter()
// .map(|(pubkey, _)| pubkey.clone())
// .collect_vec();
// ALT store from binary
// let atl_store = {
// let alt_store = Arc::new(alt_store::ALTStore::new(rpc_client));
// let mut alts_file = tokio::fs::File::open(alts_list).await.unwrap();
// let mut buf = vec![];
// alts_file.read_to_end(&mut buf).await.unwrap();
// alt_store.load_binary(buf);
// alt_store
// };
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
atl_store.load_all_alts(alts_list).await;
// let data = atl_store.serialize();
// let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();
// alts_file.write_all(&data).await.unwrap();
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
@ -284,7 +328,6 @@ async fn start_tracking_blocks(
}
}
#[tokio::main()]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();