crdt.rs - panic cleanup

This commit is contained in:
Jackson Sandland 2018-05-09 18:10:48 -07:00
parent 02c573986b
commit 1e91d09be7
1 changed files with 17 additions and 17 deletions

View File

@ -168,7 +168,7 @@ impl Crdt {
) -> Result<()> { ) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = { let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking durring IO // copy to avoid locking durring IO
let robj = obj.read().unwrap(); let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect(); let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
(robj.table[&robj.me].clone(), cloned_table) (robj.table[&robj.me].clone(), cloned_table)
}; };
@ -194,10 +194,10 @@ impl Crdt {
.map(|((i, v), b)| { .map(|((i, v), b)| {
// only leader should be broadcasting // only leader should be broadcasting
assert!(me.current_leader_id != v.id); assert!(me.current_leader_id != v.id);
let mut blob = b.write().unwrap(); let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
blob.set_id(me.id).expect("set_id"); blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*transmit_index + i as u64) blob.set_index(*transmit_index + i as u64)
.expect("set_index"); .expect("set_index in pub fn broadcast");
//TODO profile this, may need multiple sockets for par_iter //TODO profile this, may need multiple sockets for par_iter
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
}) })
@ -219,10 +219,10 @@ impl Crdt {
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = { let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking durring IO // copy to avoid locking durring IO
let s = obj.read().unwrap(); let s = obj.read().expect("'obj' read lock in pub fn retransmit");
(s.table[&s.me].clone(), s.table.values().cloned().collect()) (s.table[&s.me].clone(), s.table.values().cloned().collect())
}; };
let rblob = blob.read().unwrap(); let rblob = blob.read().expect("'blob' read lock in pub fn retransmit");
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let orders: Vec<_> = table let orders: Vec<_> = table
.iter() .iter()
@ -261,9 +261,9 @@ impl Crdt {
fn random() -> u64 { fn random() -> u64 {
let rnd = SystemRandom::new(); let rnd = SystemRandom::new();
let mut buf = [0u8; 8]; let mut buf = [0u8; 8];
rnd.fill(&mut buf).unwrap(); rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
let mut rdr = Cursor::new(&buf); let mut rdr = Cursor::new(&buf);
rdr.read_u64::<LittleEndian>().unwrap() rdr.read_u64::<LittleEndian>().expect("rdr.read_u64 in fn random")
} }
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) { fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
//trace!("get updates since {}", v); //trace!("get updates since {}", v);
@ -287,10 +287,10 @@ impl Crdt {
return Err(Error::GeneralError); return Err(Error::GeneralError);
} }
let mut n = (Self::random() as usize) % self.table.len(); let mut n = (Self::random() as usize) % self.table.len();
while self.table.values().nth(n).unwrap().id == self.me { while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me {
n = (Self::random() as usize) % self.table.len(); n = (Self::random() as usize) % self.table.len();
} }
let v = self.table.values().nth(n).unwrap().clone(); let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
Ok((v.gossip_addr, req)) Ok((v.gossip_addr, req))
@ -303,7 +303,7 @@ impl Crdt {
// Lock the object only to do this operation and not for any longer // Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to` // especially not when doing the `sock.send_to`
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?; let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?; let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since // TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have // then only ask for specific data that we dont have
@ -335,7 +335,7 @@ impl Crdt {
return; return;
} }
//TODO this should be a tuned parameter //TODO this should be a tuned parameter
sleep(obj.read().unwrap().timeout); sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout);
}) })
} }
@ -353,18 +353,18 @@ impl Crdt {
trace!("RequestUpdates {}", v); trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr; let addr = reqdata.gossip_addr;
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read().unwrap().get_updates_since(v); let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v);
trace!("get updates since response {} {}", v, data.len()); trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr); trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender //TODO verify reqdata belongs to sender
obj.write().unwrap().insert(reqdata); obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata);
sock.send_to(&rsp, addr).unwrap(); sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!"); trace!("send_to done!");
} }
Protocol::ReceiveUpdates(from, ups, data) => { Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates"); trace!("ReceivedUpdates");
obj.write().unwrap().apply_updates(from, ups, &data); obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data);
} }
} }
Ok(()) Ok(())
@ -374,7 +374,7 @@ impl Crdt {
sock: UdpSocket, sock: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs");
spawn(move || loop { spawn(move || loop {
let _ = Self::run_listen(&obj, &sock); let _ = Self::run_listen(&obj, &sock);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {