Merge branch 'master' into sync_transactions

This commit is contained in:
Svyatoslav Nikolsky 2016-11-17 11:14:05 +03:00
commit 4e74bb7f68
41 changed files with 735 additions and 465 deletions

28
Cargo.lock generated
View File

@ -104,13 +104,13 @@ dependencies = [
"bitcrypto 0.1.0",
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
"serialization 0.1.0",
]
[[package]]
name = "clap"
version = "2.17.1"
version = "2.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -135,7 +135,7 @@ version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -199,7 +199,7 @@ dependencies = [
"gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -270,15 +270,15 @@ dependencies = [
"base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bitcrypto 0.1.0",
"eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -476,7 +476,7 @@ version = "0.1.0"
dependencies = [
"app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chain 0.1.0",
"clap 2.17.1 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.18.0 (registry+https://github.com/rust-lang/crates.io-index)",
"db 0.1.0",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"import 0.1.0",
@ -495,7 +495,7 @@ name = "primitives"
version = "0.1.0"
dependencies = [
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -565,13 +565,13 @@ dependencies = [
"gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rustc-serialize"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -781,7 +781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum clap 2.17.1 (registry+https://github.com/rust-lang/crates.io-index)" = "27dac76762fb56019b04aed3ccb43a770a18f80f9c2eb62ee1a18d9fb4ea2430"
"checksum clap 2.18.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40046b8a004bf3ba43b9078bf4b9b6d1708406a234848f925dbd7160a374c8a8"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum csv 0.14.7 (registry+https://github.com/rust-lang/crates.io-index)" = "266c1815d7ca63a5bd86284043faf91e8c95e943e55ce05dc0ae08e952de18bc"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
@ -794,7 +794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)" = "553f11439bdefe755bf366b264820f1da70f3aaf3924e594b886beb9c831bcf5"
"checksum heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8c80e194758495a9109566134dc06e42ea0423987d6ceca016edaa90381b3549"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f"
"checksum lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6abe0ee2e758cd6bc8a2cd56726359007748fbf4128da998b65d0b70f881e19b"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
"checksum libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)" = "044d1360593a78f5c8e5e710beccdc24ab71d1f01bc19a29bcacdba22e8475d8"
"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd"
@ -820,7 +820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"
"checksum rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)" = "bff9fc1c79f2dec76b253273d07682e94a978bd8f132ded071188122b2af9818"
"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"

View File

@ -26,7 +26,7 @@ pub fn merkle_root(hashes: &[H256]) -> H256 {
// duplicate the last element if len is not even
if hashes.len() % 2 == 1 {
let last = &hashes[hashes.len() - 1];
row.push(dhash256(&*concat(&last, last)));
row.push(dhash256(&*concat(last, last)));
}
merkle_root(&row)

View File

@ -377,7 +377,7 @@ impl Storage {
/// all transaction meta is removed
/// DOES NOT update best block
fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> {
trace!(target: "reorg", "Decanonizing block {}", hash);
trace!(target: "reorg", "Decanonizing block {}", hash.to_reversed_str());
// ensure that block is of the main chain
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
@ -645,19 +645,34 @@ impl Store for Storage {
Err(Error::Consistency(consistency_error)) => {
match consistency_error {
ConsistencyError::DoubleSpend(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to double-spend at {}", &block_hash, &hash);
warn!(
target: "reorg",
"Failed to reorganize to {} due to double-spend at {}",
block_hash.to_reversed_str(),
hash.to_reversed_str()
);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::UnknownSpending(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to spending unknown transaction {}", &block_hash, &hash);
warn!(
target: "reorg",
"Failed to reorganize to {} due to spending unknown transaction {}",
block_hash.to_reversed_str(),
hash.to_reversed_str()
);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::Unknown(hash) => {
// this is orphan block inserted or disconnected chain head updated, we allow that (by now)
// so it is no-op
warn!(target: "reorg", "Disconnected chain head {} updated with {}", &hash, &block_hash);
warn!(
target: "reorg",
"Disconnected chain head {} updated with {}",
hash.to_reversed_str(),
block_hash.to_reversed_str()
);
},
_ => {
// we don't allow other errors on side chain/orphans

View File

@ -3,6 +3,9 @@ use super::Magic;
#[derive(Debug, Clone)]
/// Parameters that influence chain consensus.
pub struct ConsensusParams {
/// Time when BIP16 becomes active.
/// See https://github.com/bitcoin/bips/blob/master/bip-0016.mediawiki
pub bip16_time: u32,
/// Block height at which BIP65 becomes active.
/// See https://github.com/bitcoin/bips/blob/master/bip-0065.mediawiki
pub bip65_height: u32,
@ -12,12 +15,15 @@ impl ConsensusParams {
pub fn with_magic(magic: Magic) -> Self {
match magic {
Magic::Mainnet => ConsensusParams {
bip65_height: 388381, // 000000000000000004c2b624ed5d7756c508d90fd0da2c7c679febfa6c4735f0
bip16_time: 1333238400, // Apr 1 2012
bip65_height: 388381, // 000000000000000004c2b624ed5d7756c508d90fd0da2c7c679febfa6c4735f0
},
Magic::Testnet => ConsensusParams {
bip65_height: 581885, // 00000000007f6655f22f98e72ed80d8b06dc761d5da09df0fa1dc4be4f861eb6
bip16_time: 1333238400, // Apr 1 2012
bip65_height: 581885, // 00000000007f6655f22f98e72ed80d8b06dc761d5da09df0fa1dc4be4f861eb6
},
Magic::Regtest => ConsensusParams {
bip16_time: 1333238400, // Apr 1 2012
bip65_height: 1351,
},
}

View File

@ -33,3 +33,9 @@ impl<T> AsRef<[u8]> for Message<T> {
self.bytes.as_ref()
}
}
impl<T> From<Message<T>> for Bytes {
fn from(m: Message<T>) -> Self {
m.bytes.into_raw()
}
}

View File

@ -7,7 +7,7 @@ pub struct GetAddr;
impl Payload for GetAddr {
fn version() -> u32 {
60002
0
}
fn command() -> &'static str {

View File

@ -6,10 +6,6 @@ use net::Config as NetConfig;
pub struct Config {
/// Number of threads used by p2p thread pool.
pub threads: usize,
/// Lowest supported protocol version.
pub protocol_minimum: u32,
/// Highest supported protocol version.
pub protocol_maximum: u32,
/// Number of inbound connections.
pub inbound_connections: u32,
/// Number of outbound connections.
@ -20,6 +16,6 @@ pub struct Config {
pub peers: Vec<SocketAddr>,
/// Connect to these nodes to retrieve peer addresses, and disconnect.
pub seeds: Vec<String>,
/// p2p module cache directory.
/// p2p/nodes.csv file path
pub node_table_path: PathBuf,
}

View File

@ -1,19 +1,20 @@
use std::{io, cmp};
use futures::{Future, Poll, Async};
use message::{Message, MessageResult};
use message::{Message, MessageResult, Error};
use message::types::{Version, Verack};
use message::common::Magic;
use io::{write_message, WriteMessage, ReadMessage, read_message};
pub fn handshake<A>(a: A, magic: Magic, version: Version) -> Handshake<A> where A: io::Write + io::Read {
pub fn handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> Handshake<A> where A: io::Write + io::Read {
Handshake {
version: version.version(),
state: HandshakeState::SendVersion(write_message(a, version_message(magic, version))),
magic: magic,
min_version: min_version,
}
}
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandshake<A> where A: io::Write + io::Read {
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> AcceptHandshake<A> where A: io::Write + io::Read {
AcceptHandshake {
version: version.version(),
state: AcceptHandshakeState::ReceiveVersion {
@ -21,6 +22,7 @@ pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandsh
future: read_message(a, magic, 0),
},
magic: magic,
min_version: min_version,
}
}
@ -72,12 +74,14 @@ pub struct Handshake<A> {
state: HandshakeState<A>,
magic: Magic,
version: u32,
min_version: u32,
}
pub struct AcceptHandshake<A> {
state: AcceptHandshakeState<A>,
magic: Magic,
version: u32,
min_version: u32,
}
impl<A> Future for Handshake<A> where A: io::Read + io::Write {
@ -97,6 +101,10 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if version.version() < self.min_version {
return Ok((stream, Err(Error::InvalidVersion)).into());
}
let next = HandshakeState::ReceiveVerack {
version: Some(version),
future: read_message(stream, self.magic, 0),
@ -140,6 +148,10 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if version.version() < self.min_version {
return Ok((stream, Err(Error::InvalidVersion)).into());
}
let local_version = local_version.take().expect("local version must be set");
let next = AcceptHandshakeState::SendVersion {
version: Some(version),
@ -276,7 +288,7 @@ mod tests {
write: Bytes::default(),
};
let hs = handshake(test_io, magic, local_version).wait().unwrap();
let hs = handshake(test_io, magic, local_version, 0).wait().unwrap();
assert_eq!(hs.0.write, expected_stream.out());
assert_eq!(hs.1.unwrap(), expected);
}
@ -305,7 +317,7 @@ mod tests {
expected_stream.append_slice(Message::new(magic, version, &local_version).unwrap().as_ref());
expected_stream.append_slice(Message::new(magic, version, &Verack).unwrap().as_ref());
let hs = accept_handshake(test_io, magic, local_version).wait().unwrap();
let hs = accept_handshake(test_io, magic, local_version, 0).wait().unwrap();
assert_eq!(hs.0.write, expected_stream.out());
assert_eq!(hs.1.unwrap(), expected);
}

View File

@ -25,9 +25,6 @@ mod config;
mod event_loop;
mod p2p;
pub const VERSION: u32 = 70_014;
pub const USER_AGENT: &'static str = "pbtc";
pub use primitives::{hash, bytes};
pub use config::Config;

View File

@ -9,7 +9,7 @@ use net::{Config, Connection};
pub fn accept_connection(stream: TcpStream, handle: &Handle, config: &Config, address: net::SocketAddr) -> Deadline<AcceptConnection> {
let accept = AcceptConnection {
handshake: accept_handshake(stream, config.magic, config.version(&address)),
handshake: accept_handshake(stream, config.magic, config.version(&address), config.protocol_minimum),
magic: config.magic,
address: address,
};

View File

@ -1,6 +1,6 @@
use message::{Payload, Message};
use tokio_core::io::{write_all, WriteAll};
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
use io::{SharedTcpStream, read_any_message, ReadAnyMessage};
use util::PeerInfo;
pub struct Channel {
@ -18,10 +18,8 @@ impl Channel {
}
}
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
// TODO: some tracing here
let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message");
write_message(self.stream.clone(), message)
pub fn write_message<T>(&self, message: T) -> WriteAll<SharedTcpStream, T> where T: AsRef<[u8]> {
write_all(self.stream.clone(), message)
}
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {

View File

@ -3,10 +3,11 @@ use message::common::{Magic, Services, NetAddress};
use message::types::version::{Version, V0, V106, V70001};
use util::time::{Time, RealTime};
use util::nonce::{NonceGenerator, RandomNonce};
use VERSION;
#[derive(Debug, Clone)]
pub struct Config {
pub protocol_version: u32,
pub protocol_minimum: u32,
pub magic: Magic,
pub local_address: SocketAddr,
pub services: Services,
@ -18,7 +19,7 @@ pub struct Config {
impl Config {
pub fn version(&self, to: &SocketAddr) -> Version {
Version::V70001(V0 {
version: VERSION,
version: self.protocol_version,
services: self.services,
timestamp: RealTime.get().sec,
receiver: NetAddress {

View File

@ -18,6 +18,7 @@ pub fn connect(address: &SocketAddr, handle: &Handle, config: &Config) -> Deadli
},
magic: config.magic,
address: *address,
protocol_minimum: config.protocol_minimum,
};
deadline(Duration::new(5, 0), handle, connect).expect("Failed to create timeout")
@ -36,6 +37,7 @@ pub struct Connect {
state: ConnectState,
magic: Magic,
address: SocketAddr,
protocol_minimum: u32,
}
impl Future for Connect {
@ -47,7 +49,7 @@ impl Future for Connect {
ConnectState::TcpConnect { ref mut future, ref mut version } => {
let stream = try_ready!(future.poll());
let version = version.take().expect("state TcpConnect must have version");
let handshake = handshake(stream, self.magic, version);
let handshake = handshake(stream, self.magic, version, self.protocol_minimum);
(ConnectState::Handshake(handshake), Async::NotReady)
},
ConnectState::Handshake(ref mut future) => {

View File

@ -9,6 +9,8 @@ use session::{SessionFactory};
use util::{Direction, PeerInfo};
use PeerId;
const SYNCHRONOUS_RESPONSES: bool = true;
#[derive(Default)]
pub struct Connections {
/// Incremental peer counter.
@ -51,7 +53,7 @@ impl Connections {
magic: connection.magic,
};
let session = T::new_session(context, peer_info.clone());
let session = T::new_session(context, peer_info.clone(), SYNCHRONOUS_RESPONSES);
let channel = Arc::new(Channel::new(connection.stream, peer_info, session));
self.channels.write().insert(id, channel.clone());
channel

View File

@ -5,6 +5,7 @@ mod connect;
mod connection;
mod connection_counter;
mod connections;
mod peer_context;
pub use self::accept_connection::{AcceptConnection, accept_connection};
pub use self::channel::Channel;
@ -13,3 +14,4 @@ pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::connection_counter::ConnectionCounter;
pub use self::connections::Connections;
pub use self::peer_context::PeerContext;

114
p2p/src/net/peer_context.rs Normal file
View File

@ -0,0 +1,114 @@
use std::sync::Arc;
use parking_lot::Mutex;
use message::{Payload, Message};
use p2p::Context;
use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses};
pub struct PeerContext {
context: Arc<Context>,
info: PeerInfo,
synchronizer: Mutex<ConfigurableSynchronizer>,
response_queue: Mutex<ResponseQueue>,
}
impl PeerContext {
pub fn new(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Self {
PeerContext {
context: context,
info: info,
synchronizer: Mutex::new(ConfigurableSynchronizer::new(synchronous)),
response_queue: Mutex::default(),
}
}
fn to_message<T>(&self, payload: &T) -> Message<T> where T: Payload {
Message::new(self.info.magic, self.info.version, payload).expect("failed to create outgoing message")
}
fn send_awaiting(&self, sync: &mut ConfigurableSynchronizer, queue: &mut ResponseQueue, start_id: u32) {
let mut next_id = start_id;
loop {
next_id = next_id.overflowing_add(1).0;
match queue.responses(next_id) {
Some(Responses::Finished(messages)) => {
assert!(sync.permission_for_response(next_id));
for message in messages {
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
self.context.spawn(send);
}
},
Some(Responses::Ignored) => {
assert!(sync.permission_for_response(next_id));
},
Some(Responses::Unfinished(messages)) => {
assert!(sync.is_permitted(next_id));
for message in messages {
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
self.context.spawn(send);
}
break;
},
None => {
break;
}
}
}
}
/// Request is always automatically send.
pub fn send_request<T>(&self, payload: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
}
pub fn declare_response(&self) -> u32 {
let d = self.synchronizer.lock().declare_response();
trace!("declared response: {}", d);
d
}
pub fn send_response_inline<T>(&self, payload: &T) where T: Payload {
let id = self.declare_response();
self.send_response(payload, id, true);
}
/// Do not wait for response with given id.
pub fn ignore_response(&self, id: u32) {
let mut sync = self.synchronizer.lock();
let mut queue = self.response_queue.lock();
if sync.permission_for_response(id) {
self.send_awaiting(&mut sync, &mut queue, id);
} else {
queue.push_ignored_response(id);
}
}
/// Responses are sent in order defined by synchronizer.
pub fn send_response<T>(&self, payload: &T, id: u32, is_final: bool) where T: Payload {
trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final);
let mut sync = self.synchronizer.lock();
let mut queue = self.response_queue.lock();
if is_final {
if sync.permission_for_response(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
self.send_awaiting(&mut sync, &mut queue, id);
} else {
queue.push_finished_response(id, self.to_message(payload).into());
}
} else if sync.is_permitted(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
} else {
queue.push_unfinished_response(id, self.to_message(payload).into());
}
}
pub fn info(&self) -> &PeerInfo {
&self.info
}
pub fn global(&self) -> &Arc<Context> {
&self.context
}
}

View File

@ -9,7 +9,7 @@ use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
use abstract_ns::Resolver;
use ns_dns_tokio::DnsResolver;
use message::{Payload, MessageResult};
use message::{Payload, MessageResult, Message};
use message::common::Services;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
use util::{NodeTable, Node, Direction};
@ -293,7 +293,22 @@ impl Context {
/// Send message to a channel with given peer id.
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, payload),
Some(channel) => {
let info = channel.peer_info();
let message = Message::new(info.magic, info.version, payload).expect("failed to create outgoing message");
Context::send(context, channel, message)
},
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
}
}
}
pub fn send_message_to_peer<T>(context: Arc<Context>, peer: PeerId, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, message),
None => {
// peer no longer exists.
// TODO: should we return error here?
@ -303,13 +318,13 @@ impl Context {
}
/// Send message using given channel.
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(payload).then(move |result| {
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(message).then(move |result| {
match result {
Ok(_) => {
// successful send
trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
finished(()).boxed()
},
Err(err) => {

View File

@ -4,30 +4,26 @@ use bytes::Bytes;
use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::Protocol;
use p2p::Context;
use util::{Direction, PeerInfo};
use net::PeerContext;
use util::Direction;
pub struct AddrProtocol {
/// Context
context: Arc<Context>,
/// Connected peer info.
info: PeerInfo,
context: Arc<PeerContext>,
}
impl AddrProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
AddrProtocol {
context: context,
info: info,
}
}
}
impl Protocol for AddrProtocol {
fn initialize(&mut self) {
if let Direction::Outbound = self.info.direction {
let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr);
self.context.spawn(send);
if let Direction::Outbound = self.context.info().direction {
self.context.send_request(&GetAddr);
}
}
@ -35,20 +31,19 @@ impl Protocol for AddrProtocol {
// normal nodes send addr message only after they receive getaddr message
// meanwhile seednodes, surprisingly, send addr message even before they are asked for it
if command == &GetAddr::command() {
let _: GetAddr = try!(deserialize_payload(payload, self.info.version));
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
let _: GetAddr = try!(deserialize_payload(payload, self.context.info().version));
let entries = self.context.global().node_table_entries().into_iter().map(Into::into).collect();
let addr = Addr::new(entries);
let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr);
self.context.spawn(send);
self.context.send_response_inline(&addr);
} else if command == &Addr::command() {
let addr: Addr = try!(deserialize_payload(payload, self.info.version));
let addr: Addr = try!(deserialize_payload(payload, self.context.info().version));
match addr {
Addr::V0(_) => {
unreachable!("This version of protocol is not supported!");
},
Addr::V31402(addr) => {
let nodes = addr.addresses.into_iter().map(Into::into).collect();
self.context.update_node_table(nodes);
self.context.global().update_node_table(nodes);
},
}
}
@ -58,18 +53,15 @@ impl Protocol for AddrProtocol {
pub struct SeednodeProtocol {
/// Context
context: Arc<Context>,
/// Connected peer info,
info: PeerInfo,
context: Arc<PeerContext>,
/// Indicates if disconnecting has been scheduled.
disconnecting: bool,
}
impl SeednodeProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
SeednodeProtocol {
context: context,
info: info,
disconnecting: false,
}
}
@ -81,9 +73,9 @@ impl Protocol for SeednodeProtocol {
// We can't disconenct after first read. Let's delay it by 60 seconds.
if !self.disconnecting && command == &Addr::command() {
self.disconnecting = true;
let context = self.context.clone();
let peer = self.info.id;
self.context.execute_after(Duration::new(60, 0), move || {
let context = self.context.global().clone();
let peer = self.context.info().id;
self.context.global().execute_after(Duration::new(60, 0), move || {
context.close_channel(peer);
});
}

View File

@ -4,26 +4,12 @@ use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::Protocol;
use util::{PeerId, PeerInfo};
use net::PeerContext;
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
pub trait PingContext: Send + Sync {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
}
impl PingContext for Context {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
let send = Context::send_to_peer(context.clone(), peer, payload);
context.spawn(send);
}
}
pub struct PingProtocol<T = RandomNonce, C = Context> {
pub struct PingProtocol<T = RandomNonce, C = PeerContext> {
/// Context
context: Arc<C>,
/// Connected peer info.
info: PeerInfo,
/// Nonce generator.
nonce_generator: T,
/// Last nonce sent in the ping message.
@ -31,32 +17,31 @@ pub struct PingProtocol<T = RandomNonce, C = Context> {
}
impl PingProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
PingProtocol {
context: context,
info: info,
nonce_generator: RandomNonce::default(),
last_ping_nonce: None,
}
}
}
impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: PingContext {
impl Protocol for PingProtocol {
fn initialize(&mut self) {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
self.last_ping_nonce = Some(nonce);
let ping = Ping::new(nonce);
PingContext::send_to_peer(self.context.clone(), self.info.id, &ping);
self.context.send_request(&ping);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
if command == &Ping::command() {
let ping: Ping = try!(deserialize_payload(payload, self.info.version));
let ping: Ping = try!(deserialize_payload(payload, self.context.info().version));
let pong = Pong::new(ping.nonce);
PingContext::send_to_peer(self.context.clone(), self.info.id, &pong);
self.context.send_response_inline(&pong);
} else if command == &Pong::command() {
let pong: Pong = try!(deserialize_payload(payload, self.info.version));
let pong: Pong = try!(deserialize_payload(payload, self.context.info().version));
if Some(pong.nonce) != self.last_ping_nonce.take() {
return Err(Error::InvalidCommand)
}
@ -65,111 +50,3 @@ impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: Pi
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use parking_lot::Mutex;
use bytes::Bytes;
use message::{Payload, serialize_payload, Magic};
use message::types::{Ping, Pong};
use util::{PeerId, PeerInfo, Direction};
use util::nonce::StaticNonce;
use protocol::Protocol;
use super::{PingProtocol, PingContext};
#[derive(Default)]
struct TestPingContext {
version: u32,
messages: Mutex<Vec<(PeerId, Bytes)>>,
}
impl PingContext for TestPingContext {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
let value = (peer, serialize_payload(payload, context.version).unwrap());
context.messages.lock().push(value);
}
}
#[test]
fn test_ping_init() {
let ping_context = Arc::new(TestPingContext::default());
let peer = 99;
let nonce = 1000;
let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
ping_protocol.initialize();
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
assert_eq!(messages[0].1, expected_message);
assert_eq!(ping_protocol.last_ping_nonce, Some(nonce));
}
#[test]
fn test_ping_on_message_ping() {
let ping_context = Arc::new(TestPingContext::default());
let peer = 99;
let nonce = 1000;
let command = "ping".into();
let message = serialize_payload(&Ping::new(nonce), 0).unwrap();
let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
assert!(ping_protocol.on_message(&command, &message).is_ok());
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
assert_eq!(messages[0].1, expected_message);
assert_eq!(ping_protocol.last_ping_nonce, None);
}
#[test]
fn test_ping_on_message_pong() {
	// A `pong` matching the outstanding ping nonce completes the round-trip:
	// nothing is sent back and the stored nonce is cleared.
	let context = Arc::new(TestPingContext::default());
	let peer_id = 99;
	let ping_nonce = 1000;
	let command = "pong".into();
	let incoming = serialize_payload(&Pong::new(ping_nonce), 0).unwrap();

	let mut protocol = PingProtocol {
		context: context.clone(),
		info: PeerInfo {
			id: peer_id,
			address: "0.0.0.0:8080".parse().unwrap(),
			direction: Direction::Inbound,
			version: 0,
			magic: Magic::Testnet,
		},
		nonce_generator: StaticNonce::new(ping_nonce),
		last_ping_nonce: Some(ping_nonce),
	};
	assert!(protocol.on_message(&command, &incoming).is_ok());

	let sent: Vec<(PeerId, Bytes)> = context.messages.lock().clone();
	assert!(sent.is_empty());
	assert_eq!(protocol.last_ping_nonce, None);
}
}

View File

@ -2,8 +2,7 @@ use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::Protocol;
use util::{PeerInfo, PeerId};
use p2p::Context;
use net::PeerContext;
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
@ -19,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: u32);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData);
fn on_getblocks(&self, message: types::GetBlocks);
fn on_getheaders(&self, message: types::GetHeaders);
fn on_getdata(&self, message: types::GetData, id: u32);
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
fn on_transaction(&self, message: types::Tx);
fn on_block(&self, message: types::Block);
fn on_headers(&self, message: types::Headers);
fn on_mempool(&self, message: types::MemPool);
fn on_mempool(&self, message: types::MemPool, id: u32);
fn on_filterload(&self, message: types::FilterLoad);
fn on_filteradd(&self, message: types::FilterAdd);
fn on_filterclear(&self, message: types::FilterClear);
@ -40,14 +39,14 @@ pub trait InboundSyncConnection : Send + Sync {
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_inventory(&self, message: &types::Inv);
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
fn send_getdata(&self, message: &types::GetData);
fn send_getblocks(&self, message: &types::GetBlocks);
fn send_getheaders(&self, message: &types::GetHeaders);
fn send_transaction(&self, message: &types::Tx);
fn send_block(&self, message: &types::Block);
fn send_headers(&self, message: &types::Headers);
fn send_mempool(&self, message: &types::MemPool);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool);
fn send_filterload(&self, message: &types::FilterLoad);
fn send_filteradd(&self, message: &types::FilterAdd);
fn send_filterclear(&self, message: &types::FilterClear);
@ -58,206 +57,213 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_compact_block(&self, message: &types::CompactBlock);
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
fn ignored(&self, id: u32);
}
struct OutboundSync {
context: Arc<Context>,
peer: PeerId,
context: Arc<PeerContext>,
}
impl OutboundSync {
pub fn new(context: Arc<Context>, peer: PeerId) -> OutboundSync {
pub fn new(context: Arc<PeerContext>) -> OutboundSync {
OutboundSync {
context: context,
peer: peer,
}
}
pub fn send_message<T>(&self, message: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.peer, message);
self.context.spawn(send);
}
pub fn boxed(self) -> Box<OutboundSyncConnection> {
Box::new(self)
}
}
impl OutboundSyncConnection for OutboundSync {
fn send_inventory(&self, message: &types::Inv) {
self.send_message(message);
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_getdata(&self, message: &types::GetData) {
self.send_message(message);
self.context.send_request(message);
}
fn send_getblocks(&self, message: &types::GetBlocks) {
self.send_message(message);
self.context.send_request(message);
}
fn send_getheaders(&self, message: &types::GetHeaders) {
self.send_message(message);
self.context.send_request(message);
}
fn send_transaction(&self, message: &types::Tx) {
self.send_message(message);
self.context.send_request(message);
}
fn send_block(&self, message: &types::Block) {
self.send_message(message);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_headers(&self, message: &types::Headers) {
self.send_message(message);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_mempool(&self, message: &types::MemPool) {
self.send_message(message);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_filterload(&self, message: &types::FilterLoad) {
self.send_message(message);
self.context.send_request(message);
}
fn send_filteradd(&self, message: &types::FilterAdd) {
self.send_message(message);
self.context.send_request(message);
}
fn send_filterclear(&self, message: &types::FilterClear) {
self.send_message(message);
self.context.send_request(message);
}
fn send_merkleblock(&self, message: &types::MerkleBlock) {
self.send_message(message);
self.context.send_request(message);
}
fn send_sendheaders(&self, message: &types::SendHeaders) {
self.send_message(message);
self.context.send_request(message);
}
fn send_feefilter(&self, message: &types::FeeFilter) {
self.send_message(message);
self.context.send_request(message);
}
fn send_send_compact(&self, message: &types::SendCompact) {
self.send_message(message);
self.context.send_request(message);
}
fn send_compact_block(&self, message: &types::CompactBlock) {
self.send_message(message);
self.context.send_request(message);
}
fn send_get_block_txn(&self, message: &types::GetBlockTxn) {
self.send_message(message);
self.context.send_request(message);
}
fn send_block_txn(&self, message: &types::BlockTxn) {
self.send_message(message);
self.context.send_request(message);
}
fn send_notfound(&self, message: &types::NotFound) {
self.send_message(message);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn ignored(&self, id: u32) {
self.context.ignore_response(id);
}
}
pub struct SyncProtocol {
inbound_connection: InboundSyncConnectionRef,
info: PeerInfo,
context: Arc<PeerContext>,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed();
let inbound_connection = context.create_sync_session(0, outbound_connection);
pub fn new(context: Arc<PeerContext>) -> Self {
let outbound_connection = OutboundSync::new(context.clone()).boxed();
let inbound_connection = context.global().create_sync_session(0, outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
info: info,
context: context,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.info.version);
self.inbound_connection.start_sync_session(self.context.info().version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
let version = self.context.info().version;
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, self.info.version));
let message: types::Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getdata(message);
let message: types::GetData = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetData::command());
self.inbound_connection.on_getdata(message, id);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getblocks(message);
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetBlocks::command());
self.inbound_connection.on_getblocks(message, id);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getheaders(message);
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetHeaders::command());
self.inbound_connection.on_getheaders(message, id);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, self.info.version));
let message: types::Tx = try!(deserialize_payload(payload, version));
self.inbound_connection.on_transaction(message);
}
else if command == &types::Block::command() {
let message: types::Block = try!(deserialize_payload(payload, self.info.version));
let message: types::Block = try!(deserialize_payload(payload, version));
self.inbound_connection.on_block(message);
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_mempool(message);
let message: types::MemPool = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::MemPool::command());
self.inbound_connection.on_mempool(message, id);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, self.info.version));
let message: types::Headers = try!(deserialize_payload(payload, version));
self.inbound_connection.on_headers(message);
}
else if command == &types::FilterLoad::command() {
let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filterload(message);
}
else if command == &types::FilterAdd::command() {
let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filteradd(message);
}
else if command == &types::FilterClear::command() {
let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterClear = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filterclear(message);
}
else if command == &types::MerkleBlock::command() {
let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version));
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.on_merkleblock(message);
}
else if command == &types::SendHeaders::command() {
let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version));
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.on_sendheaders(message);
}
else if command == &types::FeeFilter::command() {
let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version));
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
self.inbound_connection.on_feefilter(message);
}
else if command == &types::SendCompact::command() {
let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version));
let message: types::SendCompact = try!(deserialize_payload(payload, version));
self.inbound_connection.on_send_compact(message);
}
else if command == &types::CompactBlock::command() {
let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version));
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.on_compact_block(message);
}
else if command == &types::GetBlockTxn::command() {
let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version));
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.on_get_block_txn(message);
}
else if command == &types::BlockTxn::command() {
let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version));
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.on_block_txn(message);
}
else if command == &types::NotFound::command() {
let message: types::NotFound = try!(deserialize_payload(payload, self.info.version));
let message: types::NotFound = try!(deserialize_payload(payload, version));
self.inbound_connection.on_notfound(message);
}
Ok(())

View File

@ -3,42 +3,47 @@ use parking_lot::Mutex;
use bytes::Bytes;
use message::{Command, Error};
use p2p::Context;
use net::PeerContext;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use util::{PeerInfo};
use util::PeerInfo;
pub trait SessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session;
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session;
}
pub struct SeednodeSessionFactory;
impl SessionFactory for SeednodeSessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let seed = SeednodeProtocol::new(context.clone(), info).boxed();
Session::new(vec![ping, addr, seed])
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
let ping = PingProtocol::new(peer_context.clone()).boxed();
let addr = AddrProtocol::new(peer_context.clone()).boxed();
let seed = SeednodeProtocol::new(peer_context.clone()).boxed();
Session::new(peer_context, vec![ping, addr, seed])
}
}
pub struct NormalSessionFactory;
impl SessionFactory for NormalSessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let sync = SyncProtocol::new(context, info).boxed();
Session::new(vec![ping, addr, sync])
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
let ping = PingProtocol::new(peer_context.clone()).boxed();
let addr = AddrProtocol::new(peer_context.clone()).boxed();
let sync = SyncProtocol::new(peer_context.clone()).boxed();
Session::new(peer_context, vec![ping, addr, sync])
}
}
pub struct Session {
_peer_context: Arc<PeerContext>,
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
pub fn new(protocols: Vec<Box<Protocol>>) -> Self {
pub fn new(peer_context: Arc<PeerContext>, protocols: Vec<Box<Protocol>>) -> Self {
Session {
_peer_context: peer_context,
protocols: Mutex::new(protocols),
}
}

View File

@ -2,8 +2,10 @@ pub mod nonce;
pub mod time;
mod node_table;
mod peer;
mod response_queue;
mod synchronizer;
pub use self::node_table::{NodeTable, Node};
pub use self::peer::{PeerId, PeerInfo, Direction};
pub use self::response_queue::{ResponseQueue, Responses};
pub use self::synchronizer::{Synchronizer, ConfigurableSynchronizer};

View File

@ -0,0 +1,45 @@
use std::collections::{HashMap, HashSet};
use bytes::Bytes;
/// Queue of out-of-order responses. Each peer has its own queue.
#[derive(Debug, Default)]
pub struct ResponseQueue {
	/// Response chunks still being accumulated, keyed by request id.
	unfinished: HashMap<u32, Vec<Bytes>>,
	/// Fully accumulated responses (final chunk received), keyed by request id.
	finished: HashMap<u32, Vec<Bytes>>,
	/// Ids of requests whose response was deliberately ignored.
	ignored: HashSet<u32>,
}
/// What the queue holds for a single request id when it is drained.
pub enum Responses {
	/// Chunks collected so far; the final chunk has not been pushed yet.
	Unfinished(Vec<Bytes>),
	/// All chunks of a response whose final chunk has been pushed.
	Finished(Vec<Bytes>),
	/// The response for this id was marked as ignored.
	Ignored,
}
impl ResponseQueue {
	/// Queues one chunk of a response that is not yet complete.
	pub fn push_unfinished_response(&mut self, id: u32, response: Bytes) {
		let chunks = self.unfinished.entry(id).or_insert_with(Vec::new);
		chunks.push(response)
	}

	/// Queues the final chunk of a response, moving any previously queued
	/// chunks for the same `id` into the finished set. Panics if a finished
	/// response for `id` was already recorded.
	pub fn push_finished_response(&mut self, id: u32, response: Bytes) {
		let mut chunks = match self.unfinished.remove(&id) {
			Some(existing) => existing,
			None => Vec::new(),
		};
		chunks.push(response);
		let replaced = self.finished.insert(id, chunks);
		assert!(replaced.is_none(), "logic error; same finished response should never be pushed twice");
	}

	/// Records that the response for `id` is deliberately skipped.
	/// Panics if `id` was already marked as ignored.
	pub fn push_ignored_response(&mut self, id: u32) {
		assert!(self.ignored.insert(id), "logic error; same response should never be ignored twice");
	}

	/// Removes and returns whatever is queued for `id`, checking the
	/// unfinished, finished and ignored sets in that order.
	pub fn responses(&mut self, id: u32) -> Option<Responses> {
		if let Some(chunks) = self.unfinished.remove(&id) {
			Some(Responses::Unfinished(chunks))
		} else if let Some(chunks) = self.finished.remove(&id) {
			Some(Responses::Finished(chunks))
		} else if self.ignored.remove(&id) {
			Some(Responses::Ignored)
		} else {
			None
		}
	}
}

View File

@ -11,6 +11,9 @@ pub trait Synchronizer: Send {
/// Declare sending response in future.
fn declare_response(&mut self) -> u32;
/// Returns true if permission for response is granted, but without marking response as sent.
fn is_permitted(&self, id: u32) -> bool;
/// Returns true if permission for sending response is granted.
fn permission_for_response(&mut self, id: u32) -> bool;
}
@ -29,6 +32,10 @@ impl Synchronizer for FifoSynchronizer {
result
}
fn is_permitted(&self, id: u32) -> bool {
id == self.next_to_grant
}
fn permission_for_response(&mut self, id: u32) -> bool {
// there should be an assertion here, assert!(id < self.declared_responses),
// but it's impossible to write an assertion if the value may overflow
@ -54,6 +61,10 @@ impl Synchronizer for NoopSynchronizer {
result
}
fn is_permitted(&self, _id: u32) -> bool {
true
}
fn permission_for_response(&mut self, _id: u32) -> bool {
true
}
@ -81,6 +92,16 @@ impl ThresholdSynchronizer {
to_grant_max: declared,
}
}
fn within_threshold(&self, id: u32) -> bool {
if self.to_grant_min <= self.to_grant_max {
// if max is bigger than min, id must be in range [min, max)
self.to_grant_min <= id && id < self.to_grant_max
} else {
// otherwise id is in range [min, u32::max_value()] || [0, max)
self.to_grant_min <= id || id < self.to_grant_max
}
}
}
impl Synchronizer for ThresholdSynchronizer {
@ -88,19 +109,12 @@ impl Synchronizer for ThresholdSynchronizer {
self.inner.declare_response()
}
fn permission_for_response(&mut self, id: u32) -> bool {
if self.inner.permission_for_response(id) {
return true;
}
fn is_permitted(&self, id: u32) -> bool {
self.inner.is_permitted(id) || self.within_threshold(id)
}
if self.to_grant_min <= self.to_grant_max {
// if max is bigger then min, id must be in range [min, max)
self.to_grant_min <= id && id < self.to_grant_max
} else {
// otherwise if is in range [min, u32::max_value()] || [0, max)
(self.to_grant_min <= id && id <= u32::max_value()) ||
id < self.to_grant_max
}
fn permission_for_response(&mut self, id: u32) -> bool {
self.inner.permission_for_response(id) || self.within_threshold(id)
}
}
@ -143,12 +157,12 @@ impl ConfigurableSynchronizer {
/// from last_processed response will still be granted permissions.
pub fn change_sync_policy(&mut self, sync: bool) {
let new_inner = match self.inner {
InnerSynchronizer::Threshold(ref s) if sync == false => {
InnerSynchronizer::Threshold(ref s) if !sync => {
InnerSynchronizer::Noop(NoopSynchronizer {
declared_responses: s.inner.declared_responses,
})
},
InnerSynchronizer::Noop(ref s) if sync == true => {
InnerSynchronizer::Noop(ref s) if sync => {
let threshold = ThresholdSynchronizer::new(
s.declared_responses,
CONFIGURABLE_SYNCHRONIZER_THRESHOLD,
@ -170,6 +184,13 @@ impl Synchronizer for ConfigurableSynchronizer {
}
}
fn is_permitted(&self, id: u32) -> bool {
match self.inner {
InnerSynchronizer::Noop(ref s) => s.is_permitted(id),
InnerSynchronizer::Threshold(ref s) => s.is_permitted(id),
}
}
fn permission_for_response(&mut self, id: u32) -> bool {
match self.inner {
InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id),

View File

@ -2,7 +2,7 @@ use std::net::SocketAddr;
use sync::create_sync_connection_factory;
use message::Services;
use util::{open_db, init_db, node_table_path};
use {config, p2p};
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM, USER_AGENT};
pub fn start(cfg: config::Config) -> Result<(), String> {
let mut el = p2p::event_loop();
@ -11,16 +11,16 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
try!(init_db(&cfg, &db));
let p2p_cfg = p2p::Config {
threads: 4,
protocol_minimum: 70001,
protocol_maximum: 70017,
inbound_connections: 10,
outbound_connections: 10,
threads: cfg.p2p_threads,
inbound_connections: cfg.inbound_connections,
outbound_connections: cfg.outbound_connections,
connection: p2p::NetConfig {
protocol_version: PROTOCOL_VERSION,
protocol_minimum: PROTOCOL_MINIMUM,
magic: cfg.magic,
local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port),
services: Services::default().with_network(true),
user_agent: "pbtc".into(),
user_agent: USER_AGENT.into(),
start_height: 0,
relay: false,
},

View File

@ -8,6 +8,9 @@ pub struct Config {
pub connect: Option<net::SocketAddr>,
pub seednode: Option<String>,
pub print_to_console: bool,
pub inbound_connections: u32,
pub outbound_connections: u32,
pub p2p_threads: usize,
}
pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
@ -19,6 +22,16 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
(true, true) => return Err("Only one testnet option can be used".into()),
};
let (in_connections, out_connections) = match magic {
Magic::Testnet | Magic::Mainnet => (10, 10),
Magic::Regtest => (1, 0),
};
let p2p_threads = match magic {
Magic::Testnet | Magic::Mainnet => 4,
Magic::Regtest => 1,
};
let port = match matches.value_of("port") {
Some(port) => try!(port.parse().map_err(|_| "Invalid port".to_owned())),
None => magic.port(),
@ -45,6 +58,9 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
port: port,
connect: connect,
seednode: seednode,
inbound_connections: in_connections,
outbound_connections: out_connections,
p2p_threads: p2p_threads,
};
Ok(config)

View File

@ -23,6 +23,9 @@ mod util;
use app_dirs::AppInfo;
pub const APP_INFO: AppInfo = AppInfo { name: "pbtc", author: "Parity" };
pub const PROTOCOL_VERSION: u32 = 70_014;
pub const PROTOCOL_MINIMUM: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";
fn main() {
env_logger::init().unwrap();
@ -31,7 +34,6 @@ fn main() {
}
}
fn run() -> Result<(), String> {
let yaml = load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches();

View File

@ -56,11 +56,11 @@ impl BestHeadersChain {
self.best.position(hash)
.and_then(|pos| self.best.at(pos + 1))
.and_then(|child| Some(vec![child]))
.unwrap_or(Vec::new())
.unwrap_or_default()
}
pub fn best_block_hash(&self) -> H256 {
self.best.back().or(Some(self.storage_best_hash.clone())).expect("storage_best_hash is always known")
self.best.back().or_else(|| Some(self.storage_best_hash.clone())).expect("storage_best_hash is always known")
}
pub fn insert(&mut self, header: BlockHeader) {
@ -80,7 +80,7 @@ impl BestHeadersChain {
}
pub fn remove(&mut self, hash: &H256) {
if let Some(_) = self.headers.remove(hash) {
if self.headers.remove(hash).is_some() {
match self.best.remove(hash) {
HashPosition::Front => self.clear(),
HashPosition::Inside(position) => self.clear_after(position),
@ -89,8 +89,8 @@ impl BestHeadersChain {
}
}
pub fn remove_n<'a, I: IntoIterator<Item=H256>> (&mut self, hashes: I) {
for hash in hashes.into_iter() {
pub fn remove_n<I: IntoIterator<Item=H256>> (&mut self, hashes: I) {
for hash in hashes {
self.remove(&hash);
}
}

View File

@ -191,7 +191,7 @@ impl HashQueueChain {
/// Returns element at the given position
pub fn at(&self, mut index: u32) -> Option<H256> {
for queue in self.chain.iter() {
for queue in &self.chain {
let queue_len = queue.len();
if index < queue_len {
return queue.at(index);

View File

@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&self, message: types::GetData) {
self.local_node.on_peer_getdata(self.peer_index, message);
fn on_getdata(&self, message: types::GetData, id: u32) {
self.local_node.on_peer_getdata(self.peer_index, message, id);
}
fn on_getblocks(&self, message: types::GetBlocks) {
self.local_node.on_peer_getblocks(self.peer_index, message);
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
self.local_node.on_peer_getblocks(self.peer_index, message, id);
}
fn on_getheaders(&self, message: types::GetHeaders) {
self.local_node.on_peer_getheaders(self.peer_index, message);
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
self.local_node.on_peer_getheaders(self.peer_index, message, id);
}
fn on_transaction(&self, message: types::Tx) {
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_headers(self.peer_index, message);
}
fn on_mempool(&self, message: types::MemPool) {
self.local_node.on_peer_mempool(self.peer_index, message);
fn on_mempool(&self, message: types::MemPool, id: u32) {
self.local_node.on_peer_mempool(self.peer_index, message, id);
}
fn on_filterload(&self, message: types::FilterLoad) {

View File

@ -94,23 +94,24 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// TODO: process other inventory types
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message);
self.server.serve_getdata(peer_index, message, id);
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.serve_getblocks(peer_index, message);
self.server.serve_getblocks(peer_index, message, id);
}
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
// do not serve getheaders requests until we are synchronized
if self.client.lock().state().is_synchronizing() {
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
return;
}
@ -125,21 +126,21 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
need_wait
};
self.server.serve_getheaders(peer_index, message);
self.server.serve_getheaders(peer_index, message, id);
if need_wait {
self.server.wait_peer_requests_completed(peer_index);
}
}
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
trace!(target: "sync", "Got `transaction` message from peer#{}. Transaction hash: {}", peer_index, message.transaction.hash());
trace!(target: "sync", "Got `transaction` message from peer#{}. Transaction hash: {}", peer_index, message.transaction.hash().to_reversed_str());
// try to process new transaction
self.client.lock().on_peer_transaction(peer_index, message.transaction);
}
pub fn on_peer_block(&self, peer_index: usize, message: types::Block) {
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash());
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash().to_reversed_str());
// try to process new block
self.client.lock().on_peer_block(peer_index, message.block);
@ -153,10 +154,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
}
}
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
self.server.serve_mempool(peer_index);
self.server.serve_mempool(peer_index, id);
}
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
@ -206,14 +207,14 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
}
fn transactions_inventory(&self, inventory: &Vec<InventoryVector>) -> Vec<H256> {
fn transactions_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageTx)
.map(|item| item.hash.clone())
.collect()
}
fn blocks_inventory(&self, inventory: &Vec<InventoryVector>) -> Vec<H256> {
fn blocks_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
@ -249,14 +250,14 @@ mod tests {
}
impl OutboundSyncConnection for DummyOutboundSyncConnection {
fn send_inventory(&self, _message: &types::Inv) {}
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
fn send_getdata(&self, _message: &types::GetData) {}
fn send_getblocks(&self, _message: &types::GetBlocks) {}
fn send_getheaders(&self, _message: &types::GetHeaders) {}
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block) {}
fn send_headers(&self, _message: &types::Headers) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
fn send_filterclear(&self, _message: &types::FilterClear) {}
@ -267,7 +268,8 @@ mod tests {
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound) {}
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
fn ignored(&self, _id: u32) {}
}
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
@ -305,9 +307,10 @@ mod tests {
hash: genesis_block_hash.clone(),
}
];
let dummy_id = 0;
local_node.on_peer_getdata(peer_index, types::GetData {
inventory: inventory.clone()
});
}, dummy_id);
// => `getdata` is served
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);

View File

@ -367,9 +367,9 @@ impl Chain {
match self.hash_chain.remove_at(VERIFYING_QUEUE, hash) {
HashPosition::Missing => match self.hash_chain.remove_at(REQUESTED_QUEUE, hash) {
HashPosition::Missing => self.hash_chain.remove_at(SCHEDULED_QUEUE, hash),
position @ _ => position,
position => position,
},
position @ _ => position,
position => position,
}
}
@ -416,14 +416,14 @@ impl Chain {
}
/// Intersect chain with inventory
pub fn intersect_with_blocks_headers(&self, hashes: &Vec<H256>, headers: &Vec<BlockHeader>) -> HeadersIntersection {
pub fn intersect_with_blocks_headers(&self, hashes: &[H256], headers: &[BlockHeader]) -> HeadersIntersection {
let hashes_len = hashes.len();
assert!(hashes_len != 0 && hashes.len() == headers.len());
// giving that headers are ordered
let (is_first_known, first_state) = match self.block_state(&hashes[0]) {
BlockState::Unknown => (false, self.block_state(&headers[0].previous_header_hash)),
state @ _ => (true, state),
state => (true, state),
};
match first_state {
// if first block of inventory is unknown && its parent is unknonw => all other blocks are also unknown
@ -431,32 +431,32 @@ impl Chain {
HeadersIntersection::NoKnownBlocks(0)
},
// else if first block is known
first_block_state @ _ => match self.block_state(&hashes[hashes_len - 1]) {
first_block_state => match self.block_state(&hashes[hashes_len - 1]) {
// if last block is known to be in db => all inventory blocks are also in db
BlockState::Stored => {
HeadersIntersection::DbAllBlocksKnown
HeadersIntersection::DbAllBlocksKnown
},
// if first block is known && last block is unknown but we know block before first one => intersection with queue or with db
BlockState::Unknown if !is_first_known => {
// previous block is stored => fork from stored block
if first_state == BlockState::Stored {
return HeadersIntersection::DbForkNewBlocks(0);
HeadersIntersection::DbForkNewBlocks(0)
}
// previous block is best block => no fork
else if &self.best_block().hash == &headers[0].previous_header_hash {
return HeadersIntersection::InMemoryMainNewBlocks(0);
HeadersIntersection::InMemoryMainNewBlocks(0)
}
// previous block is not a best block => fork
else {
return HeadersIntersection::InMemoryForkNewBlocks(0);
HeadersIntersection::InMemoryForkNewBlocks(0)
}
},
// if first block is known && last block is unknown => intersection with queue or with db
BlockState::Unknown if is_first_known => {
// find last known block
let mut previous_state = first_block_state;
for index in 1..hashes_len {
let state = self.block_state(&hashes[index]);
for (index, hash) in hashes.iter().enumerate().take(hashes_len).skip(1) {
let state = self.block_state(hash);
if state == BlockState::Unknown {
// previous block is stored => fork from stored block
if previous_state == BlockState::Stored {

View File

@ -268,8 +268,8 @@ impl Config {
impl State {
pub fn is_saturated(&self) -> bool {
match self {
&State::Saturated => true,
match *self {
State::Saturated => true,
_ => false,
}
}
@ -282,8 +282,8 @@ impl State {
}
pub fn is_nearly_saturated(&self) -> bool {
match self {
&State::NearlySaturated => true,
match *self {
State::NearlySaturated => true,
_ => false,
}
}
@ -310,7 +310,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Get synchronization state
fn state(&self) -> State {
self.state.clone()
self.state
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
@ -328,7 +328,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
let unknown_blocks_hashes: Vec<_> = {
let chain = self.chain.read();
blocks_hashes.into_iter()
.filter(|h| chain.block_state(&h) == BlockState::Unknown)
.filter(|h| chain.block_state(h) == BlockState::Unknown)
.filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h))
.collect()
};
@ -364,11 +364,16 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>) {
let blocks_hashes = {
// we can't process headers message if it has no link to our headers
let ref header0 = blocks_headers[0];
if {
self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown
} {
warn!(target: "sync", "Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}", peer_index, header0.hash(), header0.previous_header_hash);
let header0 = &blocks_headers[0];
let unknown_state = self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown;
if unknown_state {
warn!(
target: "sync",
"Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}",
peer_index,
header0.hash().to_reversed_str(),
header0.previous_header_hash.to_reversed_str()
);
return;
}
@ -376,7 +381,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
// validate blocks headers before scheduling
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
let mut prev_block_hash = header0.previous_header_hash.clone();
for block_header in blocks_headers.iter() {
for block_header in &blocks_headers {
let block_header_hash = block_header.hash();
if block_header.previous_header_hash != prev_block_hash {
warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash);
@ -515,18 +520,18 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Process failed block verification
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
{
let mut chain = self.chain.write();
// forget for this block and all its children
// headers are also removed as they all are invalid
chain.forget_block_with_children(&hash);
chain.forget_block_with_children(hash);
}
// awake threads, waiting for this block insertion
self.awake_waiting_threads(&hash);
self.awake_waiting_threads(hash);
// start new tasks
self.execute_synchronization_tasks(None);
@ -652,7 +657,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
/// Get configuration parameters.
pub fn config<'a>(&'a self) -> &'a Config {
pub fn config(&self) -> &Config {
&self.config
}
@ -685,7 +690,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let new_blocks_hashes = hashes.split_off(new_block_index);
let new_blocks_headers = headers.split_off(new_block_index);
let new_blocks_hashes_len = new_blocks_hashes.len();
trace!(target: "sync", "Sch. {} headers from peer#{}. First {:?}, last: {:?}", new_blocks_hashes_len, peer_index, new_blocks_hashes[0], new_blocks_hashes[new_blocks_hashes_len - 1]);
trace!(
target: "sync", "Sch. {} headers from peer#{}. First {:?}, last: {:?}",
new_blocks_hashes_len,
peer_index,
new_blocks_hashes[0].to_reversed_str(),
new_blocks_hashes[new_blocks_hashes_len - 1].to_reversed_str()
);
chain.schedule_blocks_headers(new_blocks_hashes, new_blocks_headers);
// remember peer as useful
self.peers.useful_peer(peer_index);
@ -718,7 +729,12 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
BlockState::Unknown => {
if self.state.is_synchronizing() {
// when synchronizing, we tend to receive all blocks in-order
trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index);
trace!(
target: "sync",
"Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing",
block_hash.to_reversed_str(),
peer_index
);
// remove block from current queue
chain.forget_block(&block_hash);
// remove orphaned blocks
@ -863,11 +879,11 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
if !inventory_idle_peers.is_empty() {
let scheduled_hashes_len = { self.chain.read().length_of_blocks_state(BlockState::Scheduled) };
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
for inventory_peer in inventory_idle_peers.iter() {
for inventory_peer in &inventory_idle_peers {
self.peers.on_inventory_requested(*inventory_peer);
}
let inventory_tasks = inventory_idle_peers.into_iter().map(|p| Task::RequestBlocksHeaders(p));
let inventory_tasks = inventory_idle_peers.into_iter().map(Task::RequestBlocksHeaders);
tasks.extend(inventory_tasks);
}
}
@ -1009,10 +1025,20 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
/// Thread procedure for handling verification tasks
fn verification_worker_proc(sync: Arc<Mutex<Self>>, mut verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
let bip16_time_border = { sync.lock().config().consensus_params.bip16_time };
let mut is_bip16_active = false;
let mut parameters_change_steps = Some(0);
while let Ok(task) = work_receiver.recv() {
match task {
VerificationTask::VerifyBlock(block) => {
// for changes that are not relying on block#
let is_bip16_active_on_block = block.block_header.time >= bip16_time_border;
let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
if force_parameters_change {
parameters_change_steps = Some(0);
}
// change verifier parameters, if needed
if let Some(steps_left) = parameters_change_steps {
if steps_left == 0 {
@ -1020,6 +1046,9 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let config = sync.config();
let best_storage_block = sync.chain.read().best_storage_block();
is_bip16_active = is_bip16_active_on_block;
verifier = verifier.verify_p2sh(is_bip16_active);
let is_bip65_active = best_storage_block.number >= config.consensus_params.bip65_height;
verifier = verifier.verify_clocktimeverify(is_bip65_active);
@ -1105,7 +1134,7 @@ pub mod tests {
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
(event_loop, handle, executor, chain, client)
}
}
#[test]
fn synchronization_saturated_on_start() {

View File

@ -7,6 +7,7 @@ use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
use synchronization_chain::ChainRef;
use synchronization_server::ServerTaskIndex;
use local_node::PeersConnections;
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
@ -28,13 +29,15 @@ pub enum Task {
/// Request memory pool contents
RequestMemoryPool(usize),
/// Send block.
SendBlock(usize, Block),
SendBlock(usize, Block, ServerTaskIndex),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>),
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>),
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>),
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
/// Notify io about ignored request
Ignore(usize, u32),
}
/// Synchronization tasks executor
@ -79,7 +82,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Querying {} unknown blocks from peer#{}", getdata.inventory.len(), peer_index);
connection.send_getdata(&getdata);
}
@ -93,7 +95,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Request blocks hashes from peer#{} using getheaders", peer_index);
connection.send_getheaders(&getheaders);
}
@ -122,48 +123,50 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getdata(&getdata);
}
},
Task::SendBlock(peer_index, block) => {
Task::SendBlock(peer_index, block, id) => {
let block_message = types::Block {
block: block,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
connection.send_block(&block_message);
connection.send_block(&block_message, id.raw(), id.is_final());
}
},
Task::SendNotFound(peer_index, unknown_inventory) => {
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound);
connection.send_notfound(&notfound, id.raw(), id.is_final());
}
},
Task::SendInventory(peer_index, inventory) => {
Task::SendInventory(peer_index, inventory, id) => {
let inventory = types::Inv {
inventory: inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory);
connection.send_inventory(&inventory, id.raw(), id.is_final());
}
},
Task::SendHeaders(peer_index, headers) => {
Task::SendHeaders(peer_index, headers, id) => {
let headers = types::Headers {
headers: headers,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
connection.send_headers(&headers);
connection.send_headers(&headers, id.raw(), id.is_final());
}
},
Task::Ignore(peer_index, id) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id);
connection.ignored(id);
}
},
}
@ -222,4 +225,4 @@ pub mod tests {
self.waiter.notify_one();
}
}
}
}

View File

@ -14,10 +14,10 @@ use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders);
fn serve_mempool(&self, peer_index: usize);
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
fn serve_mempool(&self, peer_index: usize, id: u32);
fn wait_peer_requests_completed(&self, peer_index: usize);
}
@ -42,10 +42,58 @@ struct ServerQueue {
is_stopping: AtomicBool,
queue_ready: Arc<Condvar>,
peers_queue: VecDeque<usize>,
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
}
/// `ServerTask` index.
#[derive(Debug, PartialEq)]
pub enum ServerTaskIndex {
/// `Partial` is used when server needs to send more than one response for request.
Partial(u32),
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
Final(u32),
}
impl ServerTaskIndex {
pub fn raw(&self) -> u32 {
match *self {
ServerTaskIndex::Partial(id) | ServerTaskIndex::Final(id) => id,
}
}
pub fn is_final(&self) -> bool {
match *self {
ServerTaskIndex::Partial(_) => false,
ServerTaskIndex::Final(_) => true,
}
}
}
/// Server tests together with unique id assigned to it
#[derive(Debug, PartialEq)]
pub struct IndexedServerTask {
/// Task itself.
task: ServerTask,
/// Task id.
id: ServerTaskIndex,
}
impl IndexedServerTask {
fn new(task: ServerTask, id: ServerTaskIndex) -> Self {
IndexedServerTask {
task: task,
id: id,
}
}
}
impl IndexedServerTask {
fn ignore(id: u32) -> Self {
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
}
}
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
@ -54,6 +102,7 @@ pub enum ServerTask {
ServeMempool,
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
Ignore,
}
impl SynchronizationServer {
@ -97,11 +146,18 @@ impl SynchronizationServer {
})
};
match server_task {
// `getdata` => `notfound` + `block` + ...
Some((peer_index, ServerTask::ServeGetData(inventory))) => {
let (peer_index, indexed_task) = match server_task {
Some((peer_index, indexed_task)) => (peer_index, indexed_task),
// no tasks after wake-up => stopping or pausing
_ => continue,
};
match indexed_task.task {
// `getdata` => `notfound` + `block` + ...
ServerTask::ServeGetData(inventory) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<ServerTask> = Vec::new();
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
let task_id = indexed_task.id.raw();
{
let chain = chain.read();
let storage = chain.storage();
@ -109,7 +165,10 @@ impl SynchronizationServer {
match item.inv_type {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
Some(_) => {
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
},
None => unknown_items.push(item),
}
},
@ -120,18 +179,25 @@ impl SynchronizationServer {
// respond with `notfound` message for unknown data
if !unknown_items.is_empty() {
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
}
// schedule data responses
if !new_tasks.is_empty() {
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
// mark last task as the final one
if let Some(task) = new_tasks.last_mut() {
task.id = ServerTaskIndex::Final(task_id);
}
queue.lock().add_tasks(peer_index, new_tasks);
} else {
executor.lock().execute(Task::Ignore(peer_index, task_id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getblocks` => `inventory`
Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => {
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
@ -139,25 +205,29 @@ impl SynchronizationServer {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getheaders` => `headers`
Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => {
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `mempool` => `inventory`
Some((peer_index, ServerTask::ServeMempool)) => {
ServerTask::ServeMempool => {
let inventory: Vec<_> = chain.read()
.transactions_hashes_with_state(TransactionState::InMemory)
.into_iter()
@ -168,27 +238,32 @@ impl SynchronizationServer {
.collect();
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `notfound`
Some((peer_index, ServerTask::ReturnNotFound(inventory))) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
ServerTask::ReturnNotFound(inventory) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `block`
Some((peer_index, ServerTask::ReturnBlock(block_hash))) => {
ServerTask::ReturnBlock(block_hash) => {
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
executor.lock().execute(Task::SendBlock(peer_index, block));
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// no tasks after wake-up => stopping or pausing
None => (),
// ignore
ServerTask::Ignore => {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
queue.lock().task_processed(peer_index);
},
}
}
}
@ -205,9 +280,9 @@ impl SynchronizationServer {
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
(first_block_number..last_block_number).into_iter()
.map(|number| chain.block_hash(number))
.take_while(|ref hash| hash.is_some())
.take_while(|hash| hash.is_some())
.map(|hash| hash.unwrap())
.take_while(|ref hash| *hash != hash_stop)
.take_while(|hash| hash != hash_stop)
.collect()
}
@ -223,16 +298,16 @@ impl SynchronizationServer {
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
(first_block_number..last_block_number).into_iter()
.map(|number| chain.block_header_by_number(number))
.take_while(|ref header| header.is_some())
.take_while(|header| header.is_some())
.map(|header| header.unwrap())
.take_while(|ref header| &header.hash() != hash_stop)
.take_while(|header| &header.hash() != hash_stop)
.collect()
}
fn locate_best_known_block_hash(chain: &ChainRef, hash: &H256) -> Option<db::BestBlock> {
let chain = chain.read();
match chain.block_number(&hash) {
match chain.block_number(hash) {
Some(number) => Some(db::BestBlock {
number: number,
hash: hash.clone(),
@ -240,7 +315,7 @@ impl SynchronizationServer {
// block with hash is not in the main chain (block_number has returned None)
// but maybe it is in some fork? if so => we should find intersection with main chain
// and this would be our best common block
None => chain.block_header_by_hash(&hash)
None => chain.block_header_by_hash(hash)
.and_then(|block| {
let mut current_block_hash = block.previous_header_hash;
loop {
@ -272,32 +347,38 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop));
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
fn serve_mempool(&self, peer_index: usize) {
self.queue.lock().add_task(peer_index, ServerTask::ServeMempool);
fn serve_mempool(&self, peer_index: usize, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn wait_peer_requests_completed(&self, peer_index: usize) {
@ -321,7 +402,7 @@ impl ServerQueue {
}
}
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> {
self.peers_queue.pop_front()
.map(|peer| {
let (peer_task, no_tasks_left) = {
@ -352,7 +433,7 @@ impl ServerQueue {
}
}
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -371,7 +452,7 @@ impl ServerQueue {
self.queue_ready.notify_one();
}
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<IndexedServerTask>) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -441,7 +522,7 @@ pub mod tests {
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use super::{Server, ServerTask, SynchronizationServer};
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
pub struct DummyServer {
tasks: Mutex<Vec<(usize, ServerTask)>>,
@ -460,25 +541,25 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_mempool(&self, peer_index: usize) {
fn serve_mempool(&self, peer_index: usize, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
}
@ -503,12 +584,13 @@ pub mod tests {
hash: H256::default(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -521,12 +603,13 @@ pub mod tests {
hash: test_data::genesis().hash(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -534,14 +617,15 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 5;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -549,18 +633,19 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -568,14 +653,15 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 6;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -583,27 +669,29 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with headers
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
let (_, executor, server) = create_synchronization_server();
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 9;
server.serve_mempool(0, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -614,13 +702,14 @@ pub mod tests {
let transaction_hash = transaction.hash();
chain.write().insert_verified_transaction(transaction);
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 0;
server.serve_mempool(0, dummy_id);
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageTx,
hash: transaction_hash,
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
}

View File

@ -14,3 +14,6 @@ dot -Tpng > tools/graph.png tools/graph.dot
# Finally let's bring back old Cargo.toml file
patch Cargo.toml tools/workspace.diff
# Now let's revert Cargo.lock to previous state
cargo update -p pbtc

View File

@ -121,6 +121,7 @@ digraph dependencies {
N3 -> N46[label="",style=dashed];
N3 -> N47[label="",style=dashed];
N4 -> N2[label="",style=dashed];
N4 -> N8[label="",style=dashed];
N4 -> N30[label="",style=dashed];
N4 -> N32[label="",style=dashed];
N4 -> N36[label="",style=dashed];
@ -181,6 +182,7 @@ digraph dependencies {
N13 -> N14[label="",style=dashed];
N13 -> N16[label="",style=dashed];
N13 -> N32[label="",style=dashed];
N13 -> N51[label="",style=dashed];
N13 -> N52[label="",style=dashed];
N13 -> N54[label="",style=dashed];
N13 -> N57[label="",style=dashed];
@ -189,6 +191,7 @@ digraph dependencies {
N13 -> N80[label="",style=dashed];
N14 -> N2[label="",style=dashed];
N14 -> N4[label="",style=dashed];
N14 -> N8[label="",style=dashed];
N14 -> N12[label="",style=dashed];
N14 -> N32[label="",style=dashed];
N14 -> N36[label="",style=dashed];
@ -229,6 +232,7 @@ digraph dependencies {
N43 -> N65[label="",style=dashed];
N49 -> N36[label="",style=dashed];
N49 -> N39[label="",style=dashed];
N51 -> N8[label="",style=dashed];
N51 -> N18[label="",style=dashed];
N52 -> N79[label="",style=dashed];
N52 -> N81[label="",style=dashed];

Binary file not shown.

Before

Width:  |  Height:  |  Size: 848 KiB

After

Width:  |  Height:  |  Size: 858 KiB

View File

@ -14,6 +14,7 @@ const MAX_BLOCK_SIZE: usize = 1000000;
pub struct ChainVerifier {
store: Arc<db::Store>,
verify_p2sh: bool,
verify_clocktimeverify: bool,
skip_pow: bool,
skip_sig: bool,
@ -23,6 +24,7 @@ impl ChainVerifier {
pub fn new(store: Arc<db::Store>) -> Self {
ChainVerifier {
store: store,
verify_p2sh: false,
verify_clocktimeverify: false,
skip_pow: false,
skip_sig: false
@ -41,6 +43,11 @@ impl ChainVerifier {
self
}
pub fn verify_p2sh(mut self, verify: bool) -> Self {
self.verify_p2sh = verify;
self
}
pub fn verify_clocktimeverify(mut self, verify: bool) -> Self {
self.verify_clocktimeverify = verify;
self
@ -141,7 +148,7 @@ impl ChainVerifier {
let output: Script = paired_output.script_pubkey.to_vec().into();
let flags = VerificationFlags::default()
.verify_p2sh(true)
.verify_p2sh(self.verify_p2sh)
.verify_clocktimeverify(self.verify_clocktimeverify);
// for tests only, skips as late as possible

View File

@ -157,7 +157,7 @@ impl Queue {
items.push_front(hash, ScheduleItem::Continued(item.block(), num));
},
Err(e) => {
trace!(target: "verification", "Verification of block {} failed: {:?}", &hash, e);
trace!(target: "verification", "Verification of block {} failed: {:?}", hash.to_reversed_str(), e);
let mut invalid = self.invalid.write();
let mut processing = self.processing.write();

View File

@ -50,14 +50,14 @@ pub fn age(protocol_time: u32) -> i64 {
pub fn block_reward_satoshi(block_height: u32) -> u64 {
let mut res = 50 * 100 * 1000 * 1000;
for _ in 0..block_height / 210000 { res = res / 2 }
for _ in 0..block_height / 210000 { res /= 2 }
res
}
pub fn transaction_sigops(transaction: &chain::Transaction) -> Result<usize, script::Error> {
let mut result = 0usize;
for output in transaction.outputs.iter() {
for output in &transaction.outputs {
let output_script: Script = output.script_pubkey.to_vec().into();
// todo: not always allow malformed output?
result += output_script.sigop_count(false).unwrap_or(0);
@ -65,7 +65,7 @@ pub fn transaction_sigops(transaction: &chain::Transaction) -> Result<usize, scr
if transaction.is_coinbase() { return Ok(result); }
for input in transaction.inputs.iter() {
for input in &transaction.inputs {
let input_script: Script = input.script_sig().to_vec().into();
result += try!(input_script.sigop_count(false));
}