adds code-path broadcasting shreds using QUIC (#31610)

adds quic connection cache to turbine

Working towards migrating turbine to QUIC.
behzad nouri authored on 2023-06-12 22:58:27 +00:00, committed by GitHub
parent b89ce94d33
commit ec0001ef85
17 changed files with 254 additions and 68 deletions
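At a high level, the change builds a QUIC connection cache in the validator and threads it (behind an Arc) into both the broadcast and retransmit stages; shreds whose protocol resolves to QUIC are sent through cached connections instead of the UDP batch path. Below is a minimal sketch, mirroring the helpers this diff introduces (new_quic_connection_cache, get_connection, send_data); the metrics label, peer address, and pool-size literal are illustrative only, not values from the diff.

    // Sketch only -- not part of the diff. Mirrors the calls introduced by this
    // commit; "turbine_quic_example" and the peer address are made-up values.
    use {
        solana_client::tpu_connection::TpuConnection,
        solana_quic_client::new_quic_connection_cache,
        solana_sdk::signature::Keypair,
        solana_streamer::streamer::StakedNodes,
        std::{
            net::{IpAddr, Ipv4Addr, SocketAddr},
            sync::{Arc, RwLock},
        },
    };

    fn send_one_shred_over_quic(shred_payload: &[u8], peer: &SocketAddr) {
        let keypair = Keypair::new();
        let cache = new_quic_connection_cache(
            "turbine_quic_example",          // metrics label (hypothetical)
            &keypair,                        // keypair backing the client certificate
            IpAddr::V4(Ipv4Addr::LOCALHOST), // IP embedded in the client certificate
            &Arc::<RwLock<StakedNodes>>::default(),
            4, // validator::TURBINE_QUIC_CONNECTION_POOL_SIZE in this commit
        )
        .unwrap();
        // One cached connection per peer; send_data ships a single shred payload.
        let connection = cache.get_connection(peer);
        let _ = connection.send_data(shred_payload);
    }
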

Cargo.lock (generated)

@ -5514,6 +5514,7 @@ dependencies = [
"solana-perf",
"solana-poh",
"solana-program-runtime",
"solana-quic-client",
"solana-rayon-threadlimit",
"solana-rpc",
"solana-rpc-client-api",


@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-poh = { workspace = true }
solana-program-runtime = { workspace = true }
solana-quic-client = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client-api = { workspace = true }


@ -9,6 +9,7 @@ use {
broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
},
cluster_nodes::ClusterNodesCache,
validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
@ -18,16 +19,17 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::{Shred, ShredFlags},
},
solana_quic_client::new_quic_connection_cache,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
pubkey,
signature::{Keypair, Signer},
timing::{timestamp, AtomicInterval},
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
collections::HashMap,
net::UdpSocket,
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{Arc, RwLock},
time::Duration,
},
@ -38,6 +40,14 @@ use {
fn broadcast_shreds_bench(bencher: &mut Bencher) {
solana_logger::setup();
let leader_keypair = Arc::new(Keypair::new());
let quic_connection_cache = new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap();
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let cluster_info = ClusterInfo::new(
leader_info.info,
@ -45,7 +55,6 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
SocketAddrSpace::Unspecified,
);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_benches(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
@ -74,6 +83,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&socket,
&shreds,
&cluster_nodes_cache,
&quic_connection_cache,
&last_datapoint,
&mut TransmitShredsStats::default(),
&cluster_info,


@ -6,7 +6,7 @@ extern crate test;
use {
crossbeam_channel::unbounded,
log::*,
solana_core::retransmit_stage::retransmitter,
solana_core::{retransmit_stage::retransmitter, validator::TURBINE_QUIC_CONNECTION_POOL_SIZE},
solana_entry::entry::Entry,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
@ -26,10 +26,10 @@ use {
system_transaction,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
iter::repeat_with,
net::{Ipv4Addr, UdpSocket},
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
@ -100,6 +100,16 @@ fn bench_retransmitter(bencher: &mut Bencher) {
.collect();
let keypair = Keypair::new();
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
let slot = 0;
let parent = 0;
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
@ -118,6 +128,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let retransmitter_handles = retransmitter(
Arc::new(sockets),
quic_connection_cache,
bank_forks,
leader_schedule_cache,
cluster_info,


@ -9,11 +9,12 @@ use {
standard_broadcast_run::StandardBroadcastRun,
},
crate::{
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
result::{Error, Result},
},
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::Itertools,
itertools::{Either, Itertools},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::Protocol,
@ -22,6 +23,7 @@ use {
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
solana_poh::poh_recorder::WorkingBankEntry,
solana_quic_client::QuicConnectionCache,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
@ -87,6 +89,7 @@ impl BroadcastStageType {
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
quic_connection_cache: Arc<QuicConnectionCache>,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
@ -97,7 +100,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
StandardBroadcastRun::new(shred_version),
StandardBroadcastRun::new(shred_version, quic_connection_cache),
),
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
@ -108,7 +111,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
FailEntryVerificationBroadcastRun::new(shred_version),
FailEntryVerificationBroadcastRun::new(shred_version, quic_connection_cache),
),
BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
@ -392,6 +395,7 @@ pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
quic_connection_cache: &QuicConnectionCache,
last_datapoint_submit: &AtomicInterval,
transmit_stats: &mut TransmitShredsStats,
cluster_info: &ClusterInfo,
@ -404,7 +408,7 @@ pub fn broadcast_shreds(
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let packets: Vec<_> = shreds
let (packets, quic_packets): (Vec<_>, Vec<_>) = shreds
.iter()
.group_by(|shred| shred.slot())
.into_iter()
@ -413,26 +417,40 @@ pub fn broadcast_shreds(
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
shreds.filter_map(move |shred| {
let key = shred.id();
let protocol = cluster_nodes::get_broadcast_protocol(&key);
cluster_nodes
.get_broadcast_peer(&shred.id())?
.tvu(Protocol::UDP)
.get_broadcast_peer(&key)?
.tvu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))
.map(|addr| (shred.payload(), addr))
.map(|addr| {
(match protocol {
Protocol::QUIC => Either::Right,
Protocol::UDP => Either::Left,
})((shred.payload(), addr))
})
})
})
.collect();
.partition_map(std::convert::identity);
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();
let mut send_mmsg_time = Measure::start("send_mmsg");
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) {
transmit_stats.dropped_packets += num_failed;
transmit_stats.dropped_packets_udp += num_failed;
result = Err(Error::Io(ioerr));
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
transmit_stats.total_packets += packets.len();
for (shred, addr) in &quic_packets {
let conn = quic_connection_cache.get_connection(addr);
if let Err(err) = conn.send_data(shred) {
transmit_stats.dropped_packets_quic += 1;
result = Err(Error::from(err));
}
}
transmit_stats.total_packets += packets.len() + quic_packets.len();
result
}
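The broadcast path above now splits shreds by destination protocol in a single pass: partition_map with itertools::Either collects UDP-bound payloads into one batch for batch_send, and QUIC-bound pairs into a separate list that is sent per-connection. A small sketch of the idiom with simplified types; the bool stands in for get_broadcast_protocol, which in this commit still always yields UDP.

    // Minimal sketch of the Either/partition_map split used above (simplified
    // types; the real code partitions (payload, addr) pairs keyed by protocol).
    use {
        itertools::{Either, Itertools},
        std::net::SocketAddr,
    };

    fn split_by_protocol(
        packets: Vec<(Vec<u8>, SocketAddr, /*use_quic:*/ bool)>,
    ) -> (Vec<(Vec<u8>, SocketAddr)>, Vec<(Vec<u8>, SocketAddr)>) {
        packets.into_iter().partition_map(|(bytes, addr, use_quic)| {
            if use_quic {
                Either::Right((bytes, addr)) // sent one-by-one through the QUIC cache
            } else {
                Either::Left((bytes, addr)) // batched into a single batch_send call
            }
        })
    }
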
@ -440,6 +458,7 @@ pub fn broadcast_shreds(
pub mod test {
use {
super::*,
crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
crossbeam_channel::unbounded,
solana_entry::entry::create_ticks,
solana_gossip::cluster_info::{ClusterInfo, Node},
@ -454,7 +473,9 @@ pub mod test {
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::streamer::StakedNodes,
std::{
net::{IpAddr, Ipv4Addr},
path::Path,
sync::{atomic::AtomicBool, Arc},
thread::sleep,
@ -586,6 +607,16 @@ pub mod test {
) -> MockBroadcastStage {
// Make the database ledger
let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
// Make the leader node and scheduler
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
@ -619,7 +650,7 @@ pub mod test {
exit_sender,
blockstore.clone(),
bank_forks,
StandardBroadcastRun::new(0),
StandardBroadcastRun::new(0, quic_connection_cache),
);
MockBroadcastStage {


@ -21,7 +21,8 @@ pub struct TransmitShredsStats {
pub shred_select: u64,
pub num_shreds: usize,
pub total_packets: usize,
pub dropped_packets: usize,
pub(crate) dropped_packets_udp: usize,
pub(crate) dropped_packets_quic: usize,
}
impl BroadcastStats for TransmitShredsStats {
@ -32,7 +33,8 @@ impl BroadcastStats for TransmitShredsStats {
self.num_shreds += new_stats.num_shreds;
self.shred_select += new_stats.shred_select;
self.total_packets += new_stats.total_packets;
self.dropped_packets += new_stats.dropped_packets;
self.dropped_packets_udp += new_stats.dropped_packets_udp;
self.dropped_packets_quic += new_stats.dropped_packets_quic;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
if was_interrupted {
@ -45,7 +47,12 @@ impl BroadcastStats for TransmitShredsStats {
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
(
"dropped_packets_quic",
self.dropped_packets_quic as i64,
i64
),
);
} else {
datapoint_info!(
@ -64,7 +71,12 @@ impl BroadcastStats for TransmitShredsStats {
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
(
"dropped_packets_quic",
self.dropped_packets_quic as i64,
i64
),
);
}
}
@ -210,7 +222,8 @@ mod test {
shred_select: 4,
num_shreds: 5,
total_packets: 6,
dropped_packets: 7,
dropped_packets_udp: 7,
dropped_packets_quic: 8,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
@ -230,7 +243,8 @@ mod test {
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);
slot_broadcast_stats.update(
&TransmitShredsStats {
@ -240,7 +254,8 @@ mod test {
shred_select: 14,
num_shreds: 15,
total_packets: 16,
dropped_packets: 17,
dropped_packets_udp: 17,
dropped_packets_quic: 18,
},
&None,
);
@ -255,7 +270,8 @@ mod test {
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);
// If another batch is given, then total number of batches == num_expected_batches == 2,
// so the batch should be purged from the HashMap
@ -267,7 +283,8 @@ mod test {
shred_select: 1,
num_shreds: 1,
total_packets: 1,
dropped_packets: 1,
dropped_packets_udp: 1,
dropped_packets_quic: 1,
},
&Some(BroadcastShredBatchInfo {
slot: 0,


@ -17,11 +17,12 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
quic_connection_cache: Arc<QuicConnectionCache>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new(shred_version: u16) -> Self {
pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
@ -33,6 +34,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
quic_connection_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
@ -168,6 +170,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock,
&shreds,
&self.cluster_nodes_cache,
&self.quic_connection_cache,
&AtomicInterval::default(),
&mut TransmitShredsStats::default(),
cluster_info,


@ -33,6 +33,7 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
quic_connection_cache: Arc<QuicConnectionCache>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
@ -42,7 +43,7 @@ enum BroadcastError {
}
impl StandardBroadcastRun {
pub(super) fn new(shred_version: u16) -> Self {
pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
@ -58,6 +59,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
quic_connection_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
@ -413,6 +415,7 @@ impl StandardBroadcastRun {
sock,
&shreds,
&self.cluster_nodes_cache,
&self.quic_connection_cache,
&self.last_datapoint_submit,
&mut transmit_stats,
cluster_info,
@ -506,6 +509,7 @@ fn should_use_merkle_variant(slot: Slot, cluster_type: ClusterType, shred_versio
mod test {
use {
super::*,
crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
solana_entry::entry::create_ticks,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
@ -517,8 +521,13 @@ mod test {
genesis_config::GenesisConfig,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{ops::Deref, sync::Arc, time::Duration},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
net::{IpAddr, Ipv4Addr},
ops::Deref,
sync::Arc,
time::Duration,
},
};
#[allow(clippy::type_complexity)]
@ -564,10 +573,24 @@ mod test {
)
}
fn new_quic_connection_cache(keypair: &Keypair) -> Arc<QuicConnectionCache> {
Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
)
}
#[test]
fn test_interrupted_slot_last_shred() {
let keypair = Arc::new(Keypair::new());
let mut run = StandardBroadcastRun::new(0);
let quic_connection_cache = new_quic_connection_cache(&keypair);
let mut run = StandardBroadcastRun::new(0, quic_connection_cache);
// Set up the slot to be interrupted
let next_shred_index = 10;
@ -609,6 +632,7 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
// Insert 1 less than the number of ticks needed to finish the slot
let ticks0 = create_ticks(genesis_config.ticks_per_slot - 1, 0, genesis_config.hash());
@ -621,7 +645,7 @@ mod test {
};
// Step 1: Make an incomplete transmission for slot 0
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
standard_broadcast_run
.test_process_receive_results(
&leader_keypair,
@ -739,10 +763,11 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
let (bsend, brecv) = unbounded();
let (ssend, _srecv) = unbounded();
let mut last_tick_height = 0;
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
let mut process_ticks = |num_ticks| {
let ticks = create_ticks(num_ticks, 0, genesis_config.hash());
last_tick_height += (ticks.len() - 1) as u64;
@ -787,6 +812,7 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
// Insert complete slot of ticks needed to finish the slot
let ticks = create_ticks(genesis_config.ticks_per_slot, 0, genesis_config.hash());
@ -798,7 +824,7 @@ mod test {
last_tick_height: ticks.len() as u64,
};
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
standard_broadcast_run
.test_process_receive_results(
&leader_keypair,
@ -815,9 +841,10 @@ mod test {
#[test]
fn entries_to_shreds_max() {
solana_logger::setup();
let mut bs = StandardBroadcastRun::new(0);
bs.current_slot_and_parent = Some((1, 0));
let keypair = Keypair::new();
let quic_connection_cache = new_quic_connection_cache(&keypair);
let mut bs = StandardBroadcastRun::new(0, quic_connection_cache);
bs.current_slot_and_parent = Some((1, 0));
let entries = create_ticks(10_000, 1, solana_sdk::hash::Hash::default());
let ledger_path = get_tmp_ledger_path!();


@ -176,10 +176,11 @@ impl ClusterNodes<RetransmitStage> {
addrs,
frwds,
} = self.get_retransmit_peers(slot_leader, shred, root_bank, fanout)?;
let protocol = get_broadcast_protocol(shred);
if neighbors.is_empty() {
let peers = children.into_iter().filter_map(|node| {
node.contact_info()?
.tvu(Protocol::UDP)
.tvu(protocol)
.ok()
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
});
@ -209,7 +210,7 @@ impl ClusterNodes<RetransmitStage> {
})
.chain(children.into_iter().filter_map(|node| {
node.contact_info()?
.tvu(Protocol::UDP)
.tvu(protocol)
.ok()
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
}));
@ -239,12 +240,13 @@ impl ClusterNodes<RetransmitStage> {
let mut frwds = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
let drop_redundant_turbine_path = drop_redundant_turbine_path(shred.slot(), root_bank);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.inspect(|node| {
if let Some(node) = node.contact_info() {
if let Ok(addr) = node.tvu(Protocol::UDP) {
if let Ok(addr) = node.tvu(protocol) {
addrs.entry(addr).or_insert(*node.pubkey());
}
if !drop_redundant_turbine_path {
@ -469,6 +471,11 @@ impl From<Pubkey> for NodeId {
}
}
#[inline]
pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol {
Protocol::UDP
}
pub fn make_test_cluster<R: Rng>(
rng: &mut R,
num_nodes: usize,

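get_broadcast_protocol above is the single switch for the new code path: it is inlined and currently returns Protocol::UDP for every shred, so both the broadcast and retransmit QUIC branches stay dormant. A purely hypothetical follow-up (not in this commit) could key the decision off the ShredId to stage a gradual rollout, for example:

    // Hypothetical only -- this commit always returns Protocol::UDP.
    // Assumes ShredId::slot(); the parity rule is an arbitrary example policy.
    use {
        solana_gossip::contact_info::Protocol,
        solana_ledger::shred::ShredId,
    };

    pub(crate) fn get_broadcast_protocol(shred: &ShredId) -> Protocol {
        if shred.slot() % 2 == 0 {
            Protocol::QUIC
        } else {
            Protocol::UDP
        }
    }
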

@ -27,6 +27,8 @@ pub enum Error {
RecvTimeout(#[from] crossbeam_channel::RecvTimeoutError),
#[error("Send")]
Send,
#[error(transparent)]
TransportError(#[from] solana_sdk::transport::TransportError),
#[error("TrySend")]
TrySend,
#[error(transparent)]

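The new Error::TransportError variant is what lets a failed QUIC send be recorded with Error::from(err) in broadcast_shreds: #[error(transparent)] forwards Display to the inner error, and #[from] derives the conversion. Roughly, thiserror expands the new variant to something like:

    // Approximate expansion of the #[from] attribute generated by thiserror.
    impl From<solana_sdk::transport::TransportError> for Error {
        fn from(err: solana_sdk::transport::TransportError) -> Self {
            Error::TransportError(err)
        }
    }
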

@ -8,15 +8,15 @@ use {
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::{
cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo,
},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ShredId},
},
solana_measure::measure::Measure,
solana_perf::deduper::Deduper,
solana_quic_client::QuicConnectionCache,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_rpc_client_api::response::SlotUpdate,
@ -173,6 +173,7 @@ fn retransmit(
cluster_info: &ClusterInfo,
shreds_receiver: &Receiver<Vec</*shred:*/ Vec<u8>>>,
sockets: &[UdpSocket],
quic_connection_cache: &QuicConnectionCache,
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
shred_deduper: &mut ShredDeduper<2>,
@ -259,6 +260,7 @@ fn retransmit(
&cluster_nodes,
socket_addr_space,
&sockets[index % sockets.len()],
quic_connection_cache,
stats,
)
.map_err(|err| {
@ -283,6 +285,7 @@ fn retransmit(
&cluster_nodes,
socket_addr_space,
&sockets[index % sockets.len()],
quic_connection_cache,
stats,
)
.map_err(|err| {
@ -311,6 +314,7 @@ fn retransmit_shred(
cluster_nodes: &ClusterNodes<RetransmitStage>,
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
quic_connection_cache: &QuicConnectionCache,
stats: &RetransmitStats,
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
let mut compute_turbine_peers = Measure::start("turbine_start");
@ -319,7 +323,7 @@ fn retransmit_shred(
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout)?;
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.filter(|addr| socket_addr_space.check(addr))
.collect();
compute_turbine_peers.stop();
stats
@ -327,22 +331,33 @@ fn retransmit_shred(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, shred, &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
.num_addrs_failed
.fetch_add(num_failed, Ordering::Relaxed);
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
addrs.len(),
);
addrs.len() - num_failed
}
let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
Protocol::QUIC => addrs
.iter()
.filter_map(|addr| {
quic_connection_cache
.get_connection(addr)
.send_data(shred)
.ok()
})
.count(),
Protocol::UDP => match multi_target_send(socket, shred, &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
addrs.len(),
);
addrs.len() - num_failed
}
},
};
retransmit_time.stop();
stats
.num_addrs_failed
.fetch_add(addrs.len() - num_nodes, Ordering::Relaxed);
stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
stats
.retransmit_total
@ -360,6 +375,7 @@ fn retransmit_shred(
/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
quic_connection_cache: Arc<QuicConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
@ -391,6 +407,7 @@ pub fn retransmitter(
&cluster_info,
&shreds_receiver,
&sockets,
&quic_connection_cache,
&mut stats,
&cluster_nodes_cache,
&mut shred_deduper,
@ -415,12 +432,14 @@ impl RetransmitStage {
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
quic_connection_cache: Arc<QuicConnectionCache>,
retransmit_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Self {
let retransmit_thread_handle = retransmitter(
retransmit_sockets,
quic_connection_cache,
bank_forks,
leader_schedule_cache,
cluster_info,

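In the retransmit path above, each shred now fans out over exactly one protocol: QUIC sends go through the shared connection cache one peer at a time, while UDP keeps the single multi_target_send (sendmmsg) call. In both arms the match evaluates to the number of peers actually reached, so num_addrs_failed is derived uniformly as addrs.len() - num_nodes afterwards. A condensed sketch of that dispatch, assuming the same imports the diff uses:

    // Condensed restatement of the dispatch in retransmit_shred; both arms return
    // the count of successful sends, so failure accounting is protocol-agnostic.
    use {
        solana_client::tpu_connection::TpuConnection,
        solana_gossip::contact_info::Protocol,
        solana_quic_client::QuicConnectionCache,
        solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
        std::net::{SocketAddr, UdpSocket},
    };

    fn send_shred(
        protocol: Protocol,
        shred: &[u8],
        addrs: &[SocketAddr],
        socket: &UdpSocket,
        quic_connection_cache: &QuicConnectionCache,
    ) -> usize /* num_nodes reached */ {
        match protocol {
            // One cached connection per address; count only the sends that succeed.
            Protocol::QUIC => addrs
                .iter()
                .filter_map(|addr| {
                    quic_connection_cache
                        .get_connection(addr)
                        .send_data(shred)
                        .ok()
                })
                .count(),
            // One sendmmsg covering all addresses; subtract the reported failures.
            Protocol::UDP => match multi_target_send(socket, shred, addrs) {
                Ok(()) => addrs.len(),
                Err(SendPktsError::IoError(_, num_failed)) => addrs.len() - num_failed,
            },
        }
        // Caller then does: stats.num_addrs_failed += addrs.len() - num_nodes.
    }
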

@ -24,8 +24,8 @@ use {
};
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
const MAX_STAKED_QUIC_CONNECTIONS: usize = 2000;
const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 1000;
const MAX_STAKED_QUIC_CONNECTIONS: usize = 4000;
const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 2000;
const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5);
const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10);


@ -26,6 +26,7 @@ use {
entry_notifier_service::EntryNotifierSender,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_quic_client::QuicConnectionCache,
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotificationSender,
rpc_subscriptions::RpcSubscriptions,
@ -101,6 +102,7 @@ impl Tpu {
tpu_coalesce: Duration,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
connection_cache: &Arc<ConnectionCache>,
turbine_quic_connection_cache: Arc<QuicConnectionCache>,
keypair: &Keypair,
log_messages_bytes_limit: Option<usize>,
staked_nodes: &Arc<RwLock<StakedNodes>>,
@ -254,6 +256,7 @@ impl Tpu {
blockstore.clone(),
bank_forks,
shred_version,
turbine_quic_connection_cache,
);
Self {


@ -40,6 +40,7 @@ use {
entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::PohRecorder,
solana_quic_client::QuicConnectionCache,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
@ -141,6 +142,7 @@ impl Tvu {
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
banking_tracer: Arc<BankingTracer>,
staked_nodes: Arc<RwLock<StakedNodes>>,
quic_connection_cache: Arc<QuicConnectionCache>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -188,6 +190,7 @@ impl Tvu {
leader_schedule_cache.clone(),
cluster_info.clone(),
Arc::new(retransmit_sockets),
quic_connection_cache,
retransmit_receiver,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
@ -374,6 +377,7 @@ impl Tvu {
pub mod tests {
use {
super::*,
crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
serial_test::serial,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
@ -387,7 +391,10 @@ pub mod tests {
solana_runtime::bank::Bank,
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
std::sync::atomic::{AtomicU64, Ordering},
std::{
net::{IpAddr, Ipv4Addr},
sync::atomic::{AtomicU64, Ordering},
},
};
#[ignore]
@ -404,12 +411,20 @@ pub mod tests {
let bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
//start cluster_info1
let cluster_info1 = ClusterInfo::new(
target1.info.clone(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
let keypair = Arc::new(Keypair::new());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
//start cluster_info1
let cluster_info1 =
ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
cluster_info1.insert_info(leader.info);
let cref1 = Arc::new(cluster_info1);
@ -491,6 +506,7 @@ pub mod tests {
&ignored_prioritization_fee_cache,
BankingTracer::new_disabled(),
Arc::<RwLock<StakedNodes>>::default(),
quic_connection_cache,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);


@ -129,6 +129,8 @@ use {
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
pub const TURBINE_QUIC_CONNECTION_POOL_SIZE: usize = 4;
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
@ -1107,6 +1109,19 @@ impl Validator {
let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender_cloned());
let turbine_quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_tvu_quic",
&identity_keypair,
node.info
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid TVU address")
.ip(),
&staked_nodes,
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
@ -1160,6 +1175,7 @@ impl Validator {
&prioritization_fee_cache,
banking_tracer.clone(),
staked_nodes.clone(),
turbine_quic_connection_cache.clone(),
)?;
let tpu = Tpu::new(
@ -1192,6 +1208,7 @@ impl Validator {
config.tpu_coalesce,
cluster_confirmed_slot_sender,
&connection_cache,
turbine_quic_connection_cache,
&identity_keypair,
config.runtime_config.log_messages_bytes_limit,
&staked_nodes,


@ -4744,6 +4744,7 @@ dependencies = [
"solana-perf",
"solana-poh",
"solana-program-runtime",
"solana-quic-client",
"solana-rayon-threadlimit",
"solana-rpc",
"solana-rpc-client-api",


@ -18,12 +18,15 @@ use {
rcgen::RcgenError,
solana_connection_cache::{
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
ConnectionPoolError, Protocol,
},
connection_cache_stats::ConnectionCacheStats,
},
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::{
nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
streamer::StakedNodes,
@ -214,6 +217,23 @@ impl QuicConnectionManager {
Self { connection_config }
}
}
pub type QuicConnectionCache = ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>;
pub fn new_quic_connection_cache(
name: &'static str,
keypair: &Keypair,
ipaddr: IpAddr,
staked_nodes: &Arc<RwLock<StakedNodes>>,
connection_pool_size: usize,
) -> Result<QuicConnectionCache, ClientError> {
let mut config = QuicConfig::new()?;
config.update_client_certificate(keypair, ipaddr)?;
config.set_staked_nodes(staked_nodes, &keypair.pubkey());
let connection_manager = QuicConnectionManager::new_with_connection_config(config);
ConnectionCache::new(name, connection_manager, connection_pool_size)
}
#[cfg(test)]
mod tests {
use {