From 0d4f83a6fcf5782b29058731f59602cb006f1f7d Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 30 Jun 2023 16:14:20 +0200 Subject: [PATCH] fix stake setup --- ...literpc_tpu_quic_server_integrationtest.rs | 133 +++++++++++++----- 1 file changed, 94 insertions(+), 39 deletions(-) diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index 724b4d29..7d6c9a7b 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -5,10 +5,13 @@ use std::path::Path; use std::str::FromStr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use std::time::{Duration, Instant}; -use crossbeam_channel::Receiver; +use anyhow::bail; +use crossbeam_channel::{Receiver, RecvError, Sender}; use futures::future::join_all; -use log::{debug, info}; +use itertools::join; +use log::{debug, error, info}; use quinn::TokioRuntime; use solana_rpc_client::rpc_client::SerializableTransaction; use solana_sdk::hash::Hash; @@ -34,6 +37,9 @@ use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::tx_store::empty_tx_store; use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; + +const SAMPLE_TX_COUNT: u32 = 2; + const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; #[test] @@ -44,7 +50,7 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { // solana quic streamer - see quic.rs -> rt() const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; let runtime1 = Builder::new_multi_thread() - .worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS) + .worker_threads(1) .thread_name("quic-server") .enable_all() .build() @@ -59,32 +65,94 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { .expect("failed to build tokio runtime for lite-rpc"); + let literpc_validator_identity = Arc::new(Keypair::new()); let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let listen_addr = udp_listen_socket.local_addr()?; - let literpc_validator_identity = Arc::new(Keypair::new()); + + let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); runtime1.block_on(async { /// setup solana Quic streamer // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" - let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket); - if STAKE_CONNECTION { - solana_quic_streamer.add_stake_for_identity( - literpc_validator_identity.as_ref(), - 30, - ); - } + let staked_nodes = StakedNodes { + total_stake: 100, + max_stake: 40, + min_stake: 0, + ip_stake_map: Default::default(), + pubkey_stake_map: + // literpc_validator_identity.as_ref() + if STAKE_CONNECTION { + let mut map = HashMap::new(); + map.insert(literpc_validator_identity.pubkey(),30); + map + } else { HashMap::default() } + }; + let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket, inbound_packets_sender, staked_nodes); + + + info!("end of quic streamer runtime"); }); runtime2.block_on(start_literpc_client(listen_addr, literpc_validator_identity))?; + let packet_consumer_jh = thread::spawn(move || { + info!("start pulling packets..."); + let mut packet_count = 0; + while packet_count != SAMPLE_TX_COUNT { + match inbound_packets_receiver.recv() { + Ok(packet_batch) => { + info!("received packets: {}", packet_batch.len()); + packet_count = packet_count + packet_batch.len() as u32; + + for packet in packet_batch.iter() { + info!("packet: {:?}", packet); + let tx = packet.deserialize_slice::(..).unwrap(); + debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); + // for ix in tx.message.instructions() { + // info!("instruction: {:?}", ix.data); + // } + } + + if packet_count == 10 { + return; + } + } + Err(err) => { + error!("Error polling quic streaming channel: {}", err); + } + } + } + while let Ok(packet_batch) = inbound_packets_receiver.recv() { + info!("received packets: {}", packet_batch.len()); + packet_count = packet_count + packet_batch.len() as u32; + + for packet in packet_batch.iter() { + info!("packet: {:?}", packet); + let tx = packet.deserialize_slice::(..).unwrap(); + debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); + // for ix in tx.message.instructions() { + // info!("instruction: {:?}", ix.data); + // } + } + + if packet_count == 10 { + return; + } + } + }); + + + // shutdown streamer // solana_quic_streamer.shutdown().await; + packet_consumer_jh.join().unwrap(); + Ok(()) } @@ -146,23 +214,22 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi ) .await; - let raw_sample_tx = build_raw_sample_tx(); + // TODO this is a rase + sleep(Duration::from_millis(1500)).await; - broadcast_sender.send(raw_sample_tx)?; + for i in 0..SAMPLE_TX_COUNT { + let raw_sample_tx = build_raw_sample_tx(i); + debug!("broadcast transaction {}: {}", raw_sample_tx.0, format!("hi {}", i)); + broadcast_sender.send(raw_sample_tx)?; + } - // TODO improve - sleep(Duration::from_millis(100)).await; - - debug!("remaining: {}", broadcast_sender.len()); + debug!("end of lite-rpc client runtime: {}", broadcast_sender.len()); Ok(()) } #[tokio::test] // taken from solana -> test_nonblocking_quic_client_multiple_writes async fn solana_quic_streamer_start() { - tracing_subscriber::fmt::fmt() - .with_max_level(tracing::Level::TRACE).init(); - let (sender, _receiver) = crossbeam_channel::unbounded(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); // will create random free port @@ -205,18 +272,12 @@ struct SolanaQuicStreamer { exit: Arc, join_handler: JoinHandle<()>, stats: Arc, - staked_nodes: Arc>, } impl SolanaQuicStreamer { pub fn get_socket_addr(&self) -> SocketAddr { self.sock.local_addr().unwrap() } - pub fn add_stake_for_identity(&self, identity: &Keypair, stake: u64) { - let mut lock = self.staked_nodes.write().unwrap(); - let prev = lock.pubkey_stake_map.insert(identity.pubkey(), stake); - assert!(prev.is_none(), "identity {} already staked", identity.pubkey()); - } } impl SolanaQuicStreamer { @@ -228,9 +289,8 @@ impl SolanaQuicStreamer { } impl SolanaQuicStreamer { - fn new_start_listening(udp_socket: UdpSocket) -> Self { - let (sender, _receiver) = crossbeam_channel::unbounded(); - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + fn new_start_listening(udp_socket: UdpSocket, sender: Sender, staked_nodes: StakedNodes) -> Self { + let staked_nodes = Arc::new(RwLock::new(staked_nodes)); let exit = Arc::new(AtomicBool::new(false)); // keypair to derive the server tls certificate let keypair = Keypair::new(); @@ -261,7 +321,6 @@ impl SolanaQuicStreamer { exit, join_handler: jh, stats, - staked_nodes, } } } @@ -269,24 +328,20 @@ impl SolanaQuicStreamer { const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; -pub fn build_raw_sample_tx() -> (String, Vec) { +pub fn build_raw_sample_tx(i: u32) -> (String, Vec) { - // FIXME - let payer_keypair = keypair::read_keypair_file( - Path::new("/Users/stefan/mango/solana-wallet/solana-testnet-stefantest.json") - ).unwrap(); + let payer_keypair = Keypair::from_base58_string("rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr"); - - let tx = build_sample_tx(&payer_keypair); + let tx = build_sample_tx(&payer_keypair, i); let raw_tx = bincode::serialize::(&tx).expect("failed to serialize tx"); (tx.get_signature().to_string(), raw_tx) } -fn build_sample_tx(payer_keypair: &Keypair) -> VersionedTransaction { +fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction { let blockhash = Hash::default(); - create_memo_tx(b"hi", payer_keypair, blockhash).into() + create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into() } fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {