This commit is contained in:
Maximilian Schneider 2023-02-18 20:37:51 +09:00
parent ffe776b70d
commit a2f1259e94
9 changed files with 340 additions and 85 deletions

101
Cargo.lock generated
View File

@ -3040,6 +3040,66 @@ dependencies = [
"libc",
]
[[package]]
name = "mango"
version = "3.7.0"
source = "git+https://github.com/blockworks-foundation/mango-v3#c4d52dc7f08e9ba4ae13728d0a2f1c298c0c4a86"
dependencies = [
"anchor-lang",
"arrayref",
"bincode",
"bs58 0.4.0",
"bytemuck",
"enumflags2",
"fixed",
"fixed-macro",
"mango-common",
"mango-logs",
"mango-macro",
"num_enum",
"pyth-client",
"safe-transmute",
"serde",
"serum_dex 0.5.5",
"solana-program",
"spl-token",
"static_assertions",
"switchboard-program",
"switchboard-utils",
"thiserror",
]
[[package]]
name = "mango-common"
version = "3.0.0"
source = "git+https://github.com/blockworks-foundation/mango-v3#c4d52dc7f08e9ba4ae13728d0a2f1c298c0c4a86"
dependencies = [
"bytemuck",
"solana-program",
]
[[package]]
name = "mango-logs"
version = "0.1.0"
source = "git+https://github.com/blockworks-foundation/mango-v3#c4d52dc7f08e9ba4ae13728d0a2f1c298c0c4a86"
dependencies = [
"anchor-lang",
"base64 0.13.1",
]
[[package]]
name = "mango-macro"
version = "3.0.0"
source = "git+https://github.com/blockworks-foundation/mango-v3#c4d52dc7f08e9ba4ae13728d0a2f1c298c0c4a86"
dependencies = [
"bytemuck",
"mango-common",
"quote 1.0.23",
"safe-transmute",
"solana-program",
"syn 1.0.107",
]
[[package]]
name = "mango-v4"
version = "0.6.0"
@ -4171,6 +4231,22 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "pyth-client"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "398f9e51e126a13903254c56f75b3201583db075d0fbb77e931f7515af9b3d16"
dependencies = [
"borsh",
"borsh-derive",
"bytemuck",
"num-derive",
"num-traits",
"serde",
"solana-program",
"thiserror",
]
[[package]]
name = "pyth-sdk"
version = "0.1.0"
@ -5113,6 +5189,29 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "serum_dex"
version = "0.5.5"
source = "git+https://github.com/blockworks-foundation/serum-dex.git?rev=7f55a5ef5f7937b74381a3124021a261cd7d7283#7f55a5ef5f7937b74381a3124021a261cd7d7283"
dependencies = [
"arrayref",
"bincode",
"bytemuck",
"byteorder",
"enumflags2",
"field-offset",
"itertools 0.9.0",
"num-traits",
"num_enum",
"safe-transmute",
"serde",
"solana-program",
"spl-token",
"static_assertions",
"thiserror",
"without-alloc",
]
[[package]]
name = "serum_dex"
version = "0.5.6"
@ -5219,6 +5318,7 @@ dependencies = [
"anchor-client",
"anchor-lang",
"anyhow",
"arrayref",
"async-channel",
"async-trait",
"bs58 0.3.1",
@ -5227,6 +5327,7 @@ dependencies = [
"futures-channel",
"futures-util",
"log 0.3.9",
"mango",
"mango-v4",
"serde",
"serde_derive",

View File

@ -6,7 +6,6 @@ use crate::{
use anchor_lang::prelude::Pubkey;
use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
@ -119,7 +118,7 @@ pub fn init(
},
);
}
Err(skip_reason) => {
Err(_skip_reason) => {
// todo: metrics
}
}

View File

@ -25,7 +25,9 @@ async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
arrayref = "*"
mango-v3 = { package="mango", git = "https://github.com/blockworks-foundation/mango-v3" }
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/openbook-dex/program" }

View File

@ -34,12 +34,12 @@ pub async fn init(client: Arc<RpcClient>) -> Arc<RwLock<Hash>> {
.expect("fetch initial blockhash"),
));
// launch task
let join_hdl = {
// create a thread-local reference to blockhash
let blockhash_c = blockhash.clone();
spawn(async move { poll_loop(blockhash_c, client).await })
};
// create a thread-local reference to blockhash
let blockhash_c = blockhash.clone();
spawn(async move { poll_loop(blockhash_c, client).await });
return blockhash;
}

