create crank skeleton

This commit is contained in:
Maximilian Schneider 2023-02-03 16:15:58 +09:00
parent 531a2c3d11
commit 5e78f008f6
9 changed files with 546 additions and 13 deletions

55
Cargo.lock generated
View File

@ -5136,6 +5136,61 @@ dependencies = [
"without-alloc",
]
[[package]]
name = "serum_dex"
version = "0.5.10"
source = "git+https://github.com/openbook-dex/program#c85e56deeaead43abbc33b7301058838b9c5136d"
dependencies = [
"anchor-lang",
"arrayref",
"bincode",
"bytemuck",
"byteorder",
"default-env",
"enumflags2",
"field-offset",
"itertools 0.10.5",
"num-traits",
"num_enum",
"safe-transmute",
"serde",
"solana-program",
"solana-security-txt",
"spl-token",
"static_assertions",
"thiserror",
"without-alloc",
]
[[package]]
name = "service-mango-crank"
version = "0.1.0"
dependencies = [
"anchor-client",
"anchor-lang",
"anyhow",
"async-channel",
"async-trait",
"bs58 0.3.1",
"bytemuck",
"client",
"futures-channel",
"futures-util",
"log 0.3.9",
"mango-v4",
"serde",
"serde_derive",
"serde_json",
"serum_dex 0.5.10 (git+https://github.com/openbook-dex/program)",
"solana-geyser-connector-lib",
"solana-logger",
"solana-sdk",
"tokio",
"tokio-tungstenite",
"toml",
"ws",
]
[[package]]
name = "service-mango-fills"
version = "0.1.0"

View File

@ -1,6 +1,7 @@
[workspace]
members = [
"lib",
"service-mango-crank",
"service-mango-fills",
"service-mango-pnl",
"service-mango-orderbook",

View File

@ -1,9 +1,8 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate,
AccountWrite, SlotUpdate, serum::SerumEventQueueHeader,
};
use bytemuck::{Pod, Zeroable};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView;
@ -50,17 +49,6 @@ pub struct SerumFillUpdate {
pub write_version: u64,
}
#[derive(Copy, Clone, Debug)]
#[repr(packed)]
pub struct SerumEventQueueHeader {
_account_flags: u64, // Initialized, EventQueue
_head: u64,
count: u64,
seq_num: u64,
}
unsafe impl Zeroable for SerumEventQueueHeader {}
unsafe impl Pod for SerumEventQueueHeader {}
impl Serialize for FillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where

View File

@ -6,6 +6,7 @@ pub mod memory_target;
pub mod metrics;
pub mod postgres_target;
pub mod postgres_types_numeric;
pub mod serum;
pub mod websocket_source;
pub use chain_data::SlotStatus;

12
lib/src/serum.rs Normal file
View File

@ -0,0 +1,12 @@
use bytemuck::{Pod, Zeroable};
#[derive(Copy, Clone, Debug)]
#[repr(packed)]
pub struct SerumEventQueueHeader {
pub _account_flags: u64, // Initialized, EventQueue
pub _head: u64,
pub count: u64,
pub seq_num: u64,
}
unsafe impl Zeroable for SerumEventQueueHeader {}
unsafe impl Pod for SerumEventQueueHeader {}

View File

@ -0,0 +1,32 @@
[package]
name = "service-mango-crank"
version = "0.1.0"
authors = ["Maximilian Schneider <max@mango.markets>"]
edition = "2018"
[dependencies]
solana-geyser-connector-lib = { path = "../lib" }
solana-logger = "1"
solana-sdk = "1"
bs58 = "*"
log = "*"
anyhow = "*"
toml = "*"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
futures-channel = "0.3"
futures-util = "0.3"
ws = "^0.9.2"
async-channel = "1.6"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/openbook-dex/program" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -0,0 +1,166 @@
use crate::Pubkey;
use solana_geyser_connector_lib::{AccountWrite, metrics::Metrics, SlotUpdate, chain_data::{ChainData, AccountData, SlotData}, serum::SerumEventQueueHeader};
use solana_sdk::{account::{WritableAccount, ReadableAccount}, stake_history::Epoch};
use std::{borrow::BorrowMut, collections::{HashMap, HashSet}};
use log::*;
use anchor_lang::AccountDeserialize;
pub enum EventQueueFilterMessage {}
pub async fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<EventQueueFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
// Event queue updates can be consumed by client connections
let (filter_update_sender, filter_update_receiver) =
async_channel::unbounded::<EventQueueFilterMessage>();
let mut chain_cache = ChainData::new();
let mut perp_events_cache = HashMap::<String, [mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize]>::new();
let mut serum_events_cache = HashMap::<String, Vec<serum_dex::state::Event>>::new();
let mut seq_num_cache = HashMap::<String, u64>::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks
.iter()
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
// update handling thread, reads both sloths and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
}
for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
match chain_cache.account(&mkt_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue =
mango_v4::state::EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => {
},
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => {
},
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
}
Err(_) => info!("chain_cache could not find {}", mkt.1),
}
}
}
});
Ok((
account_write_queue_sender,
slot_queue_sender,
filter_update_receiver,
))
}

