Compare commits

...

2 Commits

Author SHA1 Message Date
Christian Kamm 0681037d79
experiment: reduce number of source endpoints to 1 (#386)
Prediction is that this has no negative effect but reduces memory use a
lot.
2024-04-03 17:38:42 +02:00
galactus 9d776ed084
wip: Make connections also to the tpu_forwards port (#385)
* wip: Make connections also to the tpu_forwards port

* Making using of tpu forwards customable

---------

Co-authored-by: Christian Kamm <mail@ckamm.de>
2024-04-03 15:23:04 +02:00
7 changed files with 68 additions and 33 deletions

View File

@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
number_of_transactions_per_unistream: 1, number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5, unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size, prioritization_heap_size,
enable_tpu_forwarding: None,
}, },
tpu_connection_path: TpuConnectionPath::QuicDirectPath, tpu_connection_path: TpuConnectionPath::QuicDirectPath,
}; };

View File

@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage, quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
); );
quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
.map(|value| Some(value.parse::<bool>().unwrap()))
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);
Some(quic_connection_parameters) Some(quic_connection_parameters)
} }

View File

@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10, number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10, unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None, prioritization_heap_size: None,
enable_tpu_forwarding: None,
}; };
#[test] #[test]
@ -475,27 +476,27 @@ async fn start_literpc_client_direct_mode(
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
// this effectively controls how many connections we will have // this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new(); let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
let addr1 = UdpSocket::bind("127.0.0.1:0") let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap() .unwrap()
.local_addr() .local_addr()
.unwrap(); .unwrap();
connections_to_keep.insert( connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1, addr1,
); ));
let addr2 = UdpSocket::bind("127.0.0.1:0") let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap() .unwrap()
.local_addr() .local_addr()
.unwrap(); .unwrap();
connections_to_keep.insert( connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2, addr2,
); ));
// this is the real streamer // this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));
// get information about the optional validator identity stake // get information about the optional validator identity stake
// populated from get_stakes_for_identity() // populated from get_stakes_for_identity()
@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await; QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
// this effectively controls how many connections we will have // this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new(); let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
let addr1 = UdpSocket::bind("127.0.0.1:0") let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap() .unwrap()
.local_addr() .local_addr()
.unwrap(); .unwrap();
connections_to_keep.insert( connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1, addr1,
); ));
let addr2 = UdpSocket::bind("127.0.0.1:0") let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap() .unwrap()
.local_addr() .local_addr()
.unwrap(); .unwrap();
connections_to_keep.insert( connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2, addr2,
); ));
// this is the real streamer // this is the real streamer
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs); connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));
// get information about the optional validator identity stake // get information about the optional validator identity stake
// populated from get_stakes_for_identity() // populated from get_stakes_for_identity()

View File

@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
pub number_of_transactions_per_unistream: usize, pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8, pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>, pub prioritization_heap_size: Option<usize>,
pub enable_tpu_forwarding: Option<bool>,
} }
impl Default for QuicConnectionParameters { impl Default for QuicConnectionParameters {
@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
number_of_transactions_per_unistream: 1, number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10, unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None, prioritization_heap_size: None,
enable_tpu_forwarding: None,
} }
} }
} }

