From cdf6d54156e71f2a4bd4a827eb39e960cc25de2d Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Wed, 23 Mar 2022 08:51:17 +0100 Subject: [PATCH] Config: Separate out source config We want to be able to not have a postgres target --- connector-mango/example-config.toml | 8 +++++--- connector-mango/src/main.rs | 10 +++++++--- connector-raw/example-config.toml | 8 +++++--- connector-raw/src/main.rs | 10 +++++++--- lib/src/grpc_plugin_source.rs | 13 ++++++------- lib/src/lib.rs | 12 +++++++++--- lib/src/websocket_source.rs | 19 +++++++++---------- 7 files changed, 48 insertions(+), 32 deletions(-) diff --git a/connector-mango/example-config.toml b/connector-mango/example-config.toml index 1f44a34..5387e61 100644 --- a/connector-mango/example-config.toml +++ b/connector-mango/example-config.toml @@ -1,17 +1,19 @@ +[source] +dedup_queue_size = 50000 rpc_ws_url = "" -[[grpc_sources]] +[[source.grpc_sources]] name = "server" connection_string = "http://[::1]:10000" retry_connection_sleep_secs = 30 -#[grpc_sources.tls] +#[source.grpc_sources.tls] #ca_cert_path = "ca.pem" #client_cert_path = "client.pem" #client_key_path = "client.pem" #domain_name = "example.com" -[snapshot_source] +[source.snapshot] rpc_http_url = "" program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" diff --git a/connector-mango/src/main.rs b/connector-mango/src/main.rs index 3a35d32..374e788 100644 --- a/connector-mango/src/main.rs +++ b/connector-mango/src/main.rs @@ -40,15 +40,19 @@ async fn main() -> anyhow::Result<()> { let use_geyser = true; if use_geyser { grpc_plugin_source::process_events( - config, + &config.source, account_write_queue_sender, slot_queue_sender, metrics_tx, ) .await; } else { - websocket_source::process_events(config, account_write_queue_sender, slot_queue_sender) - .await; + websocket_source::process_events( + &config.source, + account_write_queue_sender, + slot_queue_sender, + ) + .await; } Ok(()) diff --git a/connector-raw/example-config.toml b/connector-raw/example-config.toml index b46c18f..8f7aec7 100644 --- a/connector-raw/example-config.toml +++ b/connector-raw/example-config.toml @@ -1,17 +1,19 @@ +[source] +dedup_queue_size = 50000 rpc_ws_url = "" -[[grpc_sources]] +[[source.grpc_sources]] name = "server" connection_string = "http://[::1]:10000" retry_connection_sleep_secs = 30 -#[grpc_sources.tls] +#[source.grpc_sources.tls] #ca_cert_path = "ca.pem" #client_cert_path = "client.pem" #client_key_path = "client.pem" #domain_name = "example.com" -[snapshot_source] +[source.snapshot] rpc_http_url = "" program_id = "" diff --git a/connector-raw/src/main.rs b/connector-raw/src/main.rs index 9462d22..6b2ab0c 100644 --- a/connector-raw/src/main.rs +++ b/connector-raw/src/main.rs @@ -33,15 +33,19 @@ async fn main() -> anyhow::Result<()> { let use_geyser = true; if use_geyser { grpc_plugin_source::process_events( - config, + &config.source, account_write_queue_sender, slot_queue_sender, metrics_tx, ) .await; } else { - websocket_source::process_events(config, account_write_queue_sender, slot_queue_sender) - .await; + websocket_source::process_events( + &config.source, + account_write_queue_sender, + slot_queue_sender, + ) + .await; } Ok(()) diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index c7a9ad6..774bc0e 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -19,8 +19,8 @@ pub mod geyser_proto { use geyser_proto::accounts_db_client::AccountsDbClient; use crate::{ - metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate, - SnapshotSourceConfig, TlsConfig, + metrics, AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, + SnapshotSourceConfig, SourceConfig, TlsConfig, }; type SnapshotData = Response>; @@ -236,17 +236,16 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig { } pub async fn process_events( - config: Config, + config: &SourceConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, metrics_sender: metrics::Metrics, ) { // Subscribe to geyser - let (msg_sender, msg_receiver) = - async_channel::bounded::(config.postgres_target.account_write_max_queue_size); - for grpc_source in config.grpc_sources { + let (msg_sender, msg_receiver) = async_channel::bounded::(config.dedup_queue_size); + for grpc_source in config.grpc_sources.clone() { let msg_sender = msg_sender.clone(); - let snapshot_source = config.snapshot_source.clone(); + let snapshot_source = config.snapshot.clone(); let metrics_sender = metrics_sender.clone(); // Make TLS config if configured diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 6b618ac..ac52713 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -108,6 +108,14 @@ pub struct GrpcSourceConfig { pub tls: Option, } +#[derive(Clone, Debug, Deserialize)] +pub struct SourceConfig { + pub dedup_queue_size: usize, + pub grpc_sources: Vec, + pub snapshot: SnapshotSourceConfig, + pub rpc_ws_url: String, +} + #[derive(Clone, Debug, Deserialize)] pub struct SnapshotSourceConfig { pub rpc_http_url: String, @@ -117,9 +125,7 @@ pub struct SnapshotSourceConfig { #[derive(Clone, Debug, Deserialize)] pub struct Config { pub postgres_target: PostgresConfig, - pub grpc_sources: Vec, - pub snapshot_source: SnapshotSourceConfig, - pub rpc_ws_url: String, + pub source: SourceConfig, } #[async_trait] diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index fc26aa7..06167cc 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -19,7 +19,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::{AccountWrite, AnyhowWrap, Config, SlotStatus, SlotUpdate}; +use crate::{AccountWrite, AnyhowWrap, SlotStatus, SlotUpdate, SourceConfig}; enum WebsocketMessage { SingleUpdate(Response), @@ -29,21 +29,19 @@ enum WebsocketMessage { // TODO: the reconnecting should be part of this async fn feed_data( - config: &Config, + config: &SourceConfig, sender: async_channel::Sender, ) -> anyhow::Result<()> { - let program_id = Pubkey::from_str(&config.snapshot_source.program_id)?; + let program_id = Pubkey::from_str(&config.snapshot.program_id)?; let snapshot_duration = Duration::from_secs(300); let connect = ws::try_connect::(&config.rpc_ws_url).map_err_anyhow()?; let client = connect.await.map_err_anyhow()?; - let rpc_client = http::connect_with_options::( - &config.snapshot_source.rpc_http_url, - true, - ) - .await - .map_err_anyhow()?; + let rpc_client = + http::connect_with_options::(&config.snapshot.rpc_http_url, true) + .await + .map_err_anyhow()?; let account_info_config = RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), @@ -119,12 +117,13 @@ async fn feed_data( // TODO: rename / split / rework pub async fn process_events( - config: Config, + config: &SourceConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, ) { // Subscribe to program account updates websocket let (update_sender, update_receiver) = async_channel::unbounded::(); + let config = config.clone(); tokio::spawn(async move { // if the websocket disconnects, we get no data in a while etc, reconnect and try again loop {