wip add most of the v3 crank code

This commit is contained in:
Maximilian Schneider 2023-02-19 01:22:12 +09:00
parent 02acb24ee5
commit 03a2f8c740
10 changed files with 265 additions and 180 deletions

1
Cargo.lock generated
View File

@ -5445,6 +5445,7 @@ dependencies = [
"solana-version", "solana-version",
"thiserror", "thiserror",
"tokio", "tokio",
"toml",
"tonic 0.6.2", "tonic 0.6.2",
"tonic-build 0.6.2", "tonic-build 0.6.2",
"warp", "warp",

View File

@ -24,7 +24,7 @@ serde_derive = "1.0.103"
serde_json = "1.0.79" serde_json = "1.0.79"
serde_yaml = "0.8.23" serde_yaml = "0.8.23"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false, features = ["no-entrypoint"] } mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible" } mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible" }
solana-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } solana-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
@ -42,9 +42,12 @@ solana-transaction-status = { git = "https://github.com/solana-labs/solana.git",
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
# pin program to mango-v3 version of solana sdk
# now we can use sdk for recent version and program for legacy
# we have a bunch of helpers to convert between the two explicitly
solana-program = "1.9.17"
thiserror = "1.0" thiserror = "1.0"
solana-program = ">=1.9.0"
csv = "1.0.0" csv = "1.0.0"
tonic = { version = "0.6", features = ["tls", "compression"] } tonic = { version = "0.6", features = ["tls", "compression"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
@ -58,6 +61,8 @@ jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
arrayref = "*" arrayref = "*"
bytemuck = "1.7.2" bytemuck = "1.7.2"
toml = "*"
[build-dependencies] [build-dependencies]
tonic-build = { version = "0.6", features = ["compression"] } tonic-build = { version = "0.6", features = ["compression"] }

View File

@ -1,134 +1,134 @@
use crate::{ use crate::{
chain_data::{AccountData, AccountWrite, SlotUpdate, ChainData, SlotData}, chain_data::{AccountData, AccountWrite, ChainData, SlotData, SlotUpdate},
metrics::Metrics, metrics::Metrics,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use solana_sdk::{account::WritableAccount, stake_history::Epoch, pubkey::Pubkey}; use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{ use std::{
collections::{BTreeSet, HashMap}, collections::{BTreeSet, HashMap},
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
#[async_trait] #[async_trait]
pub trait AccountWriteSink { pub trait AccountWriteSink {
async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>; async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>;
} }
#[derive(Clone)] #[derive(Clone)]
pub struct AccountWriteRoute { pub struct AccountWriteRoute {
pub matched_pubkeys: Vec<Pubkey>, pub matched_pubkeys: Vec<Pubkey>,
pub sink: Arc<dyn AccountWriteSink + Send + Sync>, pub sink: Arc<dyn AccountWriteSink + Send + Sync>,
pub timeout_interval: Duration, pub timeout_interval: Duration,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AcountWriteRecord { struct AcountWriteRecord {
slot: u64, slot: u64,
write_version: u64, write_version: u64,
timestamp: Instant, timestamp: Instant,
} }
pub fn init( pub fn init(
routes: Vec<AccountWriteRoute>, routes: Vec<AccountWriteRoute>,
metrics_sender: Metrics, metrics_sender: Metrics,
) -> anyhow::Result<( ) -> anyhow::Result<(
async_channel::Sender<AccountWrite>, async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>, async_channel::Sender<SlotUpdate>,
)> { )> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure? // The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) = let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>(); async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From // Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread. // there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>(); let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let mut chain_data = ChainData::new(metrics_sender); let mut chain_data = ChainData::new(metrics_sender);
let mut last_updated = HashMap::<String, AcountWriteRecord>::new(); let mut last_updated = HashMap::<String, AcountWriteRecord>::new();
let all_queue_pks: BTreeSet<Pubkey> = routes let all_queue_pks: BTreeSet<Pubkey> = routes
.iter() .iter()
.flat_map(|r| r.matched_pubkeys.iter()) .flat_map(|r| r.matched_pubkeys.iter())
.map(|pk| pk.clone()) .map(|pk| pk.clone())
.collect(); .collect();
// update handling thread, reads both sloths and account updates // update handling thread, reads both sloths and account updates
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => { Ok(account_write) = account_write_queue_receiver.recv() => {
if all_queue_pks.contains(&account_write.pubkey) { if all_queue_pks.contains(&account_write.pubkey) {
continue; continue;
} }
chain_data.update_account( chain_data.update_account(
account_write.pubkey, account_write.pubkey,
AccountData { AccountData {
slot: account_write.slot, slot: account_write.slot,
write_version: account_write.write_version, write_version: account_write.write_version,
account: WritableAccount::create( account: WritableAccount::create(
account_write.lamports, account_write.lamports,
account_write.data.clone(), account_write.data.clone(),
account_write.owner, account_write.owner,
account_write.executable, account_write.executable,
account_write.rent_epoch as Epoch, account_write.rent_epoch as Epoch,
), ),
}, },
); );
} }
Ok(slot_update) = slot_queue_receiver.recv() => { Ok(slot_update) = slot_queue_receiver.recv() => {
chain_data.update_slot(SlotData { chain_data.update_slot(SlotData {
slot: slot_update.slot, slot: slot_update.slot,
parent: slot_update.parent, parent: slot_update.parent,
status: slot_update.status, status: slot_update.status,
chain: 0, chain: 0,
}); });
} }
} }
for route in routes.iter() { for route in routes.iter() {
for pk in route.matched_pubkeys.iter() { for pk in route.matched_pubkeys.iter() {
match chain_data.account(&pk) { match chain_data.account(&pk) {
Ok(account_info) => { Ok(account_info) => {
let pk_b58 = pk.to_string(); let pk_b58 = pk.to_string();
if let Some(record) = last_updated.get(&pk_b58) { if let Some(record) = last_updated.get(&pk_b58) {
let is_unchanged = account_info.slot == record.slot let is_unchanged = account_info.slot == record.slot
&& account_info.write_version == record.write_version; && account_info.write_version == record.write_version;
let is_throttled = let is_throttled =
record.timestamp.elapsed() < route.timeout_interval; record.timestamp.elapsed() < route.timeout_interval;
if is_unchanged || is_throttled { if is_unchanged || is_throttled {
continue; continue;
} }
}; };
match route.sink.process(pk, account_info).await { match route.sink.process(pk, account_info).await {
Ok(()) => { Ok(()) => {
// todo: metrics // todo: metrics
last_updated.insert( last_updated.insert(
pk_b58.clone(), pk_b58.clone(),
AcountWriteRecord { AcountWriteRecord {
slot: account_info.slot, slot: account_info.slot,
write_version: account_info.write_version, write_version: account_info.write_version,
timestamp: Instant::now(), timestamp: Instant::now(),
}, },
); );
} }
Err(_skip_reason) => { Err(_skip_reason) => {
// todo: metrics // todo: metrics
} }
} }
} }
Err(_) => { Err(_) => {
// todo: metrics // todo: metrics
} }
} }
} }
} }
} }
}); });
Ok((account_write_queue_sender, slot_queue_sender)) Ok((account_write_queue_sender, slot_queue_sender))
} }

