placate clippy and reduce replicode

This commit is contained in:
Rob Walker 2018-07-23 22:20:37 -07:00
parent f11e60b801
commit caeb55d066
2 changed files with 235 additions and 229 deletions

View File

@ -342,6 +342,7 @@ pub fn generate_coding(
// examine the window beginning at block_start for missing or // examine the window beginning at block_start for missing or
// stale (based on block_start_idx) blobs // stale (based on block_start_idx) blobs
// if a blob is stale, remove it from the window slot // if a blob is stale, remove it from the window slot
// side effect: block will be cleaned of old blobs
fn find_missing( fn find_missing(
debug_id: u64, debug_id: u64,
block_start_idx: u64, block_start_idx: u64,
@ -359,54 +360,49 @@ fn find_missing(
let idx = (i - block_start) as u64 + block_start_idx; let idx = (i - block_start) as u64 + block_start_idx;
let n = i % window.len(); let n = i % window.len();
// swap blob out with None, if it's in the right place, put it back fn is_missing(
if let Some(blob) = mem::replace(&mut window[n].data, None) { debug_id: u64,
idx: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(blob) = mem::replace(window_slot, None) {
let blob_idx = blob.read().unwrap().get_index().unwrap(); let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == idx { if blob_idx == idx {
trace!("recover {:x}: idx: {} good data", debug_id, idx); trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d);
mem::replace(&mut window[n].data, Some(blob)); mem::replace(window_slot, Some(blob));
false
} else { } else {
trace!( trace!(
"recover {:x}: idx: {} old data {}, recycling", "recover {:x}: idx: {} old {} {}, recycling",
debug_id, debug_id,
idx, idx,
blob_idx c_or_d,
blob_idx,
); );
recycler.recycle(blob); recycler.recycle(blob);
data_missing += 1; true
} }
} else { } else {
trace!("recover {:x}: idx: {} None data", debug_id, idx); trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d);
true
}
}
if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") {
data_missing += 1; data_missing += 1;
} }
if i >= coding_start { if i >= coding_start && is_missing(debug_id, idx, &mut window[n].coding, recycler, "coding")
// swap blob out with None, if it's in the right place, put it back {
if let Some(blob) = mem::replace(&mut window[n].coding, None) {
let blob_idx = blob.read().unwrap().get_index().unwrap();
if blob_idx == idx {
trace!("recover {:x}: idx: {} good coding", debug_id, idx);
mem::replace(&mut window[n].coding, Some(blob));
} else {
trace!(
"recover {:x}: idx: {} old coding {}, recycling",
debug_id,
idx,
blob_idx
);
recycler.recycle(blob);
coding_missing += 1; coding_missing += 1;
} }
} else {
trace!("recover {:x}: idx: {} None coding", debug_id, idx);
coding_missing += 1;
}
}
} }
(data_missing, coding_missing) (data_missing, coding_missing)
} }
// Recover missing blocks into window // Recover a missing block into window
// missing blocks should be None or old... // missing blocks should be None or old...
// Use recycler to allocate new ones. // Use recycler to allocate new ones.
// If not enough coding or data blocks are present to restore // If not enough coding or data blocks are present to restore
@ -418,18 +414,12 @@ pub fn recover(
window: &mut [WindowSlot], window: &mut [WindowSlot],
start_idx: u64, start_idx: u64,
start: usize, start: usize,
num_blobs: usize,
) -> Result<()> { ) -> Result<()> {
let num_blocks = (num_blobs / NUM_DATA) + 1; let block_start = start - (start % NUM_DATA);
let mut block_start = start - (start % NUM_DATA); let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
let mut block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
debug!( debug!("start: {} block_start: {}", start, block_start);
"num_blocks: {} start: {} num_blobs: {} block_start: {}",
num_blocks, start, num_blobs, block_start
);
for _ in 0..num_blocks {
let coding_start = block_start + NUM_DATA - NUM_CODING; let coding_start = block_start + NUM_DATA - NUM_CODING;
let block_end = block_start + NUM_DATA; let block_end = block_start + NUM_DATA;
trace!( trace!(
@ -445,7 +435,12 @@ pub fn recover(
find_missing(debug_id, block_start_idx, block_start, window, recycler); find_missing(debug_id, block_start_idx, block_start, window, recycler);
// if we're not missing data, or if we have too much missin but have enough coding // if we're not missing data, or if we have too much missin but have enough coding
if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING { if data_missing == 0 {
// nothing to do...
return Ok(());
}
if (data_missing + coding_missing) > NUM_CODING {
trace!( trace!(
"recover {:x}: start: {} skipping recovery data: {} coding: {}", "recover {:x}: start: {} skipping recovery data: {} coding: {}",
debug_id, debug_id,
@ -453,11 +448,10 @@ pub fn recover(
data_missing, data_missing,
coding_missing coding_missing
); );
block_start += NUM_DATA; // nothing to do...
block_start_idx += NUM_DATA as u64; return Err(ErasureError::NotEnoughBlocksToDecode);
// on to the next block
continue;
} }
trace!( trace!(
"recover {:x}: recovering: data: {} coding: {}", "recover {:x}: recovering: data: {} coding: {}",
debug_id, debug_id,
@ -586,10 +580,6 @@ pub fn recover(
} }
assert!(!corrupt, " {:x} ", debug_id); assert!(!corrupt, " {:x} ", debug_id);
block_start += NUM_DATA;
block_start_idx += NUM_DATA as u64;
}
Ok(()) Ok(())
} }
@ -831,7 +821,6 @@ mod test {
&mut window, &mut window,
(offset + WINDOW_SIZE) as u64, (offset + WINDOW_SIZE) as u64,
offset, offset,
num_blobs
).is_ok() ).is_ok()
); );
println!("** after-recover:"); println!("** after-recover:");
@ -879,7 +868,6 @@ mod test {
&mut window, &mut window,
(offset + WINDOW_SIZE) as u64, (offset + WINDOW_SIZE) as u64,
offset, offset,
num_blobs
).is_ok() ).is_ok()
); );
println!("** after-recover:"); println!("** after-recover:");
@ -926,7 +914,6 @@ mod test {
&mut window, &mut window,
(offset + WINDOW_SIZE) as u64, (offset + WINDOW_SIZE) as u64,
offset, offset,
num_blobs
).is_ok() ).is_ok()
); );
println!("** after-recover:"); println!("** after-recover:");

