code format

This commit is contained in:
GroovieGermanikus 2023-07-20 12:12:15 +02:00
parent e48ccbea57
commit b56058dc84
1 changed files with 85 additions and 52 deletions

View File

@ -1,12 +1,3 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use anyhow::bail; use anyhow::bail;
use countmap::CountMap; use countmap::CountMap;
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
@ -14,6 +5,10 @@ use futures::future::join_all;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use quinn::TokioRuntime; use quinn::TokioRuntime;
use serde::de::Unexpected::Option; use serde::de::Unexpected::Option;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager;
use solana_rpc_client::rpc_client::SerializableTransaction; use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::instruction::Instruction; use solana_sdk::instruction::Instruction;
@ -27,17 +22,22 @@ use solana_streamer::packet::PacketBatch;
use solana_streamer::quic::StreamStats; use solana_streamer::quic::StreamStats;
use solana_streamer::streamer::StakedNodes; use solana_streamer::streamer::StakedNodes;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use tokio::runtime::{Builder, Runtime}; use tokio::runtime::{Builder, Runtime};
use tokio::{join, spawn, task};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::error::SendError;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
use tracing_subscriber::{fmt, filter::LevelFilter}; use tokio::{join, spawn, task};
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use tracing_subscriber::{filter::LevelFilter, fmt};
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager;
// note: logging will be auto-adjusted // note: logging will be auto-adjusted
const SAMPLE_TX_COUNT: u32 = 20; const SAMPLE_TX_COUNT: u32 = 20;
@ -66,7 +66,6 @@ pub fn wireup_and_send_txs_via_channel() {
.build() .build()
.expect("failed to build tokio runtime for lite-rpc-tpu-client"); .expect("failed to build tokio runtime for lite-rpc-tpu-client");
let literpc_validator_identity = Arc::new(Keypair::new()); let literpc_validator_identity = Arc::new(Keypair::new());
let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let listen_addr = udp_listen_socket.local_addr().unwrap(); let listen_addr = udp_listen_socket.local_addr().unwrap();
@ -74,10 +73,8 @@ pub fn wireup_and_send_txs_via_channel() {
let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded();
runtime_quic1.block_on(async { runtime_quic1.block_on(async {
/// setup solana Quic streamer /// setup solana Quic streamer
// see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }"
let staked_nodes = StakedNodes { let staked_nodes = StakedNodes {
total_stake: 100, total_stake: 100,
max_stake: 40, max_stake: 40,
@ -93,12 +90,17 @@ pub fn wireup_and_send_txs_via_channel() {
}; };
let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening(
udp_listen_socket, inbound_packets_sender, staked_nodes); udp_listen_socket,
inbound_packets_sender,
staked_nodes,
);
}); });
runtime_literpc.block_on(async { runtime_literpc.block_on(async {
tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity)); tokio::spawn(start_literpc_client(
listen_addr,
literpc_validator_identity,
));
}); });
let packet_consumer_jh = thread::spawn(move || { let packet_consumer_jh = thread::spawn(move || {
@ -118,7 +120,9 @@ pub fn wireup_and_send_txs_via_channel() {
break; break;
} }
let packet_batch = match inbound_packets_receiver.recv_timeout(Duration::from_millis(500)) { let packet_batch = match inbound_packets_receiver
.recv_timeout(Duration::from_millis(500))
{
Ok(batch) => batch, Ok(batch) => batch,
Err(_) => { Err(_) => {
debug!("consumer thread did not receive packets on inbound channel recently - continue polling"); debug!("consumer thread did not receive packets on inbound channel recently - continue polling");
@ -130,7 +134,10 @@ pub fn wireup_and_send_txs_via_channel() {
latest_tx = Instant::now(); latest_tx = Instant::now();
if packet_count == 0 { if packet_count == 0 {
info!("time to first packet {}ms", time_to_first.elapsed().as_millis()); info!(
"time to first packet {}ms",
time_to_first.elapsed().as_millis()
);
} }
packet_count = packet_count + packet_batch.len() as u32; packet_count = packet_count + packet_batch.len() as u32;
@ -139,8 +146,13 @@ pub fn wireup_and_send_txs_via_channel() {
} }
for packet in packet_batch.iter() { for packet in packet_batch.iter() {
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap(); let tx = packet
trace!("read transaction from quic streaming server: {:?}", tx.get_signature()); .deserialize_slice::<VersionedTransaction, _>(..)
.unwrap();
trace!(
"read transaction from quic streaming server: {:?}",
tx.get_signature()
);
count_map.insert_or_increment(*tx.get_signature()); count_map.insert_or_increment(*tx.get_signature());
// for ix in tx.message.instructions() { // for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data); // info!("instruction: {:?}", ix.data);
@ -157,31 +169,39 @@ pub fn wireup_and_send_txs_via_channel() {
} // -- while not all packets received - by count } // -- while not all packets received - by count
let total_duration = timer.elapsed(); let total_duration = timer.elapsed();
let half_duration = timer2.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(3333)); let half_duration = timer2
.map(|t| t.elapsed())
.unwrap_or(Duration::from_secs(3333));
// throughput_50 is second half of transactions - should iron out startup effects // throughput_50 is second half of transactions - should iron out startup effects
info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, " info!(
, packet_count, total_duration.as_micros(), "consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, ",
packet_count,
total_duration.as_micros(),
packet_count as f64 / total_duration.as_secs_f64(), packet_count as f64 / total_duration.as_secs_f64(),
packet_count2 as f64 / half_duration.as_secs_f64(), packet_count2 as f64 / half_duration.as_secs_f64(),
); );
info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); info!("got all expected packets - shutting down tokio runtime with lite-rpc client");
assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT"); assert_eq!(
count_map.len() as u32,
SAMPLE_TX_COUNT,
"count_map size should be equal to SAMPLE_TX_COUNT"
);
// note: this assumption will not hold as soon as test is configured to do fanout // note: this assumption will not hold as soon as test is configured to do fanout
assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique"); assert!(
count_map.values().all(|cnt| *cnt == 1),
"all transactions should be unique"
);
runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); runtime_literpc.shutdown_timeout(Duration::from_millis(1000));
}); });
// shutdown streamer // shutdown streamer
// solana_quic_streamer.shutdown().await; // solana_quic_streamer.shutdown().await;
packet_consumer_jh.join().unwrap(); packet_consumer_jh.join().unwrap();
} }
fn configure_logging() { fn configure_logging() {
@ -208,7 +228,10 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10, number_of_transactions_per_unistream: 10,
}; };
async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validator_identity: Arc<Keypair>) -> anyhow::Result<()> { async fn start_literpc_client(
streamer_listen_addrs: SocketAddr,
literpc_validator_identity: Arc<Keypair>,
) -> anyhow::Result<()> {
info!("Start lite-rpc test client ..."); info!("Start lite-rpc test client ...");
let fanout_slots = 4; let fanout_slots = 4;
@ -220,31 +243,33 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat
literpc_validator_identity.as_ref(), literpc_validator_identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
) )
.expect("Failed to initialize QUIC connection certificates"); .expect("Failed to initialize QUIC connection certificates");
let tpu_connection_manager = let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
// this effectively controls how many connections we will have // this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new(); let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert( connections_to_keep.insert(
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1, addr1,
); );
let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert( connections_to_keep.insert(
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2, addr2,
); );
// this is the real streamer // this is the real streamer
connections_to_keep.insert( connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
literpc_validator_identity.pubkey(),
streamer_listen_addrs,
);
// get information about the optional validator identity stake // get information about the optional validator identity stake
// populated from get_stakes_for_identity() // populated from get_stakes_for_identity()
@ -266,15 +291,20 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat
// note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates
empty_tx_store().clone(), empty_tx_store().clone(),
QUIC_CONNECTION_PARAMS, QUIC_CONNECTION_PARAMS,
).await; )
.await;
// TODO this is a race // TODO this is a race
sleep(Duration::from_millis(1500)).await; sleep(Duration::from_millis(1500)).await;
for i in 0..SAMPLE_TX_COUNT { for i in 0..SAMPLE_TX_COUNT {
let raw_sample_tx = build_raw_sample_tx(i); let raw_sample_tx = build_raw_sample_tx(i);
debug!("broadcast transaction {} to {} receivers: {}", debug!(
raw_sample_tx.0, broadcast_sender.receiver_count(), format!("hi {}", i)); "broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0,
broadcast_sender.receiver_count(),
format!("hi {}", i)
);
broadcast_sender.send(raw_sample_tx)?; broadcast_sender.send(raw_sample_tx)?;
} }
@ -310,7 +340,7 @@ async fn solana_quic_streamer_start() {
stats.clone(), stats.clone(),
1000, 1000,
) )
.unwrap(); .unwrap();
let addr = sock.local_addr().unwrap().ip(); let addr = sock.local_addr().unwrap().ip();
let port = sock.local_addr().unwrap().port(); let port = sock.local_addr().unwrap().port();
@ -346,7 +376,11 @@ impl SolanaQuicStreamer {
} }
impl SolanaQuicStreamer { impl SolanaQuicStreamer {
fn new_start_listening(udp_socket: UdpSocket, sender: Sender<PacketBatch>, staked_nodes: StakedNodes) -> Self { fn new_start_listening(
udp_socket: UdpSocket,
sender: Sender<PacketBatch>,
staked_nodes: StakedNodes,
) -> Self {
let staked_nodes = Arc::new(RwLock::new(staked_nodes)); let staked_nodes = Arc::new(RwLock::new(staked_nodes));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
// keypair to derive the server tls certificate // keypair to derive the server tls certificate
@ -367,7 +401,7 @@ impl SolanaQuicStreamer {
stats.clone(), stats.clone(),
1000, 1000,
) )
.unwrap(); .unwrap();
let addr = udp_socket.local_addr().unwrap().ip(); let addr = udp_socket.local_addr().unwrap().ip();
let port = udp_socket.local_addr().unwrap().port(); let port = udp_socket.local_addr().unwrap().port();
@ -384,10 +418,10 @@ impl SolanaQuicStreamer {
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn build_raw_sample_tx(i: u32) -> (String, Vec<u8>) { pub fn build_raw_sample_tx(i: u32) -> (String, Vec<u8>) {
let payer_keypair = Keypair::from_base58_string(
let payer_keypair = Keypair::from_base58_string("rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr"); "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let tx = build_sample_tx(&payer_keypair, i); let tx = build_sample_tx(&payer_keypair, i);
@ -408,4 +442,3 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let message = Message::new(&[instruction], Some(&payer.pubkey())); let message = Message::new(&[instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash) Transaction::new(&[payer], message, blockhash)
} }