Implementing processing of blocks instead of asking rpc node

This commit is contained in:
godmode galactus 2022-09-04 19:27:30 +02:00
parent e5cb6d5117
commit 7947b45968
2 changed files with 172 additions and 34 deletions

6
bench_tps_run.sh Normal file
View File

@ -0,0 +1,6 @@
cargo run \
--bin solana-bench-mango \
-- -u https://api.testnet.rpcpool.com/dfeb84a5-7fe8-4783-baf9-60cca0babbc7 \
--identity ~/devnet.json \
--accounts ~/accounts20.json \
--mango ~/ids.json --duration 10 -q 2

View File

@ -7,6 +7,7 @@ use mango::{
state::{MangoCache, MangoGroup, PerpMarket},
};
use mango_common::Loadable;
use rayon::ThreadBuilder;
use serde_json;
use solana_bench_mango::{
@ -29,22 +30,22 @@ use solana_sdk::{
stake::instruction,
transaction::Transaction,
};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding, EncodedConfirmedBlock};
use core::time;
use std::{
collections::HashMap,
collections::VecDeque,
collections::{VecDeque, HashSet},
fs,
ops::{Div, Mul},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, Ordering, AtomicU64},
mpsc::{channel, Sender, TryRecvError, Receiver},
Arc, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
time::{Duration, Instant}, cell::Ref,
};
fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
@ -141,7 +142,7 @@ impl<T: Clone> RotatingQueue<T> {
where
F: Fn() -> T,
{
let mut item = Self {
let item = Self {
deque: Arc::new(RwLock::new(VecDeque::<T>::new())),
};
{
@ -164,7 +165,7 @@ impl<T: Clone> RotatingQueue<T> {
fn poll_blockhash_and_slot(
exit_signal: &Arc<AtomicBool>,
blockhash: Arc<RwLock<Hash>>,
slot: Arc<RwLock<Slot>>,
slot: &AtomicU64,
client: &Arc<RpcClient>,
_id: &Pubkey,
) {
@ -178,7 +179,7 @@ fn poll_blockhash_and_slot(
let new_slot = client.get_slot().unwrap();
{
*slot.write().unwrap() = new_slot;
slot.store(new_slot, Ordering::Release);
}
if let Some(new_blockhash) = get_new_latest_blockhash(client, &old_blockhash) {
@ -216,7 +217,7 @@ fn send_mm_transactions(
mango_account_pk: Pubkey,
mango_account_signer: &Keypair,
blockhash: Arc<RwLock<Hash>>,
slot: Arc<RwLock<Slot>>,
slot: &AtomicU64,
) {
// update quotes 2x per second
for _ in 0..quotes_per_second {
@ -318,15 +319,17 @@ fn send_mm_transactions(
}
let tpu_client = tpu_client_pool.get();
tpu_client.send_transaction(&tx);
tx_record_sx
let sent = tx_record_sx
.send(TransactionSendRecord {
signature: tx.signatures[0],
sent_at: Utc::now(),
sent_slot : *slot.read().unwrap(),
sent_slot : slot.load(Ordering::Acquire),
market_maker: mango_account_signer.pubkey(),
market: c.perp_market_pk,
})
.unwrap();
});
if sent.is_err() {
println!("sending error on channel : {}", sent.err().unwrap().to_string() );
}
}
}
}
@ -399,10 +402,10 @@ fn process_signature_confirmation_batch(
fn confirmation_by_querying_rpc(
recv_limit : usize,
rpc_client : Arc<RpcClient>,
tx_record_rx: &Receiver<TransactionSendRecord>,
tx_confirm_records : Arc<RwLock<Vec<TransactionConfirmRecord>>>,
tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
rpc_client : Arc<RpcClient>,
tx_record_rx: &Receiver<TransactionSendRecord>
) {
const TIMEOUT: u64 = 30;
let mut error: bool = false;
@ -505,30 +508,157 @@ fn confirmation_by_querying_rpc(
fn confirmations_by_blocks(
client: Arc<RpcClient>,
current_slot: Arc<RwLock<Slot>>,
current_slot: &AtomicU64,
recv_limit : usize,
timeout : AtomicBool,
tx_record_rx: &Receiver<TransactionSendRecord>,
tx_record_rx: Receiver<TransactionSendRecord>,
tx_confirm_records : Arc<RwLock<Vec<TransactionConfirmRecord>>>,
tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
) {
let mut last_slot = 0;
let mut recv_until_confirm = recv_limit;
let finished_all_transactions = AtomicBool::new(false);
while recv_until_confirm > 0 &&
!timeout.load(Ordering::Relaxed) &&
!finished_all_transactions.load(Ordering::Relaxed) {
let recv_until_confirm = Arc::new(AtomicU64::new(recv_limit as u64));
let finished_all_transactions = Arc::new(AtomicBool::new(false));
let transaction_map = Arc::new(RwLock::new(HashMap::<Signature, TransactionSendRecord>::new()));
let joinable = {
let signal_finish = finished_all_transactions.clone();
let transaction_map = transaction_map.clone();
let recv_until_confirm = recv_until_confirm.clone();
Builder::new()
.name("Transaction Mapper".to_string())
.spawn( move || {
loop {
let current_recv_until_confim = recv_until_confirm.load(Ordering::Acquire);
if current_recv_until_confim == 0 {
println!("finished reading and filling transaction map");
sleep(Duration::from_secs(120));
signal_finish.store(true, Ordering::Relaxed);
break;
}
match tx_record_rx.try_recv() {
Ok(tx_record) => {
let map = &mut *transaction_map.write().unwrap();
debug!(
"add to queue len={} sig={}",
map.len() + 1,
tx_record.signature
);
println!("trnsaction : {}", tx_record.signature);
map.insert(tx_record.signature, tx_record);
recv_until_confirm.store( current_recv_until_confim - 1, Ordering::Release);
}
Err(TryRecvError::Empty) => {
debug!("channel emptied");
sleep(Duration::from_millis(100));
}
Err(TryRecvError::Disconnected) => {
{
info!("channel disconnected {}", current_recv_until_confim);
}
debug!("channel disconnected");
sleep(Duration::from_secs(30));
signal_finish.store(true, Ordering::Relaxed);
break; // still confirm remaining transctions
}
}
}
}).unwrap()
};
let mut test_bool = false;
let mut last_slot = current_slot.load(Ordering::Acquire);
while !finished_all_transactions.load(Ordering::Relaxed) {
let transaction_map = transaction_map.clone();
let current_slot = current_slot.load(Ordering::Acquire);
let current_slot = { *current_slot.read().unwrap() };
if last_slot != current_slot {
let block = client.get_block(current_slot).unwrap();
let block_res = client.get_blocks(last_slot, Some(current_slot)).unwrap();
let mut queue = VecDeque::new();
for slot in block_res {
let block = client.get_block(slot).unwrap();
queue.push_front((slot, block));
}
if queue.is_empty(){
sleep(Duration::from_millis(20));
continue;
}
for (slot, block) in queue {
println!("block {} at slot {} contains {} trnsactions", block.blockhash, block.block_height.unwrap(), block.transactions.len());
for solana_transaction_status::EncodedTransactionWithStatusMeta {
transaction,
meta,
version,
} in block.transactions {
if let solana_transaction_status::EncodedTransaction::Json(transaction) = transaction {
transaction.message;
for signature in transaction.signatures {
let signature = Signature::from_str(&signature).unwrap();
let transaction_record_op: Option<TransactionSendRecord> = {
let temp = transaction_map.read();
match temp {
Ok(x) =>
{
let mut ret = None;
for v in x.iter() {
if (*v.0).eq(&signature) {
ret = Some(v.1.clone())
}
}
ret
// if !test_bool {
// println!("Signature in block is {} and map is {}", signature, x.iter().next().unwrap().0);
// test_bool = true;
// }
// let v = x.get(&signature);
// match v {
// Some(x)=> Some(x.clone()),
// None => None,
// }
},
_ =>
{
println!("Error getting transaction map");
None
}
}
};
if let Some(transaction_record) = transaction_record_op {
// remove signature from transaction map
{transaction_map.write().unwrap().remove(&signature);}
let mut lock = tx_confirm_records.write().unwrap();
(*lock).push(TransactionConfirmRecord{
signature: transaction_record.signature,
confirmed_slot: slot, // TODO: should be changed to correct slot
confirmed_at: Utc::now(),
sent_at: transaction_record.sent_at,
sent_slot: transaction_record.sent_slot,
error: if let Some(meta) = &meta {
match &meta.err {
Some(x) => Some(x.to_string()),
None => None,
}
} else {
None
},
})
}
}
}
}
}
last_slot = current_slot;
}
else {
sleep(Duration::from_millis(50));
}
last_slot = current_slot;
}
joinable.join().unwrap();
let mut timeout_writer = tx_timeout_records.write().unwrap();
for x in transaction_map.read().unwrap().iter() {
timeout_writer.push(x.1.clone())
}
}
fn main() {
@ -609,17 +739,17 @@ fn main() {
));
let exit_signal = Arc::new(AtomicBool::new(false));
let blockhash = Arc::new(RwLock::new(get_latest_blockhash(&rpc_client.clone())));
let current_slot = Arc::new(RwLock::new(rpc_client.get_slot().unwrap()));
let current_slot = Arc::new(AtomicU64::new(0));
let blockhash_thread = {
let exit_signal = exit_signal.clone();
let blockhash = blockhash.clone();
let current_slot = current_slot.clone();
let client = rpc_client.clone();
let id = id.pubkey();
let current_slot = current_slot.clone();
Builder::new()
.name("solana-blockhash-poller".to_string())
.spawn(move || {
poll_blockhash_and_slot(&exit_signal, blockhash.clone(), current_slot.clone(), &client, &id);
poll_blockhash_and_slot(&exit_signal, blockhash.clone(), current_slot.as_ref(), &client, &id);
})
.unwrap()
};
@ -692,6 +822,7 @@ fn main() {
mango_account_signer.pubkey(),
mango_account_pk
);
//sleep(Duration::from_secs(10));
let tx_record_sx = tx_record_sx.clone();
@ -709,7 +840,7 @@ fn main() {
mango_account_pk,
&mango_account_signer,
blockhash.clone(),
current_slot.clone()
current_slot.as_ref(),
);
let elapsed_millis: u64 = start.elapsed().as_millis() as u64;
@ -745,7 +876,8 @@ fn main() {
* duration.as_secs() as usize
* quotes_per_second as usize;
confirmation_by_querying_rpc(recv_limit, tx_confirm_records.clone(), tx_timeout_records.clone(), rpc_client.clone(), &tx_record_rx);
confirmation_by_querying_rpc(recv_limit, rpc_client.clone(), &tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
//confirmations_by_blocks(rpc_client.clone(), &current_slot, recv_limit, tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
let confirmed: Vec<TransactionConfirmRecord> = {
let lock = tx_confirm_records.write().unwrap();