View File

@ -1,8 +1,7 @@
use crate::metrics::{MetricType, MetricU64, Metrics}; use crate::metrics::{MetricType, MetricU64, Metrics};
use { use {
solana_sdk::account::{Account, AccountSharedData, ReadableAccount}, solana_sdk::{pubkey::Pubkey, account::{Account, AccountSharedData, ReadableAccount}},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap, std::collections::HashMap,
}; };
@ -28,7 +27,6 @@ pub struct AccountData {
pub account: AccountSharedData, pub account: AccountSharedData,
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
pub struct AccountWrite { pub struct AccountWrite {
pub pubkey: Pubkey, pub pubkey: Pubkey,
@ -65,7 +63,6 @@ pub struct SlotUpdate {
pub status: SlotStatus, pub status: SlotStatus,
} }
/// Track slots and account writes /// Track slots and account writes
/// ///
/// - use account() to retrieve the current best data for an account. /// - use account() to retrieve the current best data for an account.

View File

@ -14,9 +14,9 @@ use chrono::Utc;
use crossbeam_channel::{Receiver, TryRecvError}; use crossbeam_channel::{Receiver, TryRecvError};
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig}; use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig};
use solana_program::pubkey::Pubkey;
use solana_sdk::{ use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel}, commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Signature, signature::Signature,
}; };
use solana_transaction_status::RewardType; use solana_transaction_status::RewardType;

