From 15611499d34912d22ffb66e5b0a62fbccd176d72 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Dec 2023 13:06:27 +0100 Subject: [PATCH] Adding support for address lookup tables --- Cargo.lock | 26 ++++++++++++-- Cargo.toml | 2 ++ Dockerfile | 2 +- migration.sql | 2 ++ src/atl_store.rs | 74 ++++++++++++++++++++++++++++++++++++++++ src/block_info.rs | 87 ++++++++++++++++++++++++++++++----------------- src/cli.rs | 3 ++ src/main.rs | 55 +++++++++++++++++++++++------- src/postgres.rs | 24 ++++++++----- 9 files changed, 221 insertions(+), 54 deletions(-) create mode 100644 src/atl_store.rs diff --git a/Cargo.lock b/Cargo.lock index b2a0c89..b12fdd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1432,6 +1432,7 @@ dependencies = [ "rustls 0.20.8", "serde", "serde_json", + "solana-address-lookup-table-program 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)", "solana-rpc-client", "solana-rpc-client-api", "solana-sdk", @@ -3198,7 +3199,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "solana-address-lookup-table-program", + "solana-address-lookup-table-program 1.16.17 (git+https://github.com/rpcpool/solana-public.git?tag=v1.16.17-geyser-block-v3-mango)", "solana-config-program", "solana-sdk", "spl-token", @@ -3208,6 +3209,27 @@ dependencies = [ "zstd", ] +[[package]] +name = "solana-address-lookup-table-program" +version = "1.16.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ccb31f7f14d5876acd9ec38f5bf6097bfb4b350141d81c7ff2bf684db3ca815" +dependencies = [ + "bincode", + "bytemuck", + "log", + "num-derive 0.3.3", + "num-traits", + "rustc_version", + "serde", + "solana-frozen-abi 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-frozen-abi-macro 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-program", + "solana-program-runtime 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-sdk", + "thiserror", +] + [[package]] name = "solana-address-lookup-table-program" version = "1.16.17" @@ -3622,7 +3644,7 @@ dependencies = [ "serde_derive", "serde_json", "solana-account-decoder", - "solana-address-lookup-table-program", + "solana-address-lookup-table-program 1.16.17 (git+https://github.com/rpcpool/solana-public.git?tag=v1.16.17-geyser-block-v3-mango)", "solana-sdk", "spl-associated-token-account", "spl-memo", diff --git a/Cargo.toml b/Cargo.toml index f84ddf3..b1b231d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ solana-sdk = "~1.16.17" solana-rpc-client = "~1.16.17" solana-rpc-client-api = "~1.16.17" solana-transaction-status = "~1.16.17" +solana-address-lookup-table-program = "~1.16.17" + itertools = "0.10.5" serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" diff --git a/Dockerfile b/Dockerfile index c5aa353..b53f3b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,4 +20,4 @@ FROM debian:bullseye-slim as run RUN apt-get update && apt-get -y install ca-certificates libc6 COPY --from=build /app/target/release/grpc_banking_transactions_notifications /usr/local/bin/ -CMD grpc_banking_transactions_notifications --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS" +CMD grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS" diff --git a/migration.sql b/migration.sql index e7f7214..9baeb46 100644 --- a/migration.sql +++ b/migration.sql @@ -58,6 +58,8 @@ CREATE TABLE banking_stage_results_2.accounts_map_transaction( PRIMARY KEY (acc_id, transaction_id) ); +ALTER TABLE banking_stage_results_2.accounts_map_transaction ADD COLUMN is_atl BOOL; + CREATE INDEX accounts_map_transaction_acc_id ON banking_stage_results_2.accounts_map_transaction(acc_id); CREATE INDEX accounts_map_transaction_transaction_id ON banking_stage_results_2.accounts_map_transaction(transaction_id); diff --git a/src/atl_store.rs b/src/atl_store.rs new file mode 100644 index 0000000..635def6 --- /dev/null +++ b/src/atl_store.rs @@ -0,0 +1,74 @@ +use dashmap::DashMap; +use itertools::Itertools; +use solana_address_lookup_table_program::state::AddressLookupTable; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{pubkey::Pubkey, slot_hashes::SlotHashes, slot_history::Slot}; +use std::sync::Arc; + +use crate::block_info::TransactionAccount; + +pub struct ATLStore { + rpc_client: Arc, + pub map: Arc>>, +} + +impl ATLStore { + pub fn new(rpc_client: Arc) -> Self { + Self { + rpc_client, + map: Arc::new(DashMap::new()), + } + } + + pub async fn load_atl_from_rpc(&self, atl: &Pubkey) { + if !self.map.contains_key(&atl.to_string()) { + if let Ok(account) = self.rpc_client.get_account(&atl).await { + self.map.insert(atl.to_string(), account.data); + } + } + } + + pub async fn get_accounts( + &self, + current_slot: Slot, + atl: Pubkey, + write_accounts: &Vec, + read_account: &Vec, + ) -> Vec { + self.load_atl_from_rpc(&atl).await; + match self.map.get(&atl.to_string()) { + Some(account) => { + let lookup_table = AddressLookupTable::deserialize(&account.value()).unwrap(); + let write_accounts = lookup_table + .lookup(current_slot, write_accounts, &SlotHashes::default()) + .unwrap(); + let read_account = lookup_table + .lookup(current_slot, read_account, &SlotHashes::default()) + .unwrap(); + + let wa = write_accounts + .iter() + .map(|key| TransactionAccount { + key: key.to_string(), + is_writable: true, + is_signer: false, + is_atl: true, + }) + .collect_vec(); + let ra = read_account + .iter() + .map(|key| TransactionAccount { + key: key.to_string(), + is_writable: false, + is_signer: false, + is_atl: true, + }) + .collect_vec(); + [wa, ra].concat() + } + None => { + vec![] + } + } + } +} diff --git a/src/block_info.rs b/src/block_info.rs index 8b59263..3599340 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -12,7 +12,9 @@ use solana_sdk::{ signature::Signature, slot_history::Slot, }; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; + +use crate::atl_store::ATLStore; #[derive(Serialize, Debug, Clone)] pub struct PrioFeeData { @@ -77,10 +79,12 @@ pub struct PrioritizationFeesInfo { pub p_max: u64, } +#[derive(Clone)] pub struct TransactionAccount { pub key: String, pub is_writable: bool, pub is_signer: bool, + pub is_atl: bool, } pub struct BlockTransactionInfo { @@ -108,13 +112,14 @@ pub struct BlockInfo { } impl BlockInfo { - pub fn process_versioned_message( + pub async fn process_versioned_message( + atl_store: Arc, signature: String, slot: Slot, message: &VersionedMessage, prio_fees_in_block: &mut Vec, - writelocked_accounts: &mut HashMap, - readlocked_accounts: &mut HashMap, + writelocked_accounts: &mut HashMap, + readlocked_accounts: &mut HashMap, cu_consumed: u64, total_cu_requested: &mut u64, is_vote: bool, @@ -169,19 +174,37 @@ impl BlockInfo { std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb)); *total_cu_requested += cu_requested; if !is_vote { - let accounts = message + let mut accounts = message .static_account_keys() .iter() .enumerate() - .map(|(index, account)| { - ( - message.is_maybe_writable(index), - *account, - message.is_signer(index), - ) + .map(|(index, account)| TransactionAccount { + key: account.to_string(), + is_writable: message.is_maybe_writable(index), + is_signer: message.is_signer(index), + is_atl: false, }) .collect_vec(); - for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { + if let Some(atl_messages) = message.address_table_lookups() { + for atl_message in atl_messages { + let atl_acc = atl_message.account_key; + let mut atl_accs = atl_store + .get_accounts( + slot, + atl_acc, + &atl_message.writable_indexes, + &atl_message.readonly_indexes, + ) + .await; + accounts.append(&mut atl_accs); + } + } + + for writable_account in accounts + .iter() + .filter(|x| x.is_writable) + .map(|x| x.key.clone()) + { match writelocked_accounts.get_mut(&writable_account) { Some(x) => { x.cu_requested += cu_requested; @@ -190,9 +213,9 @@ impl BlockInfo { } None => { writelocked_accounts.insert( - writable_account, + writable_account.clone(), AccountData { - key: writable_account.to_string(), + key: writable_account, cu_consumed, cu_requested, vec_pf: vec![prioritization_fees], @@ -202,7 +225,11 @@ impl BlockInfo { } } - for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { + for readable_account in accounts + .iter() + .filter(|x| !x.is_writable) + .map(|x| x.key.clone()) + { match readlocked_accounts.get_mut(&readable_account) { Some(x) => { x.cu_requested += cu_requested; @@ -211,9 +238,9 @@ impl BlockInfo { } None => { readlocked_accounts.insert( - readable_account, + readable_account.clone(), AccountData { - key: readable_account.to_string(), + key: readable_account, cu_consumed, cu_requested, vec_pf: vec![prioritization_fees], @@ -222,6 +249,7 @@ impl BlockInfo { } } } + Some(BlockTransactionInfo { signature, processed_slot: slot as i64, @@ -230,14 +258,7 @@ impl BlockInfo { cu_consumed: cu_consumed as i64, prioritization_fees: prioritization_fees as i64, supp_infos: String::new(), - accounts: accounts - .iter() - .map(|(is_writable, key, is_signer)| TransactionAccount { - key: key.to_string(), - is_signer: *is_signer, - is_writable: *is_writable, - }) - .collect(), + accounts: accounts, }) } else { None @@ -245,8 +266,8 @@ impl BlockInfo { } pub fn calculate_account_usage( - writelocked_accounts: &HashMap, - readlocked_accounts: &HashMap, + writelocked_accounts: &HashMap, + readlocked_accounts: &HashMap, ) -> Vec { let mut accounts = writelocked_accounts .iter() @@ -281,7 +302,8 @@ impl BlockInfo { } } - pub fn new( + pub async fn new( + atl_store: Arc, block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock, ) -> BlockInfo { let block_hash = block.blockhash.clone(); @@ -314,8 +336,8 @@ impl BlockInfo { .unwrap_or(0) }) .sum::() as i64; - let mut writelocked_accounts: HashMap = HashMap::new(); - let mut readlocked_accounts: HashMap = HashMap::new(); + let mut writelocked_accounts: HashMap = HashMap::new(); + let mut readlocked_accounts: HashMap = HashMap::new(); let mut total_cu_requested: u64 = 0; let mut prio_fees_in_block = vec![]; let mut block_transactions = vec![]; @@ -383,8 +405,10 @@ impl BlockInfo { }) .collect(), }); + let atl_store = atl_store.clone(); let transaction = Self::process_versioned_message( + atl_store, signature, slot, &message, @@ -395,7 +419,8 @@ impl BlockInfo { &mut total_cu_requested, transaction.is_vote, meta.err.is_none(), - ); + ) + .await; if let Some(transaction) = transaction { block_transactions.push(transaction); } diff --git a/src/cli.rs b/src/cli.rs index f168dc7..89a4b52 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -3,6 +3,9 @@ use clap::Parser; #[derive(Parser, Debug, Clone)] #[command(author, version, about, long_about = None)] pub struct Args { + #[arg(short, long)] + pub rpc_url: String, + #[arg(short, long)] pub grpc_address_to_fetch_blocks: Option, diff --git a/src/main.rs b/src/main.rs index 1116af4..090be80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use clap::Parser; use itertools::Itertools; +use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient}; +use solana_sdk::pubkey::Pubkey; use std::{ collections::{BTreeMap, HashMap}, sync::{atomic::AtomicU64, Arc}, @@ -11,10 +13,11 @@ use block_info::BlockInfo; use cli::Args; use dashmap::DashMap; use futures::StreamExt; -use log::{debug, error}; +use log::{debug, error, info}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; use transaction_info::TransactionInfo; +mod atl_store; mod block_info; mod cli; mod postgres; @@ -43,7 +46,7 @@ pub async fn start_tracking_banking_stage_errors( map_of_infos: Arc>>, slot_by_errors: Arc>, slot: Arc, - subscribe_to_slots: bool, + _subscribe_to_slots: bool, ) { loop { let token: Option = None; @@ -57,16 +60,14 @@ pub async fn start_tracking_banking_stage_errors( let slot_subscription: HashMap< String, yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots, - > = if subscribe_to_slots { + > = { log::info!("subscribing to slots on grpc banking errors"); let mut slot_sub = HashMap::new(); slot_sub.insert( - "slot_sub".to_string(), + "slot_sub_for_banking_tx".to_string(), yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {}, ); slot_sub - } else { - HashMap::new() }; let mut geyser_stream = client @@ -77,14 +78,16 @@ pub async fn start_tracking_banking_stage_errors( Default::default(), Default::default(), Default::default(), - Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed), + Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed), Default::default(), true, ) .await .unwrap(); log::info!("started geyser banking stage subscription"); - while let Some(message) = geyser_stream.next().await { + while let Ok(Some(message)) = + tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await + { let Ok(message) = message else { continue; }; @@ -140,11 +143,11 @@ pub async fn start_tracking_banking_stage_errors( } } error!("geyser banking stage connection failed {}", grpc_address); - tokio::time::sleep(Duration::from_secs(1)).await; } } async fn start_tracking_blocks( + rpc_client: Arc, grpc_block_addr: String, grpc_x_token: Option, postgres: postgres::Postgres, @@ -156,6 +159,7 @@ async fn start_tracking_blocks( None, ) .unwrap(); + let atl_store = Arc::new(atl_store::ATLStore::new(rpc_client)); loop { let mut blocks_subs = HashMap::new(); @@ -169,6 +173,16 @@ async fn start_tracking_blocks( }, ); + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "sidecar_atl_accounts_subscription".to_string(), + yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterAccounts { + account: vec![], + filters: vec![], + owner: vec![solana_address_lookup_table_program::id().to_string()], + }, + ); + let mut slot_sub = HashMap::new(); slot_sub.insert( "slot_sub".to_string(), @@ -212,8 +226,9 @@ async fn start_tracking_blocks( BANKING_STAGE_BLOCKS_TASK.inc(); let postgres = postgres.clone(); let slot = slot.clone(); + let atl_store = atl_store.clone(); tokio::spawn(async move { - let block_info = BlockInfo::new(&block); + let block_info = BlockInfo::new(atl_store, &block).await; TXERROR_COUNT.add( block_info.processed_transactions - block_info.successful_transactions, ); @@ -224,7 +239,15 @@ async fn start_tracking_blocks( BANKING_STAGE_BLOCKS_TASK.dec(); }); // delay queue so that we get all the banking stage errors before processing block - } + }, + yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Account(account_update) => { + info!("ATL updated"); + if let Some(account) = account_update.account { + let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes()); + let pubkey = Pubkey::new_from_array(bytes); + atl_store.map.insert( pubkey.to_string(), account.data); + } + }, _ => {} }; } @@ -238,6 +261,7 @@ async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); + let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url)); let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); @@ -270,7 +294,14 @@ async fn main() { }) .collect_vec(); if let Some(gprc_block_addr) = grpc_block_addr { - start_tracking_blocks(gprc_block_addr, args.grpc_x_token, postgres, slot).await; + start_tracking_blocks( + rpc_client, + gprc_block_addr, + args.grpc_x_token, + postgres, + slot, + ) + .await; } futures::future::join_all(jhs).await; } diff --git a/src/postgres.rs b/src/postgres.rs index 08d85f7..889da63 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -310,7 +310,8 @@ impl PostgresSession { account_key char(44), signature char(88), is_writable BOOL, - is_signer BOOL + is_signer BOOL, + is_atl BOOL );", temp_table ) @@ -322,15 +323,17 @@ impl PostgresSession { let statement = format!( r#" COPY {}( - account_key, signature, is_writable, is_signer + account_key, signature, is_writable, is_signer, is_atl ) FROM STDIN BINARY "#, temp_table ); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; - let writer = - BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL]); + let writer = BinaryCopyInWriter::new( + sink, + &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL], + ); pin_mut!(writer); for acc_tx in accounts_for_transaction { for acc in &acc_tx.accounts { @@ -339,6 +342,7 @@ impl PostgresSession { args.push(&acc_tx.signature); args.push(&acc.writable); args.push(&acc.is_signer); + args.push(&acc.is_atl); writer.as_mut().write(&args).await?; } } @@ -346,16 +350,17 @@ impl PostgresSession { let statement = format!( r#" - INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer) + INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) SELECT ( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ), ( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ), t.is_writable, - t.is_signer + t.is_signer, + t.is_atl FROM ( - SELECT account_key, signature, is_writable, is_signer from {} + SELECT account_key, signature, is_writable, is_signer, is_atl from {} ) - as t (account_key, signature, is_writable, is_signer) + as t (account_key, signature, is_writable, is_signer, is_atl) ON CONFLICT DO NOTHING; "#, temp_table @@ -628,6 +633,7 @@ impl PostgresSession { key: key.clone(), writable: *is_writable, is_signer: false, + is_atl: false, }) .collect(), }) @@ -674,6 +680,7 @@ impl PostgresSession { key: acc.key.clone(), writable: acc.is_writable, is_signer: acc.is_signer, + is_atl: acc.is_atl, }) .collect(), }) @@ -760,6 +767,7 @@ pub struct AccountUsed { key: String, writable: bool, is_signer: bool, + is_atl: bool, } pub struct AccountsForTransaction {