Replace usage of atomics with `tokio::sync::watch` (#2272)

Rust atomics have an API that's very easy to use incorrectly, leading to
hard to find bugs. For that reason, it's best to avoid it unless there's
a good reason not to.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-06-10 23:25:06 -03:00 committed by GitHub
parent 71c10af7d9
commit a2d3078fcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 14 deletions

View File

@ -3,10 +3,7 @@ use std::{
convert::TryInto, convert::TryInto,
iter, iter,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
sync::{ sync::{Arc, Mutex},
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration as StdDuration, time::Duration as StdDuration,
}; };
@ -14,6 +11,7 @@ use chrono::{DateTime, Duration, Utc};
use futures::future; use futures::future;
use tokio::{ use tokio::{
runtime::Runtime, runtime::Runtime,
sync::watch,
time::{self, Instant}, time::{self, Instant},
}; };
use tower::Service; use tower::Service;
@ -165,7 +163,7 @@ fn candidate_set_updates_are_rate_limited() {
} }
assert_eq!( assert_eq!(
call_count.load(Ordering::SeqCst), *call_count.borrow(),
INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT
); );
}); });
@ -191,7 +189,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await .await
.expect("Call to CandidateSet::update should not fail"); .expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT); assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
// The following two calls to `update` should be skipped // The following two calls to `update` should be skipped
candidate_set candidate_set
@ -204,7 +202,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await .await
.expect("Call to CandidateSet::update should not fail"); .expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT); assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
// After waiting for at least the minimum interval the call to `update` should succeed // After waiting for at least the minimum interval the call to `update` should succeed
time::advance(MIN_PEER_GET_ADDR_INTERVAL).await; time::advance(MIN_PEER_GET_ADDR_INTERVAL).await;
@ -213,7 +211,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await .await
.expect("Call to CandidateSet::update should not fail"); .expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), 2 * GET_ADDR_FANOUT); assert_eq!(*call_count.borrow(), 2 * GET_ADDR_FANOUT);
}); });
} }
@ -242,7 +240,7 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator<Item = DateTime<Utc>>)
/// Create a mock `PeerSet` service that checks that requests to it are rate limited. /// Create a mock `PeerSet` service that checks that requests to it are rate limited.
/// ///
/// The function also returns an atomic counter, that can be used for checking how many times the /// The function also returns a call count watcher, that can be used for checking how many times the
/// service was called. /// service was called.
fn mock_peer_service<E>() -> ( fn mock_peer_service<E>() -> (
impl Service< impl Service<
@ -251,12 +249,12 @@ fn mock_peer_service<E>() -> (
Future = future::Ready<Result<Response, E>>, Future = future::Ready<Result<Response, E>>,
Error = E, Error = E,
> + 'static, > + 'static,
Arc<AtomicUsize>, watch::Receiver<usize>,
) { ) {
let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL; let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL;
let call_counter = Arc::new(AtomicUsize::new(0)); let mut call_counter = 0;
let call_counter_to_return = call_counter.clone(); let (call_count_sender, call_count_receiver) = watch::channel(call_counter);
let mut peer_request_tracker: VecDeque<_> = let mut peer_request_tracker: VecDeque<_> =
iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect(); iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect();
@ -274,7 +272,8 @@ fn mock_peer_service<E>() -> (
peer_request_tracker.push_back(Instant::now() + rate_limit_interval); peer_request_tracker.push_back(Instant::now() + rate_limit_interval);
// Increment count of calls // Increment count of calls
call_counter.fetch_add(1, Ordering::SeqCst); call_counter += 1;
let _ = call_count_sender.send(call_counter);
// Return an empty list of peer addresses // Return an empty list of peer addresses
future::ok(Response::Peers(vec![])) future::ok(Response::Peers(vec![]))
@ -283,5 +282,5 @@ fn mock_peer_service<E>() -> (
} }
}); });
(service, call_counter_to_return) (service, call_count_receiver)
} }