add some tests to check tx.subscribe behaviour

This commit is contained in:
GroovieGermanikus 2023-06-29 14:12:05 +02:00
parent cdc3d1444d
commit 91ec468495
2 changed files with 76 additions and 2 deletions

View File

@ -290,26 +290,34 @@ mod test {
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use futures::future::join_all;
use log::info;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::runtime::Runtime;
use tokio::{join, spawn};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError;
use tokio::time::sleep;
use tracing_subscriber::util::SubscriberInitExt;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store;
use crate::tpu_utils::tpu_connection_manager::TpuConnectionManager;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
#[tokio::test]
async fn test() {
async fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::TRACE).init();
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
let txs_sent_store = empty_tx_store();
let validator_identity = Arc::new(Keypair::new());
let fanout_slots = 4;
// (String, Vec<u8>) (signature, transaction)
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let broadcast_sender = Arc::new(sender);
let (certificate, key) = new_self_signed_tls_certificate(
@ -333,6 +341,8 @@ mod test {
total_stakes: 100,
};
let receiver2 = broadcast_sender.subscribe();
tpu_connection_manager
.update_connections(
broadcast_sender.clone(),
@ -343,6 +353,68 @@ mod test {
.await;
broadcast_sender.send(("sig01".to_string(), vec![1,2,3,4]))?;
println!("remaining: {}", broadcast_sender.len());
Ok(())
}
#[tokio::test]
async fn breaking_with_no_receiver() {
let (tx, _) = broadcast::channel(16);
assert!(matches!(tx.send(10).unwrap_err(), SendError(10)));
}
#[tokio::test]
async fn subscribe_drop_fail() {
let (tx, _) = broadcast::channel::<i32>(16);
let mut rx1 = tx.subscribe();
// let mut rx2 = tx.subscribe();
drop(rx1);
assert_eq!(tx.receiver_count(), 0);
assert!(matches!(tx.send(10).unwrap_err(), SendError(10)));
}
#[tokio::test]
async fn subscribe_drop_okey() {
let (tx, _) = broadcast::channel(16);
let mut rx1 = tx.subscribe();
// this makes the difference
let rx2 = tx.subscribe();
drop(rx1);
assert_eq!(tx.receiver_count(), 1);
tx.send(10).unwrap();
}
#[tokio::test]
async fn closing_from_docs() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
let jh1 = spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
let jh2 = spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
join!(jh1, jh2);
}
}

View File

@ -22,6 +22,8 @@ use std::{
Arc,
},
};
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::{
sync::RwLock,
time::{Duration, Instant},