refactor websocket to use new filter struct

This commit is contained in:
GroovieGermanikus 2023-08-25 13:06:44 +02:00
parent e8620a832b
commit 8e25a50a28
3 changed files with 199 additions and 107 deletions

View File

@ -1,5 +1,5 @@
use mango_feeds_connector::{AccountWrite, FilterConfig, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, websocket_source};
use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramIds};
use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramId};
///
/// test with local test-valiator (1.16.1)
@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
// program_ids and account_ids are xor'd
let filter_config1 = FilterConfig {
entity_filter: FilterByProgramIds(vec!["11111111111111111111111111111111".to_string()]),
entity_filter: FilterByProgramId("11111111111111111111111111111111".to_string()),
};
let filter_config2 = FilterConfig {

View File

@ -94,7 +94,7 @@ pub async fn get_snapshot_gma(
})
}
pub async fn get_snapshot(
pub async fn __remove_get_snapshot(
rpc_http_url: String,
filter_config: &FilterConfig,
) -> anyhow::Result<(Slot, Vec<(String, Option<UiAccount>)>)> {

View File

@ -23,8 +23,9 @@ use anyhow::Context;
use solana_client::rpc_response::RpcBlockUpdateError::UnsupportedTransactionVersion;
use tokio::time::timeout;
use crate::{chain_data::SlotStatus, AccountWrite, AnyhowWrap, FilterConfig, SlotUpdate, SourceConfig};
use crate::{chain_data::SlotStatus, AccountWrite, AnyhowWrap, FilterConfig, SlotUpdate, SourceConfig, EntityFilter};
use crate::debouncer::Debouncer;
use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts};
const SNAPSHOT_MAX_AGE: Duration = Duration::from_secs(300);
const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
@ -36,8 +37,6 @@ enum WebsocketMessage {
SlotUpdate(Arc<solana_client::rpc_response::SlotUpdate>),
}
// TODO: the reconnecting should be part of this
// consume data until an error happens; error must be handled by caller by reconnecting
async fn feed_data(
config: &SourceConfig,
filter_config: &FilterConfig,
@ -45,6 +44,21 @@ async fn feed_data(
debouncer_errorlog: Arc<Debouncer>,
) -> anyhow::Result<()> {
match &filter_config.entity_filter {
EntityFilter::FilterByAccountIds(account_ids) => feed_data_by_account(config, account_ids.clone(), sender, debouncer_errorlog).await,
EntityFilter::FilterByProgramId(program_id) => feed_data_by_program(config, program_id.clone(), sender, debouncer_errorlog).await,
}
}
// TODO: the reconnecting should be part of this
// consume data until an error happens; error must be handled by caller by reconnecting
async fn feed_data_by_account(
config: &SourceConfig,
account_ids: Vec<String>,
sender: async_channel::Sender<WebsocketMessage>,
debouncer_errorlog: Arc<Debouncer>,
) -> anyhow::Result<()> {
let rpc_ws_url: &str = &config.rpc_ws_url;
let connect = ws::try_connect::<RpcSolPubSubClient>(rpc_ws_url).map_err_anyhow()?;
@ -64,10 +78,9 @@ async fn feed_data(
};
let mut account_subs = SelectAll::new();
let mut program_subs = SelectAll::new();
// let mut program_subs = SelectAll::new();
if filter_config.program_ids.is_empty() {
for account_id in filter_config.account_ids.clone() {
for account_id in &account_ids {
account_subs.push(
client
.account_subscribe(account_id.clone(), Some(account_info_config.clone()))
@ -78,16 +91,6 @@ async fn feed_data(
.map_err_anyhow()?,
);
}
} else {
info!("FilterConfig specified program_ids, ignoring account_ids");
for program_id in filter_config.program_ids.clone() {
program_subs.push(
client
.program_subscribe(program_id, Some(program_accounts_config.clone()))
.map_err_anyhow()?,
);
}
}
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
@ -100,26 +103,33 @@ async fn feed_data(
// including the first time
if last_snapshot + SNAPSHOT_MAX_AGE <= Instant::now() {
let snapshot_rpc_http_url = config.snapshot.rpc_http_url.clone();
let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await;
if let Ok((slot, accounts)) = snapshot {
// TODO split gMA + gPA
// let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await;
let response =
get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids.clone()).await;
let snapshot = response.context("gma snapshot response").map_err_anyhow();
match snapshot {
Ok(SnapshotMultipleAccounts { snapshot_slot, snapshot_accounts }) => {
debug!(
"fetched new snapshot slot={slot} len={:?} time={:?}",
accounts.len(),
"fetched new snapshot slot={} len={:?} time={:?}",
snapshot_slot,
snapshot_accounts.len(),
Instant::now() - SNAPSHOT_MAX_AGE - last_snapshot
);
sender
.send(WebsocketMessage::SnapshotUpdate((slot, accounts)))
.send(WebsocketMessage::SnapshotUpdate((snapshot_slot, snapshot_accounts)))
.await
.expect("sending must succeed");
} else {
}
Err(err) => {
if debouncer_errorlog.can_fire() {
warn!("failed to parse snapshot from rpc url {}, filter_config {:?}", snapshot_rpc_http_url.clone(), filter_config);
warn!("failed to parse snapshot from rpc url {}: {}", snapshot_rpc_http_url.clone(), err);
}
}
}
last_snapshot = Instant::now();
}
if filter_config.program_ids.is_empty() {
tokio::select! {
account = account_subs.next() => {
match account {
@ -139,9 +149,11 @@ async fn feed_data(
if debouncer_errorlog.can_fire() {
warn!("account stream closed");
}
if filter_config.program_ids.is_empty() {
// that is weired - assume that it is okey to cancel in all cases
// if filter_config.program_ids.is_empty() {
// return Ok(());
// }
return Ok(());
}
},
}
},
@ -161,7 +173,88 @@ async fn feed_data(
return Ok(())
}
}
} else {
}
}
async fn feed_data_by_program(
config: &SourceConfig,
program_id: String,
sender: async_channel::Sender<WebsocketMessage>,
debouncer_errorlog: Arc<Debouncer>,
) -> anyhow::Result<()> {
let rpc_ws_url: &str = &config.rpc_ws_url;
let connect = ws::try_connect::<RpcSolPubSubClient>(rpc_ws_url).map_err_anyhow()?;
let timeout = timeout(WS_CONNECT_TIMEOUT, connect).await?;
let client = timeout.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
let mut program_subs = SelectAll::new();
info!("FilterConfig specified program_ids, ignoring account_ids");
program_subs.push(
client
.program_subscribe(program_id.clone(), Some(program_accounts_config.clone()))
.map_err_anyhow()?,
);
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
let mut last_snapshot = Instant::now().sub(SNAPSHOT_MAX_AGE);
// consume from channels unitl an error happens
loop {
// occasionally cause a new snapshot to be produced
// including the first time
if last_snapshot + SNAPSHOT_MAX_AGE <= Instant::now() {
let snapshot_rpc_http_url = config.snapshot.rpc_http_url.clone();
// let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await;
let response =
get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.clone()).await;
let snapshot = response.context("gpa snapshot response").map_err_anyhow();
match snapshot {
Ok(SnapshotProgramAccounts { snapshot_slot, snapshot_accounts } ) => {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_accounts
.iter()
.map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
})
.collect();
debug!(
"fetched new snapshot slot={} len={:?} time={:?}",
snapshot_slot,
accounts.len(),
Instant::now() - SNAPSHOT_MAX_AGE - last_snapshot
);
sender
.send(WebsocketMessage::SnapshotUpdate((snapshot_slot, accounts)))
.await
.expect("sending must succeed");
}
Err(err) => {
if debouncer_errorlog.can_fire() {
warn!("failed to parse snapshot from rpc url {}: {}", snapshot_rpc_http_url.clone(), err);
}
}
}
last_snapshot = Instant::now();
}
tokio::select! {
program_account = program_subs.next() => {
match program_account {
@ -192,7 +285,6 @@ async fn feed_data(
}
}
}
}
// TODO: rename / split / rework
pub async fn process_events(