View File

@ -1,10 +1,10 @@
mod blockhash_poller;
mod mango_v3_perp_crank_sink;
mod mango_v4_perp_crank_sink;
mod openbook_crank_sink;
mod transaction_builder;
mod transaction_sender;
use anchor_client::{
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
@ -29,6 +29,37 @@ pub struct Config {
pub rpc_http_url: String,
pub mango_group: String,
pub keypair: Vec<u8>,
pub v3_markets: Vec<(String, String)>,
pub v3_group: (String, String, String),
}
async fn fetch_openbook_evq_pks(
rpc_client: Arc<RpcClient>,
serum_market_pks: &Vec<Pubkey>,
) -> anyhow::Result<Vec<(Pubkey, Pubkey)>> {
let serum_market_ais = rpc_client
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<_> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
Ok(serum_market_ais
.iter()
.enumerate()
.map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
let event_q = market_state.event_q;
(serum_market_pks[pair.0], Pubkey::new(bytes_of(&event_q)))
})
.collect())
}
#[tokio::main]
@ -65,11 +96,11 @@ async fn main() -> anyhow::Result<()> {
Some(rpc_timeout),
0,
);
let group_pk = Pubkey::from_str(&config.mango_group).unwrap();
let v4_group_pk = Pubkey::from_str(&config.mango_group).unwrap();
let group_context =
Arc::new(MangoGroupContext::new_from_rpc(&client.rpc_async(), group_pk).await?);
Arc::new(MangoGroupContext::new_from_rpc(&client.rpc_async(), v4_group_pk).await?);
let perp_queue_pks: Vec<_> = group_context
let v4_perp_market_pks: Vec<_> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
@ -82,38 +113,26 @@ async fn main() -> anyhow::Result<()> {
.map(|(_, context)| context.market.serum_market_external)
.collect();
let serum_market_ais = client
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let openbook_pks: Vec<_> =
fetch_openbook_evq_pks(rpc_client.clone(), &serum_market_pks).await.expect("fetch serum evq pks");
let serum_market_ais: Vec<_> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
let serum_queue_pks: Vec<_> = serum_market_ais
.iter()
.enumerate()
.map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
let event_q = market_state.event_q;
(serum_market_pks[pair.0], Pubkey::new(bytes_of(&event_q)))
})
.collect();
let v3_perp_market_pks: Vec<_> = config.v3_markets.iter().map(|c| {(Pubkey::from_str(&c.0).unwrap(),Pubkey::from_str(&c.1).unwrap(),)}).collect();
let v3_group_pks = (
Pubkey::from_str(&config.v3_group.0).unwrap(),
Pubkey::from_str(&config.v3_group.1).unwrap(),
Pubkey::from_str(&config.v3_group.2).unwrap(),
);
let (account_write_queue_sender, slot_queue_sender, instruction_receiver) =
transaction_builder::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
group_pk,
v4_perp_market_pks.clone(),
v3_perp_market_pks,
openbook_pks.clone(),
v4_group_pk,
v3_group_pks,
metrics_tx.clone(),
)
.await
.expect("init transaction builder");
transaction_sender::init(
@ -133,9 +152,9 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>()
);
let use_geyser = true;
let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
let all_queue_pks: HashSet<Pubkey> = v4_perp_market_pks
.iter()
.chain(serum_queue_pks.iter())
.chain(openbook_pks.iter())
.map(|mkt| mkt.1)
.collect();

View File

