Compare commits

...

2 Commits

Author SHA1 Message Date
Christian Kamm 0681037d79
experiment: reduce number of source endpoints to 1 (#386)
Prediction is that this has no negative effect but reduces memory use a
lot.
2024-04-03 17:38:42 +02:00
galactus 9d776ed084
wip: Make connections also to the tpu_forwards port (#385)
* wip: Make connections also to the tpu_forwards port

* Make the use of tpu forwards configurable

---------

Co-authored-by: Christian Kamm <mail@ckamm.de>
2024-04-03 15:23:04 +02:00
7 changed files with 68 additions and 33 deletions

View File

@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
enable_tpu_forwarding: None,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};

View File

@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
);
quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
.map(|value| Some(value.parse::<bool>().unwrap()))
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);
Some(quic_connection_parameters)
}

View File

@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
};
#[test]
@ -475,27 +476,27 @@ async fn start_literpc_client_direct_mode(
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));
let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));
// this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));
// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));
let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));
// this is the real streamer
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));
// get information about the optional validator identity stake
// populated from get_stakes_for_identity()

View File

@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
pub enable_tpu_forwarding: Option<bool>,
}
impl Default for QuicConnectionParameters {
@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
}
}
}

View File

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
&self,
broadcast_receiver: Receiver<SentTransactionInfo>,
// for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
connection_parameters: QuicConnectionParameters,
) {
debug!(

View File

@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
Notify,
@ -242,35 +242,36 @@ impl ActiveConnection {
pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
}
impl TpuConnectionManager {
pub async fn new(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
fanout: usize,
_fanout: usize,
) -> Self {
let number_of_clients = fanout * 4;
let number_of_clients = 1; // fanout * 4;
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
identity_to_active_connection: Arc::new(DashMap::new()),
active_connections: Arc::new(DashMap::new()),
}
}
pub async fn update_connections(
&self,
broadcast_sender: Arc<Sender<SentTransactionInfo>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
identity_stakes: IdentityStakesData,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
let connection_key = (*identity, *socket_addr);
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new(
self.endpoints.clone(),
@ -282,15 +283,15 @@ impl TpuConnectionManager {
// using mpsc as a oneshot channel, because with a oneshot channel we cannot reuse the receiver
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection
.insert(*identity, active_connection);
self.active_connections
.insert((*identity, *socket_addr), active_connection);
}
}
// remove connections which are no longer needed
self.identity_to_active_connection.retain(|key, value| {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
self.active_connections.retain(|key, value| {
if !connections_to_keep.contains(key) {
trace!("removing a connection for {} {}", key.0, key.1);
// ignore error for exit channel
let _ = value.exit_notifier.send(());
false

View File

@ -1,5 +1,6 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
@ -128,8 +129,21 @@ impl TpuService {
.leader_schedule
.get_slot_leaders(current_slot, last_slot)
.await?;
let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;
let enable_tpu_forwards = {
match identity_stakes.peer_type {
ConnectionPeerType::Unstaked => false,
ConnectionPeerType::Staked => self
.config
.quic_connection_params
.enable_tpu_forwarding
.unwrap_or_default(),
}
};
// get next leader with its tpu port
let connections_to_keep: HashMap<_, _> = next_leaders
let connections_to_keep: HashSet<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);
@ -140,11 +154,23 @@ impl TpuService {
(x.pubkey, tpu_port)
})
.filter(|x| x.1.is_some())
.map(|x| {
let mut addr = x.1.unwrap();
.flat_map(|x| {
let mut addresses = vec![];
let mut tpu_addr = x.1.unwrap();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(x.0, addr)
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);
addresses.push((x.0, tpu_addr));
if enable_tpu_forwards {
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr;
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_forwards_addr));
}
addresses
})
.collect();
@ -156,7 +182,7 @@ impl TpuService {
.update_connections(
self.broadcast_sender.clone(),
connections_to_keep,
self.data_cache.identity_stakes.get_stakes().await,
identity_stakes,
self.data_cache.clone(),
self.config.quic_connection_params,
)