diff --git a/bin/cli/src/test_oracles.rs b/bin/cli/src/test_oracles.rs index c53377605..5b07bed05 100644 --- a/bin/cli/src/test_oracles.rs +++ b/bin/cli/src/test_oracles.rs @@ -31,7 +31,7 @@ pub async fn run(client: &Client, group: Pubkey) -> anyhow::Result<()> { .map(|(_, p)| (p.oracle, *p)) .collect(); - let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(5)); loop { interval.tick().await; diff --git a/bin/keeper/src/crank.rs b/bin/keeper/src/crank.rs index 8d7fbb073..90569d765 100644 --- a/bin/keeper/src/crank.rs +++ b/bin/keeper/src/crank.rs @@ -12,7 +12,6 @@ use solana_sdk::{ instruction::{AccountMeta, Instruction}, pubkey::Pubkey, }; -use tokio::time; use tracing::*; use warp::Filter; @@ -155,7 +154,7 @@ pub async fn loop_update_index_and_rate( token_indices: Vec, interval: u64, ) { - let mut interval = time::interval(Duration::from_secs(interval)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval)); loop { interval.tick().await; @@ -247,7 +246,7 @@ pub async fn loop_consume_events( perp_market: &PerpMarketContext, interval: u64, ) { - let mut interval = time::interval(Duration::from_secs(interval)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval)); loop { interval.tick().await; @@ -365,7 +364,7 @@ pub async fn loop_update_funding( perp_market: &PerpMarketContext, interval: u64, ) { - let mut interval = time::interval(Duration::from_secs(interval)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval)); loop { interval.tick().await; diff --git a/bin/keeper/src/main.rs b/bin/keeper/src/main.rs index 30a3b4b0b..4e006363d 100644 --- a/bin/keeper/src/main.rs +++ b/bin/keeper/src/main.rs @@ -116,7 +116,7 @@ async fn main() -> Result<(), anyhow::Error> { ); let debugging_handle = async { - let mut interval = time::interval(time::Duration::from_secs(5)); + let mut interval = mango_v4_client::delay_interval(time::Duration::from_secs(5)); loop { interval.tick().await; let client = mango_client.clone(); diff --git a/bin/keeper/src/taker.rs b/bin/keeper/src/taker.rs index f1eb3eb27..90a56d0bb 100644 --- a/bin/keeper/src/taker.rs +++ b/bin/keeper/src/taker.rs @@ -12,8 +12,6 @@ use mango_v4::{ }; use tracing::*; -use tokio::time; - use crate::MangoClient; pub async fn runner( @@ -117,7 +115,7 @@ pub async fn loop_blocking_price_update( token_index: TokenIndex, price: Arc>, ) { - let mut interval = time::interval(Duration::from_secs(1)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(1)); let token_name = &mango_client.context.token(token_index).name; loop { interval.tick().await; @@ -135,7 +133,7 @@ pub async fn loop_blocking_orders( market_name: String, price: Arc>, ) { - let mut interval = time::interval(Duration::from_secs(5)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(5)); // Cancel existing orders let orders: Vec = mango_client diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 05391acf4..05280f67b 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -447,7 +447,8 @@ async fn main() -> anyhow::Result<()> { // But need to take care to abort if the above job aborts beforehand. let liquidation_job = tokio::spawn({ - let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms)); + let mut interval = + mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms)); let shared_state = shared_state.clone(); async move { let mut must_rebalance = true; @@ -494,8 +495,8 @@ async fn main() -> anyhow::Result<()> { let token_swap_info_job = tokio::spawn({ // TODO: configurable interval - let mut interval = tokio::time::interval(Duration::from_secs(60)); - let mut startup_wait = tokio::time::interval(Duration::from_secs(1)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(60)); + let mut startup_wait = mango_v4_client::delay_interval(Duration::from_secs(1)); let shared_state = shared_state.clone(); async move { loop { @@ -512,7 +513,7 @@ async fn main() -> anyhow::Result<()> { .keys() .copied() .collect_vec(); - let mut min_delay = tokio::time::interval(Duration::from_secs(1)); + let mut min_delay = mango_v4_client::delay_interval(Duration::from_secs(1)); for token_index in token_indexes { min_delay.tick().await; token_swap_info_updater.update_one(token_index).await; @@ -768,7 +769,7 @@ impl LiquidationState { } fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { - let mut interval = tokio::time::interval(Duration::from_secs(600)); + let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600)); let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into()); let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into()); diff --git a/bin/liquidator/src/metrics.rs b/bin/liquidator/src/metrics.rs index 5f703a08f..526587576 100644 --- a/bin/liquidator/src/metrics.rs +++ b/bin/liquidator/src/metrics.rs @@ -125,7 +125,7 @@ impl Metrics { } pub fn start() -> Metrics { - let mut write_interval = time::interval(time::Duration::from_secs(60)); + let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(60)); let registry = Arc::new(RwLock::new(HashMap::::new())); let registry_c = Arc::clone(®istry); diff --git a/bin/liquidator/src/telemetry.rs b/bin/liquidator/src/telemetry.rs index d7f43f6fc..5cc87bf9a 100644 --- a/bin/liquidator/src/telemetry.rs +++ b/bin/liquidator/src/telemetry.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use tracing::*; pub async fn report_regularly(client: Arc, min_health_ratio: f64) { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); + let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(600)); loop { interval.tick().await; if let Err(e) = report(&client, min_health_ratio).await { diff --git a/bin/service-mango-fills/src/main.rs b/bin/service-mango-fills/src/main.rs index 9350ce4c0..067ea32aa 100644 --- a/bin/service-mango-fills/src/main.rs +++ b/bin/service-mango-fills/src/main.rs @@ -571,7 +571,7 @@ async fn main() -> anyhow::Result<()> { // keepalive { tokio::spawn(async move { - let mut write_interval = time::interval(time::Duration::from_secs(30)); + let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(30)); loop { write_interval.tick().await; diff --git a/bin/service-mango-orderbook/src/main.rs b/bin/service-mango-orderbook/src/main.rs index 4e01f32a8..47abce5e2 100644 --- a/bin/service-mango-orderbook/src/main.rs +++ b/bin/service-mango-orderbook/src/main.rs @@ -544,7 +544,7 @@ async fn main() -> anyhow::Result<()> { let exit = exit.clone(); let peers = peers.clone(); tokio::spawn(async move { - let mut write_interval = time::interval(time::Duration::from_secs(30)); + let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(30)); loop { if exit.load(Ordering::Relaxed) { diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 06b7b14b3..5c039b4c0 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -293,7 +293,7 @@ async fn main() -> anyhow::Result<()> { }); let settle_job = tokio::spawn({ - let mut interval = tokio::time::interval(Duration::from_millis(100)); + let mut interval = mango_v4_client::delay_interval(Duration::from_millis(100)); let shared_state = shared_state.clone(); async move { loop { @@ -314,7 +314,7 @@ async fn main() -> anyhow::Result<()> { }); let tcs_start_job = tokio::spawn({ - let mut interval = tokio::time::interval(Duration::from_millis(100)); + let mut interval = mango_v4_client::delay_interval(Duration::from_millis(100)); let shared_state = shared_state.clone(); async move { loop { @@ -366,7 +366,7 @@ struct SharedState { } fn start_chain_data_metrics(chain: Arc>, metrics: &metrics::Metrics) { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); + let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(600)); let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into()); let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into()); diff --git a/bin/settler/src/metrics.rs b/bin/settler/src/metrics.rs index 5f703a08f..526587576 100644 --- a/bin/settler/src/metrics.rs +++ b/bin/settler/src/metrics.rs @@ -125,7 +125,7 @@ impl Metrics { } pub fn start() -> Metrics { - let mut write_interval = time::interval(time::Duration::from_secs(60)); + let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(60)); let registry = Arc::new(RwLock::new(HashMap::::new())); let registry_c = Arc::clone(®istry); diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index cab35d138..d17c6875b 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -1703,7 +1703,7 @@ impl MangoClient { mango_client: Arc, interval: Duration, ) { - let mut delay = tokio::time::interval(interval); + let mut delay = crate::delay_interval(interval); let rpc_async = mango_client.client.rpc_async(); loop { delay.tick().await; diff --git a/lib/client/src/snapshot_source.rs b/lib/client/src/snapshot_source.rs index ca7d1ca0d..d39ab0c2f 100644 --- a/lib/client/src/snapshot_source.rs +++ b/lib/client/src/snapshot_source.rs @@ -222,8 +222,8 @@ async fn feed_snapshots( } pub fn start(config: Config, mango_oracles: Vec, sender: async_channel::Sender) { - let mut poll_wait_first_snapshot = time::interval(time::Duration::from_secs(2)); - let mut interval_between_snapshots = time::interval(config.snapshot_interval); + let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2)); + let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval); tokio::spawn(async move { let rpc_client = http::connect_with_options::(&config.rpc_http_url, true) diff --git a/lib/client/src/util.rs b/lib/client/src/util.rs index 4ccf9c566..81669456f 100644 --- a/lib/client/src/util.rs +++ b/lib/client/src/util.rs @@ -43,6 +43,20 @@ impl AsyncChannelSendUnlessFull for async_channel::Sender { } } +/// Like tokio::time::interval(), but with Delay as default MissedTickBehavior +/// +/// The default (Burst) means that if the time between tick() calls is longer +/// than `period` there'll be a burst of catch-up ticks. +/// +/// This Interval guarantees that when tick() returns, at least `period` will have +/// elapsed since the last return. That way it's more appropriate for jobs that +/// don't need to catch up. +pub fn delay_interval(period: std::time::Duration) -> tokio::time::Interval { + let mut interval = tokio::time::interval(period); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + interval +} + /// A copy of RpcClient::send_and_confirm_transaction that returns the slot the /// transaction confirmed in. pub fn send_and_confirm_transaction(