@ -0,0 +1,138 @@
use std::{
cell::RefCell,
collections::{BTreeMap},
convert::TryFrom,
mem::size_of,
};
use arrayref::array_ref;
use async_channel::Sender;
use async_trait::async_trait;
use log::*;
use mango_v3::{
instruction::consume_events,
queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue},
};
use solana_geyser_connector_lib::{
account_write_filter::AccountWriteSink, chain_data::AccountData,
};
use solana_sdk::{
account::ReadableAccount,
instruction::{Instruction},
pubkey::Pubkey,
};
use bytemuck::cast_ref;
const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10;
pub struct MangoV3PerpCrankSink {
pks: BTreeMap<Pubkey, Pubkey>,
group_pk: Pubkey,
cache_pk: Pubkey,
mango_v3_program: Pubkey,
instruction_sender: Sender<Vec<Instruction>>,
}
impl MangoV3PerpCrankSink {
pub fn new(
pks: Vec<(Pubkey, Pubkey)>,
group_pk: Pubkey,
cache_pk: Pubkey,
mango_v3_program: Pubkey,
instruction_sender: Sender<Vec<Instruction>>,
) -> Self {
Self {
pks: pks.iter().map(|e| e.clone()).collect(),
group_pk,
cache_pk,
mango_v3_program,
instruction_sender,
}
}
}
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
const EVENT_SIZE: usize = 200; //size_of::<AnyEvent>();
const QUEUE_LEN: usize = 256;
type EventQueueEvents = [AnyEvent; QUEUE_LEN];
#[async_trait]
impl AccountWriteSink for MangoV3PerpCrankSink {
async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
let account = &account.account;
let ix: Result<Instruction, String> = {
const HEADER_SIZE: usize = size_of::<EventQueueHeader>();
let header_data = array_ref![account.data(), 0, HEADER_SIZE];
let header = RefCell::<EventQueueHeader>::new(*bytemuck::from_bytes(header_data));
// trace!("evq {} seq_num {}", mkt.name, header.seq_num);
const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN;
let events_data = array_ref![account.data(), HEADER_SIZE, QUEUE_SIZE];
let events = RefCell::<EventQueueEvents>::new(*bytemuck::from_bytes(events_data));
let event_queue = Queue {
header: header.borrow_mut(),
buf: events.borrow_mut(),
};
// only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = event_queue
.iter()
.find(|e| e.event_type == EventType::Fill as u8)
.is_some();
let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
if !contains_fill_events && !has_backlog {
return Err("throttled".into());
}
let mut mango_accounts: Vec<_> = event_queue
.iter()
.take(MAX_EVENTS_PER_TX)
.flat_map(
|e| match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
vec![fill.maker, fill.taker]
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
vec![out.owner]
}
EventType::Liquidate => vec![],
},
)
.collect();
let mkt_pk = self
.pks
.get(pk)
.expect(&format!("{pk:?} is a known public key"));
let ix = consume_events(
&self.mango_v3_program,
&self.group_pk,
&self.cache_pk,
mkt_pk,
pk,
&mut mango_accounts,
MAX_EVENTS_PER_TX,
)
.unwrap();
Ok(ix)
};
// info!(
// "evq={pk:?} count={} limit=10",
// event_queue.iter().count()
// );
if let Err(e) = self.instruction_sender.send(vec![ix?]).await {
return Err(e.to_string());
}
Ok(())
}
}

View File

