diff --git a/Cargo.lock b/Cargo.lock index ab606ff6..b82da87f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,7 +320,6 @@ dependencies = [ "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "serialization 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/message/src/common/address.rs b/message/src/common/address.rs index e9a67485..f5c1371d 100644 --- a/message/src/common/address.rs +++ b/message/src/common/address.rs @@ -3,11 +3,11 @@ use ser::{ Stream, Serializable, Reader, Deserializable, Error as ReaderError, deserialize, }; -use common::{Port, IpAddress, ServiceFlags}; +use common::{Port, IpAddress, Services}; #[derive(Debug, PartialEq)] pub struct NetAddress { - pub services: ServiceFlags, + pub services: Services, pub address: IpAddress, pub port: Port, } @@ -42,7 +42,7 @@ impl From<&'static str> for NetAddress { #[cfg(test)] mod tests { use ser::{serialize, deserialize}; - use common::ServiceFlags; + use common::Services; use super::NetAddress; #[test] @@ -54,7 +54,7 @@ mod tests { ].into(); let address = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), }; @@ -71,7 +71,7 @@ mod tests { ]; let expected = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), }; @@ -82,7 +82,7 @@ mod tests { #[test] fn test_net_address_from_static_str() { let expected = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), diff --git a/message/src/common/mod.rs b/message/src/common/mod.rs index a6164156..08bbc1e3 100644 --- a/message/src/common/mod.rs +++ b/message/src/common/mod.rs @@ -20,4 +20,4 @@ pub use self::ip::IpAddress; pub use self::magic::Magic; pub use self::port::Port; pub use self::prefilled_transaction::PrefilledTransaction; -pub use self::service::ServiceFlags; +pub use self::service::Services; diff --git a/message/src/common/service.rs b/message/src/common/service.rs index 209ef35c..c3699218 100644 --- a/message/src/common/service.rs +++ b/message/src/common/service.rs @@ -3,23 +3,22 @@ use ser::{ Deserializable, Reader, Error as ReaderError }; -#[derive(Debug, Default, PartialEq, Clone, Copy)] -pub struct ServiceFlags(u64); +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub struct Services(u64); -impl From for u64 { - fn from(s: ServiceFlags) -> Self { +impl From for u64 { + fn from(s: Services) -> Self { s.0 } } -impl From for ServiceFlags { +impl From for Services { fn from(v: u64) -> Self { - ServiceFlags(v) + Services(v) } } - -impl ServiceFlags { +impl Services { pub fn network(&self) -> bool { self.bit_at(0) } @@ -63,7 +62,11 @@ impl ServiceFlags { pub fn with_xthin(mut self, v: bool) -> Self { self.set_bit(4, v); self - } + } + + pub fn includes(&self, other: &Self) -> bool { + self.0 & other.0 == other.0 + } fn set_bit(&mut self, bit: usize, bit_value: bool) { if bit_value { @@ -78,14 +81,35 @@ impl ServiceFlags { } } -impl Serializable for ServiceFlags { +impl Serializable for Services { fn serialize(&self, stream: &mut Stream) { stream.append(&self.0); } } -impl Deserializable for ServiceFlags { +impl Deserializable for Services { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - reader.read().map(ServiceFlags) + reader.read().map(Services) + } +} + +#[cfg(test)] +mod test { + use super::Services; + + #[test] + fn test_serivces_includes() { + let s1 = Services::default() + .with_witness(true) + .with_xthin(true); + let s2 = Services::default() + .with_witness(true); + + assert!(s1.witness()); + assert!(s1.xthin()); + assert!(s2.witness()); + assert!(!s2.xthin()); + assert!(s1.includes(&s2)); + assert!(!s2.includes(&s1)); } } diff --git a/message/src/serialization/stream.rs b/message/src/serialization/stream.rs index 3b1f52e3..cd4effd8 100644 --- a/message/src/serialization/stream.rs +++ b/message/src/serialization/stream.rs @@ -29,10 +29,6 @@ impl PayloadStream { t.serialize_payload(&mut self.stream, self.version) } - pub fn raw_stream(&mut self) -> &mut Stream { - &mut self.stream - } - pub fn out(self) -> Bytes { self.stream.out() } diff --git a/message/src/types/version.rs b/message/src/types/version.rs index f4d019b5..f66a8bfb 100644 --- a/message/src/types/version.rs +++ b/message/src/types/version.rs @@ -3,7 +3,7 @@ use ser::{ Serializable, Stream, Deserializable, Reader, Error as ReaderError, }; -use common::{NetAddress, ServiceFlags}; +use common::{NetAddress, Services}; use {Payload, MessageResult}; use serialization::deserialize_payload; @@ -69,12 +69,20 @@ impl Version { Version::V70001(ref s, _, _) => s.version, } } + + pub fn services(&self) -> Services { + match *self { + Version::V0(ref s) => s.services, + Version::V106(ref s, _) => s.services, + Version::V70001(ref s, _, _) => s.services, + } + } } #[derive(Debug, PartialEq)] pub struct V0 { pub version: u32, - pub services: ServiceFlags, + pub services: Services, pub timestamp: i64, pub receiver: NetAddress, } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 8300acec..def2e02e 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,5 +13,4 @@ rand = "0.3" primitives = { path = "../primitives" } bitcrypto = { path = "../crypto" } -serialization = { path = "../serialization" } message = { path = "../message" } diff --git a/p2p/src/config.rs b/p2p/src/config.rs index 3b0b96e1..5cd0c5d8 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -3,10 +3,20 @@ use net::Config as NetConfig; #[derive(Debug)] pub struct Config { + /// Number of threads used by p2p thread pool. + pub threads: usize, + /// Lowest supported protocol version. + pub protocol_minimum: u32, + /// Highest supported protocol version. + pub protocol_maximum: u32, + /// Number of inbound connections. + pub inbound_connections: usize, + /// Number of outbound connections. + pub outbound_connections: usize, /// Configuration for every connection. pub connection: NetConfig, - /// Connect to these nodes to retrieve peer addresses, and disconnect. - pub seednodes: Vec, /// Connect only ot these nodes. - pub limited_connect: Option>, + pub peers: Vec, + /// Connect to these nodes to retrieve peer addresses, and disconnect. + pub seeds: Vec, } diff --git a/p2p/src/io/sharedtcpstream.rs b/p2p/src/io/sharedtcpstream.rs index 752e5379..2e797d3e 100644 --- a/p2p/src/io/sharedtcpstream.rs +++ b/p2p/src/io/sharedtcpstream.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::net::Shutdown; use std::io::{Read, Write, Error}; use tokio_core::net::TcpStream; @@ -12,6 +13,11 @@ impl SharedTcpStream { io: a, } } + + pub fn shutdown(&self) { + // error is irrelevant here, the connection is dropped anyway + let _ = self.io.shutdown(Shutdown::Both); + } } impl From for SharedTcpStream { diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index d4ec4351..c786dc9c 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -9,10 +9,11 @@ extern crate parking_lot; extern crate bitcrypto as crypto; extern crate message; extern crate primitives; -extern crate serialization as ser; pub mod io; pub mod net; +pub mod protocol; +pub mod session; pub mod util; mod config; mod event_loop; @@ -26,6 +27,5 @@ pub use primitives::{hash, bytes}; pub use config::Config; pub use event_loop::{event_loop, forever}; pub use p2p::P2P; - -pub type PeerId = usize; +pub use util::{PeerId, PeerInfo}; diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs index b5d6b620..07853643 100644 --- a/p2p/src/net/channel.rs +++ b/p2p/src/net/channel.rs @@ -3,33 +3,53 @@ use futures::Poll; use futures::stream::Stream; use parking_lot::Mutex; use bytes::Bytes; -use message::{MessageResult, Payload, Command}; +use message::{MessageResult, Payload, Command, Magic, Message}; use net::Connection; -use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage}; +use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage, write_message}; +use {PeerId, PeerInfo}; pub struct Channel { - connection: Connection, - message_stream: Mutex>, + version: u32, + magic: Magic, + peer_info: PeerInfo, + write_stream: SharedTcpStream, + read_stream: Mutex>, } impl Channel { - pub fn new(connection: Connection) -> Self { + pub fn new(connection: Connection, peer_id: PeerId) -> Self { let stream = read_message_stream(connection.stream.clone(), connection.magic); Channel { - connection: connection, - message_stream: Mutex::new(stream), + version: connection.version, + magic: connection.magic, + peer_info: PeerInfo { + address: connection.address, + id: peer_id, + }, + write_stream: connection.stream, + read_stream: Mutex::new(stream), } } pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { - self.connection.write_message(payload) + // TODO: some tracing here + let message = Message::new(self.magic, self.version, payload).expect("failed to create outgoing message"); + write_message(self.write_stream.clone(), message) } pub fn poll_message(&self) -> Poll)>, io::Error> { - self.message_stream.lock().poll() + self.read_stream.lock().poll() + } + + pub fn shutdown(&self) { + self.write_stream.shutdown(); } pub fn version(&self) -> u32 { - self.connection.version + self.version + } + + pub fn peer_info(&self) -> PeerInfo { + self.peer_info } } diff --git a/p2p/src/net/config.rs b/p2p/src/net/config.rs index 6bef7b66..5f620abb 100644 --- a/p2p/src/net/config.rs +++ b/p2p/src/net/config.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use message::common::{Magic, ServiceFlags, NetAddress}; +use message::common::{Magic, Services, NetAddress}; use message::types::version::{Version, V0, V106, V70001}; use util::time::{Time, RealTime}; use util::nonce::{NonceGenerator, RandomNonce}; @@ -9,7 +9,7 @@ use VERSION; pub struct Config { pub magic: Magic, pub local_address: SocketAddr, - pub services: ServiceFlags, + pub services: Services, pub user_agent: String, pub start_height: i32, pub relay: bool, diff --git a/p2p/src/net/connect.rs b/p2p/src/net/connect.rs index 5c850910..3667fd92 100644 --- a/p2p/src/net/connect.rs +++ b/p2p/src/net/connect.rs @@ -57,6 +57,7 @@ impl Future for Connect { stream: stream.into(), version: result.negotiated_version, magic: self.magic, + services: result.version.services(), address: self.address, }; (ConnectState::Connected, Async::Ready(Ok(connection))) diff --git a/p2p/src/net/connection.rs b/p2p/src/net/connection.rs index ac20b616..603d58d6 100644 --- a/p2p/src/net/connection.rs +++ b/p2p/src/net/connection.rs @@ -1,23 +1,12 @@ use std::net; -use message::{Message, Payload, Magic}; -use io::{write_message, WriteMessage, SharedTcpStream}; +use message::Magic; +use message::common::Services; +use io::SharedTcpStream; pub struct Connection { pub stream: SharedTcpStream, pub version: u32, pub magic: Magic, + pub services: Services, pub address: net::SocketAddr, } - -impl Connection { - pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { - let message = match Message::new(self.magic, self.version, payload) { - Ok(message) => message, - Err(_err) => { - // trace here! outgoing messages should always be written properly - panic!(); - } - }; - write_message(self.stream.clone(), message) - } -} diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 0390844c..8657cb0d 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -2,15 +2,13 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::collections::HashMap; use parking_lot::RwLock; -use futures::{finished, Future}; -use futures_cpupool::CpuPool; -use tokio_core::reactor::Handle; -use message::Payload; use net::{Connection, Channel}; use PeerId; pub struct Connections { + /// Incremental peer counter. peer_counter: AtomicUsize, + /// All open connections. channels: RwLock>>, } @@ -22,28 +20,6 @@ impl Connections { } } - /// Broadcast messages to the network. - /// Returned future completes of first confirmed receive. - pub fn broadcast(connections: &Arc, handle: &Handle, pool: &CpuPool, payload: T) where T: Payload { - let channels = connections.channels(); - for (id, channel) in channels.into_iter() { - let write = channel.write_message(&payload); - let cs = connections.clone(); - let pool_work = pool.spawn(write).then(move |x| { - match x { - Ok(_) => { - // successfully sent message - }, - Err(_) => { - cs.remove(id); - } - } - finished(()) - }); - handle.spawn(pool_work); - } - } - /// Returns safe (nonblocking) copy of channels. pub fn channels(&self) -> HashMap> { self.channels.read().clone() @@ -57,11 +33,13 @@ impl Connections { /// Stores new channel. pub fn store(&self, connection: Connection) { let id = self.peer_counter.fetch_add(1, Ordering::AcqRel); - self.channels.write().insert(id, Arc::new(Channel::new(connection))); + self.channels.write().insert(id, Arc::new(Channel::new(connection, id))); } /// Removes channel with given id. pub fn remove(&self, id: PeerId) { - self.channels.write().remove(&id); + if let Some(channel) = self.channels.write().remove(&id) { + channel.shutdown(); + } } } diff --git a/p2p/src/net/listen.rs b/p2p/src/net/listen.rs index d5a89ef5..5d803f8f 100644 --- a/p2p/src/net/listen.rs +++ b/p2p/src/net/listen.rs @@ -52,6 +52,7 @@ impl Future for AcceptConnection { stream: stream.into(), version: result.negotiated_version, magic: self.magic, + services: result.version.services(), address: self.address, }; Ok(Ok(connection).into()) diff --git a/p2p/src/net/messages.rs b/p2p/src/net/messages.rs index 1c654f27..4360a2e5 100644 --- a/p2p/src/net/messages.rs +++ b/p2p/src/net/messages.rs @@ -5,15 +5,28 @@ use futures::{Poll, Async}; use futures::stream::Stream; use message::common::Command; use net::Connections; -use PeerId; +use PeerInfo; -pub struct MessagesHandler { +pub enum MessagePoll { + Ready { + command: Command, + payload: Bytes, + version: u32, + peer_info: PeerInfo, + errored_peers: Vec, + }, + OnlyErrors { + errored_peers: Vec, + } +} + +pub struct MessagePoller { last_polled: usize, connections: Weak, } fn next_to_poll(channels: usize, last_polled: usize) -> usize { - // it's irrelevant if we sometimes poll the same peer + // it's irrelevant if we sometimes poll the same peer twice in a row if channels > last_polled + 1 { // let's poll the next peer last_polled + 1 @@ -23,17 +36,17 @@ fn next_to_poll(channels: usize, last_polled: usize) -> usize { } } -impl MessagesHandler { +impl MessagePoller { pub fn new(connections: Weak) -> Self { - MessagesHandler { + MessagePoller { last_polled: usize::max_value(), connections: connections, } } } -impl Stream for MessagesHandler { - type Item = (Command, Bytes, u32, PeerId); +impl Stream for MessagePoller { + type Item = MessagePoll; type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -50,14 +63,15 @@ impl Stream for MessagesHandler { let mut to_poll = next_to_poll(channels.len(), self.last_polled); let mut result = None; + let mut errored_peers = Vec::new(); while result.is_none() && to_poll != self.last_polled { - let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()"); + let (_, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()"); let status = channel.poll_message(); match status { - Ok(Async::Ready(Some(Ok((command, message))))) => { - result = Some((command, message, channel.version(), *id)); + Ok(Async::Ready(Some(Ok((command, payload))))) => { + result = Some((command, payload, channel.version(), channel.peer_info())); }, Ok(Async::NotReady) => { // no messages yet, try next channel @@ -65,16 +79,32 @@ impl Stream for MessagesHandler { }, _ => { // channel has been closed or there was error - connections.remove(*id); + errored_peers.push(channel.peer_info()); to_poll = next_to_poll(channels.len(), to_poll); }, } } self.last_polled = to_poll; - match result.is_some() { - true => Ok(Async::Ready(result)), - false => Ok(Async::NotReady), + match result { + Some((command, payload, version, info)) => { + let message_poll = MessagePoll::Ready { + command: command, + payload: payload, + version: version, + peer_info: info, + errored_peers: errored_peers, + }; + + Ok(Async::Ready(Some(message_poll))) + }, + None if errored_peers.is_empty() => Ok(Async::NotReady), + _ => { + let message_poll = MessagePoll::OnlyErrors { + errored_peers: errored_peers, + }; + Ok(Async::Ready(Some(message_poll))) + } } } } diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index beb49bff..6aea5c89 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -5,13 +5,11 @@ mod connection; mod connections; mod messages; mod listen; -mod subscriber; pub use self::channel::Channel; pub use self::config::Config; pub use self::connect::{Connect, connect}; pub use self::connection::Connection; pub use self::connections::Connections; -pub use self::messages::MessagesHandler; +pub use self::messages::{MessagePoller, MessagePoll}; pub use self::listen::{Listen, listen}; -pub use self::subscriber::Subscriber; diff --git a/p2p/src/net/subscriber.rs b/p2p/src/net/subscriber.rs deleted file mode 100644 index 7d234320..00000000 --- a/p2p/src/net/subscriber.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::sync::mpsc::{Sender, Receiver, channel}; -use std::mem; -use parking_lot::Mutex; -use message::{Error, Payload, Command, deserialize_payload}; -use message::types::{Addr, GetAddr}; -use PeerId; - -struct Handler { - sender: Mutex>>, -} - -impl Default for Handler { - fn default() -> Self { - Handler { - sender: Mutex::default(), - } - } -} - -impl Handler where S: Payload { - fn command(&self) -> Command { - S::command().into() - } - - fn handle(&self, payload: &[u8], version: u32, peerid: PeerId) -> Result<(), Error> { - let payload: S = try!(deserialize_payload(payload, version)); - if let Some(sender) = self.sender() { - if let Err(_err) = sender.send((payload, peerid)) { - // TODO: unsubscribe channel? - // TODO: trace - } - } - Ok(()) - } - - fn sender(&self) -> Option> { - self.sender.lock().clone() - } - - fn store(&self, sender: Sender<(S, PeerId)>) { - mem::replace(&mut *self.sender.lock(), Some(sender)); - } -} - -#[derive(Default)] -pub struct Subscriber { - addr: Handler, - getaddr: Handler, -} - -macro_rules! define_subscribe { - ($name: ident, $result: ident, $sub: ident) => { - pub fn $name(&self) -> Receiver<($result, PeerId)> { - let (sender, receiver) = channel(); - self.$sub.store(sender); - receiver - } - } -} - -macro_rules! maybe_handle { - ($command: expr, $sub: expr, $payload: expr, $version: expr, $peerid: expr) => { - if $command == $sub.command() { - return $sub.handle($payload, $version, $peerid); - } - } -} - -impl Subscriber { - define_subscribe!(subscribe_addr, Addr, addr); - define_subscribe!(subscribe_getaddr, GetAddr, getaddr); - - pub fn try_handle(&self, payload: &[u8], version: u32, command: Command, peerid: PeerId) -> Result<(), Error> { - maybe_handle!(command, self.addr, payload, version, peerid); - maybe_handle!(command, self.getaddr, payload, version, peerid); - Err(Error::InvalidCommand) - } -} diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 26966950..877bc806 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -1,12 +1,14 @@ use std::{io, net}; use std::sync::Arc; +use parking_lot::RwLock; use futures::{Future, finished}; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_core::reactor::Handle; use message::Payload; -use net::{connect, listen, Connections, Subscriber, MessagesHandler}; -use Config; +use net::{connect, listen, Connections, MessagePoller, MessagePoll, Channel}; +use util::NodeTable; +use {Config, PeerId}; pub struct P2P { /// Global event loop handle. @@ -17,40 +19,44 @@ pub struct P2P { config: Config, /// Connections. connections: Arc, - /// Message subscriber. - subscriber: Arc, + /// Node Table. + node_table: Arc>, } impl P2P { pub fn new(config: Config, handle: Handle) -> Self { - let pool = CpuPool::new(4); + let pool = CpuPool::new(config.threads); P2P { event_loop_handle: handle.clone(), pool: pool.clone(), config: config, connections: Arc::new(Connections::new()), - subscriber: Arc::new(Subscriber::default()), + node_table: Arc::default(), } } pub fn run(&self) -> Result<(), io::Error> { - for seednode in self.config.seednodes.iter() { + for seednode in self.config.peers.iter() { self.connect(*seednode) } try!(self.listen()); - self.handle_messages(); + self.attach_protocols(); Ok(()) } pub fn connect(&self, ip: net::IpAddr) { let socket = net::SocketAddr::new(ip, self.config.connection.magic.port()); let connections = self.connections.clone(); + let node_table = self.node_table.clone(); let connection = connect(&socket, &self.event_loop_handle, &self.config.connection); - let pool_work = self.pool.spawn(connection).then(move |x| { - if let Ok(Ok(con)) = x { + let pool_work = self.pool.spawn(connection).then(move |result| { + if let Ok(Ok(con)) = result { + node_table.write().insert(con.address, con.services); connections.store(con); + } else { + node_table.write().note_failure(&socket); } finished(()) }); @@ -60,8 +66,10 @@ impl P2P { fn listen(&self) -> Result<(), io::Error> { let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone())); let connections = self.connections.clone(); - let server = listen.for_each(move |x| { - if let Ok(con) = x { + let node_table = self.node_table.clone(); + let server = listen.for_each(move |result| { + if let Ok(con) = result { + node_table.write().insert(con.address, con.services); connections.store(con); } Ok(()) @@ -73,24 +81,70 @@ impl P2P { Ok(()) } - fn handle_messages(&self) { - let incoming = MessagesHandler::new(Arc::downgrade(&self.connections)); - let subscriber = self.subscriber.clone(); + fn attach_protocols(&self) { + // TODO: here all network protocols will be attached + + let poller = MessagePoller::new(Arc::downgrade(&self.connections)); let connections = self.connections.clone(); - let incoming_future = incoming.for_each(move |result| { - let (command, payload, version, peerid) = result; - if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) { - connections.remove(peerid); + let node_table = self.node_table.clone(); + let polling = poller.for_each(move |result| { + match result { + MessagePoll::Ready { errored_peers, .. } => { + // TODO: handle new messasges here! + + let mut node_table = node_table.write(); + for peer in errored_peers.into_iter() { + node_table.note_failure(&peer.address); + connections.remove(peer.id); + } + }, + MessagePoll::OnlyErrors { errored_peers } => { + let mut node_table = node_table.write(); + for peer in errored_peers.into_iter() { + node_table.note_failure(&peer.address); + connections.remove(peer.id); + } + } } Ok(()) }).then(|_| { finished(()) }); - let pool_work = self.pool.spawn(incoming_future); + let pool_work = self.pool.spawn(polling); self.event_loop_handle.spawn(pool_work); } pub fn broadcast(&self, payload: T) where T: Payload { - Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload) + let channels = self.connections.channels(); + for (_id, channel) in channels.into_iter() { + self.send_to_channel(&payload, &channel); + } + } + + pub fn send(&self, payload: T, peer: PeerId) where T: Payload { + let channels = self.connections.channels(); + if let Some(channel) = channels.get(&peer) { + self.send_to_channel(&payload, channel); + } + } + + fn send_to_channel(&self, payload: &T, channel: &Arc) where T: Payload { + let connections = self.connections.clone(); + let node_table = self.node_table.clone(); + let peer_info = channel.peer_info(); + let write = channel.write_message(payload); + let pool_work = self.pool.spawn(write).then(move |result| { + match result { + Ok(_) => { + node_table.write().note_used(&peer_info.address); + }, + Err(_err) => { + node_table.write().note_failure(&peer_info.address); + connections.remove(peer_info.id); + } + } + finished(()) + }); + self.event_loop_handle.spawn(pool_work); } } diff --git a/p2p/src/protocol/mod.rs b/p2p/src/protocol/mod.rs new file mode 100644 index 00000000..b32533ba --- /dev/null +++ b/p2p/src/protocol/mod.rs @@ -0,0 +1 @@ +mod ping; diff --git a/p2p/src/protocol/ping.rs b/p2p/src/protocol/ping.rs new file mode 100644 index 00000000..e69de29b diff --git a/p2p/src/session/manual.rs b/p2p/src/session/manual.rs new file mode 100644 index 00000000..e69de29b diff --git a/p2p/src/session/mod.rs b/p2p/src/session/mod.rs new file mode 100644 index 00000000..284f5412 --- /dev/null +++ b/p2p/src/session/mod.rs @@ -0,0 +1,3 @@ +mod manual; +mod normal; +mod seednode; diff --git a/p2p/src/session/normal.rs b/p2p/src/session/normal.rs new file mode 100644 index 00000000..e69de29b diff --git a/p2p/src/session/seednode.rs b/p2p/src/session/seednode.rs new file mode 100644 index 00000000..e69de29b diff --git a/p2p/src/util/mod.rs b/p2p/src/util/mod.rs index 1154bb2d..5366572a 100644 --- a/p2p/src/util/mod.rs +++ b/p2p/src/util/mod.rs @@ -1,2 +1,7 @@ pub mod nonce; pub mod time; +mod node_table; +mod peer; + +pub use self::node_table::{NodeTable, Node}; +pub use self::peer::{PeerId, PeerInfo}; diff --git a/p2p/src/util/node_table.rs b/p2p/src/util/node_table.rs new file mode 100644 index 00000000..5cf296a2 --- /dev/null +++ b/p2p/src/util/node_table.rs @@ -0,0 +1,161 @@ +use std::collections::{HashMap, BTreeSet}; +use std::net::SocketAddr; +use std::cmp::{PartialOrd, Ord, Ordering}; +use message::common::Services; +use util::time::{Time, RealTime}; + +#[derive(PartialEq, Eq, Clone)] +pub struct Node { + /// Node address. + addr: SocketAddr, + /// Timestamp of last interaction with a node. + time: i64, + /// Services supported by the node. + services: Services, + /// Node failures counter. + failures: u32, +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option { + if self.failures == other.failures { + self.time.partial_cmp(&other.time) + } else { + other.failures.partial_cmp(&self.failures) + } + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> Ordering { + if self.failures == other.failures { + self.time.cmp(&other.time) + } else { + other.failures.cmp(&self.failures) + } + } +} + +#[derive(Default)] +pub struct NodeTable where T: Time { + /// Time source. + time: T, + /// Nodes by socket address. + by_addr: HashMap, + /// Nodes sorted by score. + by_score: BTreeSet, +} + +impl NodeTable where T: Time { + /// Inserts new address and services pair into NodeTable. + pub fn insert(&mut self, addr: SocketAddr, services: Services) { + let failures = self.by_addr.get(&addr).map_or(0, |ref node| node.failures); + + let node = Node { + addr: addr, + time: self.time.get().sec, + services: services, + failures: failures, + }; + + self.by_addr.insert(addr, node.clone()); + self.by_score.insert(node); + } + + /// Returnes most reliable nodes with desired services. + pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec { + self.by_score.iter() + .rev() + .filter(|s| s.services.includes(services)) + .map(Clone::clone) + .take(limit) + .collect() + } + + /// Marks address as recently used. + pub fn note_used(&mut self, addr: &SocketAddr) { + if let Some(ref mut node) = self.by_addr.get_mut(addr) { + assert!(self.by_score.remove(node)); + node.time = self.time.get().sec; + self.by_score.insert(node.clone()); + } + } + + /// Notes failure. + pub fn note_failure(&mut self, addr: &SocketAddr) { + if let Some(ref mut node) = self.by_addr.get_mut(addr) { + assert!(self.by_score.remove(node)); + node.failures += 1; + self.by_score.insert(node.clone()); + } + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use message::common::Services; + use util::time::IncrementalTime; + use super::NodeTable; + + #[test] + fn test_node_table_insert() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap(); + let mut table = NodeTable::::default(); + table.insert(s0, Services::default()); + table.insert(s1, Services::default()); + table.insert(s2, Services::default()); + let nodes = table.nodes_with_services(&Services::default(), 2); + assert_eq!(nodes.len(), 2); + assert_eq!(nodes[0].addr, s2); + assert_eq!(nodes[0].time, 2); + assert_eq!(nodes[0].failures, 0); + assert_eq!(nodes[1].addr, s1); + assert_eq!(nodes[1].time, 1); + assert_eq!(nodes[1].failures, 0); + } + + #[test] + fn test_node_table_note() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap(); + let s3: SocketAddr = "127.0.0.1:8003".parse().unwrap(); + let s4: SocketAddr = "127.0.0.1:8004".parse().unwrap(); + let mut table = NodeTable::::default(); + table.insert(s0, Services::default()); + table.insert(s1, Services::default()); + table.insert(s2, Services::default()); + table.insert(s3, Services::default()); + table.insert(s4, Services::default()); + table.note_used(&s2); + table.note_used(&s4); + table.note_used(&s1); + table.note_failure(&s2); + table.note_failure(&s3); + let nodes = table.nodes_with_services(&Services::default(), 10); + assert_eq!(nodes.len(), 5); + + assert_eq!(nodes[0].addr, s1); + assert_eq!(nodes[0].time, 7); + assert_eq!(nodes[0].failures, 0); + + assert_eq!(nodes[1].addr, s4); + assert_eq!(nodes[1].time, 6); + assert_eq!(nodes[1].failures, 0); + + assert_eq!(nodes[2].addr, s0); + assert_eq!(nodes[2].time, 0); + assert_eq!(nodes[2].failures, 0); + + assert_eq!(nodes[3].addr, s2); + assert_eq!(nodes[3].time, 5); + assert_eq!(nodes[3].failures, 1); + + assert_eq!(nodes[4].addr, s3); + assert_eq!(nodes[4].time, 3); + assert_eq!(nodes[4].failures, 1); + } +} diff --git a/p2p/src/util/peer.rs b/p2p/src/util/peer.rs new file mode 100644 index 00000000..1d0dbc6e --- /dev/null +++ b/p2p/src/util/peer.rs @@ -0,0 +1,10 @@ +use std::net::SocketAddr; + +pub type PeerId = usize; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct PeerInfo { + pub id: PeerId, + pub address: SocketAddr, +} + diff --git a/p2p/src/util/time.rs b/p2p/src/util/time.rs index cca37f45..58b6c8af 100644 --- a/p2p/src/util/time.rs +++ b/p2p/src/util/time.rs @@ -1,3 +1,4 @@ +use std::cell::Cell; use time; pub trait Time { @@ -26,3 +27,17 @@ impl Time for StaticTime { self.0 } } + +#[derive(Default)] +pub struct IncrementalTime { + counter: Cell, +} + +impl Time for IncrementalTime { + fn get(&self) -> time::Timespec { + let c = self.counter.get(); + let result = time::Timespec::new(c, 0); + self.counter.set(c + 1); + result + } +} diff --git a/pbtc/main.rs b/pbtc/main.rs index c060e6a9..64cfbb94 100644 --- a/pbtc/main.rs +++ b/pbtc/main.rs @@ -28,6 +28,11 @@ fn run() -> Result<(), String> { let mut el = event_loop(); let p2p_cfg = p2p::Config { + threads: 4, + protocol_minimum: 70001, + protocol_maximum: 70017, + inbound_connections: 10, + outbound_connections: 10, connection: net::Config { magic: cfg.magic, local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port), @@ -36,8 +41,8 @@ fn run() -> Result<(), String> { start_height: 0, relay: false, }, - seednodes: cfg.seednode.map_or_else(|| vec![], |x| vec![x]), - limited_connect: cfg.connect.map_or(None, |x| Some(vec![x])), + peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]), + seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]), }; let p2p = P2P::new(p2p_cfg, el.handle()); diff --git a/tools/graph.dot b/tools/graph.dot index b748cf12..67c27066 100644 --- a/tools/graph.dot +++ b/tools/graph.dot @@ -107,7 +107,6 @@ digraph dependencies { N6 -> N4[label="",style=dashed]; N6 -> N13[label="",style=dashed]; N6 -> N14[label="",style=dashed]; - N6 -> N22[label="",style=dashed]; N6 -> N33[label="",style=dashed]; N6 -> N36[label="",style=dashed]; N6 -> N39[label="",style=dashed]; diff --git a/tools/graph.png b/tools/graph.png index aa0d3927..0339e1e6 100644 Binary files a/tools/graph.png and b/tools/graph.png differ