Merge pull request #105 from ethcore/synchronizer

simplified threshold and configurable synchronizers, cleanun session init and on_message
This commit is contained in:
Svyatoslav Nikolsky 2016-11-10 07:46:07 +03:00 committed by GitHub
commit 28771adaa5
11 changed files with 157 additions and 161 deletions

View File

@ -11,7 +11,7 @@ const MAGIC_REGTEST: u32 = 0xDAB5BFFA;
/// Bitcoin network
/// https://bitcoin.org/en/glossary/mainnet
#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Magic {
/// The original and main network for Bitcoin transactions, where satoshis have real economic value.
Mainnet,

View File

@ -1,41 +1,31 @@
use message::{Payload, Magic, Message};
use net::Connection;
use message::{Payload, Message};
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
use util::Direction;
use {PeerId, PeerInfo};
use util::PeerInfo;
pub struct Channel {
version: u32,
magic: Magic,
stream: SharedTcpStream,
peer_info: PeerInfo,
session: Session,
stream: SharedTcpStream,
}
impl Channel {
pub fn new(connection: Connection, peer_id: PeerId, session: Session, direction: Direction) -> Self {
pub fn new(stream: SharedTcpStream, peer_info: PeerInfo, session: Session) -> Self {
Channel {
version: connection.version,
magic: connection.magic,
peer_info: PeerInfo {
address: connection.address,
id: peer_id,
direction: direction,
},
stream: stream,
peer_info: peer_info,
session: session,
stream: connection.stream,
}
}
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
// TODO: some tracing here
let message = Message::new(self.magic, self.version, payload).expect("failed to create outgoing message");
let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message");
write_message(self.stream.clone(), message)
}
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {
read_any_message(self.stream.clone(), self.magic)
read_any_message(self.stream.clone(), self.peer_info.magic)
}
pub fn shutdown(&self) {
@ -43,11 +33,11 @@ impl Channel {
}
pub fn version(&self) -> u32 {
self.version
self.peer_info.version
}
pub fn peer_info(&self) -> PeerInfo {
self.peer_info
self.peer_info.clone()
}
pub fn session(&self) -> &Session {

View File

@ -6,7 +6,7 @@ use parking_lot::RwLock;
use net::{Connection, Channel};
use p2p::Context;
use session::{SessionFactory};
use util::Direction;
use util::{Direction, PeerInfo};
use PeerId;
#[derive(Default)]
@ -42,8 +42,17 @@ impl Connections {
/// Returnes a shared pointer to it.
pub fn store<T>(&self, context: Arc<Context>, connection: Connection, direction: Direction) -> Arc<Channel> where T: SessionFactory {
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
let session = T::new_session(context, id);
let channel = Arc::new(Channel::new(connection, id, session, direction));
let peer_info = PeerInfo {
id: id,
address: connection.address,
direction: direction,
version: connection.version,
magic: connection.magic,
};
let session = T::new_session(context, peer_info.clone());
let channel = Arc::new(Channel::new(connection.stream, peer_info, session));
self.channels.write().insert(id, channel.clone());
channel
}

View File

@ -141,7 +141,7 @@ impl Context {
let channel = context.connections.store::<T>(context.clone(), connection, Direction::Outbound);
// initialize session and then start reading messages
channel.session().initialize(channel.clone());
channel.session().initialize();
Context::on_message(context, channel)
},
Ok(DeadlineStatus::Meet(Err(_))) => {
@ -191,7 +191,7 @@ impl Context {
let channel = context.connections.store::<NormalSessionFactory>(context.clone(), connection, Direction::Inbound);
// initialize session and then start reading messages
channel.session().initialize(channel.clone());
channel.session().initialize();
Context::on_message(context.clone(), channel)
},
Ok(DeadlineStatus::Meet(Err(err))) => {
@ -261,7 +261,7 @@ impl Context {
// successful read
trace!("Received {} message from {}", command, channel.peer_info().address);
// handle message and read the next one
match channel.session().on_message(channel.clone(), command, payload) {
match channel.session().on_message(command, payload) {
Ok(_) => {
context.node_table.write().note_used(&channel.peer_info().address);
let on_message = Context::on_message(context.clone(), channel);

View File

@ -5,44 +5,43 @@ use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::Protocol;
use p2p::Context;
use util::Direction;
use PeerId;
use util::{Direction, PeerInfo};
pub struct AddrProtocol {
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// Connected peer info.
info: PeerInfo,
}
impl AddrProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
AddrProtocol {
context: context,
peer: peer,
info: info,
}
}
}
impl Protocol for AddrProtocol {
fn initialize(&mut self, direction: Direction, _version: u32) {
if let Direction::Outbound = direction {
let send = Context::send_to_peer(self.context.clone(), self.peer, &GetAddr);
fn initialize(&mut self) {
if let Direction::Outbound = self.info.direction {
let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr);
self.context.spawn(send);
}
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
// normal nodes send addr message only after they receive getaddr message
// meanwhile seednodes, surprisingly, send addr message even before they are asked for it
if command == &GetAddr::command() {
let _: GetAddr = try!(deserialize_payload(payload, version));
let _: GetAddr = try!(deserialize_payload(payload, self.info.version));
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
let addr = Addr::new(entries);
let send = Context::send_to_peer(self.context.clone(), self.peer, &addr);
let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr);
self.context.spawn(send);
} else if command == &Addr::command() {
let addr: Addr = try!(deserialize_payload(payload, version));
let addr: Addr = try!(deserialize_payload(payload, self.info.version));
match addr {
Addr::V0(_) => {
unreachable!("This version of protocol is not supported!");
@ -60,30 +59,30 @@ impl Protocol for AddrProtocol {
pub struct SeednodeProtocol {
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// Connected peer info,
info: PeerInfo,
/// Indicates if disconnecting has been scheduled.
disconnecting: bool,
}
impl SeednodeProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
SeednodeProtocol {
context: context,
peer: peer,
info: info,
disconnecting: false,
}
}
}
impl Protocol for SeednodeProtocol {
fn on_message(&mut self, command: &Command, _payload: &Bytes, _version: u32) -> Result<(), Error> {
fn on_message(&mut self, command: &Command, _payload: &Bytes) -> Result<(), Error> {
// Seednodes send addr message more than once with different addresses.
// We can't disconenct after first read. Let's delay it by 60 seconds.
if !self.disconnecting && command == &Addr::command() {
self.disconnecting = true;
let context = self.context.clone();
let peer = self.peer;
let peer = self.info.id;
self.context.execute_after(Duration::new(60, 0), move || {
context.close_channel(peer);
});

View File

@ -6,17 +6,16 @@ use bytes::Bytes;
use message::Error;
use message::common::Command;
use util::Direction;
pub use self::addr::{AddrProtocol, SeednodeProtocol};
pub use self::ping::PingProtocol;
pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};
pub trait Protocol: Send {
/// Initialize the protocol.
fn initialize(&mut self, _direction: Direction, _version: u32) {}
fn initialize(&mut self) {}
/// Handle the message.
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error>;
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error>;
/// On disconnect.
fn on_close(&mut self) {}

View File

@ -4,10 +4,9 @@ use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::Protocol;
use util::Direction;
use util::{PeerId, PeerInfo};
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
use PeerId;
pub trait PingContext: Send + Sync {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
@ -23,8 +22,8 @@ impl PingContext for Context {
pub struct PingProtocol<T = RandomNonce, C = Context> {
/// Context
context: Arc<C>,
/// Connected peer id.
peer: PeerId,
/// Connected peer info.
info: PeerInfo,
/// Nonce generator.
nonce_generator: T,
/// Last nonce sent in the ping message.
@ -32,10 +31,10 @@ pub struct PingProtocol<T = RandomNonce, C = Context> {
}
impl PingProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
PingProtocol {
context: context,
peer: peer,
info: info,
nonce_generator: RandomNonce::default(),
last_ping_nonce: None,
}
@ -43,21 +42,21 @@ impl PingProtocol {
}
impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: PingContext {
fn initialize(&mut self, _direction: Direction, _version: u32) {
fn initialize(&mut self) {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
self.last_ping_nonce = Some(nonce);
let ping = Ping::new(nonce);
PingContext::send_to_peer(self.context.clone(), self.peer, &ping);
PingContext::send_to_peer(self.context.clone(), self.info.id, &ping);
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
if command == &Ping::command() {
let ping: Ping = try!(deserialize_payload(payload, version));
let ping: Ping = try!(deserialize_payload(payload, self.info.version));
let pong = Pong::new(ping.nonce);
PingContext::send_to_peer(self.context.clone(), self.peer, &pong);
PingContext::send_to_peer(self.context.clone(), self.info.id, &pong);
} else if command == &Pong::command() {
let pong: Pong = try!(deserialize_payload(payload, version));
let pong: Pong = try!(deserialize_payload(payload, self.info.version));
if Some(pong.nonce) != self.last_ping_nonce.take() {
return Err(Error::InvalidCommand)
}
@ -72,12 +71,11 @@ mod tests {
use std::sync::Arc;
use parking_lot::Mutex;
use bytes::Bytes;
use message::{Payload, serialize_payload};
use message::{Payload, serialize_payload, Magic};
use message::types::{Ping, Pong};
use util::Direction;
use util::{PeerId, PeerInfo, Direction};
use util::nonce::StaticNonce;
use protocol::Protocol;
use PeerId;
use super::{PingProtocol, PingContext};
#[derive(Default)]
@ -101,12 +99,18 @@ mod tests {
let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
peer: peer,
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
ping_protocol.initialize(Direction::Inbound, 0);
ping_protocol.initialize();
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
@ -124,12 +128,18 @@ mod tests {
let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
peer: peer,
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
assert!(ping_protocol.on_message(&command, &message, 0).is_ok());
assert!(ping_protocol.on_message(&command, &message).is_ok());
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
@ -146,12 +156,18 @@ mod tests {
let message = serialize_payload(&Pong::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
peer: peer,
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: Some(nonce),
};
assert!(ping_protocol.on_message(&command, &message, 0).is_ok());
assert!(ping_protocol.on_message(&command, &message).is_ok());
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 0);
assert_eq!(ping_protocol.last_ping_nonce, None);

View File

@ -2,9 +2,8 @@ use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::Protocol;
use util::Direction;
use util::{PeerInfo, PeerId};
use p2p::Context;
use PeerId;
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
@ -165,98 +164,100 @@ impl OutboundSyncConnection for OutboundSync {
pub struct SyncProtocol {
inbound_connection: InboundSyncConnectionRef,
info: PeerInfo,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
let outbound_connection = OutboundSync::new(context.clone(), peer).boxed();
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed();
let inbound_connection = context.create_sync_session(0, outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
info: info,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self, _direction: Direction, version: u32) {
self.inbound_connection.start_sync_session(version);
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.info.version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, version));
let message: types::Inv = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, version));
let message: types::GetData = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getdata(message);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getblocks(message);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getheaders(message);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, version));
let message: types::Tx = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_transaction(message);
}
else if command == &types::Block::command() {
let message: types::Block = try!(deserialize_payload(payload, version));
let message: types::Block = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_block(message);
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, version));
let message: types::MemPool = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_mempool(message);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, version));
let message: types::Headers = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_headers(message);
}
else if command == &types::FilterLoad::command() {
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_filterload(message);
}
else if command == &types::FilterAdd::command() {
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_filteradd(message);
}
else if command == &types::FilterClear::command() {
let message: types::FilterClear = try!(deserialize_payload(payload, version));
let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_filterclear(message);
}
else if command == &types::MerkleBlock::command() {
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_merkleblock(message);
}
else if command == &types::SendHeaders::command() {
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_sendheaders(message);
}
else if command == &types::FeeFilter::command() {
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_feefilter(message);
}
else if command == &types::SendCompact::command() {
let message: types::SendCompact = try!(deserialize_payload(payload, version));
let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_send_compact(message);
}
else if command == &types::CompactBlock::command() {
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_compact_block(message);
}
else if command == &types::GetBlockTxn::command() {
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_get_block_txn(message);
}
else if command == &types::BlockTxn::command() {
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_block_txn(message);
}
else if command == &types::NotFound::command() {
let message: types::NotFound = try!(deserialize_payload(payload, version));
let message: types::NotFound = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_notfound(message);
}
Ok(())

View File

@ -3,21 +3,20 @@ use parking_lot::Mutex;
use bytes::Bytes;
use message::{Command, Error};
use p2p::Context;
use net::Channel;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use PeerId;
use util::{PeerInfo};
pub trait SessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session;
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session;
}
pub struct SeednodeSessionFactory;
impl SessionFactory for SeednodeSessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session {
let ping = PingProtocol::new(context.clone(), peer).boxed();
let addr = AddrProtocol::new(context.clone(), peer).boxed();
let seed = SeednodeProtocol::new(context.clone(), peer).boxed();
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let seed = SeednodeProtocol::new(context.clone(), info).boxed();
Session::new(vec![ping, addr, seed])
}
}
@ -25,10 +24,10 @@ impl SessionFactory for SeednodeSessionFactory {
pub struct NormalSessionFactory;
impl SessionFactory for NormalSessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session {
let ping = PingProtocol::new(context.clone(), peer).boxed();
let addr = AddrProtocol::new(context.clone(), peer).boxed();
let sync = SyncProtocol::new(context, peer).boxed();
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let sync = SyncProtocol::new(context, info).boxed();
Session::new(vec![ping, addr, sync])
}
}
@ -44,17 +43,17 @@ impl Session {
}
}
pub fn initialize(&self, channel: Arc<Channel>) {
pub fn initialize(&self) {
for protocol in self.protocols.lock().iter_mut() {
protocol.initialize(channel.peer_info().direction, channel.version());
protocol.initialize();
}
}
pub fn on_message(&self, channel: Arc<Channel>, command: Command, payload: Bytes) -> Result<(), Error> {
pub fn on_message(&self, command: Command, payload: Bytes) -> Result<(), Error> {
self.protocols.lock()
.iter_mut()
.map(|protocol| {
protocol.on_message(&command, &payload, channel.version())
protocol.on_message(&command, &payload)
})
.collect::<Result<Vec<_>, Error>>()
.map(|_| ())

View File

@ -1,4 +1,5 @@
use std::net::SocketAddr;
use message::Magic;
pub type PeerId = usize;
@ -8,10 +9,12 @@ pub enum Direction {
Outbound,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PeerInfo {
pub id: PeerId,
pub address: SocketAddr,
pub direction: Direction,
pub version: u32,
pub magic: Magic,
}

View File

@ -66,23 +66,19 @@ impl Synchronizer for NoopSynchronizer {
#[derive(Debug)]
struct ThresholdSynchronizer {
inner: FifoSynchronizer,
start_id: u32,
threshold: u32,
to_grant_min: u32,
to_grant_max: u32,
}
impl ThresholdSynchronizer {
fn new(next_to_grant: u32, threshold: u32) -> Self {
// let's mark all ids in threshold as declared
// this may cause some ids, to be skipped, but we don't care
// it won't affect correct execution of the program
let declared = next_to_grant + threshold;
fn new(declared: u32, threshold: u32) -> Self {
ThresholdSynchronizer {
inner: FifoSynchronizer {
declared_responses: declared,
next_to_grant: declared,
},
start_id: next_to_grant,
threshold: threshold,
to_grant_min: declared.overflowing_sub(threshold).0,
to_grant_max: declared,
}
}
}
@ -97,14 +93,19 @@ impl Synchronizer for ThresholdSynchronizer {
return true;
}
id.overflowing_sub(self.start_id).0 < self.threshold ||
self.start_id.overflowing_sub(id).0 < self.threshold
if self.to_grant_min <= self.to_grant_max {
// if max is bigger then min, id must be in range [min, max)
self.to_grant_min <= id && id < self.to_grant_max
} else {
// otherwise if is in range [min, u32::max_value()] || [0, max)
(self.to_grant_min <= id && id <= u32::max_value()) ||
id < self.to_grant_max
}
}
}
#[derive(Debug)]
enum InnerSynchronizer {
Fifo(FifoSynchronizer),
Noop(NoopSynchronizer),
Threshold(ThresholdSynchronizer),
}
@ -112,7 +113,7 @@ enum InnerSynchronizer {
impl InnerSynchronizer {
pub fn new(sync: bool) -> Self {
if sync {
InnerSynchronizer::Fifo(FifoSynchronizer::default())
InnerSynchronizer::Threshold(ThresholdSynchronizer::new(0, 0))
} else {
InnerSynchronizer::Noop(NoopSynchronizer::default())
}
@ -123,8 +124,6 @@ impl InnerSynchronizer {
pub struct ConfigurableSynchronizer {
/// Inner synchronizer which is currently used
inner: InnerSynchronizer,
/// Id of next response which is likely to be granted permission.
probably_next_to_grant: u32,
}
impl Default for ConfigurableSynchronizer {
@ -137,7 +136,6 @@ impl ConfigurableSynchronizer {
pub fn new(sync: bool) -> Self {
ConfigurableSynchronizer {
inner: InnerSynchronizer::new(sync),
probably_next_to_grant: 0,
}
}
@ -145,22 +143,15 @@ impl ConfigurableSynchronizer {
/// from last_processed response will still be granted permissions.
pub fn change_sync_policy(&mut self, sync: bool) {
let new_inner = match self.inner {
InnerSynchronizer::Fifo(ref s) if sync == false => {
self.probably_next_to_grant = s.next_to_grant;
InnerSynchronizer::Noop(NoopSynchronizer {
declared_responses: s.declared_responses,
})
},
InnerSynchronizer::Threshold(ref s) if sync == false => {
self.probably_next_to_grant = s.inner.next_to_grant;
InnerSynchronizer::Noop(NoopSynchronizer {
declared_responses: s.inner.declared_responses,
})
},
InnerSynchronizer::Noop(_) if sync == true => {
InnerSynchronizer::Noop(ref s) if sync == true => {
let threshold = ThresholdSynchronizer::new(
self.probably_next_to_grant,
CONFIGURABLE_SYNCHRONIZER_THRESHOLD
s.declared_responses,
CONFIGURABLE_SYNCHRONIZER_THRESHOLD,
);
InnerSynchronizer::Threshold(threshold)
},
@ -174,7 +165,6 @@ impl ConfigurableSynchronizer {
impl Synchronizer for ConfigurableSynchronizer {
fn declare_response(&mut self) -> u32 {
match self.inner {
InnerSynchronizer::Fifo(ref mut s) => s.declare_response(),
InnerSynchronizer::Noop(ref mut s) => s.declare_response(),
InnerSynchronizer::Threshold(ref mut s) => s.declare_response(),
}
@ -182,12 +172,8 @@ impl Synchronizer for ConfigurableSynchronizer {
fn permission_for_response(&mut self, id: u32) -> bool {
match self.inner {
InnerSynchronizer::Fifo(ref mut s) => s.permission_for_response(id),
InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id),
InnerSynchronizer::Noop(ref mut s) => {
self.probably_next_to_grant = id.overflowing_add(1).0;
s.permission_for_response(id)
},
InnerSynchronizer::Noop(ref mut s) => s.permission_for_response(id),
}
}
}
@ -195,7 +181,7 @@ impl Synchronizer for ConfigurableSynchronizer {
#[cfg(test)]
mod tests {
use super::{
Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer, CONFIGURABLE_SYNCHRONIZER_THRESHOLD
Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer
};
#[test]
@ -235,12 +221,10 @@ mod tests {
assert!(!s.permission_for_response(id2));
assert!(s.permission_for_response(id1));
assert!(s.permission_for_response(id2));
// historic permissions
assert!(!s.permission_for_response(0));
// historic permissions, order does not matter
assert!(s.permission_for_response(1));
assert!(s.permission_for_response(2));
assert!(s.permission_for_response(3));
assert!(!s.permission_for_response(4));
assert!(s.permission_for_response(0));
assert!(!s.permission_for_response(2));
}
#[test]
@ -268,11 +252,13 @@ mod tests {
assert!(s.permission_for_response(id2));
assert!(s.permission_for_response(id0));
let d0 = s.declare_response();
let d1 = s.declare_response();
// process messages synchronously again
s.change_sync_policy(true);
let last_async = id2;
// let's check again if we can process them only synchronously
let id0 = s.declare_response();
let id1 = s.declare_response();
@ -284,14 +270,8 @@ mod tests {
assert!(s.permission_for_response(id1));
assert!(s.permission_for_response(id2));
// there might be ~10 unhandled messages,
// let's check if we can process them out of order (eg. in reverse)
for i in (0..CONFIGURABLE_SYNCHRONIZER_THRESHOLD - 1).into_iter().rev() {
assert!(s.permission_for_response(last_async + i));
}
// the next one should fail
assert!(!s.permission_for_response(last_async + CONFIGURABLE_SYNCHRONIZER_THRESHOLD));
// order of requests before changing to policy to sync should not matter
assert!(s.permission_for_response(d1));
assert!(s.permission_for_response(d0));
}
}