// solana/core/src/retransmit_stage.rs


//! The `retransmit_stage` retransmits shreds between validators
#![allow(clippy::rc_buffer)]
use {
crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::ClusterNodesCache,
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo},
result::{Error, Result},
window_service::{should_retransmit_and_persist, WindowService},
},
crossbeam_channel::{Receiver, Sender},
lru::LruCache,
solana_client::rpc_response::SlotUpdate,
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_ledger::{
shred::{get_shred_slot_index_type, ShredFetchStats},
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, Packets},
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::Slot,
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
},
solana_streamer::streamer::PacketReceiver,
std::{
collections::{BTreeSet, HashSet},
net::UdpSocket,
ops::DerefMut,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::{channel, RecvTimeoutError},
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 10_000;
// Limit a given thread to consume about this many packets so that
// it doesn't pull in too much work at once.
const MAX_PACKET_BATCH_SIZE: usize = 100;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
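// Counters accumulated across all retransmit threads; update_retransmit_stats
// below flushes them to metrics roughly every two seconds (the last_ts interval).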
#[derive(Default)]
struct RetransmitStats {
total_packets: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
retransmit_total: AtomicU64,
last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64,
retransmit_tree_mismatch: AtomicU64,
}
#[allow(clippy::too_many_arguments)]
fn update_retransmit_stats(
stats: &RetransmitStats,
total_time: u64,
total_packets: usize,
retransmit_total: u64,
compute_turbine_peers_total: u64,
peers_len: usize,
epoch_fetch: u64,
epoch_cache_update: u64,
retransmit_tree_mismatch: u64,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
.total_packets
.fetch_add(total_packets as u64, Ordering::Relaxed);
stats
.retransmit_total
.fetch_add(retransmit_total, Ordering::Relaxed);
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
stats
.epoch_cache_update
.fetch_add(epoch_cache_update, Ordering::Relaxed);
stats
.retransmit_tree_mismatch
.fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
if stats.last_ts.should_update(2000) {
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!(
"retransmit-stage",
(
"total_time",
stats.total_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_fetch",
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_cache_update",
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_packets",
stats.total_packets.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retransmit_total",
stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retransmit_tree_mismatch",
stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"compute_turbine",
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
// Map of shred (slot, index, is_data) => list of hash values seen for that key.
pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
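// For example, a data shred at slot 100, index 7 keys the cache at (100, 7, true);
// at most MAX_DUPLICATE_COUNT distinct packet hashes are retransmitted for a given
// key before further variants are dropped (see check_if_already_received below).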
// Returns None if the shred has already been received and retransmit should be
// skipped. Otherwise returns the shred's slot.
fn check_if_already_received(
packet: &Packet,
shreds_received: &Mutex<ShredFilterAndHasher>,
) -> Option<Slot> {
let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?;
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
match cache.get_mut(&shred) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None,
Some(sent) => {
let hash = hasher.hash_packet(packet);
if sent.contains(&hash) {
None
} else {
sent.push(hash);
Some(shred.0)
}
}
None => {
let hash = hasher.hash_packet(packet);
cache.put(shred, vec![hash]);
Some(shred.0)
}
}
}
// Returns true if this is the first time receiving a shred for `shred_slot`.
fn check_if_first_shred_received(
shred_slot: Slot,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
root_bank: &Bank,
) -> bool {
if shred_slot <= root_bank.slot() {
return false;
}
let mut first_shreds_received_locked = first_shreds_received.lock().unwrap();
if !first_shreds_received_locked.contains(&shred_slot) {
datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64));
first_shreds_received_locked.insert(shred_slot);
if first_shreds_received_locked.len() > 100 {
let mut slots_before_root =
first_shreds_received_locked.split_off(&(root_bank.slot() + 1));
// `split_off` leaves only slots <= root in the locked set and returns the
// slots > root; the swap puts the slots > root back into the locked set, so
// the stale slots (now in `slots_before_root`) are dropped.
std::mem::swap(&mut slots_before_root, &mut first_shreds_received_locked);
}
true
} else {
false
}
}
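// Roughly once per UPDATE_INTERVAL_MS, one retransmit thread (whichever wins the
// compare_exchange on hasher_reset_ts) clears the dedup cache and re-seeds the
// packet hasher, presumably so that an unlucky hash collision cannot suppress a
// legitimate shred indefinitely.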
fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilterAndHasher>,
hasher_reset_ts: &AtomicU64,
) {
const UPDATE_INTERVAL_MS: u64 = 1000;
let now = timestamp();
let prev = hasher_reset_ts.load(Ordering::Acquire);
if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
&& hasher_reset_ts
.compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
cache.clear();
hasher.reset();
}
}
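// Core loop body for one retransmit thread: block for up to RECV_TIMEOUT on the
// shared receiver, drain up to roughly MAX_PACKET_BATCH_SIZE packets, and
// retransmit each previously unseen shred to this node's turbine peers.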
#[allow(clippy::too_many_arguments)]
fn retransmit(
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
cluster_info: &ClusterInfo,
r: &Mutex<PacketReceiver>,
sock: &UdpSocket,
id: u32,
stats: &RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<()> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let r_lock = r.lock().unwrap();
let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
let mut timer_start = Measure::start("retransmit");
let mut total_packets = packets.packets.len();
let mut packets = vec![packets];
while let Ok(nq) = r_lock.try_recv() {
total_packets += nq.packets.len();
packets.push(nq);
if total_packets >= MAX_PACKET_BATCH_SIZE {
break;
}
}
drop(r_lock);
let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let (working_bank, root_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.working_bank(), bank_forks.root_bank())
};
epoch_fetch.stop();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
epoch_cache_update.stop();
let my_id = cluster_info.id();
let socket_addr_space = cluster_info.socket_addr_space();
let mut retransmit_total = 0;
let mut compute_turbine_peers_total = 0;
let mut retransmit_tree_mismatch = 0;
let mut max_slot = 0;
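// For each packet that survives the discard/repair/dedup filters below, compute
// this node's turbine peers for the shred's slot and retransmit to them.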
for packet in packets.iter().flat_map(|p| p.packets.iter()) {
// skip discarded packets and repair packets
if packet.meta.discard {
total_packets -= 1;
continue;
}
if packet.meta.repair {
total_packets -= 1;
continue;
}
let shred_slot = match check_if_already_received(packet, shreds_received) {
Some(slot) => slot,
None => continue,
};
max_slot = max_slot.max(shred_slot);
if let Some(rpc_subscriptions) = rpc_subscriptions {
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot: shred_slot,
timestamp: timestamp(),
});
}
}
let mut compute_turbine_peers = Measure::start("turbine_start");
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
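// `neighbors` is this node's neighborhood in the turbine tree and `children` are
// the nodes it feeds in the next layer; both are derived deterministically from
// packet.meta.seed and the cached cluster nodes, so every validator computes the
// same tree for a given shred.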
// If the node is on the critical path (i.e. the first node in each
// neighborhood), then we expect that the packet arrives at tvu socket
// as opposed to tvu-forwards. If this is not the case, then the
// turbine broadcast/retransmit tree is mismatched across nodes.
let anchor_node = neighbors[0].id == my_id;
if packet.meta.forward == anchor_node {
// TODO: Consider forwarding the packet to the root node here.
retransmit_tree_mismatch += 1;
}
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();
let mut retransmit_time = Measure::start("retransmit_to");
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
// children and also tvu_forward socket of its neighbors. Otherwise it
// should only forward to tvu_forward socket of its children.
if anchor_node {
// First neighbor is this node itself, so skip it.
ClusterInfo::retransmit_to(
&neighbors[1..],
packet,
sock,
/*forward socket=*/ true,
socket_addr_space,
);
}
ClusterInfo::retransmit_to(
&children,
packet,
sock,
!anchor_node, // forward socket unless this node is the anchor
socket_addr_space,
);
retransmit_time.stop();
retransmit_total += retransmit_time.as_us();
}
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
timer_start.stop();
debug!(
"retransmitted {} packets in {}ms retransmit_time: {}ms id: {}",
total_packets,
timer_start.as_ms(),
retransmit_total,
id,
);
let cluster_nodes =
cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
update_retransmit_stats(
stats,
timer_start.as_us(),
total_packets,
retransmit_total,
compute_turbine_peers_total,
cluster_nodes.num_peers(),
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
retransmit_tree_mismatch,
);
Ok(())
}
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
/// See `cluster_info` for network layer definitions.
/// # Arguments
/// * `sockets` - Sockets to read from.
/// * `bank_forks` - The BankForks structure
/// * `leader_schedule_cache` - The leader schedule to verify shreds
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes.
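///
/// A minimal wiring sketch (mirroring `RetransmitStage::new` and the test below):
/// create an mpsc channel, wrap the receiver in `Arc<Mutex<_>>`, pass it here
/// together with the sockets, bank forks, leader schedule cache and cluster info,
/// and feed `Packets` batches into the sender; the returned handles exit once the
/// sender side of the channel is dropped.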
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Vec<JoinHandle<()>> {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
));
let hasher_reset_ts = Arc::default();
let stats = Arc::new(RetransmitStats::default());
let shreds_received = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let first_shreds_received = Arc::new(Mutex::new(BTreeSet::new()));
(0..sockets.len())
.map(|s| {
let sockets = sockets.clone();
let bank_forks = bank_forks.clone();
let leader_schedule_cache = leader_schedule_cache.clone();
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let first_shreds_received = first_shreds_received.clone();
let rpc_subscriptions = rpc_subscriptions.clone();
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&r,
&sockets[s],
s as u32,
&stats,
&cluster_nodes_cache,
&hasher_reset_ts,
&shreds_received,
&max_slots,
&first_shreds_received,
rpc_subscriptions.as_deref(),
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeout(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_error!("streamer-retransmit-error", 1, 1);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
})
.collect()
}
pub(crate) struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
}
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
duplicate_slots_sender: Sender<Slot>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
let thread_hdls = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache.clone(),
cluster_info.clone(),
retransmit_receiver,
max_slots,
rpc_subscriptions,
);
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
bank_forks.clone(),
cluster_info.clone(),
cluster_slots_update_receiver,
exit.clone(),
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
let repair_info = RepairInfo {
bank_forks,
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
cluster_info,
cluster_slots,
};
let window_service = WindowService::new(
blockstore,
verified_receiver,
retransmit_sender,
repair_socket,
exit,
repair_info,
leader_schedule_cache,
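// Shred filter passed to the window service: retransmit/persist a shred only if
// it passes should_retransmit_and_persist and, when a `cfg` flag is provided,
// that flag currently reads true (treated as connected when absent).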
move |id, shred, working_bank, last_root| {
let is_connected = cfg
.as_ref()
.map(|x| x.load(Ordering::Relaxed))
.unwrap_or(true);
let rv = should_retransmit_and_persist(
shred,
working_bank,
&leader_schedule_cache_clone,
id,
last_root,
shred_version,
);
rv && is_connected
},
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
);
Self {
thread_hdls,
window_service,
cluster_slots_service,
}
}
pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
self.window_service.join()?;
self.cluster_slots_service.join()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use solana_gossip::contact_info::ContactInfo;
use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
use solana_ledger::create_new_tmp_ledger;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::shred::Shred;
use solana_net_utils::find_available_port_in_range;
use solana_perf::packet::{Packet, Packets};
use solana_sdk::signature::Keypair;
use solana_streamer::socket::SocketAddrSpace;
use std::net::{IpAddr, Ipv4Addr};
#[test]
fn test_skip_repair() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
let blockstore = Blockstore::open(&ledger_path).unwrap();
let opts = ProcessOptions {
accounts_db_test_hash_calculation: true,
full_leader_cache: true,
..ProcessOptions::default()
};
let (bank_forks, cached_leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
let leader_schedule_cache = Arc::new(cached_leader_schedule);
let bank_forks = Arc::new(RwLock::new(bank_forks));
let mut me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
// need to make sure tvu and tvu_forwards are valid addresses
me.tvu_forwards = me_retransmit.local_addr().unwrap();
let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
.unwrap()
.local_addr()
.unwrap();
// This fixes the order of nodes returned by shuffle_peers_and_index,
// and makes the turbine retransmit tree deterministic for the purpose
// of the test.
let other = std::iter::repeat_with(solana_sdk::pubkey::new_rand)
.find(|pk| me.id < *pk)
.unwrap();
let other = ContactInfo::new_localhost(&other, 0);
let cluster_info = ClusterInfo::new(
other,
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
cluster_info.insert_info(me);
let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
let cluster_info = Arc::new(cluster_info);
let (retransmit_sender, retransmit_receiver) = channel();
let _t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
leader_schedule_cache,
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
Arc::default(), // MaxSlots
None,
);
let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
let mut packet = Packet::default();
shred.copy_to_packet(&mut packet);
let packets = Packets::new(vec![packet.clone()]);
// it should send this over the sockets.
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert!(!packets.packets[0].meta.repair);
let mut repair = packet.clone();
repair.meta.repair = true;
shred.set_slot(1);
shred.copy_to_packet(&mut packet);
// send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
let packets = Packets::new(vec![repair, packet]);
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert!(!packets.packets[0].meta.repair);
}
#[test]
fn test_already_received() {
let mut packet = Packet::default();
let slot = 1;
let index = 5;
let version = 0x40;
let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
// unique shred for (1, 5) should pass
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
// duplicate shred for (1, 5) blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
// first duplicate shred for (1, 5) passed
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
// then blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
// 2nd duplicate shred for (1, 5) blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, version);
shred.copy_to_packet(&mut packet);
// Coding at (1, 5) passes
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
// then blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, version);
shred.copy_to_packet(&mut packet);
// 2nd unique coding at (1, 5) passes
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
// same again is blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, version);
shred.copy_to_packet(&mut packet);
// Another unique coding at (1, 5) always blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
}
}