clippy-fmt

commit 6938a0aa86
parent bcd8d29abf
@@ -1,7 +1,7 @@
 use countmap::CountMap;
 use crossbeam_channel::Sender;

-use log::{debug, info, trace, warn};
+use log::{debug, error, info, trace, warn};

 use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
 use solana_lite_rpc_core::solana_utils::SerializableTransaction;
@@ -121,7 +121,6 @@ pub fn bench_proxy() {
     // consumed 1000 packets in 2059004us - throughput 485.67 tps, throughput_50 6704.05 tps

     wireup_and_send_txs_via_channel(TestCaseParams {
-        // sample_tx_count: 1000, // this is the goal -- ATM test runs too long
         sample_tx_count: 1000,
         stake_connection: true,
         proxy_mode: true,
@@ -164,6 +163,21 @@ pub fn too_many_transactions() {

 // note: this is not a tokio test as runtimes get created as part of the integration test
 fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
+    let default_panic = std::panic::take_hook();
+    std::panic::set_hook(Box::new(move |panic_info| {
+        default_panic(panic_info);
+        if let Some(location) = panic_info.location() {
+            error!(
+                "panic occurred in file '{}' at line {}",
+                location.file(),
+                location.line(),
+            );
+        } else {
+            error!("panic occurred but can't get location information...");
+        }
+        // std::process::exit(1);
+    }));
+
     // value from solana - see quic streamer - see quic.rs -> rt()
     let runtime_quic1 = Builder::new_multi_thread()
         .worker_threads(1)
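The panic hook added in this hunk chains to the previously installed hook, so the default backtrace output is preserved while the panic location is also written to the test log; without it, a panic inside one of the manually created runtimes could go unnoticed. A minimal standalone sketch of the same pattern (the env_logger setup and the deliberate panic are illustrative, not part of the commit):

use log::error;

fn main() {
    env_logger::init();

    // keep the default hook so the standard panic output still appears
    let default_panic = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        default_panic(panic_info);
        if let Some(location) = panic_info.location() {
            error!(
                "panic occurred in file '{}' at line {}",
                location.file(),
                location.line()
            );
        } else {
            error!("panic occurred but can't get location information...");
        }
    }));

    // trigger the hook once for demonstration
    let _ = std::panic::catch_unwind(|| panic!("boom"));
}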
@@ -254,7 +268,7 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
     let mut count_map: CountMap<Signature> =
         CountMap::with_capacity(test_case_params.sample_tx_count as usize);
     let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2;
-    while packet_count < test_case_params.sample_tx_count {
+    while (count_map.len() as u32) < test_case_params.sample_tx_count {
         if latest_tx.elapsed() > Duration::from_secs(25) {
             warn!("abort after timeout waiting for packet from quic streamer");
             break;
@@ -299,9 +313,6 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
         if packet_count == warmup_tx_count {
             timer2 = Some(Instant::now());
         }
-        if packet_count == test_case_params.sample_tx_count {
-            break;
-        }
     } // -- while not all packets received - by count

     let total_duration = timer.elapsed();
@@ -327,7 +338,7 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
     );
     // note: this assumption will not hold as soon as test is configured to do fanout
     assert!(
-        count_map.values().all(|cnt| *cnt == 1),
+        count_map.values().all(|cnt| *cnt >= 1),
         "all transactions should be unique"
     );

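Taken together, the two test changes above move from exactly-once to at-least-once semantics: the receive loop now ends once every distinct signature has been counted, and duplicate deliveries no longer fail the test (note the assert message still says "unique", which no longer matches the relaxed predicate). A small sketch of the difference, using a plain HashMap in place of CountMap (assumed equivalent for counting):

use std::collections::HashMap;

fn main() {
    // hypothetical received signatures; "b" arrives twice, e.g. via fanout
    let received = ["a", "b", "b", "c"];
    let mut count_map: HashMap<&str, u32> = HashMap::new();
    for sig in received {
        *count_map.entry(sig).or_insert(0) += 1;
    }

    // old predicate: seen exactly once -- duplicates make it fail
    assert!(!count_map.values().all(|cnt| *cnt == 1));
    // new predicate: seen at least once -- duplicates are tolerated
    assert!(count_map.values().all(|cnt| *cnt >= 1));
}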
@@ -145,9 +145,13 @@ impl ProxyListener {
                 txs.len(),
                 tpu_address
             );
-            if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity() {
-                debug!("forward channel buffered: capacity {} of {}",
-                    forwarder_channel_copy.capacity(), forwarder_channel_copy.max_capacity());
+            if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity()
+            {
+                debug!(
+                    "forward channel buffered: capacity {} of {}",
+                    forwarder_channel_copy.capacity(),
+                    forwarder_channel_copy.max_capacity()
+                );
             }
             forwarder_channel_copy
                 .send_timeout(
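The reformatted debug! above reports backpressure on a bounded tokio mpsc channel: capacity() shrinks as messages queue up and equals max_capacity() only when the buffer is empty. A minimal sketch of the same check (channel type and size are arbitrary):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, _rx) = mpsc::channel::<u64>(4);
    tx.send(42).await.unwrap();

    // one message is buffered, so free capacity is below the maximum
    if tx.capacity() < tx.max_capacity() {
        println!(
            "forward channel buffered: capacity {} of {}",
            tx.capacity(),
            tx.max_capacity()
        );
    }
}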
@@ -4,7 +4,6 @@ use crate::shared::ForwardPacket;
 use crate::util::timeout_fallback;
 use crate::validator_identity::ValidatorIdentity;
 use anyhow::{bail, Context};
-use fan::tokio::mpsc::FanOut;
 use futures::future::join_all;
 use log::{debug, info, warn};
 use quinn::{
@@ -19,7 +18,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::mpsc::{channel, Receiver};
+use tokio::sync::mpsc::Receiver;

 const MAX_PARALLEL_STREAMS: usize = 6;
 pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4;
@@ -34,45 +33,64 @@ pub async fn tx_forwarder(

     let endpoint = new_endpoint_with_validator_identity(validator_identity).await;

-    let mut agents: HashMap<SocketAddr, FanOut<ForwardPacket>> = HashMap::new();
+    let (broadcast_in, _) = tokio::sync::broadcast::channel::<Arc<ForwardPacket>>(1000);
+
+    let mut agents: HashMap<SocketAddr, Vec<Arc<AtomicBool>>> = HashMap::new();

     loop {
         if exit_signal.load(Ordering::Relaxed) {
             bail!("exit signal received");
         }

-        let forward_packet = transaction_channel
-            .recv()
-            .await
-            .expect("channel closed unexpectedly");
+        let forward_packet = Arc::new(
+            transaction_channel
+                .recv()
+                .await
+                .expect("channel closed unexpectedly"),
+        );
         let tpu_address = forward_packet.tpu_address;

         agents.entry(tpu_address).or_insert_with(|| {
-            let mut senders = Vec::new();
+            let mut agent_exit_signals = Vec::new();
             for connection_idx in 1..PARALLEL_TPU_CONNECTION_COUNT {
-                let (sender, mut receiver) = channel::<ForwardPacket>(100_000);
-                senders.push(sender);
-                let exit_signal = exit_signal.clone();
+                let global_exit_signal = exit_signal.clone();
+                let agent_exit_signal = Arc::new(AtomicBool::new(false));
                 let endpoint_copy = endpoint.clone();
+                let agent_exit_signal_copy = agent_exit_signal.clone();
+                // by subscribing we expect to get a copy of each packet
+                let mut per_connection_receiver = broadcast_in.subscribe();
                 tokio::spawn(async move {
                     debug!(
                         "Start Quic forwarder agent #{} for TPU {}",
                         connection_idx, tpu_address
                     );
-                    if exit_signal.load(Ordering::Relaxed) {
+                    if global_exit_signal.load(Ordering::Relaxed) {
                         warn!("Caught global exit signal - stopping agent thread");
                         return;
                     }
+                    if agent_exit_signal_copy.load(Ordering::Relaxed) {
+                        warn!("Caught exit signal for this agent - stopping agent thread");
+                        return;
+                    }

+                    // get a copy of the packet from broadcast channel
                     let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);

-                    let _exit_signal_copy = exit_signal.clone();
-                    while let Some(packet) = receiver.recv().await {
-                        assert_eq!(packet.tpu_address, tpu_address, "routing error");
+                    // TODO check exit signal (using select! or maybe replace with oneshot)
+                    let _exit_signal_copy = global_exit_signal.clone();
+                    while let Ok(packet) = per_connection_receiver.recv().await {
+                        if packet.tpu_address != tpu_address {
+                            continue;
+                        }

-                        let mut transactions_batch = packet.transactions;
+                        let mut transactions_batch: Vec<VersionedTransaction> =
+                            packet.transactions.clone();

-                        while let Ok(more) = receiver.try_recv() {
-                            transactions_batch.extend(more.transactions);
+                        while let Ok(more) = per_connection_receiver.try_recv() {
+                            if more.tpu_address != tpu_address {
+                                continue;
+                            }
+                            transactions_batch.extend(more.transactions.clone());
                         }

                         debug!(
@@ -105,21 +123,28 @@ pub async fn tx_forwarder(
                         }
                     } // -- while all packets from channel

-                    info!(
+                    warn!(
                         "Quic forwarder agent #{} for TPU {} exited",
                         connection_idx, tpu_address
                     );
-                });
-            }
+                }); // -- spawned thread for one connection to one TPU
+                agent_exit_signals.push(agent_exit_signal);
+            } // -- for parallel connections to one TPU

-            FanOut::new(senders)
+            // FanOut::new(senders)
+            agent_exit_signals
         }); // -- new agent

-        let agent_channel = agents.get(&tpu_address).unwrap();
+        let _agent_channel = agents.get(&tpu_address).unwrap();

-        timeout_fallback(agent_channel.send(forward_packet))
-            .await
-            .context("send to agent channel")??;
+        if broadcast_in.len() > 5 {
+            debug!("tx-forward queue len: {}", broadcast_in.len())
+        }
+
+        // TODO use agent_exit signal to clean them up
+        broadcast_in
+            .send(forward_packet)
+            .expect("send must succeed");
     } // -- loop over transactions from upstream channels

     // not reachable
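The forwarder refactoring above replaces the per-agent mpsc channels behind FanOut with a single tokio broadcast channel: every agent subscribes, receives its own copy of each Arc-wrapped packet, and filters on the TPU address it serves. A self-contained sketch of that fan-out pattern (the struct and values are illustrative, not the crate's types):

use std::sync::Arc;
use tokio::sync::broadcast;

#[derive(Debug)]
struct Packet {
    tpu: u16,
    payload: &'static str,
}

#[tokio::main]
async fn main() {
    // every subscriber gets a copy of each packet sent after it subscribed
    let (broadcast_in, _keepalive) = broadcast::channel::<Arc<Packet>>(16);

    let mut handles = Vec::new();
    for agent in 0..2u16 {
        let mut per_connection_receiver = broadcast_in.subscribe();
        handles.push(tokio::spawn(async move {
            while let Ok(packet) = per_connection_receiver.recv().await {
                // each agent only handles packets for "its" destination
                if packet.tpu != agent {
                    continue;
                }
                println!("agent {agent} got {}", packet.payload);
            }
        }));
    }

    broadcast_in.send(Arc::new(Packet { tpu: 0, payload: "tx1" })).unwrap();
    broadcast_in.send(Arc::new(Packet { tpu: 1, payload: "tx2" })).unwrap();
    drop(broadcast_in); // dropping the last sender ends the receive loops
    for handle in handles {
        handle.await.unwrap();
    }
}

One trade-off of broadcast channels: a subscriber that falls more than the channel capacity behind drops the oldest messages (recv() yields a Lagged error) instead of exerting backpressure the way the bounded mpsc did.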
@@ -172,8 +172,8 @@ impl QuicProxyConnectionManager {
                 },
                 Err(e) => {
                     error!(
-                        "Broadcast channel error on recv error {}", e);
-                    continue;
+                        "Broadcast channel error (close) on recv: {} - aborting", e);
+                    return;
                 }
             };

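Switching from continue to return matters here because once a broadcast channel is closed, recv() keeps returning the same error, so continuing would spin on it forever. A sketch of the usual exhaustive match on tokio's broadcast RecvError (the function and message type are illustrative):

use log::warn;
use tokio::sync::broadcast::{self, error::RecvError};

async fn consume(mut rx: broadcast::Receiver<String>) {
    loop {
        match rx.recv().await {
            Ok(msg) => println!("got {msg}"),
            Err(RecvError::Lagged(skipped)) => {
                // the receiver fell behind and `skipped` messages were dropped;
                // recv() is usable again, so this case may keep looping
                warn!("broadcast receiver lagged, skipped {skipped} messages");
            }
            Err(RecvError::Closed) => {
                // all senders are gone; a `continue` here would spin forever
                return;
            }
        }
    }
}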
@@ -192,7 +192,7 @@ impl QuicProxyConnectionManager {
         for target_tpu_node in tpu_fanout_nodes {
             Self::send_copy_of_txs_to_quicproxy(
                 &txs, &auto_connection,
-                proxy_addr,
+                proxy_addr,
                 target_tpu_node.tpu_address,
                 target_tpu_node.tpu_identity)
             .await.unwrap();