From 7ed29df55f3485538c74efffa139da34d46531f0 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 17 Jul 2023 08:25:55 +0200 Subject: [PATCH] check tx uniqueness --- services/Cargo.toml | 1 + ...literpc_tpu_quic_server_integrationtest.rs | 36 +++++-------------- 2 files changed, 10 insertions(+), 27 deletions(-) diff --git a/services/Cargo.toml b/services/Cargo.toml index cbf20bdc..06a18600 100644 --- a/services/Cargo.toml +++ b/services/Cargo.toml @@ -42,4 +42,5 @@ tracing-subscriber = { workspace = true, features = ["std", "env-filter"] } [dev-dependencies] # note: version 0.5.6 has a known bug crossbeam-channel = "0.5.6" +countmap = "0.2.0" diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index 6b9ef022..a886fd54 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -8,10 +8,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, Instant}; use anyhow::bail; +use countmap::CountMap; use crossbeam_channel::{Receiver, RecvError, Sender}; use futures::future::join_all; use itertools::join; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use quinn::TokioRuntime; use serde::de::Unexpected::Option; use solana_rpc_client::rpc_client::SerializableTransaction; @@ -123,7 +124,8 @@ pub fn wireup_and_send_txs_via_channel() { for packet in packet_batch.iter() { let tx = packet.deserialize_slice::(..).unwrap(); - // debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); + trace!("read transaction from quic streaming server: {:?}", tx.get_signature()); + count_map.insert_or_increment(*tx.get_signature()); // for ix in tx.message.instructions() { // info!("instruction: {:?}", ix.data); // } @@ -149,7 +151,11 @@ pub fn wireup_and_send_txs_via_channel() { ); info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); - runtime2.shutdown_timeout(Duration::from_millis(1000)); + + assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT"); + assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique"); + + runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); }); @@ -374,27 +380,3 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { Transaction::new(&[payer], message, blockhash) } -fn check_packets( - receiver: Receiver, - num_bytes: usize, - num_expected_packets: usize, -) { - let mut all_packets = vec![]; - let now = Instant::now(); - let mut total_packets: usize = 0; - while now.elapsed().as_secs() < 10 { - if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets = total_packets.saturating_add(packets.len()); - all_packets.push(packets) - } - if total_packets >= num_expected_packets { - break; - } - } - for batch in all_packets { - for p in &batch { - assert_eq!(p.meta().size, num_bytes); - } - } - assert_eq!(total_packets, num_expected_packets); -}