From 93b6fceb2f5fc8a6232d490534ec2d8584485526 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 25 May 2018 18:21:18 -0700 Subject: [PATCH] generate coding after indexing --- src/crdt.rs | 9 +++++---- src/streamer.rs | 18 +++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index a00a840995..e06f8cf2ec 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -299,10 +299,10 @@ impl Crdt { return Err(Error::CrdtTooSmall); } trace!("nodes table {}", nodes.len()); - trace!("blobs table {}", blobs.len()); // enumerate all the blobs, those are the indices // transmit them to nodes, starting from a different node - let orders: Vec<_> = blobs + let window_l = window.write().unwrap(); + let orders: Vec<_> = window_l[(*transmit_index as usize)..] .iter() .enumerate() .zip( @@ -315,10 +315,11 @@ impl Crdt { trace!("orders table {}", orders.len()); let errs: Vec<_> = orders .into_iter() - .map(|((i, b), v)| { + .map(|((_i, b), v)| { // only leader should be broadcasting assert!(me.current_leader_id != v.id); - let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); + let bl = b.clone().unwrap(); + let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr); assert!(blob.meta.size < BLOB_SIZE); diff --git a/src/streamer.rs b/src/streamer.rs index aa8f3e205c..0d18f936c7 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -378,15 +378,6 @@ fn broadcast( } let mut blobs = dq.into_iter().collect(); - // appends codes to the list of blobs allowing us to reconstruct the stream - #[cfg(feature = "erasure")] - { - match erasure::generate_coding(recycler, &mut window.write().unwrap(), *transmit_index as usize) { - Err(_e) => { return Err(Error::GenericError) } - _ => {} - } - } - Crdt::index_blobs(crdt, &blobs, transmit_index)?; // keep the cache of blobs that are broadcast { @@ -415,6 +406,15 @@ fn broadcast( } } + // appends codes to the list of blobs allowing us to reconstruct the stream + #[cfg(feature = "erasure")] + { + match erasure::generate_coding(recycler, &mut window.write().unwrap(), *transmit_index as usize) { + Err(_e) => { return Err(Error::GenericError) } + _ => {} + } + } + Crdt::broadcast(crdt, &window, &sock, transmit_index)?; Ok(()) }