streamer.rs - panic cleanup

This commit is contained in:
Jackson Sandland 2018-05-10 17:38:00 -07:00
parent 73c7fb87e8
commit 4eb2e84c9f
1 changed files with 9 additions and 9 deletions

View File

@ -27,7 +27,7 @@ fn recv_loop(
let msgs = re.allocate(); let msgs = re.allocate();
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
loop { loop {
match msgs.write().unwrap().recv_from(sock) { match msgs.write().expect("write lock in fn recv_loop").recv_from(sock) {
Ok(()) => { Ok(()) => {
channel.send(msgs_)?; channel.send(msgs_)?;
break; break;
@ -117,7 +117,7 @@ fn recv_window(
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
let leader_id = crdt.read().unwrap().leader_data().id; let leader_id = crdt.read().expect("'crdt' read lock in fn recv_window").leader_data().id;
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq) dq.append(&mut nq)
} }
@ -125,17 +125,17 @@ fn recv_window(
//retransmit all leader blocks //retransmit all leader blocks
let mut retransmitq = VecDeque::new(); let mut retransmitq = VecDeque::new();
for b in &dq { for b in &dq {
let p = b.read().unwrap(); let p = b.read().expect("'b' read lock in fn recv_window");
//TODO this check isn't safe against adverserial packets //TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window //we need to maintain a sequence window
trace!( trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}", "idx: {} addr: {:?} id: {:?} leader: {:?}",
p.get_index().unwrap(), p.get_index().expect("get_index in fn recv_window"),
p.get_id().unwrap(), p.get_id().expect("get_id in trace! fn recv_window"),
p.meta.addr(), p.meta.addr(),
leader_id leader_id
); );
if p.get_id().unwrap() == leader_id { if p.get_id().expect("get_id in fn recv_window") == leader_id {
//TODO //TODO
//need to copy the retransmited blob //need to copy the retransmited blob
//otherwise we get into races with which thread //otherwise we get into races with which thread
@ -145,7 +145,7 @@ fn recv_window(
//is dropped via a weakref to the recycler //is dropped via a weakref to the recycler
let nv = recycler.allocate(); let nv = recycler.allocate();
{ {
let mut mnv = nv.write().unwrap(); let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
let sz = p.meta.size; let sz = p.meta.size;
mnv.meta.size = sz; mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]); mnv.data[..sz].copy_from_slice(&p.data[..sz]);
@ -161,7 +161,7 @@ fn recv_window(
let mut contq = VecDeque::new(); let mut contq = VecDeque::new();
while let Some(b) = dq.pop_front() { while let Some(b) = dq.pop_front() {
let b_ = b.clone(); let b_ = b.clone();
let p = b.write().unwrap(); let p = b.write().expect("'b' write lock in fn recv_window");
let pix = p.get_index()? as usize; let pix = p.get_index()? as usize;
let w = pix % NUM_BLOBS; let w = pix % NUM_BLOBS;
//TODO, after the block are authenticated //TODO, after the block are authenticated
@ -180,7 +180,7 @@ fn recv_window(
if window[k].is_none() { if window[k].is_none() {
break; break;
} }
contq.push_back(window[k].clone().unwrap()); contq.push_back(window[k].clone().expect("clone in fn recv_window"));
window[k] = None; window[k] = None;
*consumed += 1; *consumed += 1;
} }