Internal messaging was being stopped before broadcast threads finished. Corrected that.

This commit is contained in:
Vladimir Komendantskiy 2018-04-20 09:42:15 +01:00
parent 7997e6e674
commit 27e5ee679e
1 changed files with 11 additions and 4 deletions

View File

@ -68,13 +68,14 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
crossbeam::scope(|scope| {
// Start the centralised message delivery system.
let msg_handle = messaging.spawn(scope);
let mut broadcast_handles = Vec::new();
// Associate a broadcast instance with this node. This instance will
// broadcast the proposed value. There is no remote node
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let ref to_algo_rx0 = to_algo_rxs[0];
scope.spawn(move || {
broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(from_algo_tx,
to_algo_rx0,
value.to_owned(),
@ -88,7 +89,7 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
},
Err(e) => error!("Broadcast instance 0: {:?}", e)
}
});
}));
// Start a comms task for each connection. Node indices of those
// tasks are 1 through N where N is the number of connections.
@ -115,7 +116,7 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
// Associate a broadcast instance to the above comms task.
let ref to_algo_rx = to_algo_rxs[node_index];
scope.spawn(move || {
broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(from_algo_tx,
to_algo_rx,
None,
@ -131,7 +132,13 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
Err(e) => error!("Broadcast instance {}: {:?}",
node_index, e)
}
});
}));
}
// Wait for the broadcast instances to finish before stopping the
// messaging task.
for h in broadcast_handles {
h.join();
}
// Stop the messaging task.