solved the problem with receiver side of a comms channel located inside an OS thread

This commit is contained in:
Vladimir Komendantskiy 2018-03-17 00:36:32 +00:00
parent 4353b1bb3e
commit 797f775008
2 changed files with 19 additions and 19 deletions

View File

@ -15,19 +15,19 @@ use self::stage::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T: Send + Sync> {
pub struct BroadcastTask<T: Send + Sync + 'static> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// The receive end of the comms channel. The transmit end is stored in
/// `stage`.
receiver: Receiver<Message<T>>,
receiver: Arc<Mutex<Receiver<Message<T>>>>,
/// Shared state of the broadcast stage.
stage: Arc<Mutex<Stage<T>>>
}
impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
pub fn new(stream: TcpStream,
receiver: Receiver<Message<T>>,
receiver: Arc<Mutex<Receiver<Message<T>>>>,
stage: Arc<Mutex<Stage<T>>>) -> Self {
BroadcastTask {
task: Task::new(stream),
@ -65,25 +65,24 @@ impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
}
}
/// Receiver of messages from other broadcast tasks.
///
/// TODO: This is only a placeholder.
pub fn receiver_thread(&self) {
::std::thread::spawn(move || {
loop {
// let message = self.receiver.recv().unwrap();
// info!("Task {:?} received message {:?}",
// self.task.stream.peer_addr().unwrap(),
// message);
}
});
}
}
impl<T: Clone + Debug + From<Vec<u8>> + Send + Sync + 'static>
MessageLoop for BroadcastTask<T> {
MessageLoop for BroadcastTask<T>
{
fn run(&mut self) {
self.receiver_thread();
let rx = self.receiver.clone();
// receiver_thread(rx.lock().unwrap());
::std::thread::spawn(move || {
loop {
let message = rx.lock().unwrap().recv().unwrap();
info!("Received message {:?}",
// self.task.stream.peer_addr().unwrap(),
message);
}
});
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message).unwrap(),

View File

@ -14,7 +14,7 @@ use proto::Message;
pub struct Node {
/// Incoming connection socket.
addr: SocketAddr,
/// Connection sockets of remote nodes. TODO.
/// Sockets of remote nodes. TODO.
remotes: Vec<SocketAddr>
}
@ -36,6 +36,7 @@ impl Node {
stream.peer_addr().unwrap());
let (tx, rx): (Sender<Message<T>>, Receiver<Message<T>>) =
channel();
let rx = Arc::new(Mutex::new(rx));
let stage = Arc::clone(&broadcast_stage);
// Insert the transmit handle connected to this task into
// the shared list of senders.