Build up network infrastructure.

This commit is contained in:
c0gent 2018-07-02 19:36:12 -07:00
parent 3c6b25882b
commit d0f515db70
13 changed files with 1110 additions and 100 deletions

View File

@ -2,14 +2,39 @@
name = "hydrabadger"
version = "0.1.0"
authors = ["c0gent <nsan1129@gmail.com>"]
autobins = false
[[bin]]
name = "simulation"
path = "src/bin/simulation.rs"
[[bin]]
name = "network"
path = "src/bin/network.rs"
[dependencies]
log = "*"
pretty_env_logger = "*"
env_logger = "*"
clap = "*"
failure = "*"
crossbeam = "*"
crossbeam-channel = "*"
chrono = "*"
rust-crypto = "*"
num-traits = "*"
num-bigint = "*"
bincode = "*"
colored = "*"
itertools = "*"
pairing = "*"
rand = "*"
serde = "*"
serde_derive = "*"
signifix = "*"
futures = "0.1"
tokio = "0.1.7"
tokio-codec = "*"
bytes = "*"
[dependencies.hbbft]
version = "*"

View File

@ -1,5 +1,6 @@
## Hydrabadger*
## Hydrabadger
An experimental peer-to-peer client using the [Honey Badger Byzantine Fault
Tolerant consensus algorithm](https://github.com/poanetwork/hbbft).
\**Name pending committee approval*

View File

@ -4,23 +4,23 @@ export RUST_LOG=hbbft=debug,consensus_node=debug
cargo build
target/debug/hydrabadger --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 --broadcast-value Foo &
target/debug/network --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 --broadcast-value Foo &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5001 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5001 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5002 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5002 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5003 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5003 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5004 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5004 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5005 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5005 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5006 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5006 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5007 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5007 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5008 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5008 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5009 &
target/debug/network --bind-address=127.0.0.1:5008 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5009 &
sleep 1
target/debug/hydrabadger --bind-address=127.0.0.1:5009 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 &
target/debug/network --bind-address=127.0.0.1:5009 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --remote-address=127.0.0.1:5005 --remote-address=127.0.0.1:5006 --remote-address=127.0.0.1:5007 --remote-address=127.0.0.1:5008 &
wait

View File

@ -1,65 +0,0 @@
extern crate clap;
extern crate pretty_env_logger;
extern crate crossbeam;
extern crate hydrabadger;
use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::HashSet;
use clap::{App, Arg, ArgMatches};
use hydrabadger::Node;
/// Returns parsed command line arguments.
fn arg_matches<'a>() -> ArgMatches<'a> {
    App::new("hydrabadger")
        .version("0.1")
        .author("Nick Sanders <cogciprocate@gmail.com>")
        .about("Evaluation and testing for hbbft")
        // Required local listening address.
        .arg(Arg::with_name("bind-address")
            .short("b")
            .long("bind-address")
            .value_name("<HOST:PORT>")
            .takes_value(true)
            .required(true))
        // Optional value to broadcast to the other nodes.
        .arg(Arg::with_name("broadcast-value")
            .long("broadcast-value")
            .value_name("BROADCAST_VALUE")
            .takes_value(true))
        // May be passed multiple times, one value per occurrence.
        .arg(Arg::with_name("remote-address")
            .short("r")
            .long("remote-address")
            .value_name("<HOST:PORT>")
            .takes_value(true)
            .multiple(true)
            .number_of_values(1))
        .arg(Arg::with_name("help")
            .short("h")
            // FIX: `long` takes the flag name *without* leading dashes;
            // passing "--help" registered a broken `----help` flag.
            .long("help"))
        .arg(Arg::with_name("version")
            .short("v")
            .long("version"))
        .get_matches()
}
fn main() {
    pretty_env_logger::init();
    let matches = arg_matches();

    // Resolve the (required) local bind address; take the first resolution.
    let bind_address: SocketAddr = matches.value_of("bind-address").unwrap()
        .to_socket_addrs().expect("Invalid bind address").next().unwrap();

    // Each remote argument may resolve to several socket addresses, hence
    // the `flat_map`.
    let remote_addresses: HashSet<SocketAddr> = match matches.values_of("remote-address") {
        // FIX: the error message said "Invalid bind address" for remote
        // addresses, which mislabelled the failing argument.
        Some(addrs) => addrs.flat_map(|addr| addr.to_socket_addrs()
                .expect("Invalid remote address"))
            .collect(),
        None => HashSet::new(),
    };

    // Optional broadcast value, passed on as raw bytes.
    let broadcast_value = matches.value_of("broadcast-value")
        .map(|bv| bv.as_bytes().to_vec());

    let node = Node::new(bind_address, remote_addresses, broadcast_value);
    node.run().expect("Node failed");
}

99
src/bin/network.rs Normal file
View File

@ -0,0 +1,99 @@
#![allow(unused_imports, dead_code, unused_variables)]
extern crate clap;
extern crate env_logger;
extern crate crossbeam;
extern crate hydrabadger;
extern crate chrono;
use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::HashSet;
use std::env;
use std::io::Write;
use chrono::Local;
use clap::{App, Arg, ArgMatches};
use hydrabadger::{Hydrabadger, Blockchain, MiningError};
/// Returns parsed command line arguments.
fn arg_matches<'a>() -> ArgMatches<'a> {
    // Declare each argument up front, then assemble the app. Registration
    // order is preserved so `--help` output is unchanged.
    let bind_arg = Arg::with_name("bind-address")
        .short("b")
        .long("bind-address")
        .value_name("<HOST:PORT>")
        .help("Specifies the local address to listen on.")
        .takes_value(true)
        .required(true);

    let remote_arg = Arg::with_name("remote-address")
        .short("r")
        .long("remote-address")
        .help("Specifies a list of remote node addresses to connect to.")
        .value_name("<HOST:PORT>")
        .takes_value(true)
        .multiple(true);

    let broadcast_arg = Arg::with_name("broadcast-value")
        .long("broadcast-value")
        .value_name("BROADCAST_VALUE")
        .help("Specifies the value to broadcast to other nodes.")
        .takes_value(true);

    App::new("hydrabadger")
        .version("0.1")
        .author("Nick Sanders <cogciprocate@gmail.com>")
        .about("Evaluation and testing for hbbft")
        .arg(bind_arg)
        .arg(remote_arg)
        .arg(broadcast_arg)
        .get_matches()
}
/// Begins mining.
///
/// Creates a fresh blockchain, mines one block per demonstration
/// transaction, then prints the resulting chain.
fn mine() -> Result<(), MiningError> {
    let mut chain = Blockchain::new()?;
    println!("Send 1 Hydradollar to Bob");

    // Mine the demo transactions onto the chain, in order.
    for tx in &["1HD->Bob", "0.5HD->Bob", "1.5HD->Bob"] {
        chain.add_block(tx)?;
    }

    println!("Traversing blockchain:\n");
    chain.traverse();
    Ok(())
}
fn main() {
    // Log lines look like: `2018-07-02T19:36:12 [INFO] - message`.
    // The filter spec is read from `HYDRABADGER_LOG` rather than `RUST_LOG`.
    env_logger::Builder::new()
        .format(|buf, record| {
            // `writeln!` instead of `write!(…, "{}\n", …)`: same output,
            // idiomatic and portable line termination.
            writeln!(buf,
                "{} [{}] - {}",
                Local::now().format("%Y-%m-%dT%H:%M:%S"),
                record.level(),
                record.args()
            )
        })
        .parse(&env::var("HYDRABADGER_LOG").unwrap_or_default())
        .init();

    let matches = arg_matches();

    // Resolve the (required) local bind address; take the first resolution.
    let bind_address: SocketAddr = matches.value_of("bind-address")
        .expect("No bind address provided")
        .to_socket_addrs()
        .expect("Invalid bind address")
        .next().unwrap();

    // Each remote argument may resolve to several socket addresses.
    let remote_addresses: HashSet<SocketAddr> = match matches.values_of("remote-address") {
        Some(addrs) => addrs.flat_map(|addr| addr.to_socket_addrs()
                .expect("Invalid remote bind address"))
            .collect(),
        None => HashSet::new(),
    };

    // Optional broadcast value, passed on as raw bytes.
    let broadcast_value = matches.value_of("broadcast-value")
        .map(|bv| bv.as_bytes().to_vec());

    let hb = Hydrabadger::new(bind_address, remote_addresses, broadcast_value);
    hb.run();
}

404
src/bin/simulation.rs Normal file
View File

@ -0,0 +1,404 @@
extern crate bincode;
extern crate colored;
extern crate env_logger;
extern crate hbbft;
extern crate itertools;
extern crate pairing;
extern crate rand;
extern crate serde;
#[macro_use(Deserialize, Serialize)]
extern crate serde_derive;
extern crate signifix;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use std::time::{Duration, Instant};
use std::{cmp, u64};
use colored::*;
use itertools::Itertools;
use rand::Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use signifix::{metric, TryFrom};
use hbbft::crypto::SecretKeySet;
use hbbft::honey_badger::{Batch, HoneyBadger, HoneyBadgerBuilder};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target};
/// A node identifier. In the simulation, nodes are simply numbered.
#[derive(Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
pub struct NodeUid(pub usize);
/// A transaction.
#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd, Debug, Clone)]
pub struct Transaction(pub Vec<u8>);
impl Transaction {
    /// Returns a new transaction containing `len` random bytes.
    fn new(len: usize) -> Transaction {
        Transaction(rand::thread_rng().gen_iter().take(len).collect())
    }
}
/// A serialized message with a sender and the timestamp of arrival.
#[derive(Eq, PartialEq, Debug)]
struct TimestampedMessage<D: DistAlgorithm> {
time: Duration,
sender_id: D::NodeUid,
target: Target<D::NodeUid>,
message: Vec<u8>,
}
// Implemented by hand: `#[derive(Clone)]` would add a `D: Clone` bound on
// the algorithm type itself, but only the message needs to be cloneable.
impl<D: DistAlgorithm> Clone for TimestampedMessage<D>
where D::Message: Clone, {
    fn clone(&self) -> Self {
        TimestampedMessage {
            time: self.time,
            sender_id: self.sender_id.clone(),
            target: self.target.clone(),
            message: self.message.clone(),
        }
    }
}
/// Performance parameters of a node's hardware and Internet connection. For simplicity, only the
/// sender's lag and bandwidth are taken into account. (I.e. infinite downstream, limited
/// upstream.)
#[derive(Clone, Copy)]
pub struct HwQuality {
/// The network latency. This is added once for every message.
latency: Duration,
/// The inverse bandwidth, in time per byte.
inv_bw: Duration,
/// The CPU time multiplier: how much slower, in percent, is this node than your computer?
cpu_factor: u32,
}
/// A "node" running an instance of the algorithm `D`.
pub struct TestNode<D: DistAlgorithm> {
/// This node's own ID.
id: D::NodeUid,
/// The instance of the broadcast algorithm.
algo: D,
/// The duration for which this node's CPU has already been simulated.
time: Duration,
/// The time when this node last sent data over the network.
sent_time: Duration,
/// Incoming messages from other nodes that this node has not yet handled, with timestamps.
in_queue: VecDeque<TimestampedMessage<D>>,
/// Outgoing messages to other nodes, with timestamps.
out_queue: VecDeque<TimestampedMessage<D>>,
/// The values this node has output so far, with timestamps.
outputs: Vec<(Duration, D::Output)>,
/// The number of messages this node has handled so far.
message_count: usize,
/// The total size of messages this node has handled so far, in bytes.
message_size: u64,
/// The hardware and network quality of this node.
hw_quality: HwQuality,
}
impl<D: DistAlgorithm> TestNode<D>
where D::Message: Serialize + DeserializeOwned, {
    /// Creates a new test node with the given broadcast instance.
    fn new(algo: D, hw_quality: HwQuality) -> TestNode<D> {
        let mut node = TestNode {
            id: algo.our_id().clone(),
            algo,
            time: Duration::default(),
            sent_time: Duration::default(),
            in_queue: VecDeque::new(),
            out_queue: VecDeque::new(),
            outputs: Vec::new(),
            message_count: 0,
            message_size: 0,
            hw_quality,
        };
        // The algorithm may already have produced output/messages on
        // construction; flush them into the out queue immediately.
        node.send_output_and_msgs();
        node
    }

    /// Handles the first message in the node's queue.
    fn handle_message(&mut self) {
        let ts_msg = self.in_queue.pop_front().expect("message not found");
        // A message cannot be handled before it has arrived.
        self.time = cmp::max(self.time, ts_msg.time);
        self.message_count += 1;
        self.message_size += ts_msg.message.len() as u64;

        // Charge the (scaled) real CPU time spent handling the message to
        // this node's simulated clock.
        let start = Instant::now();
        let msg = bincode::deserialize::<D::Message>(&ts_msg.message).expect("deserialize");
        self.algo
            .handle_message(&ts_msg.sender_id, msg)
            .expect("handling message");
        self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
        self.send_output_and_msgs()
    }

    /// Handles the algorithm's output and messages.
    fn send_output_and_msgs(&mut self) {
        // Serialization time also counts as (scaled) CPU time.
        let start = Instant::now();
        let out_msgs: Vec<_> = self
            .algo
            .message_iter()
            .map(|msg| {
                (
                    msg.target,
                    bincode::serialize(&msg.message).expect("serialize"),
                )
            })
            .collect();
        self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
        let time = self.time;
        self.outputs
            .extend(self.algo.output_iter().map(|out| (time, out)));
        // Sending cannot begin before the previous send has finished.
        self.sent_time = cmp::max(self.time, self.sent_time);
        for (target, message) in out_msgs {
            // Transmission time is proportional to the message size (limited
            // upstream bandwidth); latency is added once per message.
            self.sent_time += self.hw_quality.inv_bw * message.len() as u32;
            self.out_queue.push_back(TimestampedMessage {
                time: self.sent_time + self.hw_quality.latency,
                sender_id: self.id.clone(),
                target,
                message,
            });
        }
    }

    /// Returns the time when the next message can be handled, or `None` if
    /// the incoming queue is empty.
    fn next_event_time(&self) -> Option<Duration> {
        // Idiom: `Option::map` instead of an explicit Some/None match.
        self.in_queue
            .front()
            .map(|ts_msg| cmp::max(ts_msg.time, self.time))
    }

    /// Returns the number of messages this node has handled so far.
    fn message_count(&self) -> usize {
        self.message_count
    }

    /// Returns the size of messages this node has handled so far.
    fn message_size(&self) -> u64 {
        self.message_size
    }

    /// Adds a message into the incoming queue, keeping the queue sorted by
    /// arrival time.
    fn add_message(&mut self, msg: TimestampedMessage<D>) {
        match self.in_queue.iter().position(|other| other.time > msg.time) {
            None => self.in_queue.push_back(msg),
            Some(i) => self.in_queue.insert(i, msg),
        }
    }
}
/// A collection of `TestNode`s representing a network.
pub struct TestNetwork<D: DistAlgorithm> {
nodes: BTreeMap<D::NodeUid, TestNode<D>>,
}
impl<D: DistAlgorithm<NodeUid = NodeUid>> TestNetwork<D>
where D::Message: Serialize + DeserializeOwned + Clone, {
    /// Creates a new network with `good_num` good nodes, and `dead_num` dead nodes.
    ///
    /// `new_algo` constructs the algorithm instance for a node id and the
    /// full set of ids. Only the `good_num` good nodes are instantiated;
    /// dead nodes exist only as ids in `node_ids`.
    pub fn new<F>(good_num: usize, dead_num: usize, new_algo: F, hw_quality: HwQuality)
        -> TestNetwork<D>
        where F: Fn(NodeUid, BTreeSet<NodeUid>) -> D {
        let node_ids: BTreeSet<NodeUid> = (0..(good_num + dead_num)).map(NodeUid).collect();
        let new_node_by_id = |id: NodeUid| {
            (
                id,
                TestNode::new(new_algo(id, node_ids.clone()), hw_quality),
            )
        };
        let mut network = TestNetwork {
            nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
        };
        // Deliver any messages the algorithms emitted during construction.
        let initial_msgs: Vec<_> = network
            .nodes
            .values_mut()
            .flat_map(|node| node.out_queue.drain(..))
            .collect();
        network.dispatch_messages(initial_msgs);
        network
    }

    /// Pushes the messages into the queues of the corresponding recipients.
    fn dispatch_messages<Q>(&mut self, msgs: Q)
        where Q: IntoIterator<Item = TimestampedMessage<D>> {
        for ts_msg in msgs {
            match ts_msg.target {
                // Broadcast: clone to every node except the sender itself.
                Target::All => {
                    for node in self.nodes.values_mut() {
                        if node.id != ts_msg.sender_id {
                            node.add_message(ts_msg.clone())
                        }
                    }
                }
                // Unicast: messages addressed to ids not present in the map
                // (dead nodes) are silently dropped.
                Target::Node(to_id) => {
                    if let Some(node) = self.nodes.get_mut(&to_id) {
                        node.add_message(ts_msg);
                    }
                }
            }
        }
    }

    /// Handles a queued message in one of the nodes with the earliest timestamp.
    ///
    /// Ties are broken uniformly at random. Returns the id of the node that
    /// was stepped.
    pub fn step(&mut self) -> NodeUid {
        let min_time = self
            .nodes
            .values()
            .filter_map(TestNode::next_event_time)
            .min()
            .expect("no more messages in queue");
        // Collect every node whose next event is exactly at `min_time` ...
        let min_ids: Vec<NodeUid> = self
            .nodes
            .iter()
            .filter(|(_, node)| node.next_event_time() == Some(min_time))
            .map(|(id, _)| *id)
            .collect();
        // ... and pick one at random so ties don't always favor low ids.
        let next_id = *rand::thread_rng().choose(&min_ids).unwrap();
        // Handle one message, then drain and dispatch whatever it produced.
        let msgs: Vec<_> = {
            let node = self.nodes.get_mut(&next_id).unwrap();
            node.handle_message();
            node.out_queue.drain(..).collect()
        };
        self.dispatch_messages(msgs);
        next_id
    }

    /// Returns the number of messages that have been handled so far.
    pub fn message_count(&self) -> usize {
        self.nodes.values().map(TestNode::message_count).sum()
    }

    /// Returns the total size of messages that have been handled so far.
    pub fn message_size(&self) -> u64 {
        self.nodes.values().map(TestNode::message_size).sum()
    }
}
/// The timestamped batches for a particular epoch that have already been output.
#[derive(Clone, Default)]
struct EpochInfo {
nodes: BTreeMap<NodeUid, (Duration, Batch<Transaction, NodeUid>)>,
}
impl EpochInfo {
    /// Adds a batch to this epoch. Prints information if the epoch is complete.
    fn add(&mut self, id: NodeUid, time: Duration, batch: &Batch<Transaction, NodeUid>,
        network: &TestNetwork<HoneyBadger<Transaction, NodeUid>>) {
        // Only the first batch a node outputs for this epoch counts.
        if self.nodes.contains_key(&id) {
            return;
        }
        self.nodes.insert(id, (time, batch.clone()));
        // Wait until every node in the network has delivered this epoch.
        if self.nodes.len() < network.nodes.len() {
            return;
        }
        let (min_t, max_t) = self
            .nodes
            .values()
            .map(|&(time, _)| time)
            .minmax()
            .into_option()
            .unwrap();
        let txs = batch.len();
        println!(
            "{:>5} {:6} {:6} {:5} {:9} {:>9}B",
            batch.epoch.to_string().cyan(),
            // FIX: the minimum column previously mixed `min_t`'s seconds
            // with `max_t`'s sub-second nanoseconds.
            min_t.as_secs() * 1000 + min_t.subsec_nanos() as u64 / 1_000_000,
            max_t.as_secs() * 1000 + max_t.subsec_nanos() as u64 / 1_000_000,
            txs,
            network.message_count() / network.nodes.len(),
            metric::Signifix::try_from(network.message_size() / network.nodes.len() as u64)
                .unwrap(),
        );
    }
}
/// Proposes `num_txs` values and expects nodes to output and order them.
fn simulate_honey_badger(mut network: TestNetwork<HoneyBadger<Transaction, NodeUid>>,
    num_txs: usize) {
    // Returns `true` if the node has not output all transactions yet.
    // (The `&mut` parameter is only so the closure fits `values_mut` below;
    // nothing is mutated here.)
    let node_busy = |node: &mut TestNode<HoneyBadger<Transaction, NodeUid>>| {
        node.outputs
            .iter()
            .map(|&(_, ref batch)| batch.len())
            .sum::<usize>() < num_txs
    };
    // Handle messages until all nodes have output all transactions.
    println!("{}", "Epoch Min/Max Time Txs Msgs/Node Size/Node".bold());
    let mut epochs = Vec::new();
    while network.nodes.values_mut().any(node_busy) {
        let id = network.step();
        // Record every batch the stepped node has output so far, indexed by
        // epoch; `EpochInfo::add` ignores batches it has already seen.
        for &(time, ref batch) in &network.nodes[&id].outputs {
            if epochs.len() <= batch.epoch as usize {
                epochs.resize(batch.epoch as usize + 1, EpochInfo::default());
            }
            epochs[batch.epoch as usize].add(id, time, batch, &network);
        }
    }
}
// Parameters mirror the hbbft example's CLI; hard-coded below for now:
// -n <n>, --nodes <n> The total number of nodes [default: 10]
// -f <f>, --faulty <f> The number of faulty nodes [default: 0]
// -t <txs>, --txs <txs> The number of transactions to process [default: 1000]
// -b <b>, --batch <b> The batch size, i.e. txs per epoch [default: 100]
// -l <lag>, --lag <lag> The network lag between sending and receiving [default: 100]
// --bw <bw> The bandwidth, in kbit/s [default: 2000]
// --cpu <cpu> The CPU speed, in percent of this machine's [default: 100]
// --tx-size <size> The size of a transaction, in bytes [default: 10]
fn main() {
    // Simulation parameters (see the option table above).
    let node_count_total = 10;
    let node_count_faulty = 1;
    let txn_count = 1000;
    let batch_size = 100;
    let latency = 100;
    let bandwidth = 2000;
    let cpu = 100;
    let txn_size = 10;
    println!("Simulating Honey Badger with:");
    println!("{} nodes, {} faulty", node_count_total, node_count_faulty);
    println!(
        "{} transactions, {} bytes each, ≤{} per epoch",
        txn_count, txn_size, batch_size
    );
    println!(
        "Network latency: {} ms, bandwidth: {} kbit/s, {:5.2}% CPU speed",
        latency, bandwidth, cpu
    );
    println!();
    let node_count_good = node_count_total - node_count_faulty;
    // Lazy iterator of random transactions; cloned into each node below.
    let txns = (0..txn_count).map(|_| Transaction::new(txn_size));
    // Key-set threshold equals the number of faulty nodes.
    let sk_set = SecretKeySet::random(node_count_faulty, &mut rand::thread_rng());
    let pk_set = sk_set.public_keys();
    // Builds one HoneyBadger instance per node id.
    let new_honey_badger = |id: NodeUid, all_ids: BTreeSet<NodeUid>| {
        let netinfo = Rc::new(NetworkInfo::new(
            id,
            all_ids,
            sk_set.secret_key_share(id.0 as u64),
            pk_set.clone(),
        ));
        HoneyBadgerBuilder::new(netinfo)
            .batch_size(batch_size)
            .build_with_transactions(txns.clone())
            .expect("Instantiate honey_badger")
    };
    let hw_quality = HwQuality {
        latency: Duration::from_millis(latency),
        // Time per byte in ns: 8_000_000 / bandwidth(kbit/s).
        inv_bw: Duration::new(0, 8_000_000 / bandwidth),
        // Inverse of the CPU percentage, scaled so 100% -> factor 100
        // (consumers divide by 100).
        cpu_factor: (10_000f32 / cpu as f32) as u32,
    };
    let network = TestNetwork::new(node_count_good, node_count_faulty, new_honey_badger, hw_quality);
    simulate_honey_badger(network, txn_count);
}