View File

@ -1,73 +1,110 @@
use std::{thread::{JoinHandle, Builder}, sync::{Arc, RwLock}, str::FromStr, time::Duration}; use std::{
fs::File,
io::Read,
str::FromStr,
sync::{Arc, RwLock},
thread::{Builder, JoinHandle},
time::Duration,
};
// use solana_client::rpc_client::RpcClient; // use solana_client::rpc_client::RpcClient;
use solana_sdk::{pubkey::Pubkey, signature::Keypair, instruction::Instruction}; use crate::{
account_write_filter::{self, AccountWriteRoute},
grpc_plugin_source::{self, FilterConfig, SourceConfig},
mango::GroupConfig,
mango_v3_perp_crank_sink::MangoV3PerpCrankSink,
metrics, blockhash_poller, transaction_sender,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey, signature::Keypair};
use crate::{mango::GroupConfig, account_write_filter::AccountWriteRoute, mango_v3_perp_crank_sink::MangoV3PerpCrankSink}; fn start_crank_thread(identity: Keypair, group: GroupConfig) -> JoinHandle<()> {
let perp_queue_pks: Vec<_> = group
.perp_markets
.iter()
.map(|m| {
(
Pubkey::from_str(&m.public_key).unwrap(),
Pubkey::from_str(&m.events_key).unwrap(),
)
})
.collect();
let group_pk = Pubkey::from_str(&group.public_key).unwrap();
let cache_pk = Pubkey::from_str(&group.cache_pk).unwrap();
let mango_program_id = Pubkey::from_str(&group.mango_program_id).unwrap();
let filter_config = FilterConfig {
program_ids: vec![],
account_ids: group.perp_markets.iter().map(|m| m.events_key.clone()).collect(),
};
return Builder::new()
.name("crank".to_string())
.spawn(move || {
let config: SourceConfig = {
let mut file = File::open("source.toml").expect("source.toml file in cwd");
let mut contents = String::new();
file.read_to_string(&mut contents)
.expect("source.toml to contain data");
toml::from_str(&contents).unwrap()
};
let metrics_tx = metrics::start(
metrics::MetricsConfig {
output_stdout: true,
output_http: false,
},
"crank".into(),
);
let rpc_client = Arc::new(RpcClient::new("".into()));
// pub async fn send_tx_loop( // TODO await future
// ixs_rx: async_channel::Receiver<Vec<Instruction>>, let blockhash = blockhash_poller::init(rpc_client.clone());
// blockhash: Arc<RwLock<Hash>>,
// client: Arc<RpcClient>,
// keypair: Keypair,
// ) {
// info!("signing with keypair pk={:?}", keypair.pubkey());
// let cfg = RpcSendTransactionConfig {
// skip_preflight: true,
// ..RpcSendTransactionConfig::default()
// };
// loop {
// if let Ok(ixs) = ixs_rx.recv().await {
// // TODO add priority fee
// let tx = Transaction::new_signed_with_payer(
// &ixs,
// Some(&keypair.pubkey()),
// &[&keypair],
// *blockhash.read().unwrap(),
// );
// // TODO: collect metrics
// info!("send tx={:?} ok={:?}", tx.signatures[0], client.send_transaction_with_config(&tx, cfg).await);
// }
// }
// }
// Event queue updates can be consumed by client connections
let (instruction_sender, instruction_receiver) =
async_channel::unbounded::<Vec<Instruction>>();
fn start_crank_thread( transaction_sender::init(
identity: Keypair, instruction_receiver,
group: GroupConfig blockhash,
) -> JoinHandle<()> { rpc_client,
identity
);
let routes = vec![AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
group_pk,
cache_pk,
mango_program_id,
instruction_sender.clone(),
)),
timeout_interval: Duration::default(),
}];
let perp_queue_pks: Vec<_> = group.perp_markets.iter().map(|m| (Pubkey::from_str(&m.public_key).unwrap(), Pubkey::from_str(&m.events_key).unwrap())).collect(); let (account_write_queue_sender, slot_queue_sender) =
let group_pk = Pubkey::from_str(&group.public_key).unwrap(); account_write_filter::init(routes, metrics_tx.clone()).expect("filter initializes");
return Builder::new() // TODO figure out how to start tokio stuff here
.name("crank".to_string()) grpc_plugin_source::process_events(
.spawn(move || { &config,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
);
// .await;
})
.expect("launch crank thread");
// TODO also implement websocket handler
// websocket_source::process_events(
// Event queue updates can be consumed by client connections // &config.source,
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>(); // account_write_queue_sender,
// slot_queue_sender,
// )
}
let routes = vec![
AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
group_pk,
instruction_sender.clone(),
)),
timeout_interval: Duration::default(),
}];
}).expect("launch crank thread")
}

