wip: Make connections also to the tpu_forwards port (#385)
* wip: Make connections also to the tpu_forwards port * Making using of tpu forwards customable --------- Co-authored-by: Christian Kamm <mail@ckamm.de>
This commit is contained in:
parent
b84e880961
commit
9d776ed084
|
@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
number_of_transactions_per_unistream: 1,
|
||||
unistreams_to_create_new_connection_in_percentage: 5,
|
||||
prioritization_heap_size,
|
||||
enable_tpu_forwarding: None,
|
||||
},
|
||||
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
|
||||
};
|
||||
|
|
|
@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
|
|||
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
|
||||
);
|
||||
|
||||
quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
|
||||
.map(|value| Some(value.parse::<bool>().unwrap()))
|
||||
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);
|
||||
|
||||
Some(quic_connection_parameters)
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
|
|||
number_of_transactions_per_unistream: 10,
|
||||
unistreams_to_create_new_connection_in_percentage: 10,
|
||||
prioritization_heap_size: None,
|
||||
enable_tpu_forwarding: None,
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
@ -475,27 +476,27 @@ async fn start_literpc_client_direct_mode(
|
|||
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
|
||||
|
||||
// this effectively controls how many connections we will have
|
||||
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
|
||||
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
|
||||
let addr1 = UdpSocket::bind("127.0.0.1:0")
|
||||
.unwrap()
|
||||
.local_addr()
|
||||
.unwrap();
|
||||
connections_to_keep.insert(
|
||||
connections_to_keep.insert((
|
||||
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
|
||||
addr1,
|
||||
);
|
||||
));
|
||||
|
||||
let addr2 = UdpSocket::bind("127.0.0.1:0")
|
||||
.unwrap()
|
||||
.local_addr()
|
||||
.unwrap();
|
||||
connections_to_keep.insert(
|
||||
connections_to_keep.insert((
|
||||
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
|
||||
addr2,
|
||||
);
|
||||
));
|
||||
|
||||
// this is the real streamer
|
||||
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs);
|
||||
connections_to_keep.insert((literpc_validator_identity.pubkey(), streamer_listen_addrs));
|
||||
|
||||
// get information about the optional validator identity stake
|
||||
// populated from get_stakes_for_identity()
|
||||
|
@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
|
|||
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
|
||||
|
||||
// this effectively controls how many connections we will have
|
||||
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
|
||||
let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
|
||||
let addr1 = UdpSocket::bind("127.0.0.1:0")
|
||||
.unwrap()
|
||||
.local_addr()
|
||||
.unwrap();
|
||||
connections_to_keep.insert(
|
||||
connections_to_keep.insert((
|
||||
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
|
||||
addr1,
|
||||
);
|
||||
));
|
||||
|
||||
let addr2 = UdpSocket::bind("127.0.0.1:0")
|
||||
.unwrap()
|
||||
.local_addr()
|
||||
.unwrap();
|
||||
connections_to_keep.insert(
|
||||
connections_to_keep.insert((
|
||||
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
|
||||
addr2,
|
||||
);
|
||||
));
|
||||
|
||||
// this is the real streamer
|
||||
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
|
||||
connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));
|
||||
|
||||
// get information about the optional validator identity stake
|
||||
// populated from get_stakes_for_identity()
|
||||
|
|
|
@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
|
|||
pub number_of_transactions_per_unistream: usize,
|
||||
pub unistreams_to_create_new_connection_in_percentage: u8,
|
||||
pub prioritization_heap_size: Option<usize>,
|
||||
pub enable_tpu_forwarding: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for QuicConnectionParameters {
|
||||
|
@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
|
|||
number_of_transactions_per_unistream: 1,
|
||||
unistreams_to_create_new_connection_in_percentage: 10,
|
||||
prioritization_heap_size: None,
|
||||
enable_tpu_forwarding: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
|
@ -64,7 +64,7 @@ impl QuicProxyConnectionManager {
|
|||
&self,
|
||||
broadcast_receiver: Receiver<SentTransactionInfo>,
|
||||
// for duration of this slot these tpu nodes will receive the transactions
|
||||
connections_to_keep: HashMap<Pubkey, SocketAddr>,
|
||||
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
) {
|
||||
debug!(
|
||||
|
|
|
@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
|
|||
};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
|
||||
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use tokio::sync::{
|
||||
broadcast::{self, Receiver, Sender},
|
||||
Notify,
|
||||
|
@ -242,7 +242,7 @@ impl ActiveConnection {
|
|||
|
||||
pub struct TpuConnectionManager {
|
||||
endpoints: RotatingQueue<Endpoint>,
|
||||
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
|
||||
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
|
||||
}
|
||||
|
||||
impl TpuConnectionManager {
|
||||
|
@ -256,21 +256,22 @@ impl TpuConnectionManager {
|
|||
endpoints: RotatingQueue::new(number_of_clients, || {
|
||||
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
|
||||
}),
|
||||
identity_to_active_connection: Arc::new(DashMap::new()),
|
||||
active_connections: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_connections(
|
||||
&self,
|
||||
broadcast_sender: Arc<Sender<SentTransactionInfo>>,
|
||||
connections_to_keep: HashMap<Pubkey, SocketAddr>,
|
||||
connections_to_keep: HashSet<(Pubkey, SocketAddr)>,
|
||||
identity_stakes: IdentityStakesData,
|
||||
data_cache: DataCache,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
) {
|
||||
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
|
||||
for (identity, socket_addr) in &connections_to_keep {
|
||||
if self.identity_to_active_connection.get(identity).is_none() {
|
||||
let connection_key = (*identity, *socket_addr);
|
||||
if self.active_connections.get(&connection_key).is_none() {
|
||||
trace!("added a connection for {}, {}", identity, socket_addr);
|
||||
let active_connection = ActiveConnection::new(
|
||||
self.endpoints.clone(),
|
||||
|
@ -282,15 +283,15 @@ impl TpuConnectionManager {
|
|||
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
|
||||
let broadcast_receiver = broadcast_sender.subscribe();
|
||||
active_connection.start_listening(broadcast_receiver, identity_stakes);
|
||||
self.identity_to_active_connection
|
||||
.insert(*identity, active_connection);
|
||||
self.active_connections
|
||||
.insert((*identity, *socket_addr), active_connection);
|
||||
}
|
||||
}
|
||||
|
||||
// remove connections which are no longer needed
|
||||
self.identity_to_active_connection.retain(|key, value| {
|
||||
if !connections_to_keep.contains_key(key) {
|
||||
trace!("removing a connection for {}", key.to_string());
|
||||
self.active_connections.retain(|key, value| {
|
||||
if !connections_to_keep.contains(key) {
|
||||
trace!("removing a connection for {} {}", key.0, key.1);
|
||||
// ignore error for exit channel
|
||||
let _ = value.exit_notifier.send(());
|
||||
false
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use anyhow::Context;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use solana_streamer::nonblocking::quic::ConnectionPeerType;
|
||||
|
||||
use super::tpu_connection_manager::TpuConnectionManager;
|
||||
use crate::quic_connection_utils::QuicConnectionParameters;
|
||||
|
@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
|
|||
use solana_lite_rpc_core::AnyhowJoinHandle;
|
||||
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
|
||||
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
|
@ -128,8 +129,21 @@ impl TpuService {
|
|||
.leader_schedule
|
||||
.get_slot_leaders(current_slot, last_slot)
|
||||
.await?;
|
||||
|
||||
let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;
|
||||
|
||||
let enable_tpu_forwards = {
|
||||
match identity_stakes.peer_type {
|
||||
ConnectionPeerType::Unstaked => false,
|
||||
ConnectionPeerType::Staked => self
|
||||
.config
|
||||
.quic_connection_params
|
||||
.enable_tpu_forwarding
|
||||
.unwrap_or_default(),
|
||||
}
|
||||
};
|
||||
// get next leader with its tpu port
|
||||
let connections_to_keep: HashMap<_, _> = next_leaders
|
||||
let connections_to_keep: HashSet<_, _> = next_leaders
|
||||
.iter()
|
||||
.map(|x| {
|
||||
let contact_info = cluster_nodes.get(&x.pubkey);
|
||||
|
@ -140,11 +154,23 @@ impl TpuService {
|
|||
(x.pubkey, tpu_port)
|
||||
})
|
||||
.filter(|x| x.1.is_some())
|
||||
.map(|x| {
|
||||
let mut addr = x.1.unwrap();
|
||||
.flat_map(|x| {
|
||||
let mut addresses = vec![];
|
||||
let mut tpu_addr = x.1.unwrap();
|
||||
// add quic port offset
|
||||
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
|
||||
(x.0, addr)
|
||||
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);
|
||||
|
||||
addresses.push((x.0, tpu_addr));
|
||||
|
||||
if enable_tpu_forwards {
|
||||
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
|
||||
// does not report it. However it's nearly always directly after the tpu port.
|
||||
let mut tpu_forwards_addr = tpu_addr;
|
||||
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
|
||||
addresses.push((x.0, tpu_forwards_addr));
|
||||
}
|
||||
|
||||
addresses
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -156,7 +182,7 @@ impl TpuService {
|
|||
.update_connections(
|
||||
self.broadcast_sender.clone(),
|
||||
connections_to_keep,
|
||||
self.data_cache.identity_stakes.get_stakes().await,
|
||||
identity_stakes,
|
||||
self.data_cache.clone(),
|
||||
self.config.quic_connection_params,
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue