diff --git a/src/erasure.rs b/src/erasure.rs index 0b0b2cb9df..8eea39ca55 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -342,6 +342,7 @@ pub fn generate_coding( // examine the window beginning at block_start for missing or // stale (based on block_start_idx) blobs // if a blob is stale, remove it from the window slot +// side effect: block will be cleaned of old blobs fn find_missing( debug_id: u64, block_start_idx: u64, @@ -359,54 +360,49 @@ fn find_missing( let idx = (i - block_start) as u64 + block_start_idx; let n = i % window.len(); - // swap blob out with None, if it's in the right place, put it back - if let Some(blob) = mem::replace(&mut window[n].data, None) { - let blob_idx = blob.read().unwrap().get_index().unwrap(); - if blob_idx == idx { - trace!("recover {:x}: idx: {} good data", debug_id, idx); - mem::replace(&mut window[n].data, Some(blob)); + fn is_missing( + debug_id: u64, + idx: u64, + window_slot: &mut Option, + recycler: &BlobRecycler, + c_or_d: &str, + ) -> bool { + if let Some(blob) = mem::replace(window_slot, None) { + let blob_idx = blob.read().unwrap().get_index().unwrap(); + if blob_idx == idx { + trace!("recover {:x}: idx: {} good {}", debug_id, idx, c_or_d); + mem::replace(window_slot, Some(blob)); + false + } else { + trace!( + "recover {:x}: idx: {} old {} {}, recycling", + debug_id, + idx, + c_or_d, + blob_idx, + ); + recycler.recycle(blob); + true + } } else { - trace!( - "recover {:x}: idx: {} old data {}, recycling", - debug_id, - idx, - blob_idx - ); - recycler.recycle(blob); - data_missing += 1; + trace!("recover {:x}: idx: {} None {}", debug_id, idx, c_or_d); + true } - } else { - trace!("recover {:x}: idx: {} None data", debug_id, idx); + } + + if is_missing(debug_id, idx, &mut window[n].data, recycler, "data") { data_missing += 1; } - if i >= coding_start { - // swap blob out with None, if it's in the right place, put it back - if let Some(blob) = mem::replace(&mut window[n].coding, None) { - let blob_idx = 
blob.read().unwrap().get_index().unwrap(); - if blob_idx == idx { - trace!("recover {:x}: idx: {} good coding", debug_id, idx); - mem::replace(&mut window[n].coding, Some(blob)); - } else { - trace!( - "recover {:x}: idx: {} old coding {}, recycling", - debug_id, - idx, - blob_idx - ); - recycler.recycle(blob); - coding_missing += 1; - } - } else { - trace!("recover {:x}: idx: {} None coding", debug_id, idx); - coding_missing += 1; - } + if i >= coding_start && is_missing(debug_id, idx, &mut window[n].coding, recycler, "coding") + { + coding_missing += 1; } } (data_missing, coding_missing) } -// Recover missing blocks into window +// Recover a missing block into window // missing blocks should be None or old... // Use recycler to allocate new ones. // If not enough coding or data blocks are present to restore @@ -418,178 +414,172 @@ pub fn recover( window: &mut [WindowSlot], start_idx: u64, start: usize, - num_blobs: usize, ) -> Result<()> { - let num_blocks = (num_blobs / NUM_DATA) + 1; - let mut block_start = start - (start % NUM_DATA); - let mut block_start_idx = start_idx - (start_idx % NUM_DATA as u64); + let block_start = start - (start % NUM_DATA); + let block_start_idx = start_idx - (start_idx % NUM_DATA as u64); - debug!( - "num_blocks: {} start: {} num_blobs: {} block_start: {}", - num_blocks, start, num_blobs, block_start + debug!("start: {} block_start: {}", start, block_start); + + let coding_start = block_start + NUM_DATA - NUM_CODING; + let block_end = block_start + NUM_DATA; + trace!( + "recover {:x}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}", + debug_id, + block_start_idx, + block_start, + coding_start, + block_end ); - for _ in 0..num_blocks { - let coding_start = block_start + NUM_DATA - NUM_CODING; - let block_end = block_start + NUM_DATA; + let (data_missing, coding_missing) = + find_missing(debug_id, block_start_idx, block_start, window, recycler); + + // if we're not missing data, or if we have too much missing but 
have enough coding + if data_missing == 0 { + // nothing to do... + return Ok(()); + } + + if (data_missing + coding_missing) > NUM_CODING { trace!( - "recover {:x}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}", + "recover {:x}: start: {} skipping recovery data: {} coding: {}", debug_id, - block_start_idx, block_start, - coding_start, - block_end - ); - - let (data_missing, coding_missing) = - find_missing(debug_id, block_start_idx, block_start, window, recycler); - - // if we're not missing data, or if we have too much missin but have enough coding - if data_missing == 0 || (data_missing + coding_missing) > NUM_CODING { - trace!( - "recover {:x}: start: {} skipping recovery data: {} coding: {}", - debug_id, - block_start, - data_missing, - coding_missing - ); - block_start += NUM_DATA; - block_start_idx += NUM_DATA as u64; - // on to the next block - continue; - } - trace!( - "recover {:x}: recovering: data: {} coding: {}", - debug_id, data_missing, coding_missing ); - let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); - let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); - let mut erasures: Vec = Vec::with_capacity(NUM_CODING); - let mut meta = None; - let mut size = None; - - // add the data blobs we have into recovery blob vector - for i in block_start..block_end { - let j = i % window.len(); - - if let Some(b) = window[j].data.clone() { - if meta.is_none() { - meta = Some(b.read().unwrap().meta.clone()); - trace!("recover {:x} meta at {} {:?}", debug_id, j, meta); - } - blobs.push(b); - } else { - let n = recycler.allocate(); - window[j].data = Some(n.clone()); - // mark the missing memory - blobs.push(n); - erasures.push((i - block_start) as i32); - } - } - for i in coding_start..block_end { - let j = i % window.len(); - if let Some(b) = window[j].coding.clone() { - if size.is_none() { - size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE); - } - blobs.push(b); - } else { - let n = recycler.allocate(); - 
window[j].coding = Some(n.clone()); - //mark the missing memory - blobs.push(n); - erasures.push(((i - coding_start) + NUM_DATA) as i32); - } - } - // now that we have size (from coding), zero out data blob tails - for i in block_start..block_end { - let j = i % window.len(); - - if let Some(b) = &window[j].data { - let size = size.unwrap(); - let mut b_wl = b.write().unwrap(); - for i in b_wl.meta.size..size { - b_wl.data[i] = 0; - } - } - } - - // marks end of erasures - erasures.push(-1); - trace!( - "erasures[]: {:x} {:?} data_size: {}", - debug_id, - erasures, - size.unwrap(), - ); - //lock everything for write - for b in &blobs { - locks.push(b.write().expect("'locks' arr in pb fn recover")); - } - - { - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); - for (i, l) in locks.iter_mut().enumerate() { - if i < NUM_DATA { - trace!("{:x} pushing data: {}", debug_id, i); - data_ptrs.push(&mut l.data[..size.unwrap()]); - } else { - trace!("{:x} pushing coding: {}", debug_id, i); - coding_ptrs.push(&mut l.data_mut()[..size.unwrap()]); - } - } - trace!( - "{:x} coding_ptrs.len: {} data_ptrs.len {}", - debug_id, - coding_ptrs.len(), - data_ptrs.len() - ); - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - } - - let mut corrupt = false; - // repopulate header data size from recovered blob contents - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - let mut idx = n as u64 + block_start_idx; - - let mut data_size; - if n < NUM_DATA { - data_size = locks[n].get_data_size().unwrap(); - data_size -= BLOB_HEADER_SIZE as u64; - } else { - data_size = size.unwrap() as u64; - idx -= NUM_CODING as u64; - locks[n].set_index(idx).unwrap(); - } - - locks[n].meta = meta.clone().unwrap(); - locks[n].set_size(data_size as usize); - trace!( - "{:x} erasures[{}] ({}) size: {:x} data[0]: {}", - debug_id, - *i, - idx, - data_size, - 
locks[n].data()[0] - ); - if data_size > BLOB_SIZE as u64 { - corrupt = true; - } - } - assert!(!corrupt, " {:x} ", debug_id); - - block_start += NUM_DATA; - block_start_idx += NUM_DATA as u64; + // nothing to do... + return Err(ErasureError::NotEnoughBlocksToDecode); } + trace!( + "recover {:x}: recovering: data: {} coding: {}", + debug_id, + data_missing, + coding_missing + ); + let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); + let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); + let mut erasures: Vec = Vec::with_capacity(NUM_CODING); + let mut meta = None; + let mut size = None; + + // add the data blobs we have into recovery blob vector + for i in block_start..block_end { + let j = i % window.len(); + + if let Some(b) = window[j].data.clone() { + if meta.is_none() { + meta = Some(b.read().unwrap().meta.clone()); + trace!("recover {:x} meta at {} {:?}", debug_id, j, meta); + } + blobs.push(b); + } else { + let n = recycler.allocate(); + window[j].data = Some(n.clone()); + // mark the missing memory + blobs.push(n); + erasures.push((i - block_start) as i32); + } + } + for i in coding_start..block_end { + let j = i % window.len(); + if let Some(b) = window[j].coding.clone() { + if size.is_none() { + size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE); + } + blobs.push(b); + } else { + let n = recycler.allocate(); + window[j].coding = Some(n.clone()); + //mark the missing memory + blobs.push(n); + erasures.push(((i - coding_start) + NUM_DATA) as i32); + } + } + // now that we have size (from coding), zero out data blob tails + for i in block_start..block_end { + let j = i % window.len(); + + if let Some(b) = &window[j].data { + let size = size.unwrap(); + let mut b_wl = b.write().unwrap(); + for i in b_wl.meta.size..size { + b_wl.data[i] = 0; + } + } + } + + // marks end of erasures + erasures.push(-1); + trace!( + "erasures[]: {:x} {:?} data_size: {}", + debug_id, + erasures, + size.unwrap(), + ); + //lock everything for write + for 
b in &blobs { + locks.push(b.write().expect("'locks' arr in pb fn recover")); + } + + { + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); + for (i, l) in locks.iter_mut().enumerate() { + if i < NUM_DATA { + trace!("{:x} pushing data: {}", debug_id, i); + data_ptrs.push(&mut l.data[..size.unwrap()]); + } else { + trace!("{:x} pushing coding: {}", debug_id, i); + coding_ptrs.push(&mut l.data_mut()[..size.unwrap()]); + } + } + trace!( + "{:x} coding_ptrs.len: {} data_ptrs.len {}", + debug_id, + coding_ptrs.len(), + data_ptrs.len() + ); + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + } + + let mut corrupt = false; + // repopulate header data size from recovered blob contents + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; + + let mut data_size; + if n < NUM_DATA { + data_size = locks[n].get_data_size().unwrap(); + data_size -= BLOB_HEADER_SIZE as u64; + } else { + data_size = size.unwrap() as u64; + idx -= NUM_CODING as u64; + locks[n].set_index(idx).unwrap(); + } + + locks[n].meta = meta.clone().unwrap(); + locks[n].set_size(data_size as usize); + trace!( + "{:x} erasures[{}] ({}) size: {:x} data[0]: {}", + debug_id, + *i, + idx, + data_size, + locks[n].data()[0] + ); + if data_size > BLOB_SIZE as u64 { + corrupt = true; + } + } + assert!(!corrupt, " {:x} ", debug_id); + Ok(()) } @@ -831,7 +821,6 @@ mod test { &mut window, (offset + WINDOW_SIZE) as u64, offset, - num_blobs ).is_ok() ); println!("** after-recover:"); @@ -879,7 +868,6 @@ mod test { &mut window, (offset + WINDOW_SIZE) as u64, offset, - num_blobs ).is_ok() ); println!("** after-recover:"); @@ -926,7 +914,6 @@ mod test { &mut window, (offset + WINDOW_SIZE) as u64, offset, - num_blobs ).is_ok() ); println!("** after-recover:"); diff --git a/src/streamer.rs b/src/streamer.rs index acd5744a0b..4ef44ac043 100644 
--- a/src/streamer.rs +++ b/src/streamer.rs @@ -297,23 +297,22 @@ fn retransmit_all_leader_blocks( /// starting from consumed is thereby formed, add that continuous /// range of blobs to a queue to be sent on to the next stage. /// -/// * `b` - the blob to be processed into the window and rebroadcast +/// * `debug_id` - this node's id in a useful-for-debug format +/// * `blob` - the blob to be processed into the window and rebroadcast /// * `pix` - the index of the blob, corresponds to /// the entry height of this blob -/// * `w` - the index this blob would land at within the window /// * `consume_queue` - output, blobs to be rebroadcast are placed here /// * `window` - the window we're operating on -/// * `debug_id` - this node's id in a useful-for-debug format /// * `recycler` - where to return the blob once processed, also where /// to return old blobs from the window /// * `consumed` - input/output, the entry-height to which this /// node has populated and rebroadcast entries fn process_blob( + debug_id: u64, blob: SharedBlob, pix: u64, consume_queue: &mut SharedBlobs, window: &Window, - debug_id: u64, recycler: &BlobRecycler, consumed: &mut u64, received: u64, @@ -327,31 +326,52 @@ fn process_blob( blob_r.is_coding() }; + // insert a newly received blob into a window slot, clearing out and recycling any previous + // blob unless the incoming blob is a duplicate (based on idx) + // returns whether the incoming is a duplicate blob + fn insert_blob_is_dup( + debug_id: u64, + blob: SharedBlob, + pix: u64, + window_slot: &mut Option, + recycler: &BlobRecycler, + c_or_d: &str, + ) -> bool { + if let Some(old) = mem::replace(window_slot, Some(blob)) { + if old.read().unwrap().get_index().unwrap() == pix { + trace!( + "{:x}: duplicate {} blob at index {:}", + debug_id, + c_or_d, + pix + ); + } + trace!( + "{:x}: recycling {} blob at index {:}", + debug_id, + c_or_d, + pix + ); + recycler.recycle(old); + true + } else { + trace!("{:x}: empty {} window slot {:}", 
debug_id, c_or_d, pix); + false + } + } + // insert the new blob into the window, overwrite and recycle old (or duplicate) entry let is_duplicate = if is_coding { - if let Some(old) = mem::replace(&mut window[w].coding, Some(blob)) { - if old.read().unwrap().get_index().unwrap() == pix { - trace!("{:x}: duplicate coding blob at index {:}", debug_id, pix); - } - trace!("{:x}: recycling coding blob at index {:}", debug_id, pix); - recycler.recycle(old); - true - } else { - trace!("{:x}: empty coding window slot {:}", debug_id, pix); - false - } + insert_blob_is_dup( + debug_id, + blob, + pix, + &mut window[w].coding, + recycler, + "coding", + ) } else { - if let Some(old) = mem::replace(&mut window[w].data, Some(blob)) { - if old.read().unwrap().get_index().unwrap() == pix { - trace!("{:x}: duplicate data blob at index {:}", debug_id, pix); - } - trace!("{:x}: recycling data blob at index {:}", debug_id, pix); - recycler.recycle(old); - true - } else { - trace!("{:x}: empty data window slot {:}", debug_id, pix); - false - } + insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data") }; if is_duplicate { @@ -366,7 +386,6 @@ fn process_blob( &mut window, *consumed, (*consumed % WINDOW_SIZE) as usize, - (received - *consumed) as usize, ).is_err() { trace!("{:x}: erasure::recover failed", debug_id); @@ -460,11 +479,11 @@ fn recv_window( trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size); process_blob( + debug_id, b, pix, &mut consume_queue, window, - debug_id, recycler, consumed, *received,