Compare commits

...

12 Commits

Author SHA1 Message Date
galactus 660a5f5b68
Merge pull request #22 from blockworks-foundation/updating_to_custom_tpu
Updating to custom tpu
2023-06-30 22:40:37 +02:00
Godmode Galactus e7f7d415ad
Adding more info to block logs 2023-06-29 13:49:19 +02:00
Godmode Galactus 6b946ea4e9
Solving errors, making default rpc_client commitment as finalized 2023-06-27 10:44:59 +02:00
Godmode Galactus d29a936e92
fixing the cu consumed by blocks 2023-06-26 14:56:29 +02:00
Godmode Galactus 7853684abf
moving lite-rpc libraries to version v0.2.2 2023-06-23 15:30:02 +02:00
Godmode Galactus 33850b1e5f
git status 2023-06-23 15:29:28 +02:00
Godmode Galactus 72c4f79911
removing correctly the transactions from the map while confirming 2023-06-20 15:19:18 +02:00
Godmode Galactus 92a0db9cc3
removing unused dependecies and using lite-rpc tag instead of branch 2023-06-20 15:14:46 +02:00
Godmode Galactus 3c2ce7647d
solving deadlock and pointing cargo to github repo 2023-06-15 18:43:57 +02:00
Godmode Galactus 4d56e63e6f
removing confirmed fromt transaction map 2023-06-15 12:00:37 +02:00
Godmode Galactus e765215beb
making mango simulation work with custom tpu and verify transactions using notifications 2023-06-15 10:41:58 +02:00
Godmode Galactus babddac324
Adding solana lite-rpc dependency 2023-06-11 19:19:38 +02:00
14 changed files with 1977 additions and 3325 deletions

4464
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -22,34 +22,33 @@ csv-async = "1.2"
dashmap = "5.4.0"
fixed = { version = ">=1.11.0, <1.12.0", features = ["serde"] }
fixed-macro = "^1.1.1"
futures = "0.3.17"
futures = "0.3.28"
iter_tools = "0.1.4"
log = "0.4.14"
multiqueue = "^0.3.2"
rand = ">=0.8.5"
rayon = "1.5.1"
serde = "1.0.136"
serde_derive = "1.0.103"
serde_json = "1.0.79"
serde_yaml = "0.8.23"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
tokio = { version = "1.14.1", features = ["full"] }
regex = "1.7.3"
solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-metrics = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-net-utils = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-version = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-logger = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-client = "1.15.2"
solana-metrics = "1.15.2"
solana-rpc-client = "1.15.2"
solana-runtime = "1.15.2"
solana-sdk = "1.15.2"
solana-clap-utils = "1.15.2"
solana-cli-config = "1.15.2"
solana-net-utils = "1.15.2"
solana-version = "1.15.2"
solana-logger = "1.15.2"
solana-transaction-status = "1.15.2"
solana-account-decoder = "1.15.2"
solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" }
solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", tag = "v0.2.2" }
# pin program to mango-v3 version of solana sdk
@ -61,6 +60,7 @@ mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
bincode = "1.3.3"
"yellowstone-grpc-proto" = "=1.1.0"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -9,6 +9,8 @@ The code then will create transaction request (q) requests per seconds for (n) s
For the best results to avoid limits by quic it is better to fill the argument "identity" of a valid staked validator for the cluster you are testing with.
Do not use localhost use http://127.0.0.1:8899 instead.
## Build
Install configure-mango
@ -69,7 +71,7 @@ OPTIONS:
-i, --identity <FILEPATH> Identity used in the QUIC connection. Identity with a lot of stake has a
better chance to send transaction to the leader
-u, --url <URL_OR_MONIKER> URL for Solana's JSON RPC or moniker (or their first letter): [mainnet-
beta, testnet, devnet, localhost]
beta, testnet, devnet, 127.0.0.1:8899]
-k, --keeper-authority <FILEPATH> If specified, authority keypair would be used to pay for keeper
transactions
-c, --mango-cluster <STR> Name of mango cluster from ids.json

View File

@ -11,7 +11,7 @@ pub struct Config {
pub entrypoint_addr: SocketAddr,
pub json_rpc_url: String,
pub websocket_url: String,
pub id: Keypair,
pub identity: Keypair,
pub duration: Duration,
pub quotes_per_second: u64,
pub account_keys: String,
@ -32,7 +32,7 @@ impl Default for Config {
entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)),
json_rpc_url: ConfigInput::default().json_rpc_url,
websocket_url: ConfigInput::default().websocket_url,
id: Keypair::new(),
identity: Keypair::new(),
duration: Duration::new(std::u64::MAX, 0),
quotes_per_second: 1,
account_keys: String::new(),
@ -249,7 +249,7 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
&config.keypair_path,
);
if let Ok(id) = read_keypair_file(id_path) {
args.id = id;
args.identity = id;
} else if matches.is_present("identity") {
panic!("could not parse identity path");
}

