Merge branch 'master' into verification

This commit is contained in:
NikVolf 2016-10-24 17:47:35 +03:00
commit ffd299f011
34 changed files with 1331 additions and 278 deletions

14
Cargo.lock generated
View File

@ -10,6 +10,7 @@ dependencies = [
"miner 0.1.0",
"p2p 0.1.0",
"script 0.1.0",
"sync 0.1.0",
"verification 0.1.0",
]
@ -358,6 +359,7 @@ name = "p2p"
version = "0.1.0"
dependencies = [
"bitcrypto 0.1.0",
"chain 0.1.0",
"futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -520,6 +522,18 @@ name = "strsim"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "sync"
version = "0.1.0"
dependencies = [
"chain 0.1.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0",
"p2p 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
]
[[package]]
name = "term_size"
version = "0.2.1"

View File

@ -15,6 +15,7 @@ p2p = { path = "p2p" }
script = { path = "script" }
db = { path = "db" }
verification = { path = "verification" }
sync = { path = "sync" }
[[bin]]
path = "pbtc/main.rs"

View File

@ -1,17 +1,16 @@
use hex::FromHex;
use crypto::dhash256;
use hash::H256;
use ser::{
Deserializable, Reader, Error as ReaderError, deserialize,
Serializable, Stream, serialize
Serializable, Stream
};
use merkle_root::merkle_root;
use {BlockHeader, Transaction};
#[derive(Debug, PartialEq, Clone)]
pub struct Block {
block_header: BlockHeader,
transactions: Vec<Transaction>,
pub block_header: BlockHeader,
pub transactions: Vec<Transaction>,
}
impl Serializable for Block {
@ -45,7 +44,7 @@ impl Block {
}
pub fn hash(&self) -> H256 {
dhash256(&serialize(&self.block_header))
self.block_header.hash()
}
/// Returns block's merkle root.

View File

@ -1,11 +1,12 @@
use std::fmt;
use ser::{
Deserializable, Reader, Error as ReaderError,
Serializable, Stream
Serializable, Stream, serialize
};
use crypto::dhash256;
use hash::H256;
#[derive(Clone, PartialEq)]
#[derive(PartialEq, Clone)]
pub struct BlockHeader {
pub version: u32,
pub previous_header_hash: H256,
@ -15,6 +16,12 @@ pub struct BlockHeader {
pub nonce: u32,
}
impl BlockHeader {
pub fn hash(&self) -> H256 {
dhash256(&serialize(self))
}
}
impl fmt::Debug for BlockHeader {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockHeader")

View File

@ -28,10 +28,13 @@ impl From<&'static str> for Command {
}
impl Command {
fn as_string(&self) -> String {
pub fn len(&self) -> usize {
let trailing_zeros = self.0.iter().rev().take_while(|&x| x == &0).count();
let limit = self.0.len() - trailing_zeros;
String::from_utf8_lossy(&self.0[..limit]).to_ascii_lowercase()
self.0.len() - trailing_zeros
}
fn as_string(&self) -> String {
String::from_utf8_lossy(&self.0[..self.len()]).to_ascii_lowercase()
}
}
@ -59,6 +62,13 @@ impl Deserializable for Command {
}
}
impl<'a> PartialEq<&'a str> for Command {
fn eq(&self, other: &&'a str) -> bool {
self.len() == other.len() &&
&self.0[..other.len()] == other.as_ref() as &[u8]
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
@ -67,7 +77,8 @@ mod tests {
#[test]
fn test_command_parse() {
assert_eq!(Command("76657273696f6e0000000000".into()), "version".into());
let command: Command = "version".into();
assert_eq!(Command("76657273696f6e0000000000".into()), command);
}
#[test]
@ -90,7 +101,15 @@ mod tests {
let raw: Bytes = "76657273696f6e0000000000".into();
let expected: Command = "version".into();
assert_eq!(expected, deserialize(&raw).unwrap());
assert_eq!(expected, deserialize::<Command>(&raw).unwrap());
}
#[test]
fn partial_eq_command_str() {
let command: Command = "version".into();
assert_eq!(command, "version");
assert!(command != "ver");
assert!(command != "versionx");
}
}

View File

@ -30,7 +30,7 @@ impl From<InventoryType> for u32 {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct InventoryVector {
pub inv_type: u32,
pub hash: H256,

View File

@ -11,6 +11,12 @@ impl From<net::IpAddr> for IpAddress {
}
}
impl From<IpAddress> for net::IpAddr {
fn from(ip: IpAddress) -> Self {
ip.0
}
}
impl From<&'static str> for IpAddress {
fn from(s: &'static str) -> Self {
s.parse().unwrap()

View File

@ -11,6 +11,14 @@ pub enum Addr {
V31402(V31402),
}
impl Addr {
pub fn new(addresses: Vec<AddressEntry>) -> Self {
Addr::V31402(V31402 {
addresses: addresses,
})
}
}
impl Payload for Addr {
fn version() -> u32 {
0

View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use chain::Block as ChainBlock;
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Block {
pub block: ChainBlock,
}
impl Payload for Block {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"block"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let tx = Block {
block: try!(reader.read()),
};
Ok(tx)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.block);
Ok(())
}
}

View File

@ -1,4 +1,5 @@
pub mod addr;
mod block;
mod blocktxn;
mod compactblock;
mod feefilter;
@ -20,10 +21,12 @@ mod pong;
pub mod reject;
mod sendcompact;
mod sendheaders;
mod tx;
mod verack;
pub mod version;
pub use self::addr::Addr;
pub use self::block::Block;
pub use self::blocktxn::BlockTxn;
pub use self::compactblock::CompactBlock;
pub use self::feefilter::FeeFilter;
@ -45,5 +48,6 @@ pub use self::pong::Pong;
pub use self::reject::Reject;
pub use self::sendcompact::SendCompact;
pub use self::sendheaders::SendHeaders;
pub use self::tx::Tx;
pub use self::verack::Verack;
pub use self::version::Version;

31
message/src/types/tx.rs Normal file
View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use chain::Transaction;
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Tx {
pub transaction: Transaction,
}
impl Payload for Tx {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"tx"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let tx = Tx {
transaction: try!(reader.read()),
};
Ok(tx)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.transaction);
Ok(())
}
}

View File

@ -6,6 +6,6 @@ extern crate test_data;
pub mod memory_pool;
pub use primitives::{hash};
pub use primitives::hash;
pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};
pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};

View File

@ -15,3 +15,4 @@ log = "0.3"
primitives = { path = "../primitives" }
bitcrypto = { path = "../crypto" }
message = { path = "../message" }
chain = { path = "../chain" }

View File

@ -43,7 +43,7 @@ impl<M, A> Future for ReadMessage<M, A> where A: io::Read, M: Payload {
Ok(header) => header,
Err(err) => return Ok((read, Err(err)).into()),
};
if header.command != M::command().into() {
if header.command != M::command() {
return Ok((read, Err(Error::InvalidCommand)).into());
}
let future = read_payload(

View File

@ -8,15 +8,16 @@ extern crate parking_lot;
#[macro_use]
extern crate log;
extern crate chain;
extern crate bitcrypto as crypto;
extern crate message;
extern crate primitives;
pub mod io;
pub mod net;
pub mod protocol;
pub mod session;
pub mod util;
mod io;
mod net;
mod protocol;
mod session;
mod util;
mod config;
mod event_loop;
mod p2p;
@ -27,7 +28,8 @@ pub const USER_AGENT: &'static str = "pbtc";
pub use primitives::{hash, bytes};
pub use config::Config;
pub use event_loop::{event_loop, forever};
pub use net::Config as NetConfig;
pub use p2p::P2P;
pub use event_loop::{event_loop, forever};
pub use util::{PeerId, PeerInfo};
pub use protocol::{InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};

View File

@ -1,6 +1,4 @@
use tokio_core::io::{write_all, WriteAll};
use bytes::Bytes;
use message::{Payload, Magic, Message, to_raw_message, Command};
use message::{Payload, Magic, Message};
use net::Connection;
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
@ -34,11 +32,6 @@ impl Channel {
write_message(self.stream.clone(), message)
}
pub fn write_raw_message(&self, command: Command, payload: &Bytes) -> WriteAll<SharedTcpStream, Bytes> {
let message = to_raw_message(self.magic, command, payload);
write_all(self.stream.clone(), message)
}
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {
read_any_message(self.stream.clone(), self.magic)
}

View File

@ -1,9 +1,11 @@
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use parking_lot::RwLock;
use net::{Connection, Channel};
use session::Session;
use p2p::Context;
use session::{SessionFactory};
use PeerId;
#[derive(Default)]
@ -15,24 +17,26 @@ pub struct Connections {
}
impl Connections {
pub fn new() -> Self {
Connections::default()
/// Returns channel with given peer id.
pub fn channel(&self, id: PeerId) -> Option<Arc<Channel>> {
self.channels.read().get(&id).cloned()
}
/// Returns safe (nonblocking) copy of channels.
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
pub fn _channels(&self) -> HashMap<PeerId, Arc<Channel>> {
self.channels.read().clone()
}
/// Returns number of connections.
pub fn count(&self) -> usize {
pub fn _count(&self) -> usize {
self.channels.read().len()
}
/// Stores new channel.
/// Returnes a shared pointer to it.
pub fn store(&self, connection: Connection, session: Session) -> Arc<Channel> {
pub fn store<T>(&self, context: Arc<Context>, connection: Connection) -> Arc<Channel> where T: SessionFactory {
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
let session = T::new_session(context, id);
let channel = Arc::new(Channel::new(connection, id, session));
self.channels.write().insert(id, channel.clone());
channel
@ -42,4 +46,12 @@ impl Connections {
pub fn remove(&self, id: PeerId) -> Option<Arc<Channel>> {
self.channels.write().remove(&id)
}
/// Drop all channels.
pub fn remove_all(&self) -> Vec<Arc<Channel>> {
mem::replace(&mut *self.channels.write(), HashMap::new())
.into_iter()
.map(|(_, value)| value)
.collect()
}
}

View File

@ -5,30 +5,60 @@ use futures::{Future, finished, failed, BoxFuture};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::io::IoFuture;
use tokio_core::reactor::Handle;
use bytes::Bytes;
use message::{Payload, Command};
use session::Session;
use tokio_core::reactor::{Handle, Remote};
use message::{Payload, MessageResult};
use protocol::Direction;
use io::{ReadAnyMessage, SharedTcpStream};
use net::{connect, listen, Connections, Channel, Config as NetConfig};
use util::NodeTable;
use {Config, PeerInfo};
use util::{NodeTable, Node};
use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory};
use {Config, PeerInfo, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
pub type BoxedMessageFuture = BoxFuture<<ReadAnyMessage<SharedTcpStream> as Future>::Item, <ReadAnyMessage<SharedTcpStream> as Future>::Error>;
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
/// Network context.
#[derive(Default)]
pub struct Context {
/// Connections.
connections: Connections,
/// Node Table.
node_table: RwLock<NodeTable>,
/// Thread pool handle.
pool: CpuPool,
/// Remote event loop handle.
remote: Remote,
/// Local synchronization node.
local_sync_node: LocalSyncNodeRef,
}
impl Context {
pub fn connect(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture {
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote) -> Self {
Context {
connections: Default::default(),
node_table: Default::default(),
pool: pool_handle,
remote: remote,
local_sync_node: local_sync_node,
}
}
pub fn spawn<F>(&self, f: F) where F: Future + Send + 'static, F::Item: Send + 'static, F::Error: Send + 'static {
let pool_work = self.pool.spawn(f);
self.remote.spawn(move |handle| {
handle.spawn(pool_work.then(|_| finished(())));
Ok(())
})
}
pub fn node_table_entries(&self) -> Vec<Node> {
self.node_table.read().recently_active_nodes()
}
pub fn update_node_table(&self, nodes: Vec<Node>) {
trace!("Updating node table with {} entries", nodes.len());
self.node_table.write().insert_many(nodes);
}
pub fn connect<T>(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory {
trace!("Trying to connect to: {}", socket);
let connection = connect(&socket, handle, config);
connection.then(move |result| {
@ -37,13 +67,11 @@ impl Context {
// successfull hanshake
trace!("Connected to {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let session = Session::new();
let channel = context.connections.store(connection, session);
let channel = context.connections.store::<T>(context.clone(), connection);
// initialize session and then start reading messages
channel.session().initialize(context.clone(), channel.clone(), Direction::Outbound)
.and_then(move |_| Context::on_message(context, channel))
.boxed()
channel.session().initialize(channel.clone(), Direction::Outbound);
Context::on_message(context, channel)
},
Ok(Err(err)) => {
// protocol error
@ -71,14 +99,11 @@ impl Context {
// successfull hanshake
trace!("Accepted connection from {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let session = Session::new();
let channel = context.connections.store(connection, session);
let channel = context.connections.store::<NormalSessionFactory>(context.clone(), connection);
// initialize session and then start reading messages
let cloned_context = context.clone();
channel.session().initialize(context.clone(), channel.clone(), Direction::Inbound)
.and_then(|_| Context::on_message(cloned_context, channel))
.boxed()
channel.session().initialize(channel.clone(), Direction::Inbound);
Context::on_message(context.clone(), channel)
},
Ok(Err(err)) => {
// protocol error
@ -97,16 +122,26 @@ impl Context {
Ok(server)
}
pub fn on_message(context: Arc<Context>, channel: Arc<Channel>) -> BoxedMessageFuture {
pub fn on_message(context: Arc<Context>, channel: Arc<Channel>) -> IoFuture<MessageResult<()>> {
channel.read_message().then(move |result| {
match result {
Ok(Ok((command, payload))) => {
// successful read
trace!("Received {} message from {}", command, channel.peer_info().address);
// handle message and read the next one
channel.session().on_message(context.clone(), channel.clone(), command, payload)
.and_then(move |_| Context::on_message(context, channel))
.boxed()
match channel.session().on_message(channel.clone(), command, payload) {
Ok(_) => {
context.node_table.write().note_used(&channel.peer_info().address);
let on_message = Context::on_message(context.clone(), channel);
context.spawn(on_message);
finished(Ok(())).boxed()
},
Err(err) => {
// protocol error
context.close_connection(channel.peer_info());
finished(Err(err)).boxed()
}
}
},
Ok(Err(err)) => {
// protocol error
@ -122,22 +157,15 @@ impl Context {
}).boxed()
}
pub fn send_raw(_context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: &Bytes) -> IoFuture<()> {
trace!("Sending {} message to {}", command, channel.peer_info().address);
channel.write_raw_message(command.clone(), payload).then(move |result| {
match result {
Ok(_) => {
// successful send
trace!("Sent {} message to {}", command, channel.peer_info().address);
finished(()).boxed()
},
Err(err) => {
// network error
// closing connection is handled in on_message`
failed(err).boxed()
},
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, payload),
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
}
}).boxed()
}
}
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
@ -165,6 +193,10 @@ impl Context {
self.node_table.write().note_failure(&peer_info.address);
}
}
pub fn create_sync_session(&self, start_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
self.local_sync_node.lock().create_sync_session(start_height, outbound_connection)
}
}
pub struct P2P {
@ -178,30 +210,47 @@ pub struct P2P {
context: Arc<Context>,
}
impl Drop for P2P {
fn drop(&mut self) {
// there are retain cycles
// context->connections->channel->session->protocol->context
// context->connections->channel->on_message closure->context
// first let's get rid of session retain cycle
for channel in &self.context.connections.remove_all() {
// done, now let's finish on_message
channel.shutdown();
}
}
}
impl P2P {
pub fn new(config: Config, handle: Handle) -> Self {
pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Self {
let pool = CpuPool::new(config.threads);
P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
config: config,
context: Arc::default(),
context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone())),
}
}
pub fn run(&self) -> Result<(), io::Error> {
for peer in self.config.peers.iter() {
self.connect(*peer)
self.connect::<NormalSessionFactory>(*peer);
}
for seed in self.config.seeds.iter() {
self.connect::<SeednodeSessionFactory>(*seed);
}
try!(self.listen());
Ok(())
}
pub fn connect(&self, ip: net::IpAddr) {
pub fn connect<T>(&self, ip: net::IpAddr) where T: SessionFactory {
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
let connection = Context::connect(self.context.clone(), socket, &self.event_loop_handle, &self.config.connection);
let connection = Context::connect::<T>(self.context.clone(), socket, &self.event_loop_handle, &self.config.connection);
let pool_work = self.pool.spawn(connection);
self.event_loop_handle.spawn(pool_work);
}
@ -212,40 +261,4 @@ impl P2P {
self.event_loop_handle.spawn(pool_work);
Ok(())
}
/*
pub fn broadcast<T>(&self, payload: T) where T: Payload {
let channels = self.connections.channels();
for (_id, channel) in channels.into_iter() {
self.send_to_channel(&payload, &channel);
}
}
pub fn send<T>(&self, payload: T, peer: PeerId) where T: Payload {
let channels = self.connections.channels();
if let Some(channel) = channels.get(&peer) {
self.send_to_channel(&payload, channel);
}
}
fn send_to_channel<T>(&self, payload: &T, channel: &Arc<Channel>) where T: Payload {
let connections = self.connections.clone();
let node_table = self.node_table.clone();
let peer_info = channel.peer_info();
let write = channel.write_message(payload);
let pool_work = self.pool.spawn(write).then(move |result| {
match result {
Ok(_) => {
node_table.write().note_used(&peer_info.address);
},
Err(_err) => {
node_table.write().note_failure(&peer_info.address);
connections.remove(peer_info.id);
}
}
finished(())
});
self.event_loop_handle.spawn(pool_work);
}
*/
}

58
p2p/src/protocol/addr.rs Normal file
View File

@ -0,0 +1,58 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::{Protocol, Direction};
use p2p::Context;
use PeerId;
pub struct AddrProtocol {
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// True if expect addr message.
expects_addr: bool,
}
impl AddrProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
AddrProtocol {
context: context,
peer: peer,
expects_addr: false,
}
}
}
impl Protocol for AddrProtocol {
fn initialize(&mut self, direction: Direction, _version: u32) {
if let Direction::Outbound = direction {
self.expects_addr = true;
let send = Context::send_to_peer(self.context.clone(), self.peer, &GetAddr);
self.context.spawn(send);
}
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
if command == &GetAddr::command() {
let _: GetAddr = try!(deserialize_payload(payload, version));
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
let addr = Addr::new(entries);
let send = Context::send_to_peer(self.context.clone(), self.peer, &addr);
self.context.spawn(send);
} else if command == &Addr::command() {
let addr: Addr = try!(deserialize_payload(payload, version));
match addr {
Addr::V0(_) => {
unreachable!("This version of protocol is not supported!");
},
Addr::V31402(addr) => {
let nodes = addr.addresses.into_iter().map(Into::into).collect();
self.context.update_node_table(nodes);
},
}
}
Ok(())
}
}

View File

@ -1,10 +1,14 @@
mod addr;
mod ping;
mod sync;
use bytes::Bytes;
use message::Error;
use message::common::Command;
pub use self::addr::AddrProtocol;
pub use self::ping::PingProtocol;
pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
@ -12,18 +16,15 @@ pub enum Direction {
Outbound,
}
pub enum ProtocolAction {
Reply((Command, Bytes)),
None,
Disconnect,
}
pub trait Protocol: Send {
/// Initialize the protocol.
fn initialize(&mut self, _direction: Direction, _version: u32) -> Result<ProtocolAction, Error> {
Ok(ProtocolAction::None)
}
fn initialize(&mut self, _direction: Direction, _version: u32) {}
/// Handle the message.
fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result<ProtocolAction, Error>;
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error>;
/// Boxes the protocol.
fn boxed(self) -> Box<Protocol> where Self: Sized + 'static {
Box::new(self)
}
}

View File

@ -1,56 +1,59 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Error, Payload, deserialize_payload, serialize_payload};
use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::{Protocol, ProtocolAction, Direction};
use protocol::{Protocol, Direction};
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
use PeerId;
pub struct PingProtocol<T = RandomNonce> {
/// Nonce generator
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// Nonce generator.
nonce_generator: T,
/// Last nonce sent in a ping message.
last_ping_nonce: u64,
/// Last nonce sent in the ping message.
last_ping_nonce: Option<u64>,
}
impl PingProtocol {
pub fn new() -> Self {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
PingProtocol {
context: context,
peer: peer,
nonce_generator: RandomNonce::default(),
last_ping_nonce: 0,
last_ping_nonce: None,
}
}
}
impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
fn initialize(&mut self, direction: Direction, version: u32) -> Result<ProtocolAction, Error> {
match direction {
Direction::Outbound => Ok(ProtocolAction::None),
Direction::Inbound => {
let nonce = self.nonce_generator.get();
self.last_ping_nonce = nonce;
let ping = Ping::new(nonce);
let serialized = try!(serialize_payload(&ping, version));
Ok(ProtocolAction::Reply((Ping::command().into(), serialized)))
},
}
fn initialize(&mut self, _direction: Direction, _version: u32) {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
self.last_ping_nonce = Some(nonce);
let ping = Ping::new(nonce);
let send = Context::send_to_peer(self.context.clone(), self.peer, &ping);
self.context.spawn(send);
}
fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result<ProtocolAction, Error> {
if command == &Ping::command().into() {
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
if command == &Ping::command() {
let ping: Ping = try!(deserialize_payload(payload, version));
let pong = Pong::new(ping.nonce);
let serialized = try!(serialize_payload(&pong, version));
Ok(ProtocolAction::Reply((Pong::command().into(), serialized)))
} else if command == &Pong::command().into() {
let send = Context::send_to_peer(self.context.clone(), self.peer, &pong);
self.context.spawn(send);
} else if command == &Pong::command() {
let pong: Pong = try!(deserialize_payload(payload, version));
if pong.nonce != self.last_ping_nonce {
Err(Error::InvalidCommand)
} else {
Ok(ProtocolAction::None)
if Some(pong.nonce) != self.last_ping_nonce.take() {
return Err(Error::InvalidCommand)
}
} else {
Ok(ProtocolAction::None)
}
Ok(())
}
}

253
p2p/src/protocol/sync.rs Normal file
View File

@ -0,0 +1,253 @@
use std::sync::Arc;
use parking_lot::Mutex;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::{Protocol, Direction};
use p2p::Context;
use PeerId;
pub type InboundSyncConnectionRef = Arc<Mutex<Box<InboundSyncConnection>>>;
pub type OutboundSyncConnectionRef = Arc<Mutex<Box<OutboundSyncConnection>>>;
pub type LocalSyncNodeRef = Arc<Mutex<Box<LocalSyncNode>>>;
// TODO: use this to respond to construct Version message (start_height field)
pub trait LocalSyncNode : Send + Sync {
fn start_height(&self) -> i32;
fn create_sync_session(&mut self, height: i32, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
}
pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&mut self, version: u32);
fn on_inventory(&mut self, message: types::Inv);
fn on_getdata(&mut self, message: types::GetData);
fn on_getblocks(&mut self, message: types::GetBlocks);
fn on_getheaders(&mut self, message: types::GetHeaders);
fn on_transaction(&mut self, message: types::Tx);
fn on_block(&mut self, message: types::Block);
fn on_headers(&mut self, message: types::Headers);
fn on_mempool(&mut self, message: types::MemPool);
fn on_filterload(&mut self, message: types::FilterLoad);
fn on_filteradd(&mut self, message: types::FilterAdd);
fn on_filterclear(&mut self, message: types::FilterClear);
fn on_merkleblock(&mut self, message: types::MerkleBlock);
fn on_sendheaders(&mut self, message: types::SendHeaders);
fn on_feefilter(&mut self, message: types::FeeFilter);
fn on_send_compact(&mut self, message: types::SendCompact);
fn on_compact_block(&mut self, message: types::CompactBlock);
fn on_get_block_txn(&mut self, message: types::GetBlockTxn);
fn on_block_txn(&mut self, message: types::BlockTxn);
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_inventory(&mut self, message: &types::Inv);
fn send_getdata(&mut self, message: &types::GetData);
fn send_getblocks(&mut self, message: &types::GetBlocks);
fn send_getheaders(&mut self, message: &types::GetHeaders);
fn send_transaction(&mut self, message: &types::Tx);
fn send_block(&mut self, message: &types::Block);
fn send_headers(&mut self, message: &types::Headers);
fn send_mempool(&mut self, message: &types::MemPool);
fn send_filterload(&mut self, message: &types::FilterLoad);
fn send_filteradd(&mut self, message: &types::FilterAdd);
fn send_filterclear(&mut self, message: &types::FilterClear);
fn send_merkleblock(&mut self, message: &types::MerkleBlock);
fn send_sendheaders(&mut self, message: &types::SendHeaders);
fn send_feefilter(&mut self, message: &types::FeeFilter);
fn send_send_compact(&mut self, message: &types::SendCompact);
fn send_compact_block(&mut self, message: &types::CompactBlock);
fn send_get_block_txn(&mut self, message: &types::GetBlockTxn);
fn send_block_txn(&mut self, message: &types::BlockTxn);
}
struct OutboundSync {
context: Arc<Context>,
peer: PeerId,
}
impl OutboundSync {
pub fn new(context: Arc<Context>, peer: PeerId) -> OutboundSync {
OutboundSync {
context: context,
peer: peer,
}
}
pub fn send_message<T>(&self, message: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.peer, message);
self.context.spawn(send);
}
pub fn boxed(self) -> Box<OutboundSyncConnection> {
Box::new(self)
}
}
impl OutboundSyncConnection for OutboundSync {
fn send_inventory(&mut self, message: &types::Inv) {
self.send_message(message);
}
fn send_getdata(&mut self, message: &types::GetData) {
self.send_message(message);
}
fn send_getblocks(&mut self, message: &types::GetBlocks) {
self.send_message(message);
}
fn send_getheaders(&mut self, message: &types::GetHeaders) {
self.send_message(message);
}
fn send_transaction(&mut self, message: &types::Tx) {
self.send_message(message);
}
fn send_block(&mut self, message: &types::Block) {
self.send_message(message);
}
fn send_headers(&mut self, message: &types::Headers) {
self.send_message(message);
}
fn send_mempool(&mut self, message: &types::MemPool) {
self.send_message(message);
}
fn send_filterload(&mut self, message: &types::FilterLoad) {
self.send_message(message);
}
fn send_filteradd(&mut self, message: &types::FilterAdd) {
self.send_message(message);
}
fn send_filterclear(&mut self, message: &types::FilterClear) {
self.send_message(message);
}
fn send_merkleblock(&mut self, message: &types::MerkleBlock) {
self.send_message(message);
}
fn send_sendheaders(&mut self, message: &types::SendHeaders) {
self.send_message(message);
}
fn send_feefilter(&mut self, message: &types::FeeFilter) {
self.send_message(message);
}
fn send_send_compact(&mut self, message: &types::SendCompact) {
self.send_message(message);
}
fn send_compact_block(&mut self, message: &types::CompactBlock) {
self.send_message(message);
}
fn send_get_block_txn(&mut self, message: &types::GetBlockTxn) {
self.send_message(message);
}
fn send_block_txn(&mut self, message: &types::BlockTxn) {
self.send_message(message);
}
}
pub struct SyncProtocol {
inbound_connection: InboundSyncConnectionRef,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context.clone(), peer).boxed()));
let inbound_connection = context.create_sync_session(0, outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self, _direction: Direction, version: u32) {
self.inbound_connection.lock().start_sync_session(version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getdata(message);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getblocks(message);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getheaders(message);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_transaction(message);
}
else if command == &types::Block::command() {
let message: types::Block = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_block(message);
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_mempool(message);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_headers(message);
}
else if command == &types::FilterLoad::command() {
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filterload(message);
}
else if command == &types::FilterAdd::command() {
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filteradd(message);
}
else if command == &types::FilterClear::command() {
let message: types::FilterClear = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filterclear(message);
}
else if command == &types::MerkleBlock::command() {
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_merkleblock(message);
}
else if command == &types::SendHeaders::command() {
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_sendheaders(message);
}
else if command == &types::FeeFilter::command() {
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_feefilter(message);
}
else if command == &types::SendCompact::command() {
let message: types::SendCompact = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_send_compact(message);
}
else if command == &types::CompactBlock::command() {
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_compact_block(message);
}
else if command == &types::GetBlockTxn::command() {
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_get_block_txn(message);
}
else if command == &types::BlockTxn::command() {
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_block_txn(message);
}
Ok(())
}
}

View File

@ -1,89 +1,62 @@
use std::sync::Arc;
use parking_lot::Mutex;
use futures::{collect, finished, failed, Future};
use tokio_core::io::IoFuture;
use bytes::Bytes;
use message::Command;
use message::{Command, Error};
use p2p::Context;
use net::Channel;
use protocol::{Protocol, ProtocolAction, PingProtocol, Direction};
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, Direction};
use PeerId;
pub trait SessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session;
}
pub struct SeednodeSessionFactory;
impl SessionFactory for SeednodeSessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session {
let ping = PingProtocol::new(context.clone(), peer).boxed();
let addr = AddrProtocol::new(context.clone(), peer).boxed();
Session::new(vec![ping, addr])
}
}
pub struct NormalSessionFactory;
impl SessionFactory for NormalSessionFactory {
fn new_session(context: Arc<Context>, peer: PeerId) -> Session {
let ping = PingProtocol::new(context.clone(), peer).boxed();
let addr = AddrProtocol::new(context.clone(), peer).boxed();
let sync = SyncProtocol::new(context, peer).boxed();
Session::new(vec![ping, addr, sync])
}
}
pub struct Session {
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
pub fn new() -> Self {
let ping = PingProtocol::new();
Session::new_with_protocols(vec![Box::new(ping)])
}
pub fn new_seednode() -> Self {
let ping = PingProtocol::new();
Session::new_with_protocols(vec![Box::new(ping)])
}
pub fn new_with_protocols(protocols: Vec<Box<Protocol>>) -> Self {
pub fn new(protocols: Vec<Box<Protocol>>) -> Self {
Session {
protocols: Mutex::new(protocols),
}
}
pub fn initialize(&self, context: Arc<Context>, channel: Arc<Channel>, direction: Direction) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real direction and version
match protocol.initialize(direction, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
// no other protocols can use the channel after that
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
}
}
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
pub fn initialize(&self, channel: Arc<Channel>, direction: Direction) {
for protocol in self.protocols.lock().iter_mut() {
protocol.initialize(direction, channel.version());
}
}
pub fn on_message(&self, context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: Bytes) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter()
pub fn on_message(&self, channel: Arc<Channel>, command: Command, payload: Bytes) -> Result<(), Error> {
self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real version
match protocol.on_message(&command, &payload, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
},
}
protocol.on_message(&command, &payload, channel.version())
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
.collect::<Result<Vec<_>, Error>>()
.map(|_| ())
}
}

View File

@ -1,7 +1,9 @@
use std::collections::{HashMap, BTreeSet};
use std::collections::hash_map::Entry;
use std::net::SocketAddr;
use std::cmp::{PartialOrd, Ord, Ordering};
use message::common::Services;
use message::common::{Services, NetAddress};
use message::types::addr::AddressEntry;
use util::time::{Time, RealTime};
#[derive(PartialEq, Eq, Clone)]
@ -16,26 +18,80 @@ pub struct Node {
failures: u32,
}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.failures == other.failures {
self.time.partial_cmp(&other.time)
} else {
other.failures.partial_cmp(&self.failures)
impl From<AddressEntry> for Node {
fn from(entry: AddressEntry) -> Self {
Node {
addr: SocketAddr::new(entry.address.address.into(), entry.address.port.into()),
time: entry.timestamp as i64,
services: entry.address.services,
failures: 0,
}
}
}
impl Ord for Node {
fn cmp(&self, other: &Self) -> Ordering {
if self.failures == other.failures {
self.time.cmp(&other.time)
} else {
other.failures.cmp(&self.failures)
impl From<Node> for AddressEntry {
fn from(node: Node) -> Self {
AddressEntry {
timestamp: node.time as u32,
address: NetAddress {
services: node.services,
address: node.addr.ip().into(),
port: node.addr.port().into(),
}
}
}
}
#[derive(PartialEq, Eq, Clone)]
struct NodeByScore(Node);
impl From<Node> for NodeByScore {
fn from(node: Node) -> Self {
NodeByScore(node)
}
}
impl PartialOrd for NodeByScore {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.0.failures == other.0.failures {
other.0.time.partial_cmp(&self.0.time)
} else {
self.0.failures.partial_cmp(&other.0.failures)
}
}
}
impl Ord for NodeByScore {
fn cmp(&self, other: &Self) -> Ordering {
if self.0.failures == other.0.failures {
other.0.time.cmp(&self.0.time)
} else {
self.0.failures.cmp(&other.0.failures)
}
}
}
#[derive(PartialEq, Eq, Clone)]
struct NodeByTime(Node);
impl From<Node> for NodeByTime {
fn from(node: Node) -> Self {
NodeByTime(node)
}
}
impl PartialOrd for NodeByTime {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.0.time.partial_cmp(&self.0.time)
}
}
impl Ord for NodeByTime {
fn cmp(&self, other: &Self) -> Ordering {
other.0.time.cmp(&self.0.time)
}
}
#[derive(Default)]
pub struct NodeTable<T = RealTime> where T: Time {
/// Time source.
@ -43,50 +99,117 @@ pub struct NodeTable<T = RealTime> where T: Time {
/// Nodes by socket address.
by_addr: HashMap<SocketAddr, Node>,
/// Nodes sorted by score.
by_score: BTreeSet<Node>,
by_score: BTreeSet<NodeByScore>,
/// Nodes sorted by time.
by_time: BTreeSet<NodeByTime>,
}
impl<T> NodeTable<T> where T: Time {
/// Inserts new address and services pair into NodeTable.
pub fn insert(&mut self, addr: SocketAddr, services: Services) {
let failures = self.by_addr.get(&addr).map_or(0, |ref node| node.failures);
let now = self.time.get().sec;
match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => {
let old = entry.get_mut();
assert!(self.by_score.remove(&old.clone().into()));
assert!(self.by_time.remove(&old.clone().into()));
old.time = now;
old.services = services;
self.by_score.insert(old.clone().into());
self.by_time.insert(old.clone().into());
},
Entry::Vacant(entry) => {
let node = Node {
addr: addr,
time: now,
services: services,
failures: 0,
};
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
entry.insert(node);
}
}
}
let node = Node {
addr: addr,
time: self.time.get().sec,
services: services,
failures: failures,
};
/// Inserts many new addresses into node table.
/// Used in `addr` request handler.
/// Discards all nodes with timestamp newer than current time.
pub fn insert_many(&mut self, nodes: Vec<Node>) {
// discard all nodes with timestamp newer than current time.
let now = self.time.get().sec;
let iter = nodes.into_iter()
.filter(|node| node.time <= now);
self.by_addr.insert(addr, node.clone());
self.by_score.insert(node);
// iterate over the rest
for node in iter {
match self.by_addr.entry(node.addr) {
Entry::Occupied(mut entry) => {
let old = entry.get_mut();
// we've already seen this node
if old.time < node.time {
assert!(self.by_score.remove(&old.clone().into()));
assert!(self.by_time.remove(&old.clone().into()));
// update node info
old.time = node.time;
old.services = node.services;
self.by_score.insert(old.clone().into());
self.by_time.insert(old.clone().into());
}
},
Entry::Vacant(entry)=> {
// it's first time we see this node
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
entry.insert(node);
}
}
}
}
/// Returnes most reliable nodes with desired services.
pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
pub fn _nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
self.by_score.iter()
.rev()
.filter(|s| s.services.includes(services))
.map(Clone::clone)
.filter(|node| node.0.services.includes(services))
.map(|node| node.0.clone())
.take(limit)
.collect()
}
/// Returns most recently active nodes.
///
/// The documenation says:
/// "Non-advertised nodes should be forgotten after typically 3 hours"
/// but bitcoin client still advertises them even after a month.
/// Let's do the same.
///
/// https://en.bitcoin.it/wiki/Protocol_documentation#addr
pub fn recently_active_nodes(&self) -> Vec<Node> {
self.by_time.iter()
.map(|node| node.0.clone())
.take(1000)
.collect()
}
/// Marks address as recently used.
pub fn note_used(&mut self, addr: &SocketAddr) {
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
assert!(self.by_score.remove(node));
assert!(self.by_score.remove(&node.clone().into()));
assert!(self.by_time.remove(&node.clone().into()));
node.time = self.time.get().sec;
self.by_score.insert(node.clone());
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
}
}
/// Notes failure.
pub fn note_failure(&mut self, addr: &SocketAddr) {
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
assert!(self.by_score.remove(node));
assert!(self.by_score.remove(&node.clone().into()));
assert!(self.by_time.remove(&node.clone().into()));
node.failures += 1;
self.by_score.insert(node.clone());
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
}
}
}
@ -107,7 +230,7 @@ mod tests {
table.insert(s0, Services::default());
table.insert(s1, Services::default());
table.insert(s2, Services::default());
let nodes = table.nodes_with_services(&Services::default(), 2);
let nodes = table._nodes_with_services(&Services::default(), 2);
assert_eq!(nodes.len(), 2);
assert_eq!(nodes[0].addr, s2);
assert_eq!(nodes[0].time, 2);
@ -135,7 +258,7 @@ mod tests {
table.note_used(&s1);
table.note_failure(&s2);
table.note_failure(&s3);
let nodes = table.nodes_with_services(&Services::default(), 10);
let nodes = table._nodes_with_services(&Services::default(), 10);
assert_eq!(nodes.len(), 5);
assert_eq!(nodes[0].addr, s1);
@ -157,5 +280,29 @@ mod tests {
assert_eq!(nodes[4].addr, s3);
assert_eq!(nodes[4].time, 3);
assert_eq!(nodes[4].failures, 1);
let nodes = table.recently_active_nodes();
assert_eq!(nodes.len(), 5);
assert_eq!(nodes[0].addr, s1);
assert_eq!(nodes[0].time, 7);
assert_eq!(nodes[0].failures, 0);
assert_eq!(nodes[1].addr, s4);
assert_eq!(nodes[1].time, 6);
assert_eq!(nodes[1].failures, 0);
assert_eq!(nodes[2].addr, s2);
assert_eq!(nodes[2].time, 5);
assert_eq!(nodes[2].failures, 1);
assert_eq!(nodes[3].addr, s3);
assert_eq!(nodes[3].time, 3);
assert_eq!(nodes[3].failures, 1);
assert_eq!(nodes[4].addr, s0);
assert_eq!(nodes[4].time, 0);
assert_eq!(nodes[4].failures, 0);
}
}

View File

@ -13,15 +13,15 @@ impl NonceGenerator for RandomNonce {
}
}
pub struct StaticNonce(u64);
pub struct _StaticNonce(u64);
impl StaticNonce {
pub fn new(nonce: u64) -> Self {
StaticNonce(nonce)
impl _StaticNonce {
pub fn _new(nonce: u64) -> Self {
_StaticNonce(nonce)
}
}
impl NonceGenerator for StaticNonce {
impl NonceGenerator for _StaticNonce {
fn get(&self) -> u64 {
self.0
}

View File

@ -14,20 +14,6 @@ impl Time for RealTime {
}
}
pub struct StaticTime(time::Timespec);
impl StaticTime {
pub fn new(time: time::Timespec) -> Self {
StaticTime(time)
}
}
impl Time for StaticTime {
fn get(&self) -> time::Timespec {
self.0
}
}
#[derive(Default)]
pub struct IncrementalTime {
counter: Cell<i64>,

View File

@ -8,11 +8,14 @@ extern crate keys;
extern crate script;
extern crate message;
extern crate p2p;
extern crate sync;
mod config;
use std::net::SocketAddr;
use p2p::{P2P, event_loop, forever, net};
use p2p::{P2P, event_loop, forever, NetConfig};
use sync::local_node::LocalNode;
use sync::inbound_connection_factory::InboundConnectionFactory;
fn main() {
env_logger::init().unwrap();
@ -35,7 +38,7 @@ fn run() -> Result<(), String> {
protocol_maximum: 70017,
inbound_connections: 10,
outbound_connections: 10,
connection: net::Config {
connection: NetConfig {
magic: cfg.magic,
local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port),
services: Default::default(),
@ -47,7 +50,10 @@ fn run() -> Result<(), String> {
seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]),
};
let p2p = P2P::new(p2p_cfg, el.handle());
let local_sync_node = LocalNode::new();
let local_sync_factory = InboundConnectionFactory::with_local_node(local_sync_node.clone());
let p2p = P2P::new(p2p_cfg, local_sync_factory, el.handle());
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
el.run(forever()).unwrap();
Ok(())

13
sync/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "sync"
version = "0.1.0"
authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
parking_lot = "0.3"
log = "0.3"
chain = { path = "../chain" }
message = { path = "../message" }
p2p = { path = "../p2p" }
primitives = { path = "../primitives" }

7
sync/src/best_block.rs Normal file
View File

@ -0,0 +1,7 @@
use primitives::hash::H256;
#[derive(Debug, Clone)]
pub struct BestBlock {
pub height: u64,
pub hash: H256,
}

View File

@ -0,0 +1,96 @@
use parking_lot::Mutex;
use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
pub struct InboundConnection {
local_node: LocalNodeRef,
peer_index: usize,
}
impl InboundConnection {
pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef {
InboundSyncConnectionRef::new(Mutex::new(Box::new(InboundConnection {
local_node: local_node,
peer_index: peer_index,
})))
}
}
impl InboundSyncConnection for InboundConnection {
fn start_sync_session(&mut self, version: u32) {
self.local_node.lock().start_sync_session(self.peer_index, version);
}
fn on_inventory(&mut self, message: types::Inv) {
self.local_node.lock().on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&mut self, message: types::GetData) {
self.local_node.lock().on_peer_getdata(self.peer_index, message);
}
fn on_getblocks(&mut self, message: types::GetBlocks) {
self.local_node.lock().on_peer_getblocks(self.peer_index, message);
}
fn on_getheaders(&mut self, message: types::GetHeaders) {
self.local_node.lock().on_peer_getheaders(self.peer_index, message);
}
fn on_transaction(&mut self, message: types::Tx) {
self.local_node.lock().on_peer_transaction(self.peer_index, message);
}
fn on_block(&mut self, message: types::Block) {
self.local_node.lock().on_peer_block(self.peer_index, message);
}
fn on_headers(&mut self, message: types::Headers) {
self.local_node.lock().on_peer_headers(self.peer_index, message);
}
fn on_mempool(&mut self, message: types::MemPool) {
self.local_node.lock().on_peer_mempool(self.peer_index, message);
}
fn on_filterload(&mut self, message: types::FilterLoad) {
self.local_node.lock().on_peer_filterload(self.peer_index, message);
}
fn on_filteradd(&mut self, message: types::FilterAdd) {
self.local_node.lock().on_peer_filteradd(self.peer_index, message);
}
fn on_filterclear(&mut self, message: types::FilterClear) {
self.local_node.lock().on_peer_filterclear(self.peer_index, message);
}
fn on_merkleblock(&mut self, message: types::MerkleBlock) {
self.local_node.lock().on_peer_merkleblock(self.peer_index, message);
}
fn on_sendheaders(&mut self, message: types::SendHeaders) {
self.local_node.lock().on_peer_sendheaders(self.peer_index, message);
}
fn on_feefilter(&mut self, message: types::FeeFilter) {
self.local_node.lock().on_peer_feefilter(self.peer_index, message);
}
fn on_send_compact(&mut self, message: types::SendCompact) {
self.local_node.lock().on_peer_send_compact(self.peer_index, message);
}
fn on_compact_block(&mut self, message: types::CompactBlock) {
self.local_node.lock().on_peer_compact_block(self.peer_index, message);
}
fn on_get_block_txn(&mut self, message: types::GetBlockTxn) {
self.local_node.lock().on_peer_get_block_txn(self.peer_index, message);
}
fn on_block_txn(&mut self, message: types::BlockTxn) {
self.local_node.lock().on_peer_block_txn(self.peer_index, message);
}
}

View File

@ -0,0 +1,35 @@
use std::sync::Arc;
use parking_lot::Mutex;
use p2p::{LocalSyncNode, OutboundSyncConnectionRef, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
use inbound_connection::InboundConnection;
pub struct InboundConnectionFactory {
local_node: LocalNodeRef,
}
impl InboundConnectionFactory {
pub fn with_local_node(local_node: LocalNodeRef) -> Arc<Mutex<Box<LocalSyncNode>>> {
Arc::new(
Mutex::new(
Box::new(
InboundConnectionFactory {
local_node: local_node,
}
)
)
)
}
}
impl LocalSyncNode for InboundConnectionFactory {
fn start_height(&self) -> i32 {
self.local_node.lock().best_block().height as i32
}
fn create_sync_session(&mut self, best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
let peer_index = self.local_node.lock().create_sync_session(best_block_height, outbound_connection);
let inbound_connection = InboundConnection::new(self.local_node.clone(), peer_index);
inbound_connection
}
}

13
sync/src/lib.rs Normal file
View File

@ -0,0 +1,13 @@
extern crate chain;
#[macro_use]
extern crate log;
extern crate message;
extern crate p2p;
extern crate parking_lot;
extern crate primitives;
pub mod best_block;
mod inbound_connection;
pub mod inbound_connection_factory;
mod local_chain;
pub mod local_node;

119
sync/src/local_chain.rs Normal file
View File

@ -0,0 +1,119 @@
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use chain::Block;
use primitives::hash::H256;
use best_block::BestBlock;
#[derive(Debug)]
pub struct Info {
pub chain_length: usize,
pub orphan_count: usize,
}
// TODO: this is temp storage (to use during test stage)
// it must be replaced with db + verification queue + mempools (transaction, block, ...)
pub struct LocalChain {
blocks_order: Vec<H256>,
blocks_map: HashMap<H256, Block>,
orphan_blocks: HashMap<H256, Block>,
}
impl LocalChain {
pub fn new() -> LocalChain {
let mut chain = LocalChain {
blocks_order: Vec::new(),
blocks_map: HashMap::new(),
orphan_blocks: HashMap::new(),
};
// TODO: move this to config
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
let genesis_block_hash = genesis_block.hash();
chain.blocks_order.push(genesis_block_hash.clone());
chain.blocks_map.insert(genesis_block_hash, genesis_block);
chain
}
pub fn info(&self) -> Info {
Info {
chain_length: self.blocks_order.len(),
orphan_count: self.orphan_blocks.len(),
}
}
pub fn best_block(&self) -> BestBlock {
let height = self.blocks_order.len() - 1;
let ref block = self.blocks_order[height];
BestBlock {
height: height as u64,
hash: block.clone(),
}
}
pub fn block_locator_hashes(&self) -> Vec<H256> {
let mut index = self.blocks_order.len() - 1;
let mut hashes: Vec<H256> = Vec::new();
let mut step = 1;
loop {
let block_hash = self.blocks_order[index].clone();
hashes.push(block_hash);
if hashes.len() >= 10 {
step <<= 1;
}
if index < step {
break;
}
index -= step;
}
hashes
}
pub fn is_known_block(&self, hash: &H256) -> bool {
self.blocks_map.contains_key(hash)
}
pub fn insert_block(&mut self, block: &Block) {
// check if already known block
let block_header_hash = block.block_header.hash();
if self.blocks_map.contains_key(&block_header_hash) {
return;
}
// check if parent block is in the storage
// if there is no parent block for this block, remember as orphaned
if !self.blocks_map.contains_key(&block.block_header.previous_header_hash) {
self.orphan_blocks.insert(block.block_header.previous_header_hash.clone(), block.clone());
return;
}
// insert block
for i in 0..self.blocks_order.len() {
if self.blocks_order[i] == block.block_header.previous_header_hash {
self.blocks_order.insert(i + 1, block_header_hash.clone());
self.blocks_map.insert(block_header_hash.clone(), block.clone());
// TODO: forks
// check if any orphan blocks now can be moved to the blockchain
let mut position = i + 1;
let mut block_header_hash = block_header_hash;
while let Entry::Occupied(orphan_block_entry) = self.orphan_blocks.entry(block_header_hash.clone()) {
// remove from orphans
let (_, orphan_block) = orphan_block_entry.remove_entry();
let orphan_block_hash = orphan_block.hash();
// insert to blockchain
self.blocks_map.insert(block_header_hash.clone(), orphan_block);
block_header_hash = orphan_block_hash;
// insert to ordering
self.blocks_order.insert(position + 1, block_header_hash.clone());
position += 1;
}
return;
}
}
}
}

202
sync/src/local_node.rs Normal file
View File

@ -0,0 +1,202 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use p2p::OutboundSyncConnectionRef;
use primitives::hash::H256;
use message::Payload;
use message::common::InventoryType;
use message::types;
use local_chain::LocalChain;
use best_block::BestBlock;
pub type LocalNodeRef = Arc<Mutex<LocalNode>>;
pub struct LocalNode {
peer_counter: usize,
chain: LocalChain,
peers: HashMap<usize, RemoteNode>,
}
struct RemoteNode {
version: u32,
connection: OutboundSyncConnectionRef,
getdata_requests: usize,
}
impl LocalNode {
pub fn new() -> LocalNodeRef {
Arc::new(Mutex::new(LocalNode {
peer_counter: 0,
chain: LocalChain::new(),
peers: HashMap::new(),
}))
}
pub fn best_block(&self) -> BestBlock {
self.chain.best_block()
}
pub fn create_sync_session(&mut self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize {
trace!(target: "sync", "Creating new sync session with peer#{}", self.peer_counter + 1);
// save connection for future
self.peer_counter += 1;
self.peers.insert(self.peer_counter, RemoteNode {
version: 0,
connection: outbound_connection.clone(),
getdata_requests: 0,
});
self.peer_counter
}
pub fn start_sync_session(&mut self, peer_index: usize, version: u32) {
trace!(target: "sync", "Starting new sync session with peer#{}", peer_index);
let peer = self.peers.get_mut(&peer_index).unwrap();
let mut connection = peer.connection.lock();
let connection = &mut *connection;
peer.version = version;
// start headers sync
if peer.version >= types::SendHeaders::version() {
// send `sendheaders` message to receive `headers` message instead of `inv` message
trace!(target: "sync", "Sending `sendheaders` to peer#{}", peer_index);
let sendheaders = types::SendHeaders {};
connection.send_sendheaders(&sendheaders);
// TODO: why we do not support `sendheaders`?
// TODO: why latest bitcoind doesn't responds to the `getheaders` message?
// TODO: `getheaders` can be used only after `sendheaders`?
}
// get peer' inventory with newest blocks
trace!(target: "sync", "Sending `getblocks` to peer#{}", peer_index);
let getblocks = types::GetBlocks {
version: 0,
block_locator_hashes: self.chain.block_locator_hashes(),
hash_stop: H256::default(),
};
connection.send_getblocks(&getblocks);
}
pub fn on_peer_inventory(&mut self, peer_index: usize, message: types::Inv) {
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
// TODO: after each `getblocks` message bitcoind responds with two `inventory` messages:
// (1) with single entry
// (2) with 500 entries
// what if (1)?
let mut getdata = types::GetData {
inventory: Vec::new(),
};
for item in message.inventory.iter() {
match InventoryType::from_u32(item.inv_type) {
Some(InventoryType::MessageBlock) => {
if !self.chain.is_known_block(&item.hash) {
getdata.inventory.push(item.clone());
}
},
_ => (),
}
}
// request unknown inventory data
if !getdata.inventory.is_empty() {
trace!(target: "sync", "Sending `getdata` message to peer#{}. Querying #{} unknown blocks", peer_index, getdata.inventory.len());
let peer = self.peers.get_mut(&peer_index).unwrap();
peer.getdata_requests += getdata.inventory.len();
let mut connection = peer.connection.lock();
let connection = &mut *connection;
connection.send_getdata(&getdata);
}
}
pub fn on_peer_getdata(&mut self, peer_index: usize, _message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
}
pub fn on_peer_getblocks(&mut self, peer_index: usize, _message: types::GetBlocks) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
}
pub fn on_peer_getheaders(&mut self, peer_index: usize, _message: types::GetHeaders) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
}
pub fn on_peer_transaction(&mut self, peer_index: usize, _message: types::Tx) {
trace!(target: "sync", "Got `tx` message from peer#{}", peer_index);
}
pub fn on_peer_block(&mut self, peer_index: usize, message: types::Block) {
// insert block to the chain
self.chain.insert_block(&message.block);
// decrease pending requests count
let peer = self.peers.get_mut(&peer_index).unwrap();
peer.getdata_requests -= 1;
trace!(target: "sync", "Got `block` message from peer#{}. Pending #{} requests", peer_index, peer.getdata_requests);
// if there are no pending requests, continue with next blocks chunk
if peer.getdata_requests == 0 {
trace!(target: "sync", "Sending `getblocks` to peer#{}. Local chain state: {:?}", peer_index, self.chain.info());
let getblocks = types::GetBlocks {
version: 0,
block_locator_hashes: self.chain.block_locator_hashes(),
hash_stop: H256::default(),
};
let mut connection = peer.connection.lock();
let connection = &mut *connection;
connection.send_getblocks(&getblocks);
}
}
pub fn on_peer_headers(&mut self, peer_index: usize, _message: types::Headers) {
trace!(target: "sync", "Got `headers` message from peer#{}", peer_index);
}
pub fn on_peer_mempool(&mut self, peer_index: usize, _message: types::MemPool) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
}
pub fn on_peer_filterload(&mut self, peer_index: usize, _message: types::FilterLoad) {
trace!(target: "sync", "Got `filterload` message from peer#{}", peer_index);
}
pub fn on_peer_filteradd(&mut self, peer_index: usize, _message: types::FilterAdd) {
trace!(target: "sync", "Got `filteradd` message from peer#{}", peer_index);
}
pub fn on_peer_filterclear(&mut self, peer_index: usize, _message: types::FilterClear) {
trace!(target: "sync", "Got `filterclear` message from peer#{}", peer_index);
}
pub fn on_peer_merkleblock(&mut self, peer_index: usize, _message: types::MerkleBlock) {
trace!(target: "sync", "Got `merkleblock` message from peer#{}", peer_index);
}
pub fn on_peer_sendheaders(&mut self, peer_index: usize, _message: types::SendHeaders) {
trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
}
pub fn on_peer_feefilter(&mut self, peer_index: usize, _message: types::FeeFilter) {
trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index);
}
pub fn on_peer_send_compact(&mut self, peer_index: usize, _message: types::SendCompact) {
trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index);
}
pub fn on_peer_compact_block(&mut self, peer_index: usize, _message: types::CompactBlock) {
trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index);
}
pub fn on_peer_get_block_txn(&mut self, peer_index: usize, _message: types::GetBlockTxn) {
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
}
pub fn on_peer_block_txn(&mut self, peer_index: usize, _message: types::BlockTxn) {
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
}
}