@ -7,6 +7,7 @@ use std::{
use async_channel::Sender;
use async_trait::async_trait;
use log::*;
use mango_v4::state::{EventQueue, EventType, FillEvent, OutEvent};
use solana_geyser_connector_lib::{
account_write_filter::AccountWriteSink, chain_data::AccountData,
};
@ -17,7 +18,6 @@ use solana_sdk::{
};
use bytemuck::cast_ref;
use mango_v4::state::FillEvent;
use anchor_lang::AccountDeserialize;
@ -43,13 +43,12 @@ impl MangoV4PerpCrankSink {
impl AccountWriteSink for MangoV4PerpCrankSink {
async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
let account = &account.account;
let event_queue: mango_v4::state::EventQueue =
mango_v4::state::EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
let event_queue: EventQueue = EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
// only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = event_queue
.iter()
.find(|e| e.event_type == mango_v4::state::EventType::Fill as u8)
.find(|e| e.event_type == EventType::Fill as u8)
.is_some();
let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
if !contains_fill_events && !has_backlog {
@ -60,16 +59,16 @@ impl AccountWriteSink for MangoV4PerpCrankSink {
.iter()
.take(10)
.flat_map(|e| {
match mango_v4::state::EventType::try_from(e.event_type).expect("mango v4 event") {
mango_v4::state::EventType::Fill => {
let fill: &mango_v4::state::FillEvent = cast_ref(e);
match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
vec![fill.maker, fill.taker]
}
mango_v4::state::EventType::Out => {
let out: &mango_v4::state::OutEvent = cast_ref(e);
EventType::Out => {
let out: &OutEvent = cast_ref(e);
vec![out.owner]
}
mango_v4::state::EventType::Liquidate => vec![],
EventType::Liquidate => vec![],
}
})
.collect();

View File

@ -1,40 +1,23 @@
use bytemuck::cast_ref;
use mango_v4::state::FillEvent;
use serum_dex::{instruction::MarketInstruction, state::EventView};
use crate::{
mango_v3_perp_crank_sink::MangoV3PerpCrankSink, mango_v4_perp_crank_sink::MangoV4PerpCrankSink,
openbook_crank_sink::OpenbookCrankSink,
};
use solana_geyser_connector_lib::{
account_write_filter::{self, AccountWriteRoute},
chain_data::{AccountData, ChainData, SlotData},
metrics::Metrics,
serum::SerumEventQueueHeader,
AccountWrite, SlotUpdate,
};
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
use std::{sync::Arc, time::Duration};
use anchor_lang::AccountDeserialize;
use log::*;
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
stake_history::Epoch,
};
use std::{
borrow::BorrowMut,
collections::{BTreeSet, HashMap, HashSet},
convert::TryFrom,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::{openbook_crank_sink::OpenbookCrankSink, mango_v4_perp_crank_sink::MangoV4PerpCrankSink};
const MAX_BACKLOG: usize = 2;
const TIMEOUT_INTERVAL: Duration = Duration::from_millis(400);
pub fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
group_pk: Pubkey,
pub async fn init(
v4_perp_market_pks: Vec<(Pubkey, Pubkey)>,
v3_perp_market_pks: Vec<(Pubkey, Pubkey)>,
openbook_pks: Vec<(Pubkey, Pubkey)>,
v4_group_pk: Pubkey,
v3_group_pks: (Pubkey, Pubkey, Pubkey),
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
@ -46,24 +29,38 @@ pub fn init(
let routes = vec![
AccountWriteRoute {
matched_pubkeys: serum_queue_pks
matched_pubkeys: openbook_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(OpenbookCrankSink::new(
serum_queue_pks,
openbook_pks,
instruction_sender.clone(),
)),
timeout_interval: TIMEOUT_INTERVAL,
},
AccountWriteRoute {
matched_pubkeys: v3_perp_market_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
v3_perp_market_pks,
v3_group_pks.0,
v3_group_pks.1,
v3_group_pks.2,
instruction_sender.clone(),
)),
timeout_interval: Duration::default(),
},
AccountWriteRoute {
matched_pubkeys: perp_queue_pks
matched_pubkeys: v4_perp_market_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(MangoV4PerpCrankSink::new(
perp_queue_pks,
group_pk,
v4_perp_market_pks,
v4_group_pk,
instruction_sender.clone(),
)),
timeout_interval: Duration::default(),

View File

@ -28,8 +28,8 @@ pub async fn send_loop(
*blockhash.read().unwrap(),
);
// TODO: collect metrics
info!("send tx={:?}", tx.signatures[0]);
client.send_transaction_with_config(&tx, cfg).await;
info!("send tx={:?} ok={:?}", tx.signatures[0],
client.send_transaction_with_config(&tx, cfg).await);
}
}
}