commit
89e08c42c6
|
@ -320,7 +320,6 @@ dependencies = [
|
|||
"parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"primitives 0.1.0",
|
||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serialization 0.1.0",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
|
|
@ -3,11 +3,11 @@ use ser::{
|
|||
Stream, Serializable,
|
||||
Reader, Deserializable, Error as ReaderError, deserialize,
|
||||
};
|
||||
use common::{Port, IpAddress, ServiceFlags};
|
||||
use common::{Port, IpAddress, Services};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct NetAddress {
|
||||
pub services: ServiceFlags,
|
||||
pub services: Services,
|
||||
pub address: IpAddress,
|
||||
pub port: Port,
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ impl From<&'static str> for NetAddress {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ser::{serialize, deserialize};
|
||||
use common::ServiceFlags;
|
||||
use common::Services;
|
||||
use super::NetAddress;
|
||||
|
||||
#[test]
|
||||
|
@ -54,7 +54,7 @@ mod tests {
|
|||
].into();
|
||||
|
||||
let address = NetAddress {
|
||||
services: ServiceFlags::default().with_network(true),
|
||||
services: Services::default().with_network(true),
|
||||
address: "::ffff:a00:1".into(),
|
||||
port: 8333.into(),
|
||||
};
|
||||
|
@ -71,7 +71,7 @@ mod tests {
|
|||
];
|
||||
|
||||
let expected = NetAddress {
|
||||
services: ServiceFlags::default().with_network(true),
|
||||
services: Services::default().with_network(true),
|
||||
address: "::ffff:a00:1".into(),
|
||||
port: 8333.into(),
|
||||
};
|
||||
|
@ -82,7 +82,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_net_address_from_static_str() {
|
||||
let expected = NetAddress {
|
||||
services: ServiceFlags::default().with_network(true),
|
||||
services: Services::default().with_network(true),
|
||||
address: "::ffff:a00:1".into(),
|
||||
port: 8333.into(),
|
||||
|
||||
|
|
|
@ -20,4 +20,4 @@ pub use self::ip::IpAddress;
|
|||
pub use self::magic::Magic;
|
||||
pub use self::port::Port;
|
||||
pub use self::prefilled_transaction::PrefilledTransaction;
|
||||
pub use self::service::ServiceFlags;
|
||||
pub use self::service::Services;
|
||||
|
|
|
@ -3,23 +3,22 @@ use ser::{
|
|||
Deserializable, Reader, Error as ReaderError
|
||||
};
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Clone, Copy)]
|
||||
pub struct ServiceFlags(u64);
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct Services(u64);
|
||||
|
||||
impl From<ServiceFlags> for u64 {
|
||||
fn from(s: ServiceFlags) -> Self {
|
||||
impl From<Services> for u64 {
|
||||
fn from(s: Services) -> Self {
|
||||
s.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u64> for ServiceFlags {
|
||||
impl From<u64> for Services {
|
||||
fn from(v: u64) -> Self {
|
||||
ServiceFlags(v)
|
||||
Services(v)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl ServiceFlags {
|
||||
impl Services {
|
||||
pub fn network(&self) -> bool {
|
||||
self.bit_at(0)
|
||||
}
|
||||
|
@ -63,7 +62,11 @@ impl ServiceFlags {
|
|||
pub fn with_xthin(mut self, v: bool) -> Self {
|
||||
self.set_bit(4, v);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn includes(&self, other: &Self) -> bool {
|
||||
self.0 & other.0 == other.0
|
||||
}
|
||||
|
||||
fn set_bit(&mut self, bit: usize, bit_value: bool) {
|
||||
if bit_value {
|
||||
|
@ -78,14 +81,35 @@ impl ServiceFlags {
|
|||
}
|
||||
}
|
||||
|
||||
impl Serializable for ServiceFlags {
|
||||
impl Serializable for Services {
|
||||
fn serialize(&self, stream: &mut Stream) {
|
||||
stream.append(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
impl Deserializable for ServiceFlags {
|
||||
impl Deserializable for Services {
|
||||
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
|
||||
reader.read().map(ServiceFlags)
|
||||
reader.read().map(Services)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::Services;
|
||||
|
||||
#[test]
|
||||
fn test_serivces_includes() {
|
||||
let s1 = Services::default()
|
||||
.with_witness(true)
|
||||
.with_xthin(true);
|
||||
let s2 = Services::default()
|
||||
.with_witness(true);
|
||||
|
||||
assert!(s1.witness());
|
||||
assert!(s1.xthin());
|
||||
assert!(s2.witness());
|
||||
assert!(!s2.xthin());
|
||||
assert!(s1.includes(&s2));
|
||||
assert!(!s2.includes(&s1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,10 +29,6 @@ impl PayloadStream {
|
|||
t.serialize_payload(&mut self.stream, self.version)
|
||||
}
|
||||
|
||||
pub fn raw_stream(&mut self) -> &mut Stream {
|
||||
&mut self.stream
|
||||
}
|
||||
|
||||
pub fn out(self) -> Bytes {
|
||||
self.stream.out()
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use ser::{
|
|||
Serializable, Stream,
|
||||
Deserializable, Reader, Error as ReaderError,
|
||||
};
|
||||
use common::{NetAddress, ServiceFlags};
|
||||
use common::{NetAddress, Services};
|
||||
use {Payload, MessageResult};
|
||||
use serialization::deserialize_payload;
|
||||
|
||||
|
@ -69,12 +69,20 @@ impl Version {
|
|||
Version::V70001(ref s, _, _) => s.version,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn services(&self) -> Services {
|
||||
match *self {
|
||||
Version::V0(ref s) => s.services,
|
||||
Version::V106(ref s, _) => s.services,
|
||||
Version::V70001(ref s, _, _) => s.services,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct V0 {
|
||||
pub version: u32,
|
||||
pub services: ServiceFlags,
|
||||
pub services: Services,
|
||||
pub timestamp: i64,
|
||||
pub receiver: NetAddress,
|
||||
}
|
||||
|
|
|
@ -13,5 +13,4 @@ rand = "0.3"
|
|||
|
||||
primitives = { path = "../primitives" }
|
||||
bitcrypto = { path = "../crypto" }
|
||||
serialization = { path = "../serialization" }
|
||||
message = { path = "../message" }
|
||||
|
|
|
@ -3,10 +3,20 @@ use net::Config as NetConfig;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
/// Number of threads used by p2p thread pool.
|
||||
pub threads: usize,
|
||||
/// Lowest supported protocol version.
|
||||
pub protocol_minimum: u32,
|
||||
/// Highest supported protocol version.
|
||||
pub protocol_maximum: u32,
|
||||
/// Number of inbound connections.
|
||||
pub inbound_connections: usize,
|
||||
/// Number of outbound connections.
|
||||
pub outbound_connections: usize,
|
||||
/// Configuration for every connection.
|
||||
pub connection: NetConfig,
|
||||
/// Connect to these nodes to retrieve peer addresses, and disconnect.
|
||||
pub seednodes: Vec<IpAddr>,
|
||||
/// Connect only ot these nodes.
|
||||
pub limited_connect: Option<Vec<IpAddr>>,
|
||||
pub peers: Vec<IpAddr>,
|
||||
/// Connect to these nodes to retrieve peer addresses, and disconnect.
|
||||
pub seeds: Vec<IpAddr>,
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
use std::net::Shutdown;
|
||||
use std::io::{Read, Write, Error};
|
||||
use tokio_core::net::TcpStream;
|
||||
|
||||
|
@ -12,6 +13,11 @@ impl SharedTcpStream {
|
|||
io: a,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
// error is irrelevant here, the connection is dropped anyway
|
||||
let _ = self.io.shutdown(Shutdown::Both);
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TcpStream> for SharedTcpStream {
|
||||
|
|
|
@ -9,10 +9,11 @@ extern crate parking_lot;
|
|||
extern crate bitcrypto as crypto;
|
||||
extern crate message;
|
||||
extern crate primitives;
|
||||
extern crate serialization as ser;
|
||||
|
||||
pub mod io;
|
||||
pub mod net;
|
||||
pub mod protocol;
|
||||
pub mod session;
|
||||
pub mod util;
|
||||
mod config;
|
||||
mod event_loop;
|
||||
|
@ -26,6 +27,5 @@ pub use primitives::{hash, bytes};
|
|||
pub use config::Config;
|
||||
pub use event_loop::{event_loop, forever};
|
||||
pub use p2p::P2P;
|
||||
|
||||
pub type PeerId = usize;
|
||||
pub use util::{PeerId, PeerInfo};
|
||||
|
||||
|
|
|
@ -3,33 +3,53 @@ use futures::Poll;
|
|||
use futures::stream::Stream;
|
||||
use parking_lot::Mutex;
|
||||
use bytes::Bytes;
|
||||
use message::{MessageResult, Payload, Command};
|
||||
use message::{MessageResult, Payload, Command, Magic, Message};
|
||||
use net::Connection;
|
||||
use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage};
|
||||
use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage, write_message};
|
||||
use {PeerId, PeerInfo};
|
||||
|
||||
pub struct Channel {
|
||||
connection: Connection,
|
||||
message_stream: Mutex<ReadMessageStream<SharedTcpStream>>,
|
||||
version: u32,
|
||||
magic: Magic,
|
||||
peer_info: PeerInfo,
|
||||
write_stream: SharedTcpStream,
|
||||
read_stream: Mutex<ReadMessageStream<SharedTcpStream>>,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
pub fn new(connection: Connection) -> Self {
|
||||
pub fn new(connection: Connection, peer_id: PeerId) -> Self {
|
||||
let stream = read_message_stream(connection.stream.clone(), connection.magic);
|
||||
Channel {
|
||||
connection: connection,
|
||||
message_stream: Mutex::new(stream),
|
||||
version: connection.version,
|
||||
magic: connection.magic,
|
||||
peer_info: PeerInfo {
|
||||
address: connection.address,
|
||||
id: peer_id,
|
||||
},
|
||||
write_stream: connection.stream,
|
||||
read_stream: Mutex::new(stream),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
|
||||
self.connection.write_message(payload)
|
||||
// TODO: some tracing here
|
||||
let message = Message::new(self.magic, self.version, payload).expect("failed to create outgoing message");
|
||||
write_message(self.write_stream.clone(), message)
|
||||
}
|
||||
|
||||
pub fn poll_message(&self) -> Poll<Option<(MessageResult<(Command, Bytes)>)>, io::Error> {
|
||||
self.message_stream.lock().poll()
|
||||
self.read_stream.lock().poll()
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
self.write_stream.shutdown();
|
||||
}
|
||||
|
||||
pub fn version(&self) -> u32 {
|
||||
self.connection.version
|
||||
self.version
|
||||
}
|
||||
|
||||
pub fn peer_info(&self) -> PeerInfo {
|
||||
self.peer_info
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::net::SocketAddr;
|
||||
use message::common::{Magic, ServiceFlags, NetAddress};
|
||||
use message::common::{Magic, Services, NetAddress};
|
||||
use message::types::version::{Version, V0, V106, V70001};
|
||||
use util::time::{Time, RealTime};
|
||||
use util::nonce::{NonceGenerator, RandomNonce};
|
||||
|
@ -9,7 +9,7 @@ use VERSION;
|
|||
pub struct Config {
|
||||
pub magic: Magic,
|
||||
pub local_address: SocketAddr,
|
||||
pub services: ServiceFlags,
|
||||
pub services: Services,
|
||||
pub user_agent: String,
|
||||
pub start_height: i32,
|
||||
pub relay: bool,
|
||||
|
|
|
@ -57,6 +57,7 @@ impl Future for Connect {
|
|||
stream: stream.into(),
|
||||
version: result.negotiated_version,
|
||||
magic: self.magic,
|
||||
services: result.version.services(),
|
||||
address: self.address,
|
||||
};
|
||||
(ConnectState::Connected, Async::Ready(Ok(connection)))
|
||||
|
|
|
@ -1,23 +1,12 @@
|
|||
use std::net;
|
||||
use message::{Message, Payload, Magic};
|
||||
use io::{write_message, WriteMessage, SharedTcpStream};
|
||||
use message::Magic;
|
||||
use message::common::Services;
|
||||
use io::SharedTcpStream;
|
||||
|
||||
pub struct Connection {
|
||||
pub stream: SharedTcpStream,
|
||||
pub version: u32,
|
||||
pub magic: Magic,
|
||||
pub services: Services,
|
||||
pub address: net::SocketAddr,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
|
||||
let message = match Message::new(self.magic, self.version, payload) {
|
||||
Ok(message) => message,
|
||||
Err(_err) => {
|
||||
// trace here! outgoing messages should always be written properly
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
write_message(self.stream.clone(), message)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,15 +2,13 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::RwLock;
|
||||
use futures::{finished, Future};
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::reactor::Handle;
|
||||
use message::Payload;
|
||||
use net::{Connection, Channel};
|
||||
use PeerId;
|
||||
|
||||
pub struct Connections {
|
||||
/// Incremental peer counter.
|
||||
peer_counter: AtomicUsize,
|
||||
/// All open connections.
|
||||
channels: RwLock<HashMap<PeerId, Arc<Channel>>>,
|
||||
}
|
||||
|
||||
|
@ -22,28 +20,6 @@ impl Connections {
|
|||
}
|
||||
}
|
||||
|
||||
/// Broadcast messages to the network.
|
||||
/// Returned future completes of first confirmed receive.
|
||||
pub fn broadcast<T>(connections: &Arc<Connections>, handle: &Handle, pool: &CpuPool, payload: T) where T: Payload {
|
||||
let channels = connections.channels();
|
||||
for (id, channel) in channels.into_iter() {
|
||||
let write = channel.write_message(&payload);
|
||||
let cs = connections.clone();
|
||||
let pool_work = pool.spawn(write).then(move |x| {
|
||||
match x {
|
||||
Ok(_) => {
|
||||
// successfully sent message
|
||||
},
|
||||
Err(_) => {
|
||||
cs.remove(id);
|
||||
}
|
||||
}
|
||||
finished(())
|
||||
});
|
||||
handle.spawn(pool_work);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns safe (nonblocking) copy of channels.
|
||||
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
|
||||
self.channels.read().clone()
|
||||
|
@ -57,11 +33,13 @@ impl Connections {
|
|||
/// Stores new channel.
|
||||
pub fn store(&self, connection: Connection) {
|
||||
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
|
||||
self.channels.write().insert(id, Arc::new(Channel::new(connection)));
|
||||
self.channels.write().insert(id, Arc::new(Channel::new(connection, id)));
|
||||
}
|
||||
|
||||
/// Removes channel with given id.
|
||||
pub fn remove(&self, id: PeerId) {
|
||||
self.channels.write().remove(&id);
|
||||
if let Some(channel) = self.channels.write().remove(&id) {
|
||||
channel.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ impl Future for AcceptConnection {
|
|||
stream: stream.into(),
|
||||
version: result.negotiated_version,
|
||||
magic: self.magic,
|
||||
services: result.version.services(),
|
||||
address: self.address,
|
||||
};
|
||||
Ok(Ok(connection).into())
|
||||
|
|
|
@ -5,15 +5,28 @@ use futures::{Poll, Async};
|
|||
use futures::stream::Stream;
|
||||
use message::common::Command;
|
||||
use net::Connections;
|
||||
use PeerId;
|
||||
use PeerInfo;
|
||||
|
||||
pub struct MessagesHandler {
|
||||
pub enum MessagePoll {
|
||||
Ready {
|
||||
command: Command,
|
||||
payload: Bytes,
|
||||
version: u32,
|
||||
peer_info: PeerInfo,
|
||||
errored_peers: Vec<PeerInfo>,
|
||||
},
|
||||
OnlyErrors {
|
||||
errored_peers: Vec<PeerInfo>,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MessagePoller {
|
||||
last_polled: usize,
|
||||
connections: Weak<Connections>,
|
||||
}
|
||||
|
||||
fn next_to_poll(channels: usize, last_polled: usize) -> usize {
|
||||
// it's irrelevant if we sometimes poll the same peer
|
||||
// it's irrelevant if we sometimes poll the same peer twice in a row
|
||||
if channels > last_polled + 1 {
|
||||
// let's poll the next peer
|
||||
last_polled + 1
|
||||
|
@ -23,17 +36,17 @@ fn next_to_poll(channels: usize, last_polled: usize) -> usize {
|
|||
}
|
||||
}
|
||||
|
||||
impl MessagesHandler {
|
||||
impl MessagePoller {
|
||||
pub fn new(connections: Weak<Connections>) -> Self {
|
||||
MessagesHandler {
|
||||
MessagePoller {
|
||||
last_polled: usize::max_value(),
|
||||
connections: connections,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MessagesHandler {
|
||||
type Item = (Command, Bytes, u32, PeerId);
|
||||
impl Stream for MessagePoller {
|
||||
type Item = MessagePoll;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -50,14 +63,15 @@ impl Stream for MessagesHandler {
|
|||
|
||||
let mut to_poll = next_to_poll(channels.len(), self.last_polled);
|
||||
let mut result = None;
|
||||
let mut errored_peers = Vec::new();
|
||||
|
||||
while result.is_none() && to_poll != self.last_polled {
|
||||
let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
|
||||
let (_, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
|
||||
let status = channel.poll_message();
|
||||
|
||||
match status {
|
||||
Ok(Async::Ready(Some(Ok((command, message))))) => {
|
||||
result = Some((command, message, channel.version(), *id));
|
||||
Ok(Async::Ready(Some(Ok((command, payload))))) => {
|
||||
result = Some((command, payload, channel.version(), channel.peer_info()));
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
// no messages yet, try next channel
|
||||
|
@ -65,16 +79,32 @@ impl Stream for MessagesHandler {
|
|||
},
|
||||
_ => {
|
||||
// channel has been closed or there was error
|
||||
connections.remove(*id);
|
||||
errored_peers.push(channel.peer_info());
|
||||
to_poll = next_to_poll(channels.len(), to_poll);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
self.last_polled = to_poll;
|
||||
match result.is_some() {
|
||||
true => Ok(Async::Ready(result)),
|
||||
false => Ok(Async::NotReady),
|
||||
match result {
|
||||
Some((command, payload, version, info)) => {
|
||||
let message_poll = MessagePoll::Ready {
|
||||
command: command,
|
||||
payload: payload,
|
||||
version: version,
|
||||
peer_info: info,
|
||||
errored_peers: errored_peers,
|
||||
};
|
||||
|
||||
Ok(Async::Ready(Some(message_poll)))
|
||||
},
|
||||
None if errored_peers.is_empty() => Ok(Async::NotReady),
|
||||
_ => {
|
||||
let message_poll = MessagePoll::OnlyErrors {
|
||||
errored_peers: errored_peers,
|
||||
};
|
||||
Ok(Async::Ready(Some(message_poll)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,13 +5,11 @@ mod connection;
|
|||
mod connections;
|
||||
mod messages;
|
||||
mod listen;
|
||||
mod subscriber;
|
||||
|
||||
pub use self::channel::Channel;
|
||||
pub use self::config::Config;
|
||||
pub use self::connect::{Connect, connect};
|
||||
pub use self::connection::Connection;
|
||||
pub use self::connections::Connections;
|
||||
pub use self::messages::MessagesHandler;
|
||||
pub use self::messages::{MessagePoller, MessagePoll};
|
||||
pub use self::listen::{Listen, listen};
|
||||
pub use self::subscriber::Subscriber;
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
use std::sync::mpsc::{Sender, Receiver, channel};
|
||||
use std::mem;
|
||||
use parking_lot::Mutex;
|
||||
use message::{Error, Payload, Command, deserialize_payload};
|
||||
use message::types::{Addr, GetAddr};
|
||||
use PeerId;
|
||||
|
||||
struct Handler<S> {
|
||||
sender: Mutex<Option<Sender<(S, PeerId)>>>,
|
||||
}
|
||||
|
||||
impl<S> Default for Handler<S> {
|
||||
fn default() -> Self {
|
||||
Handler {
|
||||
sender: Mutex::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Handler<S> where S: Payload {
|
||||
fn command(&self) -> Command {
|
||||
S::command().into()
|
||||
}
|
||||
|
||||
fn handle(&self, payload: &[u8], version: u32, peerid: PeerId) -> Result<(), Error> {
|
||||
let payload: S = try!(deserialize_payload(payload, version));
|
||||
if let Some(sender) = self.sender() {
|
||||
if let Err(_err) = sender.send((payload, peerid)) {
|
||||
// TODO: unsubscribe channel?
|
||||
// TODO: trace
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sender(&self) -> Option<Sender<(S, PeerId)>> {
|
||||
self.sender.lock().clone()
|
||||
}
|
||||
|
||||
fn store(&self, sender: Sender<(S, PeerId)>) {
|
||||
mem::replace(&mut *self.sender.lock(), Some(sender));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Subscriber {
|
||||
addr: Handler<Addr>,
|
||||
getaddr: Handler<GetAddr>,
|
||||
}
|
||||
|
||||
macro_rules! define_subscribe {
|
||||
($name: ident, $result: ident, $sub: ident) => {
|
||||
pub fn $name(&self) -> Receiver<($result, PeerId)> {
|
||||
let (sender, receiver) = channel();
|
||||
self.$sub.store(sender);
|
||||
receiver
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! maybe_handle {
|
||||
($command: expr, $sub: expr, $payload: expr, $version: expr, $peerid: expr) => {
|
||||
if $command == $sub.command() {
|
||||
return $sub.handle($payload, $version, $peerid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
define_subscribe!(subscribe_addr, Addr, addr);
|
||||
define_subscribe!(subscribe_getaddr, GetAddr, getaddr);
|
||||
|
||||
pub fn try_handle(&self, payload: &[u8], version: u32, command: Command, peerid: PeerId) -> Result<(), Error> {
|
||||
maybe_handle!(command, self.addr, payload, version, peerid);
|
||||
maybe_handle!(command, self.getaddr, payload, version, peerid);
|
||||
Err(Error::InvalidCommand)
|
||||
}
|
||||
}
|
|
@ -1,12 +1,14 @@
|
|||
use std::{io, net};
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use futures::{Future, finished};
|
||||
use futures::stream::Stream;
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::reactor::Handle;
|
||||
use message::Payload;
|
||||
use net::{connect, listen, Connections, Subscriber, MessagesHandler};
|
||||
use Config;
|
||||
use net::{connect, listen, Connections, MessagePoller, MessagePoll, Channel};
|
||||
use util::NodeTable;
|
||||
use {Config, PeerId};
|
||||
|
||||
pub struct P2P {
|
||||
/// Global event loop handle.
|
||||
|
@ -17,40 +19,44 @@ pub struct P2P {
|
|||
config: Config,
|
||||
/// Connections.
|
||||
connections: Arc<Connections>,
|
||||
/// Message subscriber.
|
||||
subscriber: Arc<Subscriber>,
|
||||
/// Node Table.
|
||||
node_table: Arc<RwLock<NodeTable>>,
|
||||
}
|
||||
|
||||
impl P2P {
|
||||
pub fn new(config: Config, handle: Handle) -> Self {
|
||||
let pool = CpuPool::new(4);
|
||||
let pool = CpuPool::new(config.threads);
|
||||
|
||||
P2P {
|
||||
event_loop_handle: handle.clone(),
|
||||
pool: pool.clone(),
|
||||
config: config,
|
||||
connections: Arc::new(Connections::new()),
|
||||
subscriber: Arc::new(Subscriber::default()),
|
||||
node_table: Arc::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&self) -> Result<(), io::Error> {
|
||||
for seednode in self.config.seednodes.iter() {
|
||||
for seednode in self.config.peers.iter() {
|
||||
self.connect(*seednode)
|
||||
}
|
||||
|
||||
try!(self.listen());
|
||||
self.handle_messages();
|
||||
self.attach_protocols();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn connect(&self, ip: net::IpAddr) {
|
||||
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
|
||||
let connections = self.connections.clone();
|
||||
let node_table = self.node_table.clone();
|
||||
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
|
||||
let pool_work = self.pool.spawn(connection).then(move |x| {
|
||||
if let Ok(Ok(con)) = x {
|
||||
let pool_work = self.pool.spawn(connection).then(move |result| {
|
||||
if let Ok(Ok(con)) = result {
|
||||
node_table.write().insert(con.address, con.services);
|
||||
connections.store(con);
|
||||
} else {
|
||||
node_table.write().note_failure(&socket);
|
||||
}
|
||||
finished(())
|
||||
});
|
||||
|
@ -60,8 +66,10 @@ impl P2P {
|
|||
fn listen(&self) -> Result<(), io::Error> {
|
||||
let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone()));
|
||||
let connections = self.connections.clone();
|
||||
let server = listen.for_each(move |x| {
|
||||
if let Ok(con) = x {
|
||||
let node_table = self.node_table.clone();
|
||||
let server = listen.for_each(move |result| {
|
||||
if let Ok(con) = result {
|
||||
node_table.write().insert(con.address, con.services);
|
||||
connections.store(con);
|
||||
}
|
||||
Ok(())
|
||||
|
@ -73,24 +81,70 @@ impl P2P {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_messages(&self) {
|
||||
let incoming = MessagesHandler::new(Arc::downgrade(&self.connections));
|
||||
let subscriber = self.subscriber.clone();
|
||||
fn attach_protocols(&self) {
|
||||
// TODO: here all network protocols will be attached
|
||||
|
||||
let poller = MessagePoller::new(Arc::downgrade(&self.connections));
|
||||
let connections = self.connections.clone();
|
||||
let incoming_future = incoming.for_each(move |result| {
|
||||
let (command, payload, version, peerid) = result;
|
||||
if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) {
|
||||
connections.remove(peerid);
|
||||
let node_table = self.node_table.clone();
|
||||
let polling = poller.for_each(move |result| {
|
||||
match result {
|
||||
MessagePoll::Ready { errored_peers, .. } => {
|
||||
// TODO: handle new messasges here!
|
||||
|
||||
let mut node_table = node_table.write();
|
||||
for peer in errored_peers.into_iter() {
|
||||
node_table.note_failure(&peer.address);
|
||||
connections.remove(peer.id);
|
||||
}
|
||||
},
|
||||
MessagePoll::OnlyErrors { errored_peers } => {
|
||||
let mut node_table = node_table.write();
|
||||
for peer in errored_peers.into_iter() {
|
||||
node_table.note_failure(&peer.address);
|
||||
connections.remove(peer.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}).then(|_| {
|
||||
finished(())
|
||||
});
|
||||
let pool_work = self.pool.spawn(incoming_future);
|
||||
let pool_work = self.pool.spawn(polling);
|
||||
self.event_loop_handle.spawn(pool_work);
|
||||
}
|
||||
|
||||
pub fn broadcast<T>(&self, payload: T) where T: Payload {
|
||||
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
|
||||
let channels = self.connections.channels();
|
||||
for (_id, channel) in channels.into_iter() {
|
||||
self.send_to_channel(&payload, &channel);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send<T>(&self, payload: T, peer: PeerId) where T: Payload {
|
||||
let channels = self.connections.channels();
|
||||
if let Some(channel) = channels.get(&peer) {
|
||||
self.send_to_channel(&payload, channel);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_to_channel<T>(&self, payload: &T, channel: &Arc<Channel>) where T: Payload {
|
||||
let connections = self.connections.clone();
|
||||
let node_table = self.node_table.clone();
|
||||
let peer_info = channel.peer_info();
|
||||
let write = channel.write_message(payload);
|
||||
let pool_work = self.pool.spawn(write).then(move |result| {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
node_table.write().note_used(&peer_info.address);
|
||||
},
|
||||
Err(_err) => {
|
||||
node_table.write().note_failure(&peer_info.address);
|
||||
connections.remove(peer_info.id);
|
||||
}
|
||||
}
|
||||
finished(())
|
||||
});
|
||||
self.event_loop_handle.spawn(pool_work);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
mod ping;
|
|
@ -0,0 +1,3 @@
|
|||
mod manual;
|
||||
mod normal;
|
||||
mod seednode;
|
|
@ -1,2 +1,7 @@
|
|||
pub mod nonce;
|
||||
pub mod time;
|
||||
mod node_table;
|
||||
mod peer;
|
||||
|
||||
pub use self::node_table::{NodeTable, Node};
|
||||
pub use self::peer::{PeerId, PeerInfo};
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
use std::collections::{HashMap, BTreeSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::cmp::{PartialOrd, Ord, Ordering};
|
||||
use message::common::Services;
|
||||
use util::time::{Time, RealTime};
|
||||
|
||||
#[derive(PartialEq, Eq, Clone)]
|
||||
pub struct Node {
|
||||
/// Node address.
|
||||
addr: SocketAddr,
|
||||
/// Timestamp of last interaction with a node.
|
||||
time: i64,
|
||||
/// Services supported by the node.
|
||||
services: Services,
|
||||
/// Node failures counter.
|
||||
failures: u32,
|
||||
}
|
||||
|
||||
impl PartialOrd for Node {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
if self.failures == other.failures {
|
||||
self.time.partial_cmp(&other.time)
|
||||
} else {
|
||||
other.failures.partial_cmp(&self.failures)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Node {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
if self.failures == other.failures {
|
||||
self.time.cmp(&other.time)
|
||||
} else {
|
||||
other.failures.cmp(&self.failures)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NodeTable<T = RealTime> where T: Time {
|
||||
/// Time source.
|
||||
time: T,
|
||||
/// Nodes by socket address.
|
||||
by_addr: HashMap<SocketAddr, Node>,
|
||||
/// Nodes sorted by score.
|
||||
by_score: BTreeSet<Node>,
|
||||
}
|
||||
|
||||
impl<T> NodeTable<T> where T: Time {
|
||||
/// Inserts new address and services pair into NodeTable.
|
||||
pub fn insert(&mut self, addr: SocketAddr, services: Services) {
|
||||
let failures = self.by_addr.get(&addr).map_or(0, |ref node| node.failures);
|
||||
|
||||
let node = Node {
|
||||
addr: addr,
|
||||
time: self.time.get().sec,
|
||||
services: services,
|
||||
failures: failures,
|
||||
};
|
||||
|
||||
self.by_addr.insert(addr, node.clone());
|
||||
self.by_score.insert(node);
|
||||
}
|
||||
|
||||
/// Returnes most reliable nodes with desired services.
|
||||
pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
|
||||
self.by_score.iter()
|
||||
.rev()
|
||||
.filter(|s| s.services.includes(services))
|
||||
.map(Clone::clone)
|
||||
.take(limit)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Marks address as recently used.
|
||||
pub fn note_used(&mut self, addr: &SocketAddr) {
|
||||
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
|
||||
assert!(self.by_score.remove(node));
|
||||
node.time = self.time.get().sec;
|
||||
self.by_score.insert(node.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Notes failure.
|
||||
pub fn note_failure(&mut self, addr: &SocketAddr) {
|
||||
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
|
||||
assert!(self.by_score.remove(node));
|
||||
node.failures += 1;
|
||||
self.by_score.insert(node.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::net::SocketAddr;
|
||||
use message::common::Services;
|
||||
use util::time::IncrementalTime;
|
||||
use super::NodeTable;
|
||||
|
||||
#[test]
|
||||
fn test_node_table_insert() {
|
||||
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
|
||||
let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
|
||||
let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap();
|
||||
let mut table = NodeTable::<IncrementalTime>::default();
|
||||
table.insert(s0, Services::default());
|
||||
table.insert(s1, Services::default());
|
||||
table.insert(s2, Services::default());
|
||||
let nodes = table.nodes_with_services(&Services::default(), 2);
|
||||
assert_eq!(nodes.len(), 2);
|
||||
assert_eq!(nodes[0].addr, s2);
|
||||
assert_eq!(nodes[0].time, 2);
|
||||
assert_eq!(nodes[0].failures, 0);
|
||||
assert_eq!(nodes[1].addr, s1);
|
||||
assert_eq!(nodes[1].time, 1);
|
||||
assert_eq!(nodes[1].failures, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_node_table_note() {
|
||||
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
|
||||
let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
|
||||
let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap();
|
||||
let s3: SocketAddr = "127.0.0.1:8003".parse().unwrap();
|
||||
let s4: SocketAddr = "127.0.0.1:8004".parse().unwrap();
|
||||
let mut table = NodeTable::<IncrementalTime>::default();
|
||||
table.insert(s0, Services::default());
|
||||
table.insert(s1, Services::default());
|
||||
table.insert(s2, Services::default());
|
||||
table.insert(s3, Services::default());
|
||||
table.insert(s4, Services::default());
|
||||
table.note_used(&s2);
|
||||
table.note_used(&s4);
|
||||
table.note_used(&s1);
|
||||
table.note_failure(&s2);
|
||||
table.note_failure(&s3);
|
||||
let nodes = table.nodes_with_services(&Services::default(), 10);
|
||||
assert_eq!(nodes.len(), 5);
|
||||
|
||||
assert_eq!(nodes[0].addr, s1);
|
||||
assert_eq!(nodes[0].time, 7);
|
||||
assert_eq!(nodes[0].failures, 0);
|
||||
|
||||
assert_eq!(nodes[1].addr, s4);
|
||||
assert_eq!(nodes[1].time, 6);
|
||||
assert_eq!(nodes[1].failures, 0);
|
||||
|
||||
assert_eq!(nodes[2].addr, s0);
|
||||
assert_eq!(nodes[2].time, 0);
|
||||
assert_eq!(nodes[2].failures, 0);
|
||||
|
||||
assert_eq!(nodes[3].addr, s2);
|
||||
assert_eq!(nodes[3].time, 5);
|
||||
assert_eq!(nodes[3].failures, 1);
|
||||
|
||||
assert_eq!(nodes[4].addr, s3);
|
||||
assert_eq!(nodes[4].time, 3);
|
||||
assert_eq!(nodes[4].failures, 1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
pub type PeerId = usize;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PeerInfo {
|
||||
pub id: PeerId,
|
||||
pub address: SocketAddr,
|
||||
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
use std::cell::Cell;
|
||||
use time;
|
||||
|
||||
pub trait Time {
|
||||
|
@ -26,3 +27,17 @@ impl Time for StaticTime {
|
|||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct IncrementalTime {
|
||||
counter: Cell<i64>,
|
||||
}
|
||||
|
||||
impl Time for IncrementalTime {
|
||||
fn get(&self) -> time::Timespec {
|
||||
let c = self.counter.get();
|
||||
let result = time::Timespec::new(c, 0);
|
||||
self.counter.set(c + 1);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,11 @@ fn run() -> Result<(), String> {
|
|||
let mut el = event_loop();
|
||||
|
||||
let p2p_cfg = p2p::Config {
|
||||
threads: 4,
|
||||
protocol_minimum: 70001,
|
||||
protocol_maximum: 70017,
|
||||
inbound_connections: 10,
|
||||
outbound_connections: 10,
|
||||
connection: net::Config {
|
||||
magic: cfg.magic,
|
||||
local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port),
|
||||
|
@ -36,8 +41,8 @@ fn run() -> Result<(), String> {
|
|||
start_height: 0,
|
||||
relay: false,
|
||||
},
|
||||
seednodes: cfg.seednode.map_or_else(|| vec![], |x| vec![x]),
|
||||
limited_connect: cfg.connect.map_or(None, |x| Some(vec![x])),
|
||||
peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]),
|
||||
seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]),
|
||||
};
|
||||
|
||||
let p2p = P2P::new(p2p_cfg, el.handle());
|
||||
|
|
|
@ -107,7 +107,6 @@ digraph dependencies {
|
|||
N6 -> N4[label="",style=dashed];
|
||||
N6 -> N13[label="",style=dashed];
|
||||
N6 -> N14[label="",style=dashed];
|
||||
N6 -> N22[label="",style=dashed];
|
||||
N6 -> N33[label="",style=dashed];
|
||||
N6 -> N36[label="",style=dashed];
|
||||
N6 -> N39[label="",style=dashed];
|
||||
|
|
BIN
tools/graph.png
BIN
tools/graph.png
Binary file not shown.
Before Width: | Height: | Size: 455 KiB After Width: | Height: | Size: 438 KiB |
Loading…
Reference in New Issue