diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 90ec8b3b0..1db2e8e86 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -190,8 +190,6 @@ async fn main() -> anyhow::Result<()> { .map(|s3| s3.market.serum_program) .unique() .collect_vec(); - // TODO: Currently the websocket source only supports a single serum program address! - assert_eq!(serum_programs.len(), 1); // // feed setup @@ -211,7 +209,7 @@ async fn main() -> anyhow::Result<()> { websocket_source::start( websocket_source::Config { rpc_ws_url: ws_url.clone(), - serum_program: *serum_programs.first().unwrap(), + serum_programs, open_orders_authority: mango_group, }, mango_oracles.clone(), diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index c41af0744..7671e4223 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -43,10 +43,6 @@ struct Cli { #[clap(short, long, env)] rpc_url: String, - // TODO: different serum markets could use different serum programs, should come from registered markets - #[clap(long, env)] - serum_program: Pubkey, - #[clap(long, env)] settler_mango_account: Pubkey, @@ -128,6 +124,13 @@ async fn main() -> anyhow::Result<()> { .unique() .collect::>(); + let serum_programs = group_context + .serum3_markets + .values() + .map(|s3| s3.market.serum_program) + .unique() + .collect_vec(); + // // feed setup // @@ -147,7 +150,7 @@ async fn main() -> anyhow::Result<()> { websocket_source::start( websocket_source::Config { rpc_ws_url: ws_url.clone(), - serum_program: cli.serum_program, + serum_programs, open_orders_authority: mango_group, }, mango_oracles.clone(), diff --git a/lib/client/src/websocket_source.rs b/lib/client/src/websocket_source.rs index 61c34ad60..426ccf427 100644 --- a/lib/client/src/websocket_source.rs +++ b/lib/client/src/websocket_source.rs @@ -20,7 +20,7 @@ use crate::AnyhowWrap; pub struct Config { pub rpc_ws_url: String, - pub serum_program: Pubkey, + pub serum_programs: Vec, pub open_orders_authority: Pubkey, } @@ -67,6 +67,7 @@ async fn feed_data( Some(all_accounts_config.clone()), ) .map_err_anyhow()?; + let mut mango_oracles_sub_map = StreamMap::new(); for oracle in mango_oracles.into_iter() { mango_oracles_sub_map.insert( @@ -84,12 +85,20 @@ async fn feed_data( .map_err_anyhow()?, ); } - let mut open_orders_sub = client - .program_subscribe( - config.serum_program.to_string(), - Some(open_orders_accounts_config.clone()), - ) - .map_err_anyhow()?; + + let mut serum3_oo_sub_map = StreamMap::new(); + for serum_program in config.serum_programs.iter() { + serum3_oo_sub_map.insert( + *serum_program, + client + .program_subscribe( + serum_program.to_string(), + Some(open_orders_accounts_config.clone()), + ) + .map_err_anyhow()?, + ); + } + let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?; loop { @@ -113,9 +122,10 @@ async fn feed_data( return Ok(()); } }, - message = open_orders_sub.next() => { + message = serum3_oo_sub_map.next() => { if let Some(data) = message { - let response = data.map_err_anyhow()?; + info!("got serum message"); + let response = data.1.map_err_anyhow()?; sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed"); } else { warn!("serum stream closed");