add test for cached fetcher

This commit is contained in:
GroovieGermanikus 2023-11-07 12:12:32 +01:00
parent 06201bf141
commit adbc87557c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 190 additions and 62 deletions

View File

@ -1,7 +1,6 @@
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use mango_feeds_connector::account_fetcher::AccountFetcherFeeds;
use mango_feeds_connector::account_fetchers::RpcAccountFetcher;
use mango_feeds_connector::feeds_chain_data_fetcher::FeedsAccountFetcher;
use mango_feeds_connector::{account_fetcher, chain_data};
use solana_client::nonblocking::rpc_client::{RpcClient as RpcClientAsync, RpcClient};
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
@ -9,10 +8,9 @@ use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake;
use mango_feeds_connector::account_fetcher::AccountFetcherFeeds;
use mango_feeds_connector::account_fetchers::RpcAccountFetcher;
use mango_feeds_connector::feeds_chain_data_fetcher::FeedsAccountFetcher;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
#[tokio::main]
async fn main() {
@ -24,7 +22,6 @@ async fn main() {
program_account_fetcher(rpc_url.clone()).await;
account_fetcher(rpc_url.clone()).await;
}
/// get program accounts for Swap program and print the discriminators
@ -43,11 +40,13 @@ async fn program_accounts_rpc(rpc_url: &String) {
let program = Pubkey::from_str("SwaPpA9LAaLfeLi3a68M4DjnLqgtticKg6CnyNwgAC8").unwrap();
let accounts = rpc_client
.get_program_accounts_with_config(&program, program_accounts_config).await.unwrap();
.get_program_accounts_with_config(&program, program_accounts_config)
.await
.unwrap();
println!("num_of_accounts(unfiltered): {}", accounts.len());
for (_pk, acc) in accounts {
let discriminator = &acc.data()[..8];
let mut di = [0;8];
let mut di = [0; 8];
di.clone_from_slice(discriminator);
println!("discriminator {:02X?}", di);
}
@ -56,34 +55,36 @@ async fn program_accounts_rpc(rpc_url: &String) {
async fn account_fetcher(rpc_url: String) {
let client = RpcClientAsync::new(rpc_url);
let account_fetcher = Arc::new(RpcAccountFetcher {
rpc: client,
});
let account_fetcher = Arc::new(RpcAccountFetcher { rpc: client });
let account_key = Pubkey::from_str("2KgowxogBrGqRcgXQEmqFvC3PGtCu66qERNJevYW8Ajh").unwrap();
let (acc, slot) = account_fetcher.feeds_fetch_raw_account(&account_key)
.await.expect("account must exist");
let (acc, slot) = account_fetcher
.feeds_fetch_raw_account(&account_key)
.await
.expect("account must exist");
println!("price: {:?} by slot {}", acc.lamports(), slot);
}
/// 318 accounts on devnet by discriminator
async fn program_account_fetcher(rpc_url: String) {
let client = RpcClientAsync::new(rpc_url);
let account_fetcher = Arc::new(RpcAccountFetcher {
rpc: client,
});
let account_fetcher = Arc::new(RpcAccountFetcher { rpc: client });
let program_key = Pubkey::from_str("SwaPpA9LAaLfeLi3a68M4DjnLqgtticKg6CnyNwgAC8").unwrap();
// discriminator [01, 01, FC, 06, DD, F6, E1, D7]
let discriminator = [0x01, 0x01, 0xFC, 0x06, 0xDD, 0xF6, 0xE1, 0xD7];
let discriminator = [0x01, 0x01, 0xFC, 0x06, 0xDD, 0xF6, 0xE1, 0xD7];
let (acc, slot) = account_fetcher.feeds_fetch_program_accounts(&program_key, discriminator)
.await.expect("program account must exist");
let (acc, slot) = account_fetcher
.feeds_fetch_program_accounts(&program_key, discriminator)
.await
.expect("program account must exist");
println!("program has {} accounts for discriminator {:02X?}", acc.len(), discriminator);
println!(
"program has {} accounts for discriminator {:02X?}",
acc.len(),
discriminator
);
println!("slot was {}", slot);
}

View File

@ -1,19 +1,19 @@
use std::any;
use anyhow::{anyhow, Context};
use solana_sdk::account::AccountSharedData;
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use std::any;
#[async_trait::async_trait]
pub trait AccountFetcherFeeds: Sync + Send {
async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result<(AccountSharedData, Slot)>;
async fn feeds_fetch_raw_account(
&self,
address: &Pubkey,
) -> anyhow::Result<(AccountSharedData, Slot)>;
async fn feeds_fetch_program_accounts(
&self,
program: &Pubkey,
discriminator: [u8; 8],
) -> anyhow::Result<(Vec<(Pubkey, AccountSharedData)>, Slot)>;
}

View File

@ -8,18 +8,20 @@ use anyhow::{anyhow, Context};
use itertools::Itertools;
use log::debug;
use crate::account_fetcher::AccountFetcherFeeds;
use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync;
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use crate::account_fetcher::AccountFetcherFeeds;
#[async_trait::async_trait]
impl AccountFetcherFeeds for RpcAccountFetcher {
async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result<(AccountSharedData, Slot)> {
let response = self.rpc
async fn feeds_fetch_raw_account(
&self,
address: &Pubkey,
) -> anyhow::Result<(AccountSharedData, Slot)> {
let response = self
.rpc
.get_account_with_commitment(address, self.rpc.commitment())
.await
.with_context(|| format!("fetch account {}", *address))?;
@ -61,20 +63,20 @@ impl AccountFetcherFeeds for RpcAccountFetcher {
.get_program_accounts_with_config(program, config)
.await
.with_context(|| format!("fetch program account {}", *program))?;
Ok((accounts
.into_iter()
.map(|(pk, acc)| (pk, acc.into()))
.collect::<Vec<_>>(), slot_workaround))
Ok((
accounts
.into_iter()
.map(|(pk, acc)| (pk, acc.into()))
.collect::<Vec<_>>(),
slot_workaround,
))
}
}
pub struct RpcAccountFetcher {
pub rpc: RpcClientAsync,
}
struct CoalescedAsyncJob<Key, Output> {
jobs: HashMap<Key, Arc<Lazy<Output>>>,
}
@ -111,8 +113,10 @@ struct AccountCache {
keys_for_program_and_discriminator: HashMap<(Pubkey, [u8; 8]), (Vec<Pubkey>, Slot)>,
account_jobs: CoalescedAsyncJob<Pubkey, anyhow::Result<(AccountSharedData, Slot)>>,
program_accounts_jobs:
CoalescedAsyncJob<(Pubkey, [u8; 8]), anyhow::Result<(Vec<(Pubkey, AccountSharedData)>, Slot)>>,
program_accounts_jobs: CoalescedAsyncJob<
(Pubkey, [u8; 8]),
anyhow::Result<(Vec<(Pubkey, AccountSharedData)>, Slot)>,
>,
}
impl AccountCache {
@ -152,7 +156,10 @@ impl<T: AccountFetcherFeeds> CachedAccountFetcher<T> {
#[async_trait::async_trait]
impl<T: AccountFetcherFeeds + 'static> AccountFetcherFeeds for CachedAccountFetcher<T> {
async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result<(AccountSharedData, Slot)> {
async fn feeds_fetch_raw_account(
&self,
address: &Pubkey,
) -> anyhow::Result<(AccountSharedData, Slot)> {
// returns Result<(AccountSharedData, Slot)>
let fetch_job = {
let mut cache = self.cache.lock().unwrap();
@ -179,7 +186,10 @@ impl<T: AccountFetcherFeeds + 'static> AccountFetcherFeeds for CachedAccountFetc
debug!("inserted data from wrapped fetcher for {}", address_copy);
}
Err(wrapped_fetcher_err) => {
debug!("error in wrapped fetcher for {}: {}", address_copy, wrapped_fetcher_err);
debug!(
"error in wrapped fetcher for {}: {}",
address_copy, wrapped_fetcher_err
);
}
}
result
@ -204,12 +214,16 @@ impl<T: AccountFetcherFeeds + 'static> AccountFetcherFeeds for CachedAccountFetc
let cache_key = (*program, discriminator);
let fetch_job = {
let mut cache = self.cache.lock().unwrap();
if let Some((accounts, slot)) = cache.keys_for_program_and_discriminator.get(&cache_key) {
return Ok((accounts
.iter()
.map(|pk| (*pk, cache.accounts.get(&pk).unwrap().clone()))
.map(|(pk, (acc, _))| (pk, acc))
.collect::<Vec<_>>(), *slot));
if let Some((accounts, slot)) = cache.keys_for_program_and_discriminator.get(&cache_key)
{
return Ok((
accounts
.iter()
.map(|pk| (*pk, cache.accounts.get(&pk).unwrap().clone()))
.map(|(pk, (acc, _))| (pk, acc))
.collect::<Vec<_>>(),
*slot,
));
}
let self_copy = self.clone();
@ -224,9 +238,10 @@ impl<T: AccountFetcherFeeds + 'static> AccountFetcherFeeds for CachedAccountFetc
let mut cache = self_copy.cache.lock().unwrap();
cache.program_accounts_jobs.remove(&cache_key);
if let Ok((accounts, slot)) = result.as_ref() {
cache
.keys_for_program_and_discriminator
.insert(cache_key, (accounts.iter().map(|(pk, _)| *pk).collect(), *slot));
cache.keys_for_program_and_discriminator.insert(
cache_key,
(accounts.iter().map(|(pk, _)| *pk).collect(), *slot),
);
for (pk, acc) in accounts.iter() {
cache.accounts.insert(*pk, (acc.clone(), *slot));
}
@ -246,3 +261,120 @@ impl<T: AccountFetcherFeeds + 'static> AccountFetcherFeeds for CachedAccountFetc
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use solana_sdk::account::AccountSharedData;
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey;
use crate::account_fetcher::AccountFetcherFeeds;
use crate::account_fetchers::CachedAccountFetcher;
struct MockExampleFetcher {
fetched_mango_calls: AtomicU32,
scenario: Scenario,
}
#[derive(Clone, Copy)]
enum Scenario {
Happy,
Error,
}
impl MockExampleFetcher {
pub fn new(scenario: Scenario) -> Self {
Self {
fetched_mango_calls: AtomicU32::new(0),
scenario,
}
}
pub fn assert_call_count(&self, expected: u32) {
assert_eq!(self.fetched_mango_calls.load(Ordering::Acquire), expected);
}
fn inc_call(&self) {
self.fetched_mango_calls.fetch_add(1, Ordering::Release);
}
}
#[async_trait::async_trait]
impl AccountFetcherFeeds for MockExampleFetcher {
async fn feeds_fetch_raw_account(
&self, _address: &Pubkey,
) -> anyhow::Result<(AccountSharedData, Slot)> {
self.inc_call();
match self.scenario {
Scenario::Happy => {
let account_owner =
Pubkey::from_str("66fEFnKyCPUWzxKeB9GngcvZDakjvFCVnYLRtcBk9t5D").unwrap();
let acc = AccountSharedData::new(420000, 0, &account_owner);
return Ok((acc, 2409999333));
}
Scenario::Error => {
return Err(anyhow::anyhow!("simulated error in mock fetcher"));
}
}
}
async fn feeds_fetch_program_accounts(
&self,
_program: &Pubkey,
_discriminator: [u8; 8],
) -> anyhow::Result<(Vec<(Pubkey, AccountSharedData)>, Slot)> {
unreachable!("program accounts not mocked")
}
}
#[tokio::test]
async fn test_cache_hit_success() {
let account = Pubkey::from_str("7v8bovqsYfFfEeiXnGLiGTg2VJAn62hSoSCPidKjKL8w").unwrap();
let mut mock = Arc::new(MockExampleFetcher::new(Scenario::Happy));
let cache = CachedAccountFetcher::new(mock.clone());
let _first_call = cache.feeds_fetch_raw_account(&account).await;
mock.assert_call_count(1);
// must load from cache
let _second_call_cached = cache.feeds_fetch_raw_account(&account).await;
mock.assert_call_count(1);
// clear
cache.clear_cache();
let _third_call_cached = cache.feeds_fetch_raw_account(&account).await;;
mock.assert_call_count(2);
}
#[tokio::test]
async fn test_cache_retry_error() {
let account = Pubkey::from_str("7v8bovqsYfFfEeiXnGLiGTg2VJAn62hSoSCPidKjKL8w").unwrap();
let mut mock = Arc::new(MockExampleFetcher::new(Scenario::Error));
let cache = CachedAccountFetcher::new(mock.clone());
let _first_call = cache.feeds_fetch_raw_account(&account).await;
mock.assert_call_count(1);
// must hit wrapped fetcher again on error
let _second_call_retry = cache.feeds_fetch_raw_account(&account).await;
mock.assert_call_count(2);
}
}

View File

@ -1,13 +1,8 @@
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::chain_data::*;
use fixed::types::I80F48;
use anyhow::Context;
use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync;

View File

@ -1,8 +1,8 @@
pub mod account_fetcher;
pub mod account_fetchers;
pub mod account_write_filter;
pub mod chain_data;
pub mod feeds_chain_data_fetcher;
pub mod account_fetchers;
pub mod account_fetcher;
pub mod grpc_plugin_source;
pub mod metrics;
pub mod snapshot;