View File

@ -10,6 +10,7 @@ use chrono::Utc;
use dashmap::DashMap;
use log::{debug, warn};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
use solana_lite_rpc_core::notifications::NotificationMsg;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
@ -31,6 +32,7 @@ pub async fn process_blocks(
tx_block_data: Sender<BlockData>,
transaction_map: Arc<DashMap<Signature, (TransactionSendRecord, Instant)>>,
slot: u64,
commitment: CommitmentLevel,
) {
let mut mm_transaction_count: u64 = 0;
let rewards = block.rewards.as_ref().unwrap();
@ -122,9 +124,11 @@ pub async fn process_blocks(
} else {
0
},
number_of_mm_transactions: mm_transaction_count,
number_of_mango_simulation_txs: mm_transaction_count,
total_transactions: nb_transactions as u64,
cu_consumed: cu_consumed,
cu_consumed: 0,
cu_consumed_by_mango_simulations: cu_consumed,
commitment,
});
}
}
@ -153,6 +157,161 @@ async fn get_blocks_with_retry(
Err(())
}
pub fn confirmation_by_lite_rpc_notification_stream(
tx_record_rx: UnboundedReceiver<TransactionSendRecord>,
notification_stream: UnboundedReceiver<NotificationMsg>,
tx_confirm_records: tokio::sync::broadcast::Sender<TransactionConfirmRecord>,
tx_block_data: tokio::sync::broadcast::Sender<BlockData>,
exit_signal: Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> {
let transaction_map: Arc<DashMap<String, (TransactionSendRecord, Instant)>> =
Arc::new(DashMap::new());
let confirming_task = {
let transaction_map = transaction_map.clone();
let tx_confirm_records = tx_confirm_records.clone();
let exit_signal = exit_signal.clone();
tokio::spawn(async move {
let mut tx_record_rx = tx_record_rx;
let mut notification_stream = notification_stream;
while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) {
tokio::select! {
transaction_record = tx_record_rx.recv() => {
if let Some(transaction_record) = transaction_record{
transaction_map
.insert(transaction_record.signature.to_string(), (transaction_record, Instant::now()));
}
},
notification = notification_stream.recv() => {
if let Some(notification) = notification {
match notification {
NotificationMsg::BlockNotificationMsg(block_notification) => {
if block_notification.commitment != CommitmentLevel::Finalized {
continue;
}
let _ = tx_block_data.send(BlockData {
block_hash: block_notification.blockhash.to_string(),
block_leader: block_notification.block_leader,
block_slot: block_notification.slot,
block_time: block_notification.block_time,
number_of_mango_simulation_txs: block_notification.transaction_found,
total_transactions: block_notification.total_transactions,
cu_consumed: block_notification.total_cu_consumed,
cu_consumed_by_mango_simulations: block_notification.cu_consumed_by_txs,
commitment: block_notification.commitment,
});
}
NotificationMsg::UpdateTransactionMsg(tx_update_notifications) => {
for tx_notification in tx_update_notifications {
if tx_notification.commitment != CommitmentLevel::Finalized {
continue;
}
if let Some(value) = transaction_map.get(&tx_notification.signature) {
let (tx_sent_record, _) = value.clone();
let error = match &tx_notification.transaction_status {
Err(e) => {
Some(e.to_string())
},
_ => None
};
let _ = tx_confirm_records.send(TransactionConfirmRecord {
signature: tx_notification.signature.clone(),
confirmed_slot: Some(tx_notification.slot),
confirmed_at: Some(Utc::now().to_string()),
sent_at: tx_sent_record.sent_at.to_string(),
sent_slot: tx_sent_record.sent_slot,
successful: tx_notification.transaction_status.is_ok(),
error,
block_hash: Some(tx_notification.blockhash),
market: tx_sent_record.market.map(|x| x.to_string()),
market_maker: tx_sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: tx_sent_record.keeper_instruction.clone(),
slot_processed: Some(tx_notification.slot),
slot_leader: Some(tx_notification.leader.to_string()),
timed_out: false,
priority_fees: tx_sent_record.priority_fees,
});
}
transaction_map.remove(&tx_notification.signature);
}
},
_ => {
// others do nothing
}
}
}
},
_ = tokio::time::sleep(Duration::from_secs(1)) => {
// timeout
continue;
}
}
}
log::info!("stopped processing the transactions");
})
};
let cleaner_jh = {
let transaction_map = transaction_map.clone();
let exit_signal = exit_signal.clone();
let tx_confirm_records = tx_confirm_records.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
{
let mut to_remove = vec![];
for tx_data in transaction_map.iter() {
let sent_record = &tx_data.0;
let instant = tx_data.1;
let signature = tx_data.key();
let remove = instant.elapsed() > Duration::from_secs(120);
// add to timeout if not retaining
if remove {
let _ = tx_confirm_records.send(TransactionConfirmRecord {
signature: signature.to_string(),
confirmed_slot: None,
confirmed_at: None,
sent_at: sent_record.sent_at.to_string(),
sent_slot: sent_record.sent_slot,
successful: false,
error: Some("timeout".to_string()),
block_hash: None,
market: sent_record.market.map(|x| x.to_string()),
market_maker: sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: sent_record.keeper_instruction.clone(),
slot_processed: None,
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
});
to_remove.push(signature.clone());
}
}
for signature in to_remove {
transaction_map.remove(&signature);
}
// if exit and all the transactions are processed
if exit_signal.load(Ordering::Relaxed) && transaction_map.is_empty() {
break;
}
}
}
})
};
vec![confirming_task, cleaner_jh]
}
#[deprecated]
pub fn confirmations_by_blocks(
client: Arc<RpcClient>,
mut tx_record_rx: UnboundedReceiver<TransactionSendRecord>,
@ -309,6 +468,7 @@ pub fn confirmations_by_blocks(
tx_block_data,
transaction_map,
block_slot.1,
commitment_confirmation.commitment,
)
.await;
}

