Merge pull request #11 from blockworks-foundation/max/fix-crank

upgrade mango-feeds-connector after fixing account write filter bug
This commit is contained in:
galactus 2023-04-06 16:52:57 +02:00 committed by GitHub
commit 5bea7236cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 342 additions and 430 deletions

492
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,29 +8,32 @@ license = "Apache-2.0"
homepage = "mango.markets"
publish = false
rust-version = "1.66.1"
[dependencies]
anyhow = "1.0"
arrayref = "*"
async-std = "1.12.0"
async-channel = "1.6"
async-trait = "0.1.66"
borsh = "0.9.3"
bytemuck = "1.7.2"
chrono = "0.4.19"
clap = "2.33.1"
csv-async = "1.2"
dashmap = "5.4.0"
fixed = { version = ">=1.11.0, <1.12.0", features = ["serde"] }
fixed-macro = "^1.1.1"
multiqueue = "^0.3.2"
futures = "0.3.17"
iter_tools = "0.1.4"
log = "0.4.14"
multiqueue = "^0.3.2"
rand = ">=0.8.5"
rayon = "1.5.1"
serde = "1.0.136"
serde_derive = "1.0.103"
serde_json = "1.0.79"
serde_yaml = "0.8.23"
iter_tools = "0.1.4"
dashmap = "5.4.0"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
yellowstone-grpc-proto = "1.0.1"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
@ -52,25 +55,10 @@ solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", ta
# we have a bunch of helpers to convert between the two explicitly
solana-program = "1.9.17"
thiserror = "1.0"
csv-async = "1.2"
async-std = "1.12.0"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
async-channel = "1.6"
async-trait = "0.1.66"
prost = "0.11"
warp = "0.3"
futures = "0.3.17"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
arrayref = "*"
bytemuck = "1.7.2"
toml = "*"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[patch.crates-io]
# for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }

View File

