Adding supp info (#5)

* adding prioritization fees with write lock accounts

* adding supplementary infos

* moving to multiple geyser endpoints, using copy-in instead of insert

* minor changes

* adding missing column in transaction info

* minor bug fixes and fmt

* restart block subscription on geyser

* resubscribing to banking errors notification on failure
This commit is contained in:
galactus 2023-12-01 12:08:27 +01:00 committed by GitHub
parent 96b4e7e2b7
commit fbf2e16988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 560 additions and 486 deletions

40
Cargo.lock generated
View File

@ -1440,8 +1440,10 @@ dependencies = [
"tokio",
"tokio-postgres",
"tracing-subscriber",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",
"yellowstone-grpc-client 1.11.1+solana.1.16.17",
"yellowstone-grpc-client 1.12.0+solana.1.16.17",
"yellowstone-grpc-proto 1.10.0+solana.1.16.17",
"yellowstone-grpc-proto 1.11.0+solana.1.16.17",
]
[[package]]
@ -4819,7 +4821,22 @@ dependencies = [
"thiserror",
"tonic",
"tonic-health",
"yellowstone-grpc-proto",
"yellowstone-grpc-proto 1.10.0+solana.1.16.17",
]
[[package]]
name = "yellowstone-grpc-client"
version = "1.12.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6"
dependencies = [
"bytes",
"futures",
"http",
"thiserror",
"tonic",
"tonic-health",
"yellowstone-grpc-proto 1.11.0+solana.1.16.17",
]
[[package]]
@ -4838,6 +4855,23 @@ dependencies = [
"tonic-build",
]
[[package]]
name = "yellowstone-grpc-proto"
version = "1.11.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e"
dependencies = [
"anyhow",
"bincode",
"prost",
"protobuf-src",
"solana-account-decoder",
"solana-sdk",
"solana-transaction-status",
"tonic",
"tonic-build",
]
[[package]]
name = "zerocopy"
version = "0.7.11"

View File

@ -37,6 +37,10 @@ tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
yellowstone-grpc-proto = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", version = "1.12.0+solana.1.16.17" }
yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", version = "1.11.0+solana.1.16.17" }
[patch.crates-io]
solana-account-decoder = { git = "https://github.com/rpcpool/solana-public.git", tag = "v1.16.17-geyser-block-v3-mango" }
solana-geyser-plugin-interface = { git = "https://github.com/rpcpool/solana-public.git", tag = "v1.16.17-geyser-block-v3-mango" }

View File

@ -10,12 +10,13 @@ CREATE TABLE banking_stage_results.transaction_infos (
prioritization_fees BIGINT,
utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
accounts_used text,
processed_slot BIGINT
processed_slot BIGINT,
supp_infos text
);
CREATE TABLE banking_stage_results.blocks (
block_hash CHAR(44) PRIMARY KEY,
slot BIGINT,
slot BIGINT PRIMARY KEY,
block_hash CHAR(44),
leader_identity CHAR(44),
successful_transactions BIGINT,
banking_stage_errors BIGINT,
@ -23,7 +24,8 @@ CREATE TABLE banking_stage_results.blocks (
total_cu_used BIGINT,
total_cu_requested BIGINT,
heavily_writelocked_accounts text,
heavily_readlocked_accounts text
heavily_readlocked_accounts text,
supp_infos text
);
CREATE INDEX idx_blocks_slot ON banking_stage_results.blocks(slot);
@ -35,5 +37,4 @@ CREATE INDEX idx_blocks_slot_errors ON banking_stage_results.blocks(slot) WHERE
CREATE INDEX idx_transaction_infos_timestamp ON banking_stage_results.transaction_infos(utc_timestamp);
-- optional
CLUSTER banking_stage_results.transaction_infos using idx_transaction_infos_timestamp;
VACUUM FULL banking_stage_results.transaction_infos;
VACUUM FULL banking_stage_results.transaction_infos;

View File

@ -13,7 +13,6 @@ use solana_sdk::{
};
use solana_transaction_status::{RewardType, UiConfirmedBlock};
use std::collections::HashMap;
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
#[derive(Serialize, Debug, Clone)]
pub struct AccountUsage {
@ -48,6 +47,15 @@ impl From<&AccountData> for AccountUsage {
}
}
/// Percentile summary (min / median / p75 / p90 / max) of the prioritization
/// fees observed across all transactions in a single block.
/// Serialized with serde; presumably stored as JSON in the `supp_infos`
/// column added by the migration in this commit — TODO confirm against the
/// postgres writer.
#[derive(Serialize, Debug)]
pub struct BlockSupplimentaryInfo {
/// lowest prioritization fee in the block
pub p_min: u64,
/// median (nearest-rank) prioritization fee
pub p_median: u64,
/// 75th-percentile prioritization fee
pub p_75: u64,
/// 90th-percentile prioritization fee
pub p_90: u64,
/// highest prioritization fee in the block
pub p_max: u64,
}
pub struct BlockInfo {
pub block_hash: String,
pub slot: i64,
@ -59,10 +67,160 @@ pub struct BlockInfo {
pub total_cu_requested: i64,
pub heavily_writelocked_accounts: Vec<AccountUsage>,
pub heavily_readlocked_accounts: Vec<AccountUsage>,
pub sup_info: Option<BlockSupplimentaryInfo>,
}
impl BlockInfo {
pub fn new(block: &SubscribeUpdateBlock) -> BlockInfo {
/// Accumulates one transaction's compute-budget statistics into the
/// block-level aggregates.
///
/// Scans the message's instructions for compute-budget instructions to
/// extract the requested compute units and the prioritization fee, pushes the
/// fee onto `prio_fees_in_block`, adds the (capped) requested CUs to
/// `total_cu_requested`, and — for non-vote transactions — charges the
/// consumed/requested CUs and the fee against every write- and read-locked
/// static account in `writelocked_accounts` / `readlocked_accounts`.
///
/// * `cu_consumed` — compute units actually consumed (from the tx meta).
/// * `is_vote` — vote transactions are counted in the block totals but not
///   attributed to individual accounts.
pub fn process_versioned_message(
    message: &VersionedMessage,
    prio_fees_in_block: &mut Vec<u64>,
    writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
    readlocked_accounts: &mut HashMap<Pubkey, AccountData>,
    cu_consumed: u64,
    total_cu_requested: &mut u64,
    is_vote: bool,
) {
    let (cu_requested, prio_fees, nb_ix_except_cb) = {
        let mut cu_request: Option<u64> = None;
        let mut prio_fees: Option<u64> = None;
        let mut nb_ix_except_cb: u64 = 0;
        for ix in message.instructions() {
            if ix
                .program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                let ix_which =
                    try_from_slice_unchecked::<ComputeBudgetInstruction>(ix.data.as_slice());
                if let Ok(ix_which) = ix_which {
                    match ix_which {
                        ComputeBudgetInstruction::RequestUnitsDeprecated {
                            units,
                            additional_fee,
                        } => {
                            cu_request = Some(units as u64);
                            // BUGFIX: the previous code computed
                            // `units * 1000 / additional_fee`, which inverts the
                            // ratio. The deprecated instruction carries a TOTAL
                            // additional fee (lamports) for `units` CUs; the
                            // per-CU price in micro-lamports is
                            // additional_fee * 1_000_000 / units.
                            if additional_fee > 0 && units > 0 {
                                prio_fees = Some(
                                    (additional_fee as u64)
                                        .saturating_mul(1_000_000)
                                        .saturating_div(units as u64),
                                );
                            }
                        }
                        ComputeBudgetInstruction::SetComputeUnitLimit(units) => {
                            cu_request = Some(units as u64)
                        }
                        ComputeBudgetInstruction::SetComputeUnitPrice(price) => {
                            prio_fees = Some(price)
                        }
                        _ => {}
                    }
                }
            } else {
                nb_ix_except_cb += 1;
            }
        }
        (cu_request, prio_fees, nb_ix_except_cb)
    };
    let prioritization_fees = prio_fees.unwrap_or_default();
    prio_fees_in_block.push(prioritization_fees);
    // Default of 200k CUs per non-compute-budget instruction when no explicit
    // request was found, capped at the 1.4M per-transaction limit.
    let cu_requested =
        std::cmp::min(1_400_000, cu_requested.unwrap_or(200_000 * nb_ix_except_cb));
    *total_cu_requested += cu_requested;
    if is_vote {
        return;
    }
    let accounts = message
        .static_account_keys()
        .iter()
        .enumerate()
        .map(|(index, account)| (message.is_maybe_writable(index), *account))
        .collect_vec();
    for (is_writable, account_key) in accounts {
        let lock_map = if is_writable {
            &mut *writelocked_accounts
        } else {
            &mut *readlocked_accounts
        };
        // Entry API: one hash lookup instead of get_mut + insert.
        lock_map
            .entry(account_key)
            .and_modify(|data| {
                data.cu_requested += cu_requested;
                data.cu_consumed += cu_consumed;
                data.vec_pf.push(prioritization_fees);
            })
            .or_insert_with(|| AccountData {
                key: account_key.to_string(),
                cu_consumed,
                cu_requested,
                vec_pf: vec![prioritization_fees],
            });
    }
}
/// Converts the per-account aggregation maps into usage lists.
///
/// Returns `(write_locked, read_locked)` vectors, each sorted by consumed
/// compute units in descending order.
pub fn calculate_account_usage(
    writelocked_accounts: &HashMap<Pubkey, AccountData>,
    readlocked_accounts: &HashMap<Pubkey, AccountData>,
) -> (Vec<AccountUsage>, Vec<AccountUsage>) {
    // `.values()` instead of `.iter().map(|(_, v)| ...)`: the keys are unused.
    let mut heavily_writelocked_accounts = writelocked_accounts
        .values()
        .map(AccountUsage::from)
        .collect_vec();
    heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));

    let mut heavily_readlocked_accounts = readlocked_accounts
        .values()
        .map(AccountUsage::from)
        .collect_vec();
    heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
    (heavily_writelocked_accounts, heavily_readlocked_accounts)
}
/// Computes percentile statistics (min / median / p75 / p90 / max) over the
/// prioritization fees collected for a block.
///
/// Sorts `prio_fees_in_block` in place as a side effect. Returns `None` for
/// an empty block. Percentiles use the nearest-rank (floor) index with no
/// interpolation.
pub fn calculate_supp_info(
    prio_fees_in_block: &mut Vec<u64>,
) -> Option<BlockSupplimentaryInfo> {
    if prio_fees_in_block.is_empty() {
        return None;
    }
    // sort_unstable: faster and allocation-free; ties among equal u64 fees
    // are indistinguishable, so stability is irrelevant.
    prio_fees_in_block.sort_unstable();
    let len = prio_fees_in_block.len();
    Some(BlockSupplimentaryInfo {
        p_min: prio_fees_in_block[0],
        p_median: prio_fees_in_block[len / 2],
        p_75: prio_fees_in_block[len * 75 / 100],
        p_90: prio_fees_in_block[len * 90 / 100],
        // len >= 1 is guaranteed by the early return above.
        p_max: prio_fees_in_block[len - 1],
    })
}
pub fn new(
block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
banking_stage_errors: Option<i64>,
) -> BlockInfo {
let block_hash = block.blockhash.clone();
let slot = block.slot;
let leader_identity = block
@ -96,6 +254,7 @@ impl BlockInfo {
let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
@ -158,139 +317,21 @@ impl BlockInfo {
.collect(),
});
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(
((units.saturating_mul(1000))
.saturating_div(additional_fee))
as u64,
),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let prioritization_fees = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
})
.unwrap_or_default();
let cu_requested = cu_requested.unwrap_or(200000) as u64;
let cu_consumed = meta.compute_units_consumed.unwrap_or(0);
total_cu_requested += cu_requested;
let accounts = message
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| (message.is_maybe_writable(index), *account))
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees);
}
}
None => {
writelocked_accounts.insert(
writable_account,
AccountData {
key: writable_account.to_string(),
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
},
);
}
}
}
for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) {
match readlocked_accounts.get_mut(&readable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees);
}
}
None => {
readlocked_accounts.insert(
readable_account,
AccountData {
key: readable_account.to_string(),
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
},
);
}
}
}
Self::process_versioned_message(
&message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
&mut readlocked_accounts,
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
transaction.is_vote,
);
}
let mut heavily_writelocked_accounts = writelocked_accounts
.iter()
.filter(|(_, account)| account.cu_consumed > 1000000)
.map(|(_, data)| AccountUsage::from(data))
.collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts
.iter()
.filter(|(_, acc)| acc.cu_consumed > 1000000)
.map(|(_, data)| AccountUsage::from(data))
.collect();
heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
BlockInfo {
block_hash,
@ -298,15 +339,16 @@ impl BlockInfo {
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors: None,
banking_stage_errors,
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
sup_info,
}
}
pub fn new_from_rpc_block(
pub fn _new_from_rpc_block(
slot: Slot,
block: &UiConfirmedBlock,
banking_stage_errors_count: i64,
@ -352,6 +394,7 @@ impl BlockInfo {
let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
for transaction in transactions {
let Some(tx) = transaction.transaction.decode() else {
continue;
@ -362,143 +405,27 @@ impl BlockInfo {
let Some(meta) = &transaction.meta else {
continue;
};
let is_vote = false;
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(
((units.saturating_mul(1000))
.saturating_div(additional_fee))
as u64,
),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let cu_requested = cu_requested.unwrap_or(200000) as u64;
let cu_consumed = match meta.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x,
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
};
total_cu_requested += cu_requested;
let prioritization_fees = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
})
.unwrap_or_default();
let accounts = message
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| (message.is_maybe_writable(index), *account))
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees)
}
}
None => {
writelocked_accounts.insert(
writable_account,
AccountData {
key: writable_account.to_string(),
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
},
);
}
}
}
for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) {
match readlocked_accounts.get_mut(&readable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees)
}
}
None => {
readlocked_accounts.insert(
readable_account,
AccountData {
key: readable_account.to_string(),
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
},
);
}
}
}
Self::process_versioned_message(
message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
&mut readlocked_accounts,
match meta.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x,
},
&mut total_cu_requested,
is_vote,
);
}
let mut heavily_writelocked_accounts = writelocked_accounts
.iter()
.filter(|(_, account)| account.cu_consumed > 1000000)
.map(|(_, data)| AccountUsage::from(data))
.collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts
.iter()
.filter(|(_, acc)| acc.cu_consumed > 1000000)
.map(|(_, data)| AccountUsage::from(data))
.collect();
heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
Some(BlockInfo {
block_hash,
slot: slot as i64,
@ -510,6 +437,7 @@ impl BlockInfo {
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
sup_info,
})
}
}

