Alt processing 2024 01 26 (#50)

* Banking stage sidecar deadlock (#48)

* Properly loading the ALTs

* Adding a timeout for ALTs task list

* More optimization for ALTs

* Adding timeout for getMA

* Avoid RwLocks because they cause deadlocks

* Fixing the deadlock

* cargo fmt and minor changes

* Fixing up metrics

* minor changes

* Solving issue with accounts metric

* Avoid waiting alts while processing block

* Fixing the metrics, minor optimizations

* fix double insertion of transaction accounts
This commit is contained in:
galactus 2024-01-29 10:54:11 +01:00 committed by GitHub
parent 566a107145
commit 829cc7952a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 241 additions and 152 deletions

1
Cargo.lock generated
View File

@ -1476,6 +1476,7 @@ dependencies = [
"rustls 0.20.8",
"serde",
"serde_json",
"solana-account-decoder",
"solana-address-lookup-table-program 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-rpc-client",
"solana-rpc-client-api",

View File

@ -10,6 +10,7 @@ solana-rpc-client = "~1.16.17"
solana-rpc-client-api = "~1.16.17"
solana-transaction-status = "~1.16.17"
solana-address-lookup-table-program = "~1.16.17"
solana-account-decoder = "~1.16.17"
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }

View File

@ -1,65 +1,76 @@
use crate::block_info::TransactionAccount;
use dashmap::DashMap;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::{Deserialize, Serialize};
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::sync::Arc;
use tokio::sync::RwLock;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::{sync::Arc, time::Duration};
use crate::block_info::TransactionAccount;
lazy_static::lazy_static! {
static ref ALTS_IN_STORE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap();
static ref ALTS_IN_LOADING_QUEUE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_alts_loading_queue", "Alts in loading queue in sidecar")).unwrap();
}
#[derive(Clone)]
pub struct ALTStore {
rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
is_loading: Arc<DashMap<Pubkey, Arc<tokio::sync::RwLock<()>>>>,
loading_queue: Arc<async_channel::Sender<Pubkey>>,
}
impl ALTStore {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
Self {
let (sx, rx) = async_channel::unbounded();
let instant = Self {
rpc_client,
map: Arc::new(DashMap::new()),
is_loading: Arc::new(DashMap::new()),
loading_queue: Arc::new(sx),
};
{
let instant = instant.clone();
tokio::task::spawn(async move {
loop {
if let Ok(pk) = rx.recv().await {
let mut alts_list = vec![pk];
ALTS_IN_LOADING_QUEUE.dec();
tokio::time::sleep(Duration::from_millis(1)).await;
while let Ok(pk) = rx.try_recv() {
alts_list.push(pk);
ALTS_IN_LOADING_QUEUE.dec();
tokio::time::sleep(Duration::from_millis(1)).await;
}
instant._load_all_alts(&alts_list).await;
}
}
});
}
instant
}
pub async fn load_all_alts(&self, alts_list: Vec<Pubkey>) {
let alts_list = alts_list
.iter()
.filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x))
.cloned()
.collect_vec();
if alts_list.is_empty() {
return;
}
pub async fn load_alts_list(&self, alts_list: &Vec<Pubkey>) {
log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
let lock = Arc::new(RwLock::new(()));
// add in loading list
batches.iter().for_each(|x| {
self.is_loading.insert(x.clone(), lock.clone());
});
let _context = lock.write().await;
let tasks = batches.chunks(10).map(|batch| {
let tasks = batches.chunks(100).map(|batch| {
let batch = batch.to_vec();
let rpc_client = self.rpc_client.clone();
let this = self.clone();
tokio::spawn(async move {
if let Ok(multiple_accounts) = rpc_client
.get_multiple_accounts_with_commitment(
if let Ok(Ok(multiple_accounts)) = tokio::time::timeout(
Duration::from_secs(30),
rpc_client.get_multiple_accounts_with_commitment(
&batch,
CommitmentConfig::processed(),
)
.await
),
)
.await
{
for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc {
@ -69,9 +80,36 @@ impl ALTStore {
}
})
});
futures::future::join_all(tasks).await;
if let Err(_) =
tokio::time::timeout(Duration::from_secs(20), futures::future::join_all(tasks))
.await
{
log::warn!("timedout getting {} alts", alts_list.len());
}
}
ALTS_IN_STORE.set(self.map.len() as i64);
}
async fn _load_all_alts(&self, alts_list: &Vec<Pubkey>) {
let alts_list = alts_list
.iter()
.filter(|x| !self.map.contains_key(x))
.cloned()
.collect_vec();
if alts_list.is_empty() {
return;
}
self.load_alts_list(&alts_list).await;
}
pub async fn start_loading_missing_alts(&self, alts_list: &Vec<Pubkey>) {
for key in alts_list
.iter()
.filter(|x| !self.map.contains_key(x))
{
ALTS_IN_LOADING_QUEUE.inc();
let _ = self.loading_queue.send(*key).await;
}
log::info!("Finished Loading {} ALTs", alts_list.len());
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
@ -86,22 +124,7 @@ impl ALTStore {
drop(lookup_table);
}
pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) {
let lock = Arc::new(RwLock::new(()));
let _ = lock.write().await;
self.is_loading.insert(alt.clone(), lock.clone());
let response = self
.rpc_client
.get_account_with_commitment(alt, CommitmentConfig::processed())
.await;
if let Ok(account_res) = response {
if let Some(account) = account_res.value {
self.save_account(alt, account.data());
}
}
}
pub async fn load_accounts(
async fn load_accounts(
&self,
alt: &Pubkey,
write_accounts: &Vec<u8>,
@ -147,7 +170,7 @@ impl ALTStore {
.collect_vec();
Some([wa, ra].concat())
}
None => Some(vec![]),
None => None,
}
}
@ -157,31 +180,39 @@ impl ALTStore {
write_accounts: &Vec<u8>,
read_account: &Vec<u8>,
) -> Vec<TransactionAccount> {
match self.is_loading.get(&alt) {
Some(x) => {
// if there is a lock we wait until it is fullfilled
let x = x.value().clone();
log::debug!("waiting for alt {}", alt.to_string());
let _ = x.read().await;
}
None => {
// not loading
}
}
match self.load_accounts(alt, write_accounts, read_account).await {
Some(x) => x,
None => {
//load alt
self.reload_alt_from_rpc(&alt).await;
match self.load_accounts(alt, write_accounts, read_account).await {
Some(x) => x,
None => {
// reloading did not work
log::error!("cannot load alt even after");
vec![]
}
}
// forget alt for now, start loading it for next blocks
// loading should be on its way
vec![]
}
}
}
pub fn serialize(&self) -> Vec<u8> {
bincode::serialize::<BinaryALTData>(&BinaryALTData::new(&self.map)).unwrap()
}
pub fn load_binary(&self, binary_data: Vec<u8>) {
let binary_alt_data = bincode::deserialize::<BinaryALTData>(&binary_data).unwrap();
for (alt, accounts) in binary_alt_data.data.iter() {
self.map.insert(alt.clone(), accounts.clone());
}
}
}
#[derive(Serialize, Deserialize)]
pub struct BinaryALTData {
pub data: Vec<(Pubkey, Vec<Pubkey>)>,
}
impl BinaryALTData {
pub fn new(map: &Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
let data = map
.iter()
.map(|x| (x.key().clone(), x.value().clone()))
.collect_vec();
Self { data }
}
}

View File

@ -120,8 +120,8 @@ pub struct BlockInfo {
impl BlockInfo {
pub async fn process_versioned_message(
atl_store: Arc<ALTStore>,
signature: String,
atl_store: &Arc<ALTStore>,
signature: &String,
slot: Slot,
message: &VersionedMessage,
prio_fees_in_block: &mut Vec<(u64, u64)>,
@ -257,7 +257,7 @@ impl BlockInfo {
}
Some(BlockTransactionInfo {
signature,
signature: signature.to_string(),
processed_slot: slot as i64,
is_successful,
cu_requested: cu_requested as i64,
@ -378,82 +378,89 @@ impl BlockInfo {
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
let mut block_transactions = vec![];
let mut lookup_tables = HashSet::new();
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let sigs_and_messages = block
.transactions
.iter()
.filter_map(|transaction| {
let Some(tx) = &transaction.transaction else {
return None;
};
let Some(message) = &tx.message else {
continue;
};
let Some(message) = &tx.message else {
return None;
};
let Some(header) = &message.header else {
continue;
};
let Some(header) = &message.header else {
return None;
};
let Some(meta) = &transaction.meta else {
continue;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let Some(meta) = &transaction.meta else {
return None;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
let atl_store = atl_store.clone();
atl_store
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
.await;
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
Some((signature, message, meta, transaction.is_vote))
})
.collect_vec();
let transaction = Self::process_versioned_message(
atl_store,
signature,
atl_store
.start_loading_missing_alts(&lookup_tables.iter().cloned().collect_vec())
.await;
let mut block_transactions = vec![];
for (signature, message, meta, is_vote) in sigs_and_messages {
let tx = Self::process_versioned_message(
&atl_store,
&signature,
slot,
&message,
&mut prio_fees_in_block,
@ -461,12 +468,12 @@ impl BlockInfo {
&mut readlocked_accounts,
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
transaction.is_vote,
is_vote,
meta.err.is_none(),
)
.await;
if let Some(transaction) = transaction {
block_transactions.push(transaction);
if let Some(tx) = tx {
block_transactions.push(tx);
}
}

View File

@ -1,6 +1,8 @@
use clap::Parser;
use itertools::Itertools;
use solana_account_decoder::UiDataSliceConfig;
use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient};
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::HashMap,
@ -11,7 +13,10 @@ use std::{
},
time::Duration,
};
use tokio::{io::AsyncReadExt, time::Instant};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time::Instant,
};
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -179,8 +184,47 @@ async fn start_tracking_blocks(
None,
)
.unwrap();
// Load all ALTs stores.
// let alts_list = rpc_client
// .get_program_accounts_with_config(
// &solana_address_lookup_table_program::id(),
// RpcProgramAccountsConfig {
// filters: None,
// account_config: RpcAccountInfoConfig {
// encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
// data_slice: Some(UiDataSliceConfig {
// offset: 0,
// length: 0,
// }),
// commitment: None,
// min_context_slot: None,
// },
// with_context: None,
// },
// )
// .await
// .unwrap()
// .iter()
// .map(|(pubkey, _)| pubkey.clone())
// .collect_vec();
// ALT store from binary
// let atl_store = {
// let alt_store = Arc::new(alt_store::ALTStore::new(rpc_client));
// let mut alts_file = tokio::fs::File::open(alts_list).await.unwrap();
// let mut buf = vec![];
// alts_file.read_to_end(&mut buf).await.unwrap();
// alt_store.load_binary(buf);
// alt_store
// };
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
atl_store.load_all_alts(alts_list).await;
atl_store.load_alts_list(&alts_list).await;
// let data = atl_store.serialize();
// let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();
// alts_file.write_all(&data).await.unwrap();
loop {
let mut blocks_subs = HashMap::new();

View File

@ -11,6 +11,7 @@ use itertools::Itertools;
use log::{debug, error, info, log, warn, Level};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::error::SendTimeoutError;
@ -32,6 +33,11 @@ use crate::{
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100;
lazy_static::lazy_static! {
static ref ACCOUNTS_SAVING_QUEUE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_accounts_save_queue", "Account in save queue")).unwrap();
}
pub struct TempTableTracker {
count: AtomicU64,
}
@ -118,7 +124,8 @@ impl PostgresSession {
while let Some(accounts_for_transaction) =
accounts_for_transaction_reciever.recv().await
{
let instant = Instant::now();
let instant: Instant = Instant::now();
ACCOUNTS_SAVING_QUEUE.dec();
if let Err(e) = instance
.insert_accounts_for_transaction(accounts_for_transaction)
.await
@ -828,6 +835,7 @@ impl PostgresSession {
})
.collect_vec();
// insert accounts for transaction
ACCOUNTS_SAVING_QUEUE.inc();
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
Ok(())
}
@ -875,12 +883,9 @@ impl PostgresSession {
})
.collect_vec();
ACCOUNTS_SAVING_QUEUE.inc();
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
// save transactions in block
self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
.await?;
// save account usage in blocks
self.save_account_usage_in_block(&block_info).await?;
self.save_block_info(&block_info).await?;