This commit is contained in:
Rob Walker 2018-07-18 13:28:03 -07:00
parent d713e3c2cf
commit 3d80926508
3 changed files with 133 additions and 86 deletions

View File

@ -334,16 +334,19 @@ pub fn recover(
// if we're not missing data, or if we have too much missin but have enough coding
if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING {
debug!(
trace!(
"1: start: {} skipping recovery data: {} coding: {}",
block_start, data_missing, coding_missing
block_start,
data_missing,
coding_missing
);
block_start += NUM_DATA;
continue;
}
debug!(
trace!(
"2: recovering: data: {} coding: {}",
data_missing, coding_missing
data_missing,
coding_missing
);
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
@ -359,6 +362,7 @@ pub fn recover(
if meta.is_none() {
let bl = window[j].data.clone().unwrap();
meta = Some(bl.read().unwrap().meta.clone());
trace!("meta at {} {:?}", i, meta);
}
blobs.push(
window[j]
@ -392,7 +396,7 @@ pub fn recover(
window[j].coding = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push((i - block_start + NUM_DATA) as i32);
erasures.push(((i - coding_start) + NUM_DATA) as i32);
}
}
erasures.push(-1);
@ -406,6 +410,7 @@ pub fn recover(
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pb fn recover"));
}
{
let mut coding_ptrs: Vec<&[u8]> = Vec::with_capacity(NUM_CODING);
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
@ -427,7 +432,12 @@ pub fn recover(
}
for i in &erasures[..erasures.len() - 1] {
let idx = *i as usize;
let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
let mut data_size = locks[idx].get_data_size().unwrap();
trace!("data_size at {} {}", *i, data_size);
data_size -= BLOB_HEADER_SIZE as u64;
locks[idx].meta = meta.clone().unwrap();
locks[idx].set_size(data_size as usize);
trace!(
@ -519,7 +529,7 @@ mod test {
print!("{:>w$} ", window_l2.data()[i], w = 2);
}
} else {
print!("data null");
print!("data null ");
}
if w.coding.is_some() {
let window_l1 = w.coding.clone().unwrap();
@ -599,30 +609,71 @@ mod test {
println!("** after-gen-coding:");
print_window(&window);
println!("** whack data block:");
// test erasing a data block
let erase_offset = offset;
// Create a hole in the window
let refwindow = window[erase_offset].data.clone();
window[erase_offset].data = None;
print_window(&window);
// Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
println!("** after-recover:");
print_window(&window);
// Check the result
let window_l = window[erase_offset].data.clone().unwrap();
let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
assert_eq!(
window_l2.data[..(data_len + BLOB_HEADER_SIZE)],
ref_l2.data[..(data_len + BLOB_HEADER_SIZE)]
);
assert_eq!(window_l2.meta.size, ref_l2.meta.size);
assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
{
// Check the result, block is here to drop locks
let window_l = window[erase_offset].data.clone().unwrap();
let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
assert_eq!(
window_l2.data[..(data_len + BLOB_HEADER_SIZE)],
ref_l2.data[..(data_len + BLOB_HEADER_SIZE)]
);
assert_eq!(window_l2.meta.size, ref_l2.meta.size);
assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
}
println!("** whack coding block and data block");
// test erasing a coding block
let erase_offset = offset + erasure::NUM_DATA - erasure::NUM_CODING;
// Create a hole in the window
blob_recycler.recycle(window[erase_offset].data.clone().unwrap());
window[erase_offset].data = None;
let refwindow = window[erase_offset].coding.clone();
window[erase_offset].coding = None;
print_window(&window);
// Recover it from coding
assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok());
println!("** after-recover:");
print_window(&window);
{
// Check the result, block is here to drop locks
let window_l = window[erase_offset].coding.clone().unwrap();
let window_l2 = window_l.read().unwrap();
let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
assert_eq!(
window_l2.data[..(data_len + BLOB_HEADER_SIZE)],
ref_l2.data[..(data_len + BLOB_HEADER_SIZE)]
);
assert_eq!(window_l2.meta.size, ref_l2.meta.size);
assert_eq!(window_l2.meta.addr, ref_l2.meta.addr);
assert_eq!(window_l2.meta.port, ref_l2.meta.port);
assert_eq!(window_l2.meta.v6, ref_l2.meta.v6);
assert_eq!(window_l2.get_index().unwrap(), erase_offset as u64);
}
}
// //TODO This needs to be reworked

View File

@ -25,7 +25,7 @@ pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
pub const PACKET_DATA_SIZE: usize = 256;
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
#[repr(C)]
pub struct Meta {
pub size: usize,

View File

@ -30,25 +30,6 @@ pub struct WindowSlot {
pub coding: Option<SharedBlob>,
}
//impl Copy for WindowSlot {}
//impl Clone for WindowSlot {
// fn clone(&self) -> WindowSlot {
// WindowSlot {
// data: if self.data.is_some() {
// Some(self.data.clone())
// } else {
// None
// },
// coding: if self.coding.is_some() {
// Some(self.coding.clone())
// } else {
// None
// },
// }
// }
//}
pub type Window = Arc<RwLock<Vec<WindowSlot>>>;
#[derive(Debug, PartialEq, Eq)]
@ -217,18 +198,6 @@ fn repair_window(
consumed: &mut u64,
received: &mut u64,
) -> Result<()> {
#[cfg(feature = "erasure")]
{
if erasure::recover(
_recycler,
&mut locked_window.write().unwrap(),
*consumed as usize,
*received as usize,
).is_err()
{
trace!("erasure::recover failed");
}
}
//exponential backoff
if *last != *consumed {
*times = 0;
@ -354,6 +323,7 @@ fn process_blob(
debug_id: u64,
recycler: &BlobRecycler,
consumed: &mut u64,
received: u64,
) {
let mut window = locked_window.write().unwrap();
@ -361,13 +331,24 @@ fn process_blob(
// of consumed to received and clear any old ones
for ix in *consumed..(pix + 1) {
let k = (ix % WINDOW_SIZE) as usize;
if let Some(b) = &mut window[k].data {
if b.read().unwrap().get_index().unwrap() >= *consumed as u64 {
continue;
let mut old = false;
if let Some(b) = &window[k].data {
old = b.read().unwrap().get_index().unwrap() < *consumed as u64;
}
if old {
if let Some(b) = mem::replace(&mut window[k].data, None) {
recycler.recycle(b);
}
}
if let Some(b) = mem::replace(&mut window[k].data, None) {
recycler.recycle(b);
let mut old = false;
if let Some(b) = &window[k].coding {
old = b.read().unwrap().get_index().unwrap() < *consumed as u64;
}
if old {
if let Some(b) = mem::replace(&mut window[k].coding, None) {
recycler.recycle(b);
}
}
}
@ -384,9 +365,9 @@ fn process_blob(
window[w].coding = Some(b);
} else if let Some(blob) = &window[w].coding {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w);
warn!("{:x}: overrun coding blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
debug!("{:x}: duplicate coding blob at index {:}", debug_id, w);
}
}
} else {
@ -394,13 +375,20 @@ fn process_blob(
window[w].data = Some(b);
} else if let Some(blob) = &window[w].data {
if blob.read().unwrap().get_index().unwrap() != pix as u64 {
warn!("{:x}: overrun blob at index {:}", debug_id, w);
warn!("{:x}: overrun data blob at index {:}", debug_id, w);
} else {
debug!("{:x}: duplicate blob at index {:}", debug_id, w);
debug!("{:x}: duplicate data blob at index {:}", debug_id, w);
}
}
}
#[cfg(feature = "erasure")]
{
if erasure::recover(recycler, &mut window, *consumed as usize, received as usize).is_err() {
trace!("erasure::recover failed");
}
}
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
@ -487,6 +475,7 @@ fn recv_window(
debug_id,
recycler,
consumed,
*received,
);
}
print_window(debug_id, locked_window, *consumed);
@ -507,30 +496,28 @@ fn recv_window(
}
fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
{
let buf: Vec<_> = locked_window
.read()
.unwrap()
.iter()
.enumerate()
.map(|(i, v)| {
if i == (consumed % WINDOW_SIZE) as usize {
"_"
} else if v.data.is_none() {
"0"
} else if let Some(ref cblob) = v.data {
if cblob.read().unwrap().is_coding() {
"C"
} else {
"1"
}
} else {
"0"
}
})
.collect();
trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join(""));
}
let buf: Vec<_> = locked_window
.read()
.unwrap()
.iter()
.enumerate()
.map(|(i, v)| {
if i == (consumed % WINDOW_SIZE) as usize {
"_"
} else if v.data.is_none() && v.coding.is_none() {
"0"
} else if v.data.is_some() && v.coding.is_some() {
"X"
} else if v.data.is_some() {
// coding.is_none()
"D"
} else {
// data.is_none()
"C"
}
})
.collect();
trace!("{:x}:WINDOW ({}): {}", debug_id, consumed, buf.join(""));
}
pub fn default_window() -> Window {
@ -686,6 +673,15 @@ fn broadcast(
);
recycler.recycle(x);
}
if let Some(x) = mem::replace(&mut win[pos].coding, None) {
trace!(
"popped {} at {}",
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x);
}
trace!("null {}", pos);
}
while let Some(b) = blobs.pop() {