Compare commits
2 Commits
393e10c440
...
0d41c04de5
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 0d41c04de5 | |
Maximilian Schneider | 531a2c3d11 |
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
|
||||
solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
|
||||
};
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
|
@ -35,6 +35,8 @@ pub struct ChainData {
|
|||
accounts: HashMap<Pubkey, Vec<AccountData>>,
|
||||
newest_rooted_slot: u64,
|
||||
newest_processed_slot: u64,
|
||||
account_versions_stored: usize,
|
||||
account_bytes_stored: usize,
|
||||
}
|
||||
|
||||
impl ChainData {
|
||||
|
@ -44,6 +46,8 @@ impl ChainData {
|
|||
accounts: HashMap::new(),
|
||||
newest_rooted_slot: 0,
|
||||
newest_processed_slot: 0,
|
||||
account_versions_stored: 0,
|
||||
account_bytes_stored: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,6 +99,9 @@ impl ChainData {
|
|||
if new_rooted_head {
|
||||
// for each account, preserve only writes > newest_rooted_slot, or the newest
|
||||
// rooted write
|
||||
self.account_versions_stored = 0;
|
||||
self.account_bytes_stored = 0;
|
||||
|
||||
for (_, writes) in self.accounts.iter_mut() {
|
||||
let newest_rooted_write = writes
|
||||
.iter()
|
||||
|
@ -120,11 +127,16 @@ impl ChainData {
|
|||
.unwrap_or(self.newest_rooted_slot + 1);
|
||||
writes
|
||||
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
|
||||
self.account_versions_stored += writes.len();
|
||||
self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).fold(0, |acc, l| acc + l)
|
||||
}
|
||||
|
||||
// now it's fine to drop any slots before the new rooted head
|
||||
// as account writes for non-rooted slots before it have been dropped
|
||||
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
|
||||
|
||||
// TODO: move this to prom
|
||||
println!("[chain_data] account_versions_stored = {} account_bytes_stored = {}", self.account_versions_stored, self.account_bytes_stored);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,6 +144,8 @@ impl ChainData {
|
|||
use std::collections::hash_map::Entry;
|
||||
match self.accounts.entry(pubkey) {
|
||||
Entry::Vacant(v) => {
|
||||
self.account_versions_stored += 1;
|
||||
self.account_bytes_stored += account.account.data().len();
|
||||
v.insert(vec![account]);
|
||||
}
|
||||
Entry::Occupied(o) => {
|
||||
|
@ -149,6 +163,8 @@ impl ChainData {
|
|||
v[pos] = account;
|
||||
}
|
||||
} else {
|
||||
self.account_versions_stored += 1;
|
||||
self.account_bytes_stored += account.account.data().len();
|
||||
v.insert(pos, account);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ async fn get_snapshot(
|
|||
account_config: account_info_config.clone(),
|
||||
};
|
||||
|
||||
info!("requesting snapshot");
|
||||
info!("requesting snapshot {}", program_id);
|
||||
let account_snapshot = rpc_client
|
||||
.get_program_accounts(
|
||||
program_id.to_string(),
|
||||
|
@ -77,7 +77,7 @@ async fn get_snapshot(
|
|||
)
|
||||
.await
|
||||
.map_err_anyhow()?;
|
||||
info!("snapshot received");
|
||||
info!("snapshot received {}", program_id);
|
||||
Ok(account_snapshot)
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,6 @@ async fn feed_data_geyser(
|
|||
filter_config: &FilterConfig,
|
||||
sender: async_channel::Sender<Message>,
|
||||
) -> anyhow::Result<()> {
|
||||
let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
|
||||
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
||||
'$' => env::var(&grpc_config.connection_string[1..])
|
||||
.expect("reading connection string from env"),
|
||||
|
@ -217,7 +216,9 @@ async fn feed_data_geyser(
|
|||
}
|
||||
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
|
||||
snapshot_needed = false;
|
||||
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), program_id)).fuse();
|
||||
for program_id in filter_config.program_ids.clone() {
|
||||
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue