Merge branch 'master' into db-kvdb

This commit is contained in:
debris 2016-10-14 14:56:09 +02:00
commit 47d7149909
47 changed files with 273 additions and 174 deletions

6
Cargo.lock generated
View File

@ -81,7 +81,7 @@ dependencies = [
"unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -529,7 +529,7 @@ dependencies = [
[[package]]
name = "yaml-rust"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
@ -587,4 +587,4 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum yaml-rust 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ebfe12f475ad59be6178ebf004d51e682022496535994f8d23fd7ed31084598c"
"checksum yaml-rust 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "371cea3a33a58d11dc83c0992fb37e44f651ebdf2df12f9d939f6cb24be2a8fd"

View File

@ -1,5 +1,7 @@
use ser::Error as ReaderError;
pub type MessageResult<T> = Result<T, Error>;
#[derive(Debug, PartialEq)]
pub enum Error {
/// Deserialization failed.

View File

@ -6,13 +6,13 @@ extern crate serialization as ser;
pub mod common;
mod message;
pub mod serialization;
mod serialization;
pub mod types;
mod error;
pub use primitives::{hash, bytes};
pub use message::{Message, MessageHeader};
pub use error::Error;
pub use serialization::PayloadType;
pub type MessageResult<T> = Result<T, Error>;
pub use common::{Command, Magic};
pub use message::{Message, MessageHeader, Payload};
pub use serialization::{serialize_payload, deserialize_payload};
pub use error::{Error, MessageResult};

View File

@ -2,13 +2,13 @@ use ser::Stream;
use bytes::TaggedBytes;
use common::Magic;
use serialization::serialize_payload;
use {PayloadType, MessageResult, MessageHeader};
use {Payload, MessageResult, MessageHeader};
pub struct Message<T> {
bytes: TaggedBytes<T>,
}
impl<T> Message<T> where T: PayloadType {
impl<T> Message<T> where T: Payload {
pub fn new(magic: Magic, version: u32, payload: &T) -> MessageResult<Self> {
let serialized = try!(serialize_payload(payload, version));
let header = MessageHeader::for_data(magic, T::command().into(), &serialized);

View File

@ -1,5 +1,7 @@
mod message;
mod message_header;
pub mod payload;
pub use self::message::Message;
pub use self::message_header::MessageHeader;
pub use self::payload::Payload;

View File

@ -0,0 +1,9 @@
use ser::{Reader, Stream};
use MessageResult;
pub trait Payload: Send + 'static {
fn version() -> u32;
fn command() -> &'static str;
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized;
fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()>;
}

View File

@ -1,14 +1,5 @@
mod stream;
mod reader;
pub use self::stream::{PayloadStream, serialize_payload};
pub use self::reader::{PayloadReader, deserialize_payload};
use ser::{Reader, Stream};
use MessageResult;
pub trait PayloadType: Send + 'static {
fn version() -> u32;
fn command() -> &'static str;
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized;
fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()>;
}
pub use self::stream::serialize_payload;
pub use self::reader::deserialize_payload;

View File

@ -1,8 +1,7 @@
use ser::Reader;
use serialization::PayloadType;
use Error;
use {Payload, Error};
pub fn deserialize_payload<T>(buffer: &[u8], version: u32) -> Result<T, Error> where T: PayloadType {
pub fn deserialize_payload<T>(buffer: &[u8], version: u32) -> Result<T, Error> where T: Payload {
let mut reader = PayloadReader::new(buffer, version);
let result = try!(reader.read());
if !reader.is_finished() {
@ -25,7 +24,7 @@ impl<'a> PayloadReader<'a> {
}
}
pub fn read<T>(&mut self) -> Result<T, Error> where T: PayloadType {
pub fn read<T>(&mut self) -> Result<T, Error> where T: Payload {
if T::version() > self.version {
return Err(Error::InvalidVersion);
}

View File

@ -1,8 +1,8 @@
use bytes::Bytes;
use ser::Stream;
use {PayloadType, Error, MessageResult};
use {Payload, Error, MessageResult};
pub fn serialize_payload<T>(t: &T, version: u32) -> MessageResult<Bytes> where T: PayloadType {
pub fn serialize_payload<T>(t: &T, version: u32) -> MessageResult<Bytes> where T: Payload {
let mut stream = PayloadStream::new(version);
try!(stream.append(t));
Ok(stream.out())
@ -21,7 +21,7 @@ impl PayloadStream {
}
}
pub fn append<T>(&mut self, t: &T) -> MessageResult<()> where T: PayloadType {
pub fn append<T>(&mut self, t: &T) -> MessageResult<()> where T: Payload {
if T::version() > self.version {
return Err(Error::InvalidVersion);
}

View File

@ -3,7 +3,7 @@ use ser::{
Deserializable, Reader, Error as ReaderError,
};
use common::NetAddress;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub enum Addr {
@ -11,7 +11,7 @@ pub enum Addr {
V31402(V31402),
}
impl PayloadType for Addr {
impl Payload for Addr {
fn version() -> u32 {
0
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::BlockTransactions;
use {MessageResult, PayloadType};
use {MessageResult, Payload};
#[derive(Debug, PartialEq)]
pub struct BlockTxn {
request: BlockTransactions,
}
impl PayloadType for BlockTxn {
impl Payload for BlockTxn {
fn version() -> u32 {
70014
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::BlockHeaderAndIDs;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct CompactBlock {
header: BlockHeaderAndIDs,
}
impl PayloadType for CompactBlock {
impl Payload for CompactBlock {
fn version() -> u32 {
70014
}

View File

@ -1,12 +1,12 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FeeFilter {
fee_rate: u64,
}
impl PayloadType for FeeFilter {
impl Payload for FeeFilter {
fn version() -> u32 {
70013
}

View File

@ -1,6 +1,6 @@
use bytes::Bytes;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterAdd {
@ -8,7 +8,7 @@ pub struct FilterAdd {
data: Bytes,
}
impl PayloadType for FilterAdd {
impl Payload for FilterAdd {
fn version() -> u32 {
70001
}

View File

@ -1,10 +1,10 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterClear;
impl PayloadType for FilterClear {
impl Payload for FilterClear {
fn version() -> u32 {
70001
}

View File

@ -1,6 +1,6 @@
use bytes::Bytes;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterLoad {
@ -11,7 +11,7 @@ pub struct FilterLoad {
flags: u8,
}
impl PayloadType for FilterLoad {
impl Payload for FilterLoad {
fn version() -> u32 {
70001
}

View File

@ -1,10 +1,10 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetAddr;
impl PayloadType for GetAddr {
impl Payload for GetAddr {
fn version() -> u32 {
60002
}

View File

@ -1,6 +1,6 @@
use hash::H256;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetBlocks {
@ -9,7 +9,7 @@ pub struct GetBlocks {
hash_stop: H256,
}
impl PayloadType for GetBlocks {
impl Payload for GetBlocks {
fn version() -> u32 {
0
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::BlockTransactionsRequest;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetBlockTxn {
request: BlockTransactionsRequest,
}
impl PayloadType for GetBlockTxn {
impl Payload for GetBlockTxn {
fn version() -> u32 {
70014
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetData {
pub inventory: Vec<InventoryVector>,
}
impl PayloadType for GetData {
impl Payload for GetData {
fn version() -> u32 {
0
}

View File

@ -1,6 +1,6 @@
use hash::H256;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetHeaders {
@ -9,7 +9,7 @@ pub struct GetHeaders {
hash_stop: H256,
}
impl PayloadType for GetHeaders {
impl Payload for GetHeaders {
fn version() -> u32 {
0
}

View File

@ -1,6 +1,6 @@
use chain::BlockHeader;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Headers {
@ -8,7 +8,7 @@ pub struct Headers {
headers: Vec<BlockHeader>,
}
impl PayloadType for Headers {
impl Payload for Headers {
fn version() -> u32 {
0
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Inv {
pub inventory: Vec<InventoryVector>,
}
impl PayloadType for Inv {
impl Payload for Inv {
fn version() -> u32 {
0
}

View File

@ -1,10 +1,10 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct MemPool;
impl PayloadType for MemPool {
impl Payload for MemPool {
fn version() -> u32 {
60002
}

View File

@ -2,7 +2,7 @@ use hash::H256;
use bytes::Bytes;
use ser::{Stream, Reader};
use chain::BlockHeader;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct MerkleBlock {
@ -12,7 +12,7 @@ pub struct MerkleBlock {
flags: Bytes,
}
impl PayloadType for MerkleBlock {
impl Payload for MerkleBlock {
fn version() -> u32 {
70014
}

View File

@ -1,13 +1,13 @@
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct NotFound {
pub inventory: Vec<InventoryVector>,
}
impl PayloadType for NotFound {
impl Payload for NotFound {
fn version() -> u32 {
0
}

View File

@ -1,12 +1,12 @@
use ser::{Stream, Reader};
use {MessageResult, PayloadType};
use {MessageResult, Payload};
#[derive(Debug, PartialEq)]
pub struct Ping {
pub nonce: u64,
}
impl PayloadType for Ping {
impl Payload for Ping {
fn version() -> u32 {
0
}

View File

@ -1,12 +1,12 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Pong {
pub nonce: u64,
}
impl PayloadType for Pong {
impl Payload for Pong {
fn version() -> u32 {
0
}

View File

@ -1,5 +1,5 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(u8)]
@ -59,7 +59,7 @@ pub struct Reject {
// TODO: data
}
impl PayloadType for Reject {
impl Payload for Reject {
fn version() -> u32 {
0
}

View File

@ -1,5 +1,5 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct SendCompact {
@ -7,7 +7,7 @@ pub struct SendCompact {
second: u64,
}
impl PayloadType for SendCompact {
impl Payload for SendCompact {
fn version() -> u32 {
70014
}

View File

@ -1,10 +1,10 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct SendHeaders;
impl PayloadType for SendHeaders {
impl Payload for SendHeaders {
fn version() -> u32 {
70012
}

View File

@ -1,10 +1,10 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Verack;
impl PayloadType for Verack {
impl Payload for Verack {
fn version() -> u32 {
0
}

View File

@ -4,7 +4,7 @@ use ser::{
Deserializable, Reader, Error as ReaderError,
};
use common::{NetAddress, ServiceFlags};
use {PayloadType, MessageResult};
use {Payload, MessageResult};
use serialization::deserialize_payload;
#[derive(Debug, PartialEq)]
@ -14,7 +14,7 @@ pub enum Version {
V70001(V0, V106, V70001),
}
impl PayloadType for Version {
impl Payload for Version {
fn version() -> u32 {
0
}

View File

@ -10,8 +10,8 @@ pub use self::handshake::{
handshake, accept_handshake, Handshake, AcceptHandshake, HandshakeResult
};
pub use self::read_header::{read_header, ReadHeader};
pub use self::read_message_stream::{read_message_stream, ReadMessageStream};
pub use self::read_payload::{read_payload, ReadPayload};
pub use self::read_message::{read_message, ReadMessage};
pub use self::read_message_stream::{read_message_stream, ReadMessageStream};
pub use self::sharedtcpstream::SharedTcpStream;
pub use self::write_message::{write_message, WriteMessage};

View File

@ -1,8 +1,7 @@
use std::io;
use futures::{Future, Poll, Async};
use tokio_core::io::{ReadExact, read_exact};
use message::{MessageHeader, MessageResult};
use message::common::Magic;
use message::{MessageHeader, MessageResult, Magic};
pub fn read_header<A>(a: A, magic: Magic) -> ReadHeader<A> where A: io::Read {
ReadHeader {

View File

@ -1,13 +1,11 @@
use std::io;
use std::marker::PhantomData;
use futures::{Poll, Future, Async};
use message::{MessageResult, Error};
use message::common::Magic;
use message::serialization::PayloadType;
use message::{MessageResult, Error, Magic, Payload};
use io::{read_header, ReadHeader, read_payload, ReadPayload};
pub fn read_message<M, A>(a: A, magic: Magic, version: u32) -> ReadMessage<M, A>
where A: io::Read, M: PayloadType {
where A: io::Read, M: Payload {
ReadMessage {
state: ReadMessageState::ReadHeader {
version: version,
@ -33,7 +31,7 @@ pub struct ReadMessage<M, A> {
message_type: PhantomData<M>,
}
impl<M, A> Future for ReadMessage<M, A> where A: io::Read, M: PayloadType {
impl<M, A> Future for ReadMessage<M, A> where A: io::Read, M: Payload {
type Item = (A, MessageResult<M>);
type Error = io::Error;
@ -43,9 +41,7 @@ impl<M, A> Future for ReadMessage<M, A> where A: io::Read, M: PayloadType {
let (read, header) = try_ready!(future.poll());
let header = match header {
Ok(header) => header,
Err(err) => {
return Ok((read, Err(err)).into());
}
Err(err) => return Ok((read, Err(err)).into()),
};
if header.command != M::command().into() {
return Ok((read, Err(Error::InvalidCommand)).into());

View File

@ -3,8 +3,7 @@ use futures::{Future, Poll, Async};
use futures::stream::Stream;
use tokio_core::io::{read_exact, ReadExact};
use crypto::checksum;
use message::{Error, MessageHeader, MessageResult};
use message::common::Magic;
use message::{Error, MessageHeader, MessageResult, Magic, Command};
use bytes::Bytes;
use io::{read_header, ReadHeader};
@ -29,7 +28,7 @@ pub struct ReadMessageStream<A> {
}
impl<A> Stream for ReadMessageStream<A> where A: io::Read {
type Item = MessageResult<Bytes>;
type Item = MessageResult<(Command, Bytes)>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -54,7 +53,7 @@ impl<A> Stream for ReadMessageStream<A> where A: io::Read {
}
let future = read_header(stream, self.magic);
let next = ReadMessageStreamState::ReadHeader(future);
(next, Some(Ok(bytes)).into())
(next, Some(Ok((header.command.clone(), bytes))).into())
},
};

View File

@ -5,11 +5,10 @@ use tokio_core::io::{read_exact, ReadExact};
use bytes::Bytes;
use hash::H32;
use crypto::checksum;
use message::{Error, MessageResult};
use message::serialization::{PayloadType, deserialize_payload};
use message::{Error, MessageResult, Payload, deserialize_payload};
pub fn read_payload<M, A>(a: A, version: u32, len: usize, checksum: H32) -> ReadPayload<M, A>
where A: io::Read, M: PayloadType {
where A: io::Read, M: Payload {
ReadPayload {
reader: read_exact(a, Bytes::new_with_len(len)),
version: version,
@ -25,7 +24,7 @@ pub struct ReadPayload<M, A> {
payload_type: PhantomData<M>,
}
impl<M, A> Future for ReadPayload<M, A> where A: io::Read, M: PayloadType {
impl<M, A> Future for ReadPayload<M, A> where A: io::Read, M: Payload {
type Item = (A, MessageResult<M>);
type Error = io::Error;

View File

@ -17,7 +17,6 @@ pub mod util;
mod config;
mod event_loop;
mod p2p;
mod run;
pub const VERSION: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";
@ -26,7 +25,6 @@ pub use primitives::{hash, bytes};
pub use config::Config;
pub use event_loop::{event_loop, forever};
pub use run::run;
pub use p2p::P2P;
pub type PeerId = usize;

35
p2p/src/net/channel.rs Normal file
View File

@ -0,0 +1,35 @@
use std::io;
use futures::Poll;
use futures::stream::Stream;
use parking_lot::Mutex;
use bytes::Bytes;
use message::{MessageResult, Payload, Command};
use net::Connection;
use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage};
pub struct Channel {
connection: Connection,
message_stream: Mutex<ReadMessageStream<SharedTcpStream>>,
}
impl Channel {
pub fn new(connection: Connection) -> Self {
let stream = read_message_stream(connection.stream.clone(), connection.magic);
Channel {
connection: connection,
message_stream: Mutex::new(stream),
}
}
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
self.connection.write_message(payload)
}
pub fn poll_message(&self) -> Poll<Option<(MessageResult<(Command, Bytes)>)>, io::Error> {
self.message_stream.lock().poll()
}
pub fn version(&self) -> u32 {
self.connection.version
}
}

View File

@ -1,6 +1,5 @@
use std::net;
use message::{Message, PayloadType};
use message::common::Magic;
use message::{Message, Payload, Magic};
use io::{write_message, WriteMessage, SharedTcpStream};
pub struct Connection {
@ -11,7 +10,7 @@ pub struct Connection {
}
impl Connection {
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: PayloadType {
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
let message = match Message::new(self.magic, self.version, payload) {
Ok(message) => message,
Err(_err) => {

View File

@ -5,13 +5,13 @@ use parking_lot::RwLock;
use futures::{finished, Future};
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::PayloadType;
use net::Connection;
use message::Payload;
use net::{Connection, Channel};
use PeerId;
pub struct Connections {
peer_counter: AtomicUsize,
channels: RwLock<HashMap<PeerId, Arc<Connection>>>,
channels: RwLock<HashMap<PeerId, Arc<Channel>>>,
}
impl Connections {
@ -24,7 +24,7 @@ impl Connections {
/// Broadcast messages to the network.
/// Returned future completes of first confirmed receive.
pub fn broadcast<T>(connections: &Arc<Connections>, handle: &Handle, pool: &CpuPool, payload: T) where T: PayloadType {
pub fn broadcast<T>(connections: &Arc<Connections>, handle: &Handle, pool: &CpuPool, payload: T) where T: Payload {
let channels = connections.channels();
for (id, channel) in channels.into_iter() {
let write = channel.write_message(&payload);
@ -45,7 +45,7 @@ impl Connections {
}
/// Returns safe (nonblocking) copy of channels.
pub fn channels(&self) -> HashMap<PeerId, Arc<Connection>> {
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
self.channels.read().clone()
}
@ -56,9 +56,8 @@ impl Connections {
/// Stores new channel.
pub fn store(&self, connection: Connection) {
println!("new connection!");
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
self.channels.write().insert(id, Arc::new(connection));
self.channels.write().insert(id, Arc::new(Channel::new(connection)));
}
/// Removes channel with given id.

82
p2p/src/net/messages.rs Normal file
View File

@ -0,0 +1,82 @@
use std::io;
use std::sync::Weak;
use bytes::Bytes;
use futures::{Poll, Async};
use futures::stream::Stream;
use message::common::Command;
use net::Connections;
use PeerId;
pub struct MessagesHandler {
last_polled: usize,
connections: Weak<Connections>,
}
fn next_to_poll(channels: usize, last_polled: usize) -> usize {
// it's irrelevant if we sometimes poll the same peer
if channels > last_polled + 1 {
// let's poll the next peer
last_polled + 1
} else {
// let's move to the first channel
0
}
}
impl MessagesHandler {
pub fn new(connections: Weak<Connections>) -> Self {
MessagesHandler {
last_polled: usize::max_value(),
connections: connections,
}
}
}
impl Stream for MessagesHandler {
type Item = (Command, Bytes, u32, PeerId);
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let connections = match self.connections.upgrade() {
Some(c) => c,
// application is about to shutdown
None => return Ok(None.into())
};
let channels = connections.channels();
if channels.len() == 0 {
// let's wait for some connections
return Ok(Async::NotReady);
}
let mut to_poll = next_to_poll(channels.len(), self.last_polled);
let mut result = None;
while result.is_none() && to_poll != self.last_polled {
let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
let status = channel.poll_message();
match status {
Ok(Async::Ready(Some(Ok((command, message))))) => {
result = Some((command, message, channel.version(), *id));
},
Ok(Async::NotReady) => {
// no messages yet, try next channel
to_poll = next_to_poll(channels.len(), to_poll);
},
_ => {
// channel has been closed or there was error
connections.remove(*id);
to_poll = next_to_poll(channels.len(), to_poll);
},
}
}
self.last_polled = to_poll;
match result.is_some() {
true => Ok(Async::Ready(result)),
false => Ok(Async::NotReady),
}
}
}

View File

@ -1,13 +1,17 @@
mod channel;
mod config;
mod connect;
mod connection;
mod connections;
mod messages;
mod listen;
mod subscriber;
pub use self::channel::Channel;
pub use self::config::Config;
pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::connections::Connections;
pub use self::messages::MessagesHandler;
pub use self::listen::{Listen, listen};
pub use self::subscriber::Subscriber;

View File

@ -1,36 +1,45 @@
use std::sync::mpsc::{Sender, Receiver, channel};
use message::{Error, PayloadType};
use message::common::Command;
use std::mem;
use parking_lot::Mutex;
use message::{Error, Payload, Command, deserialize_payload};
use message::types::{Addr, GetAddr};
use message::serialization::deserialize_payload;
use PeerId;
struct Handler<S> {
sender: Option<Sender<S>>,
sender: Mutex<Option<Sender<(S, PeerId)>>>,
}
impl<S> Default for Handler<S> {
fn default() -> Self {
Handler {
sender: None,
sender: Mutex::default(),
}
}
}
impl<S> Handler<S> where S: PayloadType {
impl<S> Handler<S> where S: Payload {
fn command(&self) -> Command {
S::command().into()
}
fn handle(&self, payload: &[u8], version: u32) -> Result<(), Error> {
if let Some(ref sender) = self.sender {
fn handle(&self, payload: &[u8], version: u32, peerid: PeerId) -> Result<(), Error> {
let payload: S = try!(deserialize_payload(payload, version));
if let Err(_err) = sender.send(payload) {
// TODO: unsubscribe channel on error?
// TODO: trace!!!
if let Some(sender) = self.sender() {
if let Err(_err) = sender.send((payload, peerid)) {
// TODO: unsubscribe channel?
// TODO: trace
}
}
Ok(())
}
fn sender(&self) -> Option<Sender<(S, PeerId)>> {
self.sender.lock().clone()
}
fn store(&self, sender: Sender<(S, PeerId)>) {
mem::replace(&mut *self.sender.lock(), Some(sender));
}
}
#[derive(Default)]
@ -41,18 +50,18 @@ pub struct Subscriber {
macro_rules! define_subscribe {
($name: ident, $result: ident, $sub: ident) => {
pub fn $name(&mut self) -> Receiver<$result> {
pub fn $name(&self) -> Receiver<($result, PeerId)> {
let (sender, receiver) = channel();
self.$sub.sender = Some(sender);
self.$sub.store(sender);
receiver
}
}
}
macro_rules! maybe_handle {
($command: expr, $sub: expr, $payload: expr, $version: expr) => {
($command: expr, $sub: expr, $payload: expr, $version: expr, $peerid: expr) => {
if $command == $sub.command() {
return $sub.handle($payload, $version);
return $sub.handle($payload, $version, $peerid);
}
}
}
@ -61,9 +70,9 @@ impl Subscriber {
define_subscribe!(subscribe_addr, Addr, addr);
define_subscribe!(subscribe_getaddr, GetAddr, getaddr);
pub fn try_handle(&self, payload: &[u8], version: u32, command: Command) -> Result<(), Error> {
maybe_handle!(command, self.addr, payload, version);
maybe_handle!(command, self.getaddr, payload, version);
pub fn try_handle(&self, payload: &[u8], version: u32, command: Command, peerid: PeerId) -> Result<(), Error> {
maybe_handle!(command, self.addr, payload, version, peerid);
maybe_handle!(command, self.getaddr, payload, version, peerid);
Err(Error::InvalidCommand)
}
}

View File

@ -4,8 +4,8 @@ use futures::{Future, finished};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::PayloadType;
use net::{connect, listen, Connections, Subscriber};
use message::Payload;
use net::{connect, listen, Connections, Subscriber, MessagesHandler};
use Config;
pub struct P2P {
@ -18,7 +18,7 @@ pub struct P2P {
/// Connections.
connections: Arc<Connections>,
/// Message subscriber.
subscriber: Subscriber,
subscriber: Arc<Subscriber>,
}
impl P2P {
@ -30,7 +30,7 @@ impl P2P {
pool: pool.clone(),
config: config,
connections: Arc::new(Connections::new()),
subscriber: Subscriber::default(),
subscriber: Arc::new(Subscriber::default()),
}
}
@ -39,7 +39,9 @@ impl P2P {
self.connect(*seednode)
}
self.listen()
try!(self.listen());
self.handle_messages();
Ok(())
}
pub fn connect(&self, ip: net::IpAddr) {
@ -71,7 +73,24 @@ impl P2P {
Ok(())
}
pub fn broadcast<T>(&self, payload: T) where T: PayloadType {
fn handle_messages(&self) {
let incoming = MessagesHandler::new(Arc::downgrade(&self.connections));
let subscriber = self.subscriber.clone();
let connections = self.connections.clone();
let incoming_future = incoming.for_each(move |result| {
let (command, payload, version, peerid) = result;
if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) {
connections.remove(peerid);
}
Ok(())
}).then(|_| {
finished(())
});
let pool_work = self.pool.spawn(incoming_future);
self.event_loop_handle.spawn(pool_work);
}
pub fn broadcast<T>(&self, payload: T) where T: Payload {
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
}
}

View File

@ -1,42 +0,0 @@
use std::{thread, io};
use std::net::SocketAddr;
use futures::{Future, BoxFuture};
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use net::{connect, listen};
use Config;
pub fn run(config: Config, handle: &Handle) -> Result<BoxFuture<(), io::Error>, io::Error> {
for seednode in config.seednodes.clone().into_iter() {
let socket = SocketAddr::new(seednode, config.connection.magic.port());
let connection = connect(&socket, &handle, &config.connection);
thread::spawn(move || {
match connection.wait() {
Ok(Ok(_connection)) => {
println!("Connected to seednode {}", seednode);
},
Ok(Err(_err)) => {
println!("Handshake failed");
},
Err(err) => {
println!("Connection failed {:?}", err);
}
}
});
}
let listen = try!(listen(&handle, config.connection));
let server = listen.for_each(|connection| {
match connection {
Ok(connection) => {
println!("new connection: {:?}", connection.version);
},
Err(_err) => {
println!("handshake failed");
}
}
Ok(())
}).boxed();
Ok(server)
}