From 8e25a50a289ca984415d55c0ca115c32109df577 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 25 Aug 2023 13:06:44 +0200 Subject: [PATCH] refactor websocket to use new filter struct --- connector/examples/websocket_consumer.rs | 4 +- connector/src/snapshot.rs | 2 +- connector/src/websocket_source.rs | 300 +++++++++++++++-------- 3 files changed, 199 insertions(+), 107 deletions(-) diff --git a/connector/examples/websocket_consumer.rs b/connector/examples/websocket_consumer.rs index 0a3e650..c2c1826 100644 --- a/connector/examples/websocket_consumer.rs +++ b/connector/examples/websocket_consumer.rs @@ -1,5 +1,5 @@ use mango_feeds_connector::{AccountWrite, FilterConfig, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, websocket_source}; -use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramIds}; +use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramId}; /// /// test with local test-valiator (1.16.1) @@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> { // program_ids and account_ids are xor'd let filter_config1 = FilterConfig { - entity_filter: FilterByProgramIds(vec!["11111111111111111111111111111111".to_string()]), + entity_filter: FilterByProgramId("11111111111111111111111111111111".to_string()), }; let filter_config2 = FilterConfig { diff --git a/connector/src/snapshot.rs b/connector/src/snapshot.rs index 7c0f233..446dacf 100644 --- a/connector/src/snapshot.rs +++ b/connector/src/snapshot.rs @@ -94,7 +94,7 @@ pub async fn get_snapshot_gma( }) } -pub async fn get_snapshot( +pub async fn __remove_get_snapshot( rpc_http_url: String, filter_config: &FilterConfig, ) -> anyhow::Result<(Slot, Vec<(String, Option)>)> { diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index 9de55b9..985d151 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -23,8 +23,9 @@ use anyhow::Context; use solana_client::rpc_response::RpcBlockUpdateError::UnsupportedTransactionVersion; use tokio::time::timeout; -use crate::{chain_data::SlotStatus, AccountWrite, AnyhowWrap, FilterConfig, SlotUpdate, SourceConfig}; +use crate::{chain_data::SlotStatus, AccountWrite, AnyhowWrap, FilterConfig, SlotUpdate, SourceConfig, EntityFilter}; use crate::debouncer::Debouncer; +use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts}; const SNAPSHOT_MAX_AGE: Duration = Duration::from_secs(300); const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000); @@ -36,8 +37,6 @@ enum WebsocketMessage { SlotUpdate(Arc), } -// TODO: the reconnecting should be part of this -// consume data until an error happens; error must be handled by caller by reconnecting async fn feed_data( config: &SourceConfig, filter_config: &FilterConfig, @@ -45,6 +44,21 @@ async fn feed_data( debouncer_errorlog: Arc, ) -> anyhow::Result<()> { + match &filter_config.entity_filter { + EntityFilter::FilterByAccountIds(account_ids) => feed_data_by_account(config, account_ids.clone(), sender, debouncer_errorlog).await, + EntityFilter::FilterByProgramId(program_id) => feed_data_by_program(config, program_id.clone(), sender, debouncer_errorlog).await, + } +} + +// TODO: the reconnecting should be part of this +// consume data until an error happens; error must be handled by caller by reconnecting +async fn feed_data_by_account( + config: &SourceConfig, + account_ids: Vec, + sender: async_channel::Sender, + debouncer_errorlog: Arc, +) -> anyhow::Result<()> { + let rpc_ws_url: &str = &config.rpc_ws_url; let connect = ws::try_connect::(rpc_ws_url).map_err_anyhow()?; @@ -64,29 +78,18 @@ async fn feed_data( }; let mut account_subs = SelectAll::new(); - let mut program_subs = SelectAll::new(); + // let mut program_subs = SelectAll::new(); - if filter_config.program_ids.is_empty() { - for account_id in filter_config.account_ids.clone() { - account_subs.push( - client - .account_subscribe(account_id.clone(), Some(account_info_config.clone())) - .map(|s| { - let account_id = account_id.clone(); - s.map(move |r| (account_id.clone(), r)) - }) - .map_err_anyhow()?, - ); - } - } else { - info!("FilterConfig specified program_ids, ignoring account_ids"); - for program_id in filter_config.program_ids.clone() { - program_subs.push( - client - .program_subscribe(program_id, Some(program_accounts_config.clone())) - .map_err_anyhow()?, - ); - } + for account_id in &account_ids { + account_subs.push( + client + .account_subscribe(account_id.clone(), Some(account_info_config.clone())) + .map(|s| { + let account_id = account_id.clone(); + s.map(move |r| (account_id.clone(), r)) + }) + .map_err_anyhow()?, + ); } let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?; @@ -100,95 +103,184 @@ async fn feed_data( // including the first time if last_snapshot + SNAPSHOT_MAX_AGE <= Instant::now() { let snapshot_rpc_http_url = config.snapshot.rpc_http_url.clone(); - let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await; - if let Ok((slot, accounts)) = snapshot { - debug!( - "fetched new snapshot slot={slot} len={:?} time={:?}", - accounts.len(), - Instant::now() - SNAPSHOT_MAX_AGE - last_snapshot + // TODO split gMA + gPA + // let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await; + let response = + get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids.clone()).await; + let snapshot = response.context("gma snapshot response").map_err_anyhow(); + match snapshot { + Ok(SnapshotMultipleAccounts { snapshot_slot, snapshot_accounts }) => { + debug!( + "fetched new snapshot slot={} len={:?} time={:?}", + snapshot_slot, + snapshot_accounts.len(), + Instant::now() - SNAPSHOT_MAX_AGE - last_snapshot ); - sender - .send(WebsocketMessage::SnapshotUpdate((slot, accounts))) - .await - .expect("sending must succeed"); - } else { - if debouncer_errorlog.can_fire() { - warn!("failed to parse snapshot from rpc url {}, filter_config {:?}", snapshot_rpc_http_url.clone(), filter_config); + sender + .send(WebsocketMessage::SnapshotUpdate((snapshot_slot, snapshot_accounts))) + .await + .expect("sending must succeed"); + } + Err(err) => { + if debouncer_errorlog.can_fire() { + warn!("failed to parse snapshot from rpc url {}: {}", snapshot_rpc_http_url.clone(), err); + } } } last_snapshot = Instant::now(); } - if filter_config.program_ids.is_empty() { - tokio::select! { - account = account_subs.next() => { - match account { - Some((account_id, response)) => { - sender.send( - WebsocketMessage::SingleUpdate( - response - .map( |r: Response| - Response { - context: r.context, - value: RpcKeyedAccount { - pubkey: account_id.clone(), - account: r.value }}) - .map_err_anyhow()?)).await.expect("sending must succeed"); - }, - None => { - if debouncer_errorlog.can_fire() { - warn!("account stream closed"); - } - if filter_config.program_ids.is_empty() { - return Ok(()); - } - }, + tokio::select! { + account = account_subs.next() => { + match account { + Some((account_id, response)) => { + sender.send( + WebsocketMessage::SingleUpdate( + response + .map( |r: Response| + Response { + context: r.context, + value: RpcKeyedAccount { + pubkey: account_id.clone(), + account: r.value }}) + .map_err_anyhow()?)).await.expect("sending must succeed"); + }, + None => { + if debouncer_errorlog.can_fire() { + warn!("account stream closed"); + } + // that is weired - assume that it is okey to cancel in all cases + // if filter_config.program_ids.is_empty() { + // return Ok(()); + // } + return Ok(()); + }, + } + }, + slot_update = slot_sub.next() => { + match slot_update { + Some(slot_update) => { + sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed"); + }, + None => { + warn!("slot update stream closed"); + return Ok(()); + }, + } + }, + _ = tokio::time::sleep(Duration::from_secs(60)) => { + warn!("websocket timeout"); + return Ok(()) + } + } + + } +} + +async fn feed_data_by_program( + config: &SourceConfig, + program_id: String, + sender: async_channel::Sender, + debouncer_errorlog: Arc, +) -> anyhow::Result<()> { + + let rpc_ws_url: &str = &config.rpc_ws_url; + + let connect = ws::try_connect::(rpc_ws_url).map_err_anyhow()?; + let timeout = timeout(WS_CONNECT_TIMEOUT, connect).await?; + let client = timeout.map_err_anyhow()?; + + let account_info_config = RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::processed()), + data_slice: None, + min_context_slot: None, + }; + let program_accounts_config = RpcProgramAccountsConfig { + filters: None, + with_context: Some(true), + account_config: account_info_config.clone(), + }; + + let mut program_subs = SelectAll::new(); + + info!("FilterConfig specified program_ids, ignoring account_ids"); + program_subs.push( + client + .program_subscribe(program_id.clone(), Some(program_accounts_config.clone())) + .map_err_anyhow()?, + ); + + let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?; + + let mut last_snapshot = Instant::now().sub(SNAPSHOT_MAX_AGE); + + // consume from channels unitl an error happens + loop { + + // occasionally cause a new snapshot to be produced + // including the first time + if last_snapshot + SNAPSHOT_MAX_AGE <= Instant::now() { + let snapshot_rpc_http_url = config.snapshot.rpc_http_url.clone(); + // let snapshot = crate::snapshot::get_snapshot(snapshot_rpc_http_url.clone(), filter_config).await; + let response = + get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.clone()).await; + let snapshot = response.context("gpa snapshot response").map_err_anyhow(); + match snapshot { + Ok(SnapshotProgramAccounts { snapshot_slot, snapshot_accounts } ) => { + let accounts: Vec<(String, Option)> = snapshot_accounts + .iter() + .map(|x| { + let deref = x.clone(); + (deref.pubkey, Some(deref.account)) + }) + .collect(); + debug!( + "fetched new snapshot slot={} len={:?} time={:?}", + snapshot_slot, + accounts.len(), + Instant::now() - SNAPSHOT_MAX_AGE - last_snapshot + ); + sender + .send(WebsocketMessage::SnapshotUpdate((snapshot_slot, accounts))) + .await + .expect("sending must succeed"); + } + Err(err) => { + if debouncer_errorlog.can_fire() { + warn!("failed to parse snapshot from rpc url {}: {}", snapshot_rpc_http_url.clone(), err); } - }, - slot_update = slot_sub.next() => { - match slot_update { - Some(slot_update) => { - sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed"); - }, - None => { - warn!("slot update stream closed"); - return Ok(()); - }, - } - }, - _ = tokio::time::sleep(Duration::from_secs(60)) => { - warn!("websocket timeout"); - return Ok(()) } } - } else { - tokio::select! { - program_account = program_subs.next() => { - match program_account { - Some(account) => { - sender.send(WebsocketMessage::SingleUpdate(account.map_err_anyhow()?)).await.expect("sending must succeed"); - }, - None => { - warn!("program account stream closed"); - return Ok(()); - }, - } - }, - slot_update = slot_sub.next() => { - match slot_update { - Some(slot_update) => { - sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed"); - }, - None => { - warn!("slot update stream closed"); - return Ok(()); - }, - } - }, - _ = tokio::time::sleep(Duration::from_secs(60)) => { - warn!("websocket timeout"); - return Ok(()) + last_snapshot = Instant::now(); + } + + tokio::select! { + program_account = program_subs.next() => { + match program_account { + Some(account) => { + sender.send(WebsocketMessage::SingleUpdate(account.map_err_anyhow()?)).await.expect("sending must succeed"); + }, + None => { + warn!("program account stream closed"); + return Ok(()); + }, } + }, + slot_update = slot_sub.next() => { + match slot_update { + Some(slot_update) => { + sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed"); + }, + None => { + warn!("slot update stream closed"); + return Ok(()); + }, + } + }, + _ = tokio::time::sleep(Duration::from_secs(60)) => { + warn!("websocket timeout"); + return Ok(()) } } }