From ffe776b70d0c33210f82494d7a8623d3356b02cc Mon Sep 17 00:00:00 2001
From: Maximilian Schneider
Date: Tue, 7 Feb 2023 17:22:21 +0900
Subject: [PATCH] AccountWriteFilter generic router to process account writes
 for different modules in the same process

---
 lib/src/account_write_filter.rs               | 137 +++++++++
 lib/src/lib.rs                                |   2 +
 service-mango-crank/src/main.rs               |   3 +
 .../src/mango_v4_perp_crank_sink.rs           | 115 +++++++
 .../src/openbook_crank_sink.rs                | 110 +++++++
 .../src/transaction_builder.rs                | 289 ++----------
 6 files changed, 398 insertions(+), 258 deletions(-)
 create mode 100644 lib/src/account_write_filter.rs
 create mode 100644 service-mango-crank/src/mango_v4_perp_crank_sink.rs
 create mode 100644 service-mango-crank/src/openbook_crank_sink.rs

diff --git a/lib/src/account_write_filter.rs b/lib/src/account_write_filter.rs
new file mode 100644
index 0000000..13efaf9
--- /dev/null
+++ b/lib/src/account_write_filter.rs
@@ -0,0 +1,137 @@
+use crate::{
+    chain_data::{AccountData, ChainData, SlotData},
+    metrics::Metrics,
+    AccountWrite, SlotUpdate,
+};
+
+use anchor_lang::prelude::Pubkey;
+use async_trait::async_trait;
+use log::*;
+use solana_sdk::{account::WritableAccount, stake_history::Epoch};
+use std::{
+    collections::{BTreeSet, HashMap},
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+#[async_trait]
+pub trait AccountWriteSink {
+    async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>;
+}
+
+#[derive(Clone)]
+pub struct AccountWriteRoute {
+    pub matched_pubkeys: Vec<Pubkey>,
+    pub sink: Arc<dyn AccountWriteSink + Send + Sync>,
+    pub timeout_interval: Duration,
+}
+
+#[derive(Clone, Debug)]
+struct AccountWriteRecord {
+    slot: u64,
+    write_version: u64,
+    timestamp: Instant,
+}
+
+pub fn init(
+    routes: Vec<AccountWriteRoute>,
+    metrics_sender: Metrics,
+) -> anyhow::Result<(
+    async_channel::Sender<AccountWrite>,
+    async_channel::Sender<SlotUpdate>,
+)> {
+    // The actual message may want to also contain a retry count, if it self-reinserts on failure?
+    let (account_write_queue_sender, account_write_queue_receiver) =
+        async_channel::unbounded::<AccountWrite>();
+
+    // Slot updates flowing from the outside into the single processing thread. From
+    // there they'll flow into the routed sinks.
+    let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
+
+    let mut chain_data = ChainData::new(metrics_sender);
+    let mut last_updated = HashMap::<String, AccountWriteRecord>::new();
+
+    let all_queue_pks: BTreeSet<Pubkey> = routes
+        .iter()
+        .flat_map(|r| r.matched_pubkeys.iter())
+        .map(|pk| pk.clone())
+        .collect();
+
+    // update handling thread, reads both slot and account updates
+    tokio::spawn(async move {
+        loop {
+            tokio::select! {
+                Ok(account_write) = account_write_queue_receiver.recv() => {
+                    if !all_queue_pks.contains(&account_write.pubkey) {
+                        continue;
+                    }
+
+                    chain_data.update_account(
+                        account_write.pubkey,
+                        AccountData {
+                            slot: account_write.slot,
+                            write_version: account_write.write_version,
+                            account: WritableAccount::create(
+                                account_write.lamports,
+                                account_write.data.clone(),
+                                account_write.owner,
+                                account_write.executable,
+                                account_write.rent_epoch as Epoch,
+                            ),
+                        },
+                    );
+                }
+                Ok(slot_update) = slot_queue_receiver.recv() => {
+                    chain_data.update_slot(SlotData {
+                        slot: slot_update.slot,
+                        parent: slot_update.parent,
+                        status: slot_update.status,
+                        chain: 0,
+                    });
+
+                }
+            }
+
+            for route in routes.iter() {
+                for pk in route.matched_pubkeys.iter() {
+                    match chain_data.account(&pk) {
+                        Ok(account_info) => {
+                            let pk_b58 = pk.to_string();
+                            if let Some(record) = last_updated.get(&pk_b58) {
+                                let is_unchanged = account_info.slot == record.slot
+                                    && account_info.write_version == record.write_version;
+                                let is_throttled =
+                                    record.timestamp.elapsed() < route.timeout_interval;
+                                if is_unchanged || is_throttled {
+                                    continue;
+                                }
+                            };
+
+                            match route.sink.process(pk, account_info).await {
+                                Ok(()) => {
+                                    // todo: metrics
+                                    last_updated.insert(
+                                        pk_b58.clone(),
+                                        AccountWriteRecord {
+                                            slot: account_info.slot,
+                                            write_version: account_info.write_version,
+                                            timestamp: Instant::now(),
+                                        },
+                                    );
+                                }
+                                Err(_skip_reason) => {
+                                    // todo: metrics
+                                }
+                            }
+                        }
+                        Err(_) => {
+                            // todo: metrics
+                        }
+                    }
+                }
+            }
+        }
+    });
+
+    Ok((account_write_queue_sender, slot_queue_sender))
+}
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 6ce9b54..872f115 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -1,3 +1,5 @@
+
+pub mod account_write_filter;
 pub mod chain_data;
 pub mod fill_event_filter;
 pub mod fill_event_postgres_target;
diff --git a/service-mango-crank/src/main.rs b/service-mango-crank/src/main.rs
index eb7eab4..2d547d4 100644
--- a/service-mango-crank/src/main.rs
+++ b/service-mango-crank/src/main.rs
@@ -1,7 +1,10 @@
 mod blockhash_poller;
+mod mango_v4_perp_crank_sink;
+mod openbook_crank_sink;
 mod transaction_builder;
 mod transaction_sender;
+
 
 use anchor_client::{
     solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
     Cluster,
diff --git a/service-mango-crank/src/mango_v4_perp_crank_sink.rs b/service-mango-crank/src/mango_v4_perp_crank_sink.rs
new file mode 100644
index 0000000..85b700f
--- /dev/null
+++ b/service-mango-crank/src/mango_v4_perp_crank_sink.rs
@@ -0,0 +1,115 @@
+use std::{
+    borrow::BorrowMut,
+    collections::{BTreeMap, BTreeSet},
+    convert::TryFrom,
+};
+
+use async_channel::Sender;
+use async_trait::async_trait;
+use log::*;
+use solana_geyser_connector_lib::{
+    account_write_filter::AccountWriteSink, chain_data::AccountData,
+};
+use solana_sdk::{
+    account::ReadableAccount,
+    instruction::{AccountMeta, Instruction},
+    pubkey::Pubkey,
+};
+
+use bytemuck::cast_ref;
+use mango_v4::state::FillEvent;
+
+use anchor_lang::AccountDeserialize;
+
+const MAX_BACKLOG: usize = 2;
+
+pub struct MangoV4PerpCrankSink {
+    pks: BTreeMap<Pubkey, Pubkey>,
+    group_pk: Pubkey,
+    instruction_sender: Sender<Vec<Instruction>>,
+}
+
+impl MangoV4PerpCrankSink {
+    pub fn new(pks: Vec<(Pubkey, Pubkey)>, group_pk: Pubkey, instruction_sender: Sender<Vec<Instruction>>) -> Self {
+        Self {
+            pks: pks.iter().map(|e| e.clone()).collect(),
+            group_pk,
+            instruction_sender,
+        }
+    }
+}
+
+#[async_trait]
+impl AccountWriteSink for MangoV4PerpCrankSink {
+    async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
+        let account = &account.account;
+        let event_queue: mango_v4::state::EventQueue =
+            mango_v4::state::EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
+
+        // only crank if at least 1 fill or a sufficient number of events of other categories are buffered
+        let contains_fill_events = event_queue
+            .iter()
+            .find(|e| e.event_type == mango_v4::state::EventType::Fill as u8)
+            .is_some();
+        let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
+        if !contains_fill_events && !has_backlog {
+            return Err("throttled".into());
+        }
+
+        let mango_accounts: BTreeSet<_> = event_queue
+            .iter()
+            .take(10)
+            .flat_map(|e| {
+                match mango_v4::state::EventType::try_from(e.event_type).expect("mango v4 event") {
+                    mango_v4::state::EventType::Fill => {
+                        let fill: &mango_v4::state::FillEvent = cast_ref(e);
+                        vec![fill.maker, fill.taker]
+                    }
+                    mango_v4::state::EventType::Out => {
+                        let out: &mango_v4::state::OutEvent = cast_ref(e);
+                        vec![out.owner]
+                    }
+                    mango_v4::state::EventType::Liquidate => vec![],
+                }
+            })
+            .collect();
+
+        let mkt_pk = self
+            .pks
+            .get(pk)
+            .expect(&format!("{pk:?} is a known public key"));
+        let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
+            &mango_v4::accounts::PerpConsumeEvents {
+                group: self.group_pk,
+                perp_market: *mkt_pk,
+                event_queue: *pk,
+            },
+            None,
+        );
+        ams.append(
+            &mut mango_accounts
+                .iter()
+                .map(|pk| AccountMeta::new(*pk, false))
+                .collect(),
+        );
+
+        let ix = Instruction {
+            program_id: mango_v4::id(),
+            accounts: ams,
+            data: anchor_lang::InstructionData::data(&mango_v4::instruction::PerpConsumeEvents {
+                limit: 10,
+            }),
+        };
+
+        info!(
+            "evq={pk:?} count={} limit=10",
+            event_queue.iter().count()
+        );
+
+        if let Err(e) = self.instruction_sender.send(vec![ix]).await {
+            return Err(e.to_string());
+        }
+
+        Ok(())
+    }
+}
diff --git a/service-mango-crank/src/openbook_crank_sink.rs b/service-mango-crank/src/openbook_crank_sink.rs
new file mode 100644
index 0000000..5cc9f47
--- /dev/null
+++ b/service-mango-crank/src/openbook_crank_sink.rs
@@ -0,0 +1,110 @@
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    str::FromStr,
+};
+
+use async_channel::Sender;
+use async_trait::async_trait;
+use log::*;
+use serum_dex::{instruction::MarketInstruction, state::EventView};
+use solana_geyser_connector_lib::{
+    account_write_filter::AccountWriteSink, chain_data::AccountData, serum::SerumEventQueueHeader,
+};
+use solana_sdk::{
+    account::ReadableAccount,
+    instruction::{AccountMeta, Instruction},
+    pubkey::Pubkey,
+};
+
+const MAX_BACKLOG: usize = 2;
+
+pub struct OpenbookCrankSink {
+    pks: BTreeMap<Pubkey, Pubkey>,
+    instruction_sender: Sender<Vec<Instruction>>,
+}
+
+impl OpenbookCrankSink {
+    pub fn new(pks: Vec<(Pubkey, Pubkey)>, instruction_sender: Sender<Vec<Instruction>>) -> Self {
+        Self {
+            pks: pks.iter().map(|e| e.clone()).collect(),
+            instruction_sender,
+        }
+    }
+}
+
+#[async_trait]
+impl AccountWriteSink for OpenbookCrankSink {
+    async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
+        let account = &account.account;
+
+        let inner_data = &account.data()[5..&account.data().len() - 7];
+        let header_span = std::mem::size_of::<SerumEventQueueHeader>();
+        let header: SerumEventQueueHeader = *bytemuck::from_bytes(&inner_data[..header_span]);
+        let count = header.count;
+
+        let rest = &inner_data[header_span..];
+        let event_size = std::mem::size_of::<serum_dex::state::Event>();
+        let slop = rest.len() % event_size;
+        let end = rest.len() - slop;
+        let events = bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]);
+        let seq_num = header.seq_num;
+
+        let events: Vec<_> = (0..count)
+            .map(|i| {
+                let offset = (seq_num - count + i) % events.len() as u64;
+                let event: serum_dex::state::Event = events[offset as usize];
+                event.as_view().unwrap()
+            })
+            .collect();
+
+        // only crank if at least 1 fill or a sufficient number of events of other categories are buffered
+        let contains_fill_events = events
+            .iter()
+            .find(|e| matches!(e, serum_dex::state::EventView::Fill { .. }))
+            .is_some();
+
+        let has_backlog = events.len() > MAX_BACKLOG;
+        if !contains_fill_events && !has_backlog {
+            return Err("throttled".into());
+        }
+
+        let oo_pks: BTreeSet<_> = events
+            .iter()
+            .map(|e| match e {
+                EventView::Fill { owner, .. } | EventView::Out { owner, .. } => {
+                    bytemuck::cast_slice::<u64, Pubkey>(owner)[0]
+                }
+            })
+            .collect();
+
+        let mut ams: Vec<_> = oo_pks
+            .iter()
+            .map(|pk| AccountMeta::new(*pk, false))
+            .collect();
+
+        // pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size
+        let mkt_pk = self
+            .pks
+            .get(pk)
+            .expect(&format!("{pk:?} is a known public key"));
+        ams.append(
+            &mut [mkt_pk, pk, /*coin_pk*/ pk, /*pc_pk*/ pk]
+                .iter()
+                .map(|pk| AccountMeta::new(**pk, false))
+                .collect(),
+        );
+
+        let ix = Instruction {
+            program_id: Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
+            accounts: ams,
+            data: MarketInstruction::ConsumeEvents(count as u16).pack(),
+        };
+
+        info!("evq={pk:?} count={count}");
+        if let Err(e) = self.instruction_sender.send(vec![ix]).await {
+            return Err(e.to_string());
+        }
+
+        Ok(())
+    }
+}
diff --git a/service-mango-crank/src/transaction_builder.rs b/service-mango-crank/src/transaction_builder.rs
index ab8ec75..d4d1984 100644
--- a/service-mango-crank/src/transaction_builder.rs
+++ b/service-mango-crank/src/transaction_builder.rs
@@ -2,6 +2,7 @@ use bytemuck::cast_ref;
 use mango_v4::state::FillEvent;
 use serum_dex::{instruction::MarketInstruction, state::EventView};
 use solana_geyser_connector_lib::{
+    account_write_filter::{self, AccountWriteRoute},
     chain_data::{AccountData, ChainData, SlotData},
     metrics::Metrics,
     serum::SerumEventQueueHeader,
@@ -21,9 +22,12 @@ use std::{
     collections::{BTreeSet, HashMap, HashSet},
     convert::TryFrom,
     str::FromStr,
+    sync::Arc,
     time::{Duration, Instant},
 };
 
+use crate::{openbook_crank_sink::OpenbookCrankSink, mango_v4_perp_crank_sink::MangoV4PerpCrankSink};
+
 const MAX_BACKLOG: usize = 2;
 const TIMEOUT_INTERVAL: Duration = Duration::from_millis(400);
 
@@ -37,268 +41,37 @@ pub fn init(
     async_channel::Sender<SlotUpdate>,
     async_channel::Receiver<Vec<Instruction>>,
 )> {
-    let metrics_sender = metrics_sender.clone();
-
-    // The actual message may want to also contain a retry count, if it self-reinserts on failure?
-    let (account_write_queue_sender, account_write_queue_receiver) =
-        async_channel::unbounded::<AccountWrite>();
-
-    // Slot updates flowing from the outside into the single processing thread. From
-    // there they'll flow into the postgres sending thread.
-    let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
-
     // Event queue updates can be consumed by client connections
     let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
 
-    let mut chain_cache = ChainData::new(metrics_sender);
-    let mut last_cranked_evq_versions = HashMap::<String, (u64, u64, Instant)>::new();
+    let routes = vec![
+        AccountWriteRoute {
+            matched_pubkeys: serum_queue_pks
+                .iter()
+                .map(|(_, evq_pk)| evq_pk.clone())
+                .collect(),
+            sink: Arc::new(OpenbookCrankSink::new(
+                serum_queue_pks,
+                instruction_sender.clone(),
+            )),
+            timeout_interval: Duration::default(),
+        },
+        AccountWriteRoute {
+            matched_pubkeys: perp_queue_pks
+                .iter()
+                .map(|(_, evq_pk)| evq_pk.clone())
+                .collect(),
+            sink: Arc::new(MangoV4PerpCrankSink::new(
+                perp_queue_pks,
+                group_pk,
+                instruction_sender.clone(),
+            )),
+            timeout_interval: Duration::default(),
+        },
+    ];
 
-    let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
-        .iter()
-        .chain(serum_queue_pks.iter())
-        .map(|mkt| mkt.1)
-        .collect();
-
-    // update handling thread, reads both sloths and account updates
-    tokio::spawn(async move {
-        loop {
-            tokio::select! {
-                Ok(account_write) = account_write_queue_receiver.recv() => {
-                    if !all_queue_pks.contains(&account_write.pubkey) {
-                        continue;
-                    }
-
-                    chain_cache.update_account(
-                        account_write.pubkey,
-                        AccountData {
-                            slot: account_write.slot,
-                            write_version: account_write.write_version,
-                            account: WritableAccount::create(
-                                account_write.lamports,
-                                account_write.data.clone(),
-                                account_write.owner,
-                                account_write.executable,
-                                account_write.rent_epoch as Epoch,
-                            ),
-                        },
-                    );
-                }
-                Ok(slot_update) = slot_queue_receiver.recv() => {
-                    chain_cache.update_slot(SlotData {
-                        slot: slot_update.slot,
-                        parent: slot_update.parent,
-                        status: slot_update.status,
-                        chain: 0,
-                    });
-
-                }
-            }
-
-            for (mkt_pk, evq_pk) in perp_queue_pks.iter() {
-                let evq_b58 = evq_pk.to_string();
-                let last_evq_version = last_cranked_evq_versions.get(&evq_b58);
-
-                match chain_cache.account(&evq_pk) {
-                    Ok(account_info) => {
-                        // only process if the account state changed
-                        let (is_unchanged, is_on_timeout) = match last_evq_version {
-                            Some((slot, write_version, timestamp)) => (
-                                account_info.slot == *slot
-                                    && account_info.write_version == *write_version,
-                                timestamp.elapsed() < TIMEOUT_INTERVAL,
-                            ),
-                            None => Default::default(),
-                        };
-                        trace!("mango perp evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version);
-                        if is_unchanged || is_on_timeout {
-                            continue;
-                        }
-
-                        let account = &account_info.account;
-                        let event_queue: mango_v4::state::EventQueue =
-                            mango_v4::state::EventQueue::try_deserialize(
-                                account.data().borrow_mut(),
-                            )
-                            .unwrap();
-
-                        // only crank if at least 1 fill or a sufficient events of other categories are buffered
-                        let contains_fill_events = event_queue
-                            .iter()
-                            .find(|e| e.event_type == mango_v4::state::EventType::Fill as u8)
-                            .is_some();
-                        let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
-                        trace!("mango perp evq={evq_b58} slot={} v={} contains_fill_events={contains_fill_events} has_backlog={has_backlog}", account_info.slot, account_info.write_version);
-                        if !contains_fill_events && !has_backlog {
-                            continue;
-                        }
-
-                        let mango_accounts: BTreeSet<_> = event_queue
-                            .iter()
-                            .take(10)
-                            .flat_map(|e| {
-                                match mango_v4::state::EventType::try_from(e.event_type)
-                                    .expect("mango v4 event")
-                                {
-                                    mango_v4::state::EventType::Fill => {
-                                        let fill: &mango_v4::state::FillEvent = cast_ref(e);
-                                        vec![fill.maker, fill.taker]
-                                    }
-                                    mango_v4::state::EventType::Out => {
-                                        let out: &mango_v4::state::OutEvent = cast_ref(e);
-                                        vec![out.owner]
-                                    }
-                                    mango_v4::state::EventType::Liquidate => vec![],
-                                }
-                            })
-                            .collect();
-
-                        let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
-                            &mango_v4::accounts::PerpConsumeEvents {
-                                group: group_pk,
-                                perp_market: *mkt_pk,
-                                event_queue: *evq_pk,
-                            },
-                            None,
-                        );
-                        ams.append(
-                            &mut mango_accounts
-                                .iter()
-                                .map(|pk| AccountMeta::new(*pk, false))
-                                .collect(),
-                        );
-
-                        let ix = Instruction {
-                            program_id: mango_v4::id(),
-                            accounts: ams,
-                            data: anchor_lang::InstructionData::data(
-                                &mango_v4::instruction::PerpConsumeEvents { limit: 10 },
-                            ),
-                        };
-
-                        info!(
-                            "mango perp evq={evq_b58} count={} limit=10",
-                            event_queue.iter().count()
-                        );
-                        instruction_sender.send(vec![ix]).await;
-
-                        last_cranked_evq_versions.insert(
-                            evq_b58.clone(),
-                            (
-                                account_info.slot,
-                                account_info.write_version,
-                                Instant::now(),
-                            ),
-                        );
-                    }
-                    Err(_) => info!("chain_cache could not find {evq_b58}"),
-                }
-            }
-
-            for (mkt_pk, evq_pk) in serum_queue_pks.iter() {
-                let evq_b58 = evq_pk.to_string();
-                let last_evq_version = last_cranked_evq_versions.get(&evq_b58);
-
-                match chain_cache.account(&evq_pk) {
-                    Ok(account_info) => {
-                        // only process if the account state changed
-                        let (is_unchanged, is_on_timeout) = match last_evq_version {
-                            Some((slot, write_version, timestamp)) => (
-                                account_info.slot == *slot
-                                    && account_info.write_version == *write_version,
-                                timestamp.elapsed() < TIMEOUT_INTERVAL,
-                            ),
-                            None => Default::default(),
-                        };
-                        trace!("openbook evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version);
-                        if is_unchanged || is_on_timeout {
-                            continue;
-                        }
-
-                        let account = &account_info.account;
-
-                        let inner_data = &account.data()[5..&account.data().len() - 7];
-                        let header_span = std::mem::size_of::<SerumEventQueueHeader>();
-                        let header: SerumEventQueueHeader =
-                            *bytemuck::from_bytes(&inner_data[..header_span]);
-                        let count = header.count;
-
-                        let rest = &inner_data[header_span..];
-                        let event_size = std::mem::size_of::<serum_dex::state::Event>();
-                        let slop = rest.len() % event_size;
-                        let end = rest.len() - slop;
-                        let events =
-                            bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]);
-                        let seq_num = header.seq_num;
-
-                        let events: Vec<_> = (0..count)
-                            .map(|i| {
-                                let offset = (seq_num - count + i) % events.len() as u64;
-                                let event: serum_dex::state::Event = events[offset as usize];
-                                event.as_view().unwrap()
-                            })
-                            .collect();
-
-                        // only crank if at least 1 fill or a sufficient events of other categories are buffered
-                        let contains_fill_events = events
-                            .iter()
-                            .find(|e| matches!(e, serum_dex::state::EventView::Fill { .. }))
-                            .is_some();
-                        let has_backlog = events.len() > MAX_BACKLOG;
-                        if !contains_fill_events && !has_backlog {
-                            continue;
-                        }
-
-                        let oo_pks: BTreeSet<_> = events
-                            .iter()
-                            .map(|e| match e {
-                                EventView::Fill { owner, .. } | EventView::Out { owner, .. } => {
-                                    bytemuck::cast_slice::<u64, Pubkey>(owner)[0]
-                                }
-                            })
-                            .collect();
-
-                        let mut ams: Vec<_> = oo_pks
-                            .iter()
-                            .map(|pk| AccountMeta::new(*pk, false))
-                            .collect();
-
-                        // pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size
-                        ams.append(
-                            &mut [
-                                mkt_pk, evq_pk, evq_pk, /*coin_pk*/
-                                evq_pk, /*pc_pk*/
-                            ]
-                            .iter()
-                            .map(|pk| AccountMeta::new(**pk, false))
-                            .collect(),
-                        );
-
-                        let ix = Instruction {
-                            program_id: Pubkey::from_str(
-                                "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX",
-                            )
-                            .unwrap(),
-                            accounts: ams,
-                            data: MarketInstruction::ConsumeEvents(count as u16).pack(),
-                        };
-
-                        info!("openbook evq={evq_b58} count={count}");
-                        instruction_sender.send(vec![ix]).await;
-
-                        last_cranked_evq_versions.insert(
-                            evq_b58.clone(),
-                            (
-                                account_info.slot,
-                                account_info.write_version,
-                                Instant::now(),
-                            ),
-                        );
-                    }
-                    Err(_) => info!("chain_cache could not find {evq_b58}"),
-                }
-            }
-        }
-    });
+    let (account_write_queue_sender, slot_queue_sender) =
+        account_write_filter::init(routes, metrics_sender.clone())?;
 
     Ok((
         account_write_queue_sender,
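
Reviewer note: the sketch below is a minimal, untested illustration of how another module in the same process could plug into the router added by this patch. It only exercises the AccountWriteSink trait, the AccountWriteRoute struct and account_write_filter::init as defined above. LoggingSink, the setup function and the 400ms throttle value are hypothetical and exist only for illustration; the actual registration in this patch is the routes vec in transaction_builder::init.

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use solana_geyser_connector_lib::{
    account_write_filter::{self, AccountWriteRoute, AccountWriteSink},
    chain_data::AccountData,
    metrics::Metrics,
};
use solana_sdk::pubkey::Pubkey;

// Hypothetical sink: just logs every account write routed to it.
struct LoggingSink;

#[async_trait]
impl AccountWriteSink for LoggingSink {
    async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String> {
        // Returning Err(..) counts as a skip: last_updated is not advanced,
        // so the router will try this pubkey again on the next update.
        log::info!("write for {} at slot {}", pubkey, account.slot);
        Ok(())
    }
}

// Must run inside a tokio runtime, since account_write_filter::init spawns its routing task.
fn setup(metrics: Metrics, watched: Vec<Pubkey>) -> anyhow::Result<()> {
    let routes = vec![AccountWriteRoute {
        matched_pubkeys: watched,
        sink: Arc::new(LoggingSink),
        // throttle to at most one process() call per pubkey per 400ms
        timeout_interval: Duration::from_millis(400),
    }];

    // init() hands back the two queue senders; the geyser/websocket source
    // threads feed AccountWrite and SlotUpdate messages into them.
    let (account_write_sender, slot_sender) = account_write_filter::init(routes, metrics)?;
    let _ = (account_write_sender, slot_sender);
    Ok(())
}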