liquidator: Work on groups without openbook markets (#701)

This commit is contained in:
Christian Kamm 2023-09-01 15:05:43 +02:00 committed by GitHub
parent c07978fb68
commit aca2b2e679
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 17 deletions

View File

@ -190,8 +190,6 @@ async fn main() -> anyhow::Result<()> {
.map(|s3| s3.market.serum_program)
.unique()
.collect_vec();
// TODO: Currently the websocket source only supports a single serum program address!
assert_eq!(serum_programs.len(), 1);
//
// feed setup
@ -211,7 +209,7 @@ async fn main() -> anyhow::Result<()> {
websocket_source::start(
websocket_source::Config {
rpc_ws_url: ws_url.clone(),
serum_program: *serum_programs.first().unwrap(),
serum_programs,
open_orders_authority: mango_group,
},
mango_oracles.clone(),

View File

@ -43,10 +43,6 @@ struct Cli {
#[clap(short, long, env)]
rpc_url: String,
// TODO: different serum markets could use different serum programs, should come from registered markets
#[clap(long, env)]
serum_program: Pubkey,
#[clap(long, env)]
settler_mango_account: Pubkey,
@ -128,6 +124,13 @@ async fn main() -> anyhow::Result<()> {
.unique()
.collect::<Vec<Pubkey>>();
let serum_programs = group_context
.serum3_markets
.values()
.map(|s3| s3.market.serum_program)
.unique()
.collect_vec();
//
// feed setup
//
@ -147,7 +150,7 @@ async fn main() -> anyhow::Result<()> {
websocket_source::start(
websocket_source::Config {
rpc_ws_url: ws_url.clone(),
serum_program: cli.serum_program,
serum_programs,
open_orders_authority: mango_group,
},
mango_oracles.clone(),

View File

@ -20,7 +20,7 @@ use crate::AnyhowWrap;
pub struct Config {
pub rpc_ws_url: String,
pub serum_program: Pubkey,
pub serum_programs: Vec<Pubkey>,
pub open_orders_authority: Pubkey,
}
@ -67,6 +67,7 @@ async fn feed_data(
Some(all_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut mango_oracles_sub_map = StreamMap::new();
for oracle in mango_oracles.into_iter() {
mango_oracles_sub_map.insert(
@ -84,12 +85,20 @@ async fn feed_data(
.map_err_anyhow()?,
);
}
let mut open_orders_sub = client
.program_subscribe(
config.serum_program.to_string(),
Some(open_orders_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut serum3_oo_sub_map = StreamMap::new();
for serum_program in config.serum_programs.iter() {
serum3_oo_sub_map.insert(
*serum_program,
client
.program_subscribe(
serum_program.to_string(),
Some(open_orders_accounts_config.clone()),
)
.map_err_anyhow()?,
);
}
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
loop {
@ -113,9 +122,10 @@ async fn feed_data(
return Ok(());
}
},
message = open_orders_sub.next() => {
message = serum3_oo_sub_map.next() => {
if let Some(data) = message {
let response = data.map_err_anyhow()?;
info!("got serum message");
let response = data.1.map_err_anyhow()?;
sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
} else {
warn!("serum stream closed");