indexing blobs then coding

This commit is contained in:
Stephen Akridge 2018-05-25 11:36:31 -07:00 committed by Greg Fitzgerald
parent b0eab8729f
commit ac7860c35d
2 changed files with 39 additions and 5 deletions

View File

@ -229,12 +229,45 @@ impl Crdt {
}
}
pub fn index_blobs(
obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>,
transmit_index: &mut u64,
) -> Result<()> {
let me: ReplicatedData = {
// copy to avoid locking during IO
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
info!("broadcast table {}", robj.table.len());
robj.table[&robj.me].clone()
};
// enumerate all the blobs, those are the indices
let orders: Vec<_> = blobs
.iter()
.enumerate()
.collect();
info!("orders table {}", orders.len());
let _ : Vec<_> = orders
.into_iter()
.map(|(i, b)| {
// only leader should be broadcasting
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64)
.expect("set_index in pub fn broadcast");
//TODO profile this, may need multiple sockets for par_iter
})
.collect();
Ok(())
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
s: &UdpSocket,
transmit_index: &mut u64,
) -> Result<()> {
@ -286,9 +319,6 @@ impl Crdt {
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64)
.expect("set_index in pub fn broadcast");
//TODO profile this, may need multiple sockets for par_iter
trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
assert!(blob.meta.size < BLOB_SIZE);

View File

@ -377,6 +377,7 @@ fn broadcast(
dq.append(&mut nq);
}
let mut blobs = dq.into_iter().collect();
// appends codes to the list of blobs allowing us to reconstruct the stream
#[cfg(feature = "erasure")]
{
@ -385,7 +386,8 @@ fn broadcast(
_ => {}
}
}
Crdt::broadcast(crdt, &blobs, &sock, transmit_index)?;
Crdt::index_blobs(crdt, &blobs, transmit_index)?;
// keep the cache of blobs that are broadcast
{
let mut win = window.write().unwrap();
@ -412,6 +414,8 @@ fn broadcast(
win[pos] = Some(b);
}
}
Crdt::broadcast(crdt, &window, &sock, transmit_index)?;
Ok(())
}