Preloading ALTs from a predetermined list

This commit is contained in:
godmodegalactus 2023-12-26 14:40:33 +01:00
parent ff0d1ebc8c
commit 739c8d2a67
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
6 changed files with 5226 additions and 29 deletions

View File

@ -21,4 +21,4 @@ RUN apt-get update && apt-get -y install ca-certificates libc6
COPY --from=build /app/target/release/grpc_banking_transactions_notifications /usr/local/bin/
COPY --from=build /app/target/release/cleanupdb /usr/local/bin/
CMD grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS"
CMD grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS" -a alts.txt

5156
alts.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,7 @@ use prometheus::{opts, register_int_gauge, IntGauge};
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::{sync::Arc, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};
use crate::block_info::TransactionAccount;
lazy_static::lazy_static! {
@ -12,6 +12,7 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap();
}
#[derive(Clone)]
pub struct ALTStore {
rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
@ -25,16 +26,38 @@ impl ALTStore {
}
}
pub async fn load_all_alts(&self) {
let get_pa = self
.rpc_client
.get_program_accounts(&solana_address_lookup_table_program::id())
.await;
if let Ok(pas) = get_pa {
for (key, acc) in pas {
self.save_account(&key, acc.data());
}
pub async fn load_all_alts(&self, alts_list: Vec<String>) {
let alts_list = alts_list
.iter()
.map(|x| x.trim())
.filter(|x| x.len() > 0)
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
let tasks = batches.chunks(10).map(|batch| {
let batch = batch.to_vec();
let rpc_client = self.rpc_client.clone();
let this = self.clone();
tokio::spawn(async move {
if let Ok(multiple_accounts) = rpc_client
.get_multiple_accounts_with_commitment(
&batch,
CommitmentConfig::processed(),
)
.await
{
for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc {
this.save_account(&batch[index], &acc.data);
}
}
}
})
});
futures::future::join_all(tasks).await;
}
log::info!("Finished Loading {} ALTs", alts_list.len());
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {

View File

@ -185,17 +185,17 @@ impl BlockInfo {
})
.collect_vec();
if let Some(atl_messages) = message.address_table_lookups() {
// for atl_message in atl_messages {
// let atl_acc = atl_message.account_key;
// let mut atl_accs = atl_store
// .get_accounts(
// &atl_acc,
// &atl_message.writable_indexes,
// &atl_message.readonly_indexes,
// )
// .await;
// accounts.append(&mut atl_accs);
// }
for atl_message in atl_messages {
let atl_acc = atl_message.account_key;
let mut atl_accs = atl_store
.get_accounts(
&atl_acc,
&atl_message.writable_indexes,
&atl_message.readonly_indexes,
)
.await;
accounts.append(&mut atl_accs);
}
}
for writable_account in accounts

View File

@ -18,4 +18,7 @@ pub struct Args {
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String,
#[arg(short = 'a', long, default_value_t = String::from("alts.txt"))]
pub alts: String,
}

View File

@ -7,6 +7,7 @@ use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::io::AsyncReadExt;
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -88,7 +89,7 @@ pub async fn start_tracking_banking_stage_errors(
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
{
let Ok(message) = message else {
continue;
continue;
};
let Some(update) = message.update_oneof else {
@ -135,6 +136,7 @@ async fn start_tracking_blocks(
grpc_x_token: Option<String>,
postgres: postgres::Postgres,
slot: Arc<AtomicU64>,
alts_list: Vec<String>,
) {
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr,
@ -143,7 +145,7 @@ async fn start_tracking_blocks(
)
.unwrap();
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
atl_store.load_all_alts().await;
atl_store.load_all_alts(alts_list).await;
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
@ -192,12 +194,12 @@ async fn start_tracking_blocks(
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
{
let Ok(message) = message else {
continue;
};
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
continue;
};
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
@ -242,7 +244,7 @@ async fn start_tracking_blocks(
}
#[tokio::main()]
async fn main() {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
@ -256,6 +258,17 @@ async fn main() {
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none();
let alts = args.alts;
//load alts from the file
let mut alts_string = String::new();
let mut alts_file = tokio::fs::File::open(alts).await?;
alts_file.read_to_string(&mut alts_string).await?;
let alts_list = alts_string
.split("\r\n")
.map(|x| x.to_string())
.collect_vec();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
.banking_grpc_addresses
@ -282,8 +295,10 @@ async fn main() {
args.grpc_x_token,
postgres,
slot,
alts_list,
)
.await;
}
futures::future::join_all(jhs).await;
Ok(())
}