From 078a71ba1505a69cadbade7cff19b153cb4a809d Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Nov 2016 21:42:17 +0100 Subject: [PATCH] simplified threshold and configurable synchronizers --- p2p/src/util/synchronizer.rs | 78 ++++++++++++++---------------------- 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/p2p/src/util/synchronizer.rs b/p2p/src/util/synchronizer.rs index df9b85f5..f4e78671 100644 --- a/p2p/src/util/synchronizer.rs +++ b/p2p/src/util/synchronizer.rs @@ -66,23 +66,19 @@ impl Synchronizer for NoopSynchronizer { #[derive(Debug)] struct ThresholdSynchronizer { inner: FifoSynchronizer, - start_id: u32, - threshold: u32, + to_grant_min: u32, + to_grant_max: u32, } impl ThresholdSynchronizer { - fn new(next_to_grant: u32, threshold: u32) -> Self { - // let's mark all ids in threshold as declared - // this may cause some ids, to be skipped, but we don't care - // it won't affect correct execution of the program - let declared = next_to_grant + threshold; + fn new(declared: u32, threshold: u32) -> Self { ThresholdSynchronizer { inner: FifoSynchronizer { declared_responses: declared, next_to_grant: declared, }, - start_id: next_to_grant, - threshold: threshold, + to_grant_min: declared.overflowing_sub(threshold).0, + to_grant_max: declared, } } } @@ -97,14 +93,19 @@ impl Synchronizer for ThresholdSynchronizer { return true; } - id.overflowing_sub(self.start_id).0 < self.threshold || - self.start_id.overflowing_sub(id).0 < self.threshold + if self.to_grant_min <= self.to_grant_max { + // if max is bigger then min, id must be in range [min, max) + self.to_grant_min <= id && id < self.to_grant_max + } else { + // otherwise if is in range [min, u32::max_value()] || [0, max) + (self.to_grant_min <= id && id <= u32::max_value()) || + id < self.to_grant_max + } } } #[derive(Debug)] enum InnerSynchronizer { - Fifo(FifoSynchronizer), Noop(NoopSynchronizer), Threshold(ThresholdSynchronizer), } @@ -112,7 +113,7 @@ enum InnerSynchronizer { impl InnerSynchronizer { pub fn new(sync: bool) -> Self { if sync { - InnerSynchronizer::Fifo(FifoSynchronizer::default()) + InnerSynchronizer::Threshold(ThresholdSynchronizer::new(0, 0)) } else { InnerSynchronizer::Noop(NoopSynchronizer::default()) } @@ -123,8 +124,6 @@ impl InnerSynchronizer { pub struct ConfigurableSynchronizer { /// Inner synchronizer which is currently used inner: InnerSynchronizer, - /// Id of next response which is likely to be granted permission. - probably_next_to_grant: u32, } impl Default for ConfigurableSynchronizer { @@ -137,7 +136,6 @@ impl ConfigurableSynchronizer { pub fn new(sync: bool) -> Self { ConfigurableSynchronizer { inner: InnerSynchronizer::new(sync), - probably_next_to_grant: 0, } } @@ -145,22 +143,15 @@ impl ConfigurableSynchronizer { /// from last_processed response will still be granted permissions. pub fn change_sync_policy(&mut self, sync: bool) { let new_inner = match self.inner { - InnerSynchronizer::Fifo(ref s) if sync == false => { - self.probably_next_to_grant = s.next_to_grant; - InnerSynchronizer::Noop(NoopSynchronizer { - declared_responses: s.declared_responses, - }) - }, InnerSynchronizer::Threshold(ref s) if sync == false => { - self.probably_next_to_grant = s.inner.next_to_grant; InnerSynchronizer::Noop(NoopSynchronizer { declared_responses: s.inner.declared_responses, }) }, - InnerSynchronizer::Noop(_) if sync == true => { + InnerSynchronizer::Noop(ref s) if sync == true => { let threshold = ThresholdSynchronizer::new( - self.probably_next_to_grant, - CONFIGURABLE_SYNCHRONIZER_THRESHOLD + s.declared_responses, + CONFIGURABLE_SYNCHRONIZER_THRESHOLD, ); InnerSynchronizer::Threshold(threshold) }, @@ -174,7 +165,6 @@ impl ConfigurableSynchronizer { impl Synchronizer for ConfigurableSynchronizer { fn declare_response(&mut self) -> u32 { match self.inner { - InnerSynchronizer::Fifo(ref mut s) => s.declare_response(), InnerSynchronizer::Noop(ref mut s) => s.declare_response(), InnerSynchronizer::Threshold(ref mut s) => s.declare_response(), } @@ -182,12 +172,8 @@ impl Synchronizer for ConfigurableSynchronizer { fn permission_for_response(&mut self, id: u32) -> bool { match self.inner { - InnerSynchronizer::Fifo(ref mut s) => s.permission_for_response(id), InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id), - InnerSynchronizer::Noop(ref mut s) => { - self.probably_next_to_grant = id.overflowing_add(1).0; - s.permission_for_response(id) - }, + InnerSynchronizer::Noop(ref mut s) => s.permission_for_response(id), } } } @@ -195,7 +181,7 @@ impl Synchronizer for ConfigurableSynchronizer { #[cfg(test)] mod tests { use super::{ - Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer, CONFIGURABLE_SYNCHRONIZER_THRESHOLD + Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer }; #[test] @@ -235,12 +221,10 @@ mod tests { assert!(!s.permission_for_response(id2)); assert!(s.permission_for_response(id1)); assert!(s.permission_for_response(id2)); - // historic permissions - assert!(!s.permission_for_response(0)); + // historic permissions, order does not matter assert!(s.permission_for_response(1)); - assert!(s.permission_for_response(2)); - assert!(s.permission_for_response(3)); - assert!(!s.permission_for_response(4)); + assert!(s.permission_for_response(0)); + assert!(!s.permission_for_response(2)); } #[test] @@ -268,11 +252,13 @@ mod tests { assert!(s.permission_for_response(id2)); assert!(s.permission_for_response(id0)); + + let d0 = s.declare_response(); + let d1 = s.declare_response(); + // process messages synchronously again s.change_sync_policy(true); - let last_async = id2; - // let's check again if we can process them only synchronously let id0 = s.declare_response(); let id1 = s.declare_response(); @@ -284,14 +270,8 @@ mod tests { assert!(s.permission_for_response(id1)); assert!(s.permission_for_response(id2)); - - // there might be ~10 unhandled messages, - // let's check if we can process them out of order (eg. in reverse) - for i in (0..CONFIGURABLE_SYNCHRONIZER_THRESHOLD - 1).into_iter().rev() { - assert!(s.permission_for_response(last_async + i)); - } - - // the next one should fail - assert!(!s.permission_for_response(last_async + CONFIGURABLE_SYNCHRONIZER_THRESHOLD)); + // order of requests before changing to policy to sync should not matter + assert!(s.permission_for_response(d1)); + assert!(s.permission_for_response(d0)); } }