geyser source migrated to new filter format

This commit is contained in:
GroovieGermanikus 2023-08-25 12:44:15 +02:00
parent b8b6a58420
commit e8620a832b
3 changed files with 15 additions and 28 deletions

View File

@ -106,8 +106,8 @@ pub struct SnapshotSourceConfig {
#[derive(Clone, Debug, Deserialize)]
pub enum EntityFilter {
FilterByProgramId(String),
FilterByAccountIds(Vec<String>),
FilterByProgramId(String),
}
#[derive(Clone, Debug, Deserialize)]

View File

@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use jsonrpc_core_client::transports::http;
use log::*;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
@ -9,7 +9,7 @@ use solana_client::{
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use crate::{AnyhowWrap, FilterConfig};
use crate::{AnyhowWrap, EntityFilter, FilterConfig};
/// gPA snapshot struct
pub struct SnapshotProgramAccounts {
@ -98,37 +98,25 @@ pub async fn get_snapshot(
rpc_http_url: String,
filter_config: &FilterConfig,
) -> anyhow::Result<(Slot, Vec<(String, Option<UiAccount>)>)> {
if !filter_config.account_ids.is_empty() {
let response =
get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone()).await;
if let Ok(snapshot) = response {
let accounts: Vec<(String, Option<UiAccount>)> = filter_config
.account_ids
.iter()
.zip(snapshot.value)
.map(|x| (x.0.clone(), x.1))
.collect();
Ok((snapshot.context.slot, accounts))
} else {
Err(anyhow!("invalid gma response {:?}", response))
match &filter_config.entity_filter {
EntityFilter::FilterByAccountIds(account_ids) => {
let response =
get_snapshot_gma(rpc_http_url.clone(), account_ids.clone()).await;
let snapshot = response.context("gma snapshot response").map_err_anyhow()?;
Ok((snapshot.snapshot_slot, snapshot.snapshot_accounts))
}
} else if !filter_config.program_ids.is_empty() {
let response =
get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone()).await;
if let Ok(OptionalContext::Context(snapshot)) = response {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot
.value
EntityFilter::FilterByProgramId(program_id) => {
let response =
get_snapshot_gpa(rpc_http_url.clone(), program_id.clone()).await;
let snapshot = response.context("gpa snapshot response").map_err_anyhow()?;
let accounts: Vec<(String, Option<UiAccount>)> = snapshot.snapshot_accounts
.iter()
.map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
})
.collect();
Ok((snapshot.context.slot, accounts))
} else {
Err(anyhow!("invalid gpa response {:?}", response))
Ok((snapshot.snapshot_slot, accounts))
}
} else {
Err(anyhow!("invalid filter_config"))
}
}

View File

@ -45,7 +45,6 @@ async fn feed_data(
debouncer_errorlog: Arc<Debouncer>,
) -> anyhow::Result<()> {
let rpc_ws_url: &str = &config.rpc_ws_url;
let connect = ws::try_connect::<RpcSolPubSubClient>(rpc_ws_url).map_err_anyhow()?;