do proper shutdown

This commit is contained in:
GroovieGermanikus 2023-06-30 17:37:27 +02:00
parent 0d4f83a6fc
commit 1fe7f23fc5
1 changed files with 33 additions and 45 deletions

View File

@ -27,11 +27,11 @@ use solana_streamer::quic::StreamStats;
use solana_streamer::streamer::StakedNodes; use solana_streamer::streamer::StakedNodes;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::runtime::{Builder, Runtime}; use tokio::runtime::{Builder, Runtime};
use tokio::{join, spawn}; use tokio::{join, spawn, task};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::error::SendError;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::sleep; use tokio::time::{interval, sleep};
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store; use solana_lite_rpc_core::tx_store::empty_tx_store;
@ -41,9 +41,10 @@ use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionMa
const SAMPLE_TX_COUNT: u32 = 2; const SAMPLE_TX_COUNT: u32 = 2;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
#[test] #[test]
pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { pub fn wireup_and_send_txs_via_channel() {
tracing_subscriber::fmt::fmt() tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::TRACE).init(); .with_max_level(tracing::Level::TRACE).init();
@ -67,7 +68,7 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
let literpc_validator_identity = Arc::new(Keypair::new()); let literpc_validator_identity = Arc::new(Keypair::new());
let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let listen_addr = udp_listen_socket.local_addr()?; let listen_addr = udp_listen_socket.local_addr().unwrap();
let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded();
@ -94,11 +95,11 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket, inbound_packets_sender, staked_nodes); let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket, inbound_packets_sender, staked_nodes);
info!("end of quic streamer runtime");
}); });
runtime2.block_on(start_literpc_client(listen_addr, literpc_validator_identity))?; runtime2.block_on(async {
tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity));
});
let packet_consumer_jh = thread::spawn(move || { let packet_consumer_jh = thread::spawn(move || {
info!("start pulling packets..."); info!("start pulling packets...");
@ -118,32 +119,18 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
// } // }
} }
if packet_count == 10 { if packet_count == SAMPLE_TX_COUNT {
return; break;
} }
} }
Err(err) => { Err(err) => {
error!("Error polling quic streaming channel: {}", err); panic!("Error polling quic streaming channel: {}", err);
} }
} }
} }
while let Ok(packet_batch) = inbound_packets_receiver.recv() {
info!("received packets: {}", packet_batch.len());
packet_count = packet_count + packet_batch.len() as u32;
for packet in packet_batch.iter() { info!("got all expected packets - shutting down tokio runtime with lite-rpc client");
info!("packet: {:?}", packet); runtime2.shutdown_timeout(Duration::from_millis(100));
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap();
debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
// for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data);
// }
}
if packet_count == 10 {
return;
}
}
}); });
@ -153,17 +140,15 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
packet_consumer_jh.join().unwrap(); packet_consumer_jh.join().unwrap();
Ok(())
} }
const STAKE_CONNECTION: bool = true; const STAKE_CONNECTION: bool = true;
async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identity: Arc<Keypair>) -> anyhow::Result<()> { async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
let fanout_slots = 4; let fanout_slots = 1; // TODO change
// (String, Vec<u8>) (signature, transaction) // (String, Vec<u8>) (signature, transaction)
// _keeper is used to prevent the ref-count base closing to kick in let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (sender, _keeper) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let broadcast_sender = Arc::new(sender); let broadcast_sender = Arc::new(sender);
let (certificate, key) = new_self_signed_tls_certificate( let (certificate, key) = new_self_signed_tls_certificate(
literpc_validator_identity.as_ref(), literpc_validator_identity.as_ref(),
@ -176,20 +161,20 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi
// this effectively controls how many connections we will have // this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new(); let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
connections_to_keep.insert( // connections_to_keep.insert(
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, // Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
"127.0.0.1:20001".parse()?, // "127.0.0.1:20001".parse()?,
); // );
connections_to_keep.insert( // connections_to_keep.insert(
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, // Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
"127.0.0.1:20002".parse()?, // "127.0.0.1:20002".parse()?,
); // );
// this is the real streamer // this is the real streamer
connections_to_keep.insert( connections_to_keep.insert(
literpc_validator_identity.pubkey(), literpc_validator_identity.pubkey(),
listen_addrs, streamer_listen_addrs,
); );
// get information about the optional validator identity stake // get information about the optional validator identity stake
@ -214,16 +199,19 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi
) )
.await; .await;
// TODO this is a rase // TODO this is a race
sleep(Duration::from_millis(1500)).await; sleep(Duration::from_millis(1500)).await;
for i in 0..SAMPLE_TX_COUNT { for i in 0..SAMPLE_TX_COUNT {
let raw_sample_tx = build_raw_sample_tx(i); let raw_sample_tx = build_raw_sample_tx(i);
debug!("broadcast transaction {}: {}", raw_sample_tx.0, format!("hi {}", i)); debug!("broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0, broadcast_sender.receiver_count(), format!("hi {}", i));
broadcast_sender.send(raw_sample_tx)?; broadcast_sender.send(raw_sample_tx)?;
} }
debug!("end of lite-rpc client runtime: {}", broadcast_sender.len()); sleep(Duration::from_secs(30)).await;
Ok(()) Ok(())
} }
@ -259,7 +247,7 @@ async fn solana_quic_streamer_start() {
let port = sock.local_addr().unwrap().port(); let port = sock.local_addr().unwrap().port();
let tpu_addr = SocketAddr::new(addr, port); let tpu_addr = SocketAddr::new(addr, port);
sleep(Duration::from_millis(500)).await; // sleep(Duration::from_millis(500)).await;
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
t.await.unwrap(); t.await.unwrap();
@ -303,7 +291,7 @@ impl SolanaQuicStreamer {
gossip_host, gossip_host,
sender, sender,
exit.clone(), exit.clone(),
1, MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(), staked_nodes.clone(),
10, 10,
10, 10,