Adding support for address lookup tables

This commit is contained in:
godmodegalactus 2023-12-19 13:06:27 +01:00
parent 7a0e8a6084
commit 15611499d3
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
9 changed files with 221 additions and 54 deletions

26
Cargo.lock generated
View File

@ -1432,6 +1432,7 @@ dependencies = [
"rustls 0.20.8",
"serde",
"serde_json",
"solana-address-lookup-table-program 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk",
@ -3198,7 +3199,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"solana-address-lookup-table-program",
"solana-address-lookup-table-program 1.16.17 (git+https://github.com/rpcpool/solana-public.git?tag=v1.16.17-geyser-block-v3-mango)",
"solana-config-program",
"solana-sdk",
"spl-token",
@ -3208,6 +3209,27 @@ dependencies = [
"zstd",
]
[[package]]
name = "solana-address-lookup-table-program"
version = "1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ccb31f7f14d5876acd9ec38f5bf6097bfb4b350141d81c7ff2bf684db3ca815"
dependencies = [
"bincode",
"bytemuck",
"log",
"num-derive 0.3.3",
"num-traits",
"rustc_version",
"serde",
"solana-frozen-abi 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-frozen-abi-macro 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-program",
"solana-program-runtime 1.16.17 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-sdk",
"thiserror",
]
[[package]]
name = "solana-address-lookup-table-program"
version = "1.16.17"
@ -3622,7 +3644,7 @@ dependencies = [
"serde_derive",
"serde_json",
"solana-account-decoder",
"solana-address-lookup-table-program",
"solana-address-lookup-table-program 1.16.17 (git+https://github.com/rpcpool/solana-public.git?tag=v1.16.17-geyser-block-v3-mango)",
"solana-sdk",
"spl-associated-token-account",
"spl-memo",

View File

@ -10,6 +10,8 @@ solana-sdk = "~1.16.17"
solana-rpc-client = "~1.16.17"
solana-rpc-client-api = "~1.16.17"
solana-transaction-status = "~1.16.17"
solana-address-lookup-table-program = "~1.16.17"
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"

View File

@ -20,4 +20,4 @@ FROM debian:bullseye-slim as run
RUN apt-get update && apt-get -y install ca-certificates libc6
COPY --from=build /app/target/release/grpc_banking_transactions_notifications /usr/local/bin/
CMD grpc_banking_transactions_notifications --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS"
CMD grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS"

View File

@ -58,6 +58,8 @@ CREATE TABLE banking_stage_results_2.accounts_map_transaction(
PRIMARY KEY (acc_id, transaction_id)
);
ALTER TABLE banking_stage_results_2.accounts_map_transaction ADD COLUMN is_atl BOOL;
CREATE INDEX accounts_map_transaction_acc_id ON banking_stage_results_2.accounts_map_transaction(acc_id);
CREATE INDEX accounts_map_transaction_transaction_id ON banking_stage_results_2.accounts_map_transaction(transaction_id);

74
src/atl_store.rs Normal file
View File

@ -0,0 +1,74 @@
use dashmap::DashMap;
use itertools::Itertools;
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{pubkey::Pubkey, slot_hashes::SlotHashes, slot_history::Slot};
use std::sync::Arc;
use crate::block_info::TransactionAccount;
pub struct ATLStore {
rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<String, Vec<u8>>>,
}
impl ATLStore {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
Self {
rpc_client,
map: Arc::new(DashMap::new()),
}
}
pub async fn load_atl_from_rpc(&self, atl: &Pubkey) {
if !self.map.contains_key(&atl.to_string()) {
if let Ok(account) = self.rpc_client.get_account(&atl).await {
self.map.insert(atl.to_string(), account.data);
}
}
}
pub async fn get_accounts(
&self,
current_slot: Slot,
atl: Pubkey,
write_accounts: &Vec<u8>,
read_account: &Vec<u8>,
) -> Vec<TransactionAccount> {
self.load_atl_from_rpc(&atl).await;
match self.map.get(&atl.to_string()) {
Some(account) => {
let lookup_table = AddressLookupTable::deserialize(&account.value()).unwrap();
let write_accounts = lookup_table
.lookup(current_slot, write_accounts, &SlotHashes::default())
.unwrap();
let read_account = lookup_table
.lookup(current_slot, read_account, &SlotHashes::default())
.unwrap();
let wa = write_accounts
.iter()
.map(|key| TransactionAccount {
key: key.to_string(),
is_writable: true,
is_signer: false,
is_atl: true,
})
.collect_vec();
let ra = read_account
.iter()
.map(|key| TransactionAccount {
key: key.to_string(),
is_writable: false,
is_signer: false,
is_atl: true,
})
.collect_vec();
[wa, ra].concat()
}
None => {
vec![]
}
}
}
}

View File

@ -12,7 +12,9 @@ use solana_sdk::{
signature::Signature,
slot_history::Slot,
};
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use crate::atl_store::ATLStore;
#[derive(Serialize, Debug, Clone)]
pub struct PrioFeeData {
@ -77,10 +79,12 @@ pub struct PrioritizationFeesInfo {
pub p_max: u64,
}
#[derive(Clone)]
pub struct TransactionAccount {
pub key: String,
pub is_writable: bool,
pub is_signer: bool,
pub is_atl: bool,
}
pub struct BlockTransactionInfo {
@ -108,13 +112,14 @@ pub struct BlockInfo {
}
impl BlockInfo {
pub fn process_versioned_message(
pub async fn process_versioned_message(
atl_store: Arc<ATLStore>,
signature: String,
slot: Slot,
message: &VersionedMessage,
prio_fees_in_block: &mut Vec<u64>,
writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
readlocked_accounts: &mut HashMap<Pubkey, AccountData>,
writelocked_accounts: &mut HashMap<String, AccountData>,
readlocked_accounts: &mut HashMap<String, AccountData>,
cu_consumed: u64,
total_cu_requested: &mut u64,
is_vote: bool,
@ -169,19 +174,37 @@ impl BlockInfo {
std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb));
*total_cu_requested += cu_requested;
if !is_vote {
let accounts = message
let mut accounts = message
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| {
(
message.is_maybe_writable(index),
*account,
message.is_signer(index),
)
.map(|(index, account)| TransactionAccount {
key: account.to_string(),
is_writable: message.is_maybe_writable(index),
is_signer: message.is_signer(index),
is_atl: false,
})
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
if let Some(atl_messages) = message.address_table_lookups() {
for atl_message in atl_messages {
let atl_acc = atl_message.account_key;
let mut atl_accs = atl_store
.get_accounts(
slot,
atl_acc,
&atl_message.writable_indexes,
&atl_message.readonly_indexes,
)
.await;
accounts.append(&mut atl_accs);
}
}
for writable_account in accounts
.iter()
.filter(|x| x.is_writable)
.map(|x| x.key.clone())
{
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
x.cu_requested += cu_requested;
@ -190,9 +213,9 @@ impl BlockInfo {
}
None => {
writelocked_accounts.insert(
writable_account,
writable_account.clone(),
AccountData {
key: writable_account.to_string(),
key: writable_account,
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
@ -202,7 +225,11 @@ impl BlockInfo {
}
}
for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) {
for readable_account in accounts
.iter()
.filter(|x| !x.is_writable)
.map(|x| x.key.clone())
{
match readlocked_accounts.get_mut(&readable_account) {
Some(x) => {
x.cu_requested += cu_requested;
@ -211,9 +238,9 @@ impl BlockInfo {
}
None => {
readlocked_accounts.insert(
readable_account,
readable_account.clone(),
AccountData {
key: readable_account.to_string(),
key: readable_account,
cu_consumed,
cu_requested,
vec_pf: vec![prioritization_fees],
@ -222,6 +249,7 @@ impl BlockInfo {
}
}
}
Some(BlockTransactionInfo {
signature,
processed_slot: slot as i64,
@ -230,14 +258,7 @@ impl BlockInfo {
cu_consumed: cu_consumed as i64,
prioritization_fees: prioritization_fees as i64,
supp_infos: String::new(),
accounts: accounts
.iter()
.map(|(is_writable, key, is_signer)| TransactionAccount {
key: key.to_string(),
is_signer: *is_signer,
is_writable: *is_writable,
})
.collect(),
accounts: accounts,
})
} else {
None
@ -245,8 +266,8 @@ impl BlockInfo {
}
pub fn calculate_account_usage(
writelocked_accounts: &HashMap<Pubkey, AccountData>,
readlocked_accounts: &HashMap<Pubkey, AccountData>,
writelocked_accounts: &HashMap<String, AccountData>,
readlocked_accounts: &HashMap<String, AccountData>,
) -> Vec<AccountUsage> {
let mut accounts = writelocked_accounts
.iter()
@ -281,7 +302,8 @@ impl BlockInfo {
}
}
pub fn new(
pub async fn new(
atl_store: Arc<ATLStore>,
block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
) -> BlockInfo {
let block_hash = block.blockhash.clone();
@ -314,8 +336,8 @@ impl BlockInfo {
.unwrap_or(0)
})
.sum::<u64>() as i64;
let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut writelocked_accounts: HashMap<String, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<String, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
let mut block_transactions = vec![];
@ -383,8 +405,10 @@ impl BlockInfo {
})
.collect(),
});
let atl_store = atl_store.clone();
let transaction = Self::process_versioned_message(
atl_store,
signature,
slot,
&message,
@ -395,7 +419,8 @@ impl BlockInfo {
&mut total_cu_requested,
transaction.is_vote,
meta.err.is_none(),
);
)
.await;
if let Some(transaction) = transaction {
block_transactions.push(transaction);
}

View File

@ -3,6 +3,9 @@ use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long)]
pub rpc_url: String,
#[arg(short, long)]
pub grpc_address_to_fetch_blocks: Option<String>,

View File

@ -1,5 +1,7 @@
use clap::Parser;
use itertools::Itertools;
use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::AtomicU64, Arc},
@ -11,10 +13,11 @@ use block_info::BlockInfo;
use cli::Args;
use dashmap::DashMap;
use futures::StreamExt;
use log::{debug, error};
use log::{debug, error, info};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use transaction_info::TransactionInfo;
mod atl_store;
mod block_info;
mod cli;
mod postgres;
@ -43,7 +46,7 @@ pub async fn start_tracking_banking_stage_errors(
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
slot: Arc<AtomicU64>,
subscribe_to_slots: bool,
_subscribe_to_slots: bool,
) {
loop {
let token: Option<String> = None;
@ -57,16 +60,14 @@ pub async fn start_tracking_banking_stage_errors(
let slot_subscription: HashMap<
String,
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
> = if subscribe_to_slots {
> = {
log::info!("subscribing to slots on grpc banking errors");
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
"slot_sub_for_banking_tx".to_string(),
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
);
slot_sub
} else {
HashMap::new()
};
let mut geyser_stream = client
@ -77,14 +78,16 @@ pub async fn start_tracking_banking_stage_errors(
Default::default(),
Default::default(),
Default::default(),
Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Processed),
Some(yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed),
Default::default(),
true,
)
.await
.unwrap();
log::info!("started geyser banking stage subscription");
while let Some(message) = geyser_stream.next().await {
while let Ok(Some(message)) =
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
{
let Ok(message) = message else {
continue;
};
@ -140,11 +143,11 @@ pub async fn start_tracking_banking_stage_errors(
}
}
error!("geyser banking stage connection failed {}", grpc_address);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn start_tracking_blocks(
rpc_client: Arc<RpcClient>,
grpc_block_addr: String,
grpc_x_token: Option<String>,
postgres: postgres::Postgres,
@ -156,6 +159,7 @@ async fn start_tracking_blocks(
None,
)
.unwrap();
let atl_store = Arc::new(atl_store::ATLStore::new(rpc_client));
loop {
let mut blocks_subs = HashMap::new();
@ -169,6 +173,16 @@ async fn start_tracking_blocks(
},
);
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"sidecar_atl_accounts_subscription".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterAccounts {
account: vec![],
filters: vec![],
owner: vec![solana_address_lookup_table_program::id().to_string()],
},
);
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
@ -212,8 +226,9 @@ async fn start_tracking_blocks(
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let atl_store = atl_store.clone();
tokio::spawn(async move {
let block_info = BlockInfo::new(&block);
let block_info = BlockInfo::new(atl_store, &block).await;
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
@ -224,7 +239,15 @@ async fn start_tracking_blocks(
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
},
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Account(account_update) => {
info!("ATL updated");
if let Some(account) = account_update.account {
let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes());
let pubkey = Pubkey::new_from_array(bytes);
atl_store.map.insert( pubkey.to_string(), account.data);
}
},
_ => {}
};
}
@ -238,6 +261,7 @@ async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url));
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
@ -270,7 +294,14 @@ async fn main() {
})
.collect_vec();
if let Some(gprc_block_addr) = grpc_block_addr {
start_tracking_blocks(gprc_block_addr, args.grpc_x_token, postgres, slot).await;
start_tracking_blocks(
rpc_client,
gprc_block_addr,
args.grpc_x_token,
postgres,
slot,
)
.await;
}
futures::future::join_all(jhs).await;
}

View File

@ -310,7 +310,8 @@ impl PostgresSession {
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL
is_signer BOOL,
is_atl BOOL
);",
temp_table
)
@ -322,15 +323,17 @@ impl PostgresSession {
let statement = format!(
r#"
COPY {}(
account_key, signature, is_writable, is_signer
account_key, signature, is_writable, is_signer, is_atl
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer =
BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL]);
let writer = BinaryCopyInWriter::new(
sink,
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
);
pin_mut!(writer);
for acc_tx in accounts_for_transaction {
for acc in &acc_tx.accounts {
@ -339,6 +342,7 @@ impl PostgresSession {
args.push(&acc_tx.signature);
args.push(&acc.writable);
args.push(&acc.is_signer);
args.push(&acc.is_atl);
writer.as_mut().write(&args).await?;
}
}
@ -346,16 +350,17 @@ impl PostgresSession {
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer)
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
t.is_writable,
t.is_signer
t.is_signer,
t.is_atl
FROM (
SELECT account_key, signature, is_writable, is_signer from {}
SELECT account_key, signature, is_writable, is_signer, is_atl from {}
)
as t (account_key, signature, is_writable, is_signer)
as t (account_key, signature, is_writable, is_signer, is_atl)
ON CONFLICT DO NOTHING;
"#,
temp_table
@ -628,6 +633,7 @@ impl PostgresSession {
key: key.clone(),
writable: *is_writable,
is_signer: false,
is_atl: false,
})
.collect(),
})
@ -674,6 +680,7 @@ impl PostgresSession {
key: acc.key.clone(),
writable: acc.is_writable,
is_signer: acc.is_signer,
is_atl: acc.is_atl,
})
.collect(),
})
@ -760,6 +767,7 @@ pub struct AccountUsed {
key: String,
writable: bool,
is_signer: bool,
is_atl: bool,
}
pub struct AccountsForTransaction {