diff --git a/connector/src/lib.rs b/connector/src/lib.rs index 1a2c94b..fcee495 100644 --- a/connector/src/lib.rs +++ b/connector/src/lib.rs @@ -106,8 +106,8 @@ pub struct SnapshotSourceConfig { #[derive(Clone, Debug, Deserialize)] pub enum EntityFilter { - FilterByProgramId(String), FilterByAccountIds(Vec), + FilterByProgramId(String), } #[derive(Clone, Debug, Deserialize)] diff --git a/connector/src/snapshot.rs b/connector/src/snapshot.rs index 406fdff..7c0f233 100644 --- a/connector/src/snapshot.rs +++ b/connector/src/snapshot.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use jsonrpc_core_client::transports::http; use log::*; use solana_account_decoder::{UiAccount, UiAccountEncoding}; @@ -9,7 +9,7 @@ use solana_client::{ use solana_rpc::rpc::rpc_accounts::AccountsDataClient; use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; -use crate::{AnyhowWrap, FilterConfig}; +use crate::{AnyhowWrap, EntityFilter, FilterConfig}; /// gPA snapshot struct pub struct SnapshotProgramAccounts { @@ -98,37 +98,25 @@ pub async fn get_snapshot( rpc_http_url: String, filter_config: &FilterConfig, ) -> anyhow::Result<(Slot, Vec<(String, Option)>)> { - if !filter_config.account_ids.is_empty() { - let response = - get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone()).await; - if let Ok(snapshot) = response { - let accounts: Vec<(String, Option)> = filter_config - .account_ids - .iter() - .zip(snapshot.value) - .map(|x| (x.0.clone(), x.1)) - .collect(); - Ok((snapshot.context.slot, accounts)) - } else { - Err(anyhow!("invalid gma response {:?}", response)) + match &filter_config.entity_filter { + EntityFilter::FilterByAccountIds(account_ids) => { + let response = + get_snapshot_gma(rpc_http_url.clone(), account_ids.clone()).await; + let snapshot = response.context("gma snapshot response").map_err_anyhow()?; + Ok((snapshot.snapshot_slot, snapshot.snapshot_accounts)) } - } else if !filter_config.program_ids.is_empty() { - let response = - get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone()).await; - if let Ok(OptionalContext::Context(snapshot)) = response { - let accounts: Vec<(String, Option)> = snapshot - .value + EntityFilter::FilterByProgramId(program_id) => { + let response = + get_snapshot_gpa(rpc_http_url.clone(), program_id.clone()).await; + let snapshot = response.context("gpa snapshot response").map_err_anyhow()?; + let accounts: Vec<(String, Option)> = snapshot.snapshot_accounts .iter() .map(|x| { let deref = x.clone(); (deref.pubkey, Some(deref.account)) }) .collect(); - Ok((snapshot.context.slot, accounts)) - } else { - Err(anyhow!("invalid gpa response {:?}", response)) + Ok((snapshot.snapshot_slot, accounts)) } - } else { - Err(anyhow!("invalid filter_config")) } } diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index 98a49fe..9de55b9 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -45,7 +45,6 @@ async fn feed_data( debouncer_errorlog: Arc, ) -> anyhow::Result<()> { - let rpc_ws_url: &str = &config.rpc_ws_url; let connect = ws::try_connect::(rpc_ws_url).map_err_anyhow()?;