@ -11,22 +11,43 @@ For the best results to avoid limits by quic it is better to fill the argument "
## Build
Clone repo
Install configure-mango
```sh
https://github.com/blockworks-foundation/mango-simulation.git
git clone https://github.com/godmodegalactus/configure_mango.git
cd configure_mango
yarn install
sh scripts/configure_local.sh
# open a new terminal as the previous one will continue running a solana validator
# this command will hang for around a minute, just wait for it to finish
NB_USERS=50 yarn ts-node index.ts
```
Build
Install mango-simulation
```sh
git clone https://github.com/blockworks-foundation/mango-simulation.git
cd mango-simulation
cargo build
# copy over files from configure_mango while you wait for the build to finish
mkdir -p localnet
cp ../configure_mango/ids.json localnet
cp ../configure_mango/accounts.json localnet
cp ../configure_mango/authority.json localnet
cp ../configure_mango/config/validator-identity.json localnet
```
## Run
To run against your local validator:
```sh
cargo run --bin mango-simulation -- -u http://localhost:8899 --identity validator-identity.json --keeper-authority authority.json --accounts accounts.json --mango ids.json --mango-cluster localnet --duration 10 -q 2 --transaction_save_file tlog.csv --block_data_save_file blog.csv
cargo run --bin mango-simulation -- -u http://127.0.0.1:8899 --identity localnet/validator-identity.json --keeper-authority localnet/authority.json --accounts localnet/accounts.json --mango localnet/ids.json --mango-cluster localnet --duration 10 -q 2 --transaction-save-file tlog.csv --block-data-save-file blog.csv
```
You can also run the simulation against testnet, but you will need to run configure_mango
Details for each argument:
```
USAGE:

View File

@ -20,7 +20,9 @@ use solana_transaction_status::{
use crate::states::{BlockData, TransactionConfirmRecord, TransactionSendRecord};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant, sync::broadcast::Sender};
use tokio::{
sync::broadcast::Sender, sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant,
};
pub async fn process_blocks(
block: &UiConfirmedBlock,
@ -73,33 +75,31 @@ pub async fn process_blocks(
let transaction_record = transaction_record.0;
mm_transaction_count += 1;
match tx_confirm_records
.send(TransactionConfirmRecord {
signature: transaction_record.signature.to_string(),
confirmed_slot: Some(slot),
confirmed_at: Some(Utc::now().to_string()),
sent_at: transaction_record.sent_at.to_string(),
sent_slot: transaction_record.sent_slot,
successful: if let Some(meta) = &meta {
meta.status.is_ok()
} else {
false
},
error: if let Some(meta) = &meta {
meta.err.as_ref().map(|x| x.to_string())
} else {
None
},
block_hash: Some(block.blockhash.clone()),
market: transaction_record.market.map(|x| x.to_string()),
market_maker: transaction_record.market_maker.map(|x| x.to_string()),
keeper_instruction: transaction_record.keeper_instruction,
slot_processed: Some(slot),
slot_leader: Some(slot_leader.clone()),
timed_out: false,
priority_fees: transaction_record.priority_fees,
})
{
match tx_confirm_records.send(TransactionConfirmRecord {
signature: transaction_record.signature.to_string(),
confirmed_slot: Some(slot),
confirmed_at: Some(Utc::now().to_string()),
sent_at: transaction_record.sent_at.to_string(),
sent_slot: transaction_record.sent_slot,
successful: if let Some(meta) = &meta {
meta.status.is_ok()
} else {
false
},
error: if let Some(meta) = &meta {
meta.err.as_ref().map(|x| x.to_string())
} else {
None
},
block_hash: Some(block.blockhash.clone()),
market: transaction_record.market.map(|x| x.to_string()),
market_maker: transaction_record.market_maker.map(|x| x.to_string()),
keeper_instruction: transaction_record.keeper_instruction,
slot_processed: Some(slot),
slot_leader: Some(slot_leader.clone()),
timed_out: false,
priority_fees: transaction_record.priority_fees,
}) {
Ok(_) => {}
Err(e) => {
warn!("Tx confirm record channel broken {}", e.to_string());
@ -191,24 +191,23 @@ pub fn confirmations_by_blocks(
// add to timeout if not retaining
if remove {
let _ = tx_confirm_records
.send(TransactionConfirmRecord {
signature: signature.to_string(),
confirmed_slot: None,
confirmed_at: None,
sent_at: sent_record.sent_at.to_string(),
sent_slot: sent_record.sent_slot,
successful: false,
error: Some("timeout".to_string()),
block_hash: None,
market: sent_record.market.map(|x| x.to_string()),
market_maker: sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: sent_record.keeper_instruction.clone(),
slot_processed: None,
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
});
let _ = tx_confirm_records.send(TransactionConfirmRecord {
signature: signature.to_string(),
confirmed_slot: None,
confirmed_at: None,
sent_at: sent_record.sent_at.to_string(),
sent_slot: sent_record.sent_slot,
successful: false,
error: Some("timeout".to_string()),
block_hash: None,
market: sent_record.market.map(|x| x.to_string()),
market_maker: sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: sent_record.keeper_instruction.clone(),
slot_processed: None,
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
});
to_remove.push(signature.clone());
}
}

View File

@ -8,10 +8,7 @@ use crate::{
use mango_feeds_connector::{
account_write_filter::{self, AccountWriteRoute},
FilterConfig,
websocket_source,
metrics, SourceConfig,
SnapshotSourceConfig, MetricsConfig,
metrics, websocket_source, FilterConfig, MetricsConfig, SnapshotSourceConfig, SourceConfig,
};
use async_channel::unbounded;
@ -31,8 +28,6 @@ use std::{
};
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct KeeperConfig {
pub program_id: Pubkey,
@ -40,7 +35,6 @@ pub struct KeeperConfig {
pub websocket_url: String,
}
pub fn start(
config: KeeperConfig,
exit_signal: Arc<AtomicBool>,
@ -123,7 +117,11 @@ pub fn start(
let routes = vec![AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| mango_feeds_connector::solana_sdk::pubkey::Pubkey::new_from_array(evq_pk.to_bytes()))
.map(|(_, evq_pk)| {
mango_feeds_connector::solana_sdk::pubkey::Pubkey::new_from_array(
evq_pk.to_bytes(),
)
})
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
@ -135,10 +133,12 @@ pub fn start(
timeout_interval: Duration::default(),
}];
info!("matched_pks={:?}", routes[0].matched_pubkeys);
let (account_write_queue_sender, slot_queue_sender) =
account_write_filter::init(routes, metrics_tx.clone()).expect("filter initializes");
info!("start processing grpc events");
// info!("start processing grpc events");
// grpc_plugin_source::process_events(
// &config,
@ -148,6 +148,11 @@ pub fn start(
// metrics_tx.clone(),
// ).await;
info!(
"start processing websocket events program_id={:?} ws_url={:?}",
config.program_id, config.websocket_url
);
websocket_source::process_events(
&SourceConfig {
dedup_queue_size: 0,

View File

@ -1,4 +1,3 @@
pub mod cli;
pub mod confirmation_strategies;
pub mod crank;

View File

@ -1,6 +1,5 @@
use {
log::info,
serde_json,
mango_simulation::{
cli,
confirmation_strategies::confirmations_by_blocks,
@ -17,6 +16,7 @@ use {
stats::MangoSimulationStats,
tpu_manager::TpuManager,
},
serde_json,
solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient},
solana_program::pubkey::Pubkey,
solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair},
@ -215,8 +215,7 @@ pub async fn main() -> anyhow::Result<()> {
);
tasks.append(&mut writers_jh);
let stats_handle =
mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe(), exit_signal.clone());
let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe());
tasks.push(stats_handle);
let mut confirmation_threads = confirmations_by_blocks(

View File

@ -13,14 +13,9 @@ use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
use bytemuck::cast_ref;
use mango_feeds_connector::{
account_write_filter::AccountWriteSink,
chain_data::AccountData,
};
use mango_feeds_connector::{account_write_filter::AccountWriteSink, chain_data::AccountData};
use crate::{
helpers::{to_sdk_instruction, to_sp_pk},
};
use crate::helpers::{to_sdk_instruction, to_sp_pk};
const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10;
@ -61,7 +56,11 @@ type EventQueueEvents = [AnyEvent; QUEUE_LEN];
#[async_trait]
impl AccountWriteSink for MangoV3PerpCrankSink {
async fn process(&self, pk: &mango_feeds_connector::solana_sdk::pubkey::Pubkey, account: &AccountData) -> Result<(), String> {
async fn process(
&self,
pk: &mango_feeds_connector::solana_sdk::pubkey::Pubkey,
account: &AccountData,
) -> Result<(), String> {
let account = &account.account;
let (ix, mkt_pk): (Result<Instruction, String>, Pubkey) = {
@ -112,7 +111,6 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
)
.collect();
let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
let mkt_pk = self
.mkt_pks_by_evq_pks
@ -125,7 +123,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
&to_sp_pk(&self.group_pk),
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(&pk),
&to_sp_pk(&pk),
&mut mango_accounts,
MAX_EVENTS_PER_TX,
)

View File

@ -1,6 +1,6 @@
use tokio::{task::JoinHandle, sync::broadcast::Receiver};
use async_std::fs::File;
use crate::states::{BlockData, TransactionConfirmRecord};
use async_std::fs::File;
use tokio::{sync::broadcast::Receiver, task::JoinHandle};
pub fn initialize_result_writers(
transaction_save_file: String,
@ -13,7 +13,7 @@ pub fn initialize_result_writers(
if !transaction_save_file.is_empty() {
let tx_data_jh = tokio::spawn(async move {
let mut writer = csv_async::AsyncSerializer::from_writer(
File::create(transaction_save_file).await.unwrap()
File::create(transaction_save_file).await.unwrap(),
);
let mut tx_data = tx_data;
loop {
@ -31,7 +31,7 @@ pub fn initialize_result_writers(
if !block_data_save_file.is_empty() {
let block_data_jh = tokio::spawn(async move {
let mut writer = csv_async::AsyncSerializer::from_writer(
File::create(block_data_save_file).await.unwrap()
File::create(block_data_save_file).await.unwrap(),
);
let mut block_data = block_data;
loop {

View File

@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
time::Instant,
@ -63,16 +63,11 @@ impl MangoSimulationStats {
pub fn update_from_tx_status_stream(
&self,
tx_confirm_record_reciever: tokio::sync::broadcast::Receiver<TransactionConfirmRecord>,
do_exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let counters = self.counters.clone();
tokio::spawn(async move {
let mut tx_confirm_record_reciever = tx_confirm_record_reciever;
loop {
if do_exit.load(Ordering::Relaxed) {
break;
}
if let Ok(tx_data) = tx_confirm_record_reciever.recv().await {
if let Some(_) = tx_data.confirmed_at {
counters.num_confirmed_txs.fetch_add(1, Ordering::Relaxed);
@ -307,17 +302,53 @@ impl MangoSimulationStats {
f64
),
("keeper_consume_events_sent", num_consume_events_txs, i64),
("keeper_consume_events_success", succ_consume_events_txs, i64),
(
"keeper_consume_events_success",
succ_consume_events_txs,
i64
),
("keeper_cache_price_sent", num_cache_price_txs, i64),
("keeper_cache_price_success", succ_cache_price_txs, i64),
("keeper_update_and_cache_qb_sent", num_update_and_cache_quote_bank_txs, i64),
("keeper_update_and_cache_qb_succ", succ_update_and_cache_quote_bank_txs, i64),
("keeper_update_root_banks_sent", num_update_root_banks_txs, i64),
("keeper_update_root_banks_succ", succ_update_root_banks_txs, i64),
("keeper_cache_root_banks_sent", num_cache_root_banks_txs, i64),
("keeper_cache_root_banks_succ", succ_cache_root_banks_txs, i64),
("keeper_update_perp_cache_sent", num_update_perp_cache_txs, i64),
("keeper_update_perp_cache_succ", succ_update_perp_cache_txs, i64),
(
"keeper_update_and_cache_qb_sent",
num_update_and_cache_quote_bank_txs,
i64
),
(
"keeper_update_and_cache_qb_succ",
succ_update_and_cache_quote_bank_txs,
i64
),
(
"keeper_update_root_banks_sent",
num_update_root_banks_txs,
i64
),
(
"keeper_update_root_banks_succ",
succ_update_root_banks_txs,
i64
),
(
"keeper_cache_root_banks_sent",
num_cache_root_banks_txs,
i64
),
(
"keeper_cache_root_banks_succ",
succ_cache_root_banks_txs,
i64
),
(
"keeper_update_perp_cache_sent",
num_update_perp_cache_txs,
i64
),
(
"keeper_update_perp_cache_succ",
succ_update_perp_cache_txs,
i64
),
("keeper_update_funding_sent", num_update_funding_txs, i64),
("keeper_update_funding_succ", succ_update_funding_txs, i64),
);