Add batch output channel.

This commit is contained in:
c0gent 2018-10-02 09:47:11 -07:00
parent f8ff1bf917
commit 36783a04bc
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
3 changed files with 34 additions and 6 deletions

View File

@ -23,7 +23,7 @@ use std::collections::BTreeMap;
use tokio::{self, prelude::*};
use {
Contribution, InAddr, Input, InternalMessage, InternalMessageKind, InternalRx, Message,
NetworkNodeInfo, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind,
NetworkNodeInfo, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
};
/// Hydrabadger event (internal message) handler.
@ -35,15 +35,18 @@ pub struct Handler<T: Contribution> {
wire_queue: SegQueue<(Uid, WireMessage<T>, usize)>,
// Output from HoneyBadger:
step_queue: SegQueue<Step<T>>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
batch_tx: BatchTx<T>,
}
impl<T: Contribution> Handler<T> {
pub(super) fn new(hdb: Hydrabadger<T>, peer_internal_rx: InternalRx<T>) -> Handler<T> {
pub(super) fn new(hdb: Hydrabadger<T>, peer_internal_rx: InternalRx<T>, batch_tx: BatchTx<T>) -> Handler<T> {
Handler {
hdb,
peer_internal_rx,
wire_queue: SegQueue::new(),
step_queue: SegQueue::new(),
batch_tx,
}
}
@ -826,7 +829,16 @@ impl<T: Contribution> Future for Handler<T> {
::std::thread::sleep(::std::time::Duration::from_millis(extra_delay));
}
// TODO: Something useful!
// Send the batch along its merry way:
if !self.batch_tx.is_closed() {
if let Err(err) = self.batch_tx.unbounded_send(batch) {
error!("Unable to send batch output. Shutting down...");
return Ok(Async::Ready(()));
}
} else {
info!("Batch output receiver dropped. Shutting down...");
return Ok(Async::Ready(()));
}
}
for hb_msg in step.messages.drain(..) {

View File

@ -34,7 +34,7 @@ use tokio::{
};
use {
Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages,
WireMessages, BatchRx,
};
// The HoneyBadger batch size.
@ -112,6 +112,7 @@ struct Inner<T: Contribution> {
pub struct Hydrabadger<T: Contribution> {
inner: Arc<Inner<T>>,
handler: Arc<Mutex<Option<Handler<T>>>>,
batch_rx: Arc<Mutex<Option<BatchRx<T>>>>,
}
impl<T: Contribution> Hydrabadger<T> {
@ -125,6 +126,7 @@ impl<T: Contribution> Hydrabadger<T> {
let secret_key = SecretKey::rand(&mut rand::thread_rng());
let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded();
let (batch_tx, batch_rx) = mpsc::unbounded();
info!("");
info!("Local Hydrabadger Node: ");
@ -156,9 +158,10 @@ impl<T: Contribution> Hydrabadger<T> {
let hdb = Hydrabadger {
inner,
handler: Arc::new(Mutex::new(None)),
batch_rx: Arc::new(Mutex::new(Some(batch_rx))),
};
*hdb.handler.lock() = Some(Handler::new(hdb.clone(), peer_internal_rx));
*hdb.handler.lock() = Some(Handler::new(hdb.clone(), peer_internal_rx, batch_tx));
hdb
}
@ -173,6 +176,11 @@ impl<T: Contribution> Hydrabadger<T> {
self.handler.lock().take()
}
/// Returns the batch output receiver.
pub fn batch_rx(&self) -> Option<BatchRx<T>> {
self.batch_rx.lock().take()
}
/// Returns a reference to the inner state.
pub(crate) fn state(&self) -> RwLockReadGuard<State<T>> {
self.inner.state.read()

View File

@ -72,7 +72,7 @@ use uuid::Uuid;
// use bincode::{serialize, deserialize};
use hbbft::{
crypto::{PublicKey, PublicKeySet},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage, Batch},
messaging::Step as MessagingStep,
queueing_honey_badger::{Input as QhbInput, QueueingHoneyBadger},
sync_key_gen::{Ack, Part},
@ -101,6 +101,14 @@ type InternalTx<T> = mpsc::UnboundedSender<InternalMessage<T>>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalRx<T> = mpsc::UnboundedReceiver<InternalMessage<T>>;
/// Transmit half of the batch output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type BatchTx<T> = mpsc::UnboundedSender<Batch<Vec<Vec<T>>, Uid>>;
/// Receive half of the batch output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type BatchRx<T> = mpsc::UnboundedReceiver<Batch<Vec<Vec<T>>, Uid>>;
pub trait Contribution:
HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{