Updating confirm-by-block to run in parallel mode

This commit is contained in:
godmode galactus 2022-09-07 20:58:07 +02:00
parent 7947b45968
commit 73cc5f332d
1 changed file with 126 additions and 245 deletions

View File

@ -98,14 +98,13 @@ struct TransactionConfirmRecord {
pub sent_at: DateTime<Utc>,
pub confirmed_slot: Slot,
pub confirmed_at: DateTime<Utc>,
pub successful: bool,
pub slot_leader: Pubkey,
pub error: Option<String>,
}
#[derive(Clone)]
struct TransactionTimeoutRecord {
pub signature: Signature,
pub sent_at: DateTime<Utc>,
pub sent_slot: Slot,
pub market_maker: Pubkey,
pub market: Pubkey,
pub block_hash: Pubkey,
pub slot_processed: Slot,
}
#[derive(Clone)]
@ -360,6 +359,7 @@ fn process_signature_confirmation_batch(
sent_at: tx_record.sent_at,
confirmed_at: Utc::now(),
confirmed_slot: s.slot,
successful: s.err.is_none(),
error: s.err.as_ref().map(|e| {
let err_msg = e.to_string();
debug!(
@ -368,6 +368,11 @@ fn process_signature_confirmation_batch(
);
err_msg
}),
block_hash: Pubkey::default(),
slot_leader: Pubkey::default(),
market: tx_record.market,
market_maker: tx_record.market_maker,
slot_processed: tx_record.sent_slot,
});
debug!(
@ -507,153 +512,128 @@ fn confirmation_by_querying_rpc(
}
fn confirmations_by_blocks(
client: Arc<RpcClient>,
clients: RotatingQueue<Arc<RpcClient>>,
current_slot: &AtomicU64,
recv_limit : usize,
tx_record_rx: Receiver<TransactionSendRecord>,
tx_confirm_records : Arc<RwLock<Vec<TransactionConfirmRecord>>>,
tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
) {
let recv_until_confirm = Arc::new(AtomicU64::new(recv_limit as u64));
let finished_all_transactions = Arc::new(AtomicBool::new(false));
let mut recv_until_confirm = recv_limit;
let transaction_map = Arc::new(RwLock::new(HashMap::<Signature, TransactionSendRecord>::new()));
let last_slot = current_slot.load(Ordering::Acquire);
let joinable = {
let signal_finish = finished_all_transactions.clone();
let transaction_map = transaction_map.clone();
let recv_until_confirm = recv_until_confirm.clone();
Builder::new()
.name("Transaction Mapper".to_string())
.spawn( move || {
loop {
let current_recv_until_confim = recv_until_confirm.load(Ordering::Acquire);
if current_recv_until_confim == 0 {
println!("finished reading and filling transaction map");
sleep(Duration::from_secs(120));
signal_finish.store(true, Ordering::Relaxed);
break;
}
match tx_record_rx.try_recv() {
Ok(tx_record) => {
let map = &mut *transaction_map.write().unwrap();
debug!(
"add to queue len={} sig={}",
map.len() + 1,
tx_record.signature
);
println!("trnsaction : {}", tx_record.signature);
map.insert(tx_record.signature, tx_record);
recv_until_confirm.store( current_recv_until_confim - 1, Ordering::Release);
}
Err(TryRecvError::Empty) => {
debug!("channel emptied");
sleep(Duration::from_millis(100));
}
Err(TryRecvError::Disconnected) => {
{
info!("channel disconnected {}", current_recv_until_confim);
}
debug!("channel disconnected");
sleep(Duration::from_secs(30));
signal_finish.store(true, Ordering::Relaxed);
break; // still confirm remaining transctions
}
}
while recv_until_confirm != 0 {
match tx_record_rx.try_recv() {
Ok(tx_record) => {
let mut transaction_map = transaction_map.write().unwrap();
debug!(
"add to queue len={} sig={}",
transaction_map.len() + 1,
tx_record.signature
);
transaction_map.insert(tx_record.signature, tx_record);
recv_until_confirm-=1;
}
Err(TryRecvError::Empty) => {
debug!("channel emptied");
sleep(Duration::from_millis(100));
}
Err(TryRecvError::Disconnected) => {
{
info!("channel disconnected {}", recv_until_confirm);
}
}).unwrap()
};
let mut test_bool = false;
let mut last_slot = current_slot.load(Ordering::Acquire);
while !finished_all_transactions.load(Ordering::Relaxed) {
let transaction_map = transaction_map.clone();
let current_slot = current_slot.load(Ordering::Acquire);
if last_slot != current_slot {
let block_res = client.get_blocks(last_slot, Some(current_slot)).unwrap();
let mut queue = VecDeque::new();
for slot in block_res {
let block = client.get_block(slot).unwrap();
queue.push_front((slot, block));
debug!("channel disconnected");
break; // still confirm remaining transctions
}
if queue.is_empty(){
sleep(Duration::from_millis(20));
continue;
}
for (slot, block) in queue {
println!("block {} at slot {} contains {} trnsactions", block.blockhash, block.block_height.unwrap(), block.transactions.len());
for solana_transaction_status::EncodedTransactionWithStatusMeta {
transaction,
meta,
version,
} in block.transactions {
if let solana_transaction_status::EncodedTransaction::Json(transaction) = transaction {
transaction.message;
for signature in transaction.signatures {
let signature = Signature::from_str(&signature).unwrap();
let transaction_record_op: Option<TransactionSendRecord> = {
let temp = transaction_map.read();
match temp {
Ok(x) =>
{
let mut ret = None;
for v in x.iter() {
if (*v.0).eq(&signature) {
ret = Some(v.1.clone())
}
}
ret
// if !test_bool {
// println!("Signature in block is {} and map is {}", signature, x.iter().next().unwrap().0);
// test_bool = true;
// }
// let v = x.get(&signature);
// match v {
// Some(x)=> Some(x.clone()),
// None => None,
// }
},
_ =>
{
println!("Error getting transaction map");
None
}
}
};
if let Some(transaction_record) = transaction_record_op {
// remove signature from transaction map
{transaction_map.write().unwrap().remove(&signature);}
let mut lock = tx_confirm_records.write().unwrap();
(*lock).push(TransactionConfirmRecord{
signature: transaction_record.signature,
confirmed_slot: slot, // TODO: should be changed to correct slot
confirmed_at: Utc::now(),
sent_at: transaction_record.sent_at,
sent_slot: transaction_record.sent_slot,
error: if let Some(meta) = &meta {
match &meta.err {
Some(x) => Some(x.to_string()),
None => None,
}
} else {
None
},
})
}
}
}
}
}
last_slot = current_slot;
}
else {
sleep(Duration::from_millis(50));
}
}
joinable.join().unwrap();
println!("finished mapping all the trasactions");
sleep(Duration::from_secs(40));
let commitment_confirmation = CommitmentConfig{commitment:CommitmentLevel::Confirmed};
let block_res = clients.get().get_blocks_with_commitment(last_slot, None, commitment_confirmation).unwrap();
let nb_blocks = block_res.len();
let nb_thread:usize = 16;
println!("processing {} blocks", nb_blocks);
let mut join_handles = Vec::new();
for slot_batch in block_res.chunks( if nb_blocks > nb_thread {nb_blocks.div(nb_thread)} else {nb_blocks} ).map(|x| x.to_vec()) {
let map = transaction_map.clone();
let client = clients.get().clone();
let tx_confirm_records = tx_confirm_records.clone();
let joinble = Builder::new()
.name("getting blocks and searching transactions".to_string())
.spawn( move || {
for slot in slot_batch {
let block = client.get_block_with_config(slot, RpcBlockConfig { encoding: None, transaction_details: None, rewards: None, commitment: Some(commitment_confirmation), max_supported_transaction_version: None }).unwrap();
let rewards = &block.rewards.unwrap();
if let Some(transactions) = block.transactions {
println!("block {} at slot {} contains {} transactions", block.blockhash, block.block_height.unwrap(), transactions.len());
for solana_transaction_status::EncodedTransactionWithStatusMeta {
transaction,
meta,
version,
} in transactions {
if let solana_transaction_status::EncodedTransaction::Json(transaction) = transaction {
transaction.message;
for signature in transaction.signatures {
let signature = Signature::from_str(&signature).unwrap();
let transaction_record_op = {
let map = map.read().unwrap();
let rec = map.get(&signature);
match rec {
Some(x)=> Some(x.clone()),
None => None,
}
};
if let Some(transaction_record) = transaction_record_op {
let mut lock = tx_confirm_records.write().unwrap();
(*lock).push(TransactionConfirmRecord{
signature: transaction_record.signature,
confirmed_slot: slot, // TODO: should be changed to correct slot
confirmed_at: Utc::now(),
sent_at: transaction_record.sent_at,
sent_slot: transaction_record.sent_slot,
successful: if let Some(meta) = &meta {
meta.status.is_ok()
} else {
false
},
error: if let Some(meta) = &meta {
match &meta.err {
Some(x) => Some(x.to_string()),
None => None,
}
} else {
None
},
block_hash: Pubkey::from_str(block.blockhash.as_str()).unwrap(),
market: transaction_record.market,
market_maker: transaction_record.market_maker,
slot_processed: slot,
slot_leader: Pubkey::from_str(rewards.iter()
.find(|r| r.reward_type == Some(RewardType::Fee))
.unwrap()
.pubkey.as_str()).unwrap(),
})
}
map.write().unwrap().remove(&signature);
}
}
}
}
}
}).unwrap();
join_handles.push(joinble);
}
for handle in join_handles {
handle.join().unwrap();
}
let mut timeout_writer = tx_timeout_records.write().unwrap();
for x in transaction_map.read().unwrap().iter() {
timeout_writer.push(x.1.clone())
@ -698,7 +678,7 @@ fn main() {
.unwrap();
let number_of_tpu_clients: usize = 2 * (*quotes_per_second as usize);
let rpc_client_for_tpu_client =
let rpc_clients =
RotatingQueue::<Arc<RpcClient>>::new(number_of_tpu_clients, || {
Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
@ -711,7 +691,7 @@ fn main() {
|| {
Arc::new(
TpuClient::new_with_connection_cache(
rpc_client_for_tpu_client.get().clone(),
rpc_clients.get().clone(),
&websocket_url,
solana_client::tpu_client::TpuClientConfig::default(),
Arc::new(ConnectionCache::default()),
@ -876,8 +856,8 @@ fn main() {
* duration.as_secs() as usize
* quotes_per_second as usize;
confirmation_by_querying_rpc(recv_limit, rpc_client.clone(), &tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
//confirmations_by_blocks(rpc_client.clone(), &current_slot, recv_limit, tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
//confirmation_by_querying_rpc(recv_limit, rpc_client.clone(), &tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
confirmations_by_blocks(rpc_clients, &current_slot, recv_limit, tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone());
let confirmed: Vec<TransactionConfirmRecord> = {
let lock = tx_confirm_records.write().unwrap();
@ -925,105 +905,6 @@ fn main() {
confirmation_times.last().unwrap(),
confirmation_times[confirmation_times.len() / 2]
);
let mut slots = confirmed
.iter()
.map(|r| r.confirmed_slot)
.collect::<Vec<_>>();
slots.sort();
slots.dedup();
info!(
"slots min={} max={} num={}",
slots.first().unwrap(),
slots.last().unwrap(),
slots.len()
);
info!("slot csv stats");
let mut num_tx_confirmed_by_slot = HashMap::new();
for r in confirmed.iter() {
*num_tx_confirmed_by_slot
.entry(r.confirmed_slot)
.or_insert(0) += 1;
}
sleep(Duration::from_secs(5));
let mut block_by_slot = HashMap::new();
for slot in slots.iter() {
let maybe_block = rpc_client.get_block_with_config(
*slot,
RpcBlockConfig {
encoding: Some(UiTransactionEncoding::Base64),
transaction_details: Some(TransactionDetails::Signatures),
rewards: Some(true),
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
}),
max_supported_transaction_version: None,
},
);
block_by_slot.insert(*slot, maybe_block);
}
println!("slot,leader_id,block_time,block_txs,bench_txs");
for slot in slots.iter() {
let bench_txs = num_tx_confirmed_by_slot[slot];
match &block_by_slot[slot] {
Ok(block) => {
let leader_pk = &block
.rewards
.as_ref()
.unwrap()
.iter()
.find(|r| r.reward_type == Some(RewardType::Fee))
.unwrap()
.pubkey;
println!(
"{},{},{:?},{},{}",
slot,
leader_pk,
block.block_time.as_ref().unwrap(),
block.signatures.as_ref().unwrap().len(),
bench_txs
);
}
Err(err) => {
error!(
"could not fetch slot={} bench_txs={} err={}",
*slot, bench_txs, err
);
}
}
}
info!("tx csv stats");
println!("send_time,send_slot,confirmed_time,confirmed_slot,block_time");
for tx in confirmed.iter() {
let slot = tx.confirmed_slot;
match &block_by_slot[&slot] {
Ok(block) => {
println!(
"{:?},{},{:?},{},{}",
tx.sent_at,
tx.sent_slot,
tx.confirmed_at,
tx.confirmed_slot,
block.block_time.as_ref().unwrap()
);
}
Err(err) => {
println!(
"{:?},{},{:?},{},",
tx.sent_at, tx.sent_slot, tx.confirmed_at, tx.confirmed_slot
);
}
}
}
for tx in timeouts.iter() {
println!("{:?},{},,,", tx.sent_at, tx.sent_slot);
}
})
.unwrap();