View File

@ -0,0 +1,225 @@
mod event_queue_filter;
use anchor_client::{
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
future::{self, Ready},
pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use std::{
collections::{HashMap, HashSet},
convert::identity,
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
sync::Arc,
sync::Mutex,
time::Duration,
};
use tokio::{
net::{TcpListener, TcpStream},
pin, time,
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{
fill_event_filter::SerumFillCheckpoint,
metrics::{MetricType, MetricU64},
FilterConfig, StatusResponse,
};
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint},
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
};
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "command")]
pub enum Command {
#[serde(rename = "subscribe")]
Subscribe(SubscribeCommand),
#[serde(rename = "unsubscribe")]
Unsubscribe(UnsubscribeCommand),
#[serde(rename = "getMarkets")]
GetMarkets,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeCommand {
pub market_id: String,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UnsubscribeCommand {
pub market_id: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub source: SourceConfig,
pub metrics: MetricsConfig,
pub bind_ws_addr: String,
pub rpc_http_url: String,
pub mango_group: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("Please enter a config file path argument.");
return Ok(());
}
let config: Config = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
let metrics_tx = metrics::start(config.metrics, "fills".into());
let metrics_opened_connections =
metrics_tx.register_u64("fills_feed_opened_connections".into(), MetricType::Counter);
let metrics_closed_connections =
metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Counter);
let rpc_url = config.rpc_http_url;
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.mango_group).unwrap(),
)
.await?,
);
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.collect();
let serum_market_pks: Vec<Pubkey> = group_context
.serum3_markets
.iter()
.map(|(_, context)| context.market.serum_market_external)
.collect();
let serum_market_ais = client
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
.iter()
.enumerate()
.map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
(
serum_market_pks[pair.0],
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
)
})
.collect();
let a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
(
context.market.serum_market_external.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let b: Vec<(String, String)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
(
context.address.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
metrics_tx.clone(),
)
.await?;
info!(
"rpc connect: {}",
config
.source
.grpc_sources
.iter()
.map(|c| c.connection_string.clone())
.collect::<String>()
);
let use_geyser = true;
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
};
if use_geyser {
grpc_plugin_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
)
.await;
} else {
websocket_source::process_events(
&config.source,
account_write_queue_sender,
slot_queue_sender,
)
.await;
}
Ok(())
}

View File

@ -0,0 +1,53 @@
bind_ws_addr = "0.0.0.0:2082"
[source]
dedup_queue_size = 50000
rpc_ws_url = ""
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "http://mango.devnet.rpcpool.com:10001"
retry_connection_sleep_secs = 30
[source.grpc_sources.tls]
ca_cert_path = "ca-devnet.pem"
client_cert_path = "client-devnet.pem"
client_key_path = "client-devnet.pem"
domain_name = "mango-accountsdb.rpcpool.com"
[source.snapshot]
rpc_http_url = "http://mango.devnet.rpcpool.com/"
program_id = "4skJ85cdxQAFVKbcGgfun8iZPL7BadVYXG3kGEGkufqA"
[[markets]]
name = "MNGO-PERP"
event_queue = "uaUCSQejWYrDeYSuvn4As4kaCwJ2rLnRQSsSjY3ogZk"
[[markets]]
name = "ETH-PERP"
event_queue = "8WLv5fKLYkyZpFG74kRmp2RALHQFcNKmH7eJn8ebHC13"
[[markets]]
name = "SOL-PERP"
event_queue = "CZ5MCRvkN38d5pnZDDEEyMiED3drgDUVpEUjkuJq31Kf"
[[markets]]
name = "ADA-PERP"
event_queue = "5v5fz2cCSy2VvrgVf5Vu7PF23RiZjv6BL36bgg48bA1c"
[[markets]]
name = "FTT-PERP"
event_queue = "7rswj7FVZcMYUKxcTLndZhWBmuVNc2GuxqjuXU8KcPWv"
[[markets]]
name = "AVAX-PERP"
event_queue = "4b7NqjqWoQoQh9V3dubfjkLPQVNJijwAwr7D9q6vTqqd"
[[markets]]
name = "BNB-PERP"
event_queue = "96Y87LTz5Mops7wdT9EJo1eM79XToKYJJmRZxNatV85d"
[[markets]]
name = "MATIC-PERP"
event_queue = "77maU5zdfYayqhqjBi2ocosM4PXvPXxbps2Up7dxDsMR"