diff --git a/connector-mango/example-config.toml b/connector-mango/example-config.toml index 50bb4c0..5c2c5a2 100644 --- a/connector-mango/example-config.toml +++ b/connector-mango/example-config.toml @@ -1,6 +1,11 @@ -grpc_connection_string = "http://[::1]:10000" -rpc_http_url = "" rpc_ws_url = "" + +[grpc_source] +connection_string = "http://[::1]:10000" +retry_connection_sleep_secs = 30 + +[snapshot_source] +rpc_http_url = "" program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" [postgres_target] diff --git a/connector-raw/example-config.toml b/connector-raw/example-config.toml index bebaf70..6f6beaf 100644 --- a/connector-raw/example-config.toml +++ b/connector-raw/example-config.toml @@ -1,6 +1,11 @@ -grpc_connection_string = "http://[::1]:10000" -rpc_http_url = "" rpc_ws_url = "" + +[grpc_source] +connection_string = "http://[::1]:10000" +retry_connection_sleep_secs = 30 + +[snapshot_source] +rpc_http_url = "" program_id = "" [postgres_target] diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 9b5264a..7d924bd 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -74,10 +74,11 @@ async fn feed_data_accountsdb( config: &Config, sender: async_channel::Sender, ) -> Result<(), anyhow::Error> { - let program_id = Pubkey::from_str(&config.program_id)?; + let program_id = Pubkey::from_str(&config.snapshot_source.program_id)?; let mut client = - AccountsDbClient::connect(Endpoint::from_str(&config.grpc_connection_string)?).await?; + AccountsDbClient::connect(Endpoint::from_str(&config.grpc_source.connection_string)?) + .await?; let mut update_stream = client .subscribe(accountsdb_proto::SubscribeRequest {}) @@ -93,6 +94,9 @@ async fn feed_data_accountsdb( let mut trigger_snapshot_on_slot = true; let mut snapshot_future = future::Fuse::terminated(); + // The plugin sends a ping every 5s or so + let fatal_idle_timeout = Duration::from_secs(60); + loop { tokio::select! { update = update_stream.next() => { @@ -102,7 +106,7 @@ async fn feed_data_accountsdb( let update = update?; if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") { if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 { - snapshot_future = tokio::spawn(get_snapshot(config.rpc_http_url.clone(), program_id, slot_update.slot)).fuse(); + snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot)).fuse(); trigger_snapshot_on_slot = false; } } @@ -119,7 +123,7 @@ async fn feed_data_accountsdb( .await .expect("send success"); }, - _ = tokio::time::sleep(Duration::from_secs(60)) => { + _ = tokio::time::sleep(fatal_idle_timeout) => { anyhow::bail!("accountsdb plugin hasn't sent a message in too long"); } } @@ -145,7 +149,10 @@ pub async fn process_events( err ); } - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs( + config.grpc_source.retry_connection_sleep_secs, + )) + .await; } }); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index eee47f9..081ed41 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -70,13 +70,24 @@ pub struct PostgresConfig { pub fatal_connection_timeout_secs: u64, } +#[derive(Clone, Debug, Deserialize)] +pub struct GrpcSourceConfig { + pub connection_string: String, + pub retry_connection_sleep_secs: u64, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SnapshotSourceConfig { + pub rpc_http_url: String, + pub program_id: String, +} + #[derive(Clone, Debug, Deserialize)] pub struct Config { pub postgres_target: PostgresConfig, - pub grpc_connection_string: String, - pub rpc_http_url: String, + pub grpc_source: GrpcSourceConfig, + pub snapshot_source: SnapshotSourceConfig, pub rpc_ws_url: String, - pub program_id: String, } #[async_trait] diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index f27c532..5800cb8 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -30,22 +30,22 @@ async fn feed_data( config: &Config, sender: async_channel::Sender, ) -> Result<(), anyhow::Error> { - let program_id = Pubkey::from_str(&config.program_id)?; + let program_id = Pubkey::from_str(&config.snapshot_source.program_id)?; let snapshot_duration = Duration::from_secs(300); let connect = ws::try_connect::(&config.rpc_ws_url).map_err_anyhow()?; let client = connect.await.map_err_anyhow()?; - let rpc_client = http::connect_with_options::(&config.rpc_http_url, true) - .await - .map_err_anyhow()?; + let rpc_client = + http::connect_with_options::(&config.snapshot_source.rpc_http_url, true) + .await + .map_err_anyhow()?; let account_info_config = RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), commitment: Some(CommitmentConfig::processed()), data_slice: None, }; - // TODO: Make addresses filters configurable let program_accounts_config = RpcProgramAccountsConfig { filters: None, with_context: Some(true),