Merge pull request #23 from ethcore/session

session with working ping pong
This commit is contained in:
Marek Kotewicz 2016-10-21 15:13:16 +02:00 committed by GitHub
commit 563f77c500
8 changed files with 107 additions and 25 deletions

View File

@ -13,6 +13,6 @@ mod error;
pub use primitives::{hash, bytes};
pub use common::{Command, Magic};
pub use message::{Message, MessageHeader, Payload};
pub use message::{Message, MessageHeader, Payload, to_raw_message};
pub use serialization::{serialize_payload, deserialize_payload};
pub use error::{Error, MessageResult};

View File

@ -1,9 +1,17 @@
use ser::Stream;
use bytes::TaggedBytes;
use common::Magic;
use bytes::{TaggedBytes, Bytes};
use common::{Magic, Command};
use serialization::serialize_payload;
use {Payload, MessageResult, MessageHeader};
pub fn to_raw_message(magic: Magic, command: Command, payload: &Bytes) -> Bytes {
let header = MessageHeader::for_data(magic, command, payload);
let mut stream = Stream::default();
stream.append(&header);
stream.append_slice(payload);
stream.out()
}
pub struct Message<T> {
bytes: TaggedBytes<T>,
}
@ -11,13 +19,9 @@ pub struct Message<T> {
impl<T> Message<T> where T: Payload {
pub fn new(magic: Magic, version: u32, payload: &T) -> MessageResult<Self> {
let serialized = try!(serialize_payload(payload, version));
let header = MessageHeader::for_data(magic, T::command().into(), &serialized);
let mut stream = Stream::default();
stream.append(&header);
stream.append_slice(&serialized);
let message = Message {
bytes: TaggedBytes::new(stream.out()),
bytes: TaggedBytes::new(to_raw_message(magic, T::command().into(), &serialized)),
};
Ok(message)

View File

@ -2,6 +2,6 @@ mod message;
mod message_header;
pub mod payload;
pub use self::message::Message;
pub use self::message::{Message, to_raw_message};
pub use self::message_header::MessageHeader;
pub use self::payload::Payload;

View File

@ -1,4 +1,6 @@
use message::{Payload, Magic, Message};
use tokio_core::io::{write_all, WriteAll};
use bytes::Bytes;
use message::{Payload, Magic, Message, to_raw_message, Command};
use net::Connection;
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
@ -32,6 +34,11 @@ impl Channel {
write_message(self.stream.clone(), message)
}
pub fn write_raw_message(&self, command: Command, payload: &Bytes) -> WriteAll<SharedTcpStream, Bytes> {
let message = to_raw_message(self.magic, command, payload);
write_all(self.stream.clone(), message)
}
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {
read_any_message(self.stream.clone(), self.magic)
}

View File

@ -6,8 +6,10 @@ use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::io::IoFuture;
use tokio_core::reactor::Handle;
use message::Payload;
use bytes::Bytes;
use message::{Payload, Command};
use session::Session;
use protocol::Direction;
use io::{ReadAnyMessage, SharedTcpStream};
use net::{connect, listen, Connections, Channel, Config as NetConfig};
use util::NodeTable;
@ -39,7 +41,7 @@ impl Context {
let channel = context.connections.store(connection, session);
// initialize session and then start reading messages
channel.session().initialize(context.clone(), channel.clone())
channel.session().initialize(context.clone(), channel.clone(), Direction::Outbound)
.and_then(move |_| Context::on_message(context, channel))
.boxed()
},
@ -74,7 +76,7 @@ impl Context {
// initialize session and then start reading messages
let cloned_context = context.clone();
channel.session().initialize(context.clone(), channel.clone())
channel.session().initialize(context.clone(), channel.clone(), Direction::Inbound)
.and_then(|_| Context::on_message(cloned_context, channel))
.boxed()
},
@ -120,7 +122,25 @@ impl Context {
}).boxed()
}
pub fn send<T>(context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
pub fn send_raw(_context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: &Bytes) -> IoFuture<()> {
trace!("Sending {} message to {}", command, channel.peer_info().address);
channel.write_raw_message(command.clone(), payload).then(move |result| {
match result {
Ok(_) => {
// successful send
trace!("Sent {} message to {}", command, channel.peer_info().address);
finished(()).boxed()
},
Err(err) => {
// network error
// closing connection is handled in on_message`
failed(err).boxed()
},
}
}).boxed()
}
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(payload).then(move |result| {
match result {
@ -131,7 +151,7 @@ impl Context {
},
Err(err) => {
// network error
context.close_connection(channel.peer_info());
// closing connection is handled in on_message`
failed(err).boxed()
},
}

View File

@ -6,13 +6,14 @@ use message::common::Command;
pub use self::ping::PingProtocol;
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
Inbound,
Outbound,
}
pub enum ProtocolAction {
Reply(Bytes),
Reply((Command, Bytes)),
None,
Disconnect,
}

View File

@ -30,7 +30,7 @@ impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
self.last_ping_nonce = nonce;
let ping = Ping::new(nonce);
let serialized = try!(serialize_payload(&ping, version));
Ok(ProtocolAction::Reply(serialized))
Ok(ProtocolAction::Reply((Ping::command().into(), serialized)))
},
}
}
@ -40,7 +40,7 @@ impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
let ping: Ping = try!(deserialize_payload(payload, version));
let pong = Pong::new(ping.nonce);
let serialized = try!(serialize_payload(&pong, version));
Ok(ProtocolAction::Reply(serialized))
Ok(ProtocolAction::Reply((Pong::command().into(), serialized)))
} else if command == &Pong::command().into() {
let pong: Pong = try!(deserialize_payload(payload, version));
if pong.nonce != self.last_ping_nonce {

View File

@ -1,14 +1,15 @@
use std::sync::Arc;
use parking_lot::Mutex;
use futures::{collect, finished, failed, Future};
use tokio_core::io::IoFuture;
use bytes::Bytes;
use message::Command;
use p2p::Context;
use net::Channel;
use protocol::{Protocol, PingProtocol};
use protocol::{Protocol, ProtocolAction, PingProtocol, Direction};
pub struct Session {
protocols: Vec<Arc<Mutex<Box<Protocol>>>>,
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
@ -24,16 +25,65 @@ impl Session {
pub fn new_with_protocols(protocols: Vec<Box<Protocol>>) -> Self {
Session {
protocols: protocols.into_iter().map(Mutex::new).map(Arc::new).collect(),
protocols: Mutex::new(protocols),
}
}
pub fn initialize(&self, _context: Arc<Context>, _channel: Arc<Channel>) -> IoFuture<()> {
unimplemented!();
pub fn initialize(&self, context: Arc<Context>, channel: Arc<Channel>, direction: Direction) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real direction and version
match protocol.initialize(direction, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
// no other protocols can use the channel after that
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
}
}
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
}
pub fn on_message(&self, _context: Arc<Context>, _channel: Arc<Channel>, _command: Command, _payload: Bytes) -> IoFuture<()> {
unimplemented!();
pub fn on_message(&self, context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: Bytes) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter()
.map(|protocol| {
// TODO: use real version
match protocol.on_message(&command, &payload, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
},
}
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
}
}