From 72825fea5014723c1be5e1c051347d4e674694fc Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 20 Oct 2016 16:17:59 +0200 Subject: [PATCH] session initialize and on_message in progress --- p2p/src/session.rs | 64 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/p2p/src/session.rs b/p2p/src/session.rs index 122d565c..53d49414 100644 --- a/p2p/src/session.rs +++ b/p2p/src/session.rs @@ -1,14 +1,15 @@ use std::sync::Arc; use parking_lot::Mutex; +use futures::{collect, finished, failed, Future}; use tokio_core::io::IoFuture; use bytes::Bytes; use message::Command; use p2p::Context; use net::Channel; -use protocol::{Protocol, PingProtocol}; +use protocol::{Protocol, ProtocolAction, PingProtocol, Direction}; pub struct Session { - protocols: Vec>>>, + protocols: Mutex>>, } impl Session { @@ -24,16 +25,65 @@ impl Session { pub fn new_with_protocols(protocols: Vec>) -> Self { Session { - protocols: protocols.into_iter().map(Mutex::new).map(Arc::new).collect(), + protocols: Mutex::new(protocols), } } - pub fn initialize(&self, _context: Arc, _channel: Arc) -> IoFuture<()> { - unimplemented!(); + pub fn initialize(&self, context: Arc, channel: Arc) -> IoFuture<()> { + let futures = self.protocols.lock() + .iter_mut() + .map(|protocol| { + // TODO: use real direction and version + match protocol.initialize(Direction::Inbound, 0) { + Ok(ProtocolAction::None) => { + finished(()).boxed() + }, + Ok(ProtocolAction::Disconnect) => { + // no other protocols can use the channel after that + context.close_connection(channel.peer_info()); + finished(()).boxed() + }, + Ok(ProtocolAction::Reply(message)) => { + unimplemented!(); + }, + Err(err) => { + // protocol error + unimplemented!(); + } + } + }) + .collect::>(); + collect(futures) + .and_then(|_| finished(())) + .boxed() } - pub fn on_message(&self, _context: Arc, _channel: Arc, _command: Command, _payload: Bytes) -> IoFuture<()> { - unimplemented!(); + pub fn on_message(&self, context: Arc, channel: Arc, command: Command, payload: Bytes) -> IoFuture<()> { + let futures = self.protocols.lock() + .iter() + .map(|protocol| { + // TODO: use real version + match protocol.on_message(&command, &payload, 0) { + Ok(ProtocolAction::None) => { + finished(()).boxed() + }, + Ok(ProtocolAction::Disconnect) => { + context.close_connection(channel.peer_info()); + finished(()).boxed() + }, + Ok(ProtocolAction::Reply(message)) => { + unimplemented!(); + }, + Err(err) => { + // protocol error + unimplemented!(); + }, + } + }) + .collect::>(); + collect(futures) + .and_then(|_| finished(())) + .boxed() } }