View File

@ -4,13 +4,13 @@ use clap::Parser;
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
pub grpc_address: String,
pub grpc_address_to_fetch_blocks: String,
#[arg(long)]
#[arg(short = 'x', long)]
pub grpc_x_token: Option<String>,
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:8899"))]
pub rpc_url: String,
#[arg(short, long, value_delimiter = ',')]
pub banking_grpc_addresses: Vec<String>,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]

View File

@ -1,11 +1,10 @@
use clap::Parser;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use itertools::Itertools;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::time::Instant;
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -14,12 +13,8 @@ use dashmap::DashMap;
use futures::StreamExt;
use log::{debug, error};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use solana_sdk::signature::Signature;
use transaction_info::TransactionInfo;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
};
mod block_info;
mod cli;
@ -44,196 +39,259 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap();
}
// fn spawn_rpc_block_processor() -> tokio::task::JoinHandle<()> {
// let map_of_infos = map_of_infos.clone();
// let postgres = postgres.clone();
// let slot_by_errors = slot_by_errors.clone();
// tokio::spawn(async move {
// let mut rpc_blocks_reciever = rpc_blocks_reciever;
// let rpc_client = RpcClient::new(rpc_url);
// while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await {
// tokio::time::sleep_until(wait_until).await;
// let block = if let Ok(block) = rpc_client
// .get_block_with_config(
// slot,
// solana_rpc_client_api::config::RpcBlockConfig {
// encoding: Some(
// solana_transaction_status::UiTransactionEncoding::Base64,
// ),
// transaction_details: Some(
// solana_transaction_status::TransactionDetails::Full,
// ),
// rewards: Some(true),
// commitment: Some(CommitmentConfig::confirmed()),
// max_supported_transaction_version: Some(0),
// },
// )
// .await
// {
// block
// } else {
// continue;
// };
// let Some(transactions) = &block.transactions else {
// continue;
// };
// for transaction in transactions {
// let Some(transaction) = &transaction.transaction.decode() else {
// continue;
// };
// let signature = transaction.signatures[0].to_string();
// if let Some(mut info) = map_of_infos.get_mut(&signature) {
// info.add_rpc_transaction(slot, transaction);
// }
// }
// let banking_stage_error_count = slot_by_errors
// .get(&slot)
// .map(|x| *x.value() as i64)
// .unwrap_or_default();
// let block_info =
// BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
// if let Some(block_info) = block_info {
// BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
// TXERROR_COUNT.add(
// block_info.processed_transactions - block_info.successful_transactions,
// );
// if let Err(e) = postgres.save_block_info(block_info).await {
// error!("Error saving block {}", e);
// }
// slot_by_errors.remove(&slot);
// }
// }
// })
// }
pub async fn start_tracking_banking_stage_errors(
grpc_address: String,
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
) {
loop {
let token: Option<String> = None;
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_address.clone(),
token.clone(),
None,
)
.unwrap();
let mut geyser_stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Default::default(),
Default::default(),
Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed),
Default::default(),
true,
)
.await
.unwrap();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update {
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value += 1;
}
None => {
slot_by_errors.insert(transaction.slot, 1);
}
}
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
}
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert(sig, x);
}
}
}
}
error!("geyser banking stage connection failed {}", grpc_address);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[tokio::main()]
async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
let rpc_url = args.rpc_url;
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_addr = args.grpc_address;
let mut client = GeyserGrpcClient::connect(grpc_addr, args.grpc_x_token, None).unwrap();
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr,
args.grpc_x_token,
None,
)
.unwrap();
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let commitment_level = CommitmentLevel::Processed;
let mut geyser_stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
true,
)
.await
.unwrap();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
// get blocks from rpc server
// because validator does not send banking blocks
let (rpc_blocks_sender, rpc_blocks_reciever) =
tokio::sync::mpsc::unbounded_channel::<(Instant, u64)>();
let jh2 = {
let map_of_infos = map_of_infos.clone();
let postgres = postgres.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
let mut rpc_blocks_reciever = rpc_blocks_reciever;
let rpc_client = RpcClient::new(rpc_url);
while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await {
tokio::time::sleep_until(wait_until).await;
let block = if let Ok(block) = rpc_client
.get_block_with_config(
slot,
solana_rpc_client_api::config::RpcBlockConfig {
encoding: Some(
solana_transaction_status::UiTransactionEncoding::Base64,
),
transaction_details: Some(
solana_transaction_status::TransactionDetails::Full,
),
rewards: Some(true),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
},
)
.await
{
block
} else {
continue;
};
let Some(transactions) = &block.transactions else {
continue;
};
for transaction in transactions {
let Some(transaction) = &transaction.transaction.decode() else {
continue;
};
let signature = transaction.signatures[0].to_string();
if let Some(mut info) = map_of_infos.get_mut(&signature) {
info.add_rpc_transaction(slot, transaction);
}
}
let banking_stage_error_count = slot_by_errors
.get(&slot)
.map(|x| *x.value() as i64)
.unwrap_or_default();
let block_info =
BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
if let Some(block_info) = block_info {
BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot_by_errors.remove(&slot);
}
}
let _jhs = args
.banking_grpc_addresses
.iter()
.map(|address| {
let address = address.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
start_tracking_banking_stage_errors(address, map_of_infos, slot_by_errors).await;
})
})
};
.collect_vec();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"sidecar_block_subscription".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let Some(update) = message.update_oneof else {
continue;
};
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots {
filter_by_commitment: None,
},
);
match update {
UpdateOneof::BankingTransactionErrors(transaction) => {
if transaction.error.is_none() {
continue;
}
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value += 1;
}
None => {
slot_by_errors.insert(transaction.slot, 1);
rpc_blocks_sender
.send((Instant::now() + Duration::from_secs(30), transaction.slot))
.expect("should works");
}
}
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
}
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert(sig, x);
}
}
}
UpdateOneof::Block(block) => {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_error = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
let mut geyser_stream = client
.subscribe_once(
slot_sub,
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
None,
Default::default(),
None,
)
.await
.unwrap();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
block,
) => {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
}
}
}
let block_info = BlockInfo::new(&block);
let banking_stage_error_count =
slot_by_errors.get(&block.slot).map(|x| *x.value() as i64);
let block_info = BlockInfo::new(&block, banking_stage_error_count);
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_error.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_errors.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
}
log::error!("stopping the sidecar, geyser block stream is broken");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
jh2.await.unwrap();
}

