check tx uniqueness

This commit is contained in:
GroovieGermanikus 2023-07-17 08:25:55 +02:00
parent 4c6fe80608
commit 7ed29df55f
2 changed files with 10 additions and 27 deletions

View File

@ -42,4 +42,5 @@ tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
[dev-dependencies]
# note: version 0.5.6 has a known bug
crossbeam-channel = "0.5.6"
countmap = "0.2.0"

View File

@ -8,10 +8,11 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use anyhow::bail;
use countmap::CountMap;
use crossbeam_channel::{Receiver, RecvError, Sender};
use futures::future::join_all;
use itertools::join;
use log::{debug, error, info};
use log::{debug, error, info, trace};
use quinn::TokioRuntime;
use serde::de::Unexpected::Option;
use solana_rpc_client::rpc_client::SerializableTransaction;
@ -123,7 +124,8 @@ pub fn wireup_and_send_txs_via_channel() {
for packet in packet_batch.iter() {
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap();
// debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
trace!("read transaction from quic streaming server: {:?}", tx.get_signature());
count_map.insert_or_increment(*tx.get_signature());
// for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data);
// }
@ -149,7 +151,11 @@ pub fn wireup_and_send_txs_via_channel() {
);
info!("got all expected packets - shutting down tokio runtime with lite-rpc client");
runtime2.shutdown_timeout(Duration::from_millis(1000));
assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT");
assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique");
runtime_literpc.shutdown_timeout(Duration::from_millis(1000));
});
@ -374,27 +380,3 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
Transaction::new(&[payer], message, blockhash)
}
fn check_packets(
receiver: Receiver<PacketBatch>,
num_bytes: usize,
num_expected_packets: usize,
) {
let mut all_packets = vec![];
let now = Instant::now();
let mut total_packets: usize = 0;
while now.elapsed().as_secs() < 10 {
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
total_packets = total_packets.saturating_add(packets.len());
all_packets.push(packets)
}
if total_packets >= num_expected_packets {
break;
}
}
for batch in all_packets {
for p in &batch {
assert_eq!(p.meta().size, num_bytes);
}
}
assert_eq!(total_packets, num_expected_packets);
}