retransmits shreds recovered from erasure codes

Shreds recovered from erasure codes were not received from Turbine, so they
were never retransmitted to the nodes downstream. Those nodes must then fetch
the shreds through repair, which is slower and adds repair traffic across the
cluster.

This commit channels recovered shreds through to the retransmit stage so that
they are broadcast to the downstream nodes in the Turbine tree.
behzad nouri 2021-08-13 14:11:37 -04:00
parent 3c71670bd9
commit 7a8807b8bb
3 changed files with 112 additions and 114 deletions
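How the pieces fit together: Blockstore::insert_shreds_handle_duplicate gains an optional channel sender, WindowService threads its retransmit sender down into that call, and the retransmit stage owns the receiving end. A minimal sketch of the wiring, using a hypothetical simplified Shred type rather than the real solana-core signatures:

    use std::sync::mpsc::{channel, Receiver, Sender};
    use std::thread;

    // Hypothetical stand-in for solana_ledger::shred::Shred.
    struct Shred;

    // Blockstore side: recovered shreds are written locally and, when a
    // sender is provided, forwarded to the retransmit stage.
    fn insert_recovered(recovered: Vec<Shred>, retransmit_sender: Option<&Sender<Vec<Shred>>>) {
        // ... verify signatures and insert into the blockstore ...
        if !recovered.is_empty() {
            if let Some(sender) = retransmit_sender {
                // An error here only means the retransmit stage has shut down.
                let _ = sender.send(recovered);
            }
        }
    }

    // Retransmit side: drain batches and broadcast them down the Turbine tree.
    fn retransmit_loop(receiver: Receiver<Vec<Shred>>) {
        for batch in receiver {
            // ... send each shred in `batch` to this node's children ...
            drop(batch);
        }
    }

    fn main() {
        let (sender, receiver) = channel();
        let retransmit = thread::spawn(move || retransmit_loop(receiver));
        insert_recovered(vec![Shred], Some(&sender));
        drop(sender); // closing the channel ends the retransmit loop
        retransmit.join().unwrap();
    }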

core/src/retransmit_stage.rs

@@ -460,6 +460,8 @@ impl RetransmitStage {
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
+        let _retransmit_sender = retransmit_sender.clone();
         let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
         let thread_hdls = retransmitter(
@@ -598,6 +600,7 @@ mod tests {
         let cluster_info = Arc::new(cluster_info);
         let (retransmit_sender, retransmit_receiver) = channel();
+        let _retransmit_sender = retransmit_sender.clone();
         let _t_retransmit = retransmitter(
             retransmit_socket,
             bank_forks,

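The `_retransmit_sender` kept in RetransmitStage::new (and in the test) is a liveness workaround referenced by the rust-lang/rust#39364 comment above: std::sync::mpsc's `recv_timeout` can hit a panicky disconnect race if every sender is dropped while a receive is in flight, so one unused sender clone is held for the receiver's lifetime. A self-contained illustration of the pattern (the bug details are as described in that issue thread):

    use std::sync::mpsc::{channel, RecvTimeoutError};
    use std::time::Duration;

    fn main() {
        let (sender, receiver) = channel::<u64>();
        // Workaround for rust-lang/rust#39364: hold an unused sender clone so
        // at least one sender outlives every recv_timeout on this receiver.
        let _retransmit_sender = sender.clone();

        // The working sender goes to producer threads, which may drop it at
        // any moment relative to the recv_timeout below.
        std::thread::spawn(move || drop(sender));

        match receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(v) => println!("received {}", v),
            Err(RecvTimeoutError::Timeout) => println!("timed out"),
            Err(RecvTimeoutError::Disconnected) => println!("disconnected"),
        }
    }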
core/src/window_service.rs

@@ -260,6 +260,7 @@ fn run_insert<F>(
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
+    retransmit_sender: &Sender<Vec<Shred>>,
     outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) -> Result<()>
 where
@@ -287,7 +288,8 @@ where
         shreds,
         repairs,
         Some(leader_schedule_cache),
-        false,
+        false, // is_trusted
+        Some(retransmit_sender),
         &handle_duplicate,
         metrics,
     )?;
@@ -467,6 +469,7 @@ impl WindowService {
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
+            retransmit_sender.clone(),
             outstanding_requests,
         );
@@ -528,6 +531,7 @@ impl WindowService {
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
+        retransmit_sender: Sender<Vec<Shred>>,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> JoinHandle<()> {
         let mut handle_timeout = || {};
@@ -557,6 +561,7 @@ impl WindowService {
                         &mut metrics,
                         &mut ws_metrics,
                         &completed_data_sets_sender,
+                        &retransmit_sender,
                         &outstanding_requests,
                     ) {
                         if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {

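The ownership split in window_service.rs: the service clones `retransmit_sender` into the insert thread, which then lends it to `run_insert` by reference on every iteration, and `run_insert` passes it to the blockstore as `Some(retransmit_sender)`. A minimal sketch of that pattern, assuming simplified stand-in types:

    use std::sync::mpsc::{channel, Sender};
    use std::thread::{self, JoinHandle};

    struct Shred;

    // The insert thread owns its clone of the sender; run_insert only borrows
    // it, mirroring run_insert(..., retransmit_sender: &Sender<Vec<Shred>>).
    fn start_window_insert_thread(retransmit_sender: Sender<Vec<Shred>>) -> JoinHandle<()> {
        thread::spawn(move || {
            // One iteration stands in for the service's receive loop.
            let shreds = vec![Shred];
            run_insert(shreds, &retransmit_sender);
        })
    }

    fn run_insert(shreds: Vec<Shred>, retransmit_sender: &Sender<Vec<Shred>>) {
        // Blockstore insertion elided; recovered shreds get forwarded here.
        let _ = retransmit_sender.send(shreds);
    }

    fn main() {
        let (sender, receiver) = channel();
        start_window_insert_thread(sender).join().unwrap();
        assert_eq!(receiver.try_iter().count(), 1);
    }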
ledger/src/blockstore.rs

@@ -54,7 +54,7 @@ use {
         rc::Rc,
         sync::{
             atomic::{AtomicBool, Ordering},
-            mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
+            mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError},
             Arc, Mutex, RwLock, RwLockWriteGuard,
         },
         time::Instant,
@@ -800,6 +800,7 @@ impl Blockstore {
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
+        retransmit_sender: Option<&Sender<Vec<Shred>>>,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@@ -811,7 +812,7 @@ impl Blockstore {
         let mut start = Measure::start("Blockstore lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
         start.stop();
-        let insert_lock_elapsed = start.as_us();
+        metrics.insert_lock_elapsed += start.as_us();
         let db = &*self.db;
         let mut write_batch = db.batch()?;
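A recurring change in the hunks below is mechanical: per-call local counters (`num_inserted`, `num_recovered`, the timing locals) that used to be flushed into `BlockstoreInsertionMetrics` at the end of the function are replaced with direct `+=` accumulation on the metrics struct. Besides trimming locals, this removes the kind of flush bug visible in the deleted lines, where `num_recovered_failed_invalid` and `num_recovered_exists` were assigned with `=` instead of `+=`. A before/after sketch with a hypothetical one-field metrics struct:

    #[derive(Default)]
    struct BlockstoreInsertionMetrics {
        num_inserted: u64, // hypothetical single field; the real struct has many
    }

    // Before: accumulate into a local, flush at the end (easy to forget, and
    // easy to flush with `=` instead of `+=`).
    fn insert_before(metrics: &mut BlockstoreInsertionMetrics) {
        let mut num_inserted = 0u64;
        num_inserted += 1; // ... per-shred work ...
        metrics.num_inserted += num_inserted;
    }

    // After: accumulate on the struct directly at the point of the event.
    fn insert_after(metrics: &mut BlockstoreInsertionMetrics) {
        metrics.num_inserted += 1; // ... per-shred work ...
    }

    fn main() {
        let mut metrics = BlockstoreInsertionMetrics::default();
        insert_before(&mut metrics);
        insert_after(&mut metrics);
        assert_eq!(metrics.num_inserted, 2);
    }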
@@ -822,66 +823,56 @@ impl Blockstore {
         let mut slot_meta_working_set = HashMap::new();
         let mut index_working_set = HashMap::new();
-        let num_shreds = shreds.len();
+        metrics.num_shreds += shreds.len();
         let mut start = Measure::start("Shred insertion");
-        let mut num_inserted = 0;
         let mut index_meta_time = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
-        shreds
-            .into_iter()
-            .zip(is_repaired.into_iter())
-            .enumerate()
-            .for_each(|(i, (shred, is_repaired))| {
-                if shred.is_data() {
-                    let shred_source = if is_repaired {
-                        ShredSource::Repaired
-                    } else {
-                        ShredSource::Turbine
-                    };
-                    if let Ok(completed_data_sets) = self.check_insert_data_shred(
-                        shred,
-                        &mut erasure_metas,
-                        &mut index_working_set,
-                        &mut slot_meta_working_set,
-                        &mut write_batch,
-                        &mut just_inserted_data_shreds,
-                        &mut index_meta_time,
-                        is_trusted,
-                        handle_duplicate,
-                        leader_schedule,
-                        shred_source,
-                    ) {
-                        newly_completed_data_sets.extend(completed_data_sets);
-                        inserted_indices.push(i);
-                        num_inserted += 1;
-                    }
-                } else if shred.is_code() {
-                    self.check_cache_coding_shred(
-                        shred,
-                        &mut erasure_metas,
-                        &mut index_working_set,
-                        &mut just_inserted_coding_shreds,
-                        &mut index_meta_time,
-                        handle_duplicate,
-                        is_trusted,
-                        is_repaired,
-                    );
-                } else {
-                    panic!("There should be no other case");
-                }
-            });
+        for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
+            if shred.is_data() {
+                let shred_source = if is_repaired {
+                    ShredSource::Repaired
+                } else {
+                    ShredSource::Turbine
+                };
+                if let Ok(completed_data_sets) = self.check_insert_data_shred(
+                    shred,
+                    &mut erasure_metas,
+                    &mut index_working_set,
+                    &mut slot_meta_working_set,
+                    &mut write_batch,
+                    &mut just_inserted_data_shreds,
+                    &mut index_meta_time,
+                    is_trusted,
+                    handle_duplicate,
+                    leader_schedule,
+                    shred_source,
+                ) {
+                    newly_completed_data_sets.extend(completed_data_sets);
+                    inserted_indices.push(i);
+                    metrics.num_inserted += 1;
+                }
+            } else if shred.is_code() {
+                self.check_cache_coding_shred(
+                    shred,
+                    &mut erasure_metas,
+                    &mut index_working_set,
+                    &mut just_inserted_coding_shreds,
+                    &mut index_meta_time,
+                    handle_duplicate,
+                    is_trusted,
+                    is_repaired,
+                );
+            } else {
+                panic!("There should be no other case");
+            }
+        }
         start.stop();
-        let insert_shreds_elapsed = start.as_us();
+        metrics.insert_shreds_elapsed += start.as_us();
         let mut start = Measure::start("Shred recovery");
-        let mut num_recovered = 0;
-        let mut num_recovered_inserted = 0;
-        let mut num_recovered_failed_sig = 0;
-        let mut num_recovered_failed_invalid = 0;
-        let mut num_recovered_exists = 0;
         if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_data = Self::try_shred_recovery(
+            let recovered_data_shreds = Self::try_shred_recovery(
                 db,
                 &erasure_metas,
                 &mut index_working_set,
@@ -889,62 +880,73 @@ impl Blockstore {
                 &mut just_inserted_coding_shreds,
             );
-            num_recovered = recovered_data.len();
-            recovered_data.into_iter().for_each(|shred| {
-                if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
-                    if shred.verify(&leader) {
-                        match self.check_insert_data_shred(
-                            shred,
-                            &mut erasure_metas,
-                            &mut index_working_set,
-                            &mut slot_meta_working_set,
-                            &mut write_batch,
-                            &mut just_inserted_data_shreds,
-                            &mut index_meta_time,
-                            is_trusted,
-                            &handle_duplicate,
-                            leader_schedule,
-                            ShredSource::Recovered,
-                        ) {
-                            Err(InsertDataShredError::Exists) => {
-                                num_recovered_exists += 1;
-                            }
-                            Err(InsertDataShredError::InvalidShred) => {
-                                num_recovered_failed_invalid += 1;
-                            }
-                            Err(InsertDataShredError::BlockstoreError(_)) => {}
-                            Ok(completed_data_sets) => {
-                                newly_completed_data_sets.extend(completed_data_sets);
-                                num_recovered_inserted += 1;
-                            }
-                        }
-                    } else {
-                        num_recovered_failed_sig += 1;
-                    }
-                }
-            });
+            metrics.num_recovered += recovered_data_shreds.len();
+            let recovered_data_shreds: Vec<_> = recovered_data_shreds
+                .into_iter()
+                .filter_map(|shred| {
+                    let leader =
+                        leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?;
+                    if !shred.verify(&leader) {
+                        metrics.num_recovered_failed_sig += 1;
+                        return None;
+                    }
+                    match self.check_insert_data_shred(
+                        shred.clone(),
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                        &mut index_meta_time,
+                        is_trusted,
+                        &handle_duplicate,
+                        leader_schedule,
+                        ShredSource::Recovered,
+                    ) {
+                        Err(InsertDataShredError::Exists) => {
+                            metrics.num_recovered_exists += 1;
+                            None
+                        }
+                        Err(InsertDataShredError::InvalidShred) => {
+                            metrics.num_recovered_failed_invalid += 1;
+                            None
+                        }
+                        Err(InsertDataShredError::BlockstoreError(_)) => None,
+                        Ok(completed_data_sets) => {
+                            newly_completed_data_sets.extend(completed_data_sets);
+                            metrics.num_recovered_inserted += 1;
+                            Some(shred)
+                        }
+                    }
+                })
+                // Always collect recovered-shreds so that above insert code is
+                // executed even if retransmit-sender is None.
+                .collect();
+            if !recovered_data_shreds.is_empty() {
+                if let Some(retransmit_sender) = retransmit_sender {
+                    let _ = retransmit_sender.send(recovered_data_shreds);
+                }
+            }
         }
         start.stop();
-        let shred_recovery_elapsed = start.as_us();
+        metrics.shred_recovery_elapsed += start.as_us();
-        just_inserted_coding_shreds
-            .into_iter()
-            .for_each(|((_, _), shred)| {
-                self.check_insert_coding_shred(
-                    shred,
-                    &mut index_working_set,
-                    &mut write_batch,
-                    &mut index_meta_time,
-                );
-                num_inserted += 1;
-            });
+        metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
+        for shred in just_inserted_coding_shreds.into_values() {
+            self.check_insert_coding_shred(
+                shred,
+                &mut index_working_set,
+                &mut write_batch,
+                &mut index_meta_time,
+            );
+        }
         let mut start = Measure::start("Shred recovery");
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
         handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
         start.stop();
-        let chaining_elapsed = start.as_us();
+        metrics.chaining_elapsed += start.as_us();
         let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@@ -963,12 +965,12 @@ impl Blockstore {
             }
         }
         start.stop();
-        let commit_working_sets_elapsed = start.as_us();
+        metrics.commit_working_sets_elapsed += start.as_us();
         let mut start = Measure::start("Write Batch");
         self.db.write(write_batch)?;
         start.stop();
-        let write_batch_elapsed = start.as_us();
+        metrics.write_batch_elapsed += start.as_us();
         send_signals(
             &self.new_shreds_signals,
@@ -979,20 +981,7 @@ impl Blockstore {
         total_start.stop();
-        metrics.num_shreds += num_shreds;
         metrics.total_elapsed += total_start.as_us();
-        metrics.insert_lock_elapsed += insert_lock_elapsed;
-        metrics.insert_shreds_elapsed += insert_shreds_elapsed;
-        metrics.shred_recovery_elapsed += shred_recovery_elapsed;
-        metrics.chaining_elapsed += chaining_elapsed;
-        metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
-        metrics.write_batch_elapsed += write_batch_elapsed;
-        metrics.num_inserted += num_inserted;
-        metrics.num_recovered += num_recovered;
-        metrics.num_recovered_inserted += num_recovered_inserted;
-        metrics.num_recovered_failed_sig += num_recovered_failed_sig;
-        metrics.num_recovered_failed_invalid = num_recovered_failed_invalid;
-        metrics.num_recovered_exists = num_recovered_exists;
         metrics.index_meta_time += index_meta_time;
         Ok((newly_completed_data_sets, inserted_indices))
@@ -1034,7 +1023,8 @@ impl Blockstore {
             vec![false; shreds_len],
             leader_schedule,
             is_trusted,
-            &|_| {},
+            None, // retransmit-sender
+            &|_| {}, // handle-duplicates
             &mut BlockstoreInsertionMetrics::default(),
         )
     }
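One subtlety in the blockstore change is called out by the in-diff comment: `filter_map` is lazy, so the `check_insert_data_shred` calls inside it run only when the iterator is consumed. The `.collect()` therefore cannot be gated on `retransmit_sender` being `Some`, or the recovered shreds would never be inserted. A tiny demonstration of that laziness:

    fn main() {
        let mut inserted = 0;
        // Lazy: the closure (standing in for check_insert_data_shred) has
        // not run yet at this point.
        let recovered = (0..4).filter_map(|i| {
            inserted += 1;
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        });
        // Consuming the iterator is what performs the insertions; dropping
        // it instead would silently skip them.
        let recovered: Vec<_> = recovered.collect();
        assert_eq!(inserted, 4);
        assert_eq!(recovered, [0, 2]);
    }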