View File

@ -107,10 +107,7 @@ pub fn start(
};
let tpu_manager = tpu_manager.clone();
tokio::spawn(async move {
let ok = tpu_manager.send_transaction(&tx, tx_send_record).await;
trace!("send tx={:?} ok={ok}", tx.signatures[0]);
});
tpu_manager.send_transaction(&tx, tx_send_record).await;
}
}
});

View File

@ -13,7 +13,7 @@ use fixed::types::I80F48;
use log::{debug, info};
use mango::state::{MangoCache, MangoGroup, PerpMarket};
use mango_common::Loadable;
use solana_client::rpc_client::RpcClient;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_program::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey};
use solana_sdk::hash::Hash;
use tokio::{sync::RwLock, task::JoinHandle};
@ -53,14 +53,14 @@ pub fn to_sdk_instruction(
}
}
pub fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
let acc = rpc_client.get_account(&to_sdk_pk(pk)).unwrap();
pub async fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
let acc = rpc_client.get_account(&to_sdk_pk(pk)).await.unwrap();
return T::load_from_bytes(acc.data.as_slice()).unwrap().clone();
}
pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
loop {
match rpc_client.get_latest_blockhash() {
match rpc_client.get_latest_blockhash().await {
Ok(blockhash) => return blockhash,
Err(err) => {
info!("Couldn't get last blockhash: {:?}", err);
@ -73,7 +73,7 @@ pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
pub async fn get_new_latest_blockhash(client: Arc<RpcClient>, blockhash: &Hash) -> Option<Hash> {
let start = Instant::now();
while start.elapsed().as_secs() < 5 {
if let Ok(new_blockhash) = client.get_latest_blockhash() {
if let Ok(new_blockhash) = client.get_latest_blockhash().await {
if new_blockhash != *blockhash {
debug!("Got new blockhash ({:?})", blockhash);
return Some(new_blockhash);
@ -103,7 +103,7 @@ pub async fn poll_blockhash_and_slot(
break;
}
match client.get_slot() {
match client.get_slot().await {
Ok(new_slot) => slot.store(new_slot, Ordering::Release),
Err(e) => {
info!("Failed to download slot: {}, skip", e);
@ -147,72 +147,68 @@ pub fn start_blockhash_polling_service(
})
}
pub fn get_mango_market_perps_cache(
pub async fn get_mango_market_perps_cache(
rpc_client: Arc<RpcClient>,
mango_group_config: &GroupConfig,
mango_program_pk: &Pubkey,
) -> Vec<PerpMarketCache> {
// fetch group
let mango_group_pk = Pubkey::from_str(mango_group_config.public_key.as_str()).unwrap();
let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk);
let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk).await;
let mango_cache_pk = Pubkey::from_str(mango_group.mango_cache.to_string().as_str()).unwrap();
let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk);
let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk).await;
let mut ret = vec![];
for market_index in 0..mango_group_config.perp_markets.len() {
let perp_maket_config = &mango_group_config.perp_markets[market_index];
let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap();
let perp_market = load_from_rpc::<PerpMarket>(&rpc_client, &perp_market_pk).await;
mango_group_config
.perp_markets
.iter()
.enumerate()
.map(|(market_index, perp_maket_config)| {
let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap();
let perp_market = load_from_rpc::<PerpMarket>(&rpc_client, &perp_market_pk);
// fetch price
let base_decimals = mango_group_config.tokens[market_index].decimals;
let quote_decimals = mango_group_config.tokens[0].decimals;
// fetch price
let base_decimals = mango_group_config.tokens[market_index].decimals;
let quote_decimals = mango_group_config.tokens[0].decimals;
let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32));
let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32));
let price = mango_cache.price_cache[market_index].price;
println!(
"market index {} price of : {}",
market_index, mango_cache.price_cache[market_index].price
);
let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32));
let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32));
let price = mango_cache.price_cache[market_index].price;
println!(
"market index {} price of : {}",
market_index, mango_cache.price_cache[market_index].price
);
let price_quote_lots: i64 = price
.mul(quote_unit)
.mul(I80F48::from_num(perp_market.base_lot_size))
.div(I80F48::from_num(perp_market.quote_lot_size))
.div(base_unit)
.to_num();
let order_base_lots: i64 = base_unit
.div(I80F48::from_num(perp_market.base_lot_size))
.to_num();
let price_quote_lots: i64 = price
.mul(quote_unit)
.mul(I80F48::from_num(perp_market.base_lot_size))
.div(I80F48::from_num(perp_market.quote_lot_size))
.div(base_unit)
.to_num();
let order_base_lots: i64 = base_unit
.div(I80F48::from_num(perp_market.base_lot_size))
.to_num();
let root_bank = &mango_group_config.tokens[market_index].root_key;
let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap();
let node_banks = mango_group_config.tokens[market_index]
.node_keys
.iter()
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
.collect();
let price_oracle =
Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str())
.unwrap();
PerpMarketCache {
order_base_lots,
price,
price_quote_lots,
mango_program_pk: mango_program_pk.clone(),
mango_group_pk,
mango_cache_pk,
perp_market_pk,
perp_market,
root_bank,
node_banks,
price_oracle,
bids: perp_market.bids,
asks: perp_market.asks,
}
})
.collect()
let root_bank = &mango_group_config.tokens[market_index].root_key;
let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap();
let node_banks = mango_group_config.tokens[market_index]
.node_keys
.iter()
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
.collect();
let price_oracle =
Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str()).unwrap();
ret.push(PerpMarketCache {
order_base_lots,
price,
price_quote_lots,
mango_program_pk: mango_program_pk.clone(),
mango_group_pk,
mango_cache_pk,
perp_market_pk,
perp_market,
root_bank,
node_banks,
price_oracle,
bids: perp_market.bids,
asks: perp_market.asks,
});
}
ret
}

