Add internal messages and channel.
parent 221cd24c07
commit c1a7bc8953

Cargo.toml: 13 changes
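The change splits peer communication into two layers: `WireMessage` (what actually crosses the socket) and `InternalMessage` (what each peer task forwards to the central `Hydrabadger` state over a shared channel). Below is a minimal, self-contained sketch of that flow. It uses `std::sync::mpsc` purely for illustration (the code in this commit uses an unbounded `futures::sync::mpsc` channel), and the integer uid is a stand-in for `uuid::Uuid`; the type names mirror the diff further down.

    use std::sync::mpsc;

    type Uuid = u64; // stand-in for `uuid::Uuid`, just for this sketch

    #[derive(Clone, Debug)]
    enum WireMessageKind { Hello }

    #[derive(Clone, Debug)]
    struct WireMessage { kind: WireMessageKind }

    #[derive(Clone, Debug)]
    enum InternalMessageKind { Wire(WireMessage) }

    #[derive(Clone, Debug)]
    struct InternalMessage { src_uid: Uuid, kind: InternalMessageKind }

    fn main() {
        // One shared channel: every peer task holds a clone of the sender,
        // the central task owns the receiver.
        let (peer_internal_tx, peer_internal_rx) = mpsc::channel();

        // A peer task decodes a wire message and forwards it inward,
        // tagged with its own uid so the handler knows where it came from.
        let wire = WireMessage { kind: WireMessageKind::Hello };
        peer_internal_tx
            .send(InternalMessage { src_uid: 1, kind: InternalMessageKind::Wire(wire) })
            .unwrap();

        // The central task drains the channel and dispatches on the message kind.
        for msg in peer_internal_rx.try_iter() {
            match msg.kind {
                InternalMessageKind::Wire(w) => {
                    println!("internal message from {}: {:?}", msg.src_uid, w);
                }
            }
        }
    }

Funneling everything through one internal channel keeps each peer task a thin reader/writer while the consensus state is only touched from one place.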
@@ -4,13 +4,13 @@ version = "0.1.0"
authors = ["c0gent <nsan1129@gmail.com>"]
autobins = false

[[bin]]
name = "simulation"
path = "src/bin/simulation.rs"
# [[bin]]
# name = "simulation"
# path = "src/bin/simulation.rs"

[[bin]]
name = "network"
path = "src/bin/network.rs"
path = "src/bin/network_test.rs"

[dependencies]
log = "*"
@@ -47,6 +47,7 @@ byteorder = "*"

[dependencies.hbbft]
version = "*"
git = "https://github.com/c0gent/hbbft"
branch = "master"
# git = "https://github.com/c0gent/hbbft"
# branch = "master"
path = "../hbbft"
features = ["serialization-protobuf"]
@@ -91,7 +91,7 @@ fn main() {
        .map(|bv| bv.as_bytes().to_vec());

    let hb = Hydrabadger::new(bind_address, broadcast_value);
    hb.run(remote_addresses);
    hydrabadger::run_node(hb, remote_addresses);

    // match mine() {
    //     Ok(_) => {},
@@ -1,404 +0,0 @@
extern crate bincode;
extern crate colored;
extern crate env_logger;
extern crate hbbft;
extern crate itertools;
extern crate pairing;
extern crate rand;
extern crate serde;
#[macro_use(Deserialize, Serialize)]
extern crate serde_derive;
extern crate signifix;

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use std::time::{Duration, Instant};
use std::{cmp, u64};

use colored::*;
use itertools::Itertools;
use rand::Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use signifix::{metric, TryFrom};

use hbbft::crypto::SecretKeySet;
use hbbft::honey_badger::{Batch, HoneyBadger, HoneyBadgerBuilder};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target};


/// A node identifier. In the simulation, nodes are simply numbered.
#[derive(Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
pub struct NodeUid(pub usize);

/// A transaction.
#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd, Debug, Clone)]
pub struct Transaction(pub Vec<u8>);

impl Transaction {
    fn new(len: usize) -> Transaction {
        Transaction(rand::thread_rng().gen_iter().take(len).collect())
    }
}

/// A serialized message with a sender and the timestamp of arrival.
#[derive(Eq, PartialEq, Debug)]
struct TimestampedMessage<D: DistAlgorithm> {
    time: Duration,
    sender_id: D::NodeUid,
    target: Target<D::NodeUid>,
    message: Vec<u8>,
}

impl<D: DistAlgorithm> Clone for TimestampedMessage<D>
    where D::Message: Clone, {
    fn clone(&self) -> Self {
        TimestampedMessage {
            time: self.time,
            sender_id: self.sender_id.clone(),
            target: self.target.clone(),
            message: self.message.clone(),
        }
    }
}

/// Performance parameters of a node's hardware and Internet connection. For simplicity, only the
/// sender's lag and bandwidth are taken into account. (I.e. infinite downstream, limited
/// upstream.)
#[derive(Clone, Copy)]
pub struct HwQuality {
    /// The network latency. This is added once for every message.
    latency: Duration,
    /// The inverse bandwidth, in time per byte.
    inv_bw: Duration,
    /// The CPU time multiplier: how much slower, in percent, is this node than your computer?
    cpu_factor: u32,
}


/// A "node" running an instance of the algorithm `D`.
pub struct TestNode<D: DistAlgorithm> {
    /// This node's own ID.
    id: D::NodeUid,
    /// The instance of the broadcast algorithm.
    algo: D,
    /// The duration for which this node's CPU has already been simulated.
    time: Duration,
    /// The time when this node last sent data over the network.
    sent_time: Duration,
    /// Incoming messages from other nodes that this node has not yet handled, with timestamps.
    in_queue: VecDeque<TimestampedMessage<D>>,
    /// Outgoing messages to other nodes, with timestamps.
    out_queue: VecDeque<TimestampedMessage<D>>,
    /// The values this node has output so far, with timestamps.
    outputs: Vec<(Duration, D::Output)>,
    /// The number of messages this node has handled so far.
    message_count: usize,
    /// The total size of messages this node has handled so far, in bytes.
    message_size: u64,
    /// The hardware and network quality of this node.
    hw_quality: HwQuality,
}

impl<D: DistAlgorithm> TestNode<D>
    where D::Message: Serialize + DeserializeOwned, {
    /// Creates a new test node with the given broadcast instance.
    fn new(algo: D, hw_quality: HwQuality) -> TestNode<D> {
        let mut node = TestNode {
            id: algo.our_id().clone(),
            algo,
            time: Duration::default(),
            sent_time: Duration::default(),
            in_queue: VecDeque::new(),
            out_queue: VecDeque::new(),
            outputs: Vec::new(),
            message_count: 0,
            message_size: 0,
            hw_quality,
        };
        node.send_output_and_msgs();
        node
    }

    /// Handles the first message in the node's queue.
    fn handle_message(&mut self) {
        let ts_msg = self.in_queue.pop_front().expect("message not found");
        self.time = cmp::max(self.time, ts_msg.time);
        self.message_count += 1;
        self.message_size += ts_msg.message.len() as u64;
        let start = Instant::now();
        let msg = bincode::deserialize::<D::Message>(&ts_msg.message).expect("deserialize");
        self.algo
            .handle_message(&ts_msg.sender_id, msg)
            .expect("handling message");
        self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
        self.send_output_and_msgs()
    }

    /// Handles the algorithm's output and messages.
    fn send_output_and_msgs(&mut self) {
        let start = Instant::now();
        let out_msgs: Vec<_> = self
            .algo
            .message_iter()
            .map(|msg| {
                (
                    msg.target,
                    bincode::serialize(&msg.message).expect("serialize"),
                )
            })
            .collect();
        self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
        let time = self.time;
        self.outputs
            .extend(self.algo.output_iter().map(|out| (time, out)));
        self.sent_time = cmp::max(self.time, self.sent_time);
        for (target, message) in out_msgs {
            self.sent_time += self.hw_quality.inv_bw * message.len() as u32;
            self.out_queue.push_back(TimestampedMessage {
                time: self.sent_time + self.hw_quality.latency,
                sender_id: self.id.clone(),
                target,
                message,
            });
        }
    }

    /// Returns the time when the next message can be handled.
    fn next_event_time(&self) -> Option<Duration> {
        match self.in_queue.front() {
            None => None,
            Some(ts_msg) => Some(cmp::max(ts_msg.time, self.time)),
        }
    }

    /// Returns the number of messages this node has handled so far.
    fn message_count(&self) -> usize {
        self.message_count
    }

    /// Returns the size of messages this node has handled so far.
    fn message_size(&self) -> u64 {
        self.message_size
    }

    /// Adds a message into the incoming queue.
    fn add_message(&mut self, msg: TimestampedMessage<D>) {
        match self.in_queue.iter().position(|other| other.time > msg.time) {
            None => self.in_queue.push_back(msg),
            Some(i) => self.in_queue.insert(i, msg),
        }
    }
}


/// A collection of `TestNode`s representing a network.
pub struct TestNetwork<D: DistAlgorithm> {
    nodes: BTreeMap<D::NodeUid, TestNode<D>>,
}

impl<D: DistAlgorithm<NodeUid = NodeUid>> TestNetwork<D>
    where D::Message: Serialize + DeserializeOwned + Clone, {
    /// Creates a new network with `good_num` good nodes, and `dead_num` dead nodes.
    pub fn new<F>(good_num: usize, dead_num: usize, new_algo: F, hw_quality: HwQuality)
            -> TestNetwork<D>
            where F: Fn(NodeUid, BTreeSet<NodeUid>) -> D {
        let node_ids: BTreeSet<NodeUid> = (0..(good_num + dead_num)).map(NodeUid).collect();
        let new_node_by_id = |id: NodeUid| {
            (
                id,
                TestNode::new(new_algo(id, node_ids.clone()), hw_quality),
            )
        };
        let mut network = TestNetwork {
            nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
        };
        let initial_msgs: Vec<_> = network
            .nodes
            .values_mut()
            .flat_map(|node| node.out_queue.drain(..))
            .collect();
        network.dispatch_messages(initial_msgs);
        network
    }

    /// Pushes the messages into the queues of the corresponding recipients.
    fn dispatch_messages<Q>(&mut self, msgs: Q)
            where Q: IntoIterator<Item = TimestampedMessage<D>> {
        for ts_msg in msgs {
            match ts_msg.target {
                Target::All => {
                    for node in self.nodes.values_mut() {
                        if node.id != ts_msg.sender_id {
                            node.add_message(ts_msg.clone())
                        }
                    }
                }
                Target::Node(to_id) => {
                    if let Some(node) = self.nodes.get_mut(&to_id) {
                        node.add_message(ts_msg);
                    }
                }
            }
        }
    }

    /// Handles a queued message in one of the nodes with the earliest timestamp.
    pub fn step(&mut self) -> NodeUid {
        let min_time = self
            .nodes
            .values()
            .filter_map(TestNode::next_event_time)
            .min()
            .expect("no more messages in queue");
        let min_ids: Vec<NodeUid> = self
            .nodes
            .iter()
            .filter(|(_, node)| node.next_event_time() == Some(min_time))
            .map(|(id, _)| *id)
            .collect();
        let next_id = *rand::thread_rng().choose(&min_ids).unwrap();
        let msgs: Vec<_> = {
            let node = self.nodes.get_mut(&next_id).unwrap();
            node.handle_message();
            node.out_queue.drain(..).collect()
        };
        self.dispatch_messages(msgs);
        next_id
    }

    /// Returns the number of messages that have been handled so far.
    pub fn message_count(&self) -> usize {
        self.nodes.values().map(TestNode::message_count).sum()
    }

    /// Returns the total size of messages that have been handled so far.
    pub fn message_size(&self) -> u64 {
        self.nodes.values().map(TestNode::message_size).sum()
    }
}


/// The timestamped batches for a particular epoch that have already been output.
#[derive(Clone, Default)]
struct EpochInfo {
    nodes: BTreeMap<NodeUid, (Duration, Batch<Transaction, NodeUid>)>,
}

impl EpochInfo {
    /// Adds a batch to this epoch. Prints information if the epoch is complete.
    fn add(&mut self, id: NodeUid, time: Duration, batch: &Batch<Transaction, NodeUid>,
            network: &TestNetwork<HoneyBadger<Transaction, NodeUid>>) {
        if self.nodes.contains_key(&id) {
            return;
        }
        self.nodes.insert(id, (time, batch.clone()));
        if self.nodes.len() < network.nodes.len() {
            return;
        }
        let (min_t, max_t) = self
            .nodes
            .values()
            .map(|&(time, _)| time)
            .minmax()
            .into_option()
            .unwrap();
        let txs = batch.len();
        println!(
            "{:>5} {:6} {:6} {:5} {:9} {:>9}B",
            batch.epoch.to_string().cyan(),
            min_t.as_secs() * 1000 + max_t.subsec_nanos() as u64 / 1_000_000,
            max_t.as_secs() * 1000 + max_t.subsec_nanos() as u64 / 1_000_000,
            txs,
            network.message_count() / network.nodes.len(),
            metric::Signifix::try_from(network.message_size() / network.nodes.len() as u64)
                .unwrap(),
        );
    }
}


/// Proposes `txn_count` values and expects nodes to output and order them.
fn simulate_honey_badger(mut network: TestNetwork<HoneyBadger<Transaction, NodeUid>>,
        txn_count: usize) {
    // Returns `true` if the node has not output all transactions yet.
    // If it has, and has advanced another epoch, it clears all messages for later epochs.
    let node_busy = |node: &mut TestNode<HoneyBadger<Transaction, NodeUid>>| {
        node.outputs
            .iter()
            .map(|&(_, ref batch)| batch.len())
            .sum::<usize>() < txn_count
    };

    // Handle messages until all nodes have output all transactions.
    println!("{}", "Epoch Min/Max Time Txs Msgs/Node Size/Node".bold());
    let mut epochs = Vec::new();
    while network.nodes.values_mut().any(node_busy) {
        let id = network.step();
        for &(time, ref batch) in &network.nodes[&id].outputs {
            if epochs.len() <= batch.epoch as usize {
                epochs.resize(batch.epoch as usize + 1, EpochInfo::default());
            }
            epochs[batch.epoch as usize].add(id, time, batch, &network);
        }
    }
}


// -n <n>, --nodes <n> The total number of nodes [default: 10]
// -f <f>, --faulty <f> The number of faulty nodes [default: 0]
// -t <txs>, --txs <txs> The number of transactions to process [default: 1000]
// -b <b>, --batch <b> The batch size, i.e. txs per epoch [default: 100]
// -l <lag>, --lag <lag> The network lag between sending and receiving [default: 100]
// --bw <bw> The bandwidth, in kbit/s [default: 2000]
// --cpu <cpu> The CPU speed, in percent of this machine's [default: 100]
// --tx-size <size> The size of a transaction, in bytes [default: 10]
fn main() {
    let node_count_total = 15;
    let node_count_faulty = 1;
    let txn_count = 1000;
    let batch_size = 100;
    let latency = 100;
    let bandwidth = 2000;
    let cpu = 100;
    let txn_bytes = 10;

    println!("Simulating Honey Badger with:");
    println!("{} nodes, {} faulty", node_count_total, node_count_faulty);
    println!(
        "{} transactions, {} bytes each, ≤{} per epoch",
        txn_count, txn_bytes, batch_size
    );
    println!(
        "Network latency: {} ms, bandwidth: {} kbit/s, {:5.2}% CPU speed",
        latency, bandwidth, cpu
    );
    println!();

    let node_count_good = node_count_total - node_count_faulty;
    let txns = (0..txn_count).map(|_| Transaction::new(txn_bytes));
    let sk_set = SecretKeySet::random(node_count_faulty, &mut rand::thread_rng());
    let pk_set = sk_set.public_keys();

    let new_honey_badger = |id: NodeUid, all_ids: BTreeSet<NodeUid>| {
        let netinfo = Rc::new(NetworkInfo::new(
            id,
            all_ids,
            sk_set.secret_key_share(id.0 as u64),
            pk_set.clone(),
        ));
        HoneyBadgerBuilder::new(netinfo)
            .batch_size(batch_size)
            .build_with_transactions(txns.clone())
            .expect("Instantiate honey_badger")
    };

    let hw_quality = HwQuality {
        latency: Duration::from_millis(latency),
        inv_bw: Duration::new(0, 8_000_000 / bandwidth),
        cpu_factor: (10_000f32 / cpu as f32) as u32,
    };

    let network = TestNetwork::new(node_count_good, node_count_faulty, new_honey_badger, hw_quality);
    simulate_honey_badger(network, txn_count);
}
@@ -19,9 +19,9 @@ use std::{
    io::Cursor,
};
use futures::{
    StartSend, AsyncSink,
    sync::mpsc,
    future::{self, Either},
    StartSend, AsyncSink,
};
use tokio::{
    self,
@@ -44,27 +44,21 @@ use bincode::{self, serialize_into, deserialize_from, serialize, deserialize};
use tokio_serde_bincode::{ReadBincode, WriteBincode};


use hbbft::{
    broadcast::{Broadcast, BroadcastMessage},
    crypto::{
        SecretKeySet,
        poly::Poly,
    },
    messaging::{DistAlgorithm, NetworkInfo, SourcedMessage, Target},
    crypto::{SecretKeySet, poly::Poly},
    messaging::{DistAlgorithm, NetworkInfo, SourcedMessage, Target, TargetedMessage},
    proto::message::BroadcastProto,
    honey_badger::HoneyBadger,
    dynamic_honey_badger::{DynamicHoneyBadger, Input, Batch, Message, Change},
    dynamic_honey_badger::{/*DynamicHoneyBadger,*/ /*Input, Batch,*/ Message, /*Change*/},
    queueing_honey_badger::{QueueingHoneyBadger, Input, Batch, /*Message,*/ Change},
};
// use network::{comms_task, connection, messaging::Messaging};


#[derive(Debug, Fail)]
pub enum Error {
    #[fail(display = "{}", _0)]
    Io(std::io::Error),
    // #[fail(display = "{}", _0)]
    // CommsError(comms_task::Error),
    #[fail(display = "{}", _0)]
    Serde(bincode::Error),
}
@@ -76,24 +70,104 @@ impl From<std::io::Error> for Error {
}


/// A transaction.
#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd, Debug, Clone)]
pub struct Transaction(pub Vec<u8>);

impl Transaction {
    fn random(len: usize) -> Transaction {
        Transaction(rand::thread_rng().gen_iter().take(len).collect())
    }
}


/// Messages sent over the network between nodes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WireMessage {
pub enum WireMessageKind {
    Hello,
    Goodbye,
    Message,
    // Message,
    #[serde(with = "serde_bytes")]
    Bytes(Bytes),
    Message(Message<Uuid>),
    // Transaction()
}


/// Messages sent over the network between nodes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WireMessage {
    // src_uid: Uuid,
    kind: WireMessageKind,
}

impl WireMessage {
    pub fn hello(/*src_uid: Uuid*/) -> WireMessage {
        WireMessage {
            // src_uid,
            kind: WireMessageKind::Hello,
        }
    }

    // pub fn src_uid(&self) -> &Uuid {
    //     &self.src_uid
    // }

    pub fn kind(&self) -> &WireMessageKind {
        &self.kind
    }
}


/// A message between internal threads/tasks.
#[derive(Clone, Debug)]
pub enum InternalMessageKind {
    Wire(WireMessage),
}


/// A message between internal threads/tasks.
#[derive(Clone, Debug)]
pub struct InternalMessage {
    src_uid: Uuid,
    kind: InternalMessageKind,
}

impl InternalMessage {
    pub fn wire(src_uid: Uuid, wire_message: WireMessage) -> InternalMessage {
        InternalMessage {
            src_uid,
            kind: InternalMessageKind::Wire(wire_message),
        }
    }

    pub fn src_uid(&self) -> &Uuid {
        &self.src_uid
    }

    pub fn kind(&self) -> &InternalMessageKind {
        &self.kind
    }
}


/// Transmit half of the message channel.
type Tx = mpsc::UnboundedSender<WireMessage>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireTx = mpsc::UnboundedSender<WireMessage>;

/// Receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<WireMessage>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireRx = mpsc::UnboundedReceiver<WireMessage>;

type PeerTxs = Arc<RwLock<HashMap<SocketAddr, Tx>>>;
/// Transmit half of the message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalTx = mpsc::UnboundedSender<InternalMessage>;

/// Receive half of the message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalRx = mpsc::UnboundedReceiver<InternalMessage>;

// type PeerTxs = Arc<RwLock<HashMap<SocketAddr, Tx>>>;

/// A serialized message with a sender and the timestamp of arrival.
#[derive(Eq, PartialEq, Debug)]
@@ -177,24 +251,29 @@ impl Sink for WireMessages {

/// The state for each connected client.
struct Peer {
    // /// Name of the peer.
    // name: BytesMut,
    // Peer uid.
    uid: Uuid,

    // The incoming stream of messages:
    wire_messages: WireMessages,

    /// Handle to the shared message state.
    txs: PeerTxs,
    // txs: PeerTxs,
    hb: Arc<Hydrabadger>,

    /// Receive half of the message channel.
    rx: Rx,
    /// Client socket address.
    rx: WireRx,

    peer_internal_tx: InternalTx,

    /// Peer socket address.
    addr: SocketAddr,
}

impl Peer {
    /// Create a new instance of `Peer`.
    fn new(txs: PeerTxs, wire_messages: WireMessages) -> Peer {
    fn new(hb: Arc<Hydrabadger>, wire_messages: WireMessages,
            peer_internal_tx: InternalTx) -> Peer {
        // Get the client socket address
        let addr = wire_messages.socket().peer_addr().unwrap();

@@ -202,13 +281,14 @@ impl Peer {
        let (tx, rx) = mpsc::unbounded();

        // Add an entry for this `Peer` in the shared state map.
        let guard = txs.write().unwrap().insert(addr, tx);
        let guard = hb.peer_txs.write().unwrap().insert(addr, tx);

        Peer {
            // name,
            uid: Uuid::new_v4(),
            wire_messages,
            txs,
            hb,
            rx,
            peer_internal_tx,
            addr,
        }
    }
@@ -216,7 +296,7 @@ impl Peer {
    /// Sends a message to all connected peers.
    fn send_to_all(&mut self, msg: &WireMessage) {
        // Now, send the message to all other peers
        for (addr, tx) in self.txs.read().unwrap().iter() {
        for (addr, tx) in self.hb.peer_txs.read().unwrap().iter() {
            // Don't send the message to ourselves
            if *addr != self.addr {
                // The send only fails if the rx half has been dropped,
@@ -275,8 +355,8 @@ impl Future for Peer {
                    info!("Received message: {:?}", message);

                    if let Some(msg) = message {
                        match msg {
                            WireMessage::Hello => info!("HELLO RECEIVED from '{}'", self.addr),
                        match msg.kind() {
                            WireMessageKind::Hello => info!("HELLO RECEIVED from '{}'", self.uid),
                            _ => (),
                        }
                    } else {
@@ -297,168 +377,166 @@ impl Future for Peer {

impl Drop for Peer {
    fn drop(&mut self) {
        self.txs.write().unwrap().remove(&self.addr);
        self.hb.peer_txs.write().unwrap().remove(&self.addr);
    }
}


/// Return a future to manage the socket.
pub fn handle_incoming(socket: TcpStream, peer_txs: PeerTxs) -> impl Future<Item = (), Error = ()> {
    let peer_addr = socket.peer_addr().unwrap();
    info!("Incoming connection from '{}'", peer_addr);

    let wire_messages = WireMessages::new(socket);
struct HoneyBadgerTask {

    wire_messages.into_future()
        .map_err(|(e, _)| e)
        .and_then(move |(message, wire_messages)| {
            let message = match message {
                Some(message) => message,
                None => {
                    // The remote client closed the connection without sending
                    // any data.
                    info!("Closing connection to '{}'", peer_addr);
                    return Either::A(future::ok(()));
                }
            };

            info!("Connected to '{}'", peer_addr);

            // Create the peer.
            //
            // This is also a future that processes the connection, only
            // completing when the socket closes.
            let peer = Peer::new(peer_txs, wire_messages);

            // Wrap `peer` with `Either::B` to make the return type fit.
            Either::B(peer)
        })
        .map_err(|e| {
            error!("Connection error = {:?}", e);
        })
}


// // Used to create a secret key from a UUID.
// fn sum_into_u64(bytes: &[u8]) -> u64 {
//     let mut id_u64s = vec![0; 2];
//     LittleEndian::read_u64_into(bytes, &mut id_u64s);
//     id_u64s.iter().sum()
// }


pub struct Hydrabadger/*<T, N>*/ {
pub struct Hydrabadger {
    /// Node uid:
    uid: Uuid,
    /// Incoming connection socket.
    addr: SocketAddr,
    value: Option<Vec<u8>>,
    peer_txs: PeerTxs,
    // value: Option<Vec<u8>>,

    // TODO: Use a bounded tx/rx (find a sensible upper bound):
    peer_txs: RwLock<HashMap<SocketAddr, WireTx>>,

    /// Honey badger.
    dhb: DynamicHoneyBadger<Vec<u8>, Uuid>,
    dhb: RwLock<QueueingHoneyBadger<Transaction, Uuid>>,

    /// Incoming messages from other nodes that this node has not yet handled, with timestamps.
    in_queue: VecDeque<TimestampedMessage>,
    /// Outgoing messages to other nodes, with timestamps.
    out_queue: VecDeque<TimestampedMessage>,
    // TODO: Use a bounded tx/rx (find a sensible upper bound):
    peer_internal_tx: InternalTx,
    peer_internal_rx: InternalRx,


    peer_out_queue: RwLock<VecDeque<TargetedMessage<Message<usize>, usize>>>,
    batch_out_queue: RwLock<VecDeque<Batch<Transaction, usize>>>,
}

impl Hydrabadger {
    /// Returns a new Hydrabadger node.
    pub fn new(addr: SocketAddr, value: Option<Vec<u8>>) -> Self {
    pub fn new(addr: SocketAddr, _value: Option<Vec<u8>>) -> Self {
        // let node_count_good = node_count_total - node_count_faulty;
        // let txns = (0..txn_count).map(|_| Transaction::new(txn_bytes));
        let sk_set = SecretKeySet::random(0, &mut rand::thread_rng());
        let pk_set = sk_set.public_keys();
        let id = Uuid::new_v4();
        let mut all_ids = BTreeSet::new();
        all_ids.insert(id);
        let sk_share = 0;
        let uid = Uuid::new_v4();
        // let mut all_ids = BTreeSet::new();
        // all_ids.insert(id);
        // let sk_share = 0;

        let node_ids: BTreeSet<_> = iter::once(uid).collect();

        let netinfo = NetworkInfo::new(
            id,
            all_ids,
            sk_set.secret_key_share(sk_share),
            uid,
            node_ids.clone(),
            sk_set.secret_key_share(0 as u64),
            pk_set.clone(),
        );

        let dhb = DynamicHoneyBadger::builder(netinfo)
        let dhb = RwLock::new(QueueingHoneyBadger::builder(netinfo)
            .batch_size(50)
            .max_future_epochs(0)
            .build().expect("Error creating `DynamicHoneyBadger`");
            .build());
            /*.build().expect("Error creating `QueueingHoneyBadger`");*/

        let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded();

        Hydrabadger {
            uid,
            addr,
            value,
            peer_txs: Arc::new(RwLock::new(HashMap::new())),
            // value,
            peer_txs: RwLock::new(HashMap::new()),
            dhb,
            in_queue: VecDeque::new(),
            out_queue: VecDeque::new(),

            // peer_in_queue: RwLock::new(VecDeque::new()),
            peer_internal_tx,
            peer_internal_rx,

            peer_out_queue: RwLock::new(VecDeque::new()),
            batch_out_queue: RwLock::new(VecDeque::new()),
        }
    }

    /// Starts the server.
    pub fn run(&self, remotes: HashSet<SocketAddr>) {
        let socket = TcpListener::bind(&self.addr).unwrap();
        info!("Listening on: {}", self.addr);

        let peer_txs = self.peer_txs.clone();

        let listen = socket.incoming()
            .map_err(|e| error!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket| {
                let peer_addr = socket.peer_addr().unwrap();
                info!("Incoming connection from '{}'", peer_addr);

                let wire_messages = WireMessages::new(socket);
                tokio::spawn(Peer::new(peer_txs.clone(), wire_messages)
                    .map_err(|e| {
                        error!("Connection error = {:?}", e);
                    })
                );

                Ok(())
            });

        let peer_txs = self.peer_txs.clone();
        let connect = future::lazy(move || {
            for remote_addr in remotes.iter() {
                let peer_txs = peer_txs.clone();
                tokio::spawn(TcpStream::connect(remote_addr)
                    .map_err(Error::from)
                    .and_then(move |socket| {
                        // Wrap the socket with the frame delimiter and codec:
                        let mut wire_messages = WireMessages::new(socket);

                        match wire_messages.send_msg(WireMessage::Hello) {
                            Ok(_) => Either::A(Peer::new(peer_txs, wire_messages)),
                            Err(err) => Either::B(future::err(err)),
                        }
                    })
                    .map_err(|err| error!("Socket connection error: {:?}", err)));
            }
            Ok(())
        });

        let peer_txs = self.peer_txs.clone();
        let list = Interval::new(Instant::now(), Duration::from_millis(3000))
            .for_each(move |_| {
                let peer_txs = peer_txs.read().unwrap();
                info!("Peer list:");

                for (peer_addr, mut pb) in peer_txs.iter() {
                    info!(" peer_addr: {}", peer_addr);
                }

                Ok(())
            })
            .map_err(|err| {
                error!("List connection inverval error: {:?}", err);
            });

        tokio::run(listen.join3(connect, list).map(|(_, _, _)| ()));
    }

    pub fn connect(&self) {

    }
}


/// Binds to a host address and returns a future which starts the node.
pub fn node(hb: Hydrabadger, remotes: HashSet<SocketAddr>)
        -> impl Future<Item = (), Error = ()> {
    let socket = TcpListener::bind(&hb.addr).unwrap();
    info!("Listening on: {}", hb.addr);

    // let peer_txs = hb.peer_txs.clone();
    let hydrabadger = Arc::new(hb);

    let hb = hydrabadger.clone();
    let listen = socket.incoming()
        .map_err(|e| error!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let peer_addr = socket.peer_addr().unwrap();
            info!("Incoming connection from '{}'", peer_addr);

            let wire_messages = WireMessages::new(socket);
            tokio::spawn(Peer::new(hb.clone(), wire_messages, hb.peer_internal_tx.clone())
                .map_err(|e| {
                    error!("Connection error = {:?}", e);
                })
            );

            Ok(())
        });

    // let peer_txs = hb.peer_txs.clone();
    let uid = hydrabadger.uid.clone();
    let hb = hydrabadger.clone();
    let connect = future::lazy(move || {
        for remote_addr in remotes.iter() {
            let hb = hb.clone();
            tokio::spawn(TcpStream::connect(remote_addr)
                .map_err(Error::from)
                .and_then(move |socket| {
                    // Wrap the socket with the frame delimiter and codec:
                    let mut wire_messages = WireMessages::new(socket);

                    match wire_messages.send_msg(WireMessage::hello()) {
                        Ok(_) => {
                            let peer_internal_tx = hb.peer_internal_tx.clone();
                            Either::A(Peer::new(hb, wire_messages, peer_internal_tx))
                        },
                        Err(err) => Either::B(future::err(err)),
                    }
                })
                .map_err(|err| error!("Socket connection error: {:?}", err)));
        }
        Ok(())
    });

    let hb = hydrabadger.clone();
    let list = Interval::new(Instant::now(), Duration::from_millis(3000))
        .for_each(move |_| {
            let hb = hb.clone();
            let peer_txs = hb.peer_txs.read().unwrap();
            // info!("Peer list:");

            for (peer_addr, mut pb) in peer_txs.iter() {
                info!(" peer_addr: {}", peer_addr);
            }

            // TODO: Send txns instead.

            Ok(())
        })
        .map_err(|err| {
            error!("List connection inverval error: {:?}", err);
        });

    listen.join3(connect, list).map(|(_, _, _)| ())
}


/// Starts a node.
pub fn run_node(hb: Hydrabadger, remotes: HashSet<SocketAddr>) {
    tokio::run(node(hb, remotes));
}
@@ -28,5 +28,5 @@ extern crate hbbft;
pub mod hydrabadger;
pub mod blockchain;

pub use hydrabadger::{Hydrabadger};
pub use blockchain::{Blockchain, MiningError};
pub use hydrabadger::{run_node, Hydrabadger};
pub use blockchain::{Blockchain, MiningError};
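For reference, a sketch of how a binary might drive the node after this change, based only on the signatures shown in the diffs above; the addresses are placeholders and `hydrabadger` is assumed to be the crate as modified in this commit:

    extern crate hydrabadger;

    use std::collections::HashSet;
    use std::net::SocketAddr;

    fn main() {
        let bind_address: SocketAddr = "127.0.0.1:3070".parse().unwrap();
        let remote_addresses: HashSet<SocketAddr> =
            vec!["127.0.0.1:3071".parse().unwrap()].into_iter().collect();

        // The binary no longer calls `hb.run(...)`; it hands the configured node
        // to the free function re-exported from lib.rs.
        let hb = hydrabadger::Hydrabadger::new(bind_address, None);
        hydrabadger::run_node(hb, remote_addresses);
    }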