separates out turbine QUIC from TPU implementation (#32368)

* separates out turbine QUIC from TPU implementation

Keeping turbine tied to the TPU's QUIC implementation hinders development
and makes it hard to optimize QUIC specifically for turbine.
This commit separates the turbine QUIC code from the TPU implementation.

* Update core/src/validator.rs

Co-authored-by: Jon Cinque <me@jonc.dev>

* Update turbine/src/retransmit_stage.rs

Co-authored-by: Jon Cinque <me@jonc.dev>

---------

Co-authored-by: Jon Cinque <me@jonc.dev>
behzad nouri authored on 2023-07-12 14:15:28 +00:00 (committed by GitHub)
parent 724e0fe4ac
commit a3ada9c5ea
17 changed files with 279 additions and 223 deletions
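For orientation, the sketch below shows how the new turbine-specific QUIC endpoint is wired up after this change, based on the signatures visible in the diffs that follow (new_quic_endpoint, close_quic_endpoint, and the (SocketAddr, Bytes) sender threaded through broadcast and retransmit). It is a minimal sketch, not code from this commit: the variable names, channel usage, runtime setup, and localhost address are illustrative assumptions, and the final argument of new_quic_endpoint (the sender for received datagrams) is inferred from the validator call site.

// Sketch only: wiring a turbine QUIC endpoint per the API added in this commit.
// solana_turbine::quic_endpoint items come from this diff; everything else
// (names, addresses, runtime choice) is illustrative.
use {
    solana_sdk::signature::Keypair,
    std::net::{IpAddr, Ipv4Addr, UdpSocket},
};

fn wire_turbine_quic() {
    let keypair = Keypair::new();
    let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
    // The endpoint needs a tokio runtime; the validator reuses an ambient
    // runtime when one exists (see the validator diff below).
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    // Datagrams received from peers arrive as (Pubkey, SocketAddr, Bytes) on an
    // unbounded crossbeam channel, which ShredFetchStage drains into packets.
    let (datagram_sender, _datagram_receiver) = crossbeam_channel::unbounded();
    // The returned async sender replaces the old QuicConnectionCache: broadcast
    // and retransmit push (SocketAddr, Bytes) shred payloads into it.
    let (endpoint, shred_sender, join_handle) =
        solana_turbine::quic_endpoint::new_quic_endpoint(
            runtime.handle(),
            &keypair,
            socket,
            IpAddr::V4(Ipv4Addr::LOCALHOST),
            datagram_sender,
        )
        .unwrap();
    // e.g. from a broadcast thread (blocking_send) or retransmit thread (try_send):
    // shred_sender.blocking_send((peer_addr, bytes::Bytes::from(shred_payload)));
    let _ = shred_sender;
    // Shutdown mirrors Validator::join in this commit.
    solana_turbine::quic_endpoint::close_quic_endpoint(&endpoint);
    runtime.block_on(join_handle).unwrap();
}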

Cargo.lock (generated)

@ -5620,6 +5620,7 @@ dependencies = [
"base64 0.21.2",
"bincode",
"bs58",
"bytes",
"chrono",
"crossbeam-channel",
"dashmap 4.0.2",
@ -5634,6 +5635,7 @@ dependencies = [
"matches",
"min-max-heap",
"num_enum 0.6.1",
"quinn",
"rand 0.7.3",
"rand_chacha 0.2.2",
"raptorq",
@ -7169,7 +7171,6 @@ dependencies = [
"rayon",
"rcgen",
"rustls 0.20.8",
"solana-client",
"solana-entry",
"solana-gossip",
"solana-ledger",


@ -17,6 +17,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git
base64 = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true, features = ["default", "serde"] }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true, features = ["rayon", "raw-api"] }
@ -29,6 +30,7 @@ log = { workspace = true }
lru = { workspace = true }
min-max-heap = { workspace = true }
num_enum = { workspace = true }
quinn = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
rayon = { workspace = true }


@ -2,19 +2,23 @@
use {
crate::repair::serve_repair::ServeRepair,
crossbeam_channel::{unbounded, Sender},
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
itertools::Itertools,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags, PACKETS_PER_BATCH},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
},
solana_streamer::streamer::{self, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
solana_turbine::cluster_nodes::check_feature_activation,
std::{
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
@ -24,11 +28,7 @@ use {
},
};
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;
const MAX_STAKED_QUIC_CONNECTIONS: usize = 4000;
const MAX_UNSTAKED_QUIC_CONNECTIONS: usize = 2000;
const QUIC_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(5);
const QUIC_COALESCE_WAIT: Duration = Duration::from_millis(10);
const PACKET_COALESCE_DURATION: Duration = Duration::from_millis(1);
pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
@ -141,9 +141,9 @@ impl ShredFetchStage {
packet_sender.clone(),
recycler.clone(),
Arc::new(StreamerReceiveStats::new("packet_modifier")),
Duration::from_millis(1), // coalesce
true,
None,
PACKET_COALESCE_DURATION,
true, // use_pinned_memory
None, // in_vote_only_mode
)
})
.collect();
@ -171,13 +171,12 @@ impl ShredFetchStage {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
sockets: Vec<Arc<UdpSocket>>,
quic_socket: UdpSocket,
quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_socket: Arc<UdpSocket>,
sender: Sender<PacketBatch>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
staked_nodes: Arc<RwLock<StakedNodes>>,
turbine_disabled: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> Self {
@ -200,12 +199,12 @@ impl ShredFetchStage {
vec![repair_socket.clone()],
exit.clone(),
sender.clone(),
recycler,
recycler.clone(),
bank_forks.clone(),
shred_version,
"shred_fetch_repair",
PacketFlags::REPAIR,
Some((repair_socket, cluster_info.clone())),
Some((repair_socket, cluster_info)),
turbine_disabled.clone(),
);
@ -213,33 +212,16 @@ impl ShredFetchStage {
tvu_threads.push(tvu_filter);
tvu_threads.push(repair_handler);
let keypair = cluster_info.keypair().clone();
let ip_addr = cluster_info
.my_contact_info()
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid (QUIC) TVU address")
.ip();
let (packet_sender, packet_receiver) = unbounded();
let (_endpoint, join_handle) = solana_streamer::quic::spawn_server(
"quic_streamer_tvu",
quic_socket,
&keypair,
ip_addr,
packet_sender,
exit,
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes,
MAX_STAKED_QUIC_CONNECTIONS,
MAX_UNSTAKED_QUIC_CONNECTIONS,
QUIC_WAIT_FOR_CHUNK_TIMEOUT,
QUIC_COALESCE_WAIT,
)
.unwrap();
tvu_threads.push(join_handle);
tvu_threads.push(
tvu_threads.extend([
Builder::new()
.name("solTvuFetchPMod".to_string())
.name("solTvuRecvQuic".to_string())
.spawn(|| {
receive_quic_datagrams(quic_endpoint_receiver, packet_sender, recycler, exit)
})
.unwrap(),
Builder::new()
.name("solTvuFetchQuic".to_string())
.spawn(move || {
Self::modify_packets(
packet_receiver,
@ -253,8 +235,7 @@ impl ShredFetchStage {
)
})
.unwrap(),
);
]);
Self {
thread_hdls: tvu_threads,
}
@ -268,6 +249,48 @@ impl ShredFetchStage {
}
}
fn receive_quic_datagrams(
quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
sender: Sender<PacketBatch>,
recycler: PacketBatchRecycler,
exit: Arc<AtomicBool>,
) {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
while !exit.load(Ordering::Relaxed) {
let entry = match quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) {
Ok(entry) => entry,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
};
let mut packet_batch =
PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams");
unsafe {
packet_batch.set_len(PACKETS_PER_BATCH);
};
let deadline = Instant::now() + PACKET_COALESCE_DURATION;
let entries = std::iter::once(entry).chain(
std::iter::repeat_with(|| quic_endpoint_receiver.recv_deadline(deadline).ok())
.while_some(),
);
let size = entries
.filter(|(_, _, bytes)| bytes.len() <= PACKET_DATA_SIZE)
.zip(packet_batch.iter_mut())
.map(|((_pubkey, addr, bytes), packet)| {
packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes);
let meta = packet.meta_mut();
meta.size = bytes.len();
meta.set_socket_addr(&addr);
})
.count();
if size > 0 {
packet_batch.truncate(size);
if sender.send(packet_batch).is_err() {
return;
}
}
}
}
#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(


@ -17,6 +17,7 @@ use {
tpu_entry_notifier::TpuEntryNotifier,
validator::GeneratorConfig,
},
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_gossip::cluster_info::ClusterInfo,
@ -25,7 +26,6 @@ use {
entry_notifier_service::EntryNotifierSender,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_quic_client::QuicConnectionCache,
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotificationSender,
rpc_subscriptions::RpcSubscriptions,
@ -44,11 +44,12 @@ use {
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
std::{
collections::HashMap,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
time::Duration,
},
tokio::sync::mpsc::Sender as AsyncSender,
};
// allow multiple connections for NAT and any open/close overlap
@ -102,7 +103,7 @@ impl Tpu {
tpu_coalesce: Duration,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
connection_cache: &Arc<ConnectionCache>,
turbine_quic_connection_cache: Arc<QuicConnectionCache>,
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
keypair: &Keypair,
log_messages_bytes_limit: Option<usize>,
staked_nodes: &Arc<RwLock<StakedNodes>>,
@ -256,7 +257,7 @@ impl Tpu {
blockstore.clone(),
bank_forks,
shred_version,
turbine_quic_connection_cache,
turbine_quic_endpoint_sender,
);
Self {


@ -24,6 +24,7 @@ use {
warm_quic_cache_service::WarmQuicCacheService,
window_service::WindowService,
},
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_client::connection_cache::ConnectionCache,
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
@ -36,7 +37,6 @@ use {
entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::PohRecorder,
solana_quic_client::QuicConnectionCache,
solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig,
rpc_subscriptions::RpcSubscriptions,
@ -47,14 +47,14 @@ use {
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_streamer::streamer::StakedNodes,
solana_turbine::retransmit_stage::RetransmitStage,
std::{
collections::HashSet,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, JoinHandle},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
pub struct Tvu {
@ -74,7 +74,6 @@ pub struct Tvu {
pub struct TvuSockets {
pub fetch: Vec<UdpSocket>,
pub(crate) fetch_quic: UdpSocket,
pub repair: UdpSocket,
pub retransmit: Vec<UdpSocket>,
pub ancestor_hashes_requests: UdpSocket,
@ -137,13 +136,12 @@ impl Tvu {
connection_cache: &Arc<ConnectionCache>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
banking_tracer: Arc<BankingTracer>,
staked_nodes: Arc<RwLock<StakedNodes>>,
quic_connection_cache: Arc<QuicConnectionCache>,
turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
fetch: fetch_sockets,
fetch_quic: fetch_quic_socket,
retransmit: retransmit_sockets,
ancestor_hashes_requests: ancestor_hashes_socket,
} = sockets;
@ -155,13 +153,12 @@ impl Tvu {
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = ShredFetchStage::new(
fetch_sockets,
fetch_quic_socket,
turbine_quic_endpoint_receiver,
repair_socket.clone(),
fetch_sender,
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
staked_nodes,
turbine_disabled,
exit.clone(),
);
@ -182,7 +179,7 @@ impl Tvu {
leader_schedule_cache.clone(),
cluster_info.clone(),
Arc::new(retransmit_sockets),
quic_connection_cache,
turbine_quic_endpoint_sender,
retransmit_receiver,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
@ -369,10 +366,7 @@ impl Tvu {
pub mod tests {
use {
super::*,
crate::{
consensus::tower_storage::FileTowerStorage,
validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
},
crate::consensus::tower_storage::FileTowerStorage,
serial_test::serial,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
@ -386,10 +380,7 @@ pub mod tests {
solana_runtime::bank::Bank,
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
std::{
net::{IpAddr, Ipv4Addr},
sync::atomic::{AtomicU64, Ordering},
},
std::sync::atomic::{AtomicU64, Ordering},
};
#[ignore]
@ -407,16 +398,9 @@ pub mod tests {
let bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
let keypair = Arc::new(Keypair::new());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
let (turbine_quic_endpoint_sender, _turbine_quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
let (_turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
//start cluster_info1
let cluster_info1 =
ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
@ -457,7 +441,6 @@ pub mod tests {
repair: target1.sockets.repair,
retransmit: target1.sockets.retransmit_sockets,
fetch: target1.sockets.tvu,
fetch_quic: target1.sockets.tvu_quic,
ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
}
},
@ -499,8 +482,8 @@ pub mod tests {
&Arc::new(ConnectionCache::new("connection_cache_test")),
&ignored_prioritization_fee_cache,
BankingTracer::new_disabled(),
Arc::<RwLock<StakedNodes>>::default(),
quic_connection_cache,
turbine_quic_endpoint_sender,
turbine_quic_endpoint_receiver,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);


@ -30,6 +30,7 @@ use {
},
crossbeam_channel::{bounded, unbounded, Receiver},
lazy_static::lazy_static,
quinn::Endpoint,
rand::{thread_rng, Rng},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_entry::poh::compute_hash_time_ns,
@ -113,7 +114,7 @@ use {
},
solana_send_transaction_service::send_transaction_service,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_turbine::broadcast_stage::BroadcastStageType,
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_vote_program::vote_state,
std::{
collections::{HashMap, HashSet},
@ -133,8 +134,6 @@ use {
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
pub const TURBINE_QUIC_CONNECTION_POOL_SIZE: usize = 4;
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
pub enum BlockVerificationMethod {
@ -461,6 +460,9 @@ pub struct Validator {
ledger_metric_report_service: LedgerMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
turbine_quic_endpoint: Endpoint,
turbine_quic_endpoint_runtime: Option<tokio::runtime::Runtime>,
turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle,
}
impl Validator {
@ -1111,19 +1113,40 @@ impl Validator {
let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender_cloned());
let turbine_quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_tvu_quic",
&identity_keypair,
node.info
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid TVU address")
.ip(),
&staked_nodes,
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
// test-validator crate may start the validator in a tokio runtime
// context which forces us to use the same runtime because a nested
// runtime will cause panic at drop.
// Outside test-validator crate, we always need a tokio runtime (and
// the respective handle) to initialize the turbine QUIC endpoint.
let current_runtime_handle = tokio::runtime::Handle::try_current();
let turbine_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solTurbineQuic")
.build()
.unwrap()
});
let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
let (
turbine_quic_endpoint,
turbine_quic_endpoint_sender,
turbine_quic_endpoint_join_handle,
) = solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(tokio::runtime::Runtime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.tvu_quic,
node.info
.tvu(Protocol::QUIC)
.expect("Operator must spin up node with valid QUIC TVU address")
.ip(),
turbine_quic_endpoint_sender,
)
.unwrap();
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
@ -1134,7 +1157,6 @@ impl Validator {
repair: node.sockets.repair,
retransmit: node.sockets.retransmit_sockets,
fetch: node.sockets.tvu,
fetch_quic: node.sockets.tvu_quic,
ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
},
blockstore.clone(),
@ -1175,8 +1197,8 @@ impl Validator {
&connection_cache,
&prioritization_fee_cache,
banking_tracer.clone(),
staked_nodes.clone(),
turbine_quic_connection_cache.clone(),
turbine_quic_endpoint_sender.clone(),
turbine_quic_endpoint_receiver,
)?;
let tpu = Tpu::new(
@ -1209,7 +1231,7 @@ impl Validator {
config.tpu_coalesce,
cluster_confirmed_slot_sender,
&connection_cache,
turbine_quic_connection_cache,
turbine_quic_endpoint_sender,
&identity_keypair,
config.runtime_config.log_messages_bytes_limit,
&staked_nodes,
@ -1258,6 +1280,9 @@ impl Validator {
ledger_metric_report_service,
accounts_background_service,
accounts_hash_verifier,
turbine_quic_endpoint,
turbine_quic_endpoint_runtime,
turbine_quic_endpoint_join_handle,
})
}
@ -1302,6 +1327,7 @@ impl Validator {
pub fn join(self) {
drop(self.bank_forks);
drop(self.cluster_info);
solana_turbine::quic_endpoint::close_quic_endpoint(&self.turbine_quic_endpoint);
self.poh_service.join().expect("poh_service");
drop(self.poh_recorder);
@ -1384,6 +1410,10 @@ impl Validator {
.expect("accounts_hash_verifier");
self.tpu.join().expect("tpu");
self.tvu.join().expect("tvu");
self.turbine_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.turbine_quic_endpoint_join_handle))
.transpose()
.unwrap();
self.completed_data_sets_service
.join()
.expect("completed_data_sets_service");


@ -4765,6 +4765,7 @@ dependencies = [
"base64 0.21.2",
"bincode",
"bs58",
"bytes",
"chrono",
"crossbeam-channel",
"dashmap",
@ -4777,6 +4778,7 @@ dependencies = [
"lru",
"min-max-heap",
"num_enum 0.6.1",
"quinn",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
@ -6239,7 +6241,6 @@ dependencies = [
"rayon",
"rcgen",
"rustls 0.20.8",
"solana-client",
"solana-entry",
"solana-gossip",
"solana-ledger",


@ -23,7 +23,6 @@ rand_chacha = { workspace = true }
rayon = { workspace = true }
rcgen = { workspace = true }
rustls = { workspace = true }
solana-client = { workspace = true }
solana-entry = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }


@ -12,14 +12,13 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::{Shred, ShredFlags},
},
solana_quic_client::new_quic_connection_cache,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
pubkey,
signature::{Keypair, Signer},
timing::{timestamp, AtomicInterval},
},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::{
broadcast_stage::{
broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
@ -28,7 +27,7 @@ use {
},
std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, UdpSocket},
net::UdpSocket,
sync::{Arc, RwLock},
time::Duration,
},
@ -39,14 +38,8 @@ use {
fn broadcast_shreds_bench(bencher: &mut Bencher) {
solana_logger::setup();
let leader_keypair = Arc::new(Keypair::new());
let quic_connection_cache = new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap();
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let cluster_info = ClusterInfo::new(
leader_info.info,
@ -82,12 +75,12 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&socket,
&shreds,
&cluster_nodes_cache,
&quic_connection_cache,
&last_datapoint,
&mut TransmitShredsStats::default(),
&cluster_info,
&bank_forks,
&SocketAddrSpace::Unspecified,
&quic_endpoint_sender,
)
.unwrap();
});


@ -25,11 +25,11 @@ use {
system_transaction,
timing::timestamp,
},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::retransmit_stage::retransmitter,
std::{
iter::repeat_with,
net::{IpAddr, Ipv4Addr, UdpSocket},
net::{Ipv4Addr, UdpSocket},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
@ -97,16 +97,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
.collect();
let keypair = Keypair::new();
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap(),
);
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
let slot = 0;
let parent = 0;
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
@ -125,7 +117,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let retransmitter_handles = retransmitter(
Arc::new(sockets),
quic_connection_cache,
quic_endpoint_sender,
bank_forks,
leader_schedule_cache,
cluster_info,


@ -9,9 +9,9 @@ use {
standard_broadcast_run::StandardBroadcastRun,
},
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::{Either, Itertools},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::Protocol,
@ -20,7 +20,6 @@ use {
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
solana_poh::poh_recorder::WorkingBankEntry,
solana_quic_client::QuicConnectionCache,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
@ -35,7 +34,7 @@ use {
std::{
collections::{HashMap, HashSet},
iter::repeat_with,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
@ -44,6 +43,7 @@ use {
time::{Duration, Instant},
},
thiserror::Error,
tokio::sync::mpsc::Sender as AsyncSender,
};
pub mod broadcast_duplicates_run;
@ -107,7 +107,7 @@ impl BroadcastStageType {
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
quic_connection_cache: Arc<QuicConnectionCache>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
@ -118,7 +118,8 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
StandardBroadcastRun::new(shred_version, quic_connection_cache),
quic_endpoint_sender,
StandardBroadcastRun::new(shred_version),
),
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
@ -129,7 +130,8 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
FailEntryVerificationBroadcastRun::new(shred_version, quic_connection_cache),
quic_endpoint_sender,
FailEntryVerificationBroadcastRun::new(shred_version),
),
BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
@ -140,6 +142,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
quic_endpoint_sender,
BroadcastFakeShredsRun::new(0, shred_version),
),
@ -151,6 +154,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
quic_endpoint_sender,
BroadcastDuplicatesRun::new(shred_version, config.clone()),
),
}
@ -172,6 +176,7 @@ trait BroadcastRun {
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()>;
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()>;
}
@ -265,6 +270,7 @@ impl BroadcastStage {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
) -> Self {
let (socket_sender, socket_receiver) = unbounded();
@ -296,8 +302,15 @@ impl BroadcastStage {
let mut bs_transmit = broadcast_stage_run.clone();
let cluster_info = cluster_info.clone();
let bank_forks = bank_forks.clone();
let quic_endpoint_sender = quic_endpoint_sender.clone();
let run_transmit = move || loop {
let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock, &bank_forks);
let res = bs_transmit.transmit(
&socket_receiver,
&cluster_info,
&sock,
&bank_forks,
&quic_endpoint_sender,
);
let res = Self::handle_error(res, "solana-broadcaster-transmit");
if let Some(res) = res {
return res;
@ -411,12 +424,12 @@ pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
quic_connection_cache: &QuicConnectionCache,
last_datapoint_submit: &AtomicInterval,
transmit_stats: &mut TransmitShredsStats,
cluster_info: &ClusterInfo,
bank_forks: &RwLock<BankForks>,
socket_addr_space: &SocketAddrSpace,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
let mut result = Ok(());
let mut shred_select = Measure::start("shred_select");
@ -459,14 +472,14 @@ pub fn broadcast_shreds(
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
for (shred, addr) in &quic_packets {
let conn = quic_connection_cache.get_connection(addr);
if let Err(err) = conn.send_data(shred) {
transmit_stats.total_packets += packets.len() + quic_packets.len();
for (shred, addr) in quic_packets {
let shred = Bytes::from(shred.clone());
if let Err(err) = quic_endpoint_sender.blocking_send((addr, shred)) {
transmit_stats.dropped_packets_quic += 1;
result = Err(Error::from(err));
}
}
transmit_stats.total_packets += packets.len() + quic_packets.len();
result
}
@ -476,6 +489,12 @@ impl<T> From<crossbeam_channel::SendError<T>> for Error {
}
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Error {
Error::Send
}
}
#[cfg(test)]
pub mod test {
use {
@ -494,9 +513,7 @@ pub mod test {
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::streamer::StakedNodes,
std::{
net::{IpAddr, Ipv4Addr},
path::Path,
sync::{atomic::AtomicBool, Arc},
thread::sleep,
@ -628,16 +645,8 @@ pub mod test {
) -> MockBroadcastStage {
// Make the database ledger
let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap(),
);
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
// Make the leader node and scheduler
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
@ -671,7 +680,8 @@ pub mod test {
exit_sender,
blockstore.clone(),
bank_forks,
StandardBroadcastRun::new(0, quic_connection_cache),
quic_endpoint_sender,
StandardBroadcastRun::new(0),
);
MockBroadcastStage {


@ -265,6 +265,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
_quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
let (shreds, _) = receiver.recv()?;
if shreds.is_empty() {


@ -132,6 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &RwLock<BankForks>,
_quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
for (data_shreds, batch_info) in receiver {
let fake = batch_info.is_some();


@ -4,6 +4,7 @@ use {
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
tokio::sync::mpsc::Sender as AsyncSender,
};
pub const NUM_BAD_SLOTS: u64 = 10;
@ -17,12 +18,11 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
quic_connection_cache: Arc<QuicConnectionCache>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
pub(super) fn new(shred_version: u16) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
@ -34,7 +34,6 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
quic_connection_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
@ -164,18 +163,19 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
let (shreds, _) = receiver.recv()?;
broadcast_shreds(
sock,
&shreds,
&self.cluster_nodes_cache,
&self.quic_connection_cache,
&AtomicInterval::default(),
&mut TransmitShredsStats::default(),
cluster_info,
bank_forks,
cluster_info.socket_addr_space(),
quic_endpoint_sender,
)
}
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {


@ -19,6 +19,7 @@ use {
timing::{duration_as_us, AtomicInterval},
},
std::{sync::RwLock, time::Duration},
tokio::sync::mpsc::Sender as AsyncSender,
};
#[derive(Clone)]
@ -33,7 +34,6 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
quic_connection_cache: Arc<QuicConnectionCache>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
@ -43,7 +43,7 @@ enum BroadcastError {
}
impl StandardBroadcastRun {
pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
pub(super) fn new(shred_version: u16) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
@ -59,7 +59,6 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
quic_connection_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
@ -195,15 +194,16 @@ impl StandardBroadcastRun {
blockstore: &Blockstore,
receive_results: ReceiveResults,
bank_forks: &RwLock<BankForks>,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
let (bsend, brecv) = unbounded();
let (ssend, srecv) = unbounded();
self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?;
//data
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks, quic_endpoint_sender);
let _ = self.record(&brecv, blockstore);
//coding
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks, quic_endpoint_sender);
let _ = self.record(&brecv, blockstore);
Ok(())
}
@ -402,6 +402,7 @@ impl StandardBroadcastRun {
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
bank_forks: &RwLock<BankForks>,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
trace!("Broadcasting {:?} shreds", shreds.len());
let mut transmit_stats = TransmitShredsStats::default();
@ -412,12 +413,12 @@ impl StandardBroadcastRun {
sock,
&shreds,
&self.cluster_nodes_cache,
&self.quic_connection_cache,
&self.last_datapoint_submit,
&mut transmit_stats,
cluster_info,
bank_forks,
cluster_info.socket_addr_space(),
quic_endpoint_sender,
)?;
transmit_time.stop();
@ -487,9 +488,17 @@ impl BroadcastRun for StandardBroadcastRun {
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> Result<()> {
let (shreds, batch_info) = receiver.recv()?;
self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks)
self.broadcast(
sock,
cluster_info,
shreds,
batch_info,
bank_forks,
quic_endpoint_sender,
)
}
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
let (shreds, slot_start_ts) = receiver.recv()?;
@ -520,13 +529,8 @@ mod test {
genesis_config::GenesisConfig,
signature::{Keypair, Signer},
},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
net::{IpAddr, Ipv4Addr},
ops::Deref,
sync::Arc,
time::Duration,
},
solana_streamer::socket::SocketAddrSpace,
std::{ops::Deref, sync::Arc, time::Duration},
};
#[allow(clippy::type_complexity)]
@ -572,24 +576,10 @@ mod test {
)
}
fn new_quic_connection_cache(keypair: &Keypair) -> Arc<QuicConnectionCache> {
Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap(),
)
}
#[test]
fn test_interrupted_slot_last_shred() {
let keypair = Arc::new(Keypair::new());
let quic_connection_cache = new_quic_connection_cache(&keypair);
let mut run = StandardBroadcastRun::new(0, quic_connection_cache);
let mut run = StandardBroadcastRun::new(0);
// Set up the slot to be interrupted
let next_shred_index = 10;
@ -631,7 +621,8 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
// Insert 1 less than the number of ticks needed to finish the slot
let ticks0 = create_ticks(genesis_config.ticks_per_slot - 1, 0, genesis_config.hash());
@ -644,7 +635,7 @@ mod test {
};
// Step 1: Make an incomplete transmission for slot 0
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
standard_broadcast_run
.test_process_receive_results(
&leader_keypair,
@ -653,6 +644,7 @@ mod test {
&blockstore,
receive_results,
&bank_forks,
&quic_endpoint_sender,
)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
@ -719,6 +711,7 @@ mod test {
&blockstore,
receive_results,
&bank_forks,
&quic_endpoint_sender,
)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
@ -762,11 +755,10 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
let (bsend, brecv) = unbounded();
let (ssend, _srecv) = unbounded();
let mut last_tick_height = 0;
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
let mut process_ticks = |num_ticks| {
let ticks = create_ticks(num_ticks, 0, genesis_config.hash());
last_tick_height += (ticks.len() - 1) as u64;
@ -811,7 +803,8 @@ mod test {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
let quic_connection_cache = new_quic_connection_cache(&leader_keypair);
let (quic_endpoint_sender, _quic_endpoint_receiver) =
tokio::sync::mpsc::channel(/*capacity:*/ 128);
// Insert complete slot of ticks needed to finish the slot
let ticks = create_ticks(genesis_config.ticks_per_slot, 0, genesis_config.hash());
@ -823,7 +816,7 @@ mod test {
last_tick_height: ticks.len() as u64,
};
let mut standard_broadcast_run = StandardBroadcastRun::new(0, quic_connection_cache);
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
standard_broadcast_run
.test_process_receive_results(
&leader_keypair,
@ -832,6 +825,7 @@ mod test {
&blockstore,
receive_results,
&bank_forks,
&quic_endpoint_sender,
)
.unwrap();
assert!(standard_broadcast_run.unfinished_slot.is_none())
@ -841,8 +835,7 @@ mod test {
fn entries_to_shreds_max() {
solana_logger::setup();
let keypair = Keypair::new();
let quic_connection_cache = new_quic_connection_cache(&keypair);
let mut bs = StandardBroadcastRun::new(0, quic_connection_cache);
let mut bs = StandardBroadcastRun::new(0);
bs.current_slot_and_parent = Some((1, 0));
let entries = create_ticks(10_000, 1, solana_sdk::hash::Hash::default());


@ -23,7 +23,6 @@ use {
},
thiserror::Error,
tokio::{
runtime::Runtime,
sync::{
mpsc::{Receiver as AsyncReceiver, Sender as AsyncSender},
RwLock,
@ -47,6 +46,7 @@ const CONNECTION_CLOSE_REASON_DROPPED: &[u8] = b"DROPPED";
const CONNECTION_CLOSE_REASON_INVALID_IDENTITY: &[u8] = b"INVALID_IDENTITY";
const CONNECTION_CLOSE_REASON_REPLACED: &[u8] = b"REPLACED";
pub type AsyncTryJoinHandle = TryJoin<JoinHandle<()>, JoinHandle<()>>;
type ConnectionCache = HashMap<(SocketAddr, Option<Pubkey>), Arc<RwLock<Option<Connection>>>>;
#[derive(Error, Debug)]
@ -71,7 +71,7 @@ pub enum Error {
#[allow(clippy::type_complexity)]
pub fn new_quic_endpoint(
runtime: &Runtime,
runtime: &tokio::runtime::Handle,
keypair: &Keypair,
socket: UdpSocket,
address: IpAddr,
@ -80,7 +80,7 @@ pub fn new_quic_endpoint(
(
Endpoint,
AsyncSender<(SocketAddr, Bytes)>,
TryJoin<JoinHandle<()>, JoinHandle<()>>,
AsyncTryJoinHandle,
),
Error,
> {
@ -156,6 +156,7 @@ async fn run_server(
) {
while let Some(connecting) = endpoint.accept().await {
tokio::task::spawn(handle_connecting_error(
endpoint.clone(),
connecting,
sender.clone(),
cache.clone(),
@ -182,16 +183,18 @@ async fn run_client(
}
async fn handle_connecting_error(
endpoint: Endpoint,
connecting: Connecting,
sender: Sender<(Pubkey, SocketAddr, Bytes)>,
cache: Arc<RwLock<ConnectionCache>>,
) {
if let Err(err) = handle_connecting(connecting, sender, cache).await {
if let Err(err) = handle_connecting(endpoint, connecting, sender, cache).await {
error!("handle_connecting: {err:?}");
}
}
async fn handle_connecting(
endpoint: Endpoint,
connecting: Connecting,
sender: Sender<(Pubkey, SocketAddr, Bytes)>,
cache: Arc<RwLock<ConnectionCache>>,
@ -199,11 +202,20 @@ async fn handle_connecting(
let connection = connecting.await?;
let remote_address = connection.remote_address();
let remote_pubkey = get_remote_pubkey(&connection)?;
handle_connection_error(remote_address, remote_pubkey, connection, sender, cache).await;
handle_connection_error(
endpoint,
remote_address,
remote_pubkey,
connection,
sender,
cache,
)
.await;
Ok(())
}
async fn handle_connection_error(
endpoint: Endpoint,
remote_address: SocketAddr,
remote_pubkey: Pubkey,
connection: Connection,
@ -211,23 +223,37 @@ async fn handle_connection_error(
cache: Arc<RwLock<ConnectionCache>>,
) {
cache_connection(remote_address, remote_pubkey, connection.clone(), &cache).await;
if let Err(err) = handle_connection(remote_address, remote_pubkey, &connection, sender).await {
if let Err(err) = handle_connection(
&endpoint,
remote_address,
remote_pubkey,
&connection,
&sender,
)
.await
{
drop_connection(remote_address, remote_pubkey, &connection, &cache).await;
error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}");
}
}
async fn handle_connection(
endpoint: &Endpoint,
remote_address: SocketAddr,
remote_pubkey: Pubkey,
connection: &Connection,
sender: Sender<(Pubkey, SocketAddr, Bytes)>,
sender: &Sender<(Pubkey, SocketAddr, Bytes)>,
) -> Result<(), Error> {
// Assert that send won't block.
debug_assert_eq!(sender.capacity(), None);
loop {
match connection.read_datagram().await {
Ok(bytes) => sender.send((remote_pubkey, remote_address, bytes))?,
Ok(bytes) => {
if let Err(err) = sender.send((remote_pubkey, remote_address, bytes)) {
close_quic_endpoint(endpoint);
return Err(Error::from(err));
}
}
Err(err) => {
if let Some(err) = connection.close_reason() {
return Err(Error::from(err));
@ -293,6 +319,7 @@ async fn get_connection(
entry.insert(connection).clone()
};
tokio::task::spawn(handle_connection_error(
endpoint.clone(),
connection.remote_address(),
get_remote_pubkey(&connection)?,
connection.clone(),
@ -404,7 +431,7 @@ mod tests {
multiunzip(keypairs.iter().zip(sockets).zip(senders).map(
|((keypair, socket), sender)| {
new_quic_endpoint(
&runtime,
runtime.handle(),
keypair,
socket,
IpAddr::V4(Ipv4Addr::LOCALHOST),


@ -3,12 +3,12 @@
use {
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
bytes::Bytes,
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
@ -16,7 +16,6 @@ use {
},
solana_measure::measure::Measure,
solana_perf::deduper::Deduper,
solana_quic_client::QuicConnectionCache,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_rpc_client_api::response::SlotUpdate,
@ -29,7 +28,7 @@ use {
std::{
collections::HashMap,
iter::repeat,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
ops::AddAssign,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
@ -38,6 +37,7 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
const MAX_DUPLICATE_COUNT: usize = 2;
@ -173,7 +173,7 @@ fn retransmit(
cluster_info: &ClusterInfo,
shreds_receiver: &Receiver<Vec</*shred:*/ Vec<u8>>>,
sockets: &[UdpSocket],
quic_connection_cache: &QuicConnectionCache,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
shred_deduper: &mut ShredDeduper<2>,
@ -257,7 +257,7 @@ fn retransmit(
&cluster_nodes,
socket_addr_space,
&sockets[index % sockets.len()],
quic_connection_cache,
quic_endpoint_sender,
stats,
)
.map_err(|err| {
@ -282,7 +282,7 @@ fn retransmit(
&cluster_nodes,
socket_addr_space,
&sockets[index % sockets.len()],
quic_connection_cache,
quic_endpoint_sender,
stats,
)
.map_err(|err| {
@ -311,7 +311,7 @@ fn retransmit_shred(
cluster_nodes: &ClusterNodes<RetransmitStage>,
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
quic_connection_cache: &QuicConnectionCache,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
stats: &RetransmitStats,
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
let mut compute_turbine_peers = Measure::start("turbine_start");
@ -328,16 +328,15 @@ fn retransmit_shred(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_addrs = addrs.len();
let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
Protocol::QUIC => addrs
.iter()
.filter_map(|addr| {
quic_connection_cache
.get_connection(addr)
.send_data(shred)
.ok()
})
.count(),
Protocol::QUIC => {
let shred = Bytes::copy_from_slice(shred);
addrs
.into_iter()
.filter_map(|addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok())
.count()
}
Protocol::UDP => match multi_target_send(socket, shred, &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
@ -354,7 +353,7 @@ fn retransmit_shred(
retransmit_time.stop();
stats
.num_addrs_failed
.fetch_add(addrs.len() - num_nodes, Ordering::Relaxed);
.fetch_add(num_addrs - num_nodes, Ordering::Relaxed);
stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
stats
.retransmit_total
@ -372,7 +371,7 @@ fn retransmit_shred(
/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
quic_connection_cache: Arc<QuicConnectionCache>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
@ -404,7 +403,7 @@ pub fn retransmitter(
&cluster_info,
&shreds_receiver,
&sockets,
&quic_connection_cache,
&quic_endpoint_sender,
&mut stats,
&cluster_nodes_cache,
&mut shred_deduper,
@ -429,14 +428,14 @@ impl RetransmitStage {
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
quic_connection_cache: Arc<QuicConnectionCache>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
retransmit_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Self {
let retransmit_thread_handle = retransmitter(
retransmit_sockets,
quic_connection_cache,
quic_endpoint_sender,
bank_forks,
leader_schedule_cache,
cluster_info,