cargo fmt
parent f511ac9be7
commit a06535d7ed

src/crdt.rs (10 changed lines)
@@ -312,9 +312,13 @@ impl Crdt {
            let bl = b.unwrap();
            let blob = bl.read().expect("blob read lock in streamer::broadcast");
            //TODO profile this, may need multiple sockets for par_iter
-            trace!("broadcast idx: {} sz: {} to {} coding: {}",
-                   blob.get_index().unwrap(), blob.meta.size,
-                   v.replicate_addr, blob.is_coding());
+            trace!(
+                "broadcast idx: {} sz: {} to {} coding: {}",
+                blob.get_index().unwrap(),
+                blob.meta.size,
+                v.replicate_addr,
+                blob.is_coding()
+            );
            assert!(blob.meta.size < BLOB_SIZE);
            let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
            trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
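Most of the hunks in this commit follow the same mechanical pattern as the one above: when a macro or function call no longer fits within rustfmt's line-width limit, each argument is moved onto its own line and the closing parenthesis gets a line of its own. A minimal, self-contained sketch of that layout (illustrative only, not code from this repository; the variable names are invented):

// Illustrative sketch only: the one-argument-per-line layout used throughout
// this commit for calls that exceed the maximum line width.
fn main() {
    let (index, size, addr, coding) = (42u64, 1024usize, "127.0.0.1:8000", false);
    println!(
        "broadcast idx: {} sz: {} to {} coding: {}",
        index,
        size,
        addr,
        coding
    );
}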
@@ -74,14 +74,22 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul
    let mut data_arg = Vec::new();
    for block in data {
        if block_len != block.len() {
-            trace!("data block size incorrect {} expected {}", block.len(), block_len);
+            trace!(
+                "data block size incorrect {} expected {}",
+                block.len(),
+                block_len
+            );
            return Err(ErasureError::InvalidBlockSize);
        }
        data_arg.push(block.as_ptr());
    }
    for mut block in coding {
        if block_len != block.len() {
-            trace!("coding block size incorrect {} expected {}", block.len(), block_len);
+            trace!(
+                "coding block size incorrect {} expected {}",
+                block.len(),
+                block_len
+            );
            return Err(ErasureError::InvalidBlockSize);
        }
        coding_arg.push(block.as_mut_ptr());
@@ -182,13 +190,15 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec<SharedBlob>, co
}

// Generate coding blocks in window starting from consumed
-pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, num_blobs: usize) -> Result<()> {
+pub fn generate_coding(
+    window: &mut Vec<Option<SharedBlob>>,
+    consumed: usize,
+    num_blobs: usize,
+) -> Result<()> {
    let mut block_start = consumed - (consumed % NUM_CODED);

    for i in consumed..consumed + num_blobs {
        if (i % NUM_CODED) == (NUM_CODED - 1) {

            let mut data_blobs = Vec::new();
            let mut coding_blobs = Vec::new();
            let mut data_locks = Vec::new();
@@ -262,8 +272,14 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
            }

            generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
-            debug!("consumed: {} data: {}:{} coding: {}:{}", consumed,
-                   block_start, block_start + NUM_DATA, coding_start, coding_end);
+            debug!(
+                "consumed: {} data: {}:{} coding: {}:{}",
+                consumed,
+                block_start,
+                block_start + NUM_DATA,
+                coding_start,
+                coding_end
+            );
            block_start += NUM_CODED;
        }
    }
@@ -288,7 +304,10 @@ pub fn recover(
    let mut block_start = consumed - (consumed % NUM_CODED);

    if num_blocks > 0 {
-        debug!("num_blocks: {} received: {} consumed: {}", num_blocks, received, consumed);
+        debug!(
+            "num_blocks: {} received: {} consumed: {}",
+            num_blocks, received, consumed
+        );
    }

    for i in 0..num_blocks {
@@ -316,11 +335,17 @@ pub fn recover(
            }
        }
        if (data_missing + coded_missing) != NUM_CODED && (data_missing + coded_missing) != 0 {
-            debug!("1: start: {} recovering: data: {} coding: {}", block_start, data_missing, coded_missing);
+            debug!(
+                "1: start: {} recovering: data: {} coding: {}",
+                block_start, data_missing, coded_missing
+            );
        }
        if data_missing > 0 {
            if (data_missing + coded_missing) <= MAX_MISSING {
-                debug!("2: recovering: data: {} coding: {}", data_missing, coded_missing);
+                debug!(
+                    "2: recovering: data: {} coding: {}",
+                    data_missing, coded_missing
+                );
                let mut blobs: Vec<SharedBlob> = Vec::new();
                let mut locks = Vec::new();
                let mut erasures: Vec<i32> = Vec::new();
@@ -348,7 +373,12 @@ pub fn recover(
                    erasures.push((i - block_start) as i32);
                }
                erasures.push(-1);
-                trace!("erasures: {:?} data_size: {} header_size: {}", erasures, size.unwrap(), BLOB_HEADER_SIZE);
+                trace!(
+                    "erasures: {:?} data_size: {} header_size: {}",
+                    erasures,
+                    size.unwrap(),
+                    BLOB_HEADER_SIZE
+                );
                //lock everything
                for b in &blobs {
                    locks.push(b.write().expect("'locks' arr in pb fn recover"));
@@ -377,7 +407,12 @@ pub fn recover(
                    let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64;
                    locks[idx].meta = meta.clone().unwrap();
                    locks[idx].set_size(data_size as usize);
-                    trace!("erasures[{}] size: {} data[0]: {}", *i, data_size, locks[idx].data()[0]);
+                    trace!(
+                        "erasures[{}] size: {} data[0]: {}",
+                        *i,
+                        data_size,
+                        locks[idx].data()[0]
+                    );
                }
            }
        }
@@ -388,13 +423,13 @@ pub fn recover(

#[cfg(test)]
mod test {
+    use crdt;
    use erasure;
    use logger;
    use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE};
-    use crdt;
-    use std::sync::{Arc, RwLock};
    use signature::KeyPair;
    use signature::KeyPairUtil;
+    use std::sync::{Arc, RwLock};

    #[test]
    pub fn test_coding() {
@@ -452,7 +487,11 @@ mod test {
            if w.is_some() {
                let window_l1 = w.clone().unwrap();
                let window_l2 = window_l1.read().unwrap();
-                print!("index: {:?} meta.size: {} data: ", window_l2.get_index(), window_l2.meta.size);
+                print!(
+                    "index: {:?} meta.size: {} data: ",
+                    window_l2.get_index(),
+                    window_l2.meta.size
+                );
                for i in 0..8 {
                    print!("{} ", window_l2.data()[i]);
                }
@@ -275,7 +275,9 @@ const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();

macro_rules! align {
-    ($x:expr, $align: expr) => ($x + ($align - 1) & !($align - 1));
+    ($x:expr, $align:expr) => {
+        $x + ($align - 1) & !($align - 1)
+    };
}

pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
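The align! hunk is the only macro definition touched, and rustfmt merely rewrites the parenthesized one-line rule body into a braced block; the expression itself is unchanged. When reading it, note that `+` binds tighter than `&`, so the body parses as ($x + ($align - 1)) & !($align - 1), rounding $x up to the next multiple of a power-of-two alignment. A small standalone check of that behavior (not part of the commit):

// Standalone sketch, not part of this commit: the align! expression rounds a
// value up to the next multiple of a power-of-two alignment because `+` binds
// tighter than `&`.
macro_rules! align {
    ($x:expr, $align:expr) => {
        $x + ($align - 1) & !($align - 1)
    };
}

fn main() {
    assert_eq!(align!(0usize, 64), 0);
    assert_eq!(align!(1usize, 64), 64);
    assert_eq!(align!(64usize, 64), 64);
    assert_eq!(align!(65usize, 64), 128);
    println!("align! rounds up to the alignment boundary");
}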
@@ -178,7 +178,13 @@ fn repair_window(
) -> Result<()> {
    #[cfg(feature = "erasure")]
    {
-        if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed, *received).is_err() {
+        if erasure::recover(
+            _recycler,
+            &mut locked_window.write().unwrap(),
+            *consumed,
+            *received,
+        ).is_err()
+        {
            trace!("erasure::recover failed");
        }
    }
@@ -271,7 +277,10 @@ fn recv_window(
            // Got a blob which has already been consumed, skip it
            // probably from a repair window request
            if pix < *consumed {
-                info!("received: {} but older than consumed: {} skipping..", pix, *consumed);
+                debug!(
+                    "received: {} but older than consumed: {} skipping..",
+                    pix, *consumed
+                );
                continue;
            }
            let w = pix % WINDOW_SIZE;
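Not everything here is pure formatting: the hunk above, and two later ones in recv_window and print_window, also drop per-blob messages from info! to debug!. A minimal standalone sketch of the effect, assuming the log and env_logger crates (the setup and messages below are not taken from this repository):

// Standalone sketch, assuming `log` and `env_logger` as dependencies; not code
// from this repository. It shows why the downgrade matters: debug! records are
// filtered out unless the environment opts in, e.g. RUST_LOG=debug.
use log::{debug, info};

fn main() {
    env_logger::init(); // the filter level comes from the RUST_LOG environment variable
    info!("kept at info: shown when RUST_LOG=info or a finer filter is set");
    debug!("dropped to debug: shown only when RUST_LOG=debug or finer is set");
}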
@@ -299,7 +308,11 @@ fn recv_window(
                }
                let mut is_coding = false;
                if let &Some(ref cblob) = &window[k] {
-                    if cblob.read().expect("blob read lock for flags streamer::window").is_coding() {
+                    if cblob
+                        .read()
+                        .expect("blob read lock for flags streamer::window")
+                        .is_coding()
+                    {
                        is_coding = true;
                    }
                }
@@ -315,7 +328,10 @@ fn recv_window(
                    }

                    *consumed += erasure::MAX_MISSING;
-                    info!("skipping processing coding blob k: {} consumed: {}", k, *consumed);
+                    debug!(
+                        "skipping processing coding blob k: {} consumed: {}",
+                        k, *consumed
+                    );
                }
            }
        }
@@ -329,10 +345,7 @@ fn recv_window(
    Ok(())
}

-fn print_window(
-    locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
-    consumed: usize,
-) {
+fn print_window(locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>, consumed: usize) {
    {
        let buf: Vec<_> = locked_window
            .read()
@@ -357,7 +370,7 @@ fn print_window(
                }
            })
            .collect();
-        info!("WINDOW ({}): {}", consumed, buf.join(""));
+        debug!("WINDOW ({}): {}", consumed, buf.join(""));
    }
}

@@ -466,7 +479,11 @@ fn broadcast(
    // Fill in the coding blob data from the window data blobs
    #[cfg(feature = "erasure")]
    {
-        if erasure::generate_coding(&mut window.write().unwrap(), *receive_index as usize, blobs_len).is_err()
+        if erasure::generate_coding(
+            &mut window.write().unwrap(),
+            *receive_index as usize,
+            blobs_len,
+        ).is_err()
        {
            return Err(Error::GenericError);
        }
@@ -505,7 +522,15 @@ pub fn broadcaster(
            if exit.load(Ordering::Relaxed) {
                break;
            }
-            let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index, &mut receive_index);
+            let _ = broadcast(
+                &crdt,
+                &window,
+                &recycler,
+                &r,
+                &sock,
+                &mut transmit_index,
+                &mut receive_index,
+            );
        }
    })
    .unwrap()