From 829cc7952a464d596ef815ae669cf4fbcc32abe1 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Mon, 29 Jan 2024 10:54:11 +0100 Subject: [PATCH] Alt processing 2024 01 26 (#50) * Banking stage sidecar deadlock (#48) * Properly loading the ALTs * Adding a timeout for ALTs task list * More optimization for ALTs * Adding timeout for getMA * Avoid RwLocks because they cause deadlocks * Fixing the deadlock * cargo fmt and minor changes * Fixing up metrics * minor changes * Solving issue with accounts metric * Avoid waiting alts while processing block * Fixing the metrics, minor optimizations * fix double insertion of transaction accounts --- Cargo.lock | 1 + Cargo.toml | 1 + src/alt_store.rs | 169 +++++++++++++++++++++++++++------------------- src/block_info.rs | 159 ++++++++++++++++++++++--------------------- src/main.rs | 48 ++++++++++++- src/postgres.rs | 15 ++-- 6 files changed, 241 insertions(+), 152 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53a6cc7..4963858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,6 +1476,7 @@ dependencies = [ "rustls 0.20.8", "serde", "serde_json", + "solana-account-decoder", "solana-address-lookup-table-program 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)", "solana-rpc-client", "solana-rpc-client-api", diff --git a/Cargo.toml b/Cargo.toml index 2cb468e..32396e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ solana-rpc-client = "~1.16.17" solana-rpc-client-api = "~1.16.17" solana-transaction-status = "~1.16.17" solana-address-lookup-table-program = "~1.16.17" +solana-account-decoder = "~1.16.17" itertools = "0.10.5" serde = { version = "1.0.160", features = ["derive"] } diff --git a/src/alt_store.rs b/src/alt_store.rs index bb9b7e3..bfce9d6 100644 --- a/src/alt_store.rs +++ b/src/alt_store.rs @@ -1,65 +1,76 @@ +use crate::block_info::TransactionAccount; use dashmap::DashMap; use itertools::Itertools; use prometheus::{opts, register_int_gauge, IntGauge}; +use serde::{Deserialize, Serialize}; use solana_address_lookup_table_program::state::AddressLookupTable; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey}; -use std::sync::Arc; -use tokio::sync::RwLock; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use std::{sync::Arc, time::Duration}; -use crate::block_info::TransactionAccount; lazy_static::lazy_static! { static ref ALTS_IN_STORE: IntGauge = register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap(); + + static ref ALTS_IN_LOADING_QUEUE: IntGauge = + register_int_gauge!(opts!("banking_stage_sidecar_alts_loading_queue", "Alts in loading queue in sidecar")).unwrap(); } #[derive(Clone)] pub struct ALTStore { rpc_client: Arc, pub map: Arc>>, - is_loading: Arc>>>, + loading_queue: Arc>, } impl ALTStore { pub fn new(rpc_client: Arc) -> Self { - Self { + let (sx, rx) = async_channel::unbounded(); + let instant = Self { rpc_client, map: Arc::new(DashMap::new()), - is_loading: Arc::new(DashMap::new()), + loading_queue: Arc::new(sx), + }; + + { + let instant = instant.clone(); + tokio::task::spawn(async move { + loop { + if let Ok(pk) = rx.recv().await { + let mut alts_list = vec![pk]; + ALTS_IN_LOADING_QUEUE.dec(); + tokio::time::sleep(Duration::from_millis(1)).await; + while let Ok(pk) = rx.try_recv() { + alts_list.push(pk); + ALTS_IN_LOADING_QUEUE.dec(); + tokio::time::sleep(Duration::from_millis(1)).await; + } + instant._load_all_alts(&alts_list).await; + } + } + }); } + + instant } - pub async fn load_all_alts(&self, alts_list: Vec) { - let alts_list = alts_list - .iter() - .filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x)) - .cloned() - .collect_vec(); - - if alts_list.is_empty() { - return; - } - + pub async fn load_alts_list(&self, alts_list: &Vec) { log::info!("Preloading {} ALTs", alts_list.len()); for batches in alts_list.chunks(1000).map(|x| x.to_vec()) { - let lock = Arc::new(RwLock::new(())); - // add in loading list - batches.iter().for_each(|x| { - self.is_loading.insert(x.clone(), lock.clone()); - }); - let _context = lock.write().await; - let tasks = batches.chunks(10).map(|batch| { + let tasks = batches.chunks(100).map(|batch| { let batch = batch.to_vec(); let rpc_client = self.rpc_client.clone(); let this = self.clone(); tokio::spawn(async move { - if let Ok(multiple_accounts) = rpc_client - .get_multiple_accounts_with_commitment( + if let Ok(Ok(multiple_accounts)) = tokio::time::timeout( + Duration::from_secs(30), + rpc_client.get_multiple_accounts_with_commitment( &batch, CommitmentConfig::processed(), - ) - .await + ), + ) + .await { for (index, acc) in multiple_accounts.value.iter().enumerate() { if let Some(acc) = acc { @@ -69,9 +80,36 @@ impl ALTStore { } }) }); - futures::future::join_all(tasks).await; + if let Err(_) = + tokio::time::timeout(Duration::from_secs(20), futures::future::join_all(tasks)) + .await + { + log::warn!("timedout getting {} alts", alts_list.len()); + } + } + ALTS_IN_STORE.set(self.map.len() as i64); + } + + async fn _load_all_alts(&self, alts_list: &Vec) { + let alts_list = alts_list + .iter() + .filter(|x| !self.map.contains_key(x)) + .cloned() + .collect_vec(); + if alts_list.is_empty() { + return; + } + self.load_alts_list(&alts_list).await; + } + + pub async fn start_loading_missing_alts(&self, alts_list: &Vec) { + for key in alts_list + .iter() + .filter(|x| !self.map.contains_key(x)) + { + ALTS_IN_LOADING_QUEUE.inc(); + let _ = self.loading_queue.send(*key).await; } - log::info!("Finished Loading {} ALTs", alts_list.len()); } pub fn save_account(&self, address: &Pubkey, data: &[u8]) { @@ -86,22 +124,7 @@ impl ALTStore { drop(lookup_table); } - pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) { - let lock = Arc::new(RwLock::new(())); - let _ = lock.write().await; - self.is_loading.insert(alt.clone(), lock.clone()); - let response = self - .rpc_client - .get_account_with_commitment(alt, CommitmentConfig::processed()) - .await; - if let Ok(account_res) = response { - if let Some(account) = account_res.value { - self.save_account(alt, account.data()); - } - } - } - - pub async fn load_accounts( + async fn load_accounts( &self, alt: &Pubkey, write_accounts: &Vec, @@ -147,7 +170,7 @@ impl ALTStore { .collect_vec(); Some([wa, ra].concat()) } - None => Some(vec![]), + None => None, } } @@ -157,31 +180,39 @@ impl ALTStore { write_accounts: &Vec, read_account: &Vec, ) -> Vec { - match self.is_loading.get(&alt) { - Some(x) => { - // if there is a lock we wait until it is fullfilled - let x = x.value().clone(); - log::debug!("waiting for alt {}", alt.to_string()); - let _ = x.read().await; - } - None => { - // not loading - } - } match self.load_accounts(alt, write_accounts, read_account).await { Some(x) => x, None => { - //load alt - self.reload_alt_from_rpc(&alt).await; - match self.load_accounts(alt, write_accounts, read_account).await { - Some(x) => x, - None => { - // reloading did not work - log::error!("cannot load alt even after"); - vec![] - } - } + // forget alt for now, start loading it for next blocks + // loading should be on its way + vec![] } } } + + pub fn serialize(&self) -> Vec { + bincode::serialize::(&BinaryALTData::new(&self.map)).unwrap() + } + + pub fn load_binary(&self, binary_data: Vec) { + let binary_alt_data = bincode::deserialize::(&binary_data).unwrap(); + for (alt, accounts) in binary_alt_data.data.iter() { + self.map.insert(alt.clone(), accounts.clone()); + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct BinaryALTData { + pub data: Vec<(Pubkey, Vec)>, +} + +impl BinaryALTData { + pub fn new(map: &Arc>>) -> Self { + let data = map + .iter() + .map(|x| (x.key().clone(), x.value().clone())) + .collect_vec(); + Self { data } + } } diff --git a/src/block_info.rs b/src/block_info.rs index 5a91805..cd5fb2b 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -120,8 +120,8 @@ pub struct BlockInfo { impl BlockInfo { pub async fn process_versioned_message( - atl_store: Arc, - signature: String, + atl_store: &Arc, + signature: &String, slot: Slot, message: &VersionedMessage, prio_fees_in_block: &mut Vec<(u64, u64)>, @@ -257,7 +257,7 @@ impl BlockInfo { } Some(BlockTransactionInfo { - signature, + signature: signature.to_string(), processed_slot: slot as i64, is_successful, cu_requested: cu_requested as i64, @@ -378,82 +378,89 @@ impl BlockInfo { let mut readlocked_accounts: HashMap = HashMap::new(); let mut total_cu_requested: u64 = 0; let mut prio_fees_in_block = vec![]; - let mut block_transactions = vec![]; let mut lookup_tables = HashSet::new(); - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { - continue; - }; + let sigs_and_messages = block + .transactions + .iter() + .filter_map(|transaction| { + let Some(tx) = &transaction.transaction else { + return None; + }; - let Some(message) = &tx.message else { - continue; - }; + let Some(message) = &tx.message else { + return None; + }; - let Some(header) = &message.header else { - continue; - }; + let Some(header) = &message.header else { + return None; + }; - let Some(meta) = &transaction.meta else { - continue; - }; - let signature = Signature::try_from(&tx.signatures[0][0..64]) - .unwrap() - .to_string(); + let Some(meta) = &transaction.meta else { + return None; + }; + let signature = Signature::try_from(&tx.signatures[0][0..64]) + .unwrap() + .to_string(); - let message = VersionedMessage::V0(v0::Message { - header: MessageHeader { - num_required_signatures: header.num_required_signatures as u8, - num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, - num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, - }, - account_keys: message - .account_keys - .clone() - .into_iter() - .map(|key| { - let bytes: [u8; 32] = - key.try_into().unwrap_or(Pubkey::default().to_bytes()); - Pubkey::new_from_array(bytes) - }) - .collect(), - recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash), - instructions: message - .instructions - .clone() - .into_iter() - .map(|ix| CompiledInstruction { - program_id_index: ix.program_id_index as u8, - accounts: ix.accounts, - data: ix.data, - }) - .collect(), - address_table_lookups: message - .address_table_lookups - .clone() - .into_iter() - .map(|table| { - let bytes: [u8; 32] = table - .account_key - .try_into() - .unwrap_or(Pubkey::default().to_bytes()); - let account_key = Pubkey::new_from_array(bytes); - lookup_tables.insert(account_key.clone()); - MessageAddressTableLookup { - account_key, - writable_indexes: table.writable_indexes, - readonly_indexes: table.readonly_indexes, - } - }) - .collect(), - }); - let atl_store = atl_store.clone(); - atl_store - .load_all_alts(lookup_tables.iter().cloned().collect_vec()) - .await; + let message = VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: header.num_required_signatures as u8, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, + }, + account_keys: message + .account_keys + .clone() + .into_iter() + .map(|key| { + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + Pubkey::new_from_array(bytes) + }) + .collect(), + recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash), + instructions: message + .instructions + .clone() + .into_iter() + .map(|ix| CompiledInstruction { + program_id_index: ix.program_id_index as u8, + accounts: ix.accounts, + data: ix.data, + }) + .collect(), + address_table_lookups: message + .address_table_lookups + .clone() + .into_iter() + .map(|table| { + let bytes: [u8; 32] = table + .account_key + .try_into() + .unwrap_or(Pubkey::default().to_bytes()); + let account_key = Pubkey::new_from_array(bytes); + lookup_tables.insert(account_key.clone()); + MessageAddressTableLookup { + account_key, + writable_indexes: table.writable_indexes, + readonly_indexes: table.readonly_indexes, + } + }) + .collect(), + }); + Some((signature, message, meta, transaction.is_vote)) + }) + .collect_vec(); - let transaction = Self::process_versioned_message( - atl_store, - signature, + atl_store + .start_loading_missing_alts(&lookup_tables.iter().cloned().collect_vec()) + .await; + + let mut block_transactions = vec![]; + for (signature, message, meta, is_vote) in sigs_and_messages { + let tx = Self::process_versioned_message( + &atl_store, + &signature, slot, &message, &mut prio_fees_in_block, @@ -461,12 +468,12 @@ impl BlockInfo { &mut readlocked_accounts, meta.compute_units_consumed.unwrap_or(0), &mut total_cu_requested, - transaction.is_vote, + is_vote, meta.err.is_none(), ) .await; - if let Some(transaction) = transaction { - block_transactions.push(transaction); + if let Some(tx) = tx { + block_transactions.push(tx); } } diff --git a/src/main.rs b/src/main.rs index 805c374..1e6d11f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,8 @@ use clap::Parser; use itertools::Itertools; +use solana_account_decoder::UiDataSliceConfig; use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient}; +use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; use solana_sdk::pubkey::Pubkey; use std::{ collections::HashMap, @@ -11,7 +13,10 @@ use std::{ }, time::Duration, }; -use tokio::{io::AsyncReadExt, time::Instant}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time::Instant, +}; use crate::prometheus_sync::PrometheusSync; use block_info::BlockInfo; @@ -179,8 +184,47 @@ async fn start_tracking_blocks( None, ) .unwrap(); + + // Load all ALTs stores. + // let alts_list = rpc_client + // .get_program_accounts_with_config( + // &solana_address_lookup_table_program::id(), + // RpcProgramAccountsConfig { + // filters: None, + // account_config: RpcAccountInfoConfig { + // encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), + // data_slice: Some(UiDataSliceConfig { + // offset: 0, + // length: 0, + // }), + // commitment: None, + // min_context_slot: None, + // }, + // with_context: None, + // }, + // ) + // .await + // .unwrap() + // .iter() + // .map(|(pubkey, _)| pubkey.clone()) + // .collect_vec(); + + // ALT store from binary + // let atl_store = { + // let alt_store = Arc::new(alt_store::ALTStore::new(rpc_client)); + // let mut alts_file = tokio::fs::File::open(alts_list).await.unwrap(); + // let mut buf = vec![]; + // alts_file.read_to_end(&mut buf).await.unwrap(); + // alt_store.load_binary(buf); + // alt_store + // }; + let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client)); - atl_store.load_all_alts(alts_list).await; + atl_store.load_alts_list(&alts_list).await; + + // let data = atl_store.serialize(); + // let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap(); + // alts_file.write_all(&data).await.unwrap(); loop { let mut blocks_subs = HashMap::new(); diff --git a/src/postgres.rs b/src/postgres.rs index b95e014..24c9a98 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -11,6 +11,7 @@ use itertools::Itertools; use log::{debug, error, info, log, warn, Level}; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; +use prometheus::{opts, register_int_gauge, IntGauge}; use serde::Serialize; use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::error::SendTimeoutError; @@ -32,6 +33,11 @@ use crate::{ const BLOCK_WRITE_BUFFER_SIZE: usize = 5; const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100; +lazy_static::lazy_static! { + static ref ACCOUNTS_SAVING_QUEUE: IntGauge = + register_int_gauge!(opts!("banking_stage_sidecar_accounts_save_queue", "Account in save queue")).unwrap(); +} + pub struct TempTableTracker { count: AtomicU64, } @@ -118,7 +124,8 @@ impl PostgresSession { while let Some(accounts_for_transaction) = accounts_for_transaction_reciever.recv().await { - let instant = Instant::now(); + let instant: Instant = Instant::now(); + ACCOUNTS_SAVING_QUEUE.dec(); if let Err(e) = instance .insert_accounts_for_transaction(accounts_for_transaction) .await @@ -828,6 +835,7 @@ impl PostgresSession { }) .collect_vec(); // insert accounts for transaction + ACCOUNTS_SAVING_QUEUE.inc(); let _ = self.accounts_for_transaction_sender.send(txs_accounts); Ok(()) } @@ -875,12 +883,9 @@ impl PostgresSession { }) .collect_vec(); + ACCOUNTS_SAVING_QUEUE.inc(); let _ = self.accounts_for_transaction_sender.send(txs_accounts); - // save transactions in block - self.insert_transactions_for_block(&block_info.transactions, block_info.slot) - .await?; - // save account usage in blocks self.save_account_usage_in_block(&block_info).await?; self.save_block_info(&block_info).await?;