View File

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
&self, &self,
broadcast_receiver: Receiver<SentTransactionInfo>, broadcast_receiver: Receiver<SentTransactionInfo>,
// for duration of this slot these tpu nodes will receive the transactions // for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>, connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
connection_parameters: QuicConnectionParameters, connection_parameters: QuicConnectionParameters,
) { ) {
debug!( debug!(

View File

@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
}; };
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{ use tokio::sync::{
broadcast::{self, Receiver, Sender}, broadcast::{self, Receiver, Sender},
Notify, Notify,
@ -242,35 +242,36 @@ impl ActiveConnection {
pub struct TpuConnectionManager { pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>, endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>, active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
} }
impl TpuConnectionManager { impl TpuConnectionManager {
pub async fn new( pub async fn new(
certificate: rustls::Certificate, certificate: rustls::Certificate,
key: rustls::PrivateKey, key: rustls::PrivateKey,
fanout: usize, _fanout: usize,
) -> Self { ) -> Self {
let number_of_clients = fanout * 4; let number_of_clients = 1; // fanout * 4;
Self { Self {
endpoints: RotatingQueue::new(number_of_clients, || { endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}), }),
identity_to_active_connection: Arc::new(DashMap::new()), active_connections: Arc::new(DashMap::new()),
} }
} }
pub async fn update_connections( pub async fn update_connections(
&self, &self,
broadcast_sender: Arc<Sender<SentTransactionInfo>>, broadcast_sender: Arc<Sender<SentTransactionInfo>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>, connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
identity_stakes: IdentityStakesData, identity_stakes: IdentityStakesData,
data_cache: DataCache, data_cache: DataCache,
connection_parameters: QuicConnectionParameters, connection_parameters: QuicConnectionParameters,
) { ) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64); NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep { for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() { let connection_key = (*identity, *socket_addr);
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr); trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new( let active_connection = ActiveConnection::new(
self.endpoints.clone(), self.endpoints.clone(),
@ -282,15 +283,15 @@ impl TpuConnectionManager {
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever // using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let broadcast_receiver = broadcast_sender.subscribe(); let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, identity_stakes); active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection self.active_connections
.insert(*identity, active_connection); .insert((*identity, *socket_addr), active_connection);
} }
} }
// remove connections which are no longer needed // remove connections which are no longer needed
self.identity_to_active_connection.retain(|key, value| { self.active_connections.retain(|key, value| {
if !connections_to_keep.contains_key(key) { if !connections_to_keep.contains(key) {
trace!("removing a connection for {}", key.to_string()); trace!("removing a connection for {} {}", key.0, key.1);
// ignore error for exit channel // ignore error for exit channel
let _ = value.exit_notifier.send(()); let _ = value.exit_notifier.send(());
false false

View File

@ -1,5 +1,6 @@
use anyhow::Context; use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge}; use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use super::tpu_connection_manager::TpuConnectionManager; use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters; use crate::quic_connection_utils::QuicConnectionParameters;
@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle; use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot}; use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap; use std::collections::HashSet;
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::Arc, sync::Arc,
@ -128,8 +129,21 @@ impl TpuService {
.leader_schedule .leader_schedule
.get_slot_leaders(current_slot, last_slot) .get_slot_leaders(current_slot, last_slot)
.await?; .await?;
let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;
let enable_tpu_forwards = {
match identity_stakes.peer_type {
ConnectionPeerType::Unstaked => false,
ConnectionPeerType::Staked => self
.config
.quic_connection_params
.enable_tpu_forwarding
.unwrap_or_default(),
}
};
// get next leader with its tpu port // get next leader with its tpu port
let connections_to_keep: HashMap<_, _> = next_leaders let connections_to_keep: HashSet<_, _> = next_leaders
.iter() .iter()
.map(|x| { .map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey); let contact_info = cluster_nodes.get(&x.pubkey);
@ -140,11 +154,23 @@ impl TpuService {
(x.pubkey, tpu_port) (x.pubkey, tpu_port)
}) })
.filter(|x| x.1.is_some()) .filter(|x| x.1.is_some())
.map(|x| { .flat_map(|x| {
let mut addr = x.1.unwrap(); let mut addresses = vec![];
let mut tpu_addr = x.1.unwrap();
// add quic port offset // add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET); tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);
(x.0, addr)
addresses.push((x.0, tpu_addr));
if enable_tpu_forwards {
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr;
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_forwards_addr));
}
addresses
}) })
.collect(); .collect();
@ -156,7 +182,7 @@ impl TpuService {
.update_connections( .update_connections(
self.broadcast_sender.clone(), self.broadcast_sender.clone(),
connections_to_keep, connections_to_keep,
self.data_cache.identity_stakes.get_stakes().await, identity_stakes,
self.data_cache.clone(), self.data_cache.clone(),
self.config.quic_connection_params, self.config.quic_connection_params,
) )