From a06535d7edea0e0e2068d9bc53887e7cad99e473 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 5 Jun 2018 12:24:39 -0700 Subject: [PATCH] cargo fmt --- src/crdt.rs | 10 ++++--- src/erasure.rs | 69 ++++++++++++++++++++++++++++++++++++++----------- src/packet.rs | 4 ++- src/streamer.rs | 47 +++++++++++++++++++++++++-------- 4 files changed, 100 insertions(+), 30 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index f6cdf4b5ad..2cbb02a30e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -312,9 +312,13 @@ impl Crdt { let bl = b.unwrap(); let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter - trace!("broadcast idx: {} sz: {} to {} coding: {}", - blob.get_index().unwrap(), blob.meta.size, - v.replicate_addr, blob.is_coding()); + trace!( + "broadcast idx: {} sz: {} to {} coding: {}", + blob.get_index().unwrap(), + blob.meta.size, + v.replicate_addr, + blob.is_coding() + ); assert!(blob.meta.size < BLOB_SIZE); let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr); trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr); diff --git a/src/erasure.rs b/src/erasure.rs index 88296ded18..604bb158fa 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -74,14 +74,22 @@ pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Resul let mut data_arg = Vec::new(); for block in data { if block_len != block.len() { - trace!("data block size incorrect {} expected {}", block.len(), block_len); + trace!( + "data block size incorrect {} expected {}", + block.len(), + block_len + ); return Err(ErasureError::InvalidBlockSize); } data_arg.push(block.as_ptr()); } for mut block in coding { if block_len != block.len() { - trace!("coding block size incorrect {} expected {}", block.len(), block_len); + trace!( + "coding block size incorrect {} expected {}", + block.len(), + block_len + ); return Err(ErasureError::InvalidBlockSize); } coding_arg.push(block.as_mut_ptr()); @@ -182,13 +190,15 @@ pub fn add_coding_blobs(recycler: &BlobRecycler, blobs: &mut Vec, co } // Generate coding blocks in window starting from consumed -pub fn generate_coding(window: &mut Vec>, consumed: usize, num_blobs: usize) -> Result<()> { - +pub fn generate_coding( + window: &mut Vec>, + consumed: usize, + num_blobs: usize, +) -> Result<()> { let mut block_start = consumed - (consumed % NUM_CODED); for i in consumed..consumed + num_blobs { if (i % NUM_CODED) == (NUM_CODED - 1) { - let mut data_blobs = Vec::new(); let mut coding_blobs = Vec::new(); let mut data_locks = Vec::new(); @@ -262,8 +272,14 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu } generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - debug!("consumed: {} data: {}:{} coding: {}:{}", consumed, - block_start, block_start + NUM_DATA, coding_start, coding_end); + debug!( + "consumed: {} data: {}:{} coding: {}:{}", + consumed, + block_start, + block_start + NUM_DATA, + coding_start, + coding_end + ); block_start += NUM_CODED; } } @@ -288,7 +304,10 @@ pub fn recover( let mut block_start = consumed - (consumed % NUM_CODED); if num_blocks > 0 { - debug!("num_blocks: {} received: {} consumed: {}", num_blocks, received, consumed); + debug!( + "num_blocks: {} received: {} consumed: {}", + num_blocks, received, consumed + ); } for i in 0..num_blocks { @@ -316,11 +335,17 @@ pub fn recover( } } if (data_missing + coded_missing) != NUM_CODED && (data_missing + coded_missing) != 0 { - debug!("1: start: {} recovering: data: {} coding: {}", block_start, data_missing, coded_missing); + debug!( + "1: start: {} recovering: data: {} coding: {}", + block_start, data_missing, coded_missing + ); } if data_missing > 0 { if (data_missing + coded_missing) <= MAX_MISSING { - debug!("2: recovering: data: {} coding: {}", data_missing, coded_missing); + debug!( + "2: recovering: data: {} coding: {}", + data_missing, coded_missing + ); let mut blobs: Vec = Vec::new(); let mut locks = Vec::new(); let mut erasures: Vec = Vec::new(); @@ -348,7 +373,12 @@ pub fn recover( erasures.push((i - block_start) as i32); } erasures.push(-1); - trace!("erasures: {:?} data_size: {} header_size: {}", erasures, size.unwrap(), BLOB_HEADER_SIZE); + trace!( + "erasures: {:?} data_size: {} header_size: {}", + erasures, + size.unwrap(), + BLOB_HEADER_SIZE + ); //lock everything for b in &blobs { locks.push(b.write().expect("'locks' arr in pb fn recover")); @@ -377,7 +407,12 @@ pub fn recover( let data_size = locks[idx].get_data_size().unwrap() - BLOB_HEADER_SIZE as u64; locks[idx].meta = meta.clone().unwrap(); locks[idx].set_size(data_size as usize); - trace!("erasures[{}] size: {} data[0]: {}", *i, data_size, locks[idx].data()[0]); + trace!( + "erasures[{}] size: {} data[0]: {}", + *i, + data_size, + locks[idx].data()[0] + ); } } } @@ -388,13 +423,13 @@ pub fn recover( #[cfg(test)] mod test { + use crdt; use erasure; use logger; use packet::{BlobRecycler, SharedBlob, BLOB_HEADER_SIZE}; - use crdt; - use std::sync::{Arc, RwLock}; use signature::KeyPair; use signature::KeyPairUtil; + use std::sync::{Arc, RwLock}; #[test] pub fn test_coding() { @@ -452,7 +487,11 @@ mod test { if w.is_some() { let window_l1 = w.clone().unwrap(); let window_l2 = window_l1.read().unwrap(); - print!("index: {:?} meta.size: {} data: ", window_l2.get_index(), window_l2.meta.size); + print!( + "index: {:?} meta.size: {} data: ", + window_l2.get_index(), + window_l2.meta.size + ); for i in 0..8 { print!("{} ", window_l2.data()[i]); } diff --git a/src/packet.rs b/src/packet.rs index 2241305c86..c73ea1afb3 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -275,7 +275,9 @@ const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::(); const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::(); macro_rules! align { - ($x:expr, $align: expr) => ($x + ($align - 1) & !($align - 1)); + ($x:expr, $align:expr) => { + $x + ($align - 1) & !($align - 1) + }; } pub const BLOB_FLAG_IS_CODING: u32 = 0x1; diff --git a/src/streamer.rs b/src/streamer.rs index e96a29cafe..d0a931d422 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -178,7 +178,13 @@ fn repair_window( ) -> Result<()> { #[cfg(feature = "erasure")] { - if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed, *received).is_err() { + if erasure::recover( + _recycler, + &mut locked_window.write().unwrap(), + *consumed, + *received, + ).is_err() + { trace!("erasure::recover failed"); } } @@ -271,7 +277,10 @@ fn recv_window( // Got a blob which has already been consumed, skip it // probably from a repair window request if pix < *consumed { - info!("received: {} but older than consumed: {} skipping..", pix, *consumed); + debug!( + "received: {} but older than consumed: {} skipping..", + pix, *consumed + ); continue; } let w = pix % WINDOW_SIZE; @@ -299,7 +308,11 @@ fn recv_window( } let mut is_coding = false; if let &Some(ref cblob) = &window[k] { - if cblob.read().expect("blob read lock for flags streamer::window").is_coding() { + if cblob + .read() + .expect("blob read lock for flags streamer::window") + .is_coding() + { is_coding = true; } } @@ -315,7 +328,10 @@ fn recv_window( } *consumed += erasure::MAX_MISSING; - info!("skipping processing coding blob k: {} consumed: {}", k, *consumed); + debug!( + "skipping processing coding blob k: {} consumed: {}", + k, *consumed + ); } } } @@ -329,10 +345,7 @@ fn recv_window( Ok(()) } -fn print_window( - locked_window: &Arc>>>, - consumed: usize, - ) { +fn print_window(locked_window: &Arc>>>, consumed: usize) { { let buf: Vec<_> = locked_window .read() @@ -357,7 +370,7 @@ fn print_window( } }) .collect(); - info!("WINDOW ({}): {}", consumed, buf.join("")); + debug!("WINDOW ({}): {}", consumed, buf.join("")); } } @@ -466,7 +479,11 @@ fn broadcast( // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { - if erasure::generate_coding(&mut window.write().unwrap(), *receive_index as usize, blobs_len).is_err() + if erasure::generate_coding( + &mut window.write().unwrap(), + *receive_index as usize, + blobs_len, + ).is_err() { return Err(Error::GenericError); } @@ -505,7 +522,15 @@ pub fn broadcaster( if exit.load(Ordering::Relaxed) { break; } - let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index, &mut receive_index); + let _ = broadcast( + &crdt, + &window, + &recycler, + &r, + &sock, + &mut transmit_index, + &mut receive_index, + ); } }) .unwrap()