Compare commits

...

2 Commits

Author SHA1 Message Date
Riordan Panayides 0d41c04de5 Snapshot all program ids in fills 2023-01-19 02:44:54 +00:00
Maximilian Schneider 531a2c3d11 collect stats on nr of account writes & bytes store 2023-01-19 11:18:38 +09:00
2 changed files with 22 additions and 5 deletions

View File

@ -1,5 +1,5 @@
use {
solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
};
#[derive(Clone, Copy, Debug, PartialEq)]
@ -35,6 +35,8 @@ pub struct ChainData {
accounts: HashMap<Pubkey, Vec<AccountData>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
account_versions_stored: usize,
account_bytes_stored: usize,
}
impl ChainData {
@ -44,6 +46,8 @@ impl ChainData {
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
account_versions_stored: 0,
account_bytes_stored: 0,
}
}
@ -95,6 +99,9 @@ impl ChainData {
if new_rooted_head {
// for each account, preserve only writes > newest_rooted_slot, or the newest
// rooted write
self.account_versions_stored = 0;
self.account_bytes_stored = 0;
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write = writes
.iter()
@ -120,11 +127,16 @@ impl ChainData {
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).fold(0, |acc, l| acc + l)
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
// TODO: move this to prom
println!("[chain_data] account_versions_stored = {} account_bytes_stored = {}", self.account_versions_stored, self.account_bytes_stored);
}
}
@ -132,6 +144,8 @@ impl ChainData {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(vec![account]);
}
Entry::Occupied(o) => {
@ -149,6 +163,8 @@ impl ChainData {
v[pos] = account;
}
} else {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos, account);
}
}

View File

@ -69,7 +69,7 @@ async fn get_snapshot(
account_config: account_info_config.clone(),
};
info!("requesting snapshot");
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
@ -77,7 +77,7 @@ async fn get_snapshot(
)
.await
.map_err_anyhow()?;
info!("snapshot received");
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
@ -88,7 +88,6 @@ async fn feed_data_geyser(
filter_config: &FilterConfig,
sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> {
let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
'$' => env::var(&grpc_config.connection_string[1..])
.expect("reading connection string from env"),
@ -217,7 +216,9 @@ async fn feed_data_geyser(
}
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), program_id)).fuse();
for program_id in filter_config.program_ids.clone() {
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse();
}
}
}
},