p2p in progress

This commit is contained in:
debris 2016-10-18 12:14:54 +02:00
parent f61f6de84c
commit 4fb1d04955
13 changed files with 299 additions and 139 deletions

View File

@ -3,7 +3,7 @@ use ser::{
Deserializable, Reader, Error as ReaderError
};
#[derive(Debug, Default, PartialEq, Clone, Copy)]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct Services(u64);
impl From<Services> for u64 {
@ -18,7 +18,6 @@ impl From<u64> for Services {
}
}
impl Services {
pub fn network(&self) -> bool {
self.bit_at(0)
@ -63,7 +62,11 @@ impl Services {
pub fn with_xthin(mut self, v: bool) -> Self {
self.set_bit(4, v);
self
}
}
/// Returns true when this service set is a superset of `other`,
/// i.e. every service bit set in `other` is also set in `self`.
pub fn includes(&self, other: &Self) -> bool {
	self.0 & other.0 == other.0
}
fn set_bit(&mut self, bit: usize, bit_value: bool) {
if bit_value {
@ -89,3 +92,24 @@ impl Deserializable for Services {
reader.read().map(Services)
}
}
#[cfg(test)]
mod test {
	use super::Services;

	// Fixed typo in test name: `serivces` -> `services`.
	#[test]
	fn test_services_includes() {
		// s1 advertises witness + xthin, s2 only witness,
		// so s1 is a superset of s2 but not the other way around.
		let s1 = Services::default()
			.with_witness(true)
			.with_xthin(true);
		let s2 = Services::default()
			.with_witness(true);
		assert!(s1.witness());
		assert!(s1.xthin());
		assert!(s2.witness());
		assert!(!s2.xthin());
		assert!(s1.includes(&s2));
		assert!(!s2.includes(&s1));
	}
}

View File

@ -39,4 +39,8 @@ impl Channel {
/// Protocol version negotiated during the handshake with this peer.
pub fn version(&self) -> u32 {
	self.version
}
/// Socket address of the peer on the other end of this channel.
pub fn address(&self) -> net::SocketAddr {
	self.address
}
}

View File

@ -57,6 +57,7 @@ impl Future for Connect {
stream: stream.into(),
version: result.negotiated_version,
magic: self.magic,
services: result.version.services(),
address: self.address,
};
(ConnectState::Connected, Async::Ready(Ok(connection)))

View File

@ -1,10 +1,12 @@
use std::net;
use message::Magic;
use message::common::Services;
use io::SharedTcpStream;
/// A fully established (handshaked) connection to a peer.
pub struct Connection {
	/// Shared handle to the underlying TCP stream.
	pub stream: SharedTcpStream,
	/// Protocol version negotiated during the handshake.
	pub version: u32,
	/// Network magic this connection uses.
	pub magic: Magic,
	/// Services advertised by the remote peer in its version message.
	pub services: Services,
	/// Remote peer socket address.
	pub address: net::SocketAddr,
}

View File

@ -2,15 +2,13 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use parking_lot::RwLock;
use futures::{finished, Future};
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::Payload;
use net::{Connection, Channel};
use PeerId;
/// Registry of all open peer channels, keyed by peer id.
pub struct Connections {
	/// Incremental peer counter.
	peer_counter: AtomicUsize,
	/// All open connections.
	channels: RwLock<HashMap<PeerId, Arc<Channel>>>,
}
@ -22,28 +20,6 @@ impl Connections {
}
}
/// Broadcasts `payload` to every connected peer.
///
/// Each write is spawned onto the CPU pool and driven by `handle`;
/// a peer whose write fails is removed from `connections`.
/// (Note: this function itself returns `()` — the individual writes
/// complete asynchronously.)
pub fn broadcast<T>(connections: &Arc<Connections>, handle: &Handle, pool: &CpuPool, payload: T) where T: Payload {
	// Work on a snapshot of the channel map so the lock is not held
	// while writes are in flight.
	let channels = connections.channels();
	for (id, channel) in channels.into_iter() {
		let write = channel.write_message(&payload);
		let cs = connections.clone();
		let pool_work = pool.spawn(write).then(move |x| {
			match x {
				Ok(_) => {
					// successfully sent message
				},
				Err(_) => {
					// write failed — drop the broken connection
					cs.remove(id);
				}
			}
			finished(())
		});
		handle.spawn(pool_work);
	}
}
/// Returns safe (nonblocking) copy of channels.
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
self.channels.read().clone()

View File

@ -52,6 +52,7 @@ impl Future for AcceptConnection {
stream: stream.into(),
version: result.negotiated_version,
magic: self.magic,
services: result.version.services(),
address: self.address,
};
Ok(Ok(connection).into())

View File

@ -7,13 +7,26 @@ use message::common::Command;
use net::Connections;
use PeerId;
pub struct MessagesHandler {
/// Result of polling the peer channels for incoming messages.
pub enum MessagePoll {
	/// A message has been received from one of the peers.
	Ready {
		/// Command of the received message.
		command: Command,
		/// Raw message payload.
		payload: Bytes,
		/// Protocol version of the peer the message came from.
		version: u32,
		/// Id of the peer the message came from.
		peer_id: PeerId,
		/// Peers whose channels errored while polling.
		errored_peers: Vec<PeerId>,
	},
	/// No message was received, but some channels errored.
	OnlyErrors {
		/// Peers whose channels errored while polling.
		errored_peers: Vec<PeerId>,
	}
}
/// Round-robin poller over all open peer channels.
pub struct MessagePoller {
	/// Index of the channel that was polled last; the next poll resumes
	/// from the following channel.
	last_polled: usize,
	/// Weak handle to the connection registry.
	connections: Weak<Connections>,
}
fn next_to_poll(channels: usize, last_polled: usize) -> usize {
// it's irrelevant if we sometimes poll the same peer
// it's irrelevant if we sometimes poll the same peer twice in a row
if channels > last_polled + 1 {
// let's poll the next peer
last_polled + 1
@ -23,17 +36,17 @@ fn next_to_poll(channels: usize, last_polled: usize) -> usize {
}
}
impl MessagesHandler {
impl MessagePoller {
pub fn new(connections: Weak<Connections>) -> Self {
MessagesHandler {
MessagePoller {
last_polled: usize::max_value(),
connections: connections,
}
}
}
impl Stream for MessagesHandler {
type Item = (Command, Bytes, u32, PeerId);
impl Stream for MessagePoller {
type Item = MessagePoll;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -50,14 +63,15 @@ impl Stream for MessagesHandler {
let mut to_poll = next_to_poll(channels.len(), self.last_polled);
let mut result = None;
let mut errored_peers = Vec::new();
while result.is_none() && to_poll != self.last_polled {
let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
let status = channel.poll_message();
match status {
Ok(Async::Ready(Some(Ok((command, message))))) => {
result = Some((command, message, channel.version(), *id));
Ok(Async::Ready(Some(Ok((command, payload))))) => {
result = Some((command, payload, channel.version(), *id));
},
Ok(Async::NotReady) => {
// no messages yet, try next channel
@ -65,16 +79,32 @@ impl Stream for MessagesHandler {
},
_ => {
// channel has been closed or there was error
connections.remove(*id);
errored_peers.push(*id);
to_poll = next_to_poll(channels.len(), to_poll);
},
}
}
self.last_polled = to_poll;
match result.is_some() {
true => Ok(Async::Ready(result)),
false => Ok(Async::NotReady),
match result {
Some((command, payload, version, id)) => {
let message_poll = MessagePoll::Ready {
command: command,
payload: payload,
version: version,
peer_id: id,
errored_peers: errored_peers,
};
Ok(Async::Ready(Some(message_poll)))
},
None if errored_peers.is_empty() => Ok(Async::NotReady),
_ => {
let message_poll = MessagePoll::OnlyErrors {
errored_peers: errored_peers,
};
Ok(Async::Ready(Some(message_poll)))
}
}
}
}

View File

@ -5,13 +5,11 @@ mod connection;
mod connections;
mod messages;
mod listen;
mod subscriber;
pub use self::channel::Channel;
pub use self::config::Config;
pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::connections::Connections;
pub use self::messages::MessagesHandler;
pub use self::messages::MessagePoller;
pub use self::listen::{Listen, listen};
pub use self::subscriber::Subscriber;

View File

@ -1,78 +0,0 @@
use std::sync::mpsc::{Sender, Receiver, channel};
use std::mem;
use parking_lot::Mutex;
use message::{Error, Payload, Command, deserialize_payload};
use message::types::{Addr, GetAddr};
use PeerId;
/// Per-message-type subscription slot: holds the sender half of the
/// channel a subscriber listens on, if anyone has subscribed.
struct Handler<S> {
	sender: Mutex<Option<Sender<(S, PeerId)>>>,
}
impl<S> Default for Handler<S> {
	/// A fresh handler starts with no subscriber attached.
	fn default() -> Self {
		Handler { sender: Mutex::new(None) }
	}
}
impl<S> Handler<S> where S: Payload {
	/// Wire command this handler is responsible for.
	fn command(&self) -> Command {
		S::command().into()
	}

	/// Deserializes `payload` and forwards it to the subscriber, if any.
	/// Deserialization errors are propagated; send errors are ignored.
	fn handle(&self, payload: &[u8], version: u32, peerid: PeerId) -> Result<(), Error> {
		let payload: S = try!(deserialize_payload(payload, version));
		if let Some(sender) = self.sender() {
			if let Err(_err) = sender.send((payload, peerid)) {
				// TODO: unsubscribe channel?
				// TODO: trace
			}
		}
		Ok(())
	}

	/// Clones the current sender out of the lock, so `handle` does not
	/// hold the lock while sending.
	fn sender(&self) -> Option<Sender<(S, PeerId)>> {
		self.sender.lock().clone()
	}

	/// Replaces the stored sender, dropping any previous subscriber.
	fn store(&self, sender: Sender<(S, PeerId)>) {
		mem::replace(&mut *self.sender.lock(), Some(sender));
	}
}
/// Dispatches incoming p2p messages to per-message-type subscription
/// channels (one handler field per supported message type).
#[derive(Default)]
pub struct Subscriber {
	addr: Handler<Addr>,
	getaddr: Handler<GetAddr>,
}
/// Generates a `subscribe_*` method that creates a fresh mpsc channel,
/// stores its sender in the matching handler and hands back the receiver.
macro_rules! define_subscribe {
	($name: ident, $result: ident, $sub: ident) => {
		pub fn $name(&self) -> Receiver<($result, PeerId)> {
			let (sender, receiver) = channel();
			self.$sub.store(sender);
			receiver
		}
	}
}
/// Forwards the payload to `$sub` — and returns early from the enclosing
/// function — when the command matches the handler's command.
macro_rules! maybe_handle {
	($command: expr, $sub: expr, $payload: expr, $version: expr, $peerid: expr) => {
		if $command == $sub.command() {
			return $sub.handle($payload, $version, $peerid);
		}
	}
}
impl Subscriber {
	define_subscribe!(subscribe_addr, Addr, addr);
	define_subscribe!(subscribe_getaddr, GetAddr, getaddr);

	/// Routes a raw message to the handler registered for `command`.
	/// Returns `Error::InvalidCommand` when no handler matches.
	pub fn try_handle(&self, payload: &[u8], version: u32, command: Command, peerid: PeerId) -> Result<(), Error> {
		maybe_handle!(command, self.addr, payload, version, peerid);
		maybe_handle!(command, self.getaddr, payload, version, peerid);
		Err(Error::InvalidCommand)
	}
}

View File

@ -1,11 +1,13 @@
use std::{io, net};
use std::sync::Arc;
use parking_lot::RwLock;
use futures::{Future, finished};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::Payload;
use net::{connect, listen, Connections, Subscriber, MessagesHandler};
use net::{connect, listen, Connections, MessagePoller};
use util::NodeTable;
use Config;
pub struct P2P {
@ -17,8 +19,8 @@ pub struct P2P {
config: Config,
/// Connections.
connections: Arc<Connections>,
/// Message subscriber.
subscriber: Arc<Subscriber>,
/// Node Table.
node_table: Arc<RwLock<NodeTable>>,
}
impl P2P {
@ -30,7 +32,7 @@ impl P2P {
pool: pool.clone(),
config: config,
connections: Arc::new(Connections::new()),
subscriber: Arc::new(Subscriber::default()),
node_table: Arc::default(),
}
}
@ -40,17 +42,21 @@ impl P2P {
}
try!(self.listen());
self.handle_messages();
self.attach_protocols();
Ok(())
}
pub fn connect(&self, ip: net::IpAddr) {
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
let connections = self.connections.clone();
let node_table = self.node_table.clone();
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
let pool_work = self.pool.spawn(connection).then(move |x| {
if let Ok(Ok(con)) = x {
let pool_work = self.pool.spawn(connection).then(move |result| {
if let Ok(Ok(con)) = result {
node_table.write().insert(con.address, con.services);
connections.store(con);
} else {
node_table.write().note_failure(&socket);
}
finished(())
});
@ -60,8 +66,10 @@ impl P2P {
fn listen(&self) -> Result<(), io::Error> {
let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone()));
let connections = self.connections.clone();
let node_table = self.node_table.clone();
let server = listen.for_each(move |x| {
if let Ok(con) = x {
node_table.write().insert(con.address, con.services);
connections.store(con);
}
Ok(())
@ -73,24 +81,40 @@ impl P2P {
Ok(())
}
fn handle_messages(&self) {
let incoming = MessagesHandler::new(Arc::downgrade(&self.connections));
let subscriber = self.subscriber.clone();
fn attach_protocols(&self) {
let poller = MessagePoller::new(Arc::downgrade(&self.connections));
let connections = self.connections.clone();
let incoming_future = incoming.for_each(move |result| {
let (command, payload, version, peerid) = result;
if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) {
connections.remove(peerid);
}
let polling = poller.for_each(move |result| {
// TODO: handle incomming message
Ok(())
}).then(|_| {
finished(())
});
let pool_work = self.pool.spawn(incoming_future);
let pool_work = self.pool.spawn(polling);
self.event_loop_handle.spawn(pool_work);
}
pub fn broadcast<T>(&self, payload: T) where T: Payload {
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
let channels = self.connections.channels();
for (id, channel) in channels.into_iter() {
let connections = self.connections.clone();
let node_table = self.node_table.clone();
let address = channel.address();
let write = channel.write_message(&payload);
let pool_work = self.pool.spawn(write).then(move |result| {
match result {
Ok(_) => {
node_table.write().note_used(&address);
},
Err(_err) => {
node_table.write().note_failure(&address);
connections.remove(id);
}
}
// remove broken connections
finished(())
});
self.event_loop_handle.spawn(pool_work);
}
}
}

View File

@ -1,3 +1,5 @@
pub mod nonce;
pub mod time;
mod node_table;
pub use self::node_table::{NodeTable, Node};

161
p2p/src/util/node_table.rs Normal file
View File

@ -0,0 +1,161 @@
use std::collections::{HashMap, BTreeSet};
use std::net::SocketAddr;
use std::cmp::{PartialOrd, Ord, Ordering};
use message::common::Services;
use util::time::{Time, RealTime};
/// A known network node together with its reliability bookkeeping.
#[derive(PartialEq, Eq, Clone)]
pub struct Node {
	/// Node address.
	addr: SocketAddr,
	/// Timestamp of last interaction with a node.
	time: i64,
	/// Services supported by the node.
	services: Services,
	/// Node failures counter.
	failures: u32,
}
impl PartialOrd for Node {
	/// Delegates to `Ord::cmp` so the partial and total orderings can
	/// never diverge. The previous hand-written duplicate of the
	/// comparison logic was a consistency hazard: any future edit to
	/// `cmp` would silently desynchronize the two implementations.
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}
impl Ord for Node {
	/// Nodes with fewer failures rank higher; ties are broken by the
	/// most recent interaction time (larger `time` compares greater).
	// NOTE(review): two nodes sharing `failures` and `time` but differing
	// in `addr` compare as `Equal` here while `PartialEq` says they differ.
	// A `BTreeSet<Node>` would then treat them as duplicates and drop one —
	// confirm the time source can never hand out the same timestamp twice.
	fn cmp(&self, other: &Self) -> Ordering {
		if self.failures == other.failures {
			self.time.cmp(&other.time)
		} else {
			other.failures.cmp(&self.failures)
		}
	}
}
/// Table of known nodes, ranked by how reliable they have proven to be.
/// Both indexes must always describe the same set of nodes.
#[derive(Default)]
pub struct NodeTable<T = RealTime> where T: Time {
	/// Time source.
	time: T,
	/// Nodes by socket address.
	by_addr: HashMap<SocketAddr, Node>,
	/// Nodes sorted by score.
	by_score: BTreeSet<Node>,
}
impl<T> NodeTable<T> where T: Time {
/// Inserts new address and services pair into NodeTable.
/// Re-inserting a known address refreshes its timestamp and services
/// but keeps the accumulated failure count.
pub fn insert(&mut self, addr: SocketAddr, services: Services) {
	let failures = match self.by_addr.get(&addr) {
		Some(old) => {
			// Evict the stale score entry first. Without this the old
			// `Node` (with the old timestamp) stayed behind in
			// `by_score`, so a re-inserted address was ranked twice.
			self.by_score.remove(old);
			old.failures
		},
		None => 0,
	};
	let node = Node {
		addr: addr,
		time: self.time.get().sec,
		services: services,
		failures: failures,
	};
	// Keep both indexes in sync: address lookup and score ordering.
	self.by_addr.insert(addr, node.clone());
	self.by_score.insert(node);
}
/// Returns most reliable nodes whose advertised services include
/// `services`, best first, at most `limit` of them.
pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
	// `by_score` iterates ascending, so reverse to get the best first.
	self.by_score.iter()
		.rev()
		.filter(|s| s.services.includes(services))
		.map(Clone::clone)
		.take(limit)
		.collect()
}
/// Marks address as recently used, refreshing its timestamp.
/// Unknown addresses are ignored.
pub fn note_used(&mut self, addr: &SocketAddr) {
	if let Some(ref mut node) = self.by_addr.get_mut(addr) {
		// The score set orders by `time`, so the entry must be removed
		// before mutation and re-inserted under its new key.
		assert!(self.by_score.remove(node));
		node.time = self.time.get().sec;
		self.by_score.insert(node.clone());
	}
}
/// Notes failure: bumps the failure counter, lowering the node's score.
/// Unknown addresses are ignored.
pub fn note_failure(&mut self, addr: &SocketAddr) {
	if let Some(ref mut node) = self.by_addr.get_mut(addr) {
		// Remove before mutating: `failures` participates in the ordering.
		assert!(self.by_score.remove(node));
		node.failures += 1;
		self.by_score.insert(node.clone());
	}
}
}
#[cfg(test)]
mod tests {
	use std::net::SocketAddr;
	use message::common::Services;
	use util::time::IncrementalTime;
	use super::NodeTable;

	// IncrementalTime hands out 0, 1, 2, … per call, so insertion order
	// fully determines the timestamps asserted below.
	#[test]
	fn test_node_table_insert() {
		let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
		let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
		let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap();
		let mut table = NodeTable::<IncrementalTime>::default();
		table.insert(s0, Services::default());
		table.insert(s1, Services::default());
		table.insert(s2, Services::default());
		// Most recently inserted nodes rank highest (larger time).
		let nodes = table.nodes_with_services(&Services::default(), 2);
		assert_eq!(nodes.len(), 2);
		assert_eq!(nodes[0].addr, s2);
		assert_eq!(nodes[0].time, 2);
		assert_eq!(nodes[0].failures, 0);
		assert_eq!(nodes[1].addr, s1);
		assert_eq!(nodes[1].time, 1);
		assert_eq!(nodes[1].failures, 0);
	}

	#[test]
	fn test_node_table_note() {
		let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
		let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
		let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap();
		let s3: SocketAddr = "127.0.0.1:8003".parse().unwrap();
		let s4: SocketAddr = "127.0.0.1:8004".parse().unwrap();
		let mut table = NodeTable::<IncrementalTime>::default();
		// Inserts consume times 0..4; note_used calls consume 5..7.
		table.insert(s0, Services::default());
		table.insert(s1, Services::default());
		table.insert(s2, Services::default());
		table.insert(s3, Services::default());
		table.insert(s4, Services::default());
		table.note_used(&s2);
		table.note_used(&s4);
		table.note_used(&s1);
		table.note_failure(&s2);
		table.note_failure(&s3);
		// Expected order: fewest failures first, then most recently used.
		let nodes = table.nodes_with_services(&Services::default(), 10);
		assert_eq!(nodes.len(), 5);
		assert_eq!(nodes[0].addr, s1);
		assert_eq!(nodes[0].time, 7);
		assert_eq!(nodes[0].failures, 0);
		assert_eq!(nodes[1].addr, s4);
		assert_eq!(nodes[1].time, 6);
		assert_eq!(nodes[1].failures, 0);
		assert_eq!(nodes[2].addr, s0);
		assert_eq!(nodes[2].time, 0);
		assert_eq!(nodes[2].failures, 0);
		assert_eq!(nodes[3].addr, s2);
		assert_eq!(nodes[3].time, 5);
		assert_eq!(nodes[3].failures, 1);
		assert_eq!(nodes[4].addr, s3);
		assert_eq!(nodes[4].time, 3);
		assert_eq!(nodes[4].failures, 1);
	}
}

View File

@ -1,3 +1,4 @@
use std::cell::Cell;
use time;
pub trait Time {
@ -26,3 +27,17 @@ impl Time for StaticTime {
self.0
}
}
/// Deterministic `Time` source for tests: each call to `get` returns the
/// next integer second (0, 1, 2, …).
#[derive(Default)]
pub struct IncrementalTime {
	// Cell provides interior mutability so `get(&self)` can advance the counter.
	counter: Cell<i64>,
}
impl Time for IncrementalTime {
	/// Returns the current counter value as a timespec with zero
	/// nanoseconds, then advances the counter by one second.
	fn get(&self) -> time::Timespec {
		let current = self.counter.get();
		self.counter.set(current + 1);
		time::Timespec::new(current, 0)
	}
}