From fbf2e16988d1ad01c05b3d92818a74b53525537d Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Fri, 1 Dec 2023 12:08:27 +0100 Subject: [PATCH] Adding supp info (#5) * adding prioritization fees with write lock accounts * adding supplimentary infos * moving to multiple geyser endpoints, using copyin instead of insert * mincor changes * adding missing column in transaction info * minor bug fixes and fmt * restart block subscription on geyser * resubsribing to banking errors notification on failure --- Cargo.lock | 40 +++- Cargo.toml | 4 + migration.sql | 13 +- src/block_info.rs | 462 +++++++++++++++++----------------------- src/cli.rs | 8 +- src/main.rs | 410 ++++++++++++++++++++--------------- src/postgres.rs | 99 ++++++--- src/transaction_info.rs | 10 +- 8 files changed, 560 insertions(+), 486 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4dd7bc..b2a0c89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1440,8 +1440,10 @@ dependencies = [ "tokio", "tokio-postgres", "tracing-subscriber", - "yellowstone-grpc-client", - "yellowstone-grpc-proto", + "yellowstone-grpc-client 1.11.1+solana.1.16.17", + "yellowstone-grpc-client 1.12.0+solana.1.16.17", + "yellowstone-grpc-proto 1.10.0+solana.1.16.17", + "yellowstone-grpc-proto 1.11.0+solana.1.16.17", ] [[package]] @@ -4819,7 +4821,22 @@ dependencies = [ "thiserror", "tonic", "tonic-health", - "yellowstone-grpc-proto", + "yellowstone-grpc-proto 1.10.0+solana.1.16.17", +] + +[[package]] +name = "yellowstone-grpc-client" +version = "1.12.0+solana.1.16.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6" +dependencies = [ + "bytes", + "futures", + "http", + "thiserror", + "tonic", + "tonic-health", + "yellowstone-grpc-proto 1.11.0+solana.1.16.17", ] [[package]] @@ -4838,6 +4855,23 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "yellowstone-grpc-proto" +version = "1.11.0+solana.1.16.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e" +dependencies = [ + "anyhow", + "bincode", + "prost", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + [[package]] name = "zerocopy" version = "0.7.11" diff --git a/Cargo.toml b/Cargo.toml index 35212a7..f84ddf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,10 @@ tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] } yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} yellowstone-grpc-proto = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} +yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", version = "1.12.0+solana.1.16.17" } +yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", version = "1.11.0+solana.1.16.17" } + + [patch.crates-io] solana-account-decoder = { git = "https://github.com/rpcpool/solana-public.git", tag = "v1.16.17-geyser-block-v3-mango" } solana-geyser-plugin-interface = { git = "https://github.com/rpcpool/solana-public.git", tag = "v1.16.17-geyser-block-v3-mango" } diff --git a/migration.sql b/migration.sql index 1c14b56..a2290af 100644 --- a/migration.sql +++ b/migration.sql @@ -10,12 +10,13 @@ CREATE TABLE banking_stage_results.transaction_infos ( prioritization_fees BIGINT, utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL, accounts_used text, - processed_slot BIGINT + processed_slot BIGINT, + supp_infos text ); CREATE TABLE banking_stage_results.blocks ( - block_hash CHAR(44) PRIMARY KEY, - slot BIGINT, + slot BIGINT PRIMARY KEY, + block_hash CHAR(44), leader_identity CHAR(44), successful_transactions BIGINT, banking_stage_errors BIGINT, @@ -23,7 +24,8 @@ CREATE TABLE banking_stage_results.blocks ( total_cu_used BIGINT, total_cu_requested BIGINT, heavily_writelocked_accounts text, - heavily_readlocked_accounts text + heavily_readlocked_accounts text, + supp_infos text ); CREATE INDEX idx_blocks_slot ON banking_stage_results.blocks(slot); @@ -35,5 +37,4 @@ CREATE INDEX idx_blocks_slot_errors ON banking_stage_results.blocks(slot) WHERE CREATE INDEX idx_transaction_infos_timestamp ON banking_stage_results.transaction_infos(utc_timestamp); -- optional CLUSTER banking_stage_results.transaction_infos using idx_transaction_infos_timestamp; -VACUUM FULL banking_stage_results.transaction_infos; - +VACUUM FULL banking_stage_results.transaction_infos; \ No newline at end of file diff --git a/src/block_info.rs b/src/block_info.rs index 0b54d4a..b1b9217 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -13,7 +13,6 @@ use solana_sdk::{ }; use solana_transaction_status::{RewardType, UiConfirmedBlock}; use std::collections::HashMap; -use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock; #[derive(Serialize, Debug, Clone)] pub struct AccountUsage { @@ -48,6 +47,15 @@ impl From<&AccountData> for AccountUsage { } } +#[derive(Serialize, Debug)] +pub struct BlockSupplimentaryInfo { + pub p_min: u64, + pub p_median: u64, + pub p_75: u64, + pub p_90: u64, + pub p_max: u64, +} + pub struct BlockInfo { pub block_hash: String, pub slot: i64, @@ -59,10 +67,160 @@ pub struct BlockInfo { pub total_cu_requested: i64, pub heavily_writelocked_accounts: Vec, pub heavily_readlocked_accounts: Vec, + pub sup_info: Option, } impl BlockInfo { - pub fn new(block: &SubscribeUpdateBlock) -> BlockInfo { + pub fn process_versioned_message( + message: &VersionedMessage, + prio_fees_in_block: &mut Vec, + writelocked_accounts: &mut HashMap, + readlocked_accounts: &mut HashMap, + cu_consumed: u64, + total_cu_requested: &mut u64, + is_vote: bool, + ) { + let (cu_requested, prio_fees, nb_ix_except_cb) = { + let mut cu_request: Option = None; + let mut prio_fees: Option = None; + let mut nb_ix_except_cb: u64 = 0; + + for ix in message.instructions() { + if ix + .program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + let ix_which = + try_from_slice_unchecked::(ix.data.as_slice()); + if let Ok(ix_which) = ix_which { + match ix_which { + ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + } => { + cu_request = Some(units as u64); + if additional_fee > 0 { + prio_fees = Some( + (units as u64) + .saturating_mul(1000) + .saturating_div(additional_fee as u64), + ); + } + } + ComputeBudgetInstruction::SetComputeUnitLimit(units) => { + cu_request = Some(units as u64) + } + ComputeBudgetInstruction::SetComputeUnitPrice(price) => { + prio_fees = Some(price) + } + _ => {} + } + } + } else { + nb_ix_except_cb += 1; + } + } + + (cu_request, prio_fees, nb_ix_except_cb) + }; + let prioritization_fees = prio_fees.unwrap_or_default(); + prio_fees_in_block.push(prioritization_fees); + let cu_requested = + std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb)); + *total_cu_requested += cu_requested; + if !is_vote { + let accounts = message + .static_account_keys() + .iter() + .enumerate() + .map(|(index, account)| (message.is_maybe_writable(index), *account)) + .collect_vec(); + for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { + match writelocked_accounts.get_mut(&writable_account) { + Some(x) => { + x.cu_requested += cu_requested; + x.cu_consumed += cu_consumed; + x.vec_pf.push(prioritization_fees); + } + None => { + writelocked_accounts.insert( + writable_account, + AccountData { + key: writable_account.to_string(), + cu_consumed, + cu_requested, + vec_pf: vec![prioritization_fees], + }, + ); + } + } + } + + for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { + match readlocked_accounts.get_mut(&readable_account) { + Some(x) => { + x.cu_requested += cu_requested; + x.cu_consumed += cu_consumed; + x.vec_pf.push(prioritization_fees); + } + None => { + readlocked_accounts.insert( + readable_account, + AccountData { + key: readable_account.to_string(), + cu_consumed, + cu_requested, + vec_pf: vec![prioritization_fees], + }, + ); + } + } + } + } + } + + pub fn calculate_account_usage( + writelocked_accounts: &HashMap, + readlocked_accounts: &HashMap, + ) -> (Vec, Vec) { + let mut heavily_writelocked_accounts = writelocked_accounts + .iter() + .map(|(_, data)| AccountUsage::from(data)) + .collect_vec(); + heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + + let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts + .iter() + .map(|(_, data)| AccountUsage::from(data)) + .collect(); + heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + (heavily_writelocked_accounts, heavily_readlocked_accounts) + } + + pub fn calculate_supp_info( + prio_fees_in_block: &mut Vec, + ) -> Option { + if !prio_fees_in_block.is_empty() { + prio_fees_in_block.sort(); + let median_index = prio_fees_in_block.len() / 2; + let p75_index = prio_fees_in_block.len() * 75 / 100; + let p90_index = prio_fees_in_block.len() * 90 / 100; + Some(BlockSupplimentaryInfo { + p_min: prio_fees_in_block[0], + p_median: prio_fees_in_block[median_index], + p_75: prio_fees_in_block[p75_index], + p_90: prio_fees_in_block[p90_index], + p_max: prio_fees_in_block.last().cloned().unwrap_or_default(), + }) + } else { + None + } + } + + pub fn new( + block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock, + banking_stage_errors: Option, + ) -> BlockInfo { let block_hash = block.blockhash.clone(); let slot = block.slot; let leader_identity = block @@ -96,6 +254,7 @@ impl BlockInfo { let mut writelocked_accounts: HashMap = HashMap::new(); let mut readlocked_accounts: HashMap = HashMap::new(); let mut total_cu_requested: u64 = 0; + let mut prio_fees_in_block = vec![]; for transaction in &block.transactions { let Some(tx) = &transaction.transaction else { continue; @@ -158,139 +317,21 @@ impl BlockInfo { .collect(), }); - let legacy_compute_budget: Option<(u32, Option)> = - message.instructions().iter().find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { - units, - additional_fee, - }) = try_from_slice_unchecked(i.data.as_slice()) - { - if additional_fee > 0 { - return Some(( - units, - Some( - ((units.saturating_mul(1000)) - .saturating_div(additional_fee)) - as u64, - ), - )); - } else { - return Some((units, None)); - } - } - } - None - }); - - let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); - - let cu_requested = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(limit); - } - } - None - }) - .or(legacy_cu_requested); - - let prioritization_fees = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(price); - } - } - None - }) - .unwrap_or_default(); - - let cu_requested = cu_requested.unwrap_or(200000) as u64; - let cu_consumed = meta.compute_units_consumed.unwrap_or(0); - total_cu_requested += cu_requested; - - let accounts = message - .static_account_keys() - .iter() - .enumerate() - .map(|(index, account)| (message.is_maybe_writable(index), *account)) - .collect_vec(); - for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { - match writelocked_accounts.get_mut(&writable_account) { - Some(x) => { - x.cu_requested += cu_requested; - x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees); - } - } - None => { - writelocked_accounts.insert( - writable_account, - AccountData { - key: writable_account.to_string(), - cu_consumed, - cu_requested, - vec_pf: vec![prioritization_fees], - }, - ); - } - } - } - - for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { - match readlocked_accounts.get_mut(&readable_account) { - Some(x) => { - x.cu_requested += cu_requested; - x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees); - } - } - None => { - readlocked_accounts.insert( - readable_account, - AccountData { - key: readable_account.to_string(), - cu_consumed, - cu_requested, - vec_pf: vec![prioritization_fees], - }, - ); - } - } - } + Self::process_versioned_message( + &message, + &mut prio_fees_in_block, + &mut writelocked_accounts, + &mut readlocked_accounts, + meta.compute_units_consumed.unwrap_or(0), + &mut total_cu_requested, + transaction.is_vote, + ); } - let mut heavily_writelocked_accounts = writelocked_accounts - .iter() - .filter(|(_, account)| account.cu_consumed > 1000000) - .map(|(_, data)| AccountUsage::from(data)) - .collect_vec(); - heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + let (heavily_writelocked_accounts, heavily_readlocked_accounts) = + Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts); - let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts - .iter() - .filter(|(_, acc)| acc.cu_consumed > 1000000) - .map(|(_, data)| AccountUsage::from(data)) - .collect(); - heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block); BlockInfo { block_hash, @@ -298,15 +339,16 @@ impl BlockInfo { leader_identity, successful_transactions: successful_transactions as i64, processed_transactions: processed_transactions as i64, - banking_stage_errors: None, + banking_stage_errors, total_cu_used, total_cu_requested: total_cu_requested as i64, heavily_writelocked_accounts, heavily_readlocked_accounts, + sup_info, } } - pub fn new_from_rpc_block( + pub fn _new_from_rpc_block( slot: Slot, block: &UiConfirmedBlock, banking_stage_errors_count: i64, @@ -352,6 +394,7 @@ impl BlockInfo { let mut writelocked_accounts: HashMap = HashMap::new(); let mut readlocked_accounts: HashMap = HashMap::new(); let mut total_cu_requested: u64 = 0; + let mut prio_fees_in_block = vec![]; for transaction in transactions { let Some(tx) = transaction.transaction.decode() else { continue; @@ -362,143 +405,27 @@ impl BlockInfo { let Some(meta) = &transaction.meta else { continue; }; + let is_vote = false; - let legacy_compute_budget: Option<(u32, Option)> = - message.instructions().iter().find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { - units, - additional_fee, - }) = try_from_slice_unchecked(i.data.as_slice()) - { - if additional_fee > 0 { - return Some(( - units, - Some( - ((units.saturating_mul(1000)) - .saturating_div(additional_fee)) - as u64, - ), - )); - } else { - return Some((units, None)); - } - } - } - None - }); - - let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); - - let cu_requested = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(limit); - } - } - None - }) - .or(legacy_cu_requested); - let cu_requested = cu_requested.unwrap_or(200000) as u64; - let cu_consumed = match meta.compute_units_consumed { - solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x, - solana_transaction_status::option_serializer::OptionSerializer::Skip => 0, - solana_transaction_status::option_serializer::OptionSerializer::None => 0, - }; - total_cu_requested += cu_requested; - - let prioritization_fees = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(price); - } - } - None - }) - .unwrap_or_default(); - - let accounts = message - .static_account_keys() - .iter() - .enumerate() - .map(|(index, account)| (message.is_maybe_writable(index), *account)) - .collect_vec(); - for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { - match writelocked_accounts.get_mut(&writable_account) { - Some(x) => { - x.cu_requested += cu_requested; - x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees) - } - } - None => { - writelocked_accounts.insert( - writable_account, - AccountData { - key: writable_account.to_string(), - cu_consumed, - cu_requested, - vec_pf: vec![prioritization_fees], - }, - ); - } - } - } - - for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { - match readlocked_accounts.get_mut(&readable_account) { - Some(x) => { - x.cu_requested += cu_requested; - x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees) - } - } - None => { - readlocked_accounts.insert( - readable_account, - AccountData { - key: readable_account.to_string(), - cu_consumed, - cu_requested, - vec_pf: vec![prioritization_fees], - }, - ); - } - } - } + Self::process_versioned_message( + message, + &mut prio_fees_in_block, + &mut writelocked_accounts, + &mut readlocked_accounts, + match meta.compute_units_consumed { + solana_transaction_status::option_serializer::OptionSerializer::None => 0, + solana_transaction_status::option_serializer::OptionSerializer::Skip => 0, + solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x, + }, + &mut total_cu_requested, + is_vote, + ); } - let mut heavily_writelocked_accounts = writelocked_accounts - .iter() - .filter(|(_, account)| account.cu_consumed > 1000000) - .map(|(_, data)| AccountUsage::from(data)) - .collect_vec(); - heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + let (heavily_writelocked_accounts, heavily_readlocked_accounts) = + Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts); - let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts - .iter() - .filter(|(_, acc)| acc.cu_consumed > 1000000) - .map(|(_, data)| AccountUsage::from(data)) - .collect(); - heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block); Some(BlockInfo { block_hash, slot: slot as i64, @@ -510,6 +437,7 @@ impl BlockInfo { total_cu_requested: total_cu_requested as i64, heavily_writelocked_accounts, heavily_readlocked_accounts, + sup_info, }) } } diff --git a/src/cli.rs b/src/cli.rs index e4e65a3..8a24878 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -4,13 +4,13 @@ use clap::Parser; #[command(author, version, about, long_about = None)] pub struct Args { #[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))] - pub grpc_address: String, + pub grpc_address_to_fetch_blocks: String, - #[arg(long)] + #[arg(short = 'x', long)] pub grpc_x_token: Option, - #[arg(short, long, default_value_t = String::from("http://127.0.0.1:8899"))] - pub rpc_url: String, + #[arg(short, long, value_delimiter = ',')] + pub banking_grpc_addresses: Vec, /// enable metrics to prometheus at addr #[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))] diff --git a/src/main.rs b/src/main.rs index 70b9590..699d499 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,10 @@ use clap::Parser; -use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use itertools::Itertools; use std::{ collections::HashMap, sync::{atomic::AtomicU64, Arc}, time::Duration, }; -use tokio::time::Instant; use crate::prometheus_sync::PrometheusSync; use block_info::BlockInfo; @@ -14,12 +13,8 @@ use dashmap::DashMap; use futures::StreamExt; use log::{debug, error}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; -use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; +use solana_sdk::signature::Signature; use transaction_info::TransactionInfo; -use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::prelude::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, -}; mod block_info; mod cli; @@ -44,196 +39,259 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap(); } +// fn spawn_rpc_block_processor() -> tokio::task::JoinHandle<()> { +// let map_of_infos = map_of_infos.clone(); +// let postgres = postgres.clone(); +// let slot_by_errors = slot_by_errors.clone(); +// tokio::spawn(async move { +// let mut rpc_blocks_reciever = rpc_blocks_reciever; +// let rpc_client = RpcClient::new(rpc_url); +// while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await { +// tokio::time::sleep_until(wait_until).await; +// let block = if let Ok(block) = rpc_client +// .get_block_with_config( +// slot, +// solana_rpc_client_api::config::RpcBlockConfig { +// encoding: Some( +// solana_transaction_status::UiTransactionEncoding::Base64, +// ), +// transaction_details: Some( +// solana_transaction_status::TransactionDetails::Full, +// ), +// rewards: Some(true), +// commitment: Some(CommitmentConfig::confirmed()), +// max_supported_transaction_version: Some(0), +// }, +// ) +// .await +// { +// block +// } else { +// continue; +// }; + +// let Some(transactions) = &block.transactions else { +// continue; +// }; + +// for transaction in transactions { +// let Some(transaction) = &transaction.transaction.decode() else { +// continue; +// }; +// let signature = transaction.signatures[0].to_string(); +// if let Some(mut info) = map_of_infos.get_mut(&signature) { +// info.add_rpc_transaction(slot, transaction); +// } +// } + +// let banking_stage_error_count = slot_by_errors +// .get(&slot) +// .map(|x| *x.value() as i64) +// .unwrap_or_default(); +// let block_info = +// BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count); +// if let Some(block_info) = block_info { +// BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count); +// TXERROR_COUNT.add( +// block_info.processed_transactions - block_info.successful_transactions, +// ); +// if let Err(e) = postgres.save_block_info(block_info).await { +// error!("Error saving block {}", e); +// } +// slot_by_errors.remove(&slot); +// } +// } +// }) +// } + +pub async fn start_tracking_banking_stage_errors( + grpc_address: String, + map_of_infos: Arc>, + slot_by_errors: Arc>, +) { + loop { + let token: Option = None; + let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect( + grpc_address.clone(), + token.clone(), + None, + ) + .unwrap(); + + let mut geyser_stream = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Default::default(), + Default::default(), + Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed), + Default::default(), + true, + ) + .await + .unwrap(); + + while let Some(message) = geyser_stream.next().await { + let Ok(message) = message else { + continue; + }; + + let Some(update) = message.update_oneof else { + continue; + }; + + if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update { + BANKING_STAGE_ERROR_EVENT_COUNT.inc(); + let sig = transaction.signature.to_string(); + match slot_by_errors.get_mut(&transaction.slot) { + Some(mut value) => { + *value += 1; + } + None => { + slot_by_errors.insert(transaction.slot, 1); + } + } + match map_of_infos.get_mut(&sig) { + Some(mut x) => { + x.add_notification(&transaction); + } + None => { + let mut x = TransactionInfo::new(&transaction); + x.add_notification(&transaction); + map_of_infos.insert(sig, x); + } + } + } + } + error!("geyser banking stage connection failed {}", grpc_address); + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + #[tokio::main()] async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); - let rpc_url = args.rpc_url; let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); - let grpc_addr = args.grpc_address; - let mut client = GeyserGrpcClient::connect(grpc_addr, args.grpc_x_token, None).unwrap(); + let grpc_block_addr = args.grpc_address_to_fetch_blocks; + let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( + grpc_block_addr, + args.grpc_x_token, + None, + ) + .unwrap(); let map_of_infos = Arc::new(DashMap::::new()); let slot_by_errors = Arc::new(DashMap::::new()); let postgres = postgres::Postgres::new().await; let slot = Arc::new(AtomicU64::new(0)); - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - let commitment_level = CommitmentLevel::Processed; - - let mut geyser_stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - true, - ) - .await - .unwrap(); - postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); - - // get blocks from rpc server - // because validator does not send banking blocks - let (rpc_blocks_sender, rpc_blocks_reciever) = - tokio::sync::mpsc::unbounded_channel::<(Instant, u64)>(); - let jh2 = { - let map_of_infos = map_of_infos.clone(); - let postgres = postgres.clone(); - let slot_by_errors = slot_by_errors.clone(); - tokio::spawn(async move { - let mut rpc_blocks_reciever = rpc_blocks_reciever; - let rpc_client = RpcClient::new(rpc_url); - while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await { - tokio::time::sleep_until(wait_until).await; - let block = if let Ok(block) = rpc_client - .get_block_with_config( - slot, - solana_rpc_client_api::config::RpcBlockConfig { - encoding: Some( - solana_transaction_status::UiTransactionEncoding::Base64, - ), - transaction_details: Some( - solana_transaction_status::TransactionDetails::Full, - ), - rewards: Some(true), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }, - ) - .await - { - block - } else { - continue; - }; - - let Some(transactions) = &block.transactions else { - continue; - }; - - for transaction in transactions { - let Some(transaction) = &transaction.transaction.decode() else { - continue; - }; - let signature = transaction.signatures[0].to_string(); - if let Some(mut info) = map_of_infos.get_mut(&signature) { - info.add_rpc_transaction(slot, transaction); - } - } - - let banking_stage_error_count = slot_by_errors - .get(&slot) - .map(|x| *x.value() as i64) - .unwrap_or_default(); - let block_info = - BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count); - if let Some(block_info) = block_info { - BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count); - TXERROR_COUNT.add( - block_info.processed_transactions - block_info.successful_transactions, - ); - if let Err(e) = postgres.save_block_info(block_info).await { - error!("Error saving block {}", e); - } - slot_by_errors.remove(&slot); - } - } + let _jhs = args + .banking_grpc_addresses + .iter() + .map(|address| { + let address = address.clone(); + let map_of_infos = map_of_infos.clone(); + let slot_by_errors = slot_by_errors.clone(); + tokio::spawn(async move { + start_tracking_banking_stage_errors(address, map_of_infos, slot_by_errors).await; + }) }) - }; + .collect_vec(); - while let Some(message) = geyser_stream.next().await { - let Ok(message) = message else { - continue; - }; + loop { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "sidecar_block_subscription".to_string(), + yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); - let Some(update) = message.update_oneof else { - continue; - }; + let mut slot_sub = HashMap::new(); + slot_sub.insert( + "slot_sub".to_string(), + yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots { + filter_by_commitment: None, + }, + ); - match update { - UpdateOneof::BankingTransactionErrors(transaction) => { - if transaction.error.is_none() { - continue; - } - BANKING_STAGE_ERROR_EVENT_COUNT.inc(); - let sig = transaction.signature.to_string(); - match slot_by_errors.get_mut(&transaction.slot) { - Some(mut value) => { - *value += 1; - } - None => { - slot_by_errors.insert(transaction.slot, 1); - rpc_blocks_sender - .send((Instant::now() + Duration::from_secs(30), transaction.slot)) - .expect("should works"); - } - } - match map_of_infos.get_mut(&sig) { - Some(mut x) => { - x.add_notification(&transaction); - } - None => { - let mut x = TransactionInfo::new(&transaction); - x.add_notification(&transaction); - map_of_infos.insert(sig, x); - } - } - } - UpdateOneof::Block(block) => { - debug!("got block {}", block.slot); - BLOCK_TXS.set(block.transactions.len() as i64); - BANKING_STAGE_BLOCKS_COUNTER.inc(); - BANKING_STAGE_BLOCKS_TASK.inc(); - let postgres = postgres.clone(); - let slot = slot.clone(); - let map_of_infos = map_of_infos.clone(); - let slot_by_error = slot_by_errors.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(30)).await; - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { - continue; - }; - let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { - info.add_transaction(transaction, block.slot); + let mut geyser_stream = client + .subscribe_once( + slot_sub, + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + None, + Default::default(), + None, + ) + .await + .unwrap(); + while let Some(message) = geyser_stream.next().await { + let Ok(message) = message else { + continue; + }; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( + block, + ) => { + debug!("got block {}", block.slot); + BLOCK_TXS.set(block.transactions.len() as i64); + BANKING_STAGE_BLOCKS_COUNTER.inc(); + BANKING_STAGE_BLOCKS_TASK.inc(); + let postgres = postgres.clone(); + let slot = slot.clone(); + let map_of_infos = map_of_infos.clone(); + let slot_by_errors = slot_by_errors.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; + for transaction in &block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { + info.add_transaction(transaction, block.slot); + } } - } - let block_info = BlockInfo::new(&block); + let banking_stage_error_count = + slot_by_errors.get(&block.slot).map(|x| *x.value() as i64); + let block_info = BlockInfo::new(&block, banking_stage_error_count); - TXERROR_COUNT.add( - block_info.processed_transactions - block_info.successful_transactions, - ); - if let Err(e) = postgres.save_block_info(block_info).await { - error!("Error saving block {}", e); - } - slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - slot_by_error.remove(&block.slot); - BANKING_STAGE_BLOCKS_TASK.dec(); - }); - // delay queue so that we get all the banking stage errors before processing block - } - _ => {} - }; + TXERROR_COUNT.add( + block_info.processed_transactions - block_info.successful_transactions, + ); + if let Err(e) = postgres.save_block_info(block_info).await { + error!("Error saving block {}", e); + } + slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); + slot_by_errors.remove(&block.slot); + BANKING_STAGE_BLOCKS_TASK.dec(); + }); + // delay queue so that we get all the banking stage errors before processing block + } + _ => {} + }; + } + log::error!("stopping the sidecar, geyser block stream is broken"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - jh2.await.unwrap(); } diff --git a/src/postgres.rs b/src/postgres.rs index 2ed40f1..d844600 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -7,13 +7,20 @@ use anyhow::Context; use base64::Engine; use chrono::{DateTime, Utc}; use dashmap::DashMap; +use futures::pin_mut; use itertools::Itertools; use log::{debug, error, info}; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use serde::Serialize; use solana_sdk::transaction::TransactionError; -use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; +use tokio_postgres::{ + binary_copy::BinaryCopyInWriter, + config::SslMode, + tls::MakeTlsConnect, + types::{ToSql, Type}, + Client, CopyInSink, NoTls, Socket, +}; use crate::{block_info::BlockInfo, transaction_info::TransactionInfo}; @@ -83,7 +90,7 @@ impl PostgresSession { Ok(client) } - pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) { + pub fn _multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) { let mut arg_index = 1usize; for row in 0..rows { query.push('('); @@ -117,10 +124,33 @@ impl PostgresSession { } const NUMBER_OF_ARGS: usize = 10; - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); let txs: Vec = txs.iter().map(PostgresTransactionInfo::from).collect(); + + let statement = r#" + COPY banking_stage_results.transaction_infos( + signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot + ) FROM STDIN BINARY + "#; + let sink: CopyInSink = self.copy_in(statement).await.unwrap(); + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::TEXT, + Type::TEXT, + Type::BOOL, + Type::BOOL, + Type::INT8, + Type::INT8, + Type::INT8, + Type::TIMESTAMPTZ, + Type::TEXT, + Type::INT8, + ], + ); + pin_mut!(writer); for tx in txs.iter() { + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS); args.push(&tx.signature); args.push(&tx.errors); args.push(&tx.is_executed); @@ -131,24 +161,24 @@ impl PostgresSession { args.push(&tx.utc_timestamp); args.push(&tx.accounts_used); args.push(&tx.processed_slot); + + writer.as_mut().write(&args).await.unwrap(); } - - let mut query = String::from( - r#" - INSERT INTO banking_stage_results.transaction_infos - (signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot) - VALUES - "#, - ); - - Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len(), &[]); - self.client.execute(&query, &args).await?; - + writer.finish().await.unwrap(); Ok(()) } + pub async fn copy_in( + &self, + statement: &str, + ) -> Result, tokio_postgres::error::Error> { + // BinaryCopyInWriter + // https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs + self.client.copy_in(statement).await + } + pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> { - const NUMBER_OF_ARGS: usize = 10; + const NUMBER_OF_ARGS: usize = 11; let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS); args.push(&block_info.block_hash); args.push(&block_info.slot); @@ -165,17 +195,34 @@ impl PostgresSession { args.push(&heavily_writelocked_accounts); args.push(&heavily_readlocked_accounts); - let mut query = String::from( - r#" - INSERT INTO banking_stage_results.blocks - (block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts) - VALUES - "#, + let supp_infos = serde_json::to_string(&block_info.sup_info).unwrap_or_default(); + args.push(&supp_infos); + + let statement = r#" + COPY banking_stage_results.blocks( + block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts, supp_infos + ) FROM STDIN BINARY + "#; + let sink: CopyInSink = self.copy_in(statement).await.unwrap(); + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::TEXT, + Type::INT8, + Type::TEXT, + Type::INT8, + Type::INT8, + Type::INT8, + Type::INT8, + Type::INT8, + Type::TEXT, + Type::TEXT, + Type::TEXT, + ], ); - - Self::multiline_query(&mut query, NUMBER_OF_ARGS, 1, &[]); - self.client.execute(&query, &args).await?; - + pin_mut!(writer); + writer.as_mut().write(&args).await.unwrap(); + writer.finish().await.unwrap(); Ok(()) } } diff --git a/src/transaction_info.rs b/src/transaction_info.rs index f36ad17..93f6cd0 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -14,9 +14,7 @@ use solana_sdk::{ slot_history::Slot, transaction::{TransactionError, VersionedTransaction}, }; -use yellowstone_grpc_proto::prelude::{ - SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo, -}; +use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults; fn convert_transaction_error_into_int(error: &TransactionError) -> u8 { match error { @@ -223,7 +221,11 @@ impl TransactionInfo { self.processed_slot = Some(slot); } - pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo, slot: Slot) { + pub fn add_transaction( + &mut self, + transaction: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateTransactionInfo, + slot: Slot, + ) { let Some(transaction) = &transaction.transaction else { return; };