Initialize and Update EpochSlots in RepairService (#4255)

* Initialize EpochSlots in RepairService

* Fix flaky test
carllin 2019-05-13 15:37:50 -07:00 committed by GitHub
parent 2eaa64c4e8
commit 7501ed65e5
5 changed files with 247 additions and 15 deletions
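Context: RepairService already generated repairs for its own ledger gaps; this change additionally advertises the node's set of completed slots ("EpochSlots") into gossip, which is what the "Repairmen" protocol consumes to push missing blobs to lagging peers. A rough sketch of the gossiped value, assuming the era's CRDS layout (field names inferred from push_epoch_slots; this struct is not part of the diff below):

// Assumed shape of the value pushed by cluster_info.push_epoch_slots().
pub struct EpochSlots {
    pub from: Pubkey,        // advertising node's identity
    pub root: u64,           // that node's latest root slot
    pub slots: HashSet<u64>, // completed slots greater than the root
    pub wallclock: u64,      // freshness timestamp for gossip pruning
}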

core/src/repair_service.rs

@@ -7,6 +7,7 @@ use crate::cluster_info::ClusterInfo;
use crate::result::Result;
use crate::service::Service;
use solana_metrics::datapoint;
use solana_runtime::bank::EpochSchedule;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
use std::net::UdpSocket;
@@ -27,6 +28,7 @@ pub enum RepairStrategy {
RepairAll {
bank_forks: Arc<RwLock<BankForks>>,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
},
}
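With the new field, choosing the RepairAll strategy obliges the caller to pass the cluster's EpochSchedule. A minimal construction sketch (the EpochSchedule::new(slots_per_epoch, stakers_slot_offset, warmup) call mirrors the tests further down; bank_forks and completed_slots_receiver are assumed to be in scope):

// Sketch: building the updated variant. Values are illustrative only.
let repair_strategy = RepairStrategy::RepairAll {
    bank_forks: bank_forks.clone(),
    completed_slots_receiver,
    epoch_schedule: EpochSchedule::new(32, 32, false),
};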
@@ -103,8 +105,24 @@ impl RepairService {
repair_strategy: RepairStrategy,
) {
let mut repair_info = RepairInfo::new();
-let epoch_slots: HashSet<u64> = HashSet::new();
+let mut epoch_slots: HashSet<u64> = HashSet::new();
let id = cluster_info.read().unwrap().id();
if let RepairStrategy::RepairAll {
ref bank_forks,
ref epoch_schedule,
..
} = repair_strategy
{
let root = bank_forks.read().unwrap().root();
Self::initialize_epoch_slots(
id,
blocktree,
&mut epoch_slots,
root,
epoch_schedule,
cluster_info,
);
}
loop {
if exit.load(Ordering::Relaxed) {
break;
@@ -125,12 +143,14 @@
RepairStrategy::RepairAll {
ref bank_forks,
ref completed_slots_receiver,
..
} => {
let root = bank_forks.read().unwrap().root();
Self::update_epoch_slots(
id,
-&epoch_slots,
+root,
+&mut epoch_slots,
&cluster_info,
-bank_forks,
completed_slots_receiver,
);
Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
@@ -289,20 +309,76 @@ impl RepairService {
}
}
fn get_completed_slots_past_root(
blocktree: &Blocktree,
slots_in_gossip: &mut HashSet<u64>,
root: u64,
epoch_schedule: &EpochSchedule,
) {
let last_confirmed_epoch = epoch_schedule.get_stakers_epoch(root);
let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch);
let mut meta_iter = blocktree
.slot_meta_iterator(root + 1)
.expect("Couldn't get db iterator");
while meta_iter.valid() && meta_iter.key().unwrap() <= last_epoch_slot {
let current_slot = meta_iter.key().unwrap();
let meta = meta_iter.value().unwrap();
if meta.is_full() {
slots_in_gossip.insert(current_slot);
}
meta_iter.next();
}
}
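The scan stops at the last slot of the root's "stakers epoch", i.e. the newest epoch whose leader schedule is already fixed; advertising anything later could invite blobs the node cannot yet verify. A worked example of the bound, assuming the warmup-free arithmetic these tests rely on:

// With slots_per_epoch = 32, stakers_slot_offset = 32, warmup = false:
let epoch_schedule = EpochSchedule::new(32, 32, false);
let root = 10;
// stakers epoch of slot 10: (10 + 32) / 32 = 1
let last_confirmed_epoch = epoch_schedule.get_stakers_epoch(root);
// last slot of epoch 1: 2 * 32 - 1 = 63, so slots 11..=63 are scanned
let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch);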
fn initialize_epoch_slots(
id: Pubkey,
blocktree: &Blocktree,
slots_in_gossip: &mut HashSet<u64>,
root: u64,
epoch_schedule: &EpochSchedule,
cluster_info: &RwLock<ClusterInfo>,
) {
Self::get_completed_slots_past_root(blocktree, slots_in_gossip, root, epoch_schedule);
// Safe to set into gossip because by this time, the leader schedule cache should
// also be updated with the latest root (done in blocktree_processor) and thus
// will provide a schedule to window_service for any incoming blobs up to the
// last_confirmed_epoch.
cluster_info
.write()
.unwrap()
.push_epoch_slots(id, root, slots_in_gossip.clone());
}
// Update the gossiped structure used for the "Repairmen" repair protocol. See book
// for details.
fn update_epoch_slots(
id: Pubkey,
-slots: &HashSet<u64>,
+root: u64,
+slots_in_gossip: &mut HashSet<u64>,
cluster_info: &RwLock<ClusterInfo>,
-bank_forks: &Arc<RwLock<BankForks>>,
-_completed_slots_receiver: &CompletedSlotsReceiver,
+completed_slots_receiver: &CompletedSlotsReceiver,
) {
-let root = bank_forks.read().unwrap().root();
-cluster_info
-.write()
-.unwrap()
-.push_epoch_slots(id, root, slots.clone());
let mut should_update = false;
while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
for slot in completed_slots {
// If the newly completed slot > root, and the set did not contain this value
// before, we should update gossip.
if slot > root && slots_in_gossip.insert(slot) {
should_update = true;
}
}
}
if should_update {
slots_in_gossip.retain(|x| *x > root);
cluster_info
.write()
.unwrap()
.push_epoch_slots(id, root, slots_in_gossip.clone());
}
}
}
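Two standard-library behaviors carry this function: HashSet::insert returns false for values already present (so duplicate notifications from the receiver don't trigger redundant gossip pushes), and retain prunes slots at or below the new root before republishing. A self-contained illustration of the idiom:

// Standalone illustration of the insert/retain idiom above.
use std::collections::HashSet;
let root = 5;
let mut slots: HashSet<u64> = [3, 7].iter().cloned().collect();
assert!(!slots.insert(7)); // duplicate: would not mark gossip dirty
assert!(slots.insert(9));  // new slot: should_update becomes true
slots.retain(|s| *s > root); // drop 3; keep {7, 9}
assert_eq!(slots.len(), 2);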
@@ -321,6 +397,11 @@ mod test {
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
};
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::Node;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::cmp::min;
use std::thread::Builder;
#[test]
pub fn test_repair_orphan() {
@@ -523,4 +604,145 @@
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_completed_slots_past_root() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries_per_slot = 10;
let root = 10;
let fork1 = vec![5, 7, root, 15, 20, 21];
let fork1_blobs: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.collect();
let fork2 = vec![8, 12];
let fork2_blobs = make_chaining_slot_entries(&fork2, num_entries_per_slot);
// Remove the last blob from each slot to make an incomplete slot
let fork2_incomplete_blobs: Vec<_> = fork2_blobs
.into_iter()
.flat_map(|(mut blobs, _)| {
blobs.pop();
blobs
})
.collect();
let mut full_slots = HashSet::new();
blocktree.write_blobs(&fork1_blobs).unwrap();
blocktree.write_blobs(&fork2_incomplete_blobs).unwrap();
// Test that only slots > root from fork1 were included
let epoch_schedule = EpochSchedule::new(32, 32, false);
RepairService::get_completed_slots_past_root(
&blocktree,
&mut full_slots,
root,
&epoch_schedule,
);
let mut expected: HashSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
assert_eq!(full_slots, expected);
// Test that slots past the last confirmed epoch boundary don't get included
let last_epoch = epoch_schedule.get_stakers_epoch(root);
let last_slot = epoch_schedule.get_last_slot_in_epoch(last_epoch);
let fork3 = vec![last_slot, last_slot + 1];
let fork3_blobs: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.collect();
blocktree.write_blobs(&fork3_blobs).unwrap();
RepairService::get_completed_slots_past_root(
&blocktree,
&mut full_slots,
root,
&epoch_schedule,
);
expected.insert(last_slot);
assert_eq!(full_slots, expected);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_update_epoch_slots() {
let blocktree_path = get_tmp_ledger_path!();
{
// Create blocktree
let (blocktree, _, completed_slots_receiver) =
Blocktree::open_with_signal(&blocktree_path).unwrap();
let blocktree = Arc::new(blocktree);
let mut root = 0;
let num_slots = 100;
let entries_per_slot = 5;
let blocktree_ = blocktree.clone();
// Spin up thread to write to blocktree
let writer = Builder::new()
.name("writer".to_string())
.spawn(move || {
let slots: Vec<_> = (1..num_slots + 1).collect();
let mut blobs: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.collect();
blobs.shuffle(&mut thread_rng());
let mut i = 0;
let max_step = entries_per_slot * 4;
let repair_interval_ms = 10;
let mut rng = rand::thread_rng();
while i < blobs.len() as usize {
let step = rng.gen_range(1, max_step + 1);
blocktree_
.insert_data_blobs(&blobs[i..min(i + max_step as usize, blobs.len())])
.unwrap();
sleep(Duration::from_millis(repair_interval_ms));
i += step as usize;
}
})
.unwrap();
let mut completed_slots = HashSet::new();
let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
node_info.info.clone(),
));
while completed_slots.len() < num_slots as usize {
RepairService::update_epoch_slots(
Pubkey::default(),
root,
&mut completed_slots,
&cluster_info,
&completed_slots_receiver,
);
}
let mut expected: HashSet<_> = (1..num_slots + 1).collect();
assert_eq!(completed_slots, expected);
// Update with new root, should filter out the slots <= root
root = num_slots / 2;
let (blobs, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
blocktree.insert_data_blobs(&blobs).unwrap();
RepairService::update_epoch_slots(
Pubkey::default(),
root,
&mut completed_slots,
&cluster_info,
&completed_slots_receiver,
);
expected.insert(num_slots + 2);
expected.retain(|x| *x > root);
assert_eq!(completed_slots, expected);
writer.join().unwrap();
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}

core/src/retransmit_stage.rs

@@ -11,6 +11,7 @@ use crate::staking_utils;
use crate::streamer::BlobReceiver;
use crate::window_service::WindowService;
use solana_metrics::{datapoint, inc_new_counter_info};
use solana_runtime::bank::EpochSchedule;
use solana_sdk::hash::Hash;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
@@ -117,6 +118,7 @@ impl RetransmitStage {
exit: &Arc<AtomicBool>,
genesis_blockhash: &Hash,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@@ -131,6 +133,7 @@
let repair_strategy = RepairStrategy::RepairAll {
bank_forks,
completed_slots_receiver,
epoch_schedule,
};
let window_service = WindowService::new(
Some(leader_schedule_cache.clone()),

core/src/tvu.rs

@@ -110,6 +110,7 @@ impl Tvu {
&exit,
genesis_blockhash,
completed_slots_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
);
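The deref on the new argument above (*...epoch_schedule()) yields an owned EpochSchedule from the bank's getter, which compiles only because the type is Copy; the window_service test below spells the same thing as .clone(). A minimal sketch of the equivalence (assuming bank: &Bank and that EpochSchedule derives Copy + Clone):

// Both forms produce an owned EpochSchedule from a &EpochSchedule getter.
let by_copy = *bank.epoch_schedule();         // implicit copy via deref
let by_clone = bank.epoch_schedule().clone(); // explicit clone, same value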
let (replay_stage, slot_full_receiver, root_slot_receiver) = ReplayStage::new(

core/src/window_service.rs

@@ -364,8 +364,14 @@ mod test {
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let repair_strategy = RepairStrategy::RepairAll {
-bank_forks,
+bank_forks: bank_forks.clone(),
completed_slots_receiver,
epoch_schedule: bank_forks
.read()
.unwrap()
.working_bank()
.epoch_schedule()
.clone(),
};
let t_window = WindowService::new(
Some(leader_schedule_cache),
@@ -445,9 +451,11 @@
let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_strategy = RepairStrategy::RepairAll {
bank_forks,
completed_slots_receiver,
epoch_schedule,
};
let t_window = WindowService::new(
Some(leader_schedule_cache),

runtime/src/accounts.rs

@@ -577,7 +577,7 @@ mod tests {
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use std::thread::{sleep, Builder};
-use std::time::{Duration, Instant};
+use std::time::Duration;
fn load_accounts_with_fee(
tx: Transaction,
@@ -1061,7 +1061,6 @@
.unwrap();
// Function will block until the parent_thread unlocks the parent's record lock
-let now = Instant::now();
assert_eq!(
Accounts::lock_account(
(
@@ -1074,7 +1073,6 @@
Ok(())
);
// Make sure that the function blocked
-assert!(now.elapsed().as_secs() > 1);
parent_thread.join().unwrap();
{
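On the "Fix flaky test" half of this commit: the deleted lines timed how long lock_account blocked and asserted elapsed > 1s, which is race-prone; if the parent thread releases the record lock early or the contending thread is scheduled late, the wall-clock check fails even though the locking behaved correctly. The functional assertion (the lock is eventually granted) is kept. If blocking ever needed to be asserted deterministically, a channel handshake is the usual shape; a hypothetical sketch, not part of this commit:

// Hypothetical: order the acquire after the holder provably owns the lock,
// then observe the holder's write instead of asserting on wall-clock time.
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lock = Arc::new(Mutex::new(0u64));
    let (tx, rx) = channel();
    let holder = {
        let lock = lock.clone();
        thread::spawn(move || {
            let mut guard = lock.lock().unwrap();
            tx.send(()).unwrap(); // signal: lock is now held
            *guard += 1; // work done while holding the lock
        })
    };
    rx.recv().unwrap(); // don't contend until the holder owns the lock
    let value = *lock.lock().unwrap(); // blocks until the holder drops its guard
    holder.join().unwrap();
    assert_eq!(value, 1); // we must have blocked past the holder's write
}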