liquidator: forcefully exit process if snapshot job die (#924)

* liquidator: forcefully exit process if snapshot job die

* client: return snapshot_job join handle so it can be watched for early unexpected exit
This commit is contained in:
Serge Farny 2024-04-01 14:45:01 +02:00 committed by GitHub
parent 0b7e62e671
commit 2520c7d095
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 38 additions and 18 deletions

View File

@ -67,7 +67,7 @@ pub async fn save_snapshot(
.await?;
// Getting solana account snapshots via jsonrpc
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
@ -79,6 +79,11 @@ pub async fn save_snapshot(
extra_accounts,
account_update_sender,
);
tokio::spawn(async move {
let res = snapshot_job.await;
tracing::error!("Snapshot job exited, terminating process.. ({:?})", res);
std::process::exit(-1);
});
let mut chain_data = chain_data::ChainData::new();

View File

@ -169,7 +169,7 @@ async fn main() -> anyhow::Result<()> {
// Getting solana account snapshots via jsonrpc
// FUTURE: of what to fetch a snapshot - should probably take as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
@ -456,12 +456,16 @@ async fn main() -> anyhow::Result<()> {
spawn_token_swap_refresh_job(&cli, shared_state, token_swap_info_updater);
let check_changes_for_abort_job = spawn_context_change_watchdog_job(mango_client.clone());
let mut jobs: futures::stream::FuturesUnordered<_> =
vec![data_job, token_swap_info_job, check_changes_for_abort_job]
.into_iter()
.chain(optional_jobs)
.chain(prio_jobs.into_iter())
.collect();
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
snapshot_job,
data_job,
token_swap_info_job,
check_changes_for_abort_job,
]
.into_iter()
.chain(optional_jobs)
.chain(prio_jobs.into_iter())
.collect();
jobs.next().await;
error!("a critical job aborted, exiting");

View File

@ -69,7 +69,8 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job];
let mut jobs = vec![exit_processor.job, health_processor.job];
jobs.extend(data_processor.jobs);
if let Some(logger) = logger {
jobs.push(logger.job)

View File

@ -22,7 +22,7 @@ use tracing::warn;
pub struct DataProcessor {
pub channel: tokio::sync::broadcast::Sender<DataEvent>,
pub job: JoinHandle<()>,
pub jobs: Vec<JoinHandle<()>>,
pub chain_data: Arc<RwLock<chain_data::ChainData>>,
}
@ -52,7 +52,7 @@ impl DataProcessor {
) -> anyhow::Result<DataProcessor> {
let mut retry_counter = RetryCounter::new(2);
let mango_group = Pubkey::from_str(&configuration.mango_group)?;
let mango_stream =
let (mango_stream, snapshot_job) =
fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?;
let (sender, _) = tokio::sync::broadcast::channel(8192);
let sender_clone = sender.clone();
@ -98,7 +98,7 @@ impl DataProcessor {
let result = DataProcessor {
channel: sender,
job,
jobs: vec![job, snapshot_job],
chain_data,
};
@ -147,7 +147,9 @@ impl DataProcessor {
return Some(Other);
}
async fn init_mango_source(configuration: &Configuration) -> anyhow::Result<Receiver<Message>> {
async fn init_mango_source(
configuration: &Configuration,
) -> anyhow::Result<(Receiver<Message>, JoinHandle<()>)> {
//
// Client setup
//
@ -192,7 +194,7 @@ impl DataProcessor {
// Getting solana account snapshots via jsonrpc
// FUTURE: of what to fetch a snapshot - should probably take as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: configuration.rpc_http_url.clone(),
mango_group,
@ -205,6 +207,6 @@ impl DataProcessor {
account_update_sender,
);
Ok(account_update_receiver)
Ok((account_update_receiver, snapshot_job))
}
}

View File

@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {
// Getting solana account snapshots via jsonrpc
// FUTURE: of what to fetch a snapshot - should probably take as an input
snapshot_source::start(
let snapshot_job = snapshot_source::start(
snapshot_source::Config {
rpc_http_url: rpc_url.clone(),
mango_group,
@ -353,6 +353,7 @@ async fn main() -> anyhow::Result<()> {
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
snapshot_job,
data_job,
settle_job,
tcs_start_job,

View File

@ -16,6 +16,7 @@ use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use std::str::FromStr;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time;
use tracing::*;
@ -223,11 +224,15 @@ async fn feed_snapshots(
Ok(())
}
pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
pub fn start(
config: Config,
mango_oracles: Vec<Pubkey>,
sender: async_channel::Sender<Message>,
) -> JoinHandle<()> {
let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2));
let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval);
tokio::spawn(async move {
let snapshot_job = tokio::spawn(async move {
let rpc_client = http::connect_with_options::<MinimalClient>(&config.rpc_http_url, true)
.await
.expect("always Ok");
@ -260,4 +265,6 @@ pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::
};
}
});
snapshot_job
}