Generate transactions only after each epoch has begun.

This commit is contained in:
c0gent 2018-11-01 08:58:47 -07:00
parent 41298be329
commit a0898ab918
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
3 changed files with 75 additions and 46 deletions

View File

@ -393,6 +393,10 @@ impl<T: Contribution> Handler<T> {
)?);
}
}
for l in self.hdb.epoch_listeners().iter() {
l.unbounded_send(self.hdb.current_epoch())
.map_err(|_| Error::InstantiateHbListenerDropped)?;
}
}
StateDsct::AwaitingMorePeersForKeyGeneration => unimplemented!(),
StateDsct::Observer => {

View File

@ -30,7 +30,7 @@ use tokio::{
self,
net::{TcpListener, TcpStream},
prelude::*,
timer::Interval,
timer::{Interval, Delay},
};
use {
Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage, WireMessageKind,
@ -216,20 +216,6 @@ impl<T: Contribution> Hydrabadger<T> {
(sd, 0, 0)
}
/// Returns a stream of epoch numbers (e) indicating that a batch has been
/// output for an epoch (e - 1) and that a new epoch has begun.
///
/// The current epoch will be sent upon registration. If a listener is
/// registered before any batches have been output by this instance of
/// Hydrabadger, the start epoch will be output.
pub fn register_epoch_listener(&self) -> EpochRx {
let (tx, rx) = mpsc::unbounded();
tx.unbounded_send(*self.inner.current_epoch.lock())
.expect("Unknown failure: Rx can not have been dropped yet");
self.inner.epoch_listeners.write().push(tx);
rx
}
pub fn is_validator(&self) -> bool {
StateDsct::from(self.inner.state_dsct.load(Ordering::Relaxed)) == StateDsct::Validator
}
@ -261,6 +247,22 @@ impl<T: Contribution> Hydrabadger<T> {
*self.inner.current_epoch.lock()
}
/// Returns a stream of epoch numbers (e) indicating that a batch has been
/// output for an epoch (e - 1) and that a new epoch has begun.
///
/// The current epoch will be sent upon registration. If a listener is
/// registered before any batches have been output by this instance of
/// Hydrabadger, the start epoch will be output.
pub fn register_epoch_listener(&self) -> EpochRx {
let (tx, rx) = mpsc::unbounded();
if self.is_validator() {
tx.unbounded_send(self.current_epoch())
.expect("Unknown error: receiver can not have dropped");
}
self.inner.epoch_listeners.write().push(tx);
rx
}
/// Returns a reference to the epoch listeners list.
pub(crate) fn epoch_listeners(&self) -> RwLockReadGuard<Vec<EpochTx>> {
self.inner.epoch_listeners.read()
@ -392,12 +394,55 @@ impl<T: Contribution> Hydrabadger<T> {
})
}
fn generate_contributions(self, gen_txns: Option<fn(usize, usize) -> T>)
-> impl Future<Item = (), Error = ()>
{
if let Some(gen_txns) = gen_txns {
let epoch_stream = self.register_epoch_listener();
let gen_delay = self.inner.config.txn_gen_interval;
let gen_cntrb = epoch_stream
.and_then(move |epoch_no| {
Delay::new(Instant::now() + Duration::from_millis(gen_delay))
.map_err(|err| panic!("Timer error: {:?}", err))
.and_then(move |_| Ok(epoch_no))
})
.for_each(move |_epoch_no| {
let hdb = self.clone();
match hdb.state_info_stale().0 {
StateDsct::Validator => {
info!(
"Generating and inputting {} random transactions...",
self.inner.config.txn_gen_count
);
// Send some random transactions to our internal HB instance.
let txns = gen_txns(
self.inner.config.txn_gen_count,
self.inner.config.txn_gen_bytes,
);
hdb.send_internal(InternalMessage::hb_input(
hdb.inner.uid,
OutAddr(*hdb.inner.addr),
DhbInput::User(txns),
));
}
_ => {}
}
Ok(())
})
.map_err(|err| panic!("Contribution generation error: {:?}", err));
Either::A(gen_cntrb)
} else {
Either::B(future::ok(()))
}
}
/// Returns a future that generates random transactions and logs status
/// messages.
fn generate_txns_status(
self,
gen_txns: Option<fn(usize, usize) -> T>,
) -> impl Future<Item = (), Error = ()> {
fn log_status(self) -> impl Future<Item = (), Error = ()> {
Interval::new(
Instant::now(),
Duration::from_millis(self.inner.config.txn_gen_interval),
@ -430,32 +475,9 @@ impl<T: Contribution> Hydrabadger<T> {
drop(peers);
if let Some(gt) = gen_txns {
match dsct {
StateDsct::Validator => {
info!(
"Generating and inputting {} random transactions...",
self.inner.config.txn_gen_count
);
// Send some random transactions to our internal HB instance.
let txns = gt(
self.inner.config.txn_gen_count,
self.inner.config.txn_gen_bytes,
);
hdb.send_internal(InternalMessage::hb_input(
hdb.inner.uid,
OutAddr(*hdb.inner.addr),
DhbInput::User(txns),
));
}
_ => {}
}
}
Ok(())
})
.map_err(|err| error!("List connection interval error: {:?}", err))
.map_err(|err| panic!("List connection interval error: {:?}", err))
}
/// Binds to a host address and returns a future which starts the node.
@ -494,11 +516,12 @@ impl<T: Contribution> Hydrabadger<T> {
.handler()
.map_err(|err| error!("Handler internal error: {:?}", err));
let generate_txns_status = self.clone().generate_txns_status(gen_txns);
let log_status = self.clone().log_status();
let generate_contributions = self.clone().generate_contributions(gen_txns);
listen
.join4(connect, generate_txns_status, hdb_handler)
.map(|(_, _, _, _)| ())
.join5(connect, hdb_handler, log_status, generate_contributions)
.map(|(..)| ())
}
/// Starts a node.

View File

@ -52,6 +52,8 @@ pub enum Error {
SyncKeyGenGenerate(SyncKeyGenError),
#[fail(display = "Unable to push user transactions, this node is not a validator")]
ProposeUserContributionNotValidator,
#[fail(display = "Unable to transmit epoch status to listener, listener receiver dropped")]
InstantiateHbListenerDropped,
}
impl From<std::io::Error> for Error {