Ignore messages in Agreement after termination.

This commit is contained in:
Andreas Fackler 2018-05-30 16:24:52 +02:00
parent f700ae82d0
commit bb61d0c5ab
2 changed files with 20 additions and 30 deletions

View File

@ -20,7 +20,6 @@ error_chain!{
errors {
InputNotAccepted
Terminated
}
}
@ -126,11 +125,8 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
sender_id: &Self::NodeUid,
message: Self::Message,
) -> AgreementResult<()> {
if self.terminated {
return Err(ErrorKind::Terminated.into());
}
if message.epoch < self.epoch {
return Ok(()); // Message is obsolete: We are already in a later epoch.
if self.terminated || message.epoch < self.epoch {
return Ok(()); // Message is obsolete: We are already in a later epoch or terminated.
}
if message.epoch > self.epoch {
// Message is for a later epoch. We can't handle that yet.
@ -386,8 +382,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.epoch
);
if let Some(b) = vals.definite() {
self.estimated = Some(b);
let b = if let Some(b) = vals.definite() {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
// Output the agreement value.
@ -400,11 +395,12 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
b
);
}
b
} else {
self.estimated = Some(coin);
}
coin
};
let b = self.estimated.unwrap();
self.estimated = Some(b);
self.send_bval(b)?;
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {

View File

@ -491,15 +491,12 @@ impl Coding {
}
}
fn decode_from_shards<T>(
fn decode_from_shards(
leaf_values: &mut [Option<Box<[u8]>>],
coding: &Coding,
data_shard_num: usize,
root_hash: &[u8],
) -> Option<T>
where
T: From<Vec<u8>>,
{
) -> Option<Vec<u8>> {
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding scheme.
coding
.reconstruct_shards(leaf_values)
@ -523,24 +520,21 @@ where
None // The proposer is faulty.
} else {
// Reconstruct the value from the data shards.
Some(glue_shards(mtree, data_shard_num))
glue_shards(mtree, data_shard_num)
}
}
/// Concatenates the first `n` leaf values of a Merkle tree `m` in one value of
/// type `T`. This is useful for reconstructing the data value held in the tree
/// and forgetting the leaves that contain parity information.
fn glue_shards<T>(m: MerkleTree<Vec<u8>>, n: usize) -> T
where
T: From<Vec<u8>>,
{
let t: Vec<u8> = m.into_iter()
.take(n)
.flat_map(|s| s.into_iter().skip(1)) // Drop the index byte.
.collect();
let payload_len = BigEndian::read_u32(&t[..4]) as usize;
let payload = &t[4..(payload_len + 4)];
debug!("Glued data shards {:?}", HexBytes(payload));
payload.to_vec().into()
fn glue_shards(m: MerkleTree<Vec<u8>>, n: usize) -> Option<Vec<u8>> {
// Create an iterator over the shard payload, drop the index bytes.
let mut bytes = m.into_iter().take(n).flat_map(|s| s.into_iter().skip(1));
let payload_len = match (bytes.next(), bytes.next(), bytes.next(), bytes.next()) {
(Some(b0), Some(b1), Some(b2), Some(b3)) => BigEndian::read_u32(&[b0, b1, b2, b3]) as usize,
_ => return None, // The proposing node is faulty: no payload size.
};
let payload: Vec<u8> = bytes.take(payload_len).collect();
debug!("Glued data shards {:?}", HexBytes(&payload));
Some(payload)
}