Move cluster slots update to separate thread (#11523)

* Add cluster_slots_service

Co-authored-by: Carl <carl@solana.com>
carllin committed 2020-08-11 12:48:13 -07:00
parent 17645ee20c
commit 7ef50a9352
4 changed files with 206 additions and 105 deletions

core/src/cluster_slots_service.rs

@@ -0,0 +1,191 @@
use crate::{cluster_info::ClusterInfo, cluster_slots::ClusterSlots};
use solana_ledger::blockstore::{Blockstore, CompletedSlotsReceiver};
use solana_measure::measure::Measure;
use solana_runtime::bank_forks::BankForks;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
    thread::{self, sleep, Builder, JoinHandle},
    time::{Duration, Instant},
};
#[derive(Default, Debug)]
struct ClusterSlotsServiceTiming {
pub lowest_slot_elapsed: u64,
pub update_completed_slots_elapsed: u64,
}
impl ClusterSlotsServiceTiming {
fn update(&mut self, lowest_slot_elapsed: u64, update_completed_slots_elapsed: u64) {
self.lowest_slot_elapsed += lowest_slot_elapsed;
self.update_completed_slots_elapsed += update_completed_slots_elapsed;
}
}
pub struct ClusterSlotsService {
t_cluster_slots_service: JoinHandle<()>,
}
impl ClusterSlotsService {
pub fn new(
blockstore: Arc<Blockstore>,
cluster_slots: Arc<ClusterSlots>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
completed_slots_receiver: CompletedSlotsReceiver,
exit: Arc<AtomicBool>,
) -> Self {
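        // Seed gossip with this node's lowest slot and already-completed
        // slots before handing updates off to the background thread.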
let id = cluster_info.id();
Self::initialize_lowest_slot(id, &blockstore, &cluster_info);
Self::initialize_epoch_slots(&blockstore, &cluster_info, &completed_slots_receiver);
let t_cluster_slots_service = Builder::new()
.name("solana-cluster-slots-service".to_string())
.spawn(move || {
Self::run(
blockstore,
cluster_slots,
bank_forks,
cluster_info,
completed_slots_receiver,
exit,
)
})
.unwrap();
ClusterSlotsService {
t_cluster_slots_service,
}
}
pub fn join(self) -> thread::Result<()> {
self.t_cluster_slots_service.join()
}
fn run(
blockstore: Arc<Blockstore>,
cluster_slots: Arc<ClusterSlots>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
completed_slots_receiver: CompletedSlotsReceiver,
exit: Arc<AtomicBool>,
) {
let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
let mut last_stats = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
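            // Each pass pushes this node's lowest slot into gossip, drains
            // newly completed slots from the blockstore channel, and
            // refreshes the shared ClusterSlots view from the current root.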
let new_root = bank_forks.read().unwrap().root();
let id = cluster_info.id();
let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
lowest_slot_elapsed.stop();
let mut update_completed_slots_elapsed =
Measure::start("update_completed_slots_elapsed");
Self::update_completed_slots(&completed_slots_receiver, &cluster_info);
cluster_slots.update(new_root, &cluster_info, &bank_forks);
update_completed_slots_elapsed.stop();
cluster_slots_service_timing.update(
lowest_slot_elapsed.as_us(),
update_completed_slots_elapsed.as_us(),
);
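            // Flush accumulated timings to metrics every couple of seconds,
            // then reset the counters.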
if last_stats.elapsed().as_secs() > 2 {
datapoint_info!(
"cluster_slots_service-timing",
(
"lowest_slot_elapsed",
cluster_slots_service_timing.lowest_slot_elapsed,
i64
),
(
"update_completed_slots_elapsed",
cluster_slots_service_timing.update_completed_slots_elapsed,
i64
),
);
cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
last_stats = Instant::now();
}
sleep(Duration::from_millis(200));
}
}
fn update_completed_slots(
completed_slots_receiver: &CompletedSlotsReceiver,
cluster_info: &ClusterInfo,
) {
let mut slots: Vec<Slot> = vec![];
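        // Drain every pending batch from the channel so a single gossip
        // push covers all newly completed slots.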
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
slots.sort();
if !slots.is_empty() {
cluster_info.push_epoch_slots(&slots);
}
}
fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
// Safe to set into gossip because by this time, the leader schedule cache should
// also be updated with the latest root (done in blockstore_processor) and thus
// will provide a schedule to window_service for any incoming shreds up to the
// last_confirmed_epoch.
cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
}
fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) {
cluster_info.push_lowest_slot(*id, lowest_slot);
}
fn initialize_epoch_slots(
blockstore: &Blockstore,
cluster_info: &ClusterInfo,
completed_slots_receiver: &CompletedSlotsReceiver,
) {
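        // Rebuild the completed-slot set from the blockstore (all full slots
        // at or above the last root) plus anything already queued on the
        // channel, then publish it to gossip as one deduplicated batch.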
let root = blockstore.last_root();
let mut slots: Vec<_> = blockstore
.live_slots_iterator(root)
.filter_map(|(slot, slot_meta)| {
if slot_meta.is_full() {
Some(slot)
} else {
None
}
})
.collect();
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
slots.sort();
slots.dedup();
if !slots.is_empty() {
cluster_info.push_epoch_slots(&slots);
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::cluster_info::Node;
#[test]
pub fn test_update_lowest_slot() {
let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info);
ClusterSlotsService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
let lowest = cluster_info
.get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
lowest_slot.clone()
})
.unwrap();
assert_eq!(lowest.lowest, 5);
}
}
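
For orientation, wiring the new service into a caller looks roughly like the sketch below. This is a hypothetical setup, not code from this commit: it assumes `blockstore`, `bank_forks`, `cluster_info`, and `completed_slots_receiver` are already in hand (the real call site is the retransmit_stage.rs hunk further down), and that `ClusterSlots` can be constructed via `Default`.

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    // Hypothetical handles provided by the surrounding validator:
    // blockstore, bank_forks, cluster_info, completed_slots_receiver.
    let exit = Arc::new(AtomicBool::new(false));
    let cluster_slots = Arc::new(ClusterSlots::default());
    let cluster_slots_service = ClusterSlotsService::new(
        blockstore.clone(),
        cluster_slots.clone(),
        bank_forks.clone(),
        cluster_info.clone(),
        completed_slots_receiver,
        exit.clone(),
    );

    // ... validator runs ...

    // Shutdown: flip the shared flag, then join the background thread.
    exit.store(true, Ordering::Relaxed);
    cluster_slots_service.join().unwrap();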

core/src/lib.rs

@@ -19,6 +19,7 @@ pub mod contact_info;
pub mod bank_weight_fork_choice;
pub mod cluster_info;
pub mod cluster_slots;
pub mod cluster_slots_service;
pub mod consensus;
pub mod crds;
pub mod crds_gossip;

core/src/repair_service.rs

@@ -11,7 +11,7 @@ use crate::{
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
blockstore::{Blockstore, SlotMeta},
shred::Nonce,
};
use solana_measure::measure::Measure;
@@ -78,8 +78,6 @@ pub struct RepairTiming {
pub set_root_elapsed: u64,
pub get_votes_elapsed: u64,
pub add_votes_elapsed: u64,
pub lowest_slot_elapsed: u64,
pub update_completed_slots_elapsed: u64,
pub get_best_orphans_elapsed: u64,
pub get_best_shreds_elapsed: u64,
pub send_repairs_elapsed: u64,
@@ -91,15 +89,11 @@ impl RepairTiming {
set_root_elapsed: u64,
get_votes_elapsed: u64,
add_votes_elapsed: u64,
lowest_slot_elapsed: u64,
update_completed_slots_elapsed: u64,
send_repairs_elapsed: u64,
) {
self.set_root_elapsed += set_root_elapsed;
self.get_votes_elapsed += get_votes_elapsed;
self.add_votes_elapsed += add_votes_elapsed;
self.lowest_slot_elapsed += lowest_slot_elapsed;
self.update_completed_slots_elapsed += update_completed_slots_elapsed;
self.send_repairs_elapsed += send_repairs_elapsed;
}
}
@@ -112,7 +106,6 @@ pub const MAX_ORPHANS: usize = 5;
pub struct RepairInfo {
pub bank_forks: Arc<RwLock<BankForks>>,
pub completed_slots_receiver: CompletedSlotsReceiver,
pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
}
@@ -181,18 +174,12 @@ impl RepairService {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(cluster_info.clone());
let id = cluster_info.id();
Self::initialize_lowest_slot(id, blockstore, &cluster_info);
let mut repair_stats = RepairStats::default();
let mut repair_timing = RepairTiming::default();
let mut last_stats = Instant::now();
let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
HashMap::new();
Self::initialize_epoch_slots(
blockstore,
&cluster_info,
&repair_info.completed_slots_receiver,
);
loop {
if exit.load(Ordering::Relaxed) {
break;
@@ -201,8 +188,6 @@
let mut set_root_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;
let mut lowest_slot_elapsed;
let mut update_completed_slots_elapsed;
let repairs = {
let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
let new_root = root_bank.slot();
@@ -235,15 +220,6 @@
root_bank.epoch_schedule(),
);
add_votes_elapsed.stop();
lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
lowest_slot_elapsed.stop();
update_completed_slots_elapsed = Measure::start("update_completed_slots_elapsed");
Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info);
cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks);
update_completed_slots_elapsed.stop();
/*let new_duplicate_slots = Self::find_new_duplicate_slots(
&duplicate_slot_repair_statuses,
blockstore,
@@ -299,8 +275,6 @@
set_root_elapsed.as_us(),
get_votes_elapsed.as_us(),
add_votes_elapsed.as_us(),
lowest_slot_elapsed.as_us(),
update_completed_slots_elapsed.as_us(),
send_repairs_elapsed.as_us(),
);
@@ -335,16 +309,6 @@
repair_timing.get_best_shreds_elapsed,
i64
),
(
"lowest-slot-elapsed",
repair_timing.lowest_slot_elapsed,
i64
),
(
"update-completed-slots-elapsed",
repair_timing.update_completed_slots_elapsed,
i64
),
(
"send-repairs-elapsed",
repair_timing.send_repairs_elapsed,
@@ -650,59 +614,6 @@
.collect()
}
fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
// Safe to set into gossip because by this time, the leader schedule cache should
// also be updated with the latest root (done in blockstore_processor) and thus
// will provide a schedule to window_service for any incoming shreds up to the
// last_confirmed_epoch.
cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
}
fn update_completed_slots(
completed_slots_receiver: &CompletedSlotsReceiver,
cluster_info: &ClusterInfo,
) {
let mut slots: Vec<Slot> = vec![];
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
slots.sort();
if !slots.is_empty() {
cluster_info.push_epoch_slots(&slots);
}
}
fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) {
cluster_info.push_lowest_slot(*id, lowest_slot);
}
fn initialize_epoch_slots(
blockstore: &Blockstore,
cluster_info: &ClusterInfo,
completed_slots_receiver: &CompletedSlotsReceiver,
) {
let root = blockstore.last_root();
let mut slots: Vec<_> = blockstore
.live_slots_iterator(root)
.filter_map(|(slot, slot_meta)| {
if slot_meta.is_full() {
Some(slot)
} else {
None
}
})
.collect();
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
slots.sort();
slots.dedup();
if !slots.is_empty() {
cluster_info.push_epoch_slots(&slots);
}
}
pub fn join(self) -> thread::Result<()> {
self.t_repair.join()
}
@@ -980,19 +891,6 @@
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_update_lowest_slot() {
let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info);
RepairService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
let lowest = cluster_info
.get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
lowest_slot.clone()
})
.unwrap();
assert_eq!(lowest.lowest, 5);
}
#[test]
pub fn test_generate_duplicate_repairs_for_slot() {
let blockstore_path = get_tmp_ledger_path!();

core/src/retransmit_stage.rs

@@ -4,6 +4,7 @@ use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
cluster_slots_service::ClusterSlotsService,
contact_info::ContactInfo,
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
@@ -394,6 +395,7 @@ pub fn retransmitter(
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
}
impl RetransmitStage {
@@ -427,13 +429,20 @@
retransmit_receiver,
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
bank_forks.clone(),
cluster_info.clone(),
completed_slots_receiver,
exit.clone(),
);
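        // The completed-slots receiver moves into ClusterSlotsService here,
        // which is why RepairInfo no longer carries it (see the struct hunk
        // in repair_service.rs above).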
let repair_info = RepairInfo {
bank_forks,
completed_slots_receiver,
epoch_schedule,
duplicate_slots_reset_sender,
};
let leader_schedule_cache_clone = leader_schedule_cache.clone();
let window_service = WindowService::new(
blockstore,
cluster_info.clone(),
@@ -466,6 +475,7 @@
Self {
thread_hdls,
window_service,
cluster_slots_service,
}
}
@@ -474,6 +484,7 @@
thread_hdl.join()?;
}
self.window_service.join()?;
self.cluster_slots_service.join()?;
Ok(())
}
}