diff --git a/keeper/src/consume_events.rs b/keeper/src/consume_events.rs index 4607f382d..37d5d80af 100644 --- a/keeper/src/consume_events.rs +++ b/keeper/src/consume_events.rs @@ -1,8 +1,8 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anchor_lang::{AccountDeserialize, __private::bytemuck::cast_ref}; -use log::{error, info, warn}; +use log::{info, warn}; use mango_v4::state::{EventQueue, EventType, FillEvent, OutEvent, PerpMarket}; use solana_sdk::{ @@ -13,26 +13,23 @@ use tokio::time; use crate::MangoClient; -pub async fn loop_blocking( - mango_client: &'static MangoClient, - pk: Pubkey, - perp_market: PerpMarket, -) { +pub async fn loop_blocking(mango_client: Arc, pk: Pubkey, perp_market: PerpMarket) { let mut interval = time::interval(Duration::from_secs(5)); loop { interval.tick().await; + let client = mango_client.clone(); tokio::task::spawn_blocking(move || { - perform_operation(mango_client, pk, perp_market).expect("Something went wrong here..."); + perform_operation(client, pk, perp_market).expect("Something went wrong here..."); }); } } pub fn perform_operation( - mango_client: &'static MangoClient, + mango_client: Arc, pk: Pubkey, perp_market: PerpMarket, ) -> anyhow::Result<()> { - let mut event_queue = match get_event_queue(mango_client, &perp_market) { + let mut event_queue = match get_event_queue(&mango_client, &perp_market) { Ok(value) => value, Err(value) => return value, }; @@ -101,7 +98,7 @@ pub fn perform_operation( sig ); } - Err(e) => error!("Crank: {:?}", e), + Err(e) => log::error!("Crank: {:?}", e), } Ok(()) @@ -128,6 +125,6 @@ fn get_event_queue( let mut data_slice: &[u8] = &data; AccountDeserialize::try_deserialize(&mut data_slice).ok() }; - let mut event_queue = event_queue_opt.unwrap(); + let event_queue = event_queue_opt.unwrap(); Ok(event_queue) } diff --git a/keeper/src/crank.rs b/keeper/src/crank.rs index 650210502..03d77eb08 100644 --- a/keeper/src/crank.rs +++ b/keeper/src/crank.rs @@ -1,14 +1,22 @@ +use std::sync::Arc; + use crate::{consume_events, update_index, MangoClient}; use anyhow::ensure; +use futures::Future; + use mango_v4::state::{Bank, PerpMarket}; use solana_client::rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}; use solana_sdk::{pubkey::Pubkey, signer::Signer}; -pub async fn runner(mango_client: &'static MangoClient) -> Result<(), anyhow::Error> { + +pub async fn runner( + mango_client: Arc, + debugging_handle: impl Future, +) -> Result<(), anyhow::Error> { // Collect all banks for a group belonging to an admin let banks = mango_client .program() @@ -30,7 +38,7 @@ pub async fn runner(mango_client: &'static MangoClient) -> Result<(), anyhow::Er let handles1 = banks .iter() - .map(|(pk, bank)| update_index::loop_blocking(mango_client, *pk, *bank)) + .map(|(pk, bank)| update_index::loop_blocking(mango_client.clone(), *pk, *bank)) .collect::>(); // Collect all perp markets for a group belonging to an admin @@ -51,16 +59,20 @@ pub async fn runner(mango_client: &'static MangoClient) -> Result<(), anyhow::Er encoding: None, })])?; - ensure!(!perp_markets.is_empty()); + // TODO: enable + // ensure!(!perp_markets.is_empty()); let handles2 = perp_markets .iter() - .map(|(pk, perp_market)| consume_events::loop_blocking(mango_client, *pk, *perp_market)) + .map(|(pk, perp_market)| { + consume_events::loop_blocking(mango_client.clone(), *pk, *perp_market) + }) .collect::>(); futures::join!( futures::future::join_all(handles1), - futures::future::join_all(handles2) + futures::future::join_all(handles2), + debugging_handle ); Ok(()) diff --git a/keeper/src/main.rs b/keeper/src/main.rs index 09d308d80..ce5acc0e8 100644 --- a/keeper/src/main.rs +++ b/keeper/src/main.rs @@ -3,6 +3,7 @@ mod crank; mod update_index; use std::env; +use std::sync::Arc; use anchor_client::{Client, Cluster, Program}; @@ -16,6 +17,7 @@ use solana_sdk::{ pubkey::Pubkey, signer::{keypair, Signer}, }; +use tokio::time; // TODO // - may be nice to have one-shot cranking as well as the interval cranking @@ -150,18 +152,31 @@ fn main() -> Result<(), anyhow::Error> { Command::Liquidator {} => todo!(), }; - let mango_client: &'static _ = Box::leak(Box::new(MangoClient::new( - cluster, commitment, payer, admin, - ))); + let mango_client = Arc::new(MangoClient::new(cluster, commitment, payer, admin)); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); + let debugging_handle = async { + let mut interval = time::interval(time::Duration::from_secs(5)); + loop { + interval.tick().await; + let client = mango_client.clone(); + tokio::task::spawn_blocking(move || { + log::info!( + "std::sync::Arc::strong_count() {}", + Arc::::strong_count(&client) + ) + }); + } + }; + match command { Command::Crank { .. } => { - let x: Result<(), anyhow::Error> = rt.block_on(crank::runner(mango_client)); + let client = mango_client.clone(); + let x: Result<(), anyhow::Error> = rt.block_on(crank::runner(client, debugging_handle)); x.expect("Something went wrong here..."); } Command::Liquidator { .. } => { diff --git a/keeper/src/update_index.rs b/keeper/src/update_index.rs index 911c3471b..15218ddd5 100644 --- a/keeper/src/update_index.rs +++ b/keeper/src/update_index.rs @@ -1,6 +1,6 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; -use log::{error, info}; +use log::{info}; use mango_v4::state::Bank; use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; @@ -8,18 +8,19 @@ use tokio::time; use crate::MangoClient; -pub async fn loop_blocking(mango_client: &'static MangoClient, pk: Pubkey, bank: Bank) { +pub async fn loop_blocking(mango_client: Arc, pk: Pubkey, bank: Bank) { let mut interval = time::interval(Duration::from_secs(5)); loop { interval.tick().await; + let client = mango_client.clone(); tokio::task::spawn_blocking(move || { - perform_operation(mango_client, pk, bank).expect("Something went wrong here..."); + perform_operation(client, pk, bank).expect("Something went wrong here..."); }); } } pub fn perform_operation( - mango_client: &'static MangoClient, + mango_client: Arc, pk: Pubkey, bank: Bank, ) -> anyhow::Result<()> { @@ -43,7 +44,7 @@ pub fn perform_operation( sig ); } - Err(e) => error!("Crank: {:?}", e), + Err(e) => log::error!("Crank: {:?}", e), } Ok(())