Switch all tokio Interval to MissedTickBehavior::Delay (#840)

Burst can lead to undesired behavior, in particular with rate limits.
This commit is contained in:
Christian Kamm 2024-01-09 11:25:55 +01:00 committed by GitHub
parent 857dcb397f
commit 7a0cb6c8f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 38 additions and 26 deletions

View File

@ -31,7 +31,7 @@ pub async fn run(client: &Client, group: Pubkey) -> anyhow::Result<()> {
.map(|(_, p)| (p.oracle, *p))
.collect();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(5));
loop {
interval.tick().await;

View File

@ -12,7 +12,6 @@ use solana_sdk::{
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
};
use tokio::time;
use tracing::*;
use warp::Filter;
@ -155,7 +154,7 @@ pub async fn loop_update_index_and_rate(
token_indices: Vec<TokenIndex>,
interval: u64,
) {
let mut interval = time::interval(Duration::from_secs(interval));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval));
loop {
interval.tick().await;
@ -247,7 +246,7 @@ pub async fn loop_consume_events(
perp_market: &PerpMarketContext,
interval: u64,
) {
let mut interval = time::interval(Duration::from_secs(interval));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval));
loop {
interval.tick().await;
@ -365,7 +364,7 @@ pub async fn loop_update_funding(
perp_market: &PerpMarketContext,
interval: u64,
) {
let mut interval = time::interval(Duration::from_secs(interval));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(interval));
loop {
interval.tick().await;

View File

@ -116,7 +116,7 @@ async fn main() -> Result<(), anyhow::Error> {
);
let debugging_handle = async {
let mut interval = time::interval(time::Duration::from_secs(5));
let mut interval = mango_v4_client::delay_interval(time::Duration::from_secs(5));
loop {
interval.tick().await;
let client = mango_client.clone();

View File

@ -12,8 +12,6 @@ use mango_v4::{
};
use tracing::*;
use tokio::time;
use crate::MangoClient;
pub async fn runner(
@ -117,7 +115,7 @@ pub async fn loop_blocking_price_update(
token_index: TokenIndex,
price: Arc<RwLock<I80F48>>,
) {
let mut interval = time::interval(Duration::from_secs(1));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(1));
let token_name = &mango_client.context.token(token_index).name;
loop {
interval.tick().await;
@ -135,7 +133,7 @@ pub async fn loop_blocking_orders(
market_name: String,
price: Arc<RwLock<I80F48>>,
) {
let mut interval = time::interval(Duration::from_secs(5));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(5));
// Cancel existing orders
let orders: Vec<u128> = mango_client

View File

@ -447,7 +447,8 @@ async fn main() -> anyhow::Result<()> {
// But need to take care to abort if the above job aborts beforehand.
let liquidation_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(cli.check_interval_ms));
let mut interval =
mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms));
let shared_state = shared_state.clone();
async move {
let mut must_rebalance = true;
@ -494,8 +495,8 @@ async fn main() -> anyhow::Result<()> {
let token_swap_info_job = tokio::spawn({
// TODO: configurable interval
let mut interval = tokio::time::interval(Duration::from_secs(60));
let mut startup_wait = tokio::time::interval(Duration::from_secs(1));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(60));
let mut startup_wait = mango_v4_client::delay_interval(Duration::from_secs(1));
let shared_state = shared_state.clone();
async move {
loop {
@ -512,7 +513,7 @@ async fn main() -> anyhow::Result<()> {
.keys()
.copied()
.collect_vec();
let mut min_delay = tokio::time::interval(Duration::from_secs(1));
let mut min_delay = mango_v4_client::delay_interval(Duration::from_secs(1));
for token_index in token_indexes {
min_delay.tick().await;
token_swap_info_updater.update_one(token_index).await;
@ -768,7 +769,7 @@ impl LiquidationState {
}
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
let mut interval = tokio::time::interval(Duration::from_secs(600));
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());

View File

@ -125,7 +125,7 @@ impl Metrics {
}
pub fn start() -> Metrics {
let mut write_interval = time::interval(time::Duration::from_secs(60));
let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(60));
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
let registry_c = Arc::clone(&registry);

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use tracing::*;
pub async fn report_regularly(client: Arc<MangoClient>, min_health_ratio: f64) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(600));
loop {
interval.tick().await;
if let Err(e) = report(&client, min_health_ratio).await {

View File

@ -571,7 +571,7 @@ async fn main() -> anyhow::Result<()> {
// keepalive
{
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(30));
loop {
write_interval.tick().await;

View File

@ -544,7 +544,7 @@ async fn main() -> anyhow::Result<()> {
let exit = exit.clone();
let peers = peers.clone();
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(30));
loop {
if exit.load(Ordering::Relaxed) {

View File

@ -293,7 +293,7 @@ async fn main() -> anyhow::Result<()> {
});
let settle_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut interval = mango_v4_client::delay_interval(Duration::from_millis(100));
let shared_state = shared_state.clone();
async move {
loop {
@ -314,7 +314,7 @@ async fn main() -> anyhow::Result<()> {
});
let tcs_start_job = tokio::spawn({
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut interval = mango_v4_client::delay_interval(Duration::from_millis(100));
let shared_state = shared_state.clone();
async move {
loop {
@ -366,7 +366,7 @@ struct SharedState {
}
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
let mut interval = mango_v4_client::delay_interval(std::time::Duration::from_secs(600));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());

View File

@ -125,7 +125,7 @@ impl Metrics {
}
pub fn start() -> Metrics {
let mut write_interval = time::interval(time::Duration::from_secs(60));
let mut write_interval = mango_v4_client::delay_interval(time::Duration::from_secs(60));
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
let registry_c = Arc::clone(&registry);

View File

@ -1703,7 +1703,7 @@ impl MangoClient {
mango_client: Arc<MangoClient>,
interval: Duration,
) {
let mut delay = tokio::time::interval(interval);
let mut delay = crate::delay_interval(interval);
let rpc_async = mango_client.client.rpc_async();
loop {
delay.tick().await;

View File

@ -222,8 +222,8 @@ async fn feed_snapshots(
}
pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
let mut poll_wait_first_snapshot = time::interval(time::Duration::from_secs(2));
let mut interval_between_snapshots = time::interval(config.snapshot_interval);
let mut poll_wait_first_snapshot = crate::delay_interval(time::Duration::from_secs(2));
let mut interval_between_snapshots = crate::delay_interval(config.snapshot_interval);
tokio::spawn(async move {
let rpc_client = http::connect_with_options::<MinimalClient>(&config.rpc_http_url, true)

View File

@ -43,6 +43,20 @@ impl<T> AsyncChannelSendUnlessFull<T> for async_channel::Sender<T> {
}
}
/// Like tokio::time::interval(), but with Delay as default MissedTickBehavior
///
/// The default (Burst) means that if the time between tick() calls is longer
/// than `period` there'll be a burst of catch-up ticks.
///
/// This Interval guarantees that when tick() returns, at least `period` will have
/// elapsed since the last return. That way it's more appropriate for jobs that
/// don't need to catch up.
pub fn delay_interval(period: std::time::Duration) -> tokio::time::Interval {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval
}
/// A copy of RpcClient::send_and_confirm_transaction that returns the slot the
/// transaction confirmed in.
pub fn send_and_confirm_transaction(