View File

@ -7,13 +7,20 @@ use anyhow::Context;
use base64::Engine;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use futures::pin_mut;
use itertools::Itertools;
use log::{debug, error, info};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use tokio_postgres::{
binary_copy::BinaryCopyInWriter,
config::SslMode,
tls::MakeTlsConnect,
types::{ToSql, Type},
Client, CopyInSink, NoTls, Socket,
};
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
@ -83,7 +90,7 @@ impl PostgresSession {
Ok(client)
}
pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
pub fn _multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
let mut arg_index = 1usize;
for row in 0..rows {
query.push('(');
@ -117,10 +124,33 @@ impl PostgresSession {
}
const NUMBER_OF_ARGS: usize = 10;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
let txs: Vec<PostgresTransactionInfo> =
txs.iter().map(PostgresTransactionInfo::from).collect();
let statement = r#"
COPY banking_stage_results.transaction_infos(
signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot
) FROM STDIN BINARY
"#;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await.unwrap();
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::TEXT,
Type::TEXT,
Type::BOOL,
Type::BOOL,
Type::INT8,
Type::INT8,
Type::INT8,
Type::TIMESTAMPTZ,
Type::TEXT,
Type::INT8,
],
);
pin_mut!(writer);
for tx in txs.iter() {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
args.push(&tx.signature);
args.push(&tx.errors);
args.push(&tx.is_executed);
@ -131,24 +161,24 @@ impl PostgresSession {
args.push(&tx.utc_timestamp);
args.push(&tx.accounts_used);
args.push(&tx.processed_slot);
writer.as_mut().write(&args).await.unwrap();
}
let mut query = String::from(
r#"
INSERT INTO banking_stage_results.transaction_infos
(signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot)
VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len(), &[]);
self.client.execute(&query, &args).await?;
writer.finish().await.unwrap();
Ok(())
}
pub async fn copy_in(
&self,
statement: &str,
) -> Result<CopyInSink<bytes::Bytes>, tokio_postgres::error::Error> {
// BinaryCopyInWriter
// https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs
self.client.copy_in(statement).await
}
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 10;
const NUMBER_OF_ARGS: usize = 11;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
args.push(&block_info.block_hash);
args.push(&block_info.slot);
@ -165,17 +195,34 @@ impl PostgresSession {
args.push(&heavily_writelocked_accounts);
args.push(&heavily_readlocked_accounts);
let mut query = String::from(
r#"
INSERT INTO banking_stage_results.blocks
(block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts)
VALUES
"#,
let supp_infos = serde_json::to_string(&block_info.sup_info).unwrap_or_default();
args.push(&supp_infos);
let statement = r#"
COPY banking_stage_results.blocks(
block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts, supp_infos
) FROM STDIN BINARY
"#;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await.unwrap();
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::TEXT,
Type::INT8,
Type::TEXT,
Type::INT8,
Type::INT8,
Type::INT8,
Type::INT8,
Type::INT8,
Type::TEXT,
Type::TEXT,
Type::TEXT,
],
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, 1, &[]);
self.client.execute(&query, &args).await?;
pin_mut!(writer);
writer.as_mut().write(&args).await.unwrap();
writer.finish().await.unwrap();
Ok(())
}
}

View File

@ -14,9 +14,7 @@ use solana_sdk::{
slot_history::Slot,
transaction::{TransactionError, VersionedTransaction},
};
use yellowstone_grpc_proto::prelude::{
SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo,
};
use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults;
fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
match error {
@ -223,7 +221,11 @@ impl TransactionInfo {
self.processed_slot = Some(slot);
}
pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo, slot: Slot) {
pub fn add_transaction(
&mut self,
transaction: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateTransactionInfo,
slot: Slot,
) {
let Some(transaction) = &transaction.transaction else {
return;
};