Update Cluster Slots to support multiple threads (#9071)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-03-25 18:09:19 -07:00 committed by GitHub
parent 40eba48109
commit 076fef5e57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 186 additions and 137 deletions

View File

@ -10,6 +10,7 @@ use solana_client::{
}; };
use solana_core::{ use solana_core::{
cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
cluster_slots::ClusterSlots,
contact_info::ContactInfo, contact_info::ContactInfo,
gossip_service::GossipService, gossip_service::GossipService,
repair_service, repair_service,
@ -187,7 +188,7 @@ impl Archiver {
let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
cluster_info.set_entrypoint(cluster_entrypoint.clone()); cluster_info.set_entrypoint(cluster_entrypoint.clone());
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let cluster_slots = Arc::new(ClusterSlots::default());
// Note for now, this ledger will not contain any of the existing entries // Note for now, this ledger will not contain any of the existing entries
// in the ledger located at ledger_path, and will only append on newly received // in the ledger located at ledger_path, and will only append on newly received
// entries after being passed to window_service // entries after being passed to window_service
@ -262,6 +263,7 @@ impl Archiver {
repair_socket, repair_socket,
shred_fetch_receiver, shred_fetch_receiver,
slot_sender, slot_sender,
cluster_slots,
) { ) {
Ok(window_service) => window_service, Ok(window_service) => window_service,
Err(e) => { Err(e) => {
@ -400,6 +402,7 @@ impl Archiver {
} }
// Find a segment to replicate and download it. // Find a segment to replicate and download it.
#[allow(clippy::too_many_arguments)]
fn setup( fn setup(
meta: &mut ArchiverMeta, meta: &mut ArchiverMeta,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
@ -410,6 +413,7 @@ impl Archiver {
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
shred_fetch_receiver: PacketReceiver, shred_fetch_receiver: PacketReceiver,
slot_sender: Sender<u64>, slot_sender: Sender<u64>,
cluster_slots: Arc<ClusterSlots>,
) -> Result<WindowService> { ) -> Result<WindowService> {
let slots_per_segment = let slots_per_segment =
match Self::get_segment_config(&cluster_info, meta.client_commitment) { match Self::get_segment_config(&cluster_info, meta.client_commitment) {
@ -467,6 +471,7 @@ impl Archiver {
RepairStrategy::RepairRange(repair_slot_range), RepairStrategy::RepairRange(repair_slot_range),
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _| true,
cluster_slots,
); );
info!("waiting for ledger download"); info!("waiting for ledger download");
Self::wait_for_segment_download( Self::wait_for_segment_download(

View File

@ -2,44 +2,43 @@ use crate::{
cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots, cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
serve_repair::RepairType, serve_repair::RepairType,
}; };
use solana_ledger::bank_forks::BankForks;
use solana_ledger::{bank_forks::BankForks, staking_utils}; use solana_runtime::epoch_stakes::NodeIdToVoteAccounts;
use solana_sdk::{clock::Slot, pubkey::Pubkey}; use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
rc::Rc, sync::{Arc, RwLock},
sync::RwLock,
}; };
pub type SlotPubkeys = HashMap<Arc<Pubkey>, u64>;
pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
#[derive(Default)] #[derive(Default)]
pub struct ClusterSlots { pub struct ClusterSlots {
cluster_slots: HashMap<Slot, HashMap<Rc<Pubkey>, u64>>, cluster_slots: ClusterSlotsMap,
keys: HashSet<Rc<Pubkey>>, keys: RwLock<HashSet<Arc<Pubkey>>>,
since: Option<u64>, since: RwLock<Option<u64>>,
validator_stakes: HashMap<Rc<Pubkey>, u64>, validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
epoch: Option<u64>, epoch: RwLock<Option<u64>>,
self_id: Pubkey, self_id: RwLock<Pubkey>,
} }
impl ClusterSlots { impl ClusterSlots {
pub fn lookup(&self, slot: Slot) -> Option<&HashMap<Rc<Pubkey>, u64>> { pub fn lookup(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
self.cluster_slots.get(&slot) self.cluster_slots.read().unwrap().get(&slot).cloned()
} }
pub fn update( pub fn update(
&mut self, &self,
root: Slot, root: Slot,
cluster_info: &RwLock<ClusterInfo>, cluster_info: &RwLock<ClusterInfo>,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
) { ) {
self.update_peers(cluster_info, bank_forks); self.update_peers(cluster_info, bank_forks);
let epoch_slots = cluster_info let since = *self.since.read().unwrap();
.read() let epoch_slots = cluster_info.read().unwrap().get_epoch_slots_since(since);
.unwrap()
.get_epoch_slots_since(self.since);
self.update_internal(root, epoch_slots); self.update_internal(root, epoch_slots);
} }
fn update_internal(&mut self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) { fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
let (epoch_slots_list, since) = epoch_slots; let (epoch_slots_list, since) = epoch_slots;
for epoch_slots in epoch_slots_list { for epoch_slots in epoch_slots_list {
let slots = epoch_slots.to_slots(root); let slots = epoch_slots.to_slots(root);
@ -47,97 +46,74 @@ impl ClusterSlots {
if *slot <= root { if *slot <= root {
continue; continue;
} }
let pubkey = Rc::new(epoch_slots.from); let pubkey = Arc::new(epoch_slots.from);
if self.keys.get(&pubkey).is_none() { let exists = self.keys.read().unwrap().get(&pubkey).is_some();
self.keys.insert(pubkey.clone()); if !exists {
self.keys.write().unwrap().insert(pubkey.clone());
} }
let from = self.keys.get(&pubkey).unwrap(); let from = self.keys.read().unwrap().get(&pubkey).unwrap().clone();
let balance = self.validator_stakes.get(from).cloned().unwrap_or(0); let balance = self
if self.self_id != **from { .validator_stakes
debug!( .read()
"CLUSTER_SLLOTS: {}: insert {} {} {}", .unwrap()
self.self_id, from, *slot, balance .get(&from)
); .map(|v| v.total_stake)
.unwrap_or(0);
let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(slot).cloned();
if slot_pubkeys.is_none() {
let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
self.cluster_slots
.write()
.unwrap()
.insert(*slot, new_slot_pubkeys.clone());
slot_pubkeys = Some(new_slot_pubkeys);
} }
self.cluster_slots
.entry(*slot) slot_pubkeys
.or_insert_with(HashMap::default) .unwrap()
.write()
.unwrap()
.insert(from.clone(), balance); .insert(from.clone(), balance);
} }
} }
self.cluster_slots.retain(|x, _| *x > root); self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
self.keys.retain(|x| Rc::strong_count(x) > 1); self.keys
self.since = since; .write()
} .unwrap()
pub fn stats(&self) -> (usize, usize, f64) { .retain(|x| Arc::strong_count(x) > 1);
let my_slots = self.collect(&self.self_id); *self.since.write().unwrap() = since;
let others: HashMap<_, _> = self
.cluster_slots
.iter()
.filter(|(x, _)| !my_slots.contains(x))
.flat_map(|(_, x)| x.iter())
.collect();
let other_slots: Vec<Slot> = self
.cluster_slots
.iter()
.filter(|(x, _)| !my_slots.contains(x))
.map(|(x, _)| *x)
.collect();
let weight: u64 = others.values().map(|x| **x).sum();
let keys: Vec<Rc<Pubkey>> = others.keys().copied().cloned().collect();
let total: u64 = self.validator_stakes.values().copied().sum::<u64>() + 1u64;
if !other_slots.is_empty() {
debug!(
"{}: CLUSTER_SLOTS STATS {} {:?} {:?}",
self.self_id,
weight as f64 / total as f64,
keys,
other_slots
);
}
(
my_slots.len(),
self.cluster_slots.len(),
weight as f64 / total as f64,
)
} }
pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> { pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
self.cluster_slots self.cluster_slots
.read()
.unwrap()
.iter() .iter()
.filter(|(_, keys)| keys.get(id).is_some()) .filter(|(_, keys)| keys.read().unwrap().get(id).is_some())
.map(|(slot, _)| slot) .map(|(slot, _)| slot)
.cloned() .cloned()
.collect() .collect()
} }
fn update_peers(&mut self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) { fn update_peers(&self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
let root = bank_forks.read().unwrap().root(); let root_bank = bank_forks.read().unwrap().root_bank().clone();
let (epoch, _) = bank_forks let root_epoch = root_bank.epoch();
.read() let my_epoch = *self.epoch.read().unwrap();
.unwrap()
.working_bank() if Some(root_epoch) != my_epoch {
.get_epoch_and_slot_index(root); let validator_stakes = root_bank
if Some(epoch) != self.epoch { .epoch_stakes(root_epoch)
let stakes = staking_utils::staked_nodes_at_epoch( .expect(
&bank_forks.read().unwrap().working_bank(), "Bank must have epoch stakes
epoch, for its own epoch",
); )
if stakes.is_none() { .node_id_to_vote_accounts()
return; .clone();
}
let stakes = stakes.unwrap(); *self.validator_stakes.write().unwrap() = validator_stakes;
self.validator_stakes = HashMap::new(); let id = cluster_info.read().unwrap().id();
for (from, bal) in stakes { *self.self_id.write().unwrap() = id;
let pubkey = Rc::new(from); *self.epoch.write().unwrap() = Some(root_epoch);
if self.keys.get(&pubkey).is_none() {
self.keys.insert(pubkey.clone());
}
let from = self.keys.get(&pubkey).unwrap();
self.validator_stakes.insert(from.clone(), bal);
}
self.self_id = cluster_info.read().unwrap().id();
self.epoch = Some(epoch);
} }
} }
@ -147,9 +123,19 @@ impl ClusterSlots {
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, x)| { .map(|(i, x)| {
let peer_stake = slot_peers
.as_ref()
.and_then(|v| v.read().unwrap().get(&x.id).cloned())
.unwrap_or(0);
( (
1 + slot_peers.and_then(|v| v.get(&x.id)).cloned().unwrap_or(0) 1 + peer_stake
+ self.validator_stakes.get(&x.id).cloned().unwrap_or(0), + self
.validator_stakes
.read()
.unwrap()
.get(&x.id)
.map(|v| v.total_stake)
.unwrap_or(0),
i, i,
) )
}) })
@ -163,6 +149,8 @@ impl ClusterSlots {
) -> Vec<RepairType> { ) -> Vec<RepairType> {
let my_slots = self.collect(self_id); let my_slots = self.collect(self_id);
self.cluster_slots self.cluster_slots
.read()
.unwrap()
.keys() .keys()
.filter(|x| **x > root) .filter(|x| **x > root)
.filter(|x| !my_slots.contains(*x)) .filter(|x| !my_slots.contains(*x))
@ -174,52 +162,60 @@ impl ClusterSlots {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use solana_runtime::epoch_stakes::NodeVoteAccounts;
#[test] #[test]
fn test_default() { fn test_default() {
let cs = ClusterSlots::default(); let cs = ClusterSlots::default();
assert!(cs.cluster_slots.is_empty()); assert!(cs.cluster_slots.read().unwrap().is_empty());
assert!(cs.since.is_none()); assert!(cs.since.read().unwrap().is_none());
} }
#[test] #[test]
fn test_update_noop() { fn test_update_noop() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
cs.update_internal(0, (vec![], None)); cs.update_internal(0, (vec![], None));
assert!(cs.cluster_slots.is_empty()); assert!(cs.cluster_slots.read().unwrap().is_empty());
assert!(cs.since.is_none()); assert!(cs.since.read().unwrap().is_none());
} }
#[test] #[test]
fn test_update_empty() { fn test_update_empty() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let epoch_slot = EpochSlots::default(); let epoch_slot = EpochSlots::default();
cs.update_internal(0, (vec![epoch_slot], Some(0))); cs.update_internal(0, (vec![epoch_slot], Some(0)));
assert_eq!(cs.since, Some(0)); assert_eq!(*cs.since.read().unwrap(), Some(0));
assert!(cs.lookup(0).is_none()); assert!(cs.lookup(0).is_none());
} }
#[test] #[test]
fn test_update_rooted() { fn test_update_rooted() {
//root is 0, so it should clear out the slot //root is 0, so it should clear out the slot
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[0], 0); epoch_slot.fill(&[0], 0);
cs.update_internal(0, (vec![epoch_slot], Some(0))); cs.update_internal(0, (vec![epoch_slot], Some(0)));
assert_eq!(cs.since, Some(0)); assert_eq!(*cs.since.read().unwrap(), Some(0));
assert!(cs.lookup(0).is_none()); assert!(cs.lookup(0).is_none());
} }
#[test] #[test]
fn test_update_new_slot() { fn test_update_new_slot() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0); epoch_slot.fill(&[1], 0);
cs.update_internal(0, (vec![epoch_slot], Some(0))); cs.update_internal(0, (vec![epoch_slot], Some(0)));
assert_eq!(cs.since, Some(0)); assert_eq!(*cs.since.read().unwrap(), Some(0));
assert!(cs.lookup(0).is_none()); assert!(cs.lookup(0).is_none());
assert!(cs.lookup(1).is_some()); assert!(cs.lookup(1).is_some());
assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&0)); assert_eq!(
cs.lookup(1)
.unwrap()
.read()
.unwrap()
.get(&Pubkey::default()),
Some(&0)
);
} }
#[test] #[test]
@ -231,15 +227,18 @@ mod tests {
#[test] #[test]
fn test_best_peer_2() { fn test_best_peer_2() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut c1 = ContactInfo::default(); let mut c1 = ContactInfo::default();
let mut c2 = ContactInfo::default(); let mut c2 = ContactInfo::default();
let mut map = HashMap::new(); let mut map = HashMap::new();
let k1 = Pubkey::new_rand(); let k1 = Pubkey::new_rand();
let k2 = Pubkey::new_rand(); let k2 = Pubkey::new_rand();
map.insert(Rc::new(k1.clone()), std::u64::MAX / 2); map.insert(Arc::new(k1.clone()), std::u64::MAX / 2);
map.insert(Rc::new(k2.clone()), 0); map.insert(Arc::new(k2.clone()), 0);
cs.cluster_slots.insert(0, map); cs.cluster_slots
.write()
.unwrap()
.insert(0, Arc::new(RwLock::new(map)));
c1.id = k1; c1.id = k1;
c2.id = k2; c2.id = k2;
assert_eq!( assert_eq!(
@ -250,17 +249,28 @@ mod tests {
#[test] #[test]
fn test_best_peer_3() { fn test_best_peer_3() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut c1 = ContactInfo::default(); let mut c1 = ContactInfo::default();
let mut c2 = ContactInfo::default(); let mut c2 = ContactInfo::default();
let mut map = HashMap::new(); let mut map = HashMap::new();
let k1 = Pubkey::new_rand(); let k1 = Pubkey::new_rand();
let k2 = Pubkey::new_rand(); let k2 = Pubkey::new_rand();
map.insert(Rc::new(k2.clone()), 0); map.insert(Arc::new(k2.clone()), 0);
cs.cluster_slots.insert(0, map); cs.cluster_slots
.write()
.unwrap()
.insert(0, Arc::new(RwLock::new(map)));
//make sure default weights are used as well //make sure default weights are used as well
cs.validator_stakes let validator_stakes: HashMap<_, _> = vec![(
.insert(Rc::new(k1.clone()), std::u64::MAX / 2); *Arc::new(k1.clone()),
NodeVoteAccounts {
total_stake: std::u64::MAX / 2,
vote_accounts: vec![Pubkey::default()],
},
)]
.into_iter()
.collect();
*cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes);
c1.id = k1; c1.id = k1;
c2.id = k2; c2.id = k2;
assert_eq!( assert_eq!(
@ -271,19 +281,38 @@ mod tests {
#[test] #[test]
fn test_update_new_staked_slot() { fn test_update_new_staked_slot() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0); epoch_slot.fill(&[1], 0);
let map = vec![(Rc::new(Pubkey::default()), 1)].into_iter().collect();
cs.validator_stakes = map; let map = Arc::new(
vec![(
Pubkey::default(),
NodeVoteAccounts {
total_stake: 1,
vote_accounts: vec![Pubkey::default()],
},
)]
.into_iter()
.collect(),
);
*cs.validator_stakes.write().unwrap() = map;
cs.update_internal(0, (vec![epoch_slot], None)); cs.update_internal(0, (vec![epoch_slot], None));
assert!(cs.lookup(1).is_some()); assert!(cs.lookup(1).is_some());
assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&1)); assert_eq!(
cs.lookup(1)
.unwrap()
.read()
.unwrap()
.get(&Pubkey::default()),
Some(&1)
);
} }
#[test] #[test]
fn test_generate_repairs() { fn test_generate_repairs() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0); epoch_slot.fill(&[1], 0);
cs.update_internal(0, (vec![epoch_slot], None)); cs.update_internal(0, (vec![epoch_slot], None));
@ -296,7 +325,7 @@ mod tests {
#[test] #[test]
fn test_collect_my_slots() { fn test_collect_my_slots() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0); epoch_slot.fill(&[1], 0);
let self_id = epoch_slot.from; let self_id = epoch_slot.from;
@ -307,7 +336,7 @@ mod tests {
#[test] #[test]
fn test_generate_repairs_existing() { fn test_generate_repairs_existing() {
let mut cs = ClusterSlots::default(); let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default(); let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0); epoch_slot.fill(&[1], 0);
let self_id = epoch_slot.from; let self_id = epoch_slot.from;

View File

@ -61,6 +61,7 @@ impl RepairService {
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy, repair_strategy: RepairStrategy,
cluster_slots: Arc<ClusterSlots>,
) -> Self { ) -> Self {
let t_repair = Builder::new() let t_repair = Builder::new()
.name("solana-repair-service".to_string()) .name("solana-repair-service".to_string())
@ -71,6 +72,7 @@ impl RepairService {
&repair_socket, &repair_socket,
&cluster_info, &cluster_info,
repair_strategy, repair_strategy,
&cluster_slots,
) )
}) })
.unwrap(); .unwrap();
@ -79,15 +81,15 @@ impl RepairService {
} }
fn run( fn run(
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
exit: &Arc<AtomicBool>, exit: &AtomicBool,
repair_socket: &Arc<UdpSocket>, repair_socket: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy, repair_strategy: RepairStrategy,
cluster_slots: &Arc<ClusterSlots>,
) { ) {
let serve_repair = ServeRepair::new(cluster_info.clone()); let serve_repair = ServeRepair::new(cluster_info.clone());
let id = cluster_info.read().unwrap().id(); let id = cluster_info.read().unwrap().id();
let mut cluster_slots = ClusterSlots::default();
if let RepairStrategy::RepairAll { .. } = repair_strategy { if let RepairStrategy::RepairAll { .. } = repair_strategy {
Self::initialize_lowest_slot(id, blockstore, cluster_info); Self::initialize_lowest_slot(id, blockstore, cluster_info);
} }
@ -118,7 +120,7 @@ impl RepairService {
Self::update_completed_slots( Self::update_completed_slots(
&id, &id,
new_root, new_root,
&mut cluster_slots, &cluster_slots,
blockstore, blockstore,
completed_slots_receiver, completed_slots_receiver,
&cluster_info, &cluster_info,
@ -277,7 +279,7 @@ impl RepairService {
fn update_completed_slots( fn update_completed_slots(
id: &Pubkey, id: &Pubkey,
root: Slot, root: Slot,
cluster_slots: &mut ClusterSlots, cluster_slots: &ClusterSlots,
blockstore: &Blockstore, blockstore: &Blockstore,
completed_slots_receiver: &CompletedSlotsReceiver, completed_slots_receiver: &CompletedSlotsReceiver,
cluster_info: &RwLock<ClusterInfo>, cluster_info: &RwLock<ClusterInfo>,

View File

@ -2,6 +2,7 @@
use crate::{ use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_slots::ClusterSlots,
repair_service::RepairStrategy, repair_service::RepairStrategy,
result::{Error, Result}, result::{Error, Result},
window_service::{should_retransmit_and_persist, WindowService}, window_service::{should_retransmit_and_persist, WindowService},
@ -214,6 +215,7 @@ impl RetransmitStage {
epoch_schedule: EpochSchedule, epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>, cfg: Option<Arc<AtomicBool>>,
shred_version: u16, shred_version: u16,
cluster_slots: Arc<ClusterSlots>,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -256,6 +258,7 @@ impl RetransmitStage {
); );
rv && is_connected rv && is_connected
}, },
cluster_slots,
); );
let thread_hdls = t_retransmit; let thread_hdls = t_retransmit;

View File

@ -7,6 +7,7 @@ use crate::{
broadcast_stage::RetransmitSlotsSender, broadcast_stage::RetransmitSlotsSender,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker, cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
ledger_cleanup_service::LedgerCleanupService, ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
@ -145,6 +146,7 @@ impl Tvu {
) )
}; };
let cluster_slots = Arc::new(ClusterSlots::default());
let retransmit_stage = RetransmitStage::new( let retransmit_stage = RetransmitStage::new(
bank_forks.clone(), bank_forks.clone(),
leader_schedule_cache, leader_schedule_cache,
@ -158,6 +160,7 @@ impl Tvu {
*bank_forks.read().unwrap().working_bank().epoch_schedule(), *bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg, cfg,
tvu_config.shred_version, tvu_config.shred_version,
cluster_slots,
); );
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();

View File

@ -884,7 +884,7 @@ mod tests {
}) })
.collect(); .collect();
// Each validator can exit in parallel to speed many sequential calls to `join` // Each validator can exit in parallel to speed many sequential calls to join`
validators.iter_mut().for_each(|v| v.exit()); validators.iter_mut().for_each(|v| v.exit());
// While join is called sequentially, the above exit call notified all the // While join is called sequentially, the above exit call notified all the
// validators to exit from all their threads // validators to exit from all their threads

View File

@ -1,9 +1,12 @@
//! `window_service` handles the data plane incoming shreds, storing them in //! `window_service` handles the data plane incoming shreds, storing them in
//! blockstore and retransmitting where required //! blockstore and retransmitting where required
//! //!
use crate::cluster_info::ClusterInfo; use crate::{
use crate::repair_service::{RepairService, RepairStrategy}; cluster_info::ClusterInfo,
use crate::result::{Error, Result}; cluster_slots::ClusterSlots,
repair_service::{RepairService, RepairStrategy},
result::{Error, Result},
};
use crossbeam_channel::{ use crossbeam_channel::{
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
}; };
@ -252,6 +255,7 @@ impl WindowService {
repair_strategy: RepairStrategy, repair_strategy: RepairStrategy,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
shred_filter: F, shred_filter: F,
cluster_slots: Arc<ClusterSlots>,
) -> WindowService ) -> WindowService
where where
F: 'static F: 'static
@ -270,6 +274,7 @@ impl WindowService {
repair_socket, repair_socket,
cluster_info.clone(), cluster_info.clone(),
repair_strategy, repair_strategy,
cluster_slots,
); );
let (insert_sender, insert_receiver) = unbounded(); let (insert_sender, insert_receiver) = unbounded();
@ -620,6 +625,7 @@ mod test {
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::new_localhost(&Pubkey::default(), 0), ContactInfo::new_localhost(&Pubkey::default(), 0),
))); )));
let cluster_slots = Arc::new(ClusterSlots::default());
let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap()); let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap());
let window = WindowService::new( let window = WindowService::new(
blockstore, blockstore,
@ -631,6 +637,7 @@ mod test {
RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }), RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }),
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _| true,
cluster_slots,
); );
window window
} }