use fanout

GroovieGermanikus 2023-07-28 22:23:57 +02:00
parent e8f2befbee
commit 385d94bc02
5 changed files with 66 additions and 54 deletions

View File

@ -41,4 +41,6 @@ chrono = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
rcgen = "0.9.3"
spl-memo = "3.0.1"
# tokio channel fanout
fan = "0.1.3"

View File

@ -10,6 +10,8 @@ use std::time::Duration;
use tracing::{debug_span, instrument, Instrument, span};
use anyhow::{anyhow, bail, Context, Error};
use dashmap::DashMap;
use fan::tokio::mpsc::FanOut;
use futures::sink::Fanout;
use itertools::{any, Itertools};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt};
@ -201,7 +203,7 @@ async fn accept_client_connection(client_connection: Connection, forwarder_chann
async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: Receiver<ForwardPacket>, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
info!("TPU Quic forwarder started");
let mut agents: HashMap<SocketAddr, Sender<ForwardPacket>> = HashMap::new();
let mut agents: HashMap<SocketAddr, FanOut<ForwardPacket>> = HashMap::new();
let tpu_quic_client_copy = tpu_quic_client.clone();
loop {
@ -211,53 +213,65 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
let tpu_address = forward_packet.tpu_address;
if !agents.contains_key(&tpu_address) {
let (sender, mut receiver) = channel::<ForwardPacket>(10000);
// TODO cleanup agent after a while of inactivity
agents.insert(tpu_address, sender);
let tpu_quic_client_copy = tpu_quic_client.clone();
let exit_signal = exit_signal.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent for TPU {}", tpu_address);
// TODO pass+check the tpu_address
// TODO connect
// TODO consume queue
// TODO exit signal
let mut senders = Vec::new();
for i in 0..4 {
let (sender, mut receiver) = channel::<ForwardPacket>(100000);
senders.push(sender);
let endpoint = tpu_quic_client.get_endpoint().clone();
let exit_signal = exit_signal.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent for TPU {}", tpu_address);
// TODO pass+check the tpu_address
// TODO connect
// TODO consume queue
// TODO exit signal
let auto_connection = AutoReconnect::new(tpu_quic_client_copy.get_endpoint(), tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
let exit_signal = exit_signal.clone();
let auto_connection = AutoReconnect::new(endpoint, tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
let packet = receiver.recv().await.unwrap();
assert_eq!(packet.tpu_address, tpu_address, "routing error");
let mut transactions_batch = packet.transactions;
let exit_signal = exit_signal.clone();
loop {
let packet = receiver.recv().await.unwrap();
assert_eq!(packet.tpu_address, tpu_address, "routing error");
let mut batch_size = 1;
while let Ok(more) = receiver.try_recv() {
transactions_batch.extend(more.transactions);
batch_size += 1;
}
if batch_size > 1 {
debug!("encountered batch of size {}", batch_size);
}
let mut transactions_batch = packet.transactions;
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
let mut batch_size = 1;
while let Ok(more) = receiver.try_recv() {
transactions_batch.extend(more.transactions);
batch_size += 1;
}
if batch_size > 1 {
debug!("encountered batch of size {}", batch_size);
}
// TODO move send_txs_to_tpu_static to tpu_quic_client
let result = timeout(Duration::from_millis(500),
send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await;
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
// TODO move send_txs_to_tpu_static to tpu_quic_client
let result = timeout(Duration::from_millis(500),
send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await;
// .expect("timeout sending data to TPU node");
debug!("send_txs_to_tpu_static result {:?} - loop over errors", result);
if result.is_err() {
warn!("send_txs_to_tpu_static result {:?} - loop over errors", result);
} else {
debug!("send_txs_to_tpu_static sent {}", transactions_batch.len());
}
}
}
}
});
});
}
let fanout = FanOut::new(senders);
agents.insert(tpu_address, fanout);
} // -- new agent
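
The rewritten agent setup spawns four workers per TPU address, each owning its own channel and QUIC connection, and registers the four senders in the `agents` map as a single `FanOut`. Below is a minimal, self-contained sketch of the per-worker receive loop shown above: block on `recv`, drain whatever is already queued with `try_recv` to form a batch, then send under a 500 ms timeout. `ForwardPacket`, `AutoReconnect`, and `send_txs_to_tpu_static` are project types not reproduced here; `Packet` and `send_batch` are hypothetical stand-ins.

```rust
// Sketch of one forwarder worker, assuming a hypothetical Packet type and a
// send_batch stub in place of the project's ForwardPacket / AutoReconnect /
// send_txs_to_tpu_static.
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::timeout;

struct Packet {
    transactions: Vec<Vec<u8>>, // serialized transactions (stand-in)
}

// Stand-in for the real QUIC send; pretend it succeeds.
async fn send_batch(batch: &[Vec<u8>]) -> anyhow::Result<()> {
    println!("sending batch of {}", batch.len());
    Ok(())
}

async fn worker_loop(mut receiver: Receiver<Packet>) {
    loop {
        // Block until at least one packet arrives; None means all senders closed.
        let Some(packet) = receiver.recv().await else { break };
        let mut batch = packet.transactions;

        // Opportunistically drain anything already queued to build a bigger batch.
        while let Ok(more) = receiver.try_recv() {
            batch.extend(more.transactions);
        }

        // Bound the send so a stuck connection cannot stall the queue forever.
        match timeout(Duration::from_millis(500), send_batch(&batch)).await {
            Ok(Ok(())) => println!("sent {} transactions", batch.len()),
            Ok(Err(err)) => eprintln!("send failed: {err}"),
            Err(_) => eprintln!("send timed out"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = channel::<Packet>(100_000);
    let handle = tokio::spawn(worker_loop(rx));
    tx.send(Packet { transactions: vec![vec![0u8; 8]] }).await.unwrap();
    drop(tx); // closing the channel lets the worker exit
    handle.await.unwrap();
}
```

Splitting each TPU into several independent workers presumably reduces head-of-line blocking on a single connection, while the opportunistic draining keeps per-send overhead low when the queue backs up.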

View File

@ -151,7 +151,7 @@ impl TpuQuicClient {
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair
pub async fn new_with_validator_identity(validator_identity: &Keypair) -> TpuQuicClient {
info!("Setup TPU Quic stable connection ...");
info!("Setup TPU Quic stable connection with validator identity {} ...", bs58::encode(validator_identity.pubkey()).into_string());
let (certificate, key) = new_self_signed_tls_certificate(
validator_identity,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
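
The log line now includes the validator identity, base58-encoded via the `bs58` crate. A tiny standalone example of that encoding, with a raw 32-byte array standing in for the Solana `Pubkey`:

```rust
// Minimal example of bs58 encoding as used in the new log line; a raw
// 32-byte array stands in for the Solana Pubkey.
fn main() {
    let pubkey_bytes = [7u8; 32];
    let as_base58 = bs58::encode(pubkey_bytes).into_string();
    println!("Setup TPU Quic stable connection with validator identity {} ...", as_base58);
}
```

(Solana's `Pubkey` also renders as base58 through its `Display` impl, so the explicit encode mainly spells out the intent at the call site.)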

View File

@ -40,7 +40,8 @@ pub struct QuicProxyConnectionManager {
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>
}
const PARALLEL_STREAMS_TO_PROXY: usize = 4;
// TODO consolidate with number_of_transactions_per_unistream
const CHUNK_SIZE_PER_STREAM: usize = 20;
impl QuicProxyConnectionManager {
pub async fn new(
@ -242,7 +243,7 @@ impl QuicProxyConnectionManager {
}
for chunk in txs.chunks(PARALLEL_STREAMS_TO_PROXY) {
for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) {
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into());
debug!("forwarding_request: {}", forwarding_request);

View File

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use log::{trace, warn};
use tracing::{debug, info};
use quinn::{Connection, Endpoint};
use tokio::sync::{RwLock, RwLockWriteGuard};
@ -53,46 +54,40 @@ impl AutoReconnect {
if maybe_conn.as_ref().filter(|conn| conn.close_reason().is_none()).is_some() {
// let reuse = lock.unwrap().clone();
let reuse = maybe_conn.as_ref().unwrap();
debug!("Reuse connection {}", reuse.stable_id());
trace!("Reuse connection {}", reuse.stable_id());
return reuse.clone();
}
}
let mut lock = self.current.write().await;
match &*lock {
Some(current) => {
if current.close_reason().is_some() {
info!("Connection is closed for reason: {:?}", current.close_reason());
// TODO log
warn!("Connection i s closed for reason: {:?}", current.close_reason());
let new_connection = self.create_connection().await;
let prev_stable_id = current.stable_id();
*lock = Some(new_connection.clone());
// let old_conn = lock.replace(new_connection.clone());
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
// debug!("Replace closed connection {} with {} (retry {})",
// old_conn.map(|c| c.stable_id().to_string()).unwrap_or("none".to_string()),
// new_connection.stable_id(),
// self.reconnect_count.load(Ordering::SeqCst));
debug!("Replace closed connection {} with {} (retry {})",
prev_stable_id,
new_connection.stable_id(),
self.reconnect_count.load(Ordering::SeqCst));
// TODO log old vs new stable_id
return new_connection.clone();
} else {
debug!("Reuse connection {} with write-lock", current.stable_id());
// TODO check the logs to see if this ever happens
warn!("Reuse connection {} with write-lock", current.stable_id());
return current.clone();
}
}
None => {
let new_connection = self.create_connection().await;
// let old_conn = lock.replace(new_connection.clone());
// assert!(old_conn.is_none(), "old connection should be None");
*lock = Some(new_connection.clone());
// let old_conn = foo.replace(Some(new_connection.clone()));
// TODO log old vs new stable_id
debug!("Create initial connection {}", new_connection.stable_id());
trace!("Create initial connection {}", new_connection.stable_id());
return new_connection.clone();
}
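
The `AutoReconnect::refresh` changes above are mostly log-level tuning (debug to trace on the hot reuse path, info to warn on close/replace), but the surrounding pattern is worth a sketch: an optimistic read-lock check that reuses a healthy connection, falling back to a write lock that re-checks, replaces a closed connection, and bumps a reconnect counter. The sketch below uses a hypothetical `Conn` type in place of `quinn::Connection` so it stays self-contained.

```rust
// Sketch of the read-then-write "refresh" pattern, with a hypothetical Conn
// type standing in for quinn::Connection.
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::RwLock;

#[derive(Clone)]
struct Conn {
    stable_id: u32,
    closed: bool,
}

impl Conn {
    fn close_reason(&self) -> Option<&'static str> {
        self.closed.then_some("simulated close")
    }
}

struct AutoReconnectSketch {
    current: RwLock<Option<Conn>>,
    reconnect_count: AtomicU32,
}

impl AutoReconnectSketch {
    async fn create_connection(&self) -> Conn {
        // In the real code this performs the QUIC handshake to the TPU address.
        Conn { stable_id: self.reconnect_count.load(Ordering::SeqCst) + 1, closed: false }
    }

    async fn refresh(&self) -> Conn {
        // Fast path: a shared read lock is enough to reuse a healthy connection.
        {
            let guard = self.current.read().await;
            if let Some(conn) = guard.as_ref().filter(|c| c.close_reason().is_none()) {
                return conn.clone();
            }
        }
        // Slow path: take the write lock, then either reuse a still-healthy
        // connection (another task may have replaced it meanwhile) or create one.
        let mut guard = self.current.write().await;
        if let Some(current) = guard.as_ref() {
            if current.close_reason().is_none() {
                return current.clone();
            }
        }
        let new_conn = self.create_connection().await;
        *guard = Some(new_conn.clone());
        self.reconnect_count.fetch_add(1, Ordering::SeqCst);
        new_conn
    }
}

#[tokio::main]
async fn main() {
    let auto = AutoReconnectSketch {
        current: RwLock::new(None),
        reconnect_count: AtomicU32::new(0),
    };
    let first = auto.refresh().await;
    let second = auto.refresh().await;
    assert_eq!(first.stable_id, second.stable_id); // healthy connection is reused
}
```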