Randomize repair requests (#1059)

* randomize packet repair requests

* exponential random repair requests

* use gen_range to get a uniform distribution
This commit is contained in:
anatoly yakovenko 2018-08-27 07:05:48 -07:00 committed by GitHub
parent 8d0d429acd
commit 48762834d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 17 deletions

View File

@ -8,6 +8,7 @@ use erasure;
use ledger::Block;
use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs, BLOB_SIZE};
use rand::{thread_rng, Rng};
use result::{Error, Result};
use signature::Pubkey;
use std::cmp;
@ -92,6 +93,27 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
}
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
//exponential backoff
if *last != consumed {
//start with a 50% chance of asking for repairs
*times = 1;
}
*last = consumed;
*times += 1;
// Experiment with capping repair request duration.
// Once nodes are too far behind they can spend many
// seconds without asking for repair
if *times > 128 {
// 50% chance that a request will fire between 64 - 128 tries
*times = 64;
}
//if we get lucky, make the request, which should exponentially get less likely
thread_rng().gen_range(0, *times as u64) == 0
}
fn repair_window(
debug_id: u64,
window: &SharedWindow,
@ -103,22 +125,7 @@ fn repair_window(
received: u64,
) -> Result<()> {
//exponential backoff
if *last != consumed {
*times = 0;
}
*last = consumed;
*times += 1;
// Experiment with capping repair request duration.
// Once nodes are too far behind they can spend many
// seconds without asking for repair
if *times > 128 {
*times = 65;
}
//if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok
if *times & (*times - 1) != 0 {
trace!("repair_window counter {} {} {}", *times, consumed, received);
if !repair_backoff(last, times, consumed) {
return Ok(());
}
@ -697,7 +704,8 @@ mod test {
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver};
use window::{
blob_idx_in_window, calculate_highest_lost_blob_index, default_window, window, WINDOW_SIZE,
blob_idx_in_window, calculate_highest_lost_blob_index, default_window, repair_backoff,
window, WINDOW_SIZE,
};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
@ -1048,4 +1056,25 @@ mod test {
assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
}
#[test]
pub fn test_repair_backoff() {
let mut last = 0;
let mut times = 0;
let total: usize = (0..127)
.map(|x| {
let rv = repair_backoff(&mut last, &mut times, 1) as usize;
assert_eq!(times, x + 2);
rv
})
.sum();
assert_eq!(times, 128);
assert_eq!(last, 1);
assert!(total > 0);
assert!(total < 127);
repair_backoff(&mut last, &mut times, 1);
assert_eq!(times, 64);
repair_backoff(&mut last, &mut times, 2);
assert_eq!(times, 2);
assert_eq!(last, 2);
}
}