2023-02-15 08:08:55 -08:00
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
|
|
|
ops::Div,
|
|
|
|
str::FromStr,
|
|
|
|
sync::{
|
|
|
|
Arc, RwLock,
|
|
|
|
},
|
|
|
|
thread::{sleep, Builder},
|
|
|
|
time::Duration,
|
|
|
|
};
|
|
|
|
|
|
|
|
use chrono::Utc;
|
|
|
|
use crossbeam_channel::{Receiver, TryRecvError};
|
|
|
|
use log::{debug, error, info, trace};
|
|
|
|
use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig};
|
|
|
|
use solana_program::pubkey::Pubkey;
|
|
|
|
use solana_sdk::{
|
|
|
|
commitment_config::{CommitmentConfig, CommitmentLevel},
|
|
|
|
signature::Signature,
|
|
|
|
};
|
|
|
|
use solana_transaction_status::RewardType;
|
|
|
|
|
|
|
|
use crate::{
|
|
|
|
helpers::seconds_since,
|
|
|
|
states::{BlockData, TransactionConfirmRecord, TransactionSendRecord},
|
|
|
|
};
|
|
|
|
|
|
|
|
pub fn process_signature_confirmation_batch(
|
|
|
|
rpc_client: &RpcClient,
|
|
|
|
batch: &Vec<TransactionSendRecord>,
|
|
|
|
not_confirmed: &Arc<RwLock<Vec<TransactionSendRecord>>>,
|
|
|
|
confirmed: &Arc<RwLock<Vec<TransactionConfirmRecord>>>,
|
|
|
|
timeouts: Arc<RwLock<Vec<TransactionSendRecord>>>,
|
|
|
|
timeout: u64,
|
|
|
|
) {
|
|
|
|
match rpc_client.get_signature_statuses(&batch.iter().map(|t| t.signature).collect::<Vec<_>>())
|
|
|
|
{
|
|
|
|
Ok(statuses) => {
|
|
|
|
trace!("batch result {:?}", statuses);
|
|
|
|
for (i, s) in statuses.value.iter().enumerate() {
|
|
|
|
let tx_record = &batch[i];
|
|
|
|
match s {
|
|
|
|
Some(s) => {
|
|
|
|
if s.confirmation_status.is_none() {
|
|
|
|
not_confirmed.write().unwrap().push(tx_record.clone());
|
|
|
|
} else {
|
|
|
|
let mut lock = confirmed.write().unwrap();
|
|
|
|
(*lock).push(TransactionConfirmRecord {
|
|
|
|
signature: tx_record.signature.to_string(),
|
|
|
|
sent_slot: tx_record.sent_slot,
|
|
|
|
sent_at: tx_record.sent_at.to_string(),
|
|
|
|
confirmed_at: Utc::now().to_string(),
|
|
|
|
confirmed_slot: s.slot,
|
|
|
|
successful: s.err.is_none(),
|
|
|
|
error: match &s.err {
|
|
|
|
Some(e) => e.to_string(),
|
|
|
|
None => "".to_string(),
|
|
|
|
},
|
|
|
|
block_hash: Pubkey::default().to_string(),
|
|
|
|
slot_leader: Pubkey::default().to_string(),
|
|
|
|
market: tx_record.market.to_string(),
|
|
|
|
market_maker: tx_record.market_maker.to_string(),
|
|
|
|
slot_processed: tx_record.sent_slot,
|
|
|
|
timed_out: false,
|
2023-02-21 02:04:02 -08:00
|
|
|
priority_fees: tx_record.priority_fees,
|
2023-02-15 08:08:55 -08:00
|
|
|
});
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
"confirmed sig={} duration={:?}",
|
|
|
|
tx_record.signature,
|
|
|
|
seconds_since(tx_record.sent_at)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
if seconds_since(tx_record.sent_at) > timeout as i64 {
|
|
|
|
debug!(
|
|
|
|
"could not confirm tx {} within {} seconds, dropping it",
|
|
|
|
tx_record.signature, timeout
|
|
|
|
);
|
|
|
|
let mut lock = timeouts.write().unwrap();
|
|
|
|
(*lock).push(tx_record.clone())
|
|
|
|
} else {
|
|
|
|
not_confirmed.write().unwrap().push(tx_record.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(err) => {
|
|
|
|
error!("could not confirm signatures err={}", err);
|
|
|
|
not_confirmed.write().unwrap().extend_from_slice(batch);
|
|
|
|
sleep(Duration::from_millis(500));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn confirmation_by_querying_rpc(
|
|
|
|
recv_limit: usize,
|
|
|
|
rpc_client: Arc<RpcClient>,
|
|
|
|
tx_record_rx: &Receiver<TransactionSendRecord>,
|
|
|
|
tx_confirm_records: Arc<RwLock<Vec<TransactionConfirmRecord>>>,
|
|
|
|
tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
|
|
|
|
) {
|
|
|
|
const TIMEOUT: u64 = 30;
|
|
|
|
let mut recv_until_confirm = recv_limit;
|
|
|
|
let not_confirmed: Arc<RwLock<Vec<TransactionSendRecord>>> = Arc::new(RwLock::new(Vec::new()));
|
|
|
|
loop {
|
|
|
|
let has_signatures_to_confirm = { not_confirmed.read().unwrap().len() > 0 };
|
|
|
|
if has_signatures_to_confirm {
|
|
|
|
// collect all not confirmed records in a new buffer
|
|
|
|
|
|
|
|
const BATCH_SIZE: usize = 256;
|
|
|
|
let to_confirm = {
|
|
|
|
let mut lock = not_confirmed.write().unwrap();
|
|
|
|
let to_confirm = (*lock).clone();
|
|
|
|
(*lock).clear();
|
|
|
|
to_confirm
|
|
|
|
};
|
|
|
|
|
|
|
|
info!(
|
|
|
|
"break from reading channel, try to confirm {} in {} batches",
|
|
|
|
to_confirm.len(),
|
|
|
|
(to_confirm.len() / BATCH_SIZE)
|
|
|
|
+ if to_confirm.len() % BATCH_SIZE > 0 {
|
|
|
|
1
|
|
|
|
} else {
|
|
|
|
0
|
|
|
|
}
|
|
|
|
);
|
|
|
|
|
|
|
|
let confirmed = tx_confirm_records.clone();
|
|
|
|
let timeouts = tx_timeout_records.clone();
|
|
|
|
for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) {
|
|
|
|
process_signature_confirmation_batch(
|
|
|
|
&rpc_client,
|
|
|
|
&batch,
|
|
|
|
¬_confirmed,
|
|
|
|
&confirmed,
|
|
|
|
timeouts.clone(),
|
|
|
|
TIMEOUT,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
// multi threaded implementation of confirming batches
|
|
|
|
// let mut confirmation_handles = Vec::new();
|
|
|
|
// for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) {
|
|
|
|
// let rpc_client = rpc_client.clone();
|
|
|
|
// let not_confirmed = not_confirmed.clone();
|
|
|
|
// let confirmed = tx_confirm_records.clone();
|
|
|
|
|
|
|
|
// let join_handle = Builder::new().name("solana-transaction-confirmation".to_string()).spawn(move || {
|
|
|
|
// process_signature_confirmation_batch(&rpc_client, &batch, ¬_confirmed, &confirmed, TIMEOUT)
|
|
|
|
// }).unwrap();
|
|
|
|
// confirmation_handles.push(join_handle);
|
|
|
|
// };
|
|
|
|
// for confirmation_handle in confirmation_handles {
|
|
|
|
// let (errors, timeouts) = confirmation_handle.join().unwrap();
|
|
|
|
// error_count += errors;
|
|
|
|
// timeout_count += timeouts;
|
|
|
|
// }
|
|
|
|
// sleep(Duration::from_millis(100)); // so the confirmation thread does not spam a lot the rpc node
|
|
|
|
}
|
|
|
|
{
|
|
|
|
if recv_until_confirm == 0 && not_confirmed.read().unwrap().len() == 0 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// context for writing all the not_confirmed_transactions
|
|
|
|
if recv_until_confirm > 0 {
|
|
|
|
let mut lock = not_confirmed.write().unwrap();
|
|
|
|
loop {
|
|
|
|
match tx_record_rx.try_recv() {
|
|
|
|
Ok(tx_record) => {
|
|
|
|
debug!(
|
|
|
|
"add to queue len={} sig={}",
|
|
|
|
(*lock).len() + 1,
|
|
|
|
tx_record.signature
|
|
|
|
);
|
|
|
|
(*lock).push(tx_record);
|
|
|
|
|
|
|
|
recv_until_confirm -= 1;
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Empty) => {
|
|
|
|
debug!("channel emptied");
|
|
|
|
sleep(Duration::from_millis(100));
|
|
|
|
break; // still confirm remaining transctions
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Disconnected) => {
|
|
|
|
{
|
|
|
|
info!("channel disconnected {}", recv_until_confirm);
|
|
|
|
}
|
|
|
|
debug!("channel disconnected");
|
|
|
|
break; // still confirm remaining transctions
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn confirmations_by_blocks(
|
2023-02-21 00:49:20 -08:00
|
|
|
client: Arc<RpcClient>,
|
2023-02-15 08:08:55 -08:00
|
|
|
recv_limit: usize,
|
|
|
|
tx_record_rx: Receiver<TransactionSendRecord>,
|
|
|
|
tx_confirm_records: Arc<RwLock<Vec<TransactionConfirmRecord>>>,
|
|
|
|
tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
|
|
|
|
tx_block_data: Arc<RwLock<Vec<BlockData>>>,
|
2023-02-23 05:41:12 -08:00
|
|
|
from_slot: u64,
|
2023-02-15 08:08:55 -08:00
|
|
|
) {
|
|
|
|
let mut recv_until_confirm = recv_limit;
|
|
|
|
let transaction_map = Arc::new(RwLock::new(
|
|
|
|
HashMap::<Signature, TransactionSendRecord>::new(),
|
|
|
|
));
|
|
|
|
while recv_until_confirm != 0 {
|
|
|
|
match tx_record_rx.try_recv() {
|
|
|
|
Ok(tx_record) => {
|
|
|
|
let mut transaction_map = transaction_map.write().unwrap();
|
|
|
|
debug!(
|
|
|
|
"add to queue len={} sig={}",
|
|
|
|
transaction_map.len() + 1,
|
|
|
|
tx_record.signature
|
|
|
|
);
|
|
|
|
transaction_map.insert(tx_record.signature, tx_record);
|
|
|
|
recv_until_confirm -= 1;
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Empty) => {
|
|
|
|
debug!("channel emptied");
|
|
|
|
sleep(Duration::from_millis(100));
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Disconnected) => {
|
|
|
|
{
|
|
|
|
info!("channel disconnected {}", recv_until_confirm);
|
|
|
|
}
|
|
|
|
debug!("channel disconnected");
|
|
|
|
break; // still confirm remaining transctions
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
println!("finished mapping all the trasactions");
|
|
|
|
sleep(Duration::from_secs(30));
|
|
|
|
let commitment_confirmation = CommitmentConfig {
|
|
|
|
commitment: CommitmentLevel::Confirmed,
|
|
|
|
};
|
2023-02-21 00:49:20 -08:00
|
|
|
let block_res = client
|
2023-02-23 05:41:12 -08:00
|
|
|
.get_blocks_with_commitment(from_slot, None, commitment_confirmation)
|
2023-02-15 08:08:55 -08:00
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let nb_blocks = block_res.len();
|
|
|
|
let nb_thread: usize = 16;
|
|
|
|
println!("processing {} blocks", nb_blocks);
|
|
|
|
|
|
|
|
let mut join_handles = Vec::new();
|
|
|
|
for slot_batch in block_res
|
|
|
|
.chunks(if nb_blocks > nb_thread {
|
|
|
|
nb_blocks.div(nb_thread)
|
|
|
|
} else {
|
|
|
|
nb_blocks
|
|
|
|
})
|
|
|
|
.map(|x| x.to_vec())
|
|
|
|
{
|
|
|
|
let map = transaction_map.clone();
|
2023-02-21 00:49:20 -08:00
|
|
|
let client = client.clone();
|
2023-02-15 08:08:55 -08:00
|
|
|
let tx_confirm_records = tx_confirm_records.clone();
|
|
|
|
let tx_block_data = tx_block_data.clone();
|
|
|
|
let joinble = Builder::new()
|
|
|
|
.name("getting blocks and searching transactions".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
for slot in slot_batch {
|
|
|
|
// retry search for block 10 times
|
|
|
|
let mut block = None;
|
|
|
|
for _i in 0..=10 {
|
|
|
|
let block_res = client
|
|
|
|
.get_block_with_config(
|
|
|
|
slot,
|
|
|
|
RpcBlockConfig {
|
|
|
|
encoding: None,
|
|
|
|
transaction_details: None,
|
|
|
|
rewards: None,
|
|
|
|
commitment: Some(commitment_confirmation),
|
|
|
|
max_supported_transaction_version: None,
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
match block_res {
|
|
|
|
Ok(x) => {
|
|
|
|
block = Some(x);
|
|
|
|
break;
|
|
|
|
},
|
|
|
|
_=>{
|
|
|
|
// do nothing
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let block = match block {
|
|
|
|
Some(x) => x,
|
|
|
|
None => continue,
|
|
|
|
};
|
|
|
|
let mut mm_transaction_count: u64 = 0;
|
|
|
|
let rewards = &block.rewards.unwrap();
|
|
|
|
let slot_leader = match rewards
|
|
|
|
.iter()
|
|
|
|
.find(|r| r.reward_type == Some(RewardType::Fee))
|
|
|
|
{
|
|
|
|
Some(x) => x.pubkey.clone(),
|
|
|
|
None=> "".to_string(),
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Some(transactions) = block.transactions {
|
|
|
|
let nb_transactions = transactions.len();
|
|
|
|
let mut cu_consumed : u64 = 0;
|
|
|
|
for solana_transaction_status::EncodedTransactionWithStatusMeta {
|
|
|
|
transaction,
|
|
|
|
meta,
|
|
|
|
..
|
|
|
|
} in transactions
|
|
|
|
{
|
|
|
|
if let solana_transaction_status::EncodedTransaction::Json(
|
|
|
|
transaction,
|
|
|
|
) = transaction
|
|
|
|
{
|
|
|
|
for signature in transaction.signatures {
|
|
|
|
let signature = Signature::from_str(&signature).unwrap();
|
|
|
|
|
|
|
|
let transaction_record_op = {
|
|
|
|
let map = map.read().unwrap();
|
|
|
|
let rec = map.get(&signature);
|
|
|
|
match rec {
|
|
|
|
Some(x) => Some(x.clone()),
|
|
|
|
None => None,
|
|
|
|
}
|
|
|
|
};
|
|
|
|
// add CU in counter
|
|
|
|
if let Some(meta) = &meta {
|
|
|
|
match meta.compute_units_consumed {
|
|
|
|
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
|
|
|
|
cu_consumed = cu_consumed.saturating_add(x);
|
|
|
|
},
|
|
|
|
_ => {},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if let Some(transaction_record) = transaction_record_op {
|
|
|
|
let mut lock = tx_confirm_records.write().unwrap();
|
|
|
|
mm_transaction_count += 1;
|
|
|
|
|
|
|
|
(*lock).push(TransactionConfirmRecord {
|
|
|
|
signature: transaction_record.signature.to_string(),
|
|
|
|
confirmed_slot: slot, // TODO: should be changed to correct slot
|
|
|
|
confirmed_at: Utc::now().to_string(),
|
|
|
|
sent_at: transaction_record.sent_at.to_string(),
|
|
|
|
sent_slot: transaction_record.sent_slot,
|
|
|
|
successful: if let Some(meta) = &meta {
|
|
|
|
meta.status.is_ok()
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
},
|
|
|
|
error: if let Some(meta) = &meta {
|
|
|
|
match &meta.err {
|
|
|
|
Some(x) => x.to_string(),
|
|
|
|
None => "".to_string(),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
"".to_string()
|
|
|
|
},
|
|
|
|
block_hash: block.blockhash.clone(),
|
|
|
|
market: transaction_record.market.to_string(),
|
|
|
|
market_maker: transaction_record.market_maker.to_string(),
|
|
|
|
slot_processed: slot,
|
|
|
|
slot_leader: slot_leader.clone(),
|
|
|
|
timed_out: false,
|
2023-02-21 02:04:02 -08:00
|
|
|
priority_fees: transaction_record.priority_fees,
|
2023-02-15 08:08:55 -08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
map.write().unwrap().remove(&signature);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// push block data
|
|
|
|
{
|
|
|
|
let mut blockstats_writer = tx_block_data.write().unwrap();
|
|
|
|
blockstats_writer.push(BlockData {
|
|
|
|
block_hash: block.blockhash,
|
|
|
|
block_leader: slot_leader,
|
|
|
|
block_slot: slot,
|
|
|
|
block_time: if let Some(time) = block.block_time {
|
|
|
|
time as u64
|
|
|
|
} else {
|
|
|
|
0
|
|
|
|
},
|
|
|
|
number_of_mm_transactions: mm_transaction_count,
|
|
|
|
total_transactions: nb_transactions as u64,
|
|
|
|
cu_consumed: cu_consumed,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap();
|
|
|
|
join_handles.push(joinble);
|
|
|
|
}
|
|
|
|
for handle in join_handles {
|
|
|
|
handle.join().unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut timeout_writer = tx_timeout_records.write().unwrap();
|
|
|
|
for x in transaction_map.read().unwrap().iter() {
|
|
|
|
timeout_writer.push(x.1.clone())
|
|
|
|
}
|
|
|
|
|
|
|
|
// sort all blocks by slot and print info
|
|
|
|
{
|
|
|
|
let mut blockstats_writer = tx_block_data.write().unwrap();
|
|
|
|
blockstats_writer.sort_by(|a, b| a.block_slot.partial_cmp(&b.block_slot).unwrap());
|
|
|
|
for block_stat in blockstats_writer.iter() {
|
|
|
|
info!(
|
|
|
|
"block {} at slot {} contains {} transactions and consumerd {} CUs",
|
|
|
|
block_stat.block_hash,
|
|
|
|
block_stat.block_slot,
|
|
|
|
block_stat.total_transactions,
|
|
|
|
block_stat.cu_consumed,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|