200
src/blockchain.rs Normal file
View File

@ -0,0 +1,200 @@
//! An incredibly simple blockchain implementation.
//!
#![allow(unused_imports, dead_code, unused_variables)]
use chrono::prelude::*;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use num_bigint::BigUint;
use num_traits::One;
const HASH_BYTE_SIZE: usize = 32;
const DIFFICULTY: usize = 4;
const MAX_NONCE: u64 = 1_000_000;
pub type Sha256Hash = [u8; HASH_BYTE_SIZE];
/// Transforms a u64 into a little endian array of u8.
pub fn convert_u64_to_u8_array(val: u64) -> [u8; 8] {
    // Byte `i` holds bits `8*i .. 8*i + 8` of `val` (little endian).
    // Idiom: the array is the function's value; no trailing `return`.
    [
        val as u8,
        (val >> 8) as u8,
        (val >> 16) as u8,
        (val >> 24) as u8,
        (val >> 32) as u8,
        (val >> 40) as u8,
        (val >> 48) as u8,
        (val >> 56) as u8,
    ]
}
/// A mining error
#[derive(Debug, Fail)]
pub enum MiningError {
#[fail(display = "Could not mine block, hit iteration limit")]
Iteration,
#[fail(display = "Block has no parent")]
NoParent,
}
/// Calculates the SHA-256 hash for the provided block and nonce.
///
/// The digest input is the block's header bytes with the nonce appended in
/// little-endian byte order.
pub fn calculate_hash(block: &Block, nonce: u64) -> Sha256Hash {
    let mut headers = block.headers();
    headers.extend_from_slice(&convert_u64_to_u8_array(nonce));
    let mut hasher = Sha256::new();
    hasher.input(&headers);
    let mut hash = Sha256Hash::default();
    hasher.result(&mut hash);
    hash
}
/// Attempts to find a satisfactory nonce.
///
/// Returns `None` if no nonce below `MAX_NONCE` produces a hash under the
/// difficulty target.
fn try_hash(block: &Block) -> Option<(u64, Sha256Hash)> {
    // The target is a number we compare the hash to. It is a 256-bit value
    // with `4 * DIFFICULTY` leading zero bits — i.e. `DIFFICULTY` leading
    // zero hex digits.
    let target = BigUint::one() << (256 - 4 * DIFFICULTY);
    for nonce in 0..MAX_NONCE {
        let hash = calculate_hash(block, nonce);
        // Interpret the digest as a big-endian integer for the comparison.
        let hash_int = BigUint::from_bytes_be(&hash);
        if hash_int < target {
            return Some((nonce, hash));
        }
    }
    None
}
/// A block header.
#[derive(Debug)]
pub struct Header {
timestamp: i64,
prev_block_hash: Sha256Hash,
nonce: u64,
}
/// A block.
#[derive(Debug)]
pub struct Block {
header: Header,
// Body: Instead of transactions, blocks contain bytes:
data: Vec<u8>,
// Hash of the block:
hash: Option<Sha256Hash>,
}
impl Block {
    /// Creates a genesis block, which is a block with no parent.
    ///
    /// The `prev_block_hash` field is set to all zeroes.
    pub fn genesis() -> Result<Self, MiningError> {
        Self::new("Genesis block", Sha256Hash::default())
    }

    /// Creates a new block, mining a nonce that satisfies the difficulty
    /// target.
    ///
    /// Returns `MiningError::Iteration` if no satisfactory nonce is found
    /// within the iteration limit.
    pub fn new(data: &str, prev_hash: Sha256Hash) -> Result<Self, MiningError> {
        let mut b = Self {
            header: Header {
                timestamp: Utc::now().timestamp(),
                prev_block_hash: prev_hash,
                nonce: 0,
            },
            data: data.to_owned().into(),
            hash: None,
        };
        // Mine, then record the winning nonce and resulting hash.
        // Idiom: `?` on the `ok_or` instead of an `and_then` closure.
        let (nonce, hash) = try_hash(&b).ok_or(MiningError::Iteration)?;
        b.header.nonce = nonce;
        b.hash = Some(hash);
        Ok(b)
    }

    /// Returns the block headers (timestamp and previous block hash) as bytes.
    pub fn headers(&self) -> Vec<u8> {
        let mut vec = Vec::new();
        // The `i64 -> u64` cast is lossless for any post-1970 timestamp.
        vec.extend(&convert_u64_to_u8_array(self.header.timestamp as u64));
        vec.extend_from_slice(&self.header.prev_block_hash);
        vec
    }

    /// Returns this block's nonce.
    pub fn nonce(&self) -> u64 {
        self.header.nonce
    }

    /// Returns this block's hash, if one has been mined.
    pub fn hash(&self) -> Option<Sha256Hash> {
        // `Sha256Hash` is a `Copy` array; no `.clone()` needed.
        self.hash
    }

    /// Returns the previous block's hash.
    pub fn prev_block_hash(&self) -> Sha256Hash {
        self.header.prev_block_hash
    }

    /// Returns the block's data bytes.
    pub fn data(&self) -> &[u8] {
        &self.data
    }
}
/// A sequence of blocks.
pub struct Blockchain {
blocks: Vec<Block>,
}
impl Blockchain {
    /// Initializes a new blockchain with a genesis block.
    pub fn new() -> Result<Self, MiningError> {
        let genesis = Block::genesis()?;
        Ok(Self { blocks: vec![genesis] })
    }

    /// Mines a new block carrying `data` and appends it to the chain.
    ///
    /// Adding a block to an empty blockchain is an error
    /// (`MiningError::NoParent`): a genesis block needs to be created first.
    pub fn add_block(&mut self, data: &str) -> Result<(), MiningError> {
        // Simplified from an uninitialized `let block: Block;` plus a nested
        // scope: resolve the parent hash first, then mine and push.
        let prev_hash = match self.blocks.last() {
            // `hash()` is always `Some` for a successfully mined block.
            Some(prev) => prev.hash().unwrap(),
            None => return Err(MiningError::NoParent),
        };
        let block = Block::new(data, prev_hash)?;
        self.blocks.push(block);
        Ok(())
    }

    /// Iterates over the blockchain's blocks and prints out information for
    /// each.
    pub fn traverse(&self) {
        for (i, block) in self.blocks.iter().enumerate() {
            println!("block: {}", i);
            println!("hash: {:?}", block.hash());
            println!("parent: {:?}", block.prev_block_hash());
            println!("data: {:?}", block.data());
            println!()
        }
    }
}

341
src/hydrabadger.rs Normal file
View File

@ -0,0 +1,341 @@
//! A hydrabadger consensus node.
//!
//! Code heavily borrowed from: https://github.com/poanetwork/hbbft/blob/master/examples/network/node.rs
//!
#![allow(unused_imports, dead_code, unused_variables)]
use crossbeam;
use std::{
time::{Duration, Instant},
sync::{Arc, RwLock},
{self, iter, process, thread, time},
collections::{BTreeSet, HashSet, HashMap},
fmt::Debug,
marker::{Send, Sync},
net::{SocketAddr},
rc::Rc,
};
use futures::future;
use tokio::{
self, io,
reactor::{Reactor, Handle},
net::{TcpListener, TcpStream},
timer::Interval,
prelude::*,
};
use hbbft::{
broadcast::{Broadcast, BroadcastMessage},
crypto::{
SecretKeySet,
poly::Poly,
},
messaging::{DistAlgorithm, NetworkInfo, SourcedMessage},
proto::message::BroadcastProto,
honey_badger::HoneyBadger,
};
use network::{comms_task, connection, messaging::Messaging};
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "{}", _0)]
IoError(std::io::Error),
#[fail(display = "{}", _0)]
CommsError(comms_task::Error),
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
Error::IoError(err)
}
}
impl From<comms_task::Error> for Error {
fn from(err: comms_task::Error) -> Error {
Error::CommsError(err)
}
}
pub struct Hydrabadger/*<T, N>*/ {
/// Incoming connection socket.
addr: SocketAddr,
/// Sockets of remote nodes.
remotes: HashSet<SocketAddr>,
// /// Honey badger.
// hb: HoneyBadger<T, N>,
value: Option<Vec<u8>>,
connections_in: Arc<RwLock<HashMap<SocketAddr, TcpStream>>>,
connections_out: Arc<RwLock<HashMap<SocketAddr, TcpStream>>>,
}
impl Hydrabadger {
/// Returns a new Hydrabadger node.
///
/// `addr` is the local listen address, `remotes` the peer addresses to dial,
/// and `value` an optional payload to broadcast.
pub fn new(addr: SocketAddr, remotes: HashSet<SocketAddr>, value: Option<Vec<u8>>) -> Self {
    // Both connection maps start empty; they are filled as peers connect.
    Hydrabadger {
        addr,
        remotes,
        value,
        connections_in: Arc::new(RwLock::new(HashMap::new())),
        connections_out: Arc::new(RwLock::new(HashMap::new())),
    }
}
/// Runs the node: listens for incoming connections, dials all configured
/// remotes, and periodically logs/polls the incoming connection list.
/// Blocks until the tokio runtime shuts down.
pub fn run(&self) {
    // Bind eagerly so bind errors surface before the runtime starts.
    let socket = TcpListener::bind(&self.addr).unwrap();
    info!("Listening on: {}", self.addr);

    // Accept incoming connections, greet each peer with a short message,
    // and retain the socket in the shared `connections_in` map.
    let connections_in = self.connections_in.clone();
    let listen = socket.incoming()
        .map_err(|e| error!("failed to accept socket; error = {:?}", e))
        .for_each(move |mut socket| {
            let connections_in = connections_in.clone();
            tokio::spawn(future::lazy(move || {
                let peer_addr = socket.peer_addr().unwrap();
                let local_addr = socket.local_addr().unwrap();
                info!("Connection made with: [local_addr: {}, peer_addr: {}]",
                    local_addr, peer_addr);
                // NOTE(review): synchronous write on the runtime; write
                // errors are logged, not propagated.
                let msg = b"Yo";
                socket.write_all(msg)
                    .unwrap_or_else(|err| error!("Socket write error: {:?}", err));
                info!(" '{:?}' written", msg);
                connections_in.write().unwrap().insert(peer_addr, socket);
                Ok(())
            }))
        });

    // Dial every configured remote node; successful outgoing connections
    // are stored in `connections_out`, failures are logged and dropped.
    let remotes = self.remotes.clone();
    let connections_out = self.connections_out.clone();
    let connect = future::lazy(move || {
        for remote_addr in remotes.iter() {
            let connections_out = connections_out.clone();
            tokio::spawn(TcpStream::connect(remote_addr)
                .and_then(move |socket| {
                    connections_out.write().unwrap().insert(socket.peer_addr().unwrap(), socket);
                    Ok(())
                })
                .map_err(|err| error!("Socket connection error: {:?}", err)));
        }
        Ok(())
    });

    // Every 3 seconds: log each incoming connection, drain whatever bytes
    // it has sent, and drop connections whose reads fail.
    let connections_in = self.connections_in.clone();
    let list = Interval::new(Instant::now(), Duration::from_millis(3000))
        .for_each(move |_| {
            let mut remove_list = HashSet::new();
            {
                // Hold only the read lock while iterating; failed sockets
                // are collected and removed under a write lock below.
                let connections_in = connections_in.read().unwrap();
                info!("Incoming connection list:");
                for (_, mut socket) in connections_in.iter() {
                    let peer_addr = socket.peer_addr().unwrap();
                    info!(" peer_addr: {}", peer_addr);
                    let mut buf = Vec::new();
                    match socket.read_to_end(&mut buf) {
                        Ok(bytes_read) => {
                            info!(" bytes_read: {}", bytes_read);
                            info!(" message: {:?}", buf);
                        },
                        Err(err) => {
                            error!(" Read error: {:?}", err);
                            remove_list.insert(peer_addr);
                        },
                    }
                }
            }
            if remove_list.len() > 0 {
                let mut cns = connections_in.write().unwrap();
                for addr in remove_list {
                    cns.remove(&addr);
                }
            }
            Ok(())
        })
        .map_err(|err| {
            error!("List connection inverval error: {:?}", err);
        });

    // Drive all three tasks to completion on the tokio runtime.
    tokio::run(listen.join3(connect, list).map(|(_, _, _)| ()));
}
// TODO: Currently an empty stub — outbound connection logic lives in
// `run` for now.
pub fn connect(&self) {
}
/// Legacy synchronous node loop (pre-tokio).
///
/// Establishes blocking TCP connections to all peers, derives a
/// deterministic node index by sorting all nodes' socket-address strings,
/// then runs a single hbbft `Broadcast` instance over crossbeam-scoped
/// threads until the broadcast yields an output, at which point the
/// process exits.
///
/// Panics if `value.is_some()` disagrees with `our_id == 0`: exactly the
/// first node must propose a value.
pub fn run_old(&self) {
    let value = &self.value;

    // Blocking handshake with every remote; yields our own address string
    // plus one `Connection` per peer.
    let (our_str, connections) = connection::make(&self.addr, &self.remotes);

    // Sort all node address strings so every node derives the same index
    // assignment, then locate our own index within it.
    let mut node_strs: Vec<String> = iter::once(our_str.clone())
        .chain(connections.iter().map(|c| c.node_str.clone()))
        .collect();
    node_strs.sort();
    let our_id = node_strs.binary_search(&our_str).unwrap();
    let all_ids: BTreeSet<_> = (0..node_strs.len()).collect();

    // FIXME: This example doesn't call algorithms that use cryptography. However the keys are
    // required by the interface to all algorithms in Honey Badger. Therefore we set placeholder
    // keys here. A fully-featured application would need to take appropriately initialized keys
    // from elsewhere.
    let secret_key_set = SecretKeySet::from(Poly::zero());
    let secret_key = secret_key_set.secret_key_share(our_id as u64);
    let public_key_set = secret_key_set.public_keys();
    let netinfo = NetworkInfo::new(our_id, all_ids.clone(), secret_key, public_key_set);

    if value.is_some() != (our_id == 0) {
        panic!("The first node must propose a value.");
    }

    // Initialise the message delivery system and obtain TX and RX handles.
    let messaging: Messaging<BroadcastMessage> = Messaging::new(all_ids.len());
    let rxs_to_comms = messaging.rxs_to_comms();
    let tx_from_comms = messaging.tx_from_comms();
    let rx_to_algo = messaging.rx_to_algo();
    let tx_from_algo = messaging.tx_from_algo();
    let stop_tx = messaging.stop_tx();

    // All spawned threads will have exited by the end of the scope.
    crossbeam::scope(|scope| {
        // Start the centralised message delivery system.
        let _msg_handle = messaging.spawn(scope);

        // Associate a broadcast instance with this node. This instance will
        // broadcast the proposed value. There is no remote node
        // corresponding to this instance, and no dedicated comms task. The
        // node index is 0.
        let broadcast_handle = scope.spawn(move || {
            let mut broadcast =
                Broadcast::new(Rc::new(netinfo), 0).expect("failed to instantiate broadcast");

            // If we are the proposer, feed the value in and flush the
            // outgoing messages it generates.
            if let Some(v) = value {
                broadcast.input(v.clone().into()).expect("propose value");
                for msg in broadcast.message_iter() {
                    tx_from_algo.send(msg);
                }
            }

            loop {
                // Receive a message from the socket IO task.
                let message = rx_to_algo.recv().expect("receive from algo");
                let SourcedMessage { source: i, message } = message;
                debug!("{} received from {}: {:?}", our_id, i, message);
                broadcast
                    .handle_message(&i, message)
                    .expect("handle broadcast message");
                // Forward every message the algorithm produced in response.
                for msg in broadcast.message_iter() {
                    debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
                    tx_from_algo.send(msg);
                }
                // Terminate once the broadcast instance yields its output.
                if let Some(output) = broadcast.next_output() {
                    println!(
                        "Broadcast succeeded! Node {} output: {}",
                        our_id,
                        String::from_utf8(output).unwrap()
                    );
                    break;
                }
            }
        });

        // Start a comms task for each connection. Node indices of those
        // tasks are 1 through N where N is the number of connections.
        for (i, c) in connections.iter().enumerate() {
            // Receive side of a single-consumer channel from algorithm
            // actor tasks to the comms task. Peers sorting before us keep
            // index `i`; peers sorting after us are shifted past our slot.
            let node_index = if c.node_str < our_str { i } else { i + 1 };
            let rx_to_comms = &rxs_to_comms[node_index];

            scope.spawn(move || {
                match comms_task::CommsTask::<BroadcastProto, BroadcastMessage>::new(
                    tx_from_comms,
                    rx_to_comms,
                    // FIXME: handle error
                    c.stream.try_clone().unwrap(),
                    node_index,
                ).run()
                {
                    Ok(_) => debug!("Comms task {} succeeded", node_index),
                    Err(e) => error!("Comms task {}: {:?}", node_index, e),
                }
            });
        }

        // Wait for the broadcast instances to finish before stopping the
        // messaging task.
        broadcast_handle.join();

        // Wait another second so that pending messages get sent out.
        thread::sleep(time::Duration::from_secs(1));

        // Stop the messaging task.
        stop_tx.send(());
        process::exit(0);
    }) // end of thread scope
}
}
mod codec {
    use std::io;
    use bytes::{BufMut, BytesMut};
    use tokio_codec::{Encoder, Decoder};

    /// A simple `Codec` implementation that just ships bytes around.
    ///
    /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
    /// just a convenient method for us to work with streams/sinks for now.
    /// This'll just take any data read and interpret it as a "frame" and
    /// conversely just shove data into the output location without looking at
    /// it.
    pub struct Bytes;

    impl Decoder for Bytes {
        type Item = BytesMut;
        type Error = io::Error;

        /// Takes everything currently buffered as a single "frame"; yields
        /// `None` when the buffer is empty (no frame available yet).
        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
            if buf.is_empty() {
                Ok(None)
            } else {
                let len = buf.len();
                Ok(Some(buf.split_to(len)))
            }
        }
    }

    impl Encoder for Bytes {
        type Item = Vec<u8>;
        type Error = io::Error;

        /// Appends `data` verbatim to the output buffer; never fails.
        fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
            buf.put(&data[..]);
            Ok(())
        }
    }
}

View File

@ -1,11 +1,23 @@
extern crate clap;
extern crate pretty_env_logger;
extern crate env_logger;
#[macro_use] extern crate log;
#[macro_use] extern crate failure;
extern crate crossbeam;
#[macro_use] extern crate crossbeam_channel;
extern crate crypto;
extern crate chrono;
extern crate hbbft;
extern crate num_traits;
extern crate num_bigint;
extern crate futures;
extern crate tokio;
extern crate tokio_codec;
extern crate bytes;
pub mod network;
pub use network::{Node};
pub mod hydrabadger;
pub mod blockchain;
pub use hydrabadger::{Hydrabadger};
pub use blockchain::{Blockchain, MiningError};

View File

@ -37,11 +37,11 @@ pub struct CommsTask<'a, P: 'a, M: 'a> {
impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M> {
pub fn new(
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
stream: TcpStream,
node_index: usize,
) -> Self {
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
stream: TcpStream,
node_index: usize,
) -> Self {
debug!(
"Creating comms task #{} for {:?}",
node_index,

View File

@ -24,20 +24,19 @@ impl Connection {
/// Connect this node to remote peers. A vector of successful connections is returned, as well as
/// our own node ID.
pub fn make(
bind_address: &SocketAddr,
remote_addresses: &HashSet<SocketAddr>,
) -> (String, Vec<Connection>) {
pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet<SocketAddr>)
-> (String, Vec<Connection>) {
// Listen for incoming connections on a given TCP port.
let bind_address = bind_address;
let listener = TcpListener::bind(bind_address).expect("start listener");
let here_str = format!("{}", bind_address);
// Use a `BTreeMap` to make sure we all iterate in the same order.
let remote_by_str: BTreeMap<String, _> = remote_addresses
.iter()
.map(|addr| (format!("{}", addr), addr))
.filter(|(there_str, _)| *there_str != here_str)
.collect();
// Wait for all nodes with larger addresses to connect.
let connections = remote_by_str
.into_iter()
@ -50,5 +49,6 @@ pub fn make(
Connection::new(tcp_conn, there_str.to_string())
})
.collect();
(here_str, connections)
}

View File

@ -9,15 +9,8 @@ use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
pub enum Error {
#[fail(display = "Invalid messaging target: '{}'", _0)]
NoSuchTarget(usize),
// SendError,
}
// impl<T> From<crossbeam_channel::SendError<T>> for Error {
// fn from(_: crossbeam_channel::SendError<T>) -> Error {
// Error::SendError
// }
// }
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms

View File

@ -1,9 +1,9 @@
pub mod comms_task;
pub mod connection;
pub mod messaging;
pub mod node;
// pub mod node;
// TODO: De-glob:
pub use self::comms_task::CommsTask;
pub use self::messaging::Messaging;
pub use self::node::Node;
// pub use self::node::Node;