//! `window_service` handles the data plane's incoming shreds, storing them in
//! blockstore and retransmitting where required
//!
use {
    crate::{
        ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
        cluster_info_vote_listener::VerifiedVoteReceiver,
        completed_data_sets_service::CompletedDataSetsSender,
        repair_response,
        repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
        result::{Error, Result},
    },
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{Nonce, Shred, ShredType},
    },
    solana_measure::measure::Measure,
    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
    solana_perf::packet::{Packet, PacketBatch},
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::{bank::Bank, bank_forks::BankForks},
    solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
    std::{
        cmp::Reverse,
        collections::{HashMap, HashSet},
        net::{SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

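/// Counters accumulated by the shred-insert thread and reported periodically
/// through `report_metrics`.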
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_shreds_received: u64,
    shred_receiver_elapsed_us: u64,
    prune_shreds_elapsed_us: u64,
    num_shreds_pruned_invalid_repair: usize,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_shreds_received", self.num_shreds_received as i64, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            (
                "prune_shreds_elapsed_us",
                self.prune_shreds_elapsed_us as i64,
                i64
            ),
            (
                "num_shreds_pruned_invalid_repair",
                self.num_shreds_pruned_invalid_repair,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {}", err);
            }
            _ => self.num_errors_other += 1,
        }
    }
}

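/// Packet- and shred-level statistics gathered by the receive-window thread;
/// `maybe_submit` reports them at most once every `SUBMIT_CADENCE`.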
#[derive(Default)]
struct ReceiveWindowStats {
    num_packets: usize,
    num_shreds: usize, // num_discards: num_packets - num_shreds
    num_repairs: usize,
    elapsed: Duration, // excludes waiting time on the receiver channel.
    slots: HashMap<Slot, /*num shreds:*/ usize>,
    addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
    since: Option<Instant>,
}

impl ReceiveWindowStats {
    fn maybe_submit(&mut self) {
        const MAX_NUM_ADDRS: usize = 5;
        const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
        let elapsed = self.since.as_ref().map(Instant::elapsed);
        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
            return;
        }
        datapoint_info!(
            "receive_window_stats",
            ("num_packets", self.num_packets, i64),
            ("num_shreds", self.num_shreds, i64),
            ("num_repairs", self.num_repairs, i64),
            ("elapsed_micros", self.elapsed.as_micros(), i64),
        );
        for (slot, num_shreds) in &self.slots {
            datapoint_debug!(
                "receive_window_num_slot_shreds",
                ("slot", *slot, i64),
                ("num_shreds", *num_shreds, i64)
            );
        }
        let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect();
        let reverse_count = |(_addr, count): &_| Reverse(*count);
        if addrs.len() > MAX_NUM_ADDRS {
            addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
            addrs.truncate(MAX_NUM_ADDRS);
        }
        addrs.sort_unstable_by_key(reverse_count);
        info!(
            "num addresses: {}, top packets by source: {:?}",
            self.addrs.len(),
            addrs
        );
        *self = Self {
            since: Some(Instant::now()),
            ..Self::default()
        };
    }
}

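/// Returns true if the shred's slot is still relevant relative to `root`:
/// data shreds must chain from a parent at or above the root, while coding
/// shreds (which carry no parent) only need a slot at or above the root.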
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
    match shred.shred_type() {
        // Only data shreds have parent information
        ShredType::Data => match shred.parent() {
            Ok(parent) => blockstore::verify_shred_slots(shred.slot(), parent, root),
            Err(_) => false,
        },
        // Filter out outdated coding shreds
        ShredType::Code => shred.slot() >= root,
    }
}

/// drop shreds that are from myself or not from the correct leader for the
/// shred's slot
pub(crate) fn should_retransmit_and_persist(
    shred: &Shred,
    bank: Option<Arc<Bank>>,
    leader_schedule_cache: &LeaderScheduleCache,
    my_pubkey: &Pubkey,
    root: u64,
    shred_version: u16,
) -> bool {
    let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
    if let Some(leader_id) = slot_leader_pubkey {
        if leader_id == *my_pubkey {
            inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
            false
        } else if !verify_shred_slot(shred, root) {
            inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
            false
        } else if shred.version() != shred_version {
            inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
            false
        } else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
            inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
            false
        } else if shred.data_header.size as usize > shred.payload.len() {
            inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1);
            false
        } else {
            true
        }
    } else {
        inc_new_counter_debug!("streamer-recv_window-unknown_leader", 1);
        false
    }
}

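/// Drains the duplicate-shred channel; for each shred whose slot is not yet
/// marked duplicate, records the conflicting payloads in the blockstore,
/// pushes a duplicate-shred proof to gossip, and notifies the receiver behind
/// `duplicate_slot_sender`.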
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<Shred>,
    duplicate_slot_sender: &DuplicateSlotSender,
) -> Result<()> {
    let check_duplicate = |shred: Shred| -> Result<()> {
        let shred_slot = shred.slot();
        if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
            if let Some(existing_shred_payload) =
                blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
            {
                cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload,
                    shred.payload,
                )?;

                duplicate_slot_sender.send(shred_slot)?;
            }
        }

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

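/// For a repair response, checks that the packet's nonce matches an
/// outstanding repair request for this shred; non-repair shreds always pass.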
fn verify_repair(
    outstanding_requests: &mut OutstandingShredRepairs,
    shred: &Shred,
    repair_meta: &Option<RepairMeta>,
) -> bool {
    repair_meta
        .as_ref()
        .map(|repair_meta| {
            outstanding_requests
                .register_response(
                    repair_meta.nonce,
                    shred,
                    solana_sdk::timing::timestamp(),
                    |_| (),
                )
                .is_some()
        })
        .unwrap_or(true)
}

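/// Removes, in place, shreds that arrived as repair responses but fail
/// `verify_repair`, keeping `shreds` and `repair_infos` aligned index-for-index.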
fn prune_shreds_invalid_repair(
    shreds: &mut Vec<Shred>,
    repair_infos: &mut Vec<Option<RepairMeta>>,
    outstanding_requests: &RwLock<OutstandingShredRepairs>,
) {
    assert_eq!(shreds.len(), repair_infos.len());
    let mut i = 0;
    let mut removed = HashSet::new();
    {
        let mut outstanding_requests = outstanding_requests.write().unwrap();
        shreds.retain(|shred| {
            let should_keep = (
                verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
                i += 1,
            )
                .0;
            if !should_keep {
                removed.insert(i - 1);
            }
            should_keep
        });
    }
    i = 0;
    repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0);
    assert_eq!(shreds.len(), repair_infos.len());
}

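/// One iteration of the insert thread: drains batches of shreds from
/// `shred_receiver`, prunes repair responses that fail nonce verification,
/// inserts the rest into the blockstore (reporting duplicates through
/// `handle_duplicate`), and forwards completed data sets downstream.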
fn run_insert<F>(
    shred_receiver: &Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: &CompletedDataSetsSender,
    retransmit_sender: &Sender<Vec<Shred>>,
    outstanding_requests: &RwLock<OutstandingShredRepairs>,
) -> Result<()>
where
    F: Fn(Shred),
{
    ws_metrics.run_insert_count += 1;
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let timer = Duration::from_millis(200);
    let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
    while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
        shreds.extend(more_shreds);
        repair_infos.extend(more_repair_infos);
    }
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.num_shreds_received += shreds.len() as u64;

    let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
    let num_shreds = shreds.len();
    prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
    ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
    let repairs: Vec<_> = repair_infos
        .iter()
        .map(|repair_info| repair_info.is_some())
        .collect();
    prune_shreds_elapsed.stop();
    ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();

    let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
        shreds,
        repairs,
        Some(leader_schedule_cache),
        false, // is_trusted
        Some(retransmit_sender),
        &handle_duplicate,
        metrics,
    )?;
    for index in inserted_indices {
        if repair_infos[index].is_some() {
            metrics.num_repair += 1;
        }
    }

    completed_data_sets_sender.try_send(completed_data_sets)?;
    Ok(())
}

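/// One iteration of the receive-window thread: pulls verified packet batches,
/// deserializes them into shreds, applies `shred_filter`, hands non-repair
/// shreds to retransmit, and forwards everything that passed to the insert thread.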
fn recv_window<F>(
    blockstore: &Blockstore,
    bank_forks: &RwLock<BankForks>,
    insert_shred_sender: &Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
    verified_receiver: &Receiver<Vec<PacketBatch>>,
    retransmit_sender: &Sender<Vec<Shred>>,
    shred_filter: F,
    thread_pool: &ThreadPool,
    stats: &mut ReceiveWindowStats,
) -> Result<()>
where
    F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
{
    let timer = Duration::from_millis(200);
    let mut packets = verified_receiver.recv_timeout(timer)?;
    packets.extend(verified_receiver.try_iter().flatten());
    let now = Instant::now();
    let last_root = blockstore.last_root();
    let working_bank = bank_forks.read().unwrap().working_bank();
    let handle_packet = |packet: &Packet| {
        if packet.meta.discard() {
            inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
            return None;
        }
        // shred fetch stage should be sending packets
        // with sufficiently large buffers. Needed to ensure
        // call to `new_from_serialized_shred` is safe.
        assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
        let serialized_shred = packet.data.to_vec();
        let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
        if !shred_filter(&shred, working_bank.clone(), last_root) {
            return None;
        }
        if packet.meta.repair() {
            let repair_info = RepairMeta {
                _from_addr: packet.meta.addr(),
                // If the nonce can't be parsed, dump the packet.
                nonce: repair_response::nonce(&packet.data)?,
            };
            Some((shred, Some(repair_info)))
        } else {
            Some((shred, None))
        }
    };
    let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
        packets
            .par_iter()
            .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet))
            .unzip()
    });
    // Exclude repair packets from retransmit.
    let _ = retransmit_sender.send(
        shreds
            .iter()
            .zip(&repair_infos)
            .filter(|(_, repair_info)| repair_info.is_none())
            .map(|(shred, _)| shred)
            .cloned()
            .collect(),
    );
    stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
    stats.num_shreds += shreds.len();
    for shred in &shreds {
        *stats.slots.entry(shred.slot()).or_default() += 1;
    }
    insert_shred_sender.send((shreds, repair_infos))?;

    stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
    for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) {
        *stats.addrs.entry(packet.meta.addr()).or_default() += 1;
    }
    stats.elapsed += now.elapsed();
    Ok(())
}

struct RepairMeta {
    _from_addr: SocketAddr,
    nonce: Nonce,
}

// Implement a destructor for the window_service thread to signal it exited
// even on panics
struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
    fn drop(&mut self) {
        self.exit_sender.clone().store(true, Ordering::Relaxed);
    }
}

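/// Owns the three window-service threads (receive, insert, duplicate-check)
/// plus the repair service; `join` waits on all of them.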
pub(crate) struct WindowService {
    t_window: JoinHandle<()>,
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new<F>(
        blockstore: Arc<Blockstore>,
        verified_receiver: Receiver<Vec<PacketBatch>>,
        retransmit_sender: Sender<Vec<Shred>>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        shred_filter: F,
        verified_vote_receiver: VerifiedVoteReceiver,
        completed_data_sets_sender: CompletedDataSetsSender,
        duplicate_slots_sender: DuplicateSlotSender,
        ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
    ) -> WindowService
    where
        F: 'static
            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, /*last root:*/ Slot) -> bool
            + std::marker::Send
            + std::marker::Sync,
    {
        let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

        let bank_forks = repair_info.bank_forks.clone();
        let cluster_info = repair_info.cluster_info.clone();
        let id = cluster_info.id();

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_info,
            verified_vote_receiver,
            outstanding_requests.clone(),
            ancestor_hashes_replay_update_receiver,
        );

        let (insert_sender, insert_receiver) = unbounded();
        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
        );

        let t_insert = Self::start_window_insert_thread(
            exit.clone(),
            blockstore.clone(),
            leader_schedule_cache,
            insert_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender.clone(),
            outstanding_requests,
        );

        let t_window = Self::start_recv_window_thread(
            id,
            exit,
            blockstore,
            insert_sender,
            verified_receiver,
            shred_filter,
            bank_forks,
            retransmit_sender,
        );

        WindowService {
            t_window,
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<Shred>,
        duplicate_slot_sender: DuplicateSlotSender,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solana-check-duplicate".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }

                let mut noop = || {};
                if let Err(e) = run_check_duplicate(
                    &cluster_info,
                    &blockstore,
                    &duplicate_receiver,
                    &duplicate_slot_sender,
                ) {
                    if Self::should_exit_on_error(e, &mut noop, &handle_error) {
                        break;
                    }
                }
            })
            .unwrap()
    }

    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        insert_receiver: Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
        check_duplicate_sender: Sender<Shred>,
        completed_data_sets_sender: CompletedDataSetsSender,
        retransmit_sender: Sender<Vec<Shred>>,
        outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> JoinHandle<()> {
        let mut handle_timeout = || {};
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };

        Builder::new()
            .name("solana-window-insert".to_string())
            .spawn(move || {
                let handle_duplicate = |shred| {
                    let _ = check_duplicate_sender.send(shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                loop {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Err(e) = run_insert(
                        &insert_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        &handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        &completed_data_sets_sender,
                        &retransmit_sender,
                        &outstanding_requests,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

    #[allow(clippy::too_many_arguments)]
    fn start_recv_window_thread<F>(
        id: Pubkey,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        insert_sender: Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
        verified_receiver: Receiver<Vec<PacketBatch>>,
        shred_filter: F,
        bank_forks: Arc<RwLock<BankForks>>,
        retransmit_sender: Sender<Vec<Shred>>,
    ) -> JoinHandle<()>
    where
        F: 'static
            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
            + std::marker::Send
            + std::marker::Sync,
    {
        let mut stats = ReceiveWindowStats::default();
        Builder::new()
            .name("solana-window".to_string())
            .spawn(move || {
                let _exit = Finalizer::new(exit.clone());
                trace!("{}: RECV_WINDOW started", id);
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count())
                    .build()
                    .unwrap();
                let mut now = Instant::now();
                let handle_error = || {
                    inc_new_counter_error!("solana-window-error", 1, 1);
                };

                while !exit.load(Ordering::Relaxed) {
                    let mut handle_timeout = || {
                        if now.elapsed() > Duration::from_secs(30) {
                            warn!(
                                "Window does not seem to be receiving data. \
                                 Ensure port configuration is correct..."
                            );
                            now = Instant::now();
                        }
                    };
                    if let Err(e) = recv_window(
                        &blockstore,
                        &bank_forks,
                        &insert_sender,
                        &verified_receiver,
                        &retransmit_sender,
                        |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root),
                        &thread_pool,
                        &mut stats,
                    ) {
                        if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
                            break;
                        }
                    } else {
                        now = Instant::now();
                    }
                    stats.maybe_submit();
                }
            })
            .unwrap()
    }

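    /// Decides whether a worker thread should shut down after an error:
    /// disconnected channels terminate the thread, timeouts invoke
    /// `handle_timeout`, and everything else is logged and retried.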
    fn should_exit_on_error<F, H>(e: Error, handle_timeout: &mut F, handle_error: &H) -> bool
    where
        F: FnMut(),
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => {
                handle_timeout();
                false
            }
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_window.join()?;
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config_with_leader,
            get_tmp_ledger_path,
            shred::{DataShredHeader, Shredder},
        },
        solana_sdk::{
            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
            hash::Hash,
            signature::{Keypair, Signer},
            timing::timestamp,
        },
        solana_streamer::socket::SocketAddrSpace,
    };

    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_shreds(
            keypair, entries, true, // is_last_in_slot
            0, // next_shred_index
            0, // next_code_index
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);

        drop(blockstore);
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    fn test_should_retransmit_and_persist() {
        let me_id = solana_sdk::pubkey::new_rand();
        let leader_keypair = Arc::new(Keypair::new());
        let leader_pubkey = leader_keypair.pubkey();
        let bank = Arc::new(Bank::new_for_tests(
            &create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
        ));
        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

        let shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);

        // with a Bank for slot 0, shred continues
        assert!(should_retransmit_and_persist(
            &shreds[0],
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            0
        ));

        // with the wrong shred_version, shred gets thrown out
        assert!(!should_retransmit_and_persist(
            &shreds[0],
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            1
        ));

        // substitute leader_pubkey for me_id so it looks like I was the leader
        // if the shred came back from me, it doesn't continue, whether or not I have a bank
        assert!(!should_retransmit_and_persist(
            &shreds[0],
            Some(bank.clone()),
            &cache,
            &leader_pubkey,
            0,
            0
        ));
        assert!(!should_retransmit_and_persist(
            &shreds[0],
            None,
            &cache,
            &leader_pubkey,
            0,
            0
        ));

        // change the shred's slot so leader lookup fails
        // with a Bank and no idea who leader is, shred gets thrown out
        let mut bad_slot_shred = shreds[0].clone();
        bad_slot_shred.set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
        assert!(!should_retransmit_and_persist(
            &bad_slot_shred,
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            0
        ));

        // with a bad header size
        let mut bad_header_shred = shreds[0].clone();
        bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16;
        assert!(!should_retransmit_and_persist(
            &bad_header_shred,
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            0
        ));

        // with an invalid index, shred gets thrown out
        let mut bad_index_shred = shreds[0].clone();
        bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32;
        assert!(!should_retransmit_and_persist(
            &bad_index_shred,
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            0
        ));

        // with a shred where shred.slot() == root, shred gets thrown out
        let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
        let shreds = local_entries_to_shred(&[Entry::default()], root, root - 1, &leader_keypair);
        assert!(!should_retransmit_and_persist(
            &shreds[0],
            Some(bank.clone()),
            &cache,
            &me_id,
            root,
            0
        ));

        // with a shred where shred.parent() < root, shred gets thrown out
        let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
        let shreds =
            local_entries_to_shred(&[Entry::default()], root + 1, root - 1, &leader_keypair);
        assert!(!should_retransmit_and_persist(
            &shreds[0],
            Some(bank.clone()),
            &cache,
            &me_id,
            root,
            0
        ));

        // coding shreds don't contain parent slot information, test that slot >= root
        let (common, coding) = Shred::new_coding_shred_header(
            5, // slot
            5, // index
            5, // fec_set_index
            6, // num_data_shreds
            6, // num_coding_shreds
            3, // position
            0, // version
        );
        let mut coding_shred =
            Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
        coding_shred.sign(&leader_keypair);
        // shred.slot() > root, shred continues
        assert!(should_retransmit_and_persist(
            &coding_shred,
            Some(bank.clone()),
            &cache,
            &me_id,
            0,
            0
        ));
        // shred.slot() == root, shred continues
        assert!(should_retransmit_and_persist(
            &coding_shred,
            Some(bank.clone()),
            &cache,
            &me_id,
            5,
            0
        ));
        // shred.slot() < root, shred gets thrown out
        assert!(!should_retransmit_and_persist(
            &coding_shred,
            Some(bank),
            &cache,
            &me_id,
            6,
            0
        ));
    }

    #[test]
    fn test_run_check_duplicate() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let mut duplicate_shred = shreds[1].clone();
        duplicate_shred.set_slot(shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender.send(duplicate_shred).unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
        )
        .unwrap();
        assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_prune_shreds() {
        use {
            crate::serve_repair::ShredRepairType,
            std::net::{IpAddr, Ipv4Addr},
        };
        solana_logger::setup();
        let (common, coding) = Shred::new_coding_shred_header(
            5, // slot
            5, // index
            5, // fec_set_index
            6, // num_data_shreds
            6, // num_coding_shreds
            4, // position
            0, // version
        );
        let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
        let mut shreds = vec![shred.clone(), shred.clone(), shred];
        let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let repair_meta = RepairMeta {
            _from_addr,
            nonce: 0,
        };
        let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
        let repair_type = ShredRepairType::Orphan(9);
        let nonce = outstanding_requests
            .write()
            .unwrap()
            .add_request(repair_type, timestamp());
        let repair_meta1 = RepairMeta { _from_addr, nonce };
        let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
        prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests);
        assert_eq!(repair_infos.len(), 2);
        assert!(repair_infos[0].is_none());
        assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
    }
}