Add 'epoch listener' registration.

Allows a 'listener' to be informed when each new epoch begins. Useful
for coordinating contribution proposal with batch output.
This commit is contained in:
c0gent 2018-11-01 06:52:08 -07:00
parent e66d708fed
commit e358806491
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
3 changed files with 92 additions and 18 deletions

View File

@ -760,7 +760,12 @@ impl<T: Contribution> Future for Handler<T> {
for batch in step.output.drain(..) {
info!("A HONEY BADGER BATCH WITH CONTRIBUTIONS IS BEING STREAMED...");
if cfg!(exit_upon_epoch_1000) && batch.epoch() >= 1000 {
let batch_epoch = batch.epoch();
let prev_epoch = self.hdb.set_current_epoch(batch_epoch + 1);
assert_eq!(prev_epoch, batch_epoch);
// TODO: Remove
if cfg!(exit_upon_epoch_1000) && batch_epoch >= 1000 {
return Ok(Async::Ready(()));
}
@ -798,6 +803,17 @@ impl<T: Contribution> Future for Handler<T> {
if let Err(err) = self.batch_tx.unbounded_send(batch) {
error!("Unable to send batch output. Shutting down...");
return Ok(Async::Ready(()));
} else {
// Notify epoch listeners that a batch has been output.
let mut dropped_listeners = Vec::new();
for (i, listener) in self.hdb.epoch_listeners().iter().enumerate() {
if let Err(err) = listener.unbounded_send(batch_epoch + 1) {
dropped_listeners.push(i);
error!("Epoch listener {} has dropped.", i);
}
}
// TODO: Remove dropped listeners from the list (see
// comment on `Inner::epoch_listeners`).
}
} else {
info!("Batch output receiver dropped. Shutting down...");

View File

@ -34,7 +34,7 @@ use tokio::{
};
use {
Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, BatchRx,
WireMessages, BatchRx, EpochTx, EpochRx,
};
// The number of random transactions to generate per interval.
@ -105,6 +105,15 @@ struct Inner<T: Contribution> {
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_tx: InternalTx<T>,
/// The earliest epoch from which we have not yet received output.
//
// Used as an initial value when a new epoch listener is registered.
current_epoch: Mutex<u64>,
// TODO: Create a separate type which uses a hashmap internally and allows
// for Tx removal. Altenratively just `Option` wrap Txs.
epoch_listeners: RwLock<Vec<EpochTx>>,
config: Config,
}
@ -139,6 +148,8 @@ impl<T: Contribution> Hydrabadger<T> {
warn!("****** This is an alpha build. Do not use in production! ******");
warn!("");
let current_epoch = cfg.start_epoch;
let inner = Arc::new(Inner {
uid,
addr: InAddr(addr),
@ -148,6 +159,8 @@ impl<T: Contribution> Hydrabadger<T> {
state_dsct: AtomicUsize::new(0),
peer_internal_tx,
config: cfg,
current_epoch: Mutex::new(current_epoch),
epoch_listeners: RwLock::new(Vec::new()),
});
let hdb = Hydrabadger {
@ -187,6 +200,13 @@ impl<T: Contribution> Hydrabadger<T> {
state
}
/// Sets the publicly visible state discriminant and returns the previous value.
pub(super) fn set_state_discriminant(&self, dsct: StateDsct) -> StateDsct {
let sd = StateDsct::from(self.inner.state_dsct.swap(dsct.into(), Ordering::Release));
info!("State has been set from '{}' to '{}'.", sd, dsct);
sd
}
/// Returns a recent state discriminant.
///
/// The returned value may not be up to date and is to be considered
@ -196,11 +216,18 @@ impl<T: Contribution> Hydrabadger<T> {
(sd, 0, 0)
}
/// Sets the publicly visible state discriminant and returns the previous value.
pub(super) fn set_state_discriminant(&self, dsct: StateDsct) -> StateDsct {
let sd = StateDsct::from(self.inner.state_dsct.swap(dsct.into(), Ordering::Release));
info!("State has been set from '{}' to '{}'.", sd, dsct);
sd
/// Returns a stream of epoch numbers (e) indicating that a batch has been
/// output for an epoch (e - 1) and that a new epoch has begun.
///
/// The current epoch will be sent upon registration. If a listener is
/// registered before any batches have been output by this instance of
/// Hydrabadger, the start epoch will be output.
pub fn register_epoch_listener(&self) -> EpochRx {
let (tx, rx) = mpsc::unbounded();
tx.unbounded_send(*self.inner.current_epoch.lock())
.expect("Unknown failure: Rx can not have been dropped yet");
self.inner.epoch_listeners.write().push(tx);
rx
}
pub fn is_validator(&self) -> bool {
@ -217,7 +244,29 @@ impl<T: Contribution> Hydrabadger<T> {
self.inner.peers.write()
}
/// Returns a mutable reference to the peers list.
/// Sets the current epoch and returns the previous epoch.
///
/// The returned value should (always?) be equal to `epoch - 1`.
//
// TODO: Convert to a simple incrementer?
pub(crate) fn set_current_epoch(&self, epoch: u64) -> u64 {
let mut ce = self.inner.current_epoch.lock();
let prev_epoch = *ce;
*ce = epoch;
prev_epoch
}
/// Returns the epoch of the next batch to be output.
pub fn current_epoch(&self) -> u64 {
*self.inner.current_epoch.lock()
}
/// Returns a reference to the epoch listeners list.
pub(crate) fn epoch_listeners(&self) -> RwLockReadGuard<Vec<EpochTx>> {
self.inner.epoch_listeners.read()
}
/// Returns a reference to the config.
pub(crate) fn config(&self) -> &Config {
&self.inner.config
}
@ -235,16 +284,15 @@ impl<T: Contribution> Hydrabadger<T> {
/// Handles a incoming batch of user transactions.
pub fn propose_user_contribution(&self, txn: T) -> Result<(), Error> {
match self.inner.state_dsct.load(Ordering::Relaxed).into() {
StateDsct::Validator => {
self.send_internal(InternalMessage::hb_input(
self.inner.uid,
OutAddr(*self.inner.addr),
DhbInput::User(txn),
));
Ok(())
}
_ => Err(Error::ProposeUserContributionNotValidator),
if self.is_validator() {
self.send_internal(InternalMessage::hb_input(
self.inner.uid,
OutAddr(*self.inner.addr),
DhbInput::User(txn),
));
Ok(())
} else {
Err(Error::ProposeUserContributionNotValidator)
}
}

View File

@ -98,6 +98,16 @@ type BatchTx<T> = mpsc::UnboundedSender<Batch<T, Uid>>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
pub type BatchRx<T> = mpsc::UnboundedReceiver<Batch<T, Uid>>;
/// Transmit half of the epoch number output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type EpochTx = mpsc::UnboundedSender<u64>;
/// Receive half of the epoch number output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
pub type EpochRx = mpsc::UnboundedReceiver<u64>;
pub trait Contribution:
HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{