From fd8fb50fb25c68a1dc47b4c6094ae16bb64e9d63 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 25 Apr 2024 09:01:05 +0200 Subject: [PATCH] remove .clones and use fd_bs58 --- Cargo.lock | 42 +++++++++++++++++++-- Cargo.toml | 4 +- src/alt_store.rs | 35 +++++++++-------- src/block_info.rs | 37 +++++++++--------- src/main.rs | 23 ++++++++---- src/postgres.rs | 83 +++++++++++++++++++++++++---------------- src/transaction_info.rs | 5 ++- 7 files changed, 148 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1780d65..822b1f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1241,6 +1241,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "fd_bs58" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdaac1b30ff95de4562a79ae990f91ac7bf2062d7313cc25c58987762c8e8160" + [[package]] name = "feature-probe" version = "0.1.1" @@ -1466,6 +1472,7 @@ dependencies = [ "clap", "const_env", "dashmap", + "fd_bs58", "futures", "itertools 0.10.5", "lazy_static", @@ -1485,6 +1492,7 @@ dependencies = [ "thiserror", "tokio", "tokio-postgres", + "tracing", "tracing-subscriber", "yellowstone-grpc-client 1.11.1+solana.1.16.17", "yellowstone-grpc-client 1.12.0+solana.1.16.17", @@ -1951,6 +1959,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -2775,8 +2792,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2787,9 +2813,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -4428,10 +4460,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index 95786b5..c841330 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,13 +17,15 @@ serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" bincode = "1.3.3" bs58 = "0.4.0" +fd_bs58 = "0.1.0" base64 = "0.21.0" thiserror = "1.0.40" futures = "0.3.28" bytes = "1.4.0" anyhow = "1.0.70" log = "0.4.17" -tracing-subscriber = "0.3.18" +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } clap = { version = "4.2.4", features = ["derive", "env"] } dashmap = "5.4.0" const_env = "0.1.2" diff --git a/src/alt_store.rs b/src/alt_store.rs index 114939e..f9aa7a6 100644 --- a/src/alt_store.rs +++ b/src/alt_store.rs @@ -26,14 +26,14 @@ pub struct ALTStore { impl ALTStore { pub fn new(rpc_client: Arc) -> Self { let (sx, rx) = async_channel::unbounded(); - let instant = Self { + let alt_store = Self { rpc_client, map: Arc::new(DashMap::new()), loading_queue: Arc::new(sx), }; { - let instant = instant.clone(); + let instant = alt_store.clone(); tokio::task::spawn(async move { loop { if let Ok(pk) = rx.recv().await { @@ -51,15 +51,15 @@ impl ALTStore { }); } - instant + alt_store } - pub async fn load_alts_list(&self, alts_list: &Vec) { + pub async fn load_alts_list(&self, alts_list: Vec<&Pubkey>) { log::info!("Preloading {} ALTs", alts_list.len()); - for batches in alts_list.chunks(1000).map(|x| x.to_vec()) { + for batches in alts_list.chunks(1000) { let tasks = batches.chunks(100).map(|batch| { - let batch = batch.to_vec(); + let batch: Vec = batch.into_iter().map(|pk| (*pk).clone()).collect_vec(); let rpc_client = self.rpc_client.clone(); let this = self.clone(); tokio::spawn(async move { @@ -74,7 +74,7 @@ impl ALTStore { { for (index, acc) in multiple_accounts.value.iter().enumerate() { if let Some(acc) = acc { - this.save_account(&batch[index], &acc.data); + this.save_account(batch[index], &acc.data); } } } @@ -94,26 +94,25 @@ impl ALTStore { let alts_list = alts_list .iter() .filter(|x| !self.map.contains_key(x)) - .cloned() .collect_vec(); if alts_list.is_empty() { return; } - self.load_alts_list(&alts_list).await; + self.load_alts_list(alts_list).await; } - pub async fn start_loading_missing_alts(&self, alts_list: &Vec) { + pub async fn start_loading_missing_alts(&self, alts_list: &Vec<&Pubkey>) { for key in alts_list.iter().filter(|x| !self.map.contains_key(x)) { ALTS_IN_LOADING_QUEUE.inc(); - let _ = self.loading_queue.send(*key).await; + let _ = self.loading_queue.send(*key.clone()).await; } } - pub fn save_account(&self, address: &Pubkey, data: &[u8]) { + pub fn save_account(&self, address: Pubkey, data: &[u8]) { let lookup_table = AddressLookupTable::deserialize(&data).unwrap(); if self .map - .insert(address.clone(), lookup_table.addresses.to_vec()) + .insert(address, lookup_table.addresses.to_vec()) .is_none() { ALTS_IN_STORE.inc(); @@ -121,7 +120,7 @@ impl ALTStore { drop(lookup_table); } - async fn load_accounts( + async fn load_accounts<'a>( &self, alt: &Pubkey, write_accounts: &Vec, @@ -188,13 +187,13 @@ impl ALTStore { } pub fn serialize(&self) -> Vec { - bincode::serialize::(&BinaryALTData::new(&self.map)).unwrap() + bincode::serialize::(&BinaryALTData::new(self.map.clone())).unwrap() } pub fn load_binary(&self, binary_data: Vec) { let binary_alt_data = bincode::deserialize::(&binary_data).unwrap(); - for (alt, accounts) in binary_alt_data.data.iter() { - self.map.insert(alt.clone(), accounts.clone()); + for (pubkey, accounts) in binary_alt_data.data { + self.map.insert(pubkey, accounts); } } } @@ -205,7 +204,7 @@ pub struct BinaryALTData { } impl BinaryALTData { - pub fn new(map: &Arc>>) -> Self { + pub fn new(map: Arc>>) -> Self { let data = map .iter() .map(|x| (x.key().clone(), x.value().clone())) diff --git a/src/block_info.rs b/src/block_info.rs index 1dc100b..7f7e65f 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -17,6 +17,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; +use std::rc::Rc; #[derive(Serialize, Debug, Clone)] pub struct PrioFeeData { @@ -121,7 +122,7 @@ pub struct BlockInfo { impl BlockInfo { pub async fn process_versioned_message( atl_store: &Arc, - signature: &String, + signature: String, slot: Slot, message: &VersionedMessage, prio_fees_in_block: &mut Vec<(u64, u64)>, @@ -183,10 +184,10 @@ impl BlockInfo { if !is_vote { let mut accounts = message .static_account_keys() - .iter() + .iter().cloned() .enumerate() - .map(|(index, account)| TransactionAccount { - key: account.clone(), + .map(|(index, account_pk)| TransactionAccount { + key: account_pk, is_writable: message.is_maybe_writable(index), is_signer: message.is_signer(index), is_alt: false, @@ -209,7 +210,7 @@ impl BlockInfo { for writable_account in accounts .iter() .filter(|x| x.is_writable) - .map(|x| x.key.clone()) + .map(|x| x.key) { match writelocked_accounts.get_mut(&writable_account) { Some(x) => { @@ -219,9 +220,9 @@ impl BlockInfo { } None => { writelocked_accounts.insert( - writable_account.clone(), + writable_account, AccountData { - key: writable_account.to_string(), + key: fd_bs58::encode_32(writable_account), cu_consumed, cu_requested, vec_pf: vec![prioritization_fees], @@ -234,7 +235,7 @@ impl BlockInfo { for readable_account in accounts .iter() .filter(|x| !x.is_writable) - .map(|x| x.key.clone()) + .map(|x| x.key) { match readlocked_accounts.get_mut(&readable_account) { Some(x) => { @@ -244,9 +245,9 @@ impl BlockInfo { } None => { readlocked_accounts.insert( - readable_account.clone(), + readable_account, AccountData { - key: readable_account.to_string(), + key: fd_bs58::encode_32(readable_account), cu_consumed, cu_requested, vec_pf: vec![prioritization_fees], @@ -257,14 +258,14 @@ impl BlockInfo { } Some(BlockTransactionInfo { - signature: signature.to_string(), + signature, processed_slot: slot as i64, is_successful, cu_requested: cu_requested as i64, cu_consumed: cu_consumed as i64, prioritization_fees: prioritization_fees as i64, supp_infos: String::new(), - accounts: accounts, + accounts, }) } else { None @@ -348,6 +349,7 @@ impl BlockInfo { block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock, ) -> BlockInfo { let block_hash = block.blockhash.clone(); + let _span = tracing::debug_span!("map_block_info", block_hash = block_hash); let slot = block.slot; let leader_identity = block .rewards @@ -401,9 +403,8 @@ impl BlockInfo { let Some(meta) = &transaction.meta else { return None; }; - let signature = Signature::try_from(&tx.signatures[0][0..64]) - .unwrap() - .to_string(); + + let signature = fd_bs58::encode_64(&tx.signatures[0]); let message = VersionedMessage::V0(v0::Message { header: MessageHeader { @@ -442,7 +443,7 @@ impl BlockInfo { .try_into() .unwrap_or(Pubkey::default().to_bytes()); let account_key = Pubkey::new_from_array(bytes); - lookup_tables.insert(account_key.clone()); + lookup_tables.insert(account_key); MessageAddressTableLookup { account_key, writable_indexes: table.writable_indexes, @@ -456,14 +457,14 @@ impl BlockInfo { .collect_vec(); atl_store - .start_loading_missing_alts(&lookup_tables.iter().cloned().collect_vec()) + .start_loading_missing_alts(&lookup_tables.iter().collect_vec()) .await; let mut block_transactions = vec![]; for (signature, message, meta, is_vote) in sigs_and_messages { let tx = Self::process_versioned_message( &atl_store, - &signature, + signature, slot, &message, &mut prio_fees_in_block, diff --git a/src/main.rs b/src/main.rs index 4ccb7a7..e0b1d72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,8 @@ use futures::StreamExt; use log::{debug, error, info}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; use tokio::sync::mpsc::Sender; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::fmt::format::FmtSpan; use yellowstone_grpc_client_original::GeyserGrpcClientBufferConfig; use transaction_info::TransactionInfo; @@ -123,7 +125,7 @@ pub async fn start_tracking_banking_stage_errors( BANKING_STAGE_ERROR_EVENT_COUNT.inc(); instance = Instant::now(); - let sig = transaction.signature.to_string(); + let sig = &transaction.signature; match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) { Some(mut x) => { let tx_info = x.value_mut(); @@ -133,7 +135,7 @@ pub async fn start_tracking_banking_stage_errors( // map_of_infos might get populated by parallel writers if multiple geyser sources are configured let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let tx_info = TransactionInfo::new(&transaction, write_version); - map_of_infos.insert((sig, transaction.slot), tx_info); + map_of_infos.insert((sig.clone(), transaction.slot), tx_info); } } }, @@ -158,7 +160,7 @@ async fn start_tracking_blocks( grpc_x_token: Option, block_sender_postgres: Vec>, slot: Arc, - alts_list: Vec, + alts_list: Vec<&Pubkey>, ) { let block_counter = Arc::new(AtomicU64::new(0)); let restart_block_subscription = Arc::new(AtomicBool::new(false)); @@ -233,7 +235,7 @@ async fn start_tracking_blocks( // }; let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client)); - atl_store.load_alts_list(&alts_list).await; + atl_store.load_alts_list(alts_list).await; // let data = atl_store.serialize(); // let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap(); @@ -331,7 +333,7 @@ async fn start_tracking_blocks( if let Some(account) = account_update.account { let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes()); let pubkey = Pubkey::new_from_array(bytes); - atl_store.save_account(&pubkey, &account.data); + atl_store.save_account(pubkey, &account.data); } }, _ => {} @@ -345,7 +347,12 @@ async fn start_tracking_blocks( #[tokio::main()] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); + tracing_subscriber::fmt::fmt() + .with_env_filter(EnvFilter::from_default_env()) + // not sure if "CLOSE" is exactly what we want + // ex. "close time.busy=14.7ms time.idle=14.0µs" + .with_span_events(FmtSpan::CLOSE) + .init(); let args = Args::parse(); let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url)); @@ -370,7 +377,7 @@ async fn main() -> anyhow::Result<()> { .split("\r\n") .map(|x| x.trim().to_string()) .filter(|x| x.len() > 0) - .map(|x| Pubkey::from_str(&x).unwrap()) + .map(|x| Pubkey::new_from_array(fd_bs58::decode_32(x).unwrap())) .collect_vec(); let mut block_senders = vec![]; @@ -410,7 +417,7 @@ async fn main() -> anyhow::Result<()> { args.grpc_x_token, block_senders, slot, - alts_list, + alts_list.iter().map(|x| x).collect_vec(), ) .await; } diff --git a/src/postgres.rs b/src/postgres.rs index 9fe4507..0e29a55 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -3,6 +3,7 @@ use std::{ sync::{atomic::AtomicU64, Arc}, time::Duration, }; +use std::rc::Rc; use anyhow::Context; use base64::Engine; @@ -14,6 +15,7 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use prometheus::{opts, register_int_gauge, IntGauge}; use serde::Serialize; +use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Sender; @@ -199,7 +201,7 @@ impl PostgresSession { Ok(()) } - pub async fn create_transaction_ids(&self, signatures: HashSet) -> anyhow::Result<()> { + pub async fn create_transaction_ids(&self, signatures: Vec) -> anyhow::Result<()> { // create temp table let temp_table = self.get_new_temp_table(); @@ -456,9 +458,10 @@ impl PostgresSession { ); pin_mut!(writer); for acc_tx in accounts_for_transaction { - for acc in &acc_tx.accounts { + for acc in acc_tx.accounts { let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4); - args.push(&acc.key); + let pubkey_str = &acc.key; + args.push(&pubkey_str); args.push(&acc_tx.signature); args.push(&acc.writable); args.push(&acc.is_signer); @@ -847,9 +850,10 @@ impl PostgresSession { return Ok(()); } // create transaction ids - let signatures = txs + let signatures: Vec = txs .iter() .map(|transaction| transaction.signature.clone()) + .unique() .collect(); self.create_transaction_ids(signatures).await?; // create account ids @@ -870,7 +874,7 @@ impl PostgresSession { .account_used .iter() .map(|(key, is_writable)| AccountUsed { - key: key.clone(), + key: fd_bs58::encode_32(key), writable: *is_writable, is_signer: false, is_atl: false, @@ -899,44 +903,59 @@ impl PostgresSession { } pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> { + // 750ms + let _span = tracing::info_span!("save_block", slot = block_info.slot); let instant = Instant::now(); // create transaction ids let int_sig = Instant::now(); - let signatures = block_info - .transactions - .iter() - .map(|transaction| transaction.signature.clone()) - .collect(); + let signatures = { + // .3ms + let _span = tracing::debug_span!("map_signatures", slot = block_info.slot); + block_info + .transactions + .iter() + .map(|transaction| transaction.signature.clone()) + .collect() + }; self.create_transaction_ids(signatures).await?; TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64); // create account ids let ins_acc = Instant::now(); - let accounts = block_info - .heavily_locked_accounts - .iter() - .map(|acc| acc.key.clone()) - .collect(); + let accounts = { + // .6ms + let _span = tracing::debug_span!("map_accounts", slot = block_info.slot); + block_info + .heavily_locked_accounts + .iter() + .map(|acc| acc.key.clone()) + .collect() + }; self.create_accounts_for_transaction(accounts).await?; ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64); let instant_acc_tx: Instant = Instant::now(); - let txs_accounts = block_info - .transactions - .iter() - .map(|tx| AccountsForTransaction { - signature: tx.signature.clone(), - accounts: tx - .accounts - .iter() - .map(|acc| AccountUsed { - key: acc.key.to_string(), - writable: acc.is_writable, - is_signer: acc.is_signer, - is_atl: acc.is_alt, - }) - .collect(), - }) - .collect_vec(); + let txs_accounts = { + // 90ms + let _span = tracing::debug_span!("map_txs_accounts", slot = block_info.slot); + block_info + .transactions + .iter() + .map(|tx| AccountsForTransaction { + signature: tx.signature.clone(), + accounts: tx + .accounts + .iter() + .map(|acc| AccountUsed { + key: fd_bs58::encode_32(&acc.key), + writable: acc.is_writable, + is_signer: acc.is_signer, + is_atl: acc.is_alt, + }) + .collect(), + }) + .collect_vec() + }; + if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await { error!("Error inserting accounts for transactions : {e:?}"); } diff --git a/src/transaction_info.rs b/src/transaction_info.rs index 306b82a..ce308c5 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -1,4 +1,6 @@ use std::{collections::HashMap, hash::Hash}; +use std::rc::Rc; +use std::sync::Arc; use chrono::{DateTime, Utc}; use itertools::Itertools; @@ -103,8 +105,9 @@ impl TransactionInfo { .iter() .map(|x| (x.account.clone(), x.is_writable)) .collect_vec(); + Self { - signature: notification.signature.clone(), + signature: notification.signature.clone().into(), errors, slot: notification.slot, utc_timestamp,