Fix repair slowness when most peers are unable to serve requests (#7287)

* Fix repair when most peers are incapable of serving requests

* Add a test for getting the lowest slot in blocktree

* Replace some more u64s with Slot
Sagar Dhawan 2019-12-05 11:25:13 -08:00 committed by GitHub
parent d8e1a196bc
commit a95d37ea25
5 changed files with 204 additions and 60 deletions
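The heart of the change: each node now gossips the lowest slot it still stores (a new `lowest` field on EpochSlots, filled from Blocktree::lowest_slot()), and repair requests are only sent to peers whose advertised lowest slot is at or below the slot being requested, so requests stop going to peers that have already purged it. A minimal sketch of that selection rule, using a hypothetical stand-in type rather than the real gossip structures:

use std::collections::BTreeSet;

// Hypothetical, trimmed-down stand-in for the gossiped EpochSlots value.
#[allow(dead_code)]
struct PeerEpochSlots {
    root: u64,
    lowest: u64, // lowest slot the peer still has in its ledger
    slots: BTreeSet<u64>,
}

// A peer is eligible to serve a repair for `slot` only if it has not
// purged its ledger past that slot.
fn can_serve_repair(peer: &PeerEpochSlots, slot: u64) -> bool {
    peer.lowest <= slot
}

fn main() {
    let purged_peer = PeerEpochSlots { root: 15, lowest: 10, slots: BTreeSet::new() };
    assert!(!can_serve_repair(&purged_peer, 5)); // slot 5 was purged, skip this peer
    assert!(can_serve_repair(&purged_peer, 12)); // slot 12 is still available
}

In the actual change, ClusterInfo::repair_peers applies this check via get_epoch_state_for_node and falls back to the old behavior when a peer has not gossiped EpochSlots yet.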

View File

@ -317,10 +317,10 @@ impl ClusterInfo {
) )
} }
pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) { pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet<Slot>) {
let now = timestamp(); let now = timestamp();
let entry = CrdsValue::new_signed( let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(EpochSlots::new(id, root, slots, now)), CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)),
&self.keypair, &self.keypair,
); );
self.gossip self.gossip
@ -489,13 +489,18 @@ impl ClusterInfo {
.collect() .collect()
} }
/// all tvu peers with valid gossip addrs /// all tvu peers with valid gossip addrs that likely have the slot being requested
fn repair_peers(&self) -> Vec<ContactInfo> { fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data().id;
ClusterInfo::tvu_peers(self) ClusterInfo::tvu_peers(self)
.into_iter() .into_iter()
.filter(|x| x.id != me) .filter(|x| x.id != me)
.filter(|x| ContactInfo::is_valid_address(&x.gossip)) .filter(|x| ContactInfo::is_valid_address(&x.gossip))
.filter(|x| {
self.get_epoch_state_for_node(&x.id, None)
.map(|(epoch_slots, _)| epoch_slots.lowest <= slot)
.unwrap_or_else(|| /* fallback to legacy behavior */ true)
})
.collect() .collect()
} }
@ -840,9 +845,9 @@ impl ClusterInfo {
} }
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> { pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication, as indicated // find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location // by a valid tvu port location
let valid: Vec<_> = self.repair_peers(); let valid: Vec<_> = self.repair_peers(repair_request.slot());
if valid.is_empty() { if valid.is_empty() {
return Err(ClusterInfoError::NoPeers.into()); return Err(ClusterInfoError::NoPeers.into());
} }
@ -2555,6 +2560,7 @@ mod tests {
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(), from: Pubkey::default(),
root: 0, root: 0,
lowest: 0,
slots: btree_slots, slots: btree_slots,
wallclock: 0, wallclock: 0,
})); }));
@ -2571,6 +2577,7 @@ mod tests {
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(), from: Pubkey::default(),
root: 0, root: 0,
lowest: 0,
slots: BTreeSet::new(), slots: BTreeSet::new(),
wallclock: 0, wallclock: 0,
})); }));
@ -2588,6 +2595,7 @@ mod tests {
value.data = CrdsData::EpochSlots(EpochSlots { value.data = CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(), from: Pubkey::default(),
root: 0, root: 0,
lowest: 0,
slots, slots,
wallclock: 0, wallclock: 0,
}); });
@ -2700,6 +2708,37 @@ mod tests {
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
} }
#[test]
fn test_repair_peers() {
let node_keypair = Arc::new(Keypair::new());
let mut cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
);
for i in 0..10 {
let mut peer_root = 5;
let mut peer_lowest = 0;
if i >= 5 {
// make these invalid for the upcoming repair request
peer_root = 15;
peer_lowest = 10;
}
let other_node_pubkey = Pubkey::new_rand();
let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
cluster_info.insert_info(other_node.clone());
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
timestamp(),
)));
let _ = cluster_info.gossip.crds.insert(value, timestamp());
}
// only half the visible peers should be eligible to serve this repair
assert_eq!(cluster_info.repair_peers(5).len(), 5);
}
#[test] #[test]
fn test_max_bloom_size() { fn test_max_bloom_size() {
assert_eq!(MAX_BLOOM_SIZE, max_bloom_size()); assert_eq!(MAX_BLOOM_SIZE, max_bloom_size());

View File

@ -27,6 +27,7 @@ pub const GOSSIP_DELAY_SLOTS: usize = 2;
pub const NUM_SLOTS_PER_UPDATE: usize = 2; pub const NUM_SLOTS_PER_UPDATE: usize = 2;
// Time between allowing repair for same slot for same validator // Time between allowing repair for same slot for same validator
pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000; pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000;
use solana_sdk::clock::Slot;
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
// Represents the shreds that a repairman is responsible for repairing in specific slot. More // Represents the shreds that a repairman is responsible for repairing in specific slot. More
@ -76,9 +77,10 @@ impl Iterator for ShredIndexesToRepairIterator {
#[derive(Default)] #[derive(Default)]
struct RepaireeInfo { struct RepaireeInfo {
last_root: u64, last_root: Slot,
last_ts: u64, last_ts: u64,
last_repaired_slot_and_ts: (u64, u64), last_repaired_slot_and_ts: (Slot, u64),
lowest_slot: Slot,
} }
pub struct ClusterInfoRepairListener { pub struct ClusterInfoRepairListener {
@ -132,6 +134,7 @@ impl ClusterInfoRepairListener {
return Ok(()); return Ok(());
} }
let lowest_slot = blocktree.lowest_slot();
let peers = cluster_info.read().unwrap().gossip_peers(); let peers = cluster_info.read().unwrap().gossip_peers();
let mut peers_needing_repairs: HashMap<Pubkey, EpochSlots> = HashMap::new(); let mut peers_needing_repairs: HashMap<Pubkey, EpochSlots> = HashMap::new();
@ -144,6 +147,7 @@ impl ClusterInfoRepairListener {
cluster_info, cluster_info,
peer_infos, peer_infos,
&mut my_gossiped_root, &mut my_gossiped_root,
lowest_slot,
) { ) {
peers_needing_repairs.insert(peer.id, repairee_epoch_slots); peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
} }
@ -170,7 +174,8 @@ impl ClusterInfoRepairListener {
peer_pubkey: &Pubkey, peer_pubkey: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
peer_infos: &mut HashMap<Pubkey, RepaireeInfo>, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
my_gossiped_root: &mut u64, my_gossiped_root: &mut Slot,
my_lowest_slot: Slot,
) -> Option<EpochSlots> { ) -> Option<EpochSlots> {
let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_infos); let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_infos);
let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root); let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
@ -187,7 +192,12 @@ impl ClusterInfoRepairListener {
let last_repair_ts = { let last_repair_ts = {
// Following logic needs to be fast because it holds the lock // Following logic needs to be fast because it holds the lock
// preventing updates on gossip // preventing updates on gossip
if Self::should_repair_peer(my_root, peer_epoch_slots.root, NUM_BUFFER_SLOTS) { if Self::should_repair_peer(
my_root,
my_lowest_slot,
peer_epoch_slots.root,
NUM_BUFFER_SLOTS,
) {
// Clone out EpochSlots structure to avoid holding lock on gossip // Clone out EpochSlots structure to avoid holding lock on gossip
result = Some(peer_epoch_slots.clone()); result = Some(peer_epoch_slots.clone());
updated_ts updated_ts
@ -199,6 +209,7 @@ impl ClusterInfoRepairListener {
peer_info.last_ts = last_repair_ts; peer_info.last_ts = last_repair_ts;
peer_info.last_root = peer_root; peer_info.last_root = peer_root;
peer_info.lowest_slot = peer_epoch_slots.lowest;
result result
} else { } else {
None None
@ -213,7 +224,7 @@ impl ClusterInfoRepairListener {
repairees: &HashMap<Pubkey, EpochSlots>, repairees: &HashMap<Pubkey, EpochSlots>,
socket: &UdpSocket, socket: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
my_gossiped_root: &mut u64, my_gossiped_root: &mut Slot,
epoch_schedule: &EpochSchedule, epoch_schedule: &EpochSchedule,
) -> Result<()> { ) -> Result<()> {
for (repairee_pubkey, repairee_epoch_slots) in repairees { for (repairee_pubkey, repairee_epoch_slots) in repairees {
@ -274,7 +285,7 @@ impl ClusterInfoRepairListener {
fn serve_repairs_to_repairee( fn serve_repairs_to_repairee(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
repairee_pubkey: &Pubkey, repairee_pubkey: &Pubkey,
my_root: u64, my_root: Slot,
blocktree: &Blocktree, blocktree: &Blocktree,
repairee_epoch_slots: &EpochSlots, repairee_epoch_slots: &EpochSlots,
eligible_repairmen: &[&Pubkey], eligible_repairmen: &[&Pubkey],
@ -283,7 +294,7 @@ impl ClusterInfoRepairListener {
num_slots_to_repair: usize, num_slots_to_repair: usize,
epoch_schedule: &EpochSchedule, epoch_schedule: &EpochSchedule,
last_repaired_slot_and_ts: (u64, u64), last_repaired_slot_and_ts: (u64, u64),
) -> Result<Option<u64>> { ) -> Result<Option<Slot>> {
let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree); let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree);
if slot_iter.is_err() { if slot_iter.is_err() {
info!( info!(
@ -386,13 +397,13 @@ impl ClusterInfoRepairListener {
} }
fn report_repair_metrics( fn report_repair_metrics(
slot: u64, slot: Slot,
repairee_id: &Pubkey, repairee_id: &Pubkey,
total_data_shreds_sent: u64, total_data_shreds_sent: u64,
total_coding_shreds_sent: u64, total_coding_shreds_sent: u64,
) { ) {
if total_data_shreds_sent > 0 || total_coding_shreds_sent > 0 { if total_data_shreds_sent > 0 || total_coding_shreds_sent > 0 {
datapoint!( datapoint_info!(
"repairman_activity", "repairman_activity",
("slot", slot, i64), ("slot", slot, i64),
("repairee_id", repairee_id.to_string(), String), ("repairee_id", repairee_id.to_string(), String),
@ -405,7 +416,7 @@ impl ClusterInfoRepairListener {
fn shuffle_repairmen( fn shuffle_repairmen(
eligible_repairmen: &mut Vec<&Pubkey>, eligible_repairmen: &mut Vec<&Pubkey>,
repairee_pubkey: &Pubkey, repairee_pubkey: &Pubkey,
repairee_root: u64, repairee_root: Slot,
) { ) {
// Make a seed from pubkey + repairee root // Make a seed from pubkey + repairee root
let mut seed = [0u8; mem::size_of::<Pubkey>()]; let mut seed = [0u8; mem::size_of::<Pubkey>()];
@ -456,7 +467,7 @@ impl ClusterInfoRepairListener {
fn find_eligible_repairmen<'a>( fn find_eligible_repairmen<'a>(
my_pubkey: &'a Pubkey, my_pubkey: &'a Pubkey,
repairee_root: u64, repairee_root: Slot,
repairman_roots: &'a HashMap<Pubkey, RepaireeInfo>, repairman_roots: &'a HashMap<Pubkey, RepaireeInfo>,
num_buffer_slots: usize, num_buffer_slots: usize,
) -> Vec<&'a Pubkey> { ) -> Vec<&'a Pubkey> {
@ -465,6 +476,7 @@ impl ClusterInfoRepairListener {
.filter_map(|(repairman_pubkey, repairman_info)| { .filter_map(|(repairman_pubkey, repairman_info)| {
if Self::should_repair_peer( if Self::should_repair_peer(
repairman_info.last_root, repairman_info.last_root,
repairman_info.lowest_slot,
repairee_root, repairee_root,
num_buffer_slots - GOSSIP_DELAY_SLOTS, num_buffer_slots - GOSSIP_DELAY_SLOTS,
) { ) {
@ -484,7 +496,7 @@ impl ClusterInfoRepairListener {
fn read_my_gossiped_root( fn read_my_gossiped_root(
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
old_root: &mut u64, old_root: &mut Slot,
) -> u64 { ) -> u64 {
let new_root = cluster_info let new_root = cluster_info
.read() .read()
@ -502,13 +514,16 @@ impl ClusterInfoRepairListener {
// Decide if a repairman with root == `repairman_root` should send repairs to a // Decide if a repairman with root == `repairman_root` should send repairs to a
// potential repairee with root == `repairee_root` // potential repairee with root == `repairee_root`
fn should_repair_peer( fn should_repair_peer(
repairman_root: u64, repairman_root: Slot,
repairee_root: u64, repairman_lowest_slot: Slot,
repairee_root: Slot,
num_buffer_slots: usize, num_buffer_slots: usize,
) -> bool { ) -> bool {
// Check if this potential repairman's root is greater than the repairee root + // Check if this potential repairman is likely to have slots needed to repair this repairee
// num_buffer_slots // and that its root is greater than the repairee root + num_buffer_slots
repairman_root > repairee_root + num_buffer_slots as u64 // Only need to be able to repair slots that are 1 higher than root
repairman_lowest_slot <= repairee_root + 1
&& repairman_root > repairee_root + num_buffer_slots as u64
} }
fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> { fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> {
@ -612,6 +627,7 @@ mod tests {
cluster_info.write().unwrap().push_epoch_slots( cluster_info.write().unwrap().push_epoch_slots(
peer_pubkey, peer_pubkey,
repairee_root, repairee_root,
0,
repairee_slots.clone(), repairee_slots.clone(),
); );
@ -626,6 +642,7 @@ mod tests {
&cluster_info, &cluster_info,
&mut peer_info, &mut peer_info,
&mut my_gossiped_root, &mut my_gossiped_root,
0,
) )
.is_none()); .is_none());
@ -638,6 +655,7 @@ mod tests {
&cluster_info, &cluster_info,
&mut peer_info, &mut peer_info,
&mut my_gossiped_root, &mut my_gossiped_root,
0,
) )
.is_some()); .is_some());
@ -650,22 +668,26 @@ mod tests {
&cluster_info, &cluster_info,
&mut peer_info, &mut peer_info,
&mut my_gossiped_root, &mut my_gossiped_root,
0,
) )
.is_none()); .is_none());
// Sleep to make sure the timestamp is updated in gossip. Update the gossiped EpochSlots. // Sleep to make sure the timestamp is updated in gossip. Update the gossiped EpochSlots.
// Now a repair should be sent again // Now a repair should be sent again
sleep(Duration::from_millis(10)); sleep(Duration::from_millis(10));
cluster_info cluster_info.write().unwrap().push_epoch_slots(
.write() peer_pubkey,
.unwrap() repairee_root,
.push_epoch_slots(peer_pubkey, repairee_root, repairee_slots); 0,
repairee_slots,
);
assert!(ClusterInfoRepairListener::process_potential_repairee( assert!(ClusterInfoRepairListener::process_potential_repairee(
&my_pubkey, &my_pubkey,
&peer_pubkey, &peer_pubkey,
&cluster_info, &cluster_info,
&mut peer_info, &mut peer_info,
&mut my_gossiped_root, &mut my_gossiped_root,
0,
) )
.is_some()); .is_some());
} }
@ -695,7 +717,7 @@ mod tests {
let repairee_root = 0; let repairee_root = 0;
let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect(); let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
let repairee_epoch_slots = let repairee_epoch_slots =
EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1); EpochSlots::new(mock_repairee.id, repairee_root, 0, repairee_slots, 1);
let eligible_repairmen = vec![&my_pubkey]; let eligible_repairmen = vec![&my_pubkey];
let epoch_schedule = EpochSchedule::custom(32, 16, false); let epoch_schedule = EpochSchedule::custom(32, 16, false);
assert!(ClusterInfoRepairListener::serve_repairs_to_repairee( assert!(ClusterInfoRepairListener::serve_repairs_to_repairee(
@ -765,7 +787,7 @@ mod tests {
let repairee_root = 0; let repairee_root = 0;
let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect(); let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
let repairee_epoch_slots = let repairee_epoch_slots =
EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1); EpochSlots::new(mock_repairee.id, repairee_root, 0, repairee_slots, 1);
// Mock out some other repairmen such that each repairman is responsible for 1 shred in a slot // Mock out some other repairmen such that each repairman is responsible for 1 shred in a slot
let num_repairmen = entries_per_slot - 1; let num_repairmen = entries_per_slot - 1;
@ -857,8 +879,13 @@ mod tests {
// already has all the slots for which they have a confirmed leader schedule // already has all the slots for which they have a confirmed leader schedule
let repairee_root = 0; let repairee_root = 0;
let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect(); let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect();
let repairee_epoch_slots = let repairee_epoch_slots = EpochSlots::new(
EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots.clone(), 1); mock_repairee.id,
repairee_root,
0,
repairee_slots.clone(),
1,
);
ClusterInfoRepairListener::serve_repairs_to_repairee( ClusterInfoRepairListener::serve_repairs_to_repairee(
&my_pubkey, &my_pubkey,
@ -882,7 +909,7 @@ mod tests {
// Set the root to stakers_slot_offset, now epoch 2 should be confirmed, so the repairee // Set the root to stakers_slot_offset, now epoch 2 should be confirmed, so the repairee
// is now eligible to get slots from epoch 2: // is now eligible to get slots from epoch 2:
let repairee_epoch_slots = let repairee_epoch_slots =
EpochSlots::new(mock_repairee.id, stakers_slot_offset, repairee_slots, 1); EpochSlots::new(mock_repairee.id, stakers_slot_offset, 0, repairee_slots, 1);
ClusterInfoRepairListener::serve_repairs_to_repairee( ClusterInfoRepairListener::serve_repairs_to_repairee(
&my_pubkey, &my_pubkey,
&mock_repairee.id, &mock_repairee.id,
@ -1016,6 +1043,7 @@ mod tests {
let repairee_root = 5; let repairee_root = 5;
assert!(!ClusterInfoRepairListener::should_repair_peer( assert!(!ClusterInfoRepairListener::should_repair_peer(
repairman_root, repairman_root,
0,
repairee_root, repairee_root,
0, 0,
)); ));
@ -1025,6 +1053,7 @@ mod tests {
let repairee_root = 5; let repairee_root = 5;
assert!(!ClusterInfoRepairListener::should_repair_peer( assert!(!ClusterInfoRepairListener::should_repair_peer(
repairman_root, repairman_root,
0,
repairee_root, repairee_root,
0, 0,
)); ));
@ -1034,6 +1063,7 @@ mod tests {
let repairee_root = 5; let repairee_root = 5;
assert!(ClusterInfoRepairListener::should_repair_peer( assert!(ClusterInfoRepairListener::should_repair_peer(
repairman_root, repairman_root,
0,
repairee_root, repairee_root,
0, 0,
)); ));
@ -1043,6 +1073,7 @@ mod tests {
let repairee_root = 5; let repairee_root = 5;
assert!(!ClusterInfoRepairListener::should_repair_peer( assert!(!ClusterInfoRepairListener::should_repair_peer(
repairman_root, repairman_root,
0,
repairee_root, repairee_root,
11, 11,
)); ));
@ -1052,6 +1083,17 @@ mod tests {
let repairee_root = 5; let repairee_root = 5;
assert!(ClusterInfoRepairListener::should_repair_peer( assert!(ClusterInfoRepairListener::should_repair_peer(
repairman_root, repairman_root,
0,
repairee_root,
10,
));
// If the repairee is far enough behind (outside the buffer) but the slots it needs are below our lowest slot, we don't repair
let repairman_root = 16;
let repairee_root = 5;
assert!(!ClusterInfoRepairListener::should_repair_peer(
repairman_root,
repairee_root + 2,
repairee_root, repairee_root,
10, 10,
)); ));
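For reference, a self-contained restatement of the serving-side rule from should_repair_peer above (a hypothetical standalone copy, not the crate's API), exercised with the boundary values the new tests use: a repairman refuses to serve a repairee when its own lowest stored slot is above repairee_root + 1, even if the repairee is far enough behind its root.

// Sketch of the eligibility check: the repairman must still hold the slots
// just above the repairee's root, and must be far enough ahead of that root.
fn should_repair_peer(
    repairman_root: u64,
    repairman_lowest_slot: u64,
    repairee_root: u64,
    num_buffer_slots: usize,
) -> bool {
    repairman_lowest_slot <= repairee_root + 1
        && repairman_root > repairee_root + num_buffer_slots as u64
}

fn main() {
    // Repairee far enough behind and repairman still has slot 6: serve repairs.
    assert!(should_repair_peer(16, 0, 5, 10));
    // Repairman purged everything below slot 7 (> repairee_root + 1): do not serve.
    assert!(!should_repair_peer(16, 7, 5, 10));
}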

View File

@ -1,5 +1,6 @@
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use bincode::{serialize, serialized_size}; use bincode::{serialize, serialized_size};
use solana_sdk::clock::Slot;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::signature::{Keypair, Signable, Signature};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
@ -63,16 +64,24 @@ pub enum CrdsData {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct EpochSlots { pub struct EpochSlots {
pub from: Pubkey, pub from: Pubkey,
pub root: u64, pub root: Slot,
pub slots: BTreeSet<u64>, pub lowest: Slot,
pub slots: BTreeSet<Slot>,
pub wallclock: u64, pub wallclock: u64,
} }
impl EpochSlots { impl EpochSlots {
pub fn new(from: Pubkey, root: u64, slots: BTreeSet<u64>, wallclock: u64) -> Self { pub fn new(
from: Pubkey,
root: Slot,
lowest: Slot,
slots: BTreeSet<Slot>,
wallclock: u64,
) -> Self {
Self { Self {
from, from,
root, root,
lowest,
slots, slots,
wallclock, wallclock,
} }
@ -271,6 +280,7 @@ mod test {
let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
Pubkey::default(), Pubkey::default(),
0, 0,
0,
BTreeSet::new(), BTreeSet::new(),
0, 0,
))); )));
@ -293,10 +303,11 @@ mod test {
Vote::new(&keypair.pubkey(), test_tx(), timestamp()), Vote::new(&keypair.pubkey(), test_tx(), timestamp()),
)); ));
verify_signatures(&mut v, &keypair, &wrong_keypair); verify_signatures(&mut v, &keypair, &wrong_keypair);
let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect(); let btreeset: BTreeSet<Slot> = vec![1, 2, 3, 6, 8].into_iter().collect();
v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
keypair.pubkey(), keypair.pubkey(),
0, 0,
0,
btreeset, btreeset,
timestamp(), timestamp(),
))); )));

View File

@ -35,14 +35,24 @@ pub enum RepairStrategy {
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType { pub enum RepairType {
Orphan(u64), Orphan(Slot),
HighestShred(u64, u64), HighestShred(Slot, u64),
Shred(u64, u64), Shred(Slot, u64),
}
impl RepairType {
pub fn slot(&self) -> Slot {
match self {
RepairType::Orphan(slot) => *slot,
RepairType::HighestShred(slot, _) => *slot,
RepairType::Shred(slot, _) => *slot,
}
}
} }
pub struct RepairSlotRange { pub struct RepairSlotRange {
pub start: u64, pub start: Slot,
pub end: u64, pub end: Slot,
} }
impl Default for RepairSlotRange { impl Default for RepairSlotRange {
@ -106,7 +116,7 @@ impl RepairService {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy, repair_strategy: RepairStrategy,
) { ) {
let mut epoch_slots: BTreeSet<u64> = BTreeSet::new(); let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new();
let id = cluster_info.read().unwrap().id(); let id = cluster_info.read().unwrap().id();
let mut current_root = 0; let mut current_root = 0;
if let RepairStrategy::RepairAll { if let RepairStrategy::RepairAll {
@ -144,9 +154,11 @@ impl RepairService {
.. ..
} => { } => {
let new_root = blocktree.last_root(); let new_root = blocktree.last_root();
let lowest_slot = blocktree.lowest_slot();
Self::update_epoch_slots( Self::update_epoch_slots(
id, id,
new_root, new_root,
lowest_slot,
&mut current_root, &mut current_root,
&mut epoch_slots, &mut epoch_slots,
&cluster_info, &cluster_info,
@ -216,7 +228,7 @@ impl RepairService {
fn generate_repairs( fn generate_repairs(
blocktree: &Blocktree, blocktree: &Blocktree,
root: u64, root: Slot,
max_repairs: usize, max_repairs: usize,
) -> Result<Vec<RepairType>> { ) -> Result<Vec<RepairType>> {
// Slot height and shred indexes for shreds we want to repair // Slot height and shred indexes for shreds we want to repair
@ -289,8 +301,8 @@ impl RepairService {
fn get_completed_slots_past_root( fn get_completed_slots_past_root(
blocktree: &Blocktree, blocktree: &Blocktree,
slots_in_gossip: &mut BTreeSet<u64>, slots_in_gossip: &mut BTreeSet<Slot>,
root: u64, root: Slot,
epoch_schedule: &EpochSchedule, epoch_schedule: &EpochSchedule,
) { ) {
let last_confirmed_epoch = epoch_schedule.get_leader_schedule_epoch(root); let last_confirmed_epoch = epoch_schedule.get_leader_schedule_epoch(root);
@ -313,8 +325,8 @@ impl RepairService {
fn initialize_epoch_slots( fn initialize_epoch_slots(
id: Pubkey, id: Pubkey,
blocktree: &Blocktree, blocktree: &Blocktree,
slots_in_gossip: &mut BTreeSet<u64>, slots_in_gossip: &mut BTreeSet<Slot>,
root: u64, root: Slot,
epoch_schedule: &EpochSchedule, epoch_schedule: &EpochSchedule,
cluster_info: &RwLock<ClusterInfo>, cluster_info: &RwLock<ClusterInfo>,
) { ) {
@ -324,19 +336,22 @@ impl RepairService {
// also be updated with the latest root (done in blocktree_processor) and thus // also be updated with the latest root (done in blocktree_processor) and thus
// will provide a schedule to window_service for any incoming shreds up to the // will provide a schedule to window_service for any incoming shreds up to the
// last_confirmed_epoch. // last_confirmed_epoch.
cluster_info cluster_info.write().unwrap().push_epoch_slots(
.write() id,
.unwrap() root,
.push_epoch_slots(id, root, slots_in_gossip.clone()); blocktree.lowest_slot(),
slots_in_gossip.clone(),
);
} }
// Update the gossiped structure used for the "Repairmen" repair protocol. See book // Update the gossiped structure used for the "Repairmen" repair protocol. See book
// for details. // for details.
fn update_epoch_slots( fn update_epoch_slots(
id: Pubkey, id: Pubkey,
latest_known_root: u64, latest_known_root: Slot,
prev_root: &mut u64, lowest_slot: Slot,
slots_in_gossip: &mut BTreeSet<u64>, prev_root: &mut Slot,
slots_in_gossip: &mut BTreeSet<Slot>,
cluster_info: &RwLock<ClusterInfo>, cluster_info: &RwLock<ClusterInfo>,
completed_slots_receiver: &CompletedSlotsReceiver, completed_slots_receiver: &CompletedSlotsReceiver,
) { ) {
@ -362,12 +377,13 @@ impl RepairService {
cluster_info.write().unwrap().push_epoch_slots( cluster_info.write().unwrap().push_epoch_slots(
id, id,
latest_known_root, latest_known_root,
lowest_slot,
slots_in_gossip.clone(), slots_in_gossip.clone(),
); );
} }
} }
fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<u64>, root: u64) { fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<Slot>, root: Slot) {
*slot_set = slot_set *slot_set = slot_set
.range((Excluded(&root), Unbounded)) .range((Excluded(&root), Unbounded))
.cloned() .cloned()
@ -732,6 +748,7 @@ mod test {
RepairService::update_epoch_slots( RepairService::update_epoch_slots(
Pubkey::default(), Pubkey::default(),
root, root,
blocktree.lowest_slot(),
&mut root.clone(), &mut root.clone(),
&mut completed_slots, &mut completed_slots,
&cluster_info, &cluster_info,
@ -749,6 +766,7 @@ mod test {
RepairService::update_epoch_slots( RepairService::update_epoch_slots(
Pubkey::default(), Pubkey::default(),
root, root,
0,
&mut 0, &mut 0,
&mut completed_slots, &mut completed_slots,
&cluster_info, &cluster_info,
@ -782,6 +800,7 @@ mod test {
RepairService::update_epoch_slots( RepairService::update_epoch_slots(
my_pubkey.clone(), my_pubkey.clone(),
current_root, current_root,
0,
&mut current_root.clone(), &mut current_root.clone(),
&mut completed_slots, &mut completed_slots,
&cluster_info, &cluster_info,
@ -812,6 +831,7 @@ mod test {
RepairService::update_epoch_slots( RepairService::update_epoch_slots(
my_pubkey.clone(), my_pubkey.clone(),
current_root, current_root,
0,
&mut current_root, &mut current_root,
&mut completed_slots, &mut completed_slots,
&cluster_info, &cluster_info,
@ -830,6 +850,7 @@ mod test {
RepairService::update_epoch_slots( RepairService::update_epoch_slots(
my_pubkey.clone(), my_pubkey.clone(),
current_root + 1, current_root + 1,
0,
&mut current_root, &mut current_root,
&mut completed_slots, &mut completed_slots,
&cluster_info, &cluster_info,

View File

@ -72,10 +72,10 @@ pub struct Blocktree {
data_shred_cf: LedgerColumn<cf::ShredData>, data_shred_cf: LedgerColumn<cf::ShredData>,
code_shred_cf: LedgerColumn<cf::ShredCode>, code_shred_cf: LedgerColumn<cf::ShredCode>,
transaction_status_cf: LedgerColumn<cf::TransactionStatus>, transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
last_root: Arc<RwLock<u64>>, last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>, insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>, pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>, pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
} }
pub struct IndexMetaWorkingSetEntry { pub struct IndexMetaWorkingSetEntry {
@ -1452,9 +1452,23 @@ impl Blocktree {
} }
} }
pub fn last_root(&self) -> u64 { pub fn last_root(&self) -> Slot {
*self.last_root.read().unwrap() *self.last_root.read().unwrap()
} }
// find the first available slot in blocktree that has some data in it
pub fn lowest_slot(&self) -> Slot {
for (slot, meta) in self
.slot_meta_iterator(0)
.expect("unable to iterate over meta")
{
if slot > 0 && meta.received > 0 {
return slot;
}
}
// This means blocktree is empty, should never get here aside from right at boot.
self.last_root()
}
} }
fn update_slot_meta( fn update_slot_meta(
@ -1952,7 +1966,7 @@ macro_rules! create_new_tmp_ledger {
}; };
} }
pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: u64) -> bool { pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> bool {
if !is_valid_write_to_slot_0(slot, parent_slot, last_root) { if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
// Check that the parent_slot < slot // Check that the parent_slot < slot
if parent_slot >= slot { if parent_slot >= slot {
@ -4341,4 +4355,21 @@ pub mod tests {
} }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
} }
#[test]
fn test_lowest_slot() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
for i in 0..10 {
let slot = i;
let (shreds, _) = make_slot_entries(slot, 0, 1);
blocktree.insert_shreds(shreds, None, false).unwrap();
}
assert_eq!(blocktree.lowest_slot(), 1);
blocktree.run_purge_batch(0, 5).unwrap();
assert_eq!(blocktree.lowest_slot(), 6);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
} }