Compare commits
12 Commits
c2c52c25f9
...
660a5f5b68
Author | SHA1 | Date |
---|---|---|
galactus | 660a5f5b68 | |
Godmode Galactus | e7f7d415ad | |
Godmode Galactus | 6b946ea4e9 | |
Godmode Galactus | d29a936e92 | |
Godmode Galactus | 7853684abf | |
Godmode Galactus | 33850b1e5f | |
Godmode Galactus | 72c4f79911 | |
Godmode Galactus | 92a0db9cc3 | |
Godmode Galactus | 3c2ce7647d | |
Godmode Galactus | 4d56e63e6f | |
Godmode Galactus | e765215beb | |
Godmode Galactus | babddac324 |
File diff suppressed because it is too large
Load Diff
36
Cargo.toml
36
Cargo.toml
|
@ -22,34 +22,33 @@ csv-async = "1.2"
|
|||
dashmap = "5.4.0"
|
||||
fixed = { version = ">=1.11.0, <1.12.0", features = ["serde"] }
|
||||
fixed-macro = "^1.1.1"
|
||||
futures = "0.3.17"
|
||||
futures = "0.3.28"
|
||||
iter_tools = "0.1.4"
|
||||
log = "0.4.14"
|
||||
multiqueue = "^0.3.2"
|
||||
rand = ">=0.8.5"
|
||||
rayon = "1.5.1"
|
||||
serde = "1.0.136"
|
||||
serde_derive = "1.0.103"
|
||||
serde_json = "1.0.79"
|
||||
serde_yaml = "0.8.23"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio = { version = "1.14.1", features = ["full"] }
|
||||
regex = "1.7.3"
|
||||
|
||||
solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-metrics = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-rpc = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-runtime = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-sdk = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-cli-config = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-net-utils = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-version = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-logger = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
|
||||
solana-client = "1.15.2"
|
||||
solana-metrics = "1.15.2"
|
||||
solana-rpc-client = "1.15.2"
|
||||
solana-runtime = "1.15.2"
|
||||
solana-sdk = "1.15.2"
|
||||
solana-clap-utils = "1.15.2"
|
||||
solana-cli-config = "1.15.2"
|
||||
solana-net-utils = "1.15.2"
|
||||
solana-version = "1.15.2"
|
||||
solana-logger = "1.15.2"
|
||||
solana-transaction-status = "1.15.2"
|
||||
solana-account-decoder = "1.15.2"
|
||||
|
||||
solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" }
|
||||
solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" }
|
||||
|
||||
|
||||
# pin program to mango-v3 version of solana sdk
|
||||
|
@ -61,6 +60,7 @@ mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "
|
|||
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
|
||||
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
|
||||
bincode = "1.3.3"
|
||||
"yellowstone-grpc-proto" = "=1.1.0"
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
|
|
@ -9,6 +9,8 @@ The code then will create transaction request (q) requests per seconds for (n) s
|
|||
|
||||
For the best results to avoid limits by quic it is better to fill the argument "identity" of a valid staked validator for the cluster you are testing with.
|
||||
|
||||
Do not use localhost use http://127.0.0.1:8899 instead.
|
||||
|
||||
## Build
|
||||
|
||||
Install configure-mango
|
||||
|
@ -69,7 +71,7 @@ OPTIONS:
|
|||
-i, --identity <FILEPATH> Identity used in the QUIC connection. Identity with a lot of stake has a
|
||||
better chance to send transaction to the leader
|
||||
-u, --url <URL_OR_MONIKER> URL for Solana's JSON RPC or moniker (or their first letter): [mainnet-
|
||||
beta, testnet, devnet, localhost]
|
||||
beta, testnet, devnet, 127.0.0.1:8899]
|
||||
-k, --keeper-authority <FILEPATH> If specified, authority keypair would be used to pay for keeper
|
||||
transactions
|
||||
-c, --mango-cluster <STR> Name of mango cluster from ids.json
|
||||
|
|
|
@ -11,7 +11,7 @@ pub struct Config {
|
|||
pub entrypoint_addr: SocketAddr,
|
||||
pub json_rpc_url: String,
|
||||
pub websocket_url: String,
|
||||
pub id: Keypair,
|
||||
pub identity: Keypair,
|
||||
pub duration: Duration,
|
||||
pub quotes_per_second: u64,
|
||||
pub account_keys: String,
|
||||
|
@ -32,7 +32,7 @@ impl Default for Config {
|
|||
entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)),
|
||||
json_rpc_url: ConfigInput::default().json_rpc_url,
|
||||
websocket_url: ConfigInput::default().websocket_url,
|
||||
id: Keypair::new(),
|
||||
identity: Keypair::new(),
|
||||
duration: Duration::new(std::u64::MAX, 0),
|
||||
quotes_per_second: 1,
|
||||
account_keys: String::new(),
|
||||
|
@ -249,7 +249,7 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
|
|||
&config.keypair_path,
|
||||
);
|
||||
if let Ok(id) = read_keypair_file(id_path) {
|
||||
args.id = id;
|
||||
args.identity = id;
|
||||
} else if matches.is_present("identity") {
|
||||
panic!("could not parse identity path");
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ use chrono::Utc;
|
|||
use dashmap::DashMap;
|
||||
use log::{debug, warn};
|
||||
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
|
||||
use solana_lite_rpc_core::notifications::NotificationMsg;
|
||||
use solana_sdk::{
|
||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||
signature::Signature,
|
||||
|
@ -31,6 +32,7 @@ pub async fn process_blocks(
|
|||
tx_block_data: Sender<BlockData>,
|
||||
transaction_map: Arc<DashMap<Signature, (TransactionSendRecord, Instant)>>,
|
||||
slot: u64,
|
||||
commitment: CommitmentLevel,
|
||||
) {
|
||||
let mut mm_transaction_count: u64 = 0;
|
||||
let rewards = block.rewards.as_ref().unwrap();
|
||||
|
@ -122,9 +124,11 @@ pub async fn process_blocks(
|
|||
} else {
|
||||
0
|
||||
},
|
||||
number_of_mm_transactions: mm_transaction_count,
|
||||
number_of_mango_simulation_txs: mm_transaction_count,
|
||||
total_transactions: nb_transactions as u64,
|
||||
cu_consumed: cu_consumed,
|
||||
cu_consumed: 0,
|
||||
cu_consumed_by_mango_simulations: cu_consumed,
|
||||
commitment,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -153,6 +157,161 @@ async fn get_blocks_with_retry(
|
|||
Err(())
|
||||
}
|
||||
|
||||
pub fn confirmation_by_lite_rpc_notification_stream(
|
||||
tx_record_rx: UnboundedReceiver<TransactionSendRecord>,
|
||||
notification_stream: UnboundedReceiver<NotificationMsg>,
|
||||
tx_confirm_records: tokio::sync::broadcast::Sender<TransactionConfirmRecord>,
|
||||
tx_block_data: tokio::sync::broadcast::Sender<BlockData>,
|
||||
exit_signal: Arc<AtomicBool>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
let transaction_map: Arc<DashMap<String, (TransactionSendRecord, Instant)>> =
|
||||
Arc::new(DashMap::new());
|
||||
|
||||
let confirming_task = {
|
||||
let transaction_map = transaction_map.clone();
|
||||
let tx_confirm_records = tx_confirm_records.clone();
|
||||
let exit_signal = exit_signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut tx_record_rx = tx_record_rx;
|
||||
let mut notification_stream = notification_stream;
|
||||
|
||||
while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) {
|
||||
tokio::select! {
|
||||
transaction_record = tx_record_rx.recv() => {
|
||||
if let Some(transaction_record) = transaction_record{
|
||||
transaction_map
|
||||
.insert(transaction_record.signature.to_string(), (transaction_record, Instant::now()));
|
||||
}
|
||||
|
||||
},
|
||||
notification = notification_stream.recv() => {
|
||||
if let Some(notification) = notification {
|
||||
|
||||
match notification {
|
||||
NotificationMsg::BlockNotificationMsg(block_notification) => {
|
||||
if block_notification.commitment != CommitmentLevel::Finalized {
|
||||
continue;
|
||||
}
|
||||
let _ = tx_block_data.send(BlockData {
|
||||
block_hash: block_notification.blockhash.to_string(),
|
||||
block_leader: block_notification.block_leader,
|
||||
block_slot: block_notification.slot,
|
||||
block_time: block_notification.block_time,
|
||||
number_of_mango_simulation_txs: block_notification.transaction_found,
|
||||
total_transactions: block_notification.total_transactions,
|
||||
cu_consumed: block_notification.total_cu_consumed,
|
||||
cu_consumed_by_mango_simulations: block_notification.cu_consumed_by_txs,
|
||||
commitment: block_notification.commitment,
|
||||
});
|
||||
}
|
||||
NotificationMsg::UpdateTransactionMsg(tx_update_notifications) => {
|
||||
|
||||
for tx_notification in tx_update_notifications {
|
||||
if tx_notification.commitment != CommitmentLevel::Finalized {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(value) = transaction_map.get(&tx_notification.signature) {
|
||||
let (tx_sent_record, _) = value.clone();
|
||||
let error = match &tx_notification.transaction_status {
|
||||
Err(e) => {
|
||||
Some(e.to_string())
|
||||
},
|
||||
_ => None
|
||||
};
|
||||
let _ = tx_confirm_records.send(TransactionConfirmRecord {
|
||||
signature: tx_notification.signature.clone(),
|
||||
confirmed_slot: Some(tx_notification.slot),
|
||||
confirmed_at: Some(Utc::now().to_string()),
|
||||
sent_at: tx_sent_record.sent_at.to_string(),
|
||||
sent_slot: tx_sent_record.sent_slot,
|
||||
successful: tx_notification.transaction_status.is_ok(),
|
||||
error,
|
||||
block_hash: Some(tx_notification.blockhash),
|
||||
market: tx_sent_record.market.map(|x| x.to_string()),
|
||||
market_maker: tx_sent_record.market_maker.map(|x| x.to_string()),
|
||||
keeper_instruction: tx_sent_record.keeper_instruction.clone(),
|
||||
slot_processed: Some(tx_notification.slot),
|
||||
slot_leader: Some(tx_notification.leader.to_string()),
|
||||
timed_out: false,
|
||||
priority_fees: tx_sent_record.priority_fees,
|
||||
});
|
||||
}
|
||||
|
||||
transaction_map.remove(&tx_notification.signature);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
// others do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
// timeout
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
log::info!("stopped processing the transactions");
|
||||
})
|
||||
};
|
||||
|
||||
let cleaner_jh = {
|
||||
let transaction_map = transaction_map.clone();
|
||||
let exit_signal = exit_signal.clone();
|
||||
let tx_confirm_records = tx_confirm_records.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
{
|
||||
let mut to_remove = vec![];
|
||||
|
||||
for tx_data in transaction_map.iter() {
|
||||
let sent_record = &tx_data.0;
|
||||
let instant = tx_data.1;
|
||||
let signature = tx_data.key();
|
||||
let remove = instant.elapsed() > Duration::from_secs(120);
|
||||
|
||||
// add to timeout if not retaining
|
||||
if remove {
|
||||
let _ = tx_confirm_records.send(TransactionConfirmRecord {
|
||||
signature: signature.to_string(),
|
||||
confirmed_slot: None,
|
||||
confirmed_at: None,
|
||||
sent_at: sent_record.sent_at.to_string(),
|
||||
sent_slot: sent_record.sent_slot,
|
||||
successful: false,
|
||||
error: Some("timeout".to_string()),
|
||||
block_hash: None,
|
||||
market: sent_record.market.map(|x| x.to_string()),
|
||||
market_maker: sent_record.market_maker.map(|x| x.to_string()),
|
||||
keeper_instruction: sent_record.keeper_instruction.clone(),
|
||||
slot_processed: None,
|
||||
slot_leader: None,
|
||||
timed_out: true,
|
||||
priority_fees: sent_record.priority_fees,
|
||||
});
|
||||
to_remove.push(signature.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for signature in to_remove {
|
||||
transaction_map.remove(&signature);
|
||||
}
|
||||
|
||||
// if exit and all the transactions are processed
|
||||
if exit_signal.load(Ordering::Relaxed) && transaction_map.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
vec![confirming_task, cleaner_jh]
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub fn confirmations_by_blocks(
|
||||
client: Arc<RpcClient>,
|
||||
mut tx_record_rx: UnboundedReceiver<TransactionSendRecord>,
|
||||
|
@ -309,6 +468,7 @@ pub fn confirmations_by_blocks(
|
|||
tx_block_data,
|
||||
transaction_map,
|
||||
block_slot.1,
|
||||
commitment_confirmation.commitment,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
|
|
@ -107,10 +107,7 @@ pub fn start(
|
|||
};
|
||||
|
||||
let tpu_manager = tpu_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
let ok = tpu_manager.send_transaction(&tx, tx_send_record).await;
|
||||
trace!("send tx={:?} ok={ok}", tx.signatures[0]);
|
||||
});
|
||||
tpu_manager.send_transaction(&tx, tx_send_record).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
122
src/helpers.rs
122
src/helpers.rs
|
@ -13,7 +13,7 @@ use fixed::types::I80F48;
|
|||
use log::{debug, info};
|
||||
use mango::state::{MangoCache, MangoGroup, PerpMarket};
|
||||
use mango_common::Loadable;
|
||||
use solana_client::rpc_client::RpcClient;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_program::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey};
|
||||
use solana_sdk::hash::Hash;
|
||||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
|
@ -53,14 +53,14 @@ pub fn to_sdk_instruction(
|
|||
}
|
||||
}
|
||||
|
||||
pub fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
|
||||
let acc = rpc_client.get_account(&to_sdk_pk(pk)).unwrap();
|
||||
pub async fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
|
||||
let acc = rpc_client.get_account(&to_sdk_pk(pk)).await.unwrap();
|
||||
return T::load_from_bytes(acc.data.as_slice()).unwrap().clone();
|
||||
}
|
||||
|
||||
pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
|
||||
loop {
|
||||
match rpc_client.get_latest_blockhash() {
|
||||
match rpc_client.get_latest_blockhash().await {
|
||||
Ok(blockhash) => return blockhash,
|
||||
Err(err) => {
|
||||
info!("Couldn't get last blockhash: {:?}", err);
|
||||
|
@ -73,7 +73,7 @@ pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
|
|||
pub async fn get_new_latest_blockhash(client: Arc<RpcClient>, blockhash: &Hash) -> Option<Hash> {
|
||||
let start = Instant::now();
|
||||
while start.elapsed().as_secs() < 5 {
|
||||
if let Ok(new_blockhash) = client.get_latest_blockhash() {
|
||||
if let Ok(new_blockhash) = client.get_latest_blockhash().await {
|
||||
if new_blockhash != *blockhash {
|
||||
debug!("Got new blockhash ({:?})", blockhash);
|
||||
return Some(new_blockhash);
|
||||
|
@ -103,7 +103,7 @@ pub async fn poll_blockhash_and_slot(
|
|||
break;
|
||||
}
|
||||
|
||||
match client.get_slot() {
|
||||
match client.get_slot().await {
|
||||
Ok(new_slot) => slot.store(new_slot, Ordering::Release),
|
||||
Err(e) => {
|
||||
info!("Failed to download slot: {}, skip", e);
|
||||
|
@ -147,72 +147,68 @@ pub fn start_blockhash_polling_service(
|
|||
})
|
||||
}
|
||||
|
||||
pub fn get_mango_market_perps_cache(
|
||||
pub async fn get_mango_market_perps_cache(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
mango_group_config: &GroupConfig,
|
||||
mango_program_pk: &Pubkey,
|
||||
) -> Vec<PerpMarketCache> {
|
||||
// fetch group
|
||||
let mango_group_pk = Pubkey::from_str(mango_group_config.public_key.as_str()).unwrap();
|
||||
let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk);
|
||||
let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk).await;
|
||||
let mango_cache_pk = Pubkey::from_str(mango_group.mango_cache.to_string().as_str()).unwrap();
|
||||
let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk);
|
||||
let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk).await;
|
||||
let mut ret = vec![];
|
||||
for market_index in 0..mango_group_config.perp_markets.len() {
|
||||
let perp_maket_config = &mango_group_config.perp_markets[market_index];
|
||||
let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap();
|
||||
let perp_market = load_from_rpc::<PerpMarket>(&rpc_client, &perp_market_pk).await;
|
||||
|
||||
mango_group_config
|
||||
.perp_markets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(market_index, perp_maket_config)| {
|
||||
let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap();
|
||||
let perp_market = load_from_rpc::<PerpMarket>(&rpc_client, &perp_market_pk);
|
||||
// fetch price
|
||||
let base_decimals = mango_group_config.tokens[market_index].decimals;
|
||||
let quote_decimals = mango_group_config.tokens[0].decimals;
|
||||
|
||||
// fetch price
|
||||
let base_decimals = mango_group_config.tokens[market_index].decimals;
|
||||
let quote_decimals = mango_group_config.tokens[0].decimals;
|
||||
let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32));
|
||||
let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32));
|
||||
let price = mango_cache.price_cache[market_index].price;
|
||||
println!(
|
||||
"market index {} price of : {}",
|
||||
market_index, mango_cache.price_cache[market_index].price
|
||||
);
|
||||
|
||||
let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32));
|
||||
let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32));
|
||||
let price = mango_cache.price_cache[market_index].price;
|
||||
println!(
|
||||
"market index {} price of : {}",
|
||||
market_index, mango_cache.price_cache[market_index].price
|
||||
);
|
||||
let price_quote_lots: i64 = price
|
||||
.mul(quote_unit)
|
||||
.mul(I80F48::from_num(perp_market.base_lot_size))
|
||||
.div(I80F48::from_num(perp_market.quote_lot_size))
|
||||
.div(base_unit)
|
||||
.to_num();
|
||||
let order_base_lots: i64 = base_unit
|
||||
.div(I80F48::from_num(perp_market.base_lot_size))
|
||||
.to_num();
|
||||
|
||||
let price_quote_lots: i64 = price
|
||||
.mul(quote_unit)
|
||||
.mul(I80F48::from_num(perp_market.base_lot_size))
|
||||
.div(I80F48::from_num(perp_market.quote_lot_size))
|
||||
.div(base_unit)
|
||||
.to_num();
|
||||
let order_base_lots: i64 = base_unit
|
||||
.div(I80F48::from_num(perp_market.base_lot_size))
|
||||
.to_num();
|
||||
|
||||
let root_bank = &mango_group_config.tokens[market_index].root_key;
|
||||
let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap();
|
||||
let node_banks = mango_group_config.tokens[market_index]
|
||||
.node_keys
|
||||
.iter()
|
||||
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
|
||||
.collect();
|
||||
let price_oracle =
|
||||
Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str())
|
||||
.unwrap();
|
||||
PerpMarketCache {
|
||||
order_base_lots,
|
||||
price,
|
||||
price_quote_lots,
|
||||
mango_program_pk: mango_program_pk.clone(),
|
||||
mango_group_pk,
|
||||
mango_cache_pk,
|
||||
perp_market_pk,
|
||||
perp_market,
|
||||
root_bank,
|
||||
node_banks,
|
||||
price_oracle,
|
||||
bids: perp_market.bids,
|
||||
asks: perp_market.asks,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
let root_bank = &mango_group_config.tokens[market_index].root_key;
|
||||
let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap();
|
||||
let node_banks = mango_group_config.tokens[market_index]
|
||||
.node_keys
|
||||
.iter()
|
||||
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
|
||||
.collect();
|
||||
let price_oracle =
|
||||
Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str()).unwrap();
|
||||
ret.push(PerpMarketCache {
|
||||
order_base_lots,
|
||||
price,
|
||||
price_quote_lots,
|
||||
mango_program_pk: mango_program_pk.clone(),
|
||||
mango_group_pk,
|
||||
mango_cache_pk,
|
||||
perp_market_pk,
|
||||
perp_market,
|
||||
root_bank,
|
||||
node_banks,
|
||||
price_oracle,
|
||||
bids: perp_market.bids,
|
||||
asks: perp_market.asks,
|
||||
});
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
|
182
src/main.rs
182
src/main.rs
|
@ -2,7 +2,7 @@ use {
|
|||
log::info,
|
||||
mango_simulation::{
|
||||
cli,
|
||||
confirmation_strategies::confirmations_by_blocks,
|
||||
confirmation_strategies::confirmation_by_lite_rpc_notification_stream,
|
||||
crank::{self, KeeperConfig},
|
||||
helpers::{
|
||||
get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service,
|
||||
|
@ -17,7 +17,20 @@ use {
|
|||
tpu_manager::TpuManager,
|
||||
},
|
||||
serde_json,
|
||||
solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient},
|
||||
solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient,
|
||||
solana_lite_rpc_core::{
|
||||
block_store::BlockStore,
|
||||
notifications::NotificationMsg,
|
||||
quic_connection_utils::QuicConnectionParameters,
|
||||
tx_store::{empty_tx_store, TxStore},
|
||||
},
|
||||
solana_lite_rpc_services::{
|
||||
block_listenser::BlockListener,
|
||||
tpu_utils::tpu_service::{TpuService, TpuServiceConfig},
|
||||
transaction_replayer::TransactionReplayer,
|
||||
transaction_service::{TransactionService, TransactionServiceBuilder},
|
||||
tx_sender::TxSender,
|
||||
},
|
||||
solana_program::pubkey::Pubkey,
|
||||
solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair},
|
||||
std::{
|
||||
|
@ -27,11 +40,66 @@ use {
|
|||
sync::Arc,
|
||||
time::Duration,
|
||||
},
|
||||
tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||
tokio::{sync::RwLock, task::JoinHandle},
|
||||
};
|
||||
|
||||
const METRICS_NAME: &str = "mango-bencher";
|
||||
|
||||
async fn configure_transaction_service(
|
||||
rpc_client: Arc<NbRpcClient>,
|
||||
identity: Keypair,
|
||||
block_store: BlockStore,
|
||||
tx_store: TxStore,
|
||||
notifier: UnboundedSender<NotificationMsg>,
|
||||
) -> (TransactionService, JoinHandle<anyhow::Result<()>>) {
|
||||
let slot = rpc_client.get_slot().await.expect("GetSlot should work");
|
||||
let tpu_config = TpuServiceConfig {
|
||||
fanout_slots: 12,
|
||||
number_of_leaders_to_cache: 1024,
|
||||
clusterinfo_refresh_time: Duration::from_secs(60 * 60),
|
||||
leader_schedule_update_frequency: Duration::from_secs(10),
|
||||
maximum_transaction_in_queue: 200_000,
|
||||
maximum_number_of_errors: 10,
|
||||
quic_connection_params: QuicConnectionParameters {
|
||||
connection_timeout: Duration::from_secs(1),
|
||||
connection_retry_count: 10,
|
||||
finalize_timeout: Duration::from_millis(200),
|
||||
max_number_of_connections: 10,
|
||||
unistream_timeout: Duration::from_millis(500),
|
||||
write_timeout: Duration::from_secs(1),
|
||||
number_of_transactions_per_unistream: 10,
|
||||
},
|
||||
};
|
||||
|
||||
let tpu_service = TpuService::new(
|
||||
tpu_config,
|
||||
Arc::new(identity),
|
||||
slot,
|
||||
rpc_client.clone(),
|
||||
tx_store.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("Should be able to create TPU");
|
||||
|
||||
let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone());
|
||||
let block_listenser =
|
||||
BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone());
|
||||
let replayer = TransactionReplayer::new(
|
||||
tpu_service.clone(),
|
||||
tx_store.clone(),
|
||||
Duration::from_secs(2),
|
||||
);
|
||||
let builder = TransactionServiceBuilder::new(
|
||||
tx_sender,
|
||||
replayer,
|
||||
block_listenser,
|
||||
tpu_service,
|
||||
1_000_000,
|
||||
);
|
||||
builder.start(Some(notifier), block_store, 10, Duration::from_secs(90))
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
pub async fn main() -> anyhow::Result<()> {
|
||||
solana_logger::setup_with_default("info");
|
||||
|
@ -43,7 +111,7 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
let cli::Config {
|
||||
json_rpc_url,
|
||||
websocket_url,
|
||||
id,
|
||||
identity,
|
||||
account_keys,
|
||||
mango_keys,
|
||||
duration,
|
||||
|
@ -81,13 +149,28 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
.groups
|
||||
.iter()
|
||||
.find(|g| g.name == *mango_group_id)
|
||||
.unwrap();
|
||||
.expect("Mango group config should exist");
|
||||
|
||||
let nb_rpc_client = Arc::new(NbRpcClient::new_with_commitment(
|
||||
json_rpc_url.to_string(),
|
||||
CommitmentConfig::confirmed(),
|
||||
CommitmentConfig::finalized(),
|
||||
));
|
||||
|
||||
let tx_store = empty_tx_store();
|
||||
let block_store = BlockStore::new(&nb_rpc_client)
|
||||
.await
|
||||
.expect("Blockstore should be created");
|
||||
|
||||
let (notif_sx, notif_rx) = unbounded_channel();
|
||||
let (transaction_service, tx_service_jh) = configure_transaction_service(
|
||||
nb_rpc_client.clone(),
|
||||
Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(),
|
||||
block_store,
|
||||
tx_store,
|
||||
notif_sx,
|
||||
)
|
||||
.await;
|
||||
|
||||
let nb_users = account_keys_parsed.len();
|
||||
|
||||
let mut mango_sim_stats = MangoSimulationStats::new(
|
||||
|
@ -99,17 +182,24 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
|
||||
let (tx_record_sx, tx_record_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let tpu_manager = TpuManager::new(
|
||||
// continuosly fetch blockhash
|
||||
let exit_signal = Arc::new(AtomicBool::new(false));
|
||||
let latest_blockhash = get_latest_blockhash(&nb_rpc_client.clone()).await;
|
||||
let blockhash = Arc::new(RwLock::new(latest_blockhash));
|
||||
let current_slot = Arc::new(AtomicU64::new(0));
|
||||
let blockhash_thread = start_blockhash_polling_service(
|
||||
exit_signal.clone(),
|
||||
blockhash.clone(),
|
||||
current_slot.clone(),
|
||||
nb_rpc_client.clone(),
|
||||
websocket_url.clone(),
|
||||
solana_client::tpu_client::TpuClientConfig::default().fanout_slots,
|
||||
Keypair::from_bytes(id.to_bytes().as_slice()).unwrap(),
|
||||
);
|
||||
|
||||
let tpu_manager = TpuManager::new(
|
||||
transaction_service,
|
||||
mango_sim_stats.clone(),
|
||||
tx_record_sx.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
tpu_manager.force_reset_after_every(Duration::from_secs(300));
|
||||
.await?;
|
||||
|
||||
info!(
|
||||
"accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}",
|
||||
|
@ -122,34 +212,25 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
duration
|
||||
);
|
||||
|
||||
// continuosly fetch blockhash
|
||||
let rpc_client = Arc::new(RpcClient::new_with_commitment(
|
||||
json_rpc_url.to_string(),
|
||||
CommitmentConfig::finalized(),
|
||||
));
|
||||
let exit_signal = Arc::new(AtomicBool::new(false));
|
||||
let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await;
|
||||
let blockhash = Arc::new(RwLock::new(latest_blockhash));
|
||||
let current_slot = Arc::new(AtomicU64::new(0));
|
||||
let blockhash_thread = start_blockhash_polling_service(
|
||||
exit_signal.clone(),
|
||||
blockhash.clone(),
|
||||
current_slot.clone(),
|
||||
rpc_client.clone(),
|
||||
);
|
||||
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap();
|
||||
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str())
|
||||
.expect("Mango program should be able to convert into pubkey");
|
||||
let perp_market_caches: Vec<PerpMarketCache> =
|
||||
get_mango_market_perps_cache(rpc_client.clone(), mango_group_config, &mango_program_pk);
|
||||
get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk)
|
||||
.await;
|
||||
|
||||
let quote_root_bank =
|
||||
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).unwrap();
|
||||
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str())
|
||||
.expect("Quote root bank should be able to convert into pubkey");
|
||||
();
|
||||
let quote_node_banks = mango_group_config
|
||||
.tokens
|
||||
.last()
|
||||
.unwrap()
|
||||
.node_keys
|
||||
.iter()
|
||||
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
|
||||
.map(|x| {
|
||||
Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")
|
||||
})
|
||||
.collect();
|
||||
|
||||
clean_market_makers(
|
||||
|
@ -177,7 +258,7 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
} else {
|
||||
None
|
||||
};
|
||||
let from_slot = current_slot.load(Ordering::Relaxed);
|
||||
|
||||
let keeper_config = KeeperConfig {
|
||||
program_id: to_sdk_pk(&mango_program_pk),
|
||||
rpc_url: json_rpc_url.clone(),
|
||||
|
@ -191,7 +272,7 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
current_slot.clone(),
|
||||
tpu_manager.clone(),
|
||||
mango_group_config,
|
||||
id,
|
||||
identity,
|
||||
keeper_prioritization,
|
||||
);
|
||||
|
||||
|
@ -205,7 +286,7 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
exit_signal.clone(),
|
||||
blockhash.clone(),
|
||||
current_slot.clone(),
|
||||
tpu_manager,
|
||||
tpu_manager.clone(),
|
||||
&duration,
|
||||
*quotes_per_second,
|
||||
*priority_fees_proba,
|
||||
|
@ -214,8 +295,7 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
|
||||
info!("Number of MM threads {}", mm_tasks.len());
|
||||
drop(tx_record_sx);
|
||||
let mut tasks = vec![];
|
||||
tasks.push(blockhash_thread);
|
||||
let mut tasks = vec![blockhash_thread];
|
||||
|
||||
let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1000000);
|
||||
let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1000000);
|
||||
|
@ -231,12 +311,11 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
);
|
||||
tasks.append(&mut writers_jh);
|
||||
|
||||
let mut confirmation_threads = confirmations_by_blocks(
|
||||
nb_rpc_client,
|
||||
let mut confirmation_threads = confirmation_by_lite_rpc_notification_stream(
|
||||
tx_record_rx,
|
||||
notif_rx,
|
||||
tx_status_sx,
|
||||
block_status_sx,
|
||||
from_slot,
|
||||
exit_signal.clone(),
|
||||
);
|
||||
tasks.append(&mut confirmation_threads);
|
||||
|
@ -264,12 +343,27 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
// we start stopping all other process
|
||||
// some processes like confirmation of transactions will take some time and will get additional 2 minutes
|
||||
// to confirm remaining transactions
|
||||
futures::future::join_all(mm_tasks).await;
|
||||
info!("finished market making, joining all other services");
|
||||
println!("finished market making, joining all other services");
|
||||
exit_signal.store(true, Ordering::Relaxed);
|
||||
let market_makers_wait_task = {
|
||||
let exit_signal = exit_signal.clone();
|
||||
tokio::spawn(async move {
|
||||
futures::future::join_all(mm_tasks).await;
|
||||
info!("finished market making, joining all other services");
|
||||
exit_signal.store(true, Ordering::Relaxed);
|
||||
})
|
||||
};
|
||||
|
||||
tasks.push(market_makers_wait_task);
|
||||
|
||||
let transaction_service = tokio::spawn(async move {
|
||||
let _ = tx_service_jh.await;
|
||||
info!("Transaction service joined");
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
_ = futures::future::join_all(tasks) => {},
|
||||
_ = transaction_service => {},
|
||||
};
|
||||
|
||||
futures::future::join_all(tasks).await;
|
||||
mango_sim_stats.report(true, METRICS_NAME).await;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
use std::{cell::RefCell, collections::{BTreeMap, HashSet}, convert::TryFrom, mem::size_of};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
collections::{BTreeMap, HashSet},
|
||||
convert::TryFrom,
|
||||
mem::size_of,
|
||||
};
|
||||
|
||||
use arrayref::array_ref;
|
||||
use async_channel::Sender;
|
||||
|
@ -95,27 +100,22 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
|
|||
trace!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
|
||||
|
||||
let mut mango_accounts = HashSet::new();
|
||||
event_queue
|
||||
.iter()
|
||||
.take(MAX_EVENTS_PER_TX)
|
||||
.for_each(|e|
|
||||
if mango_accounts.len() < MAX_ACCS_PER_TX {
|
||||
match EventType::try_from(e.event_type).expect("mango v4 event") {
|
||||
EventType::Fill => {
|
||||
let fill: &FillEvent = cast_ref(e);
|
||||
mango_accounts.insert(fill.maker);
|
||||
mango_accounts.insert(fill.taker);
|
||||
}
|
||||
EventType::Out => {
|
||||
let out: &OutEvent = cast_ref(e);
|
||||
mango_accounts.insert(out.owner);
|
||||
}
|
||||
EventType::Liquidate => {
|
||||
|
||||
}
|
||||
event_queue.iter().take(MAX_EVENTS_PER_TX).for_each(|e| {
|
||||
if mango_accounts.len() < MAX_ACCS_PER_TX {
|
||||
match EventType::try_from(e.event_type).expect("mango v4 event") {
|
||||
EventType::Fill => {
|
||||
let fill: &FillEvent = cast_ref(e);
|
||||
mango_accounts.insert(fill.maker);
|
||||
mango_accounts.insert(fill.taker);
|
||||
}
|
||||
EventType::Out => {
|
||||
let out: &OutEvent = cast_ref(e);
|
||||
mango_accounts.insert(out.owner);
|
||||
}
|
||||
EventType::Liquidate => {}
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
|
||||
let mkt_pk = self
|
||||
|
@ -130,7 +130,10 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
|
|||
&to_sp_pk(&self.cache_pk),
|
||||
&to_sp_pk(mkt_pk),
|
||||
&to_sp_pk(&pk),
|
||||
&mut mango_accounts.iter().map(|pk| pk.clone()).collect::<Vec<_>>(),
|
||||
&mut mango_accounts
|
||||
.iter()
|
||||
.map(|pk| pk.clone())
|
||||
.collect::<Vec<_>>(),
|
||||
MAX_EVENTS_PER_TX,
|
||||
)
|
||||
.unwrap(),
|
||||
|
|
|
@ -317,49 +317,52 @@ pub async fn clean_market_makers(
|
|||
blockhash: Arc<RwLock<Hash>>,
|
||||
) {
|
||||
info!("Cleaning previous transactions by market makers");
|
||||
let mut tasks = vec![];
|
||||
|
||||
for market_maker in account_keys_parsed {
|
||||
let mango_account_pk =
|
||||
Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
|
||||
for perp_market in perp_market_caches {
|
||||
let market_maker = market_maker.clone();
|
||||
let perp_market = perp_market.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let mango_account_pk = mango_account_pk.clone();
|
||||
let blockhash = blockhash.clone();
|
||||
for account_keys_parsed in account_keys_parsed.chunks(10) {
|
||||
let mut tasks = vec![];
|
||||
for market_maker in account_keys_parsed {
|
||||
let mango_account_pk =
|
||||
Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
|
||||
for perp_market in perp_market_caches {
|
||||
let market_maker = market_maker.clone();
|
||||
let perp_market = perp_market.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let mango_account_pk = mango_account_pk.clone();
|
||||
let blockhash = blockhash.clone();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let mango_account_signer =
|
||||
Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap();
|
||||
let task = tokio::spawn(async move {
|
||||
let mango_account_signer =
|
||||
Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap();
|
||||
|
||||
for _ in 0..10 {
|
||||
let mut tx = create_cancel_all_orders(
|
||||
&perp_market,
|
||||
mango_account_pk,
|
||||
&mango_account_signer,
|
||||
);
|
||||
for _ in 0..10 {
|
||||
let mut tx = create_cancel_all_orders(
|
||||
&perp_market,
|
||||
mango_account_pk,
|
||||
&mango_account_signer,
|
||||
);
|
||||
|
||||
let recent_blockhash = *blockhash.read().await;
|
||||
tx.sign(&[&mango_account_signer], recent_blockhash);
|
||||
// send and confirm the transaction with an RPC
|
||||
if let Ok(res) = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
rpc_client.send_and_confirm_transaction(&tx),
|
||||
)
|
||||
.await
|
||||
{
|
||||
match res {
|
||||
Ok(_) => break,
|
||||
Err(e) => info!("Error occured while doing cancel all for ma : {} and perp market : {} error : {}", mango_account_pk, perp_market.perp_market_pk, e),
|
||||
let recent_blockhash = *blockhash.read().await;
|
||||
tx.sign(&[&mango_account_signer], recent_blockhash);
|
||||
let sig = tx.signatures[0];
|
||||
// send and confirm the transaction with an RPC
|
||||
if let Ok(res) = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
rpc_client.send_and_confirm_transaction(&tx),
|
||||
)
|
||||
.await
|
||||
{
|
||||
match res {
|
||||
Ok(_) => break,
|
||||
Err(e) => info!("Error occured while doing cancel all for ma : {}, sig : {} perp market : {} error : {}", mango_account_pk, sig, perp_market.perp_market_pk, e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
tasks.push(task);
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
futures::future::join_all(tasks).await;
|
||||
futures::future::join_all(tasks).await;
|
||||
}
|
||||
info!("finished cleaning market makers");
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use chrono::Utc;
|
||||
use std::str::FromStr;
|
||||
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub fn instruction(data: Vec<u8>) -> Instruction {
|
||||
Instruction {
|
||||
|
|
|
@ -3,7 +3,7 @@ use fixed::types::I80F48;
|
|||
use mango::state::PerpMarket;
|
||||
use serde::Serialize;
|
||||
use solana_program::{pubkey::Pubkey, slot_history::Slot};
|
||||
use solana_sdk::signature::Signature;
|
||||
use solana_sdk::{signature::Signature, commitment_config::CommitmentLevel};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
|
@ -99,7 +99,9 @@ pub struct BlockData {
|
|||
pub block_slot: Slot,
|
||||
pub block_leader: String,
|
||||
pub total_transactions: u64,
|
||||
pub number_of_mm_transactions: u64,
|
||||
pub number_of_mango_simulation_txs: u64,
|
||||
pub block_time: u64,
|
||||
pub cu_consumed: u64,
|
||||
pub cu_consumed_by_mango_simulations: u64,
|
||||
pub commitment: CommitmentLevel,
|
||||
}
|
||||
|
|
21
src/stats.rs
21
src/stats.rs
|
@ -1,16 +1,17 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Mutex,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Instant, collections::HashMap,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use crate::states::{KeeperInstruction, TransactionConfirmRecord};
|
||||
use iter_tools::Itertools;
|
||||
use solana_metrics::datapoint_info;
|
||||
use tokio::{task::JoinHandle, sync::RwLock};
|
||||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
|
||||
// Non atomic version of counters
|
||||
#[derive(Clone, Default, Debug)]
|
||||
|
@ -49,7 +50,7 @@ impl NACounters {
|
|||
pub fn diff(&self, other: &NACounters) -> NACounters {
|
||||
let mut new_error_count = HashMap::new();
|
||||
for (error, count) in &self.errors {
|
||||
if let Some(v) = other.errors.get( error ) {
|
||||
if let Some(v) = other.errors.get(error) {
|
||||
new_error_count.insert(error.clone(), *count - *v);
|
||||
} else {
|
||||
new_error_count.insert(error.clone(), *count);
|
||||
|
@ -383,7 +384,13 @@ impl MangoSimulationStats {
|
|||
.checked_div(counters.num_sent)
|
||||
.unwrap_or(0)
|
||||
);
|
||||
let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec();
|
||||
let top_5_errors = counters
|
||||
.errors
|
||||
.iter()
|
||||
.sorted_by(|x, y| (*y.1).cmp(x.1))
|
||||
.take(5)
|
||||
.enumerate()
|
||||
.collect_vec();
|
||||
let mut errors_to_print: String = String::new();
|
||||
for (idx, (error, count)) in top_5_errors {
|
||||
println!("Error #{idx} : {error} ({count})");
|
||||
|
@ -489,11 +496,7 @@ impl MangoSimulationStats {
|
|||
diff.succ_update_funding_txs,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"top_5_errors",
|
||||
errors_to_print,
|
||||
String
|
||||
)
|
||||
("top_5_errors", errors_to_print, String)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,138 +1,32 @@
|
|||
use bincode::serialize;
|
||||
use log::{info, warn, error};
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient};
|
||||
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
|
||||
use solana_sdk::signature::Keypair;
|
||||
use log::warn;
|
||||
use solana_client::connection_cache::ConnectionCache;
|
||||
use solana_lite_rpc_services::transaction_service::TransactionService;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::{
|
||||
atomic::{AtomicU32, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tokio::sync::{mpsc::UnboundedSender, RwLock};
|
||||
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::{states::TransactionSendRecord, stats::MangoSimulationStats};
|
||||
|
||||
pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
|
||||
pub type QuicConnectionCache = ConnectionCache;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TpuManager {
|
||||
error_count: Arc<AtomicU32>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
// why arc twice / one is so that we clone rwlock and other so that we can clone tpu client
|
||||
tpu_client: Arc<RwLock<Arc<QuicTpuClient>>>,
|
||||
pub ws_addr: String,
|
||||
fanout_slots: u64,
|
||||
identity: Arc<Keypair>,
|
||||
transaction_service: TransactionService,
|
||||
stats: MangoSimulationStats,
|
||||
tx_send_record: UnboundedSender<TransactionSendRecord>,
|
||||
}
|
||||
|
||||
impl TpuManager {
|
||||
pub async fn new(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
ws_addr: String,
|
||||
fanout_slots: u64,
|
||||
identity: Keypair,
|
||||
transaction_service: TransactionService,
|
||||
stats: MangoSimulationStats,
|
||||
tx_send_record: UnboundedSender<TransactionSendRecord>,
|
||||
) -> Self {
|
||||
let connection_cache = ConnectionCache::new_with_client_options(
|
||||
4,
|
||||
None,
|
||||
Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
|
||||
None,
|
||||
);
|
||||
let quic_connection_cache =
|
||||
if let ConnectionCache::Quic(connection_cache) = connection_cache {
|
||||
Some(connection_cache)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let tpu_client = Arc::new(
|
||||
TpuClient::new_with_connection_cache(
|
||||
rpc_client.clone(),
|
||||
&ws_addr,
|
||||
solana_client::tpu_client::TpuClientConfig { fanout_slots },
|
||||
quic_connection_cache.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
Self {
|
||||
rpc_client,
|
||||
tpu_client: Arc::new(RwLock::new(tpu_client)),
|
||||
ws_addr,
|
||||
fanout_slots,
|
||||
error_count: Default::default(),
|
||||
identity: Arc::new(identity),
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
transaction_service,
|
||||
stats,
|
||||
tx_send_record,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reset_tpu_client(&self) -> anyhow::Result<()> {
|
||||
let identity = Keypair::from_bytes(&self.identity.to_bytes()).unwrap();
|
||||
let connection_cache = ConnectionCache::new_with_client_options(
|
||||
4,
|
||||
None,
|
||||
Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
|
||||
None,
|
||||
);
|
||||
let quic_connection_cache =
|
||||
if let ConnectionCache::Quic(connection_cache) = connection_cache {
|
||||
Some(connection_cache)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let tpu_client = Arc::new(
|
||||
TpuClient::new_with_connection_cache(
|
||||
self.rpc_client.clone(),
|
||||
&self.ws_addr,
|
||||
solana_client::tpu_client::TpuClientConfig {
|
||||
fanout_slots: self.fanout_slots,
|
||||
},
|
||||
quic_connection_cache.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
self.error_count.store(0, Ordering::Relaxed);
|
||||
*self.tpu_client.write().await = tpu_client;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reset(&self) -> anyhow::Result<()> {
|
||||
self.error_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
if self.error_count.load(Ordering::Relaxed) > 5 {
|
||||
self.reset_tpu_client().await?;
|
||||
info!("TPU Reset after 5 errors");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn force_reset_after_every(&self, duration: Duration) {
|
||||
let this = self.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(duration).await;
|
||||
if let Err(e) = this.reset_tpu_client().await {
|
||||
error!("timely restart of tpu client failed {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
|
||||
self.tpu_client.read().await.clone()
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn send_transaction(
|
||||
|
@ -140,7 +34,6 @@ impl TpuManager {
|
|||
transaction: &solana_sdk::transaction::Transaction,
|
||||
transaction_sent_record: TransactionSendRecord,
|
||||
) -> bool {
|
||||
let tpu_client = self.get_tpu_client().await;
|
||||
self.stats
|
||||
.inc_send(&transaction_sent_record.keeper_instruction);
|
||||
|
||||
|
@ -151,41 +44,28 @@ impl TpuManager {
|
|||
"sending error on channel : {}",
|
||||
sent.err().unwrap().to_string()
|
||||
);
|
||||
if let Err(e) = self.reset().await {
|
||||
error!("error while reseting tpu client {}", e);
|
||||
}
|
||||
}
|
||||
let transaction = bincode::serialize(transaction).unwrap();
|
||||
|
||||
tpu_client.send_transaction(transaction).await
|
||||
let res = self.transaction_service
|
||||
.send_transaction(transaction, None)
|
||||
.await;
|
||||
|
||||
if let Err(e) = &res{
|
||||
print!("error sending txs on custom tpu {e:?}");
|
||||
}
|
||||
res.is_ok()
|
||||
|
||||
}
|
||||
|
||||
pub async fn send_transaction_batch(
|
||||
&self,
|
||||
batch: &Vec<(Transaction, TransactionSendRecord)>,
|
||||
) -> bool {
|
||||
let tpu_client = self.get_tpu_client().await;
|
||||
|
||||
for (_tx, record) in batch {
|
||||
self.stats.inc_send(&record.keeper_instruction);
|
||||
|
||||
let tx_sent_record = self.tx_send_record.clone();
|
||||
let sent = tx_sent_record.send(record.clone());
|
||||
if sent.is_err() {
|
||||
warn!(
|
||||
"sending error on channel : {}",
|
||||
sent.err().unwrap().to_string()
|
||||
);
|
||||
}
|
||||
let mut value = true;
|
||||
for (tx, record) in batch {
|
||||
value &= self.send_transaction(tx, record.clone()).await;
|
||||
}
|
||||
|
||||
tpu_client
|
||||
.try_send_wire_transaction_batch(
|
||||
batch
|
||||
.iter()
|
||||
.map(|(tx, _)| serialize(tx).expect("serialization should succeed"))
|
||||
.collect(),
|
||||
)
|
||||
.await
|
||||
.is_ok()
|
||||
value
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue