diff --git a/bin/cli/src/save_snapshot.rs b/bin/cli/src/save_snapshot.rs index 4c0c375eb..b65524ca7 100644 --- a/bin/cli/src/save_snapshot.rs +++ b/bin/cli/src/save_snapshot.rs @@ -66,7 +66,7 @@ pub async fn save_snapshot( .await?; // Getting solana account snapshots via jsonrpc - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -78,6 +78,11 @@ pub async fn save_snapshot( oracles_and_vaults, account_update_sender, ); + tokio::spawn(async move { + let res = snapshot_job.await; + tracing::error!("Snapshot job exited, terminating process.. ({:?})", res); + std::process::exit(-1); + }); let mut chain_data = chain_data::ChainData::new(); diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 8fff17323..13d4c09d6 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -155,7 +155,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -457,6 +457,7 @@ async fn main() -> anyhow::Result<()> { liquidation_job, token_swap_info_job, check_changes_for_abort_job, + snapshot_job, ] .into_iter() .chain(prio_jobs.into_iter()) diff --git a/bin/service-mango-health/src/main.rs b/bin/service-mango-health/src/main.rs index 9b3b5174a..377415fc7 100644 --- a/bin/service-mango-health/src/main.rs +++ b/bin/service-mango-health/src/main.rs @@ -64,7 +64,8 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job]; + let mut jobs = vec![exit_processor.job, health_processor.job]; + jobs.extend(data_processor.jobs); if let Some(logger) = logger { jobs.push(logger.job) diff --git a/bin/service-mango-health/src/processors/data.rs b/bin/service-mango-health/src/processors/data.rs index c9b3f6ac5..24c5c85d8 100644 --- a/bin/service-mango-health/src/processors/data.rs +++ b/bin/service-mango-health/src/processors/data.rs @@ -22,7 +22,7 @@ use tracing::warn; pub struct DataProcessor { pub channel: tokio::sync::broadcast::Sender, - pub job: JoinHandle<()>, + pub jobs: Vec>, pub chain_data: Arc>, } @@ -52,7 +52,7 @@ impl DataProcessor { ) -> anyhow::Result { let mut retry_counter = RetryCounter::new(2); let mango_group = Pubkey::from_str(&configuration.mango_group)?; - let mango_stream = + let (mango_stream, snapshot_job) = fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?; let (sender, _) = tokio::sync::broadcast::channel(8192); let sender_clone = sender.clone(); @@ -98,7 +98,7 @@ impl DataProcessor { let result = DataProcessor { channel: sender, - job, + jobs: vec![job, snapshot_job], chain_data, }; @@ -147,7 +147,9 @@ impl DataProcessor { return Some(Other); } - async fn init_mango_source(configuration: &Configuration) -> anyhow::Result> { + async fn init_mango_source( + configuration: &Configuration, + ) -> anyhow::Result<(Receiver, JoinHandle<()>)> { // // Client setup // @@ -192,7 +194,7 @@ impl DataProcessor { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: configuration.rpc_http_url.clone(), mango_group, @@ -205,6 +207,6 @@ impl DataProcessor { account_update_sender, ); - Ok(account_update_receiver) + Ok((account_update_receiver, snapshot_job)) } } diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 57f408a21..d1be966c4 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> { // Getting solana account snapshots via jsonrpc // FUTURE: of what to fetch a snapshot - should probably take as an input - snapshot_source::start( + let snapshot_job = snapshot_source::start( snapshot_source::Config { rpc_http_url: rpc_url.clone(), mango_group, @@ -353,6 +353,7 @@ async fn main() -> anyhow::Result<()> { use futures::StreamExt; let mut jobs: futures::stream::FuturesUnordered<_> = vec![ + snapshot_job, data_job, settle_job, tcs_start_job, diff --git a/lib/client/src/snapshot_source.rs b/lib/client/src/snapshot_source.rs index d39ab0c2f..05c29f64c 100644 --- a/lib/client/src/snapshot_source.rs +++ b/lib/client/src/snapshot_source.rs @@ -16,6 +16,7 @@ use solana_rpc::rpc::rpc_accounts::AccountsDataClient; use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient; use std::str::FromStr; use std::time::Duration; +use tokio::task::JoinHandle; use tokio::time; use tracing::*; @@ -221,11 +222,15 @@ async fn feed_snapshots( Ok(()) } -pub fn start(config: Config, mango_oracles: Vec, sender: async_channel::Sender) { +pub fn start( + config: Config, + mango_oracles: Vec, + sender: async_channel::Sender, +) -> JoinHandle<()> { let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2)); let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval); - tokio::spawn(async move { + let snapshot_job = tokio::spawn(async move { let rpc_client = http::connect_with_options::(&config.rpc_http_url, true) .await .expect("always Ok"); @@ -258,4 +263,6 @@ pub fn start(config: Config, mango_oracles: Vec, sender: async_channel:: }; } }); + + snapshot_job }