generate coding after indexing
This commit is contained in:
parent
ac7860c35d
commit
93b6fceb2f
|
@ -299,10 +299,10 @@ impl Crdt {
|
||||||
return Err(Error::CrdtTooSmall);
|
return Err(Error::CrdtTooSmall);
|
||||||
}
|
}
|
||||||
trace!("nodes table {}", nodes.len());
|
trace!("nodes table {}", nodes.len());
|
||||||
trace!("blobs table {}", blobs.len());
|
|
||||||
// enumerate all the blobs, those are the indices
|
// enumerate all the blobs, those are the indices
|
||||||
// transmit them to nodes, starting from a different node
|
// transmit them to nodes, starting from a different node
|
||||||
let orders: Vec<_> = blobs
|
let window_l = window.write().unwrap();
|
||||||
|
let orders: Vec<_> = window_l[(*transmit_index as usize)..]
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.zip(
|
.zip(
|
||||||
|
@ -315,10 +315,11 @@ impl Crdt {
|
||||||
trace!("orders table {}", orders.len());
|
trace!("orders table {}", orders.len());
|
||||||
let errs: Vec<_> = orders
|
let errs: Vec<_> = orders
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|((i, b), v)| {
|
.map(|((_i, b), v)| {
|
||||||
// only leader should be broadcasting
|
// only leader should be broadcasting
|
||||||
assert!(me.current_leader_id != v.id);
|
assert!(me.current_leader_id != v.id);
|
||||||
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
|
let bl = b.clone().unwrap();
|
||||||
|
let blob = bl.read().expect("blob read lock in streamer::broadcast");
|
||||||
//TODO profile this, may need multiple sockets for par_iter
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
||||||
assert!(blob.meta.size < BLOB_SIZE);
|
assert!(blob.meta.size < BLOB_SIZE);
|
||||||
|
|
|
@ -378,15 +378,6 @@ fn broadcast(
|
||||||
}
|
}
|
||||||
let mut blobs = dq.into_iter().collect();
|
let mut blobs = dq.into_iter().collect();
|
||||||
|
|
||||||
// appends codes to the list of blobs allowing us to reconstruct the stream
|
|
||||||
#[cfg(feature = "erasure")]
|
|
||||||
{
|
|
||||||
match erasure::generate_coding(recycler, &mut window.write().unwrap(), *transmit_index as usize) {
|
|
||||||
Err(_e) => { return Err(Error::GenericError) }
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Crdt::index_blobs(crdt, &blobs, transmit_index)?;
|
Crdt::index_blobs(crdt, &blobs, transmit_index)?;
|
||||||
// keep the cache of blobs that are broadcast
|
// keep the cache of blobs that are broadcast
|
||||||
{
|
{
|
||||||
|
@ -415,6 +406,15 @@ fn broadcast(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// appends codes to the list of blobs allowing us to reconstruct the stream
|
||||||
|
#[cfg(feature = "erasure")]
|
||||||
|
{
|
||||||
|
match erasure::generate_coding(recycler, &mut window.write().unwrap(), *transmit_index as usize) {
|
||||||
|
Err(_e) => { return Err(Error::GenericError) }
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Crdt::broadcast(crdt, &window, &sock, transmit_index)?;
|
Crdt::broadcast(crdt, &window, &sock, transmit_index)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue