2018-07-24 03:18:09 -07:00
|
|
|
|
use std::collections::BTreeMap;
|
2018-07-11 12:15:08 -07:00
|
|
|
|
use std::sync::Arc;
|
2018-05-29 05:17:30 -07:00
|
|
|
|
|
2018-05-30 06:33:33 -07:00
|
|
|
|
use byteorder::{BigEndian, ByteOrder};
|
2018-10-11 09:17:40 -07:00
|
|
|
|
use hex_fmt::{HexFmt, HexList};
|
2018-10-29 07:36:56 -07:00
|
|
|
|
use log::{debug, error, info};
|
2018-04-06 09:39:15 -07:00
|
|
|
|
use reed_solomon_erasure as rse;
|
2018-03-27 13:59:38 -07:00
|
|
|
|
use reed_solomon_erasure::ReedSolomon;
|
2018-04-05 05:09:46 -07:00
|
|
|
|
|
2018-08-09 02:51:31 -07:00
|
|
|
|
use super::merkle::{Digest, MerkleTree, Proof};
|
2018-10-11 09:26:30 -07:00
|
|
|
|
use super::message::HexProof;
|
|
|
|
|
use super::{Error, Message, Result};
|
2018-07-24 03:18:09 -07:00
|
|
|
|
use fault_log::{Fault, FaultKind};
|
2018-10-10 07:11:27 -07:00
|
|
|
|
use {DistAlgorithm, NetworkInfo, NodeIdT, Target};
|
2018-05-02 22:47:07 -07:00
|
|
|
|
|
2018-08-27 01:37:22 -07:00
|
|
|
|
/// Broadcast algorithm instance.
|
2018-07-24 04:12:06 -07:00
|
|
|
|
#[derive(Debug)]
|
2018-08-02 14:27:55 -07:00
|
|
|
|
pub struct Broadcast<N> {
|
2018-05-29 05:17:30 -07:00
|
|
|
|
/// Shared network data.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
netinfo: Arc<NetworkInfo<N>>,
|
2018-08-29 09:08:35 -07:00
|
|
|
|
/// The ID of the sending node.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
proposer_id: N,
|
2018-10-31 07:51:21 -07:00
|
|
|
|
/// The Reed-Solomon erasure coding configuration.
|
2018-05-14 07:16:57 -07:00
|
|
|
|
coding: Coding,
|
2018-10-24 03:26:43 -07:00
|
|
|
|
/// If we are the proposer: whether we have already sent the `Value` messages with the shards.
|
|
|
|
|
value_sent: bool,
|
2018-05-14 00:35:34 -07:00
|
|
|
|
/// Whether we have already multicast `Echo`.
|
|
|
|
|
echo_sent: bool,
|
|
|
|
|
/// Whether we have already multicast `Ready`.
|
|
|
|
|
ready_sent: bool,
|
|
|
|
|
/// Whether we have already output a value.
|
2018-05-15 07:05:55 -07:00
|
|
|
|
decided: bool,
|
2018-05-14 00:35:34 -07:00
|
|
|
|
/// The proofs we have received via `Echo` messages, by sender ID.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
echos: BTreeMap<N, Proof<Vec<u8>>>,
|
2018-05-14 00:35:34 -07:00
|
|
|
|
/// The root hashes we received via `Ready` messages, by sender ID.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
readys: BTreeMap<N, Vec<u8>>,
|
2018-04-24 03:29:13 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-10-10 07:11:27 -07:00
|
|
|
|
/// The crate-level `Step` type specialized to a `Broadcast` instance with node IDs of type `N`.
pub type Step<N> = ::Step<Broadcast<N>>;
|
2018-07-09 04:35:26 -07:00
|
|
|
|
|
2018-08-29 09:08:35 -07:00
|
|
|
|
impl<N: NodeIdT> DistAlgorithm for Broadcast<N> {
|
|
|
|
|
type NodeId = N;
|
2018-05-14 05:55:53 -07:00
|
|
|
|
type Input = Vec<u8>;
|
2018-05-14 05:35:06 -07:00
|
|
|
|
type Output = Self::Input;
|
2018-08-08 06:46:43 -07:00
|
|
|
|
type Message = Message;
|
2018-05-14 05:35:06 -07:00
|
|
|
|
type Error = Error;
|
|
|
|
|
|
2018-08-29 08:28:02 -07:00
|
|
|
|
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
|
2018-10-23 06:38:42 -07:00
|
|
|
|
self.broadcast(input)
|
2018-05-14 05:35:06 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<N>> {
|
2018-10-23 06:38:42 -07:00
|
|
|
|
self.handle_message(sender_id, message)
|
2018-05-14 05:35:06 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn terminated(&self) -> bool {
|
2018-05-15 07:05:55 -07:00
|
|
|
|
self.decided
|
2018-05-14 05:35:06 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn our_id(&self) -> &N {
|
2018-08-29 09:08:35 -07:00
|
|
|
|
self.netinfo.our_id()
|
2018-05-14 05:35:06 -07:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-08-29 09:08:35 -07:00
|
|
|
|
impl<N: NodeIdT> Broadcast<N> {
|
2018-05-03 01:07:37 -07:00
|
|
|
|
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
|
|
|
|
|
/// from node `proposer_id`.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
pub fn new(netinfo: Arc<NetworkInfo<N>>, proposer_id: N) -> Result<Self> {
|
2018-05-29 05:17:30 -07:00
|
|
|
|
let parity_shard_num = 2 * netinfo.num_faulty();
|
|
|
|
|
let data_shard_num = netinfo.num_nodes() - parity_shard_num;
|
2018-05-14 07:16:57 -07:00
|
|
|
|
let coding = Coding::new(data_shard_num, parity_shard_num)?;
|
2018-05-01 09:32:01 -07:00
|
|
|
|
|
|
|
|
|
Ok(Broadcast {
|
2018-05-29 05:17:30 -07:00
|
|
|
|
netinfo,
|
2018-05-03 01:07:37 -07:00
|
|
|
|
proposer_id,
|
2018-05-01 09:32:01 -07:00
|
|
|
|
coding,
|
2018-10-24 03:26:43 -07:00
|
|
|
|
value_sent: false,
|
2018-05-14 00:35:34 -07:00
|
|
|
|
echo_sent: false,
|
|
|
|
|
ready_sent: false,
|
2018-05-15 07:05:55 -07:00
|
|
|
|
decided: false,
|
2018-05-14 00:35:34 -07:00
|
|
|
|
echos: BTreeMap::new(),
|
|
|
|
|
readys: BTreeMap::new(),
|
2018-05-01 07:28:31 -07:00
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2018-10-23 06:38:42 -07:00
|
|
|
|
/// Initiates the broadcast. This must only be called in the proposer node.
|
|
|
|
|
pub fn broadcast(&mut self, input: Vec<u8>) -> Result<Step<N>> {
|
2018-10-25 05:44:28 -07:00
|
|
|
|
if *self.our_id() != self.proposer_id {
|
2018-10-23 06:38:42 -07:00
|
|
|
|
return Err(Error::InstanceCannotPropose);
|
|
|
|
|
}
|
2018-10-24 03:26:43 -07:00
|
|
|
|
if self.value_sent {
|
|
|
|
|
return Err(Error::MultipleInputs);
|
|
|
|
|
}
|
|
|
|
|
self.value_sent = true;
|
2018-10-23 06:38:42 -07:00
|
|
|
|
// Split the value into chunks/shards, encode them with erasure codes.
|
|
|
|
|
// Assemble a Merkle tree from data and parity shards. Take all proofs
|
|
|
|
|
// from this tree and send them, each to its own node.
|
2018-10-25 05:44:28 -07:00
|
|
|
|
let (proof, step) = self.send_shards(input)?;
|
|
|
|
|
let our_id = &self.our_id().clone();
|
|
|
|
|
Ok(step.join(self.handle_value(our_id, proof)?))
|
2018-10-23 06:38:42 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-10-24 03:26:43 -07:00
|
|
|
|
/// Handles a message received from `sender_id`.
|
|
|
|
|
///
|
|
|
|
|
/// This must be called with every message we receive from another node.
|
2018-10-23 06:38:42 -07:00
|
|
|
|
pub fn handle_message(&mut self, sender_id: &N, message: Message) -> Result<Step<N>> {
|
|
|
|
|
if !self.netinfo.is_node_validator(sender_id) {
|
|
|
|
|
return Err(Error::UnknownSender);
|
|
|
|
|
}
|
|
|
|
|
match message {
|
|
|
|
|
Message::Value(p) => self.handle_value(sender_id, p),
|
|
|
|
|
Message::Echo(p) => self.handle_echo(sender_id, p),
|
|
|
|
|
Message::Ready(ref hash) => self.handle_ready(sender_id, hash),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-04-27 05:19:39 -07:00
|
|
|
|
/// Breaks the input value into shards of equal length and encodes them --
|
|
|
|
|
/// and some extra parity shards -- with a Reed-Solomon erasure coding
|
|
|
|
|
/// scheme. The returned value contains the shard assigned to this
|
|
|
|
|
/// node. That shard doesn't need to be sent anywhere. It gets recorded in
|
|
|
|
|
/// the broadcast instance.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn send_shards(&mut self, mut value: Vec<u8>) -> Result<(Proof<Vec<u8>>, Step<N>)> {
|
2018-04-27 05:19:39 -07:00
|
|
|
|
let data_shard_num = self.coding.data_shard_count();
|
|
|
|
|
let parity_shard_num = self.coding.parity_shard_count();
|
|
|
|
|
|
2018-04-30 08:55:51 -07:00
|
|
|
|
debug!(
|
|
|
|
|
"Data shards: {}, parity shards: {}",
|
2018-10-31 07:51:21 -07:00
|
|
|
|
data_shard_num, parity_shard_num
|
2018-04-30 08:55:51 -07:00
|
|
|
|
);
|
2018-04-27 05:19:39 -07:00
|
|
|
|
// Insert the length of `v` so it can be decoded without the padding.
|
2018-05-30 06:33:33 -07:00
|
|
|
|
let payload_len = value.len() as u32;
|
|
|
|
|
value.splice(0..0, 0..4); // Insert four bytes at the beginning.
|
|
|
|
|
BigEndian::write_u32(&mut value[..4], payload_len); // Write the size.
|
2018-04-27 05:19:39 -07:00
|
|
|
|
let value_len = value.len();
|
|
|
|
|
// Size of a Merkle tree leaf value, in bytes.
|
|
|
|
|
let shard_len = if value_len % data_shard_num > 0 {
|
|
|
|
|
value_len / data_shard_num + 1
|
2018-04-30 08:55:51 -07:00
|
|
|
|
} else {
|
2018-04-27 05:19:39 -07:00
|
|
|
|
value_len / data_shard_num
|
|
|
|
|
};
|
|
|
|
|
// Pad the last data shard with zeros. Fill the parity shards with
|
|
|
|
|
// zeros.
|
|
|
|
|
value.resize(shard_len * (data_shard_num + parity_shard_num), 0);
|
|
|
|
|
|
|
|
|
|
debug!("value_len {}, shard_len {}", value_len, shard_len);
|
|
|
|
|
|
|
|
|
|
// Divide the vector into chunks/shards.
|
|
|
|
|
let shards_iter = value.chunks_mut(shard_len);
|
|
|
|
|
// Convert the iterator over slices into a vector of slices.
|
2018-05-04 00:58:21 -07:00
|
|
|
|
let mut shards: Vec<&mut [u8]> = shards_iter.collect();
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
2018-10-17 03:04:13 -07:00
|
|
|
|
debug!("Shards before encoding: {:0.10}", HexList(&shards));
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
|
|
|
|
// Construct the parity chunks/shards
|
2018-05-30 06:33:33 -07:00
|
|
|
|
self.coding
|
|
|
|
|
.encode(&mut shards)
|
|
|
|
|
.expect("the size and number of shards is correct");
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
2018-10-17 03:04:13 -07:00
|
|
|
|
debug!("Shards: {:0.10}", HexList(&shards));
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
2018-08-08 07:41:11 -07:00
|
|
|
|
// Create a Merkle tree from the shards.
|
|
|
|
|
let mtree = MerkleTree::from_vec(shards.into_iter().map(|shard| shard.to_vec()).collect());
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
2018-08-08 07:41:11 -07:00
|
|
|
|
// Default result in case of `proof` error.
|
2018-07-25 14:38:33 -07:00
|
|
|
|
let mut result = Err(Error::ProofConstructionFailed);
|
2018-08-08 07:41:11 -07:00
|
|
|
|
assert_eq!(self.netinfo.num_nodes(), mtree.values().len());
|
2018-04-27 05:19:39 -07:00
|
|
|
|
|
2018-07-24 03:18:09 -07:00
|
|
|
|
let mut step = Step::default();
|
2018-04-27 05:19:39 -07:00
|
|
|
|
// Send each proof to a node.
|
2018-08-29 09:08:35 -07:00
|
|
|
|
for (index, id) in self.netinfo.all_ids().enumerate() {
|
2018-08-08 07:41:11 -07:00
|
|
|
|
let proof = mtree.proof(index).ok_or(Error::ProofConstructionFailed)?;
|
2018-10-25 05:44:28 -07:00
|
|
|
|
if *id == *self.our_id() {
|
2018-05-04 00:58:21 -07:00
|
|
|
|
// The proof is addressed to this node.
|
|
|
|
|
result = Ok(proof);
|
|
|
|
|
} else {
|
|
|
|
|
// Rest of the proofs are sent to remote nodes.
|
2018-08-29 09:08:35 -07:00
|
|
|
|
let msg = Target::Node(id.clone()).message(Message::Value(proof));
|
2018-10-25 05:44:28 -07:00
|
|
|
|
step.messages.push(msg);
|
2018-04-27 05:19:39 -07:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-07-24 03:18:09 -07:00
|
|
|
|
result.map(|proof| (proof, step))
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
2018-04-25 12:41:46 -07:00
|
|
|
|
|
2018-05-02 22:47:07 -07:00
|
|
|
|
/// Handles a received echo and verifies the proof it contains.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn handle_value(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> Result<Step<N>> {
|
2018-07-08 09:41:50 -07:00
|
|
|
|
// If the sender is not the proposer or if this is not the first `Value`, ignore.
|
2018-05-03 01:07:37 -07:00
|
|
|
|
if *sender_id != self.proposer_id {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
info!(
|
|
|
|
|
"Node {:?} received Value from {:?} instead of {:?}.",
|
2018-10-25 05:44:28 -07:00
|
|
|
|
self.our_id(),
|
2018-05-29 05:17:30 -07:00
|
|
|
|
sender_id,
|
|
|
|
|
self.proposer_id
|
2018-05-08 07:20:32 -07:00
|
|
|
|
);
|
2018-07-08 09:41:50 -07:00
|
|
|
|
let fault_kind = FaultKind::ReceivedValueFromNonProposer;
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
2018-05-03 01:07:37 -07:00
|
|
|
|
}
|
2018-05-14 00:35:34 -07:00
|
|
|
|
if self.echo_sent {
|
2018-10-25 05:44:28 -07:00
|
|
|
|
info!("Node {:?} received multiple Values.", self.our_id());
|
2018-10-24 05:48:21 -07:00
|
|
|
|
if self.echos.get(self.our_id()) == Some(&p) {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
} else {
|
|
|
|
|
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into());
|
|
|
|
|
}
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
2018-07-08 09:41:50 -07:00
|
|
|
|
|
|
|
|
|
// If the proof is invalid, log the faulty node behavior and ignore.
|
2018-10-25 05:44:28 -07:00
|
|
|
|
if !self.validate_proof(&p, &self.our_id()) {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
2018-04-25 13:00:22 -07:00
|
|
|
|
|
2018-05-08 07:20:32 -07:00
|
|
|
|
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
|
2018-06-26 05:50:06 -07:00
|
|
|
|
self.send_echo(p)
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-05-08 07:20:32 -07:00
|
|
|
|
/// Handles a received `Echo` message.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn handle_echo(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> Result<Step<N>> {
|
2018-07-08 09:41:50 -07:00
|
|
|
|
// If the sender has already sent `Echo`, ignore.
|
2018-05-14 00:35:34 -07:00
|
|
|
|
if self.echos.contains_key(sender_id) {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
info!(
|
|
|
|
|
"Node {:?} received multiple Echos from {:?}.",
|
2018-10-25 05:44:28 -07:00
|
|
|
|
self.our_id(),
|
2018-05-29 05:17:30 -07:00
|
|
|
|
sender_id,
|
2018-05-03 01:07:37 -07:00
|
|
|
|
);
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Step::default());
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
2018-07-08 09:41:50 -07:00
|
|
|
|
|
|
|
|
|
// If the proof is invalid, log the faulty-node behavior, and ignore.
|
2018-05-08 07:20:32 -07:00
|
|
|
|
if !self.validate_proof(&p, sender_id) {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-08-09 02:51:31 -07:00
|
|
|
|
let hash = *p.root_hash();
|
2018-05-02 22:47:07 -07:00
|
|
|
|
|
2018-05-08 07:20:32 -07:00
|
|
|
|
// Save the proof for reconstructing the tree later.
|
2018-05-14 00:35:34 -07:00
|
|
|
|
self.echos.insert(sender_id.clone(), p);
|
2018-05-08 07:20:32 -07:00
|
|
|
|
|
2018-07-28 08:31:17 -07:00
|
|
|
|
if self.ready_sent || self.count_echos(&hash) < self.netinfo.num_correct() {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return self.compute_output(&hash);
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-07-08 09:41:50 -07:00
|
|
|
|
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
|
2018-07-24 03:18:09 -07:00
|
|
|
|
self.send_ready(&hash)
|
2018-05-02 22:47:07 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-05-08 07:20:32 -07:00
|
|
|
|
/// Handles a received `Ready` message.
|
2018-08-09 02:51:31 -07:00
|
|
|
|
fn handle_ready(&mut self, sender_id: &N, hash: &Digest) -> Result<Step<N>> {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
// If the sender has already sent a `Ready` before, ignore.
|
2018-05-14 00:35:34 -07:00
|
|
|
|
if self.readys.contains_key(sender_id) {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
info!(
|
|
|
|
|
"Node {:?} received multiple Readys from {:?}.",
|
2018-10-25 05:44:28 -07:00
|
|
|
|
self.our_id(),
|
2018-05-29 05:17:30 -07:00
|
|
|
|
sender_id
|
2018-05-08 07:20:32 -07:00
|
|
|
|
);
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Step::default());
|
2018-05-08 07:20:32 -07:00
|
|
|
|
}
|
2018-05-02 22:47:07 -07:00
|
|
|
|
|
2018-05-14 00:35:34 -07:00
|
|
|
|
self.readys.insert(sender_id.clone(), hash.to_vec());
|
2018-05-02 22:47:07 -07:00
|
|
|
|
|
2018-07-24 03:18:09 -07:00
|
|
|
|
let mut step = Step::default();
|
2018-05-02 22:47:07 -07:00
|
|
|
|
// Upon receiving f + 1 matching Ready(h) messages, if Ready
|
|
|
|
|
// has not yet been sent, multicast Ready(h).
|
2018-05-29 05:17:30 -07:00
|
|
|
|
if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
// Enqueue a broadcast of a Ready message.
|
2018-07-24 03:18:09 -07:00
|
|
|
|
step.extend(self.send_ready(hash)?);
|
2018-05-14 05:35:06 -07:00
|
|
|
|
}
|
2018-10-25 05:44:28 -07:00
|
|
|
|
Ok(step.join(self.compute_output(hash)?))
|
2018-06-26 05:50:06 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> Result<Step<N>> {
|
2018-06-26 05:50:06 -07:00
|
|
|
|
self.echo_sent = true;
|
2018-06-29 08:20:54 -07:00
|
|
|
|
if !self.netinfo.is_validator() {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Step::default());
|
2018-06-26 05:50:06 -07:00
|
|
|
|
}
|
2018-08-08 06:46:43 -07:00
|
|
|
|
let echo_msg = Message::Echo(p.clone());
|
2018-10-25 05:44:28 -07:00
|
|
|
|
let step: Step<_> = Target::All.message(echo_msg).into();
|
|
|
|
|
let our_id = &self.our_id().clone();
|
|
|
|
|
Ok(step.join(self.handle_echo(our_id, p)?))
|
2018-06-26 05:50:06 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Sends a `Ready` message and handles it. Does nothing if we are only an observer.
|
2018-08-09 02:51:31 -07:00
|
|
|
|
fn send_ready(&mut self, hash: &Digest) -> Result<Step<N>> {
|
2018-06-26 05:50:06 -07:00
|
|
|
|
self.ready_sent = true;
|
2018-06-29 08:20:54 -07:00
|
|
|
|
if !self.netinfo.is_validator() {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Step::default());
|
2018-06-26 05:50:06 -07:00
|
|
|
|
}
|
2018-08-09 02:51:31 -07:00
|
|
|
|
let ready_msg = Message::Ready(*hash);
|
2018-10-25 05:44:28 -07:00
|
|
|
|
let step: Step<_> = Target::All.message(ready_msg).into();
|
|
|
|
|
let our_id = &self.our_id().clone();
|
|
|
|
|
Ok(step.join(self.handle_ready(our_id, hash)?))
|
2018-05-08 07:20:32 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-07-25 01:46:39 -07:00
|
|
|
|
/// Checks whether the conditions for output are met for this hash, and if so, sets the output
|
2018-05-08 07:20:32 -07:00
|
|
|
|
/// value.
|
2018-08-09 02:51:31 -07:00
|
|
|
|
fn compute_output(&mut self, hash: &Digest) -> Result<Step<N>> {
|
2018-05-21 02:01:49 -07:00
|
|
|
|
if self.decided
|
2018-05-29 05:17:30 -07:00
|
|
|
|
|| self.count_readys(hash) <= 2 * self.netinfo.num_faulty()
|
2018-07-25 01:46:39 -07:00
|
|
|
|
|| self.count_echos(hash) < self.coding.data_shard_count()
|
2018-05-08 07:20:32 -07:00
|
|
|
|
{
|
2018-07-24 03:18:09 -07:00
|
|
|
|
return Ok(Step::default());
|
2018-04-25 06:07:16 -07:00
|
|
|
|
}
|
2018-05-02 22:47:07 -07:00
|
|
|
|
|
2018-05-08 07:20:32 -07:00
|
|
|
|
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N − 2f Echo messages.
|
2018-05-21 02:01:49 -07:00
|
|
|
|
let mut leaf_values: Vec<Option<Box<[u8]>>> = self
|
2018-05-29 05:17:30 -07:00
|
|
|
|
.netinfo
|
2018-08-29 09:08:35 -07:00
|
|
|
|
.all_ids()
|
2018-05-08 07:20:32 -07:00
|
|
|
|
.map(|id| {
|
2018-05-14 00:35:34 -07:00
|
|
|
|
self.echos.get(id).and_then(|p| {
|
2018-08-08 07:41:11 -07:00
|
|
|
|
if p.root_hash() == hash {
|
|
|
|
|
Some(p.value().clone().into_boxed_slice())
|
2018-05-08 07:20:32 -07:00
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
})
|
2018-08-31 06:57:10 -07:00
|
|
|
|
}).collect();
|
2018-10-31 07:51:21 -07:00
|
|
|
|
if let Some(value) = decode_from_shards(&mut leaf_values, &self.coding, hash) {
|
2018-07-24 03:18:09 -07:00
|
|
|
|
self.decided = true;
|
|
|
|
|
Ok(Step::default().with_output(value))
|
|
|
|
|
} else {
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
}
|
2018-05-08 07:20:32 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
|
|
|
|
|
/// logs an info message.
|
2018-08-02 14:27:55 -07:00
|
|
|
|
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &N) -> bool {
|
2018-08-08 07:41:11 -07:00
|
|
|
|
if !p.validate(self.netinfo.num_nodes()) {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
info!(
|
|
|
|
|
"Node {:?} received invalid proof: {:?}",
|
2018-10-25 05:44:28 -07:00
|
|
|
|
self.our_id(),
|
2018-05-08 07:20:32 -07:00
|
|
|
|
HexProof(&p)
|
|
|
|
|
);
|
|
|
|
|
false
|
2018-08-08 07:41:11 -07:00
|
|
|
|
} else if self.netinfo.node_index(id) != Some(p.index()) {
|
2018-05-08 07:20:32 -07:00
|
|
|
|
info!(
|
|
|
|
|
"Node {:?} received proof for wrong position: {:?}.",
|
2018-10-25 05:44:28 -07:00
|
|
|
|
self.our_id(),
|
2018-05-08 07:20:32 -07:00
|
|
|
|
HexProof(&p)
|
|
|
|
|
);
|
|
|
|
|
false
|
|
|
|
|
} else {
|
|
|
|
|
true
|
|
|
|
|
}
|
2018-04-24 09:31:21 -07:00
|
|
|
|
}
|
2018-05-14 00:35:34 -07:00
|
|
|
|
|
|
|
|
|
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
|
2018-08-09 02:51:31 -07:00
|
|
|
|
fn count_echos(&self, hash: &Digest) -> usize {
|
2018-05-14 00:35:34 -07:00
|
|
|
|
self.echos
|
|
|
|
|
.values()
|
2018-08-08 07:41:11 -07:00
|
|
|
|
.filter(|p| p.root_hash() == hash)
|
2018-05-14 00:35:34 -07:00
|
|
|
|
.count()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
|
2018-08-09 02:51:31 -07:00
|
|
|
|
fn count_readys(&self, hash: &Digest) -> usize {
|
2018-05-14 00:35:34 -07:00
|
|
|
|
self.readys
|
|
|
|
|
.values()
|
|
|
|
|
.filter(|h| h.as_slice() == hash)
|
|
|
|
|
.count()
|
|
|
|
|
}
|
2018-04-24 09:31:21 -07:00
|
|
|
|
}
|
|
|
|
|
|
2018-05-14 07:16:57 -07:00
|
|
|
|
/// A wrapper for `ReedSolomon` that doesn't panic if there are no parity shards.
|
2018-07-24 04:12:06 -07:00
|
|
|
|
#[derive(Debug)]
|
2018-05-14 07:16:57 -07:00
|
|
|
|
enum Coding {
|
|
|
|
|
/// A `ReedSolomon` instance with at least one parity shard.
|
|
|
|
|
ReedSolomon(Box<ReedSolomon>),
|
|
|
|
|
/// A no-op replacement that doesn't encode or decode anything.
|
|
|
|
|
Trivial(usize),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Coding {
|
|
|
|
|
/// Creates a new `Coding` instance with the given number of shards.
|
2018-07-19 04:56:30 -07:00
|
|
|
|
fn new(data_shard_num: usize, parity_shard_num: usize) -> Result<Self> {
|
2018-05-14 07:16:57 -07:00
|
|
|
|
Ok(if parity_shard_num > 0 {
|
2018-07-25 14:38:33 -07:00
|
|
|
|
let rs = ReedSolomon::new(data_shard_num, parity_shard_num)
|
|
|
|
|
.map_err(Error::CodingNewReedSolomon)?;
|
2018-05-14 07:16:57 -07:00
|
|
|
|
Coding::ReedSolomon(Box::new(rs))
|
|
|
|
|
} else {
|
|
|
|
|
Coding::Trivial(data_shard_num)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of data shards.
|
|
|
|
|
fn data_shard_count(&self) -> usize {
|
|
|
|
|
match *self {
|
|
|
|
|
Coding::ReedSolomon(ref rs) => rs.data_shard_count(),
|
|
|
|
|
Coding::Trivial(dsc) => dsc,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of parity shards.
|
|
|
|
|
fn parity_shard_count(&self) -> usize {
|
|
|
|
|
match *self {
|
|
|
|
|
Coding::ReedSolomon(ref rs) => rs.parity_shard_count(),
|
|
|
|
|
Coding::Trivial(_) => 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Constructs (and overwrites) the parity shards.
|
2018-07-19 04:56:30 -07:00
|
|
|
|
fn encode(&self, slices: &mut [&mut [u8]]) -> Result<()> {
|
2018-05-14 07:16:57 -07:00
|
|
|
|
match *self {
|
2018-07-25 14:38:33 -07:00
|
|
|
|
Coding::ReedSolomon(ref rs) => {
|
|
|
|
|
rs.encode(slices).map_err(Error::CodingEncodeReedSolomon)?
|
|
|
|
|
}
|
2018-05-14 07:16:57 -07:00
|
|
|
|
Coding::Trivial(_) => (),
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// If enough shards are present, reconstructs the missing ones.
|
2018-07-19 04:56:30 -07:00
|
|
|
|
fn reconstruct_shards(&self, shards: &mut [Option<Box<[u8]>>]) -> Result<()> {
|
2018-05-14 07:16:57 -07:00
|
|
|
|
match *self {
|
2018-07-25 14:38:33 -07:00
|
|
|
|
Coding::ReedSolomon(ref rs) => rs
|
|
|
|
|
.reconstruct_shards(shards)
|
|
|
|
|
.map_err(Error::CodingReconstructShardsReedSolomon)?,
|
2018-05-14 07:16:57 -07:00
|
|
|
|
Coding::Trivial(_) => {
|
|
|
|
|
if shards.iter().any(Option::is_none) {
|
2018-07-25 14:38:33 -07:00
|
|
|
|
return Err(Error::CodingReconstructShardsTrivialReedSolomon(
|
|
|
|
|
rse::Error::TooFewShardsPresent,
|
|
|
|
|
));
|
2018-05-14 07:16:57 -07:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-05-30 07:24:52 -07:00
|
|
|
|
fn decode_from_shards(
|
2018-05-04 00:58:21 -07:00
|
|
|
|
leaf_values: &mut [Option<Box<[u8]>>],
|
2018-05-14 07:16:57 -07:00
|
|
|
|
coding: &Coding,
|
2018-08-09 02:51:31 -07:00
|
|
|
|
root_hash: &Digest,
|
2018-05-30 07:24:52 -07:00
|
|
|
|
) -> Option<Vec<u8>> {
|
2018-05-04 00:58:21 -07:00
|
|
|
|
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding scheme.
|
2018-05-31 00:12:15 -07:00
|
|
|
|
if let Err(err) = coding.reconstruct_shards(leaf_values) {
|
2018-07-25 14:38:33 -07:00
|
|
|
|
error!("Shard reconstruction failed: {:?}", err); // Faulty proposer
|
2018-05-31 00:12:15 -07:00
|
|
|
|
return None;
|
|
|
|
|
}
|
2018-03-29 09:23:02 -07:00
|
|
|
|
|
2018-04-14 03:27:17 -07:00
|
|
|
|
// Collect shards for tree construction.
|
2018-05-10 08:50:07 -07:00
|
|
|
|
let shards: Vec<Vec<u8>> = leaf_values
|
2018-05-04 00:58:21 -07:00
|
|
|
|
.iter()
|
|
|
|
|
.filter_map(|l| l.as_ref().map(|v| v.to_vec()))
|
|
|
|
|
.collect();
|
2018-05-08 07:20:32 -07:00
|
|
|
|
|
2018-10-17 03:04:13 -07:00
|
|
|
|
debug!("Reconstructed shards: {:0.10}", HexList(&shards));
|
2018-05-08 07:20:32 -07:00
|
|
|
|
|
2018-03-29 09:23:02 -07:00
|
|
|
|
// Construct the Merkle tree.
|
2018-08-08 07:41:11 -07:00
|
|
|
|
let mtree = MerkleTree::from_vec(shards);
|
2018-03-29 09:23:02 -07:00
|
|
|
|
// If the root hash of the reconstructed tree does not match the one
|
|
|
|
|
// received with proofs then abort.
|
2018-08-08 07:41:11 -07:00
|
|
|
|
if mtree.root_hash() != root_hash {
|
2018-05-30 06:33:33 -07:00
|
|
|
|
None // The proposer is faulty.
|
2018-04-30 08:55:51 -07:00
|
|
|
|
} else {
|
2018-03-29 09:23:02 -07:00
|
|
|
|
// Reconstruct the value from the data shards.
|
2018-10-31 07:51:21 -07:00
|
|
|
|
glue_shards(mtree, coding.data_shard_count())
|
2018-03-29 09:23:02 -07:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-10-31 07:55:39 -07:00
|
|
|
|
/// Concatenates the first `n` leaf values of a Merkle tree `m`. The first four bytes are
|
|
|
|
|
/// interpreted as the payload size, and the padding beyond that size is dropped.
|
2018-05-30 07:24:52 -07:00
|
|
|
|
fn glue_shards(m: MerkleTree<Vec<u8>>, n: usize) -> Option<Vec<u8>> {
|
2018-09-20 02:27:12 -07:00
|
|
|
|
let mut bytes = m.into_values().into_iter().take(n).flatten();
|
2018-05-30 07:24:52 -07:00
|
|
|
|
let payload_len = match (bytes.next(), bytes.next(), bytes.next(), bytes.next()) {
|
|
|
|
|
(Some(b0), Some(b1), Some(b2), Some(b3)) => BigEndian::read_u32(&[b0, b1, b2, b3]) as usize,
|
|
|
|
|
_ => return None, // The proposing node is faulty: no payload size.
|
|
|
|
|
};
|
|
|
|
|
let payload: Vec<u8> = bytes.take(payload_len).collect();
|
2018-10-17 03:04:13 -07:00
|
|
|
|
debug!("Glued data shards {:0.10}", HexFmt(&payload));
|
2018-05-30 07:24:52 -07:00
|
|
|
|
Some(payload)
|
2018-03-23 15:54:40 -07:00
|
|
|
|
}
|