View File

@ -2,7 +2,7 @@ use {
log::info,
mango_simulation::{
cli,
confirmation_strategies::confirmations_by_blocks,
confirmation_strategies::confirmation_by_lite_rpc_notification_stream,
crank::{self, KeeperConfig},
helpers::{
get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service,
@ -17,7 +17,20 @@ use {
tpu_manager::TpuManager,
},
serde_json,
solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient},
solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient,
solana_lite_rpc_core::{
block_store::BlockStore,
notifications::NotificationMsg,
quic_connection_utils::QuicConnectionParameters,
tx_store::{empty_tx_store, TxStore},
},
solana_lite_rpc_services::{
block_listenser::BlockListener,
tpu_utils::tpu_service::{TpuService, TpuServiceConfig},
transaction_replayer::TransactionReplayer,
transaction_service::{TransactionService, TransactionServiceBuilder},
tx_sender::TxSender,
},
solana_program::pubkey::Pubkey,
solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair},
std::{
@ -27,11 +40,66 @@ use {
sync::Arc,
time::Duration,
},
tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
tokio::{sync::RwLock, task::JoinHandle},
};
const METRICS_NAME: &str = "mango-bencher";
async fn configure_transaction_service(
rpc_client: Arc<NbRpcClient>,
identity: Keypair,
block_store: BlockStore,
tx_store: TxStore,
notifier: UnboundedSender<NotificationMsg>,
) -> (TransactionService, JoinHandle<anyhow::Result<()>>) {
let slot = rpc_client.get_slot().await.expect("GetSlot should work");
let tpu_config = TpuServiceConfig {
fanout_slots: 12,
number_of_leaders_to_cache: 1024,
clusterinfo_refresh_time: Duration::from_secs(60 * 60),
leader_schedule_update_frequency: Duration::from_secs(10),
maximum_transaction_in_queue: 200_000,
maximum_number_of_errors: 10,
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10,
},
};
let tpu_service = TpuService::new(
tpu_config,
Arc::new(identity),
slot,
rpc_client.clone(),
tx_store.clone(),
)
.await
.expect("Should be able to create TPU");
let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone());
let block_listenser =
BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone());
let replayer = TransactionReplayer::new(
tpu_service.clone(),
tx_store.clone(),
Duration::from_secs(2),
);
let builder = TransactionServiceBuilder::new(
tx_sender,
replayer,
block_listenser,
tpu_service,
1_000_000,
);
builder.start(Some(notifier), block_store, 10, Duration::from_secs(90))
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
pub async fn main() -> anyhow::Result<()> {
solana_logger::setup_with_default("info");
@ -43,7 +111,7 @@ pub async fn main() -> anyhow::Result<()> {
let cli::Config {
json_rpc_url,
websocket_url,
id,
identity,
account_keys,
mango_keys,
duration,
@ -81,13 +149,28 @@ pub async fn main() -> anyhow::Result<()> {
.groups
.iter()
.find(|g| g.name == *mango_group_id)
.unwrap();
.expect("Mango group config should exist");
let nb_rpc_client = Arc::new(NbRpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
CommitmentConfig::finalized(),
));
let tx_store = empty_tx_store();
let block_store = BlockStore::new(&nb_rpc_client)
.await
.expect("Blockstore should be created");
let (notif_sx, notif_rx) = unbounded_channel();
let (transaction_service, tx_service_jh) = configure_transaction_service(
nb_rpc_client.clone(),
Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(),
block_store,
tx_store,
notif_sx,
)
.await;
let nb_users = account_keys_parsed.len();
let mut mango_sim_stats = MangoSimulationStats::new(
@ -99,17 +182,24 @@ pub async fn main() -> anyhow::Result<()> {
let (tx_record_sx, tx_record_rx) = tokio::sync::mpsc::unbounded_channel();
let tpu_manager = TpuManager::new(
// continuosly fetch blockhash
let exit_signal = Arc::new(AtomicBool::new(false));
let latest_blockhash = get_latest_blockhash(&nb_rpc_client.clone()).await;
let blockhash = Arc::new(RwLock::new(latest_blockhash));
let current_slot = Arc::new(AtomicU64::new(0));
let blockhash_thread = start_blockhash_polling_service(
exit_signal.clone(),
blockhash.clone(),
current_slot.clone(),
nb_rpc_client.clone(),
websocket_url.clone(),
solana_client::tpu_client::TpuClientConfig::default().fanout_slots,
Keypair::from_bytes(id.to_bytes().as_slice()).unwrap(),
);
let tpu_manager = TpuManager::new(
transaction_service,
mango_sim_stats.clone(),
tx_record_sx.clone(),
)
.await;
tpu_manager.force_reset_after_every(Duration::from_secs(300));
.await?;
info!(
"accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}",
@ -122,34 +212,25 @@ pub async fn main() -> anyhow::Result<()> {
duration
);
// continuosly fetch blockhash
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::finalized(),
));
let exit_signal = Arc::new(AtomicBool::new(false));
let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await;
let blockhash = Arc::new(RwLock::new(latest_blockhash));
let current_slot = Arc::new(AtomicU64::new(0));
let blockhash_thread = start_blockhash_polling_service(
exit_signal.clone(),
blockhash.clone(),
current_slot.clone(),
rpc_client.clone(),
);
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap();
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str())
.expect("Mango program should be able to convert into pubkey");
let perp_market_caches: Vec<PerpMarketCache> =
get_mango_market_perps_cache(rpc_client.clone(), mango_group_config, &mango_program_pk);
get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk)
.await;
let quote_root_bank =
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).unwrap();
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str())
.expect("Quote root bank should be able to convert into pubkey");
();
let quote_node_banks = mango_group_config
.tokens
.last()
.unwrap()
.node_keys
.iter()
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
.map(|x| {
Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")
})
.collect();
clean_market_makers(
@ -177,7 +258,7 @@ pub async fn main() -> anyhow::Result<()> {
} else {
None
};
let from_slot = current_slot.load(Ordering::Relaxed);
let keeper_config = KeeperConfig {
program_id: to_sdk_pk(&mango_program_pk),
rpc_url: json_rpc_url.clone(),
@ -191,7 +272,7 @@ pub async fn main() -> anyhow::Result<()> {
current_slot.clone(),
tpu_manager.clone(),
mango_group_config,
id,
identity,
keeper_prioritization,
);
@ -205,7 +286,7 @@ pub async fn main() -> anyhow::Result<()> {
exit_signal.clone(),
blockhash.clone(),
current_slot.clone(),
tpu_manager,
tpu_manager.clone(),
&duration,
*quotes_per_second,
*priority_fees_proba,
@ -214,8 +295,7 @@ pub async fn main() -> anyhow::Result<()> {
info!("Number of MM threads {}", mm_tasks.len());
drop(tx_record_sx);
let mut tasks = vec![];
tasks.push(blockhash_thread);
let mut tasks = vec![blockhash_thread];
let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1000000);
let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1000000);
@ -231,12 +311,11 @@ pub async fn main() -> anyhow::Result<()> {
);
tasks.append(&mut writers_jh);
let mut confirmation_threads = confirmations_by_blocks(
nb_rpc_client,
let mut confirmation_threads = confirmation_by_lite_rpc_notification_stream(
tx_record_rx,
notif_rx,
tx_status_sx,
block_status_sx,
from_slot,
exit_signal.clone(),
);
tasks.append(&mut confirmation_threads);
@ -264,12 +343,27 @@ pub async fn main() -> anyhow::Result<()> {
// we start stopping all other process
// some processes like confirmation of transactions will take some time and will get additional 2 minutes
// to confirm remaining transactions
futures::future::join_all(mm_tasks).await;
info!("finished market making, joining all other services");
println!("finished market making, joining all other services");
exit_signal.store(true, Ordering::Relaxed);
let market_makers_wait_task = {
let exit_signal = exit_signal.clone();
tokio::spawn(async move {
futures::future::join_all(mm_tasks).await;
info!("finished market making, joining all other services");
exit_signal.store(true, Ordering::Relaxed);
})
};
tasks.push(market_makers_wait_task);
let transaction_service = tokio::spawn(async move {
let _ = tx_service_jh.await;
info!("Transaction service joined");
});
tokio::select! {
_ = futures::future::join_all(tasks) => {},
_ = transaction_service => {},
};
futures::future::join_all(tasks).await;
mango_sim_stats.report(true, METRICS_NAME).await;
Ok(())
}

View File

@ -1,4 +1,9 @@
use std::{cell::RefCell, collections::{BTreeMap, HashSet}, convert::TryFrom, mem::size_of};
use std::{
cell::RefCell,
collections::{BTreeMap, HashSet},
convert::TryFrom,
mem::size_of,
};
use arrayref::array_ref;
use async_channel::Sender;
@ -95,27 +100,22 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
trace!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
let mut mango_accounts = HashSet::new();
event_queue
.iter()
.take(MAX_EVENTS_PER_TX)
.for_each(|e|
if mango_accounts.len() < MAX_ACCS_PER_TX {
match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
mango_accounts.insert(fill.maker);
mango_accounts.insert(fill.taker);
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
mango_accounts.insert(out.owner);
}
EventType::Liquidate => {
}
event_queue.iter().take(MAX_EVENTS_PER_TX).for_each(|e| {
if mango_accounts.len() < MAX_ACCS_PER_TX {
match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
mango_accounts.insert(fill.maker);
mango_accounts.insert(fill.taker);
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
mango_accounts.insert(out.owner);
}
EventType::Liquidate => {}
}
);
}
});
let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
let mkt_pk = self
@ -130,7 +130,10 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(&pk),
&mut mango_accounts.iter().map(|pk| pk.clone()).collect::<Vec<_>>(),
&mut mango_accounts
.iter()
.map(|pk| pk.clone())
.collect::<Vec<_>>(),
MAX_EVENTS_PER_TX,
)
.unwrap(),

View File

@ -317,49 +317,52 @@ pub async fn clean_market_makers(
blockhash: Arc<RwLock<Hash>>,
) {
info!("Cleaning previous transactions by market makers");
let mut tasks = vec![];
for market_maker in account_keys_parsed {
let mango_account_pk =
Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
for perp_market in perp_market_caches {
let market_maker = market_maker.clone();
let perp_market = perp_market.clone();
let rpc_client = rpc_client.clone();
let mango_account_pk = mango_account_pk.clone();
let blockhash = blockhash.clone();
for account_keys_parsed in account_keys_parsed.chunks(10) {
let mut tasks = vec![];
for market_maker in account_keys_parsed {
let mango_account_pk =
Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
for perp_market in perp_market_caches {
let market_maker = market_maker.clone();
let perp_market = perp_market.clone();
let rpc_client = rpc_client.clone();
let mango_account_pk = mango_account_pk.clone();
let blockhash = blockhash.clone();
let task = tokio::spawn(async move {
let mango_account_signer =
Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap();
let task = tokio::spawn(async move {
let mango_account_signer =
Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap();
for _ in 0..10 {
let mut tx = create_cancel_all_orders(
&perp_market,
mango_account_pk,
&mango_account_signer,
);
for _ in 0..10 {
let mut tx = create_cancel_all_orders(
&perp_market,
mango_account_pk,
&mango_account_signer,
);
let recent_blockhash = *blockhash.read().await;
tx.sign(&[&mango_account_signer], recent_blockhash);
// send and confirm the transaction with an RPC
if let Ok(res) = tokio::time::timeout(
Duration::from_secs(10),
rpc_client.send_and_confirm_transaction(&tx),
)
.await
{
match res {
Ok(_) => break,
Err(e) => info!("Error occured while doing cancel all for ma : {} and perp market : {} error : {}", mango_account_pk, perp_market.perp_market_pk, e),
let recent_blockhash = *blockhash.read().await;
tx.sign(&[&mango_account_signer], recent_blockhash);
let sig = tx.signatures[0];
// send and confirm the transaction with an RPC
if let Ok(res) = tokio::time::timeout(
Duration::from_secs(10),
rpc_client.send_and_confirm_transaction(&tx),
)
.await
{
match res {
Ok(_) => break,
Err(e) => info!("Error occured while doing cancel all for ma : {}, sig : {} perp market : {} error : {}", mango_account_pk, sig, perp_market.perp_market_pk, e),
}
}
}
}
});
tasks.push(task);
});
tasks.push(task);
}
}
}
futures::future::join_all(tasks).await;
futures::future::join_all(tasks).await;
}
info!("finished cleaning market makers");
}

View File

@ -1,6 +1,6 @@
use chrono::Utc;
use std::str::FromStr;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
use std::str::FromStr;
pub fn instruction(data: Vec<u8>) -> Instruction {
Instruction {

View File

@ -3,7 +3,7 @@ use fixed::types::I80F48;
use mango::state::PerpMarket;
use serde::Serialize;
use solana_program::{pubkey::Pubkey, slot_history::Slot};
use solana_sdk::signature::Signature;
use solana_sdk::{signature::Signature, commitment_config::CommitmentLevel};
use std::fmt;
#[derive(Clone, Debug, Serialize)]
@ -99,7 +99,9 @@ pub struct BlockData {
pub block_slot: Slot,
pub block_leader: String,
pub total_transactions: u64,
pub number_of_mm_transactions: u64,
pub number_of_mango_simulation_txs: u64,
pub block_time: u64,
pub cu_consumed: u64,
pub cu_consumed_by_mango_simulations: u64,
pub commitment: CommitmentLevel,
}

View File

@ -1,16 +1,17 @@
use std::{
collections::HashMap,
sync::Mutex,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Instant, collections::HashMap,
time::Instant,
};
use crate::states::{KeeperInstruction, TransactionConfirmRecord};
use iter_tools::Itertools;
use solana_metrics::datapoint_info;
use tokio::{task::JoinHandle, sync::RwLock};
use tokio::{sync::RwLock, task::JoinHandle};
// Non atomic version of counters
#[derive(Clone, Default, Debug)]
@ -49,7 +50,7 @@ impl NACounters {
pub fn diff(&self, other: &NACounters) -> NACounters {
let mut new_error_count = HashMap::new();
for (error, count) in &self.errors {
if let Some(v) = other.errors.get( error ) {
if let Some(v) = other.errors.get(error) {
new_error_count.insert(error.clone(), *count - *v);
} else {
new_error_count.insert(error.clone(), *count);
@ -383,7 +384,13 @@ impl MangoSimulationStats {
.checked_div(counters.num_sent)
.unwrap_or(0)
);
let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec();
let top_5_errors = counters
.errors
.iter()
.sorted_by(|x, y| (*y.1).cmp(x.1))
.take(5)
.enumerate()
.collect_vec();
let mut errors_to_print: String = String::new();
for (idx, (error, count)) in top_5_errors {
println!("Error #{idx} : {error} ({count})");
@ -489,11 +496,7 @@ impl MangoSimulationStats {
diff.succ_update_funding_txs,
i64
),
(
"top_5_errors",
errors_to_print,
String
)
("top_5_errors", errors_to_print, String)
);
}
}

View File

@ -1,138 +1,32 @@
use bincode::serialize;
use log::{info, warn, error};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient};
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
use solana_sdk::signature::Keypair;
use log::warn;
use solana_client::connection_cache::ConnectionCache;
use solana_lite_rpc_services::transaction_service::TransactionService;
use solana_sdk::transaction::Transaction;
use std::time::Duration;
use std::{
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use tokio::sync::mpsc::UnboundedSender;
use crate::{states::TransactionSendRecord, stats::MangoSimulationStats};
pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
pub type QuicConnectionCache = ConnectionCache;
#[derive(Clone)]
pub struct TpuManager {
error_count: Arc<AtomicU32>,
rpc_client: Arc<RpcClient>,
// why arc twice / one is so that we clone rwlock and other so that we can clone tpu client
tpu_client: Arc<RwLock<Arc<QuicTpuClient>>>,
pub ws_addr: String,
fanout_slots: u64,
identity: Arc<Keypair>,
transaction_service: TransactionService,
stats: MangoSimulationStats,
tx_send_record: UnboundedSender<TransactionSendRecord>,
}
impl TpuManager {
pub async fn new(
rpc_client: Arc<RpcClient>,
ws_addr: String,
fanout_slots: u64,
identity: Keypair,
transaction_service: TransactionService,
stats: MangoSimulationStats,
tx_send_record: UnboundedSender<TransactionSendRecord>,
) -> Self {
let connection_cache = ConnectionCache::new_with_client_options(
4,
None,
Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
None,
);
let quic_connection_cache =
if let ConnectionCache::Quic(connection_cache) = connection_cache {
Some(connection_cache)
} else {
None
};
let tpu_client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client.clone(),
&ws_addr,
solana_client::tpu_client::TpuClientConfig { fanout_slots },
quic_connection_cache.unwrap(),
)
.await
.unwrap(),
);
Self {
rpc_client,
tpu_client: Arc::new(RwLock::new(tpu_client)),
ws_addr,
fanout_slots,
error_count: Default::default(),
identity: Arc::new(identity),
) -> anyhow::Result<Self> {
Ok(Self {
transaction_service,
stats,
tx_send_record,
}
}
pub async fn reset_tpu_client(&self) -> anyhow::Result<()> {
let identity = Keypair::from_bytes(&self.identity.to_bytes()).unwrap();
let connection_cache = ConnectionCache::new_with_client_options(
4,
None,
Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
None,
);
let quic_connection_cache =
if let ConnectionCache::Quic(connection_cache) = connection_cache {
Some(connection_cache)
} else {
None
};
let tpu_client = Arc::new(
TpuClient::new_with_connection_cache(
self.rpc_client.clone(),
&self.ws_addr,
solana_client::tpu_client::TpuClientConfig {
fanout_slots: self.fanout_slots,
},
quic_connection_cache.unwrap(),
)
.await
.unwrap(),
);
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = tpu_client;
Ok(())
}
pub async fn reset(&self) -> anyhow::Result<()> {
self.error_count.fetch_add(1, Ordering::Relaxed);
if self.error_count.load(Ordering::Relaxed) > 5 {
self.reset_tpu_client().await?;
info!("TPU Reset after 5 errors");
}
Ok(())
}
pub fn force_reset_after_every(&self, duration: Duration) {
let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(duration).await;
if let Err(e) = this.reset_tpu_client().await {
error!("timely restart of tpu client failed {}", e);
}
});
}
async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
self.tpu_client.read().await.clone()
})
}
pub async fn send_transaction(
@ -140,7 +34,6 @@ impl TpuManager {
transaction: &solana_sdk::transaction::Transaction,
transaction_sent_record: TransactionSendRecord,
) -> bool {
let tpu_client = self.get_tpu_client().await;
self.stats
.inc_send(&transaction_sent_record.keeper_instruction);
@ -151,41 +44,28 @@ impl TpuManager {
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
if let Err(e) = self.reset().await {
error!("error while reseting tpu client {}", e);
}
}
let transaction = bincode::serialize(transaction).unwrap();
tpu_client.send_transaction(transaction).await
let res = self.transaction_service
.send_transaction(transaction, None)
.await;
if let Err(e) = &res{
print!("error sending txs on custom tpu {e:?}");
}
res.is_ok()
}
pub async fn send_transaction_batch(
&self,
batch: &Vec<(Transaction, TransactionSendRecord)>,
) -> bool {
let tpu_client = self.get_tpu_client().await;
for (_tx, record) in batch {
self.stats.inc_send(&record.keeper_instruction);
let tx_sent_record = self.tx_send_record.clone();
let sent = tx_sent_record.send(record.clone());
if sent.is_err() {
warn!(
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
}
let mut value = true;
for (tx, record) in batch {
value &= self.send_transaction(tx, record.clone()).await;
}
tpu_client
.try_send_wire_transaction_batch(
batch
.iter()
.map(|(tx, _)| serialize(tx).expect("serialization should succeed"))
.collect(),
)
.await
.is_ok()
value
}
}