Change can_decodes to BTreeMap<Digest, BTreeSet<N>> and fix send_can_decode

This commit is contained in:
pawanjay176 2019-05-05 03:01:58 +05:30
parent 447df73966
commit 491d71acef
2 changed files with 28 additions and 16 deletions

View File

@ -43,9 +43,9 @@ pub struct Broadcast<N> {
fault_estimate: usize,
/// The hashes and proofs we have received via `Echo` and `EchoHash` messages, by sender ID.
echos: BTreeMap<N, (Digest, Option<Proof<Vec<u8>>>)>,
/// The hashes we have received via `CanDecode` messages, by sender ID.
/// The hashes we have received from nodes via `CanDecode` messages, by hash.
/// A node can receive conflicting `CanDecode`s from the same node.
can_decodes: BTreeSet<(N, Digest)>,
can_decodes: BTreeMap<Digest, BTreeSet<N>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<N, Vec<u8>>,
}
@ -105,7 +105,7 @@ impl<N: NodeIdT> Broadcast<N> {
decided: false,
fault_estimate: g,
echos: BTreeMap::new(),
can_decodes: BTreeSet::new(),
can_decodes: BTreeMap::new(),
readys: BTreeMap::new(),
})
}
@ -275,8 +275,7 @@ impl<N: NodeIdT> Broadcast<N> {
let hash = *p.root_hash();
// Save the proof for reconstructing the tree later.
self.echos
.insert(sender_id.clone(), (hash, Some(p)));
self.echos.insert(sender_id.clone(), (hash, Some(p)));
// Upon receiving `N - 2f` `Echo`s with this root hash, send `CanDecode`
if !self.can_decode_sent && self.count_echos_full(&hash) >= self.coding.data_shard_count() {
@ -328,15 +327,24 @@ impl<N: NodeIdT> Broadcast<N> {
/// Handles a received `CanDecode` message.
fn handle_can_decode(&mut self, sender_id: &N, hash: &Digest) -> Result<Step<N>> {
// Save the hash for counting later. If tuple already exists, emit a warning.
if !self.can_decodes.insert((sender_id.clone(), *hash)) {
warn!(
"Node {:?} received CanDecode({:?}) multiple times from {:?}.",
self.our_id(),
hash,
sender_id,
);
// Save the hash for counting later. If hash from sender_id already exists, emit a warning.
if let Some(nodes) = self.can_decodes.get(hash) {
if nodes.contains(sender_id) {
warn!(
"Node {:?} received CanDecode({:?}) multiple times from {:?}.",
self.our_id(),
hash,
sender_id,
);
}
}
let mut values = self
.can_decodes
.get(hash)
.unwrap_or(&BTreeSet::new())
.clone();
values.insert(sender_id.clone());
self.can_decodes.insert(*hash, values);
Ok(Step::default())
}
@ -427,7 +435,9 @@ impl<N: NodeIdT> Broadcast<N> {
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.take_while(|x| *x != self.our_id());
for id in right {
if !self.can_decodes.contains(&(id.clone(), *p.root_hash())) {
if self.can_decodes.contains_key(p.root_hash())
&& !self.can_decodes[p.root_hash()].contains(id)
{
let msg = Target::Node(id.clone()).message(echo_msg.clone());
step.messages.push(msg);
}
@ -471,7 +481,7 @@ impl<N: NodeIdT> Broadcast<N> {
let mut step = Step::default();
for id in self.netinfo.all_ids() {
if self.echos.contains_key(id) {
if let Some((_, None)) = self.echos.get(id) {
let msg = Target::Node(id.clone()).message(can_decode_msg.clone());
step.messages.push(msg);
}

View File

@ -27,7 +27,9 @@ pub enum Message {
// for integration tests.
impl Distribution<Message> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Message {
let message_type = *["value", "echo", "ready", "can_decode", "echo_hash"].choose(rng).unwrap();
let message_type = *["value", "echo", "ready", "can_decode", "echo_hash"]
.choose(rng)
.unwrap();
// Create a random buffer for our proof.
let mut buffer: [u8; 32] = [0; 32];