View File

@ -297,23 +297,22 @@ fn retransmit_all_leader_blocks(
/// starting from consumed is thereby formed, add that continuous /// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage. /// range of blobs to a queue to be sent on to the next stage.
/// ///
/// * `b` - the blob to be processed into the window and rebroadcast /// * `debug_id` - this node's id in a useful-for-debug format
/// * `blob` - the blob to be processed into the window and rebroadcast
/// * `pix` - the index of the blob, corresponds to /// * `pix` - the index of the blob, corresponds to
/// the entry height of this blob /// the entry height of this blob
/// * `w` - the index this blob would land at within the window
/// * `consume_queue` - output, blobs to be rebroadcast are placed here /// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `window` - the window we're operating on /// * `window` - the window we're operating on
/// * `debug_id` - this node's id in a useful-for-debug format
/// * `recycler` - where to return the blob once processed, also where /// * `recycler` - where to return the blob once processed, also where
/// to return old blobs from the window /// to return old blobs from the window
/// * `consumed` - input/output, the entry-height to which this /// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries /// node has populated and rebroadcast entries
fn process_blob( fn process_blob(
debug_id: u64,
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
consume_queue: &mut SharedBlobs, consume_queue: &mut SharedBlobs,
window: &Window, window: &Window,
debug_id: u64,
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: &mut u64, consumed: &mut u64,
received: u64, received: u64,
@ -327,31 +326,52 @@ fn process_blob(
blob_r.is_coding() blob_r.is_coding()
}; };
// insert a newly received blob into a window slot, clearing out and recycling any previous
// blob unless the incoming blob is a duplicate (based on idx)
// returns whether the incoming is a duplicate blob
fn insert_blob_is_dup(
debug_id: u64,
blob: SharedBlob,
pix: u64,
window_slot: &mut Option<SharedBlob>,
recycler: &BlobRecycler,
c_or_d: &str,
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
if old.read().unwrap().get_index().unwrap() == pix {
trace!(
"{:x}: duplicate {} blob at index {:}",
debug_id,
c_or_d,
pix
);
}
trace!(
"{:x}: recycling {} blob at index {:}",
debug_id,
c_or_d,
pix
);
recycler.recycle(old);
true
} else {
trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix);
false
}
}
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry // insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding { let is_duplicate = if is_coding {
if let Some(old) = mem::replace(&mut window[w].coding, Some(blob)) { insert_blob_is_dup(
if old.read().unwrap().get_index().unwrap() == pix { debug_id,
trace!("{:x}: duplicate coding blob at index {:}", debug_id, pix); blob,
} pix,
trace!("{:x}: recycling coding blob at index {:}", debug_id, pix); &mut window[w].coding,
recycler.recycle(old); recycler,
true "coding",
)
} else { } else {
trace!("{:x}: empty coding window slot {:}", debug_id, pix); insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data")
false
}
} else {
if let Some(old) = mem::replace(&mut window[w].data, Some(blob)) {
if old.read().unwrap().get_index().unwrap() == pix {
trace!("{:x}: duplicate data blob at index {:}", debug_id, pix);
}
trace!("{:x}: recycling data blob at index {:}", debug_id, pix);
recycler.recycle(old);
true
} else {
trace!("{:x}: empty data window slot {:}", debug_id, pix);
false
}
}; };
if is_duplicate { if is_duplicate {
@ -366,7 +386,6 @@ fn process_blob(
&mut window, &mut window,
*consumed, *consumed,
(*consumed % WINDOW_SIZE) as usize, (*consumed % WINDOW_SIZE) as usize,
(received - *consumed) as usize,
).is_err() ).is_err()
{ {
trace!("{:x}: erasure::recover failed", debug_id); trace!("{:x}: erasure::recover failed", debug_id);
@ -460,11 +479,11 @@ fn recv_window(
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size); trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
process_blob( process_blob(
debug_id,
b, b,
pix, pix,
&mut consume_queue, &mut consume_queue,
window, window,
debug_id,
recycler, recycler,
consumed, consumed,
*received, *received,