Plumb ClusterInfoRepairListener (#4428)

automerge
carllin 2019-05-24 19:20:09 -07:00 committed by Grimes
parent 0302f13b97
commit d772a27936
7 changed files with 159 additions and 20 deletions

View File

@@ -958,8 +958,8 @@ mod tests {
// 3) There should only be repairmen who are not responsible for repairing this slot
// if we have more repairman than `num_blobs_in_slot * repair_redundancy`. In this case the
// first `num_blobs_in_slot * repair_redundancy` repairmen woudl send one blob, and the rest
// would noe be responsible for sending any repairs
// first `num_blobs_in_slot * repair_redundancy` repairmen would send one blob, and the rest
// would not be responsible for sending any repairs
assert_eq!(
none_results,
num_repairmen.saturating_sub(num_blobs_in_slot * repair_redundancy)
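The expectation asserted here, with illustrative numbers (a sketch; these are not the test's actual constants): each of the first num_blobs_in_slot * repair_redundancy repairmen sends exactly one blob, and any repairmen beyond that send nothing.

fn main() {
    // Illustrative values only, not the constants used by the test above.
    let num_repairmen: u64 = 10;
    let num_blobs_in_slot: u64 = 2;
    let repair_redundancy: u64 = 3;
    // 6 repairmen each send one blob for this slot; the remaining 4 send none.
    let none_results = num_repairmen.saturating_sub(num_blobs_in_slot * repair_redundancy);
    assert_eq!(none_results, 4);
}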

View File

@@ -103,7 +103,7 @@ impl LeaderScheduleCache {
// Forbid asking for slots in an unconfirmed epoch
let bank_epoch = self.epoch_schedule.get_epoch_and_slot_index(slot).0;
if bank_epoch > *self.max_epoch.read().unwrap() {
error!(
debug!(
"Requested leader in slot: {} of unconfirmed epoch: {}",
slot, bank_epoch
);

View File

@@ -64,6 +64,7 @@ pub struct ClusterConfig {
pub cluster_lamports: u64,
pub ticks_per_slot: u64,
pub slots_per_epoch: u64,
pub stakers_slot_offset: u64,
pub native_instruction_processors: Vec<(String, Pubkey)>,
pub poh_config: PohConfig,
}
@@ -78,6 +79,7 @@ impl Default for ClusterConfig {
cluster_lamports: 0,
ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
slots_per_epoch: DEFAULT_SLOTS_PER_EPOCH,
stakers_slot_offset: DEFAULT_SLOTS_PER_EPOCH,
native_instruction_processors: vec![],
poh_config: PohConfig::default(),
}
@@ -130,6 +132,7 @@ impl LocalCluster {
);
genesis_block.ticks_per_slot = config.ticks_per_slot;
genesis_block.slots_per_epoch = config.slots_per_epoch;
genesis_block.stakers_slot_offset = config.stakers_slot_offset;
genesis_block.poh_config = config.poh_config.clone();
genesis_block
.native_instruction_processors
@@ -223,7 +226,7 @@ impl LocalCluster {
}
}
fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
pub fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
let client = create_client(
self.entry_point_info.client_facing_addr(),
FULLNODE_PORT_RANGE,

View File

@@ -4,6 +4,7 @@
use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta};
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_repair_listener::ClusterInfoRepairListener;
use crate::result::Result;
use crate::service::Service;
use solana_metrics::datapoint_info;
@@ -56,23 +57,36 @@ impl Default for RepairSlotRange {
pub struct RepairService {
t_repair: JoinHandle<()>,
cluster_info_repair_listener: Option<ClusterInfoRepairListener>,
}
impl RepairService {
pub fn new(
blocktree: Arc<Blocktree>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy,
) -> Self {
let exit = exit.clone();
let cluster_info_repair_listener = match repair_strategy {
RepairStrategy::RepairAll {
ref epoch_schedule, ..
} => Some(ClusterInfoRepairListener::new(
&blocktree,
&exit,
cluster_info.clone(),
*epoch_schedule,
)),
_ => None,
};
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || {
Self::run(
&blocktree,
exit,
&exit,
&repair_socket,
&cluster_info,
repair_strategy,
@@ -80,12 +94,15 @@ impl RepairService {
})
.unwrap();
RepairService { t_repair }
RepairService {
t_repair,
cluster_info_repair_listener,
}
}
fn run(
blocktree: &Arc<Blocktree>,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy,
@@ -373,7 +390,14 @@ impl Service for RepairService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_repair.join()
let mut results = vec![self.t_repair.join()];
if let Some(cluster_info_repair_listener) = self.cluster_info_repair_listener {
results.push(cluster_info_repair_listener.join());
}
for r in results {
r?;
}
Ok(())
}
}

View File

@@ -317,9 +317,14 @@ impl ReplayStage {
.collect::<Vec<_>>();
rooted_slots.push(root_bank.slot());
let old_root = bank_forks.read().unwrap().root();
bank_forks.write().unwrap().set_root(new_root);
blocktree
.set_root(new_root, old_root)
.expect("Ledger set root failed");
// Set root first in leader schedule_cache before bank_forks because bank_forks.root
// is consumed by repair_service to update gossip, so we don't want to get blobs for
// repair on gossip before we update leader schedule, otherwise they may get dropped.
leader_schedule_cache.set_root(new_root);
blocktree.set_root(new_root, old_root)?;
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, progress);
root_slot_sender.send(rooted_slots)?;
}

View File

@@ -92,11 +92,13 @@ pub fn should_retransmit_and_persist(
false
} else if slot_leader_pubkey == None {
inc_new_counter_debug!("streamer-recv_window-unknown_leader", 1);
true
false
} else if slot_leader_pubkey != Some(blob.id()) {
inc_new_counter_debug!("streamer-recv_window-wrong_leader", 1);
false
} else {
// At this point, slot_leader_id == blob.id() && blob.id() != *my_id, so
// the blob is valid to process
true
}
}
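To summarize the rule this hunk changes: a blob is now retransmitted and persisted only when the slot leader is known, the blob was signed by that leader, and the blob did not come from this node. A minimal sketch of that decision with simplified inputs (the real function derives the slot leader from the bank and the leader schedule cache):

use solana_sdk::pubkey::Pubkey;

// Sketch of the decision after this change; previously an unknown slot leader
// meant the blob was kept, now it is dropped.
fn keep_blob(blob_id: &Pubkey, slot_leader: Option<Pubkey>, my_id: &Pubkey) -> bool {
    if blob_id == my_id {
        false // the blob came back from me, drop it
    } else if slot_leader.is_none() {
        false // no known leader for this slot, drop it
    } else if slot_leader.as_ref() != Some(blob_id) {
        false // signed by someone other than the slot leader, drop it
    } else {
        true // valid blob from the slot leader, and not from me
    }
}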
@@ -190,7 +192,7 @@ impl WindowService {
let repair_service = RepairService::new(
blocktree.clone(),
exit,
exit.clone(),
repair_socket,
cluster_info.clone(),
repair_strategy,
@@ -303,10 +305,10 @@ mod test {
let mut blob = Blob::default();
blob.set_id(&leader_pubkey);
// without a Bank and blobs not from me, blob continues
// without a Bank and blobs not from me, blob gets thrown out
assert_eq!(
should_retransmit_and_persist(&blob, None, &cache, &me_id),
true
false
);
// with a Bank for slot 0, blob continues
@@ -322,12 +324,11 @@
false
);
// with a Bank and no idea who leader is, we keep the blobs (for now)
// TODO: persist in blocktree that we didn't know who the leader was at the time?
// with a Bank and no idea who leader is, blob gets thrown out
blob.set_slot(MINIMUM_SLOT_LENGTH as u64 * 3);
assert_eq!(
should_retransmit_and_persist(&blob, Some(bank), &cache, &me_id),
true
false
);
// if the blob came back from me, it doesn't continue, whether or not I have a bank

View File

@@ -1,11 +1,14 @@
extern crate solana;
use crate::solana::blocktree::Blocktree;
use hashbrown::HashSet;
use log::*;
use solana::cluster::Cluster;
use solana::cluster_tests;
use solana::gossip_service::discover_cluster;
use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana::validator::ValidatorConfig;
use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH;
use solana_runtime::epoch_schedule::{EpochSchedule, MINIMUM_SLOT_LENGTH};
use solana_sdk::poh_config::PohConfig;
use solana_sdk::timing;
use std::time::Duration;
@@ -207,3 +210,106 @@ fn test_listener_startup() {
let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 4).unwrap();
assert_eq!(cluster_nodes.len(), 4);
}
#[test]
fn test_repairman_catchup() {
run_repairman_catchup(5);
}
fn run_repairman_catchup(num_repairmen: u64) {
let mut validator_config = ValidatorConfig::default();
let num_ticks_per_second = 100;
let num_ticks_per_slot = 40;
let num_slots_per_epoch = MINIMUM_SLOT_LENGTH as u64;
let num_root_buffer_slots = 10;
// Calculate the leader schedule num_root_buffer slots ahead. Otherwise, if stakers_slot_offset ==
// num_slots_per_epoch, and num_slots_per_epoch == MINIMUM_SLOT_LENGTH, then repairmen
// will stop sending repairs after the last slot in epoch 1 (0-indexed), because the root
// is at most in the first epoch.
//
// For example:
// Assume:
// 1) num_slots_per_epoch = 32
// 2) stakers_slot_offset = 32
// 3) MINIMUM_SLOT_LENGTH = 32
//
// Then the last slot in epoch 1 is slot 63. After completing slots 0 to 63, the root on the
// repairee is at most 31. Because the stakers_slot_offset == 32, the max confirmed epoch
// on the repairee is epoch 1.
// Thus the repairmen won't send any slots past epoch 1, slot 63 to this repairee until the repairee
// updates their root, and the repairee can't update their root until they get slot 64, so no progress
// is made. This is also not accounting for the fact that the repairee may not vote on every slot, so
// their root could actually be much less than 31. This is why we give a num_root_buffer_slots buffer.
let stakers_slot_offset = num_slots_per_epoch + num_root_buffer_slots;
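// Sanity check on the worked example above (illustrative sketch, not part of
// the original test): with the assumed values num_slots_per_epoch = 32 and
// num_root_buffer_slots = 10, stakers_slot_offset = 42, so leader schedules
// stay confirmed 10 slots past the epoch boundary and repairmen keep serving
// repairs while the repairee's root lags behind.
assert!(stakers_slot_offset > num_slots_per_epoch);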
validator_config.rpc_config.enable_fullnode_exit = true;
let lamports_per_repairman = 1000;
// Make the repairee_stake small relative to the repairmen stake so that the repairee doesn't
// get included in the leader schedule, causing slots to get skipped while it's still trying
// to catch up
let repairee_stake = 3;
let cluster_lamports = 2 * lamports_per_repairman * num_repairmen + repairee_stake;
let node_stakes: Vec<_> = (0..num_repairmen).map(|_| lamports_per_repairman).collect();
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes,
cluster_lamports,
validator_config: validator_config.clone(),
ticks_per_slot: num_ticks_per_slot,
slots_per_epoch: num_slots_per_epoch,
stakers_slot_offset,
poh_config: PohConfig::new_sleep(Duration::from_millis(1000 / num_ticks_per_second)),
..ClusterConfig::default()
});
let repairman_pubkeys: HashSet<_> = cluster.get_node_pubkeys().into_iter().collect();
let epoch_schedule = EpochSchedule::new(num_slots_per_epoch, stakers_slot_offset, true);
let num_warmup_epochs = (epoch_schedule.get_stakers_epoch(0) + 1) as f64;
// Sleep for longer than the first N warmup epochs, with a one epoch buffer for timing issues
cluster_tests::sleep_n_epochs(
num_warmup_epochs + 1.0,
&cluster.genesis_block.poh_config,
num_ticks_per_slot,
num_slots_per_epoch,
);
// Start up a new node, wait for catchup. Backwards repair won't be sufficient because the
// leader is sending blobs past this validator's first two confirmed epochs. Thus, the repairman
// protocol will have to kick in for this validator to repair.
cluster.add_validator(&validator_config, repairee_stake);
let all_pubkeys = cluster.get_node_pubkeys();
let repairee_id = all_pubkeys
.into_iter()
.find(|x| !repairman_pubkeys.contains(x))
.unwrap();
// Wait for repairman protocol to catch this validator up
cluster_tests::sleep_n_epochs(
num_warmup_epochs + 1.0,
&cluster.genesis_block.poh_config,
num_ticks_per_slot,
num_slots_per_epoch,
);
cluster.close_preserve_ledgers();
let validator_ledger_path = cluster.fullnode_infos[&repairee_id].ledger_path.clone();
// Expect at least the first two epochs to have been rooted after waiting 3 epochs.
let num_expected_slots = num_slots_per_epoch * 2;
let validator_ledger = Blocktree::open(&validator_ledger_path).unwrap();
let validator_rooted_slots: Vec<_> =
validator_ledger.rooted_slot_iterator(0).unwrap().collect();
if validator_rooted_slots.len() as u64 <= num_expected_slots {
error!(
"Num expected slots: {}, number of rooted slots: {}",
num_expected_slots,
validator_rooted_slots.len()
);
}
assert!(validator_rooted_slots.len() as u64 > num_expected_slots);
}