diff --git a/src/crdt.rs b/src/crdt.rs index 9015fc85b..98180847a 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -590,33 +590,45 @@ impl Crdt { } fn run_window_request( window: &Window, + me: &ReplicatedData, from: &ReplicatedData, ix: u64, blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); if let Some(blob) = &window.read().unwrap()[pos] { - let rblob = blob.read().unwrap(); - let blob_ix = rblob.get_index().expect("run_window_request get_index"); + let mut wblob = blob.write().unwrap(); + let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { + let num_retransmits = wblob.meta.num_retransmits; + wblob.meta.num_retransmits += 1; + + if me.current_leader_id == me.id && + num_retransmits != 0 && + !num_retransmits.is_power_of_two() + { + return None; + } + let out = blob_recycler.allocate(); + // copy to avoid doing IO inside the lock { let mut outblob = out.write().unwrap(); - let sz = rblob.meta.size; + let sz = wblob.meta.size; outblob.meta.size = sz; - outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); + outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.meta.set_addr(&from.repair_addr); - //TODO, set the sender id to the requester so we dont retransmit - //come up with a cleaner solution for this when sender signatures are checked - outblob.set_id(from.id).expect("blob set_id"); + outblob.set_id(me.id).expect("blob set_id"); } + return Some(out); } } else { assert!(window.read().unwrap()[pos].is_none()); info!("failed RequestWindowIndex {} {}", ix, from.repair_addr); } + None } @@ -683,7 +695,7 @@ impl Crdt { me.repair_addr ); assert_ne!(from.repair_addr, me.repair_addr); - Self::run_window_request(&window, &from, ix, blob_recycler) + Self::run_window_request(&window, &me, &from, ix, blob_recycler) } Err(_) => { warn!("deserialize crdt packet failed"); @@ -807,6 +819,7 @@ mod tests { use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; + use std; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1092,18 +1105,57 @@ mod tests { "127.0.0.1:1238".parse().unwrap(), ); let recycler = BlobRecycler::default(); - let rv = Crdt::run_window_request(&window, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0] = Some(out); - let rv = Crdt::run_window_request(&window, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request(&window, &me, len, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler); assert!(rv.is_none()); } + + /// test window requests respond with the right blob, and do not overrun + #[test] + fn run_window_request_with_backoff() { + let window = default_window(); + let mut me = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + + me.current_leader_id = me.id; + let recycler = BlobRecycler::default(); + let num_requests: u32 = 64; + + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + assert!(rv.is_none()); + let out = recycler.allocate(); + out.write().unwrap().meta.size = 200; + window.write().unwrap()[0] = Some(out); + let range: std::ops::Range = 0..num_requests; + + for i in range { + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + + if i != 0 && !(i.is_power_of_two()) { + assert!(rv.is_none()); + continue; + } + + assert!(rv.is_some()); + let v = rv.unwrap(); + //test we copied the blob + assert_eq!(v.read().unwrap().meta.size, 200); + } + } } diff --git a/src/packet.rs b/src/packet.rs index b98d6e0da..5dfde8c80 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -29,6 +29,7 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; #[repr(C)] pub struct Meta { pub size: usize, + pub num_retransmits: u64, pub addr: [u16; 8], pub port: u16, pub v6: bool,