View File

@ -12,6 +12,7 @@ pub mod market_markers;
pub mod metrics; pub mod metrics;
pub mod rotating_queue; pub mod rotating_queue;
pub mod states; pub mod states;
pub mod transaction_sender;
trait AnyhowWrap { trait AnyhowWrap {
type Value; type Value;

View File

@ -26,6 +26,7 @@ pub struct MangoConfig {
pub struct GroupConfig { pub struct GroupConfig {
pub name: String, pub name: String,
pub public_key: String, pub public_key: String,
pub cache_pk: String,
pub mango_program_id: String, pub mango_program_id: String,
pub serum_program_id: String, pub serum_program_id: String,
pub oracles: Vec<OracleConfig>, pub oracles: Vec<OracleConfig>,

View File

@ -12,15 +12,14 @@ use mango::{
instruction::consume_events, instruction::consume_events,
queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue}, queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue},
}; };
use solana_sdk::{pubkey::Pubkey, instruction::Instruction};
use solana_sdk::{ use solana_sdk::{
account::ReadableAccount, account::ReadableAccount,
instruction::{Instruction},
pubkey::Pubkey,
}; };
use bytemuck::cast_ref; use bytemuck::cast_ref;
use crate::{account_write_filter::AccountWriteSink, chain_data::AccountData}; use crate::{account_write_filter::AccountWriteSink, chain_data::AccountData, helpers::{to_sdk_instruction, to_sp_pk}};
const MAX_BACKLOG: usize = 2; const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10; const MAX_EVENTS_PER_TX: usize = 10;
@ -108,16 +107,16 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
.get(pk) .get(pk)
.expect(&format!("{pk:?} is a known public key")); .expect(&format!("{pk:?} is a known public key"));
let ix = consume_events( let ix = to_sdk_instruction(consume_events(
&self.mango_v3_program, &to_sp_pk(&self.mango_v3_program),
&self.group_pk, &to_sp_pk(&self.group_pk),
&self.cache_pk, &to_sp_pk(&self.cache_pk),
mkt_pk, &to_sp_pk(mkt_pk),
pk, &to_sp_pk(pk),
&mut mango_accounts, &mut mango_accounts,
MAX_EVENTS_PER_TX, MAX_EVENTS_PER_TX,
) )
.unwrap(); .unwrap());
Ok(ix) Ok(ix)
}; };

44
src/transaction_sender.rs Normal file
View File

@ -0,0 +1,44 @@
use log::*;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
use solana_sdk::{
hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer,
transaction::Transaction,
};
use std::sync::{Arc, RwLock};
use tokio::spawn;
pub async fn send_loop(
ixs_rx: async_channel::Receiver<Vec<Instruction>>,
blockhash: Arc<RwLock<Hash>>,
client: Arc<RpcClient>,
keypair: Keypair,
) {
info!("signing with keypair pk={:?}", keypair.pubkey());
let cfg = RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
};
loop {
if let Ok(ixs) = ixs_rx.recv().await {
// TODO add priority fee
let tx = Transaction::new_signed_with_payer(
&ixs,
Some(&keypair.pubkey()),
&[&keypair],
*blockhash.read().unwrap(),
);
// TODO: collect metrics
info!("send tx={:?} ok={:?}", tx.signatures[0],
client.send_transaction_with_config(&tx, cfg).await);
}
}
}
pub fn init(
ixs_rx: async_channel::Receiver<Vec<Instruction>>,
blockhash: Arc<RwLock<Hash>>,
client: Arc<RpcClient>,
keypair: Keypair,
) {
spawn(async move { send_loop(ixs_rx, blockhash, client, keypair).await });
}