session initialize and on_message in progress

This commit is contained in:
debris 2016-10-20 16:17:59 +02:00
parent f8f21ce3c6
commit 72825fea50
1 changed files with 57 additions and 7 deletions

View File

@ -1,14 +1,15 @@
use std::sync::Arc;
use parking_lot::Mutex;
use futures::{collect, finished, failed, Future};
use tokio_core::io::IoFuture;
use bytes::Bytes;
use message::Command;
use p2p::Context;
use net::Channel;
use protocol::{Protocol, PingProtocol};
use protocol::{Protocol, ProtocolAction, PingProtocol, Direction};
pub struct Session {
protocols: Vec<Arc<Mutex<Box<Protocol>>>>,
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
@ -24,16 +25,65 @@ impl Session {
pub fn new_with_protocols(protocols: Vec<Box<Protocol>>) -> Self {
Session {
protocols: protocols.into_iter().map(Mutex::new).map(Arc::new).collect(),
protocols: Mutex::new(protocols),
}
}
pub fn initialize(&self, _context: Arc<Context>, _channel: Arc<Channel>) -> IoFuture<()> {
unimplemented!();
pub fn initialize(&self, context: Arc<Context>, channel: Arc<Channel>) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real direction and version
match protocol.initialize(Direction::Inbound, 0) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
// no other protocols can use the channel after that
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply(message)) => {
unimplemented!();
},
Err(err) => {
// protocol error
unimplemented!();
}
}
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
}
pub fn on_message(&self, _context: Arc<Context>, _channel: Arc<Channel>, _command: Command, _payload: Bytes) -> IoFuture<()> {
unimplemented!();
pub fn on_message(&self, context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: Bytes) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter()
.map(|protocol| {
// TODO: use real version
match protocol.on_message(&command, &payload, 0) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply(message)) => {
unimplemented!();
},
Err(err) => {
// protocol error
unimplemented!();
},
}
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
}
}