Merge branch 'master' into db

NikVolf 2016-11-17 18:12:42 +03:00
commit 7c6dcade30
32 changed files with 1500 additions and 323 deletions

Cargo.lock generated

@@ -104,13 +104,13 @@ dependencies = [
"bitcrypto 0.1.0",
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
"serialization 0.1.0",
]
[[package]]
name = "clap"
version = "2.17.1"
version = "2.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -135,7 +135,7 @@ version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -199,7 +199,7 @@ dependencies = [
"gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -270,15 +270,15 @@ dependencies = [
"base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bitcrypto 0.1.0",
"eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@@ -476,7 +476,7 @@ version = "0.1.0"
dependencies = [
"app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chain 0.1.0",
"clap 2.17.1 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.18.0 (registry+https://github.com/rust-lang/crates.io-index)",
"db 0.1.0",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"import 0.1.0",
@@ -495,7 +495,7 @@ name = "primitives"
version = "0.1.0"
dependencies = [
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -565,13 +565,13 @@ dependencies = [
"gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rustc-serialize"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@@ -781,7 +781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum clap 2.17.1 (registry+https://github.com/rust-lang/crates.io-index)" = "27dac76762fb56019b04aed3ccb43a770a18f80f9c2eb62ee1a18d9fb4ea2430"
"checksum clap 2.18.0 (registry+https://github.com/rust-lang/crates.io-index)" = "40046b8a004bf3ba43b9078bf4b9b6d1708406a234848f925dbd7160a374c8a8"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum csv 0.14.7 (registry+https://github.com/rust-lang/crates.io-index)" = "266c1815d7ca63a5bd86284043faf91e8c95e943e55ce05dc0ae08e952de18bc"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
@@ -794,7 +794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)" = "553f11439bdefe755bf366b264820f1da70f3aaf3924e594b886beb9c831bcf5"
"checksum heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8c80e194758495a9109566134dc06e42ea0423987d6ceca016edaa90381b3549"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f"
"checksum lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6abe0ee2e758cd6bc8a2cd56726359007748fbf4128da998b65d0b70f881e19b"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
"checksum libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)" = "044d1360593a78f5c8e5e710beccdc24ab71d1f01bc19a29bcacdba22e8475d8"
"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd"
@@ -820,7 +820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"
"checksum rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)" = "bff9fc1c79f2dec76b253273d07682e94a978bd8f132ded071188122b2af9818"
"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"


@@ -26,7 +26,7 @@ pub fn merkle_root(hashes: &[H256]) -> H256 {
// duplicate the last element if the number of hashes is odd
if hashes.len() % 2 == 1 {
let last = &hashes[hashes.len() - 1];
row.push(dhash256(&*concat(&last, last)));
row.push(dhash256(&*concat(last, last)));
}
merkle_root(&row)
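For context on the pairing rule this hunk touches, here is a minimal standalone sketch. hash_pair is an illustrative stand-in for dhash256(&*concat(a, b)), and String replaces H256, so the example runs without the bitcrypto crate:

// Stand-in for dhash256(&*concat(a, b)); the real code double-SHA256s
// the concatenation of two 32-byte hashes.
fn hash_pair(a: &str, b: &str) -> String {
    format!("H({}{})", a, b)
}

fn merkle_root(hashes: &[String]) -> String {
    assert!(!hashes.is_empty());
    if hashes.len() == 1 {
        return hashes[0].clone();
    }
    let mut row = Vec::with_capacity((hashes.len() + 1) / 2);
    for pair in hashes.chunks(2) {
        match pair {
            [a, b] => row.push(hash_pair(a, b)),
            // odd number of hashes: the last one is paired with itself
            [a] => row.push(hash_pair(a, a)),
            _ => unreachable!("chunks(2) yields slices of length 1 or 2"),
        }
    }
    merkle_root(&row)
}

fn main() {
    let hashes: Vec<String> = ["t0", "t1", "t2"].iter().map(|s| s.to_string()).collect();
    // t2 has no partner, so it is hashed with itself
    assert_eq!(merkle_root(&hashes), "H(H(t0t1)H(t2t2))");
}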


@@ -64,6 +64,11 @@ pub trait Store : Send + Sync {
self.block_header_bytes(block_ref).is_some()
}
/// returns true if store contains given transaction
fn contains_transaction(&self, hash: &H256) -> bool {
self.transaction(hash).is_some()
}
/// insert block in the storage
fn insert_block(&self, block: &chain::Block) -> Result<BlockInsertedChain, Error>;
@@ -401,7 +406,7 @@ impl Storage {
/// all transaction meta is removed
/// DOES NOT update best block
fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> {
trace!(target: "reorg", "Decanonizing block {}", hash);
trace!(target: "reorg", "Decanonizing block {}", hash.to_reversed_str());
// ensure that block is of the main chain
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
@@ -680,20 +685,36 @@ impl Store for Storage {
Err(Error::Consistency(consistency_error)) => {
match consistency_error {
ConsistencyError::DoubleSpend(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to double-spend at {}", &block_hash, &hash);
warn!(
target: "reorg",
"Failed to reorganize to {} due to double-spend at {}",
block_hash.to_reversed_str(),
hash.to_reversed_str()
);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::UnknownSpending(hash) => {
warn!(target: "reorg", "Failed to reorganize to {} due to spending unknown transaction {}", &block_hash, &hash);
warn!(
target: "reorg",
"Failed to reorganize to {} due to spending unknown transaction {}",
block_hash.to_reversed_str(),
hash.to_reversed_str()
);
// return without any commit
return Err(Error::reorganize(&hash));
},
ConsistencyError::Unknown(hash) => {
// either an orphan block was inserted, or a disconnected chain head was updated; we allow that (for now)
// so it is no-op
warn!(target: "reorg", "Disconnected chain head {} updated with {}", &hash, &block_hash);
warn!(
target: "reorg",
"Disconnected chain head {} updated with {}",
hash.to_reversed_str(),
block_hash.to_reversed_str()
);
BlockInsertedChain::Disconnected
},
_ => {
// we don't allow other errors on side chain/orphans
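The to_reversed_str() calls introduced in this file only change how hashes are displayed: hashes are stored and compared in little-endian byte order, while bitcoind and block explorers print them byte-reversed. A hedged sketch of what such a helper does (to_reversed_hex is a hypothetical stand-in for the method in primitives):

// Hex-encode a 32-byte hash in reversed byte order, i.e. the
// human-readable form used in logs and block explorers.
fn to_reversed_hex(hash: &[u8; 32]) -> String {
    hash.iter().rev().map(|byte| format!("{:02x}", byte)).collect()
}

fn main() {
    let mut hash = [0u8; 32];
    hash[31] = 0x6f; // last in-memory byte is printed first
    let printed = to_reversed_hex(&hash);
    assert!(printed.starts_with("6f"));
    assert!(printed.ends_with("00"));
}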


@@ -7,7 +7,7 @@ pub struct GetAddr;
impl Payload for GetAddr {
fn version() -> u32 {
60002
0
}
fn command() -> &'static str {


@@ -6,10 +6,6 @@ use net::Config as NetConfig;
pub struct Config {
/// Number of threads used by p2p thread pool.
pub threads: usize,
/// Lowest supported protocol version.
pub protocol_minimum: u32,
/// Highest supported protocol version.
pub protocol_maximum: u32,
/// Number of inbound connections.
pub inbound_connections: u32,
/// Number of outbound connections.
@@ -20,6 +16,6 @@ pub struct Config {
pub peers: Vec<SocketAddr>,
/// Connect to these nodes to retrieve peer addresses, and disconnect.
pub seeds: Vec<String>,
/// p2p module cache directory.
/// Path to the p2p/nodes.csv file.
pub node_table_path: PathBuf,
}


@@ -1,19 +1,20 @@
use std::{io, cmp};
use futures::{Future, Poll, Async};
use message::{Message, MessageResult};
use message::{Message, MessageResult, Error};
use message::types::{Version, Verack};
use message::common::Magic;
use io::{write_message, WriteMessage, ReadMessage, read_message};
pub fn handshake<A>(a: A, magic: Magic, version: Version) -> Handshake<A> where A: io::Write + io::Read {
pub fn handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> Handshake<A> where A: io::Write + io::Read {
Handshake {
version: version.version(),
state: HandshakeState::SendVersion(write_message(a, version_message(magic, version))),
magic: magic,
min_version: min_version,
}
}
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandshake<A> where A: io::Write + io::Read {
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> AcceptHandshake<A> where A: io::Write + io::Read {
AcceptHandshake {
version: version.version(),
state: AcceptHandshakeState::ReceiveVersion {
@@ -21,6 +22,7 @@ pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandsh
future: read_message(a, magic, 0),
},
magic: magic,
min_version: min_version,
}
}
@@ -72,12 +74,14 @@ pub struct Handshake<A> {
state: HandshakeState<A>,
magic: Magic,
version: u32,
min_version: u32,
}
pub struct AcceptHandshake<A> {
state: AcceptHandshakeState<A>,
magic: Magic,
version: u32,
min_version: u32,
}
impl<A> Future for Handshake<A> where A: io::Read + io::Write {
@@ -97,6 +101,10 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if version.version() < self.min_version {
return Ok((stream, Err(Error::InvalidVersion)).into());
}
let next = HandshakeState::ReceiveVerack {
version: Some(version),
future: read_message(stream, self.magic, 0),
@@ -140,6 +148,10 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
Err(err) => return Ok((stream, Err(err.into())).into()),
};
if version.version() < self.min_version {
return Ok((stream, Err(Error::InvalidVersion)).into());
}
let local_version = local_version.take().expect("local version must be set");
let next = AcceptHandshakeState::SendVersion {
version: Some(version),
@@ -276,7 +288,7 @@ mod tests {
write: Bytes::default(),
};
let hs = handshake(test_io, magic, local_version).wait().unwrap();
let hs = handshake(test_io, magic, local_version, 0).wait().unwrap();
assert_eq!(hs.0.write, expected_stream.out());
assert_eq!(hs.1.unwrap(), expected);
}
@@ -305,7 +317,7 @@
expected_stream.append_slice(Message::new(magic, version, &local_version).unwrap().as_ref());
expected_stream.append_slice(Message::new(magic, version, &Verack).unwrap().as_ref());
let hs = accept_handshake(test_io, magic, local_version).wait().unwrap();
let hs = accept_handshake(test_io, magic, local_version, 0).wait().unwrap();
assert_eq!(hs.0.write, expected_stream.out());
assert_eq!(hs.1.unwrap(), expected);
}
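Both futures now apply the same gate: the remote version message is rejected before verack is exchanged if the advertised version is too old. A minimal sketch of that rule, with a plain string standing in for message::Error::InvalidVersion (check_version is illustrative, not the crate's API):

fn check_version(remote_version: u32, min_version: u32) -> Result<u32, &'static str> {
    // mirrors `if version.version() < self.min_version` above
    if remote_version < min_version {
        Err("InvalidVersion")
    } else {
        Ok(remote_version)
    }
}

fn main() {
    // pbtc configures PROTOCOL_MINIMUM = 70_001 (see run.rs further down);
    // the tests above pass 0, which accepts any peer.
    assert_eq!(check_version(70_014, 70_001), Ok(70_014));
    assert!(check_version(60_002, 70_001).is_err());
    assert!(check_version(0, 0).is_ok());
}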


@@ -25,9 +25,6 @@ mod config;
mod event_loop;
mod p2p;
pub const VERSION: u32 = 70_014;
pub const USER_AGENT: &'static str = "pbtc";
pub use primitives::{hash, bytes};
pub use config::Config;


@@ -9,7 +9,7 @@ use net::{Config, Connection};
pub fn accept_connection(stream: TcpStream, handle: &Handle, config: &Config, address: net::SocketAddr) -> Deadline<AcceptConnection> {
let accept = AcceptConnection {
handshake: accept_handshake(stream, config.magic, config.version(&address)),
handshake: accept_handshake(stream, config.magic, config.version(&address), config.protocol_minimum),
magic: config.magic,
address: address,
};


@@ -3,10 +3,11 @@ use message::common::{Magic, Services, NetAddress};
use message::types::version::{Version, V0, V106, V70001};
use util::time::{Time, RealTime};
use util::nonce::{NonceGenerator, RandomNonce};
use VERSION;
#[derive(Debug, Clone)]
pub struct Config {
pub protocol_version: u32,
pub protocol_minimum: u32,
pub magic: Magic,
pub local_address: SocketAddr,
pub services: Services,
@@ -18,7 +19,7 @@ pub struct Config {
impl Config {
pub fn version(&self, to: &SocketAddr) -> Version {
Version::V70001(V0 {
version: VERSION,
version: self.protocol_version,
services: self.services,
timestamp: RealTime.get().sec,
receiver: NetAddress {


@@ -18,6 +18,7 @@ pub fn connect(address: &SocketAddr, handle: &Handle, config: &Config) -> Deadli
},
magic: config.magic,
address: *address,
protocol_minimum: config.protocol_minimum,
};
deadline(Duration::new(5, 0), handle, connect).expect("Failed to create timeout")
@@ -36,6 +37,7 @@ pub struct Connect {
state: ConnectState,
magic: Magic,
address: SocketAddr,
protocol_minimum: u32,
}
impl Future for Connect {
@@ -47,7 +49,7 @@ impl Future for Connect {
ConnectState::TcpConnect { ref mut future, ref mut version } => {
let stream = try_ready!(future.poll());
let version = version.take().expect("state TcpConnect must have version");
let handshake = handshake(stream, self.magic, version);
let handshake = handshake(stream, self.magic, version, self.protocol_minimum);
(ConnectState::Handshake(handshake), Async::NotReady)
},
ConnectState::Handshake(ref mut future) => {


@@ -96,13 +96,11 @@ impl PeerContext {
} else {
queue.push_finished_response(id, self.to_message(payload).into());
}
} else if sync.is_permitted(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
} else {
if sync.is_permitted(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
} else {
queue.push_unfinished_response(id, self.to_message(payload).into());
}
queue.push_unfinished_response(id, self.to_message(payload).into());
}
}


@@ -46,7 +46,7 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_transaction(&self, message: &types::Tx);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool);
fn send_mempool(&self, message: &types::MemPool);
fn send_filterload(&self, message: &types::FilterLoad);
fn send_filteradd(&self, message: &types::FilterAdd);
fn send_filterclear(&self, message: &types::FilterClear);
@@ -106,8 +106,8 @@ impl OutboundSyncConnection for OutboundSync {
self.context.send_response(message, id, is_final);
}
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
fn send_mempool(&self, message: &types::MemPool) {
self.context.send_request(message);
}
fn send_filterload(&self, message: &types::FilterLoad) {


@@ -99,8 +99,7 @@ impl ThresholdSynchronizer {
self.to_grant_min <= id && id < self.to_grant_max
} else {
// otherwise, check if id is in range [min, u32::max_value()] || [0, max)
(self.to_grant_min <= id && id <= u32::max_value()) ||
id < self.to_grant_max
self.to_grant_min <= id || id < self.to_grant_max
}
}
}
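The replaced condition also tested `id <= u32::max_value()`, which is always true, so the simplified form is equivalent. A standalone sketch of the wrapped-window check with worked cases (is_permitted_wrapped is a hypothetical free function mirroring the expression above):

// Permitted ids lie in [to_grant_min, u32::MAX] or [0, to_grant_max),
// i.e. the grant window has wrapped past u32::MAX.
fn is_permitted_wrapped(id: u32, to_grant_min: u32, to_grant_max: u32) -> bool {
    to_grant_min <= id || id < to_grant_max
}

fn main() {
    // a window of four ids that wraps around zero: {MAX-1, MAX, 0, 1}
    let (min, max) = (u32::max_value() - 1, 2);
    assert!(is_permitted_wrapped(u32::max_value() - 1, min, max));
    assert!(is_permitted_wrapped(u32::max_value(), min, max));
    assert!(is_permitted_wrapped(0, min, max));
    assert!(is_permitted_wrapped(1, min, max));
    assert!(!is_permitted_wrapped(2, min, max));
    assert!(!is_permitted_wrapped(12345, min, max));
}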
@@ -158,12 +157,12 @@ impl ConfigurableSynchronizer {
/// from last_processed response will still be granted permissions.
pub fn change_sync_policy(&mut self, sync: bool) {
let new_inner = match self.inner {
InnerSynchronizer::Threshold(ref s) if sync == false => {
InnerSynchronizer::Threshold(ref s) if !sync => {
InnerSynchronizer::Noop(NoopSynchronizer {
declared_responses: s.inner.declared_responses,
})
},
InnerSynchronizer::Noop(ref s) if sync == true => {
InnerSynchronizer::Noop(ref s) if sync => {
let threshold = ThresholdSynchronizer::new(
s.declared_responses,
CONFIGURABLE_SYNCHRONIZER_THRESHOLD,


@@ -2,7 +2,7 @@ use std::net::SocketAddr;
use sync::create_sync_connection_factory;
use message::Services;
use util::{open_db, init_db, node_table_path};
use {config, p2p};
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM, USER_AGENT};
pub fn start(cfg: config::Config) -> Result<(), String> {
let mut el = p2p::event_loop();
@@ -11,16 +11,16 @@
try!(init_db(&cfg, &db));
let p2p_cfg = p2p::Config {
threads: 4,
protocol_minimum: 70001,
protocol_maximum: 70017,
inbound_connections: 10,
outbound_connections: 10,
threads: cfg.p2p_threads,
inbound_connections: cfg.inbound_connections,
outbound_connections: cfg.outbound_connections,
connection: p2p::NetConfig {
protocol_version: PROTOCOL_VERSION,
protocol_minimum: PROTOCOL_MINIMUM,
magic: cfg.magic,
local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port),
services: Services::default().with_network(true),
user_agent: "pbtc".into(),
user_agent: USER_AGENT.into(),
start_height: 0,
relay: false,
},


@@ -8,6 +8,9 @@ pub struct Config {
pub connect: Option<net::SocketAddr>,
pub seednode: Option<String>,
pub print_to_console: bool,
pub inbound_connections: u32,
pub outbound_connections: u32,
pub p2p_threads: usize,
}
pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
@@ -19,6 +22,16 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
(true, true) => return Err("Only one testnet option can be used".into()),
};
let (in_connections, out_connections) = match magic {
Magic::Testnet | Magic::Mainnet => (10, 10),
Magic::Regtest => (1, 0),
};
let p2p_threads = match magic {
Magic::Testnet | Magic::Mainnet => 4,
Magic::Regtest => 1,
};
let port = match matches.value_of("port") {
Some(port) => try!(port.parse().map_err(|_| "Invalid port".to_owned())),
None => magic.port(),
@@ -45,6 +58,9 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
port: port,
connect: connect,
seednode: seednode,
inbound_connections: in_connections,
outbound_connections: out_connections,
p2p_threads: p2p_threads,
};
Ok(config)


@@ -23,6 +23,9 @@ mod util;
use app_dirs::AppInfo;
pub const APP_INFO: AppInfo = AppInfo { name: "pbtc", author: "Parity" };
pub const PROTOCOL_VERSION: u32 = 70_014;
pub const PROTOCOL_MINIMUM: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";
fn main() {
env_logger::init().unwrap();
@@ -31,7 +34,6 @@ fn main() {
}
}
fn run() -> Result<(), String> {
let yaml = load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches();


@@ -56,11 +56,11 @@ impl BestHeadersChain {
self.best.position(hash)
.and_then(|pos| self.best.at(pos + 1))
.and_then(|child| Some(vec![child]))
.unwrap_or(Vec::new())
.unwrap_or_default()
}
pub fn best_block_hash(&self) -> H256 {
self.best.back().or(Some(self.storage_best_hash.clone())).expect("storage_best_hash is always known")
self.best.back().or_else(|| Some(self.storage_best_hash.clone())).expect("storage_best_hash is always known")
}
pub fn insert(&mut self, header: BlockHeader) {
@@ -80,7 +80,7 @@
}
pub fn remove(&mut self, hash: &H256) {
if let Some(_) = self.headers.remove(hash) {
if self.headers.remove(hash).is_some() {
match self.best.remove(hash) {
HashPosition::Front => self.clear(),
HashPosition::Inside(position) => self.clear_after(position),
@@ -89,8 +89,8 @@
}
}
pub fn remove_n<'a, I: IntoIterator<Item=H256>> (&mut self, hashes: I) {
for hash in hashes.into_iter() {
pub fn remove_n<I: IntoIterator<Item=H256>> (&mut self, hashes: I) {
for hash in hashes {
self.remove(&hash);
}
}


@@ -191,7 +191,7 @@ impl HashQueueChain {
/// Returns element at the given position
pub fn at(&self, mut index: u32) -> Option<H256> {
for queue in self.chain.iter() {
for queue in &self.chain {
let queue_len = queue.len();
if index < queue_len {
return queue.at(index);


@@ -23,6 +23,8 @@ mod hash_queue;
mod inbound_connection;
mod inbound_connection_factory;
mod local_node;
mod orphan_blocks_pool;
mod orphan_transactions_pool;
mod synchronization_chain;
mod synchronization_client;
mod synchronization_executor;


@@ -79,18 +79,19 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
// TODO: after each `getblocks` message bitcoind responds with two `inventory` messages:
// (1) with single entry
// (2) with 500 entries
// what is (1)?
// if there are unknown blocks => start synchronizing with peer
let blocks_inventory = self.blocks_inventory(&message.inventory);
if !blocks_inventory.is_empty() {
self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
}
// TODO: process unknown transactions, etc...
// if there are unknown transactions => add to memory pool
let transactions_inventory = self.transactions_inventory(&message.inventory);
if !transactions_inventory.is_empty() {
self.client.lock().on_new_transactions_inventory(peer_index, transactions_inventory);
}
// TODO: process other inventory types
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
@@ -132,11 +133,14 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
}
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
trace!(target: "sync", "Got `transaction` message from peer#{}. Transaction hash: {}", peer_index, message.transaction.hash());
trace!(target: "sync", "Got `transaction` message from peer#{}. Transaction hash: {}", peer_index, message.transaction.hash().to_reversed_str());
// try to process new transaction
self.client.lock().on_peer_transaction(peer_index, message.transaction);
}
pub fn on_peer_block(&self, peer_index: usize, message: types::Block) {
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash());
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash().to_reversed_str());
// try to process new block
self.client.lock().on_peer_block(peer_index, message.block);
@@ -203,7 +207,14 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
}
fn blocks_inventory(&self, inventory: &Vec<InventoryVector>) -> Vec<H256> {
fn transactions_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageTx)
.map(|item| item.hash.clone())
.collect()
}
fn blocks_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
@@ -246,7 +257,7 @@ mod tests {
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
fn send_filterclear(&self, _message: &types::FilterClear) {}


@@ -0,0 +1,245 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::hash_map::Entry;
use linked_hash_map::LinkedHashMap;
use time;
use chain::Block;
use primitives::hash::H256;
#[derive(Debug)]
/// Storage for blocks for which we have no parent yet.
/// Blocks from this storage are either moved to the verification queue, or removed altogether.
pub struct OrphanBlocksPool {
/// Blocks from requested_hashes, but received out-of-order.
orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
/// Blocks that we have received without requesting, together with their receive time.
unknown_blocks: LinkedHashMap<H256, f64>,
}
impl OrphanBlocksPool {
/// Create new pool
pub fn new() -> Self {
OrphanBlocksPool {
orphaned_blocks: HashMap::new(),
unknown_blocks: LinkedHashMap::new(),
}
}
#[cfg(test)]
/// Get total number of blocks in pool
pub fn len(&self) -> usize {
self.orphaned_blocks.len()
}
/// Check if block with given hash is stored as unknown in this pool
pub fn contains_unknown_block(&self, hash: &H256) -> bool {
self.unknown_blocks.contains_key(hash)
}
/// Get unknown blocks in the insertion order
pub fn unknown_blocks(&self) -> &LinkedHashMap<H256, f64> {
&self.unknown_blocks
}
/// Insert orphaned block, for which we have already requested its parent block
pub fn insert_orphaned_block(&mut self, hash: H256, block: Block) {
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert_with(HashMap::new)
.insert(hash, block);
}
/// Insert unknown block, for which we know nothing about its parent block
pub fn insert_unknown_block(&mut self, hash: H256, block: Block) {
let previous_value = self.unknown_blocks.insert(hash.clone(), time::precise_time_s());
assert_eq!(previous_value, None);
self.insert_orphaned_block(hash, block);
}
/// Remove all blocks that are not unknown
pub fn remove_known_blocks(&mut self) -> Vec<H256> {
let orphans_to_remove: HashSet<_> = self.orphaned_blocks.values()
.flat_map(|v| v.iter().map(|e| e.0.clone()))
.filter(|h| !self.unknown_blocks.contains_key(h))
.collect();
self.remove_blocks(&orphans_to_remove);
orphans_to_remove.into_iter().collect()
}
/// Remove all blocks that depend on this parent
pub fn remove_blocks_for_parent(&mut self, hash: &H256) -> Vec<(H256, Block)> {
let mut queue: VecDeque<H256> = VecDeque::new();
queue.push_back(hash.clone());
let mut removed: Vec<(H256, Block)> = Vec::new();
while let Some(parent_hash) = queue.pop_front() {
if let Entry::Occupied(entry) = self.orphaned_blocks.entry(parent_hash) {
let (_, orphaned) = entry.remove_entry();
for orphaned_hash in orphaned.keys() {
self.unknown_blocks.remove(orphaned_hash);
}
queue.extend(orphaned.keys().cloned());
removed.extend(orphaned.into_iter());
}
}
removed
}
/// Remove blocks with given hashes + all dependent blocks
pub fn remove_blocks(&mut self, hashes: &HashSet<H256>) -> Vec<(H256, Block)> {
// TODO: excess clone
let mut removed: Vec<(H256, Block)> = Vec::new();
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
for parent_orphan_key in parent_orphan_keys {
if let Entry::Occupied(mut orphan_entry) = self.orphaned_blocks.entry(parent_orphan_key) {
let remove_entry = {
let mut orphans = orphan_entry.get_mut();
let orphans_keys: HashSet<H256> = orphans.keys().cloned().collect();
for orphan_to_remove in orphans_keys.intersection(hashes) {
self.unknown_blocks.remove(orphan_to_remove);
removed.push((orphan_to_remove.clone(),
orphans.remove(orphan_to_remove).expect("iterating by intersection of orphans keys with hashes; removing from orphans; qed")
));
}
orphans.is_empty()
};
if remove_entry {
orphan_entry.remove_entry();
}
}
}
// also delete all children
for hash in hashes.iter() {
removed.extend(self.remove_blocks_for_parent(hash));
}
removed
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use test_data;
use primitives::hash::H256;
use chain::RepresentH256;
use super::OrphanBlocksPool;
#[test]
fn orphan_block_pool_empty_on_start() {
let pool = OrphanBlocksPool::new();
assert_eq!(pool.len(), 0);
}
#[test]
fn orphan_block_pool_insert_orphan_block() {
let mut pool = OrphanBlocksPool::new();
let b1 = test_data::block_h1();
let b1_hash = b1.hash();
pool.insert_orphaned_block(b1_hash.clone(), b1);
assert_eq!(pool.len(), 1);
assert!(!pool.contains_unknown_block(&b1_hash));
assert_eq!(pool.unknown_blocks().len(), 0);
}
#[test]
fn orphan_block_pool_insert_unknown_block() {
let mut pool = OrphanBlocksPool::new();
let b1 = test_data::block_h1();
let b1_hash = b1.hash();
pool.insert_unknown_block(b1_hash.clone(), b1);
assert_eq!(pool.len(), 1);
assert!(pool.contains_unknown_block(&b1_hash));
assert_eq!(pool.unknown_blocks().len(), 1);
}
#[test]
fn orphan_block_pool_remove_known_blocks() {
let mut pool = OrphanBlocksPool::new();
let b1 = test_data::block_h1();
let b1_hash = b1.hash();
let b2 = test_data::block_h169();
let b2_hash = b2.hash();
pool.insert_orphaned_block(b1_hash.clone(), b1);
pool.insert_unknown_block(b2_hash.clone(), b2);
assert_eq!(pool.len(), 2);
assert!(!pool.contains_unknown_block(&b1_hash));
assert!(pool.contains_unknown_block(&b2_hash));
assert_eq!(pool.unknown_blocks().len(), 1);
pool.remove_known_blocks();
assert_eq!(pool.len(), 1);
assert!(!pool.contains_unknown_block(&b1_hash));
assert!(pool.contains_unknown_block(&b2_hash));
assert_eq!(pool.unknown_blocks().len(), 1);
}
#[test]
fn orphan_block_pool_remove_blocks_for_parent() {
let mut pool = OrphanBlocksPool::new();
let b1 = test_data::block_h1();
let b1_hash = b1.hash();
let b2 = test_data::block_h169();
let b2_hash = b2.hash();
let b3 = test_data::block_h2();
let b3_hash = b3.hash();
pool.insert_orphaned_block(b1_hash.clone(), b1);
pool.insert_unknown_block(b2_hash.clone(), b2);
pool.insert_orphaned_block(b3_hash.clone(), b3);
let removed = pool.remove_blocks_for_parent(&test_data::genesis().hash());
assert_eq!(removed.len(), 2);
assert_eq!(removed[0].0, b1_hash);
assert_eq!(removed[1].0, b3_hash);
assert_eq!(pool.len(), 1);
assert!(!pool.contains_unknown_block(&b1_hash));
assert!(pool.contains_unknown_block(&b2_hash));
assert!(!pool.contains_unknown_block(&b1_hash));
assert_eq!(pool.unknown_blocks().len(), 1);
}
#[test]
fn orphan_block_pool_remove_blocks() {
let mut pool = OrphanBlocksPool::new();
let b1 = test_data::block_h1();
let b1_hash = b1.hash();
let b2 = test_data::block_h2();
let b2_hash = b2.hash();
let b3 = test_data::block_h169();
let b3_hash = b3.hash();
let b4 = test_data::block_h170();
let b4_hash = b4.hash();
let b5 = test_data::block_h181();
let b5_hash = b5.hash();
pool.insert_orphaned_block(b1_hash.clone(), b1);
pool.insert_orphaned_block(b2_hash.clone(), b2);
pool.insert_orphaned_block(b3_hash.clone(), b3);
pool.insert_orphaned_block(b4_hash.clone(), b4);
pool.insert_orphaned_block(b5_hash.clone(), b5);
let mut blocks_to_remove: HashSet<H256> = HashSet::new();
blocks_to_remove.insert(b1_hash.clone());
blocks_to_remove.insert(b3_hash.clone());
let removed = pool.remove_blocks(&blocks_to_remove);
assert_eq!(removed.len(), 4);
assert!(removed.iter().any(|&(ref h, _)| h == &b1_hash));
assert!(removed.iter().any(|&(ref h, _)| h == &b2_hash));
assert!(removed.iter().any(|&(ref h, _)| h == &b3_hash));
assert!(removed.iter().any(|&(ref h, _)| h == &b4_hash));
assert_eq!(pool.len(), 1);
}
}


@@ -0,0 +1,200 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::hash_map::Entry;
use linked_hash_map::LinkedHashMap;
use time;
use chain::Transaction;
use primitives::hash::H256;
#[derive(Debug)]
/// Storage for transactions for which we have no parent transactions yet.
/// Transactions from this storage are either moved to the verification queue, or removed altogether.
pub struct OrphanTransactionsPool {
/// Orphan transactions by hash.
by_hash: LinkedHashMap<H256, OrphanTransaction>,
/// Orphan transactions by parent transaction hash.
by_parent: HashMap<H256, HashSet<H256>>,
}
#[derive(Debug)]
/// Orphan transaction representation.
pub struct OrphanTransaction {
/// Time when this transaction was inserted to the pool
pub insertion_time: f64,
/// Transaction itself
pub transaction: Transaction,
/// Parent transactions, which are still unknown to us
pub unknown_parents: HashSet<H256>,
}
impl OrphanTransactionsPool {
/// Create new pool
pub fn new() -> Self {
OrphanTransactionsPool {
by_hash: LinkedHashMap::new(),
by_parent: HashMap::new(),
}
}
#[cfg(test)]
/// Get total number of transactions in pool
pub fn len(&self) -> usize {
self.by_hash.len()
}
/// Get unknown transactions in the insertion order
pub fn transactions(&self) -> &LinkedHashMap<H256, OrphanTransaction> {
&self.by_hash
}
/// Insert orphan transaction
pub fn insert(&mut self, hash: H256, transaction: Transaction, unknown_parents: HashSet<H256>) {
assert!(!self.by_hash.contains_key(&hash));
assert!(unknown_parents.iter().all(|h| transaction.inputs.iter().any(|i| &i.previous_output.hash == h)));
for unknown_parent in &unknown_parents {
self.by_parent.entry(unknown_parent.clone())
.or_insert_with(HashSet::new)
.insert(hash.clone());
}
self.by_hash.insert(hash, OrphanTransaction::new(transaction, unknown_parents));
}
/// Remove all transactions that depend on this parent
pub fn remove_transactions_for_parent(&mut self, hash: &H256) -> Vec<(H256, Transaction)> {
assert!(!self.by_hash.contains_key(hash));
let mut removal_queue: VecDeque<H256> = VecDeque::new();
removal_queue.push_back(hash.clone());
let mut removed_orphans: Vec<(H256, Transaction)> = Vec::new();
while let Some(hash) = removal_queue.pop_front() {
// remove direct children of hash
let mut removed_orphans_hashes: Vec<H256> = Vec::new();
if let Entry::Occupied(children_entry) = self.by_parent.entry(hash.clone()) {
for child in children_entry.get() {
let all_parents_are_known = {
let child_entry = self.by_hash.get_mut(child).expect("every entry in by_parent.values() has corresponding entry in by_hash.keys()");
child_entry.remove_known_parent(&hash)
};
if all_parents_are_known {
removed_orphans_hashes.push(child.clone());
removed_orphans.push((child.clone(), self.by_hash.remove(child).expect("checked couple of lines above").transaction));
}
}
children_entry.remove_entry();
}
// then also remove grandchildren of hash & so on
removal_queue.extend(removed_orphans_hashes);
}
removed_orphans
}
/// Remove transactions with given hashes + all dependent transactions
pub fn remove_transactions(&mut self, hashes: &[H256]) -> Vec<(H256, Transaction)> {
let mut removed: Vec<(H256, Transaction)> = Vec::new();
for hash in hashes {
if let Some(transaction) = self.by_hash.remove(hash) {
removed.push((hash.clone(), transaction.transaction));
}
removed.extend(self.remove_transactions_for_parent(hash));
}
removed
}
}
impl OrphanTransaction {
/// Create new orphaned transaction
pub fn new(transaction: Transaction, unknown_parents: HashSet<H256>) -> Self {
OrphanTransaction {
insertion_time: time::precise_time_s(),
transaction: transaction,
unknown_parents: unknown_parents,
}
}
/// Remove a parent that is now known. Returns true if all parents are now known.
pub fn remove_known_parent(&mut self, parent_hash: &H256) -> bool {
self.unknown_parents.remove(parent_hash);
self.unknown_parents.is_empty()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use test_data::{TransactionBuilder, ChainBuilder};
use primitives::hash::H256;
use super::OrphanTransactionsPool;
#[test]
fn orphan_transaction_pool_empty_on_start() {
let pool = OrphanTransactionsPool::new();
assert_eq!(pool.len(), 0);
}
#[test]
fn orphan_transaction_pool_insert_dependent_transactions() {
let chain = &mut ChainBuilder::new();
TransactionBuilder::with_output(100).store(chain) // t1
.into_input(0).add_output(200).store(chain) // t1 -> t2
.into_input(0).add_output(300).store(chain) // t1 -> t2 -> t3
.set_default_input(0).set_output(400).store(chain) // t4
.into_input(0).set_output(500).store(chain); // t4 -> t5
let t2_unknown: HashSet<H256> = chain.at(1).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let t3_unknown: HashSet<H256> = chain.at(2).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let t5_unknown: HashSet<H256> = chain.at(4).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let mut pool = OrphanTransactionsPool::new();
pool.insert(chain.at(1).hash(), chain.at(1), t2_unknown); // t2
pool.insert(chain.at(2).hash(), chain.at(2), t3_unknown); // t3
pool.insert(chain.at(4).hash(), chain.at(4), t5_unknown); // t5
assert_eq!(pool.len(), 3);
let removed = pool.remove_transactions_for_parent(&chain.at(0).hash());
assert_eq!(pool.len(), 1);
let removed: Vec<H256> = removed.into_iter().map(|(h, _)| h).collect();
assert_eq!(removed, vec![chain.at(1).hash(), chain.at(2).hash()]);
let removed = pool.remove_transactions_for_parent(&chain.at(3).hash());
assert_eq!(pool.len(), 0);
let removed: Vec<H256> = removed.into_iter().map(|(h, _)| h).collect();
assert_eq!(removed, vec![chain.at(4).hash()]);
}
#[test]
fn orphan_transaction_pool_remove_transactions() {
let chain = &mut ChainBuilder::new();
TransactionBuilder::with_output(100).store(chain) // t1
.into_input(0).add_output(200).store(chain) // t1 -> t2
.into_input(0).add_output(300).store(chain) // t1 -> t2 -> t3
.set_default_input(0).set_output(400).store(chain) // t4
.into_input(0).set_output(500).store(chain) // t4 -> t5
.set_default_input(0).set_output(600).store(chain) // t6
.into_input(0).set_output(700).store(chain); // t6 -> t7
let t2_unknown: HashSet<H256> = chain.at(1).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let t3_unknown: HashSet<H256> = chain.at(2).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let t5_unknown: HashSet<H256> = chain.at(4).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let t7_unknown: HashSet<H256> = chain.at(6).inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let mut pool = OrphanTransactionsPool::new();
pool.insert(chain.at(1).hash(), chain.at(1), t2_unknown); // t2
pool.insert(chain.at(2).hash(), chain.at(2), t3_unknown); // t3
pool.insert(chain.at(4).hash(), chain.at(4), t5_unknown); // t5
pool.insert(chain.at(6).hash(), chain.at(6), t7_unknown); // t7
assert_eq!(pool.len(), 4);
let removed = pool.remove_transactions(&vec![chain.at(1).hash(), chain.at(3).hash()]);
assert_eq!(pool.len(), 1);
let removed: Vec<H256> = removed.into_iter().map(|(h, _)| h).collect();
assert_eq!(removed, vec![chain.at(1).hash(), chain.at(2).hash(), chain.at(4).hash()]);
let removed = pool.remove_transactions(&vec![chain.at(6).hash()]);
assert_eq!(pool.len(), 0);
let removed: Vec<H256> = removed.into_iter().map(|(h, _)| h).collect();
assert_eq!(removed, vec![chain.at(6).hash()]);
}
}


@@ -1,13 +1,14 @@
use std::fmt;
use std::sync::Arc;
use std::collections::VecDeque;
use linked_hash_map::LinkedHashMap;
use parking_lot::RwLock;
use chain::{Block, BlockHeader};
use chain::{Block, BlockHeader, Transaction};
use db;
use best_headers_chain::{BestHeadersChain, Information as BestHeadersInformation};
use primitives::hash::H256;
use hash_queue::{HashQueueChain, HashPosition};
use miner::MemoryPool;
use miner::{MemoryPool, MemoryPoolOrderingStrategy, MemoryPoolInformation};
/// Thread-safe reference to `Chain`
pub type ChainRef = Arc<RwLock<Chain>>;
@@ -21,6 +22,13 @@ const SCHEDULED_QUEUE: usize = 2;
/// Number of hash queues
const NUMBER_OF_QUEUES: usize = 3;
/// Block insertion result
#[derive(Debug, Default)]
pub struct BlockInsertionResult {
/// Transactions to 'reverify'
pub transactions_to_reverify: Vec<(H256, Transaction)>,
}
/// Block synchronization state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BlockState {
@@ -36,6 +44,19 @@ pub enum BlockState {
Stored,
}
/// Transactions synchronization state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TransactionState {
/// Transaction is unknown
Unknown,
/// Currently verifying
Verifying,
/// In memory pool
InMemory,
/// In storage
Stored,
}
/// Synchronization chain information
pub struct Information {
/// Number of blocks hashes currently scheduled for requesting
@@ -46,6 +67,8 @@ pub struct Information {
pub verifying: u32,
/// Number of blocks in the storage
pub stored: u32,
/// Information on memory pool
pub transactions: MemoryPoolInformation,
/// Information on headers chain
pub headers: BestHeadersInformation,
}
@@ -83,6 +106,8 @@ pub struct Chain {
hash_chain: HashQueueChain,
/// In-memory queue of blocks headers
headers_chain: BestHeadersChain,
/// Currently verifying transactions
verifying_transactions: LinkedHashMap<H256, Transaction>,
/// Transactions memory pool
memory_pool: MemoryPool,
}
@@ -122,6 +147,7 @@ impl Chain {
storage: storage,
hash_chain: HashQueueChain::with_number_of_queues(NUMBER_OF_QUEUES),
headers_chain: BestHeadersChain::new(genesis_block_hash),
verifying_transactions: LinkedHashMap::new(),
memory_pool: MemoryPool::new(),
}
}
@@ -133,6 +159,7 @@ impl Chain {
requested: self.hash_chain.len_of(REQUESTED_QUEUE),
verifying: self.hash_chain.len_of(VERIFYING_QUEUE),
stored: self.best_storage_block.number + 1,
transactions: self.memory_pool.information(),
headers: self.headers_chain.information(),
}
}
@@ -142,19 +169,8 @@ impl Chain {
self.storage.clone()
}
/// Get memory pool reference
pub fn memory_pool(&self) -> &MemoryPool {
&self.memory_pool
}
/// Get mutable memory pool reference
#[cfg(test)]
pub fn memory_pool_mut<'a>(&'a mut self) -> &'a mut MemoryPool {
&mut self.memory_pool
}
/// Get number of blocks in given state
pub fn length_of_state(&self, state: BlockState) -> u32 {
pub fn length_of_blocks_state(&self, state: BlockState) -> u32 {
match state {
BlockState::Stored => self.best_storage_block.number + 1,
_ => self.hash_chain.len_of(state.to_queue_index()),
@@ -272,7 +288,9 @@
}
/// Insert new best block to storage
pub fn insert_best_block(&mut self, hash: H256, block: Block) -> Result<(), db::Error> {
pub fn insert_best_block(&mut self, hash: H256, block: &Block) -> Result<BlockInsertionResult, db::Error> {
let is_appending_to_main_branch = self.best_storage_block.hash == block.block_header.previous_header_hash;
// insert to storage
try!(self.storage.insert_block(&block));
@@ -282,41 +300,122 @@
// remove inserted block + handle possible reorganization in headers chain
self.headers_chain.block_inserted_to_storage(&hash, &self.best_storage_block.hash);
Ok(())
// deal with transactions. we have 3 cases here:
// case 1: block has been added to the main branch
if is_appending_to_main_branch {
// double check
assert_eq!(self.best_storage_block.hash, hash);
// all transactions from this block were accepted
// => delete accepted transactions from verification queue and from the memory pool
let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
for transaction_accepted in this_block_transactions_hashes {
self.memory_pool.remove_by_hash(&transaction_accepted);
self.verifying_transactions.remove(&transaction_accepted);
}
// no transactions to reverify, because we have just appended new transactions to the blockchain
Ok(BlockInsertionResult {
transactions_to_reverify: Vec::new(),
})
}
// case 2: block has been added to the side branch with reorganization to this branch
else if self.best_storage_block.hash == hash {
// all transactions from this block were accepted
// + all transactions from previous blocks of this fork were accepted
// => delete accepted transactions from verification queue and from the memory pool
let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
let new_main_blocks_transactions_hashes = Vec::<H256>::new().into_iter(); // TODO need this information from storage.insert_block()
for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes) {
self.memory_pool.remove_by_hash(&transaction_accepted);
self.verifying_transactions.remove(&transaction_accepted);
}
// reverify all transactions from old main branch' blocks
let old_main_blocks_transactions_hashes = Vec::<H256>::new().into_iter(); // TODO need this information from storage.insert_block()
let old_main_blocks_transactions: Vec<(H256, Transaction)> = old_main_blocks_transactions_hashes
.map(|h| (h.clone(), self.storage.transaction(&h).expect("block in storage => block transaction in storage")))
.collect();
// reverify memory pool transactions
// TODO: maybe reverify only transactions that depend on other reverifying transactions + transactions from the new main branch?
let memory_pool_transactions_count = self.memory_pool.information().transactions_count;
let memory_pool_transactions: Vec<_> = self.memory_pool
.remove_n_with_strategy(memory_pool_transactions_count, MemoryPoolOrderingStrategy::ByTimestamp)
.into_iter()
.map(|t| (t.hash(), t))
.collect();
// reverify verifying transactions
let verifying_transactions: Vec<_> = self.verifying_transactions
.iter()
.map(|(h, t)| (h.clone(), t.clone()))
.collect();
// there's no guarantee (in docs) that LinkedHashMap::into_iter() will return values ordered by insertion time
self.verifying_transactions.clear();
Ok(BlockInsertionResult {
// order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions
transactions_to_reverify: old_main_blocks_transactions.into_iter()
.chain(memory_pool_transactions.into_iter())
.chain(verifying_transactions.into_iter())
.collect(),
})
}
// case 3: block has been added to the side branch without reorganization to this branch
else {
// no transactions were accepted
// no transactions to reverify
Ok(BlockInsertionResult::default())
}
}
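The three branches above hinge on comparing the best-block hash before and after the insert with the inserted block's header. A hedged sketch of just that classification, with &str standing in for H256 (InsertionCase and classify are illustrative names, not the crate's API):

#[derive(Debug, PartialEq)]
enum InsertionCase {
    /// Block extends the current main branch (case 1).
    MainBranch,
    /// Block extends a side branch and storage reorganized to it (case 2).
    SideBranchWithReorg,
    /// Block extends a side branch; the main branch is unchanged (case 3).
    SideBranchNoReorg,
}

fn classify(best_before: &str, best_after: &str, block_parent: &str, block_hash: &str) -> InsertionCase {
    if best_before == block_parent {
        // `is_appending_to_main_branch` in the code above
        InsertionCase::MainBranch
    } else if best_after == block_hash {
        // storage switched its best block to the inserted one
        InsertionCase::SideBranchWithReorg
    } else {
        InsertionCase::SideBranchNoReorg
    }
}

fn main() {
    assert_eq!(classify("b1", "b2", "b1", "b2"), InsertionCase::MainBranch);
    assert_eq!(classify("b1", "s2", "s1", "s2"), InsertionCase::SideBranchWithReorg);
    assert_eq!(classify("b2", "b2", "s0", "s1"), InsertionCase::SideBranchNoReorg);
}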
/// Forget in-memory block
pub fn forget(&mut self, hash: &H256) -> HashPosition {
pub fn forget_block(&mut self, hash: &H256) -> HashPosition {
self.headers_chain.remove(hash);
self.forget_leave_header(hash)
self.forget_block_leave_header(hash)
}
/// Forget in-memory blocks
pub fn forget_blocks(&mut self, hashes: &[H256]) {
for hash in hashes {
self.forget_block(hash);
}
}
/// Forget in-memory block, but leave its header in the headers_chain (orphan queue)
pub fn forget_leave_header(&mut self, hash: &H256) -> HashPosition {
pub fn forget_block_leave_header(&mut self, hash: &H256) -> HashPosition {
match self.hash_chain.remove_at(VERIFYING_QUEUE, hash) {
HashPosition::Missing => match self.hash_chain.remove_at(REQUESTED_QUEUE, hash) {
HashPosition::Missing => self.hash_chain.remove_at(SCHEDULED_QUEUE, hash),
position @ _ => position,
position => position,
},
position @ _ => position,
position => position,
}
}
/// Forget in-memory blocks, but leave their headers in the headers_chain (orphan queue)
pub fn forget_blocks_leave_header(&mut self, hashes: &[H256]) {
for hash in hashes {
self.forget_block_leave_header(hash);
}
}
/// Forget in-memory block by hash if it is currently in given state
#[cfg(test)]
pub fn forget_with_state(&mut self, hash: &H256, state: BlockState) -> HashPosition {
pub fn forget_block_with_state(&mut self, hash: &H256, state: BlockState) -> HashPosition {
self.headers_chain.remove(hash);
self.forget_with_state_leave_header(hash, state)
self.forget_block_with_state_leave_header(hash, state)
}
/// Forget in-memory block by hash if it is currently in given state
pub fn forget_with_state_leave_header(&mut self, hash: &H256, state: BlockState) -> HashPosition {
pub fn forget_block_with_state_leave_header(&mut self, hash: &H256, state: BlockState) -> HashPosition {
self.hash_chain.remove_at(state.to_queue_index(), hash)
}
/// Forget in-memory block by hash.
/// Also forget all its known children.
pub fn forget_with_children(&mut self, hash: &H256) {
pub fn forget_block_with_children(&mut self, hash: &H256) {
let mut removal_stack: VecDeque<H256> = VecDeque::new();
let mut removal_queue: VecDeque<H256> = VecDeque::new();
removal_queue.push_back(hash.clone());
@@ -327,25 +426,25 @@
removal_stack.push_back(hash);
}
while let Some(hash) = removal_stack.pop_back() {
self.forget(&hash);
self.forget_block(&hash);
}
}
/// Forget all blocks with given state
pub fn forget_all_with_state(&mut self, state: BlockState) {
pub fn forget_all_blocks_with_state(&mut self, state: BlockState) {
let hashes = self.hash_chain.remove_all_at(state.to_queue_index());
self.headers_chain.remove_n(hashes);
}
/// Intersect chain with inventory
pub fn intersect_with_headers(&self, hashes: &Vec<H256>, headers: &Vec<BlockHeader>) -> HeadersIntersection {
pub fn intersect_with_blocks_headers(&self, hashes: &[H256], headers: &[BlockHeader]) -> HeadersIntersection {
let hashes_len = hashes.len();
assert!(hashes_len != 0 && hashes.len() == headers.len());
// given that headers are ordered
let (is_first_known, first_state) = match self.block_state(&hashes[0]) {
BlockState::Unknown => (false, self.block_state(&headers[0].previous_header_hash)),
state @ _ => (true, state),
state => (true, state),
};
match first_state {
// if the first block of inventory is unknown && its parent is unknown => all other blocks are also unknown
@@ -353,32 +452,32 @@
HeadersIntersection::NoKnownBlocks(0)
},
// else if first block is known
first_block_state @ _ => match self.block_state(&hashes[hashes_len - 1]) {
first_block_state => match self.block_state(&hashes[hashes_len - 1]) {
// if last block is known to be in db => all inventory blocks are also in db
BlockState::Stored => {
HeadersIntersection::DbAllBlocksKnown
HeadersIntersection::DbAllBlocksKnown
},
// if first block is known && last block is unknown but we know block before first one => intersection with queue or with db
BlockState::Unknown if !is_first_known => {
// previous block is stored => fork from stored block
if first_state == BlockState::Stored {
return HeadersIntersection::DbForkNewBlocks(0);
HeadersIntersection::DbForkNewBlocks(0)
}
// previous block is best block => no fork
else if &self.best_block().hash == &headers[0].previous_header_hash {
return HeadersIntersection::InMemoryMainNewBlocks(0);
HeadersIntersection::InMemoryMainNewBlocks(0)
}
// previous block is not a best block => fork
else {
return HeadersIntersection::InMemoryForkNewBlocks(0);
HeadersIntersection::InMemoryForkNewBlocks(0)
}
},
// if first block is known && last block is unknown => intersection with queue or with db
BlockState::Unknown if is_first_known => {
// find last known block
let mut previous_state = first_block_state;
for index in 1..hashes_len {
let state = self.block_state(&hashes[index]);
for (index, hash) in hashes.iter().enumerate().take(hashes_len).skip(1) {
let state = self.block_state(hash);
if state == BlockState::Unknown {
// previous block is stored => fork from stored block
if previous_state == BlockState::Stored {
@@ -407,6 +506,75 @@
}
}
/// Get transaction state
pub fn transaction_state(&self, hash: &H256) -> TransactionState {
if self.verifying_transactions.contains_key(hash) {
return TransactionState::Verifying;
}
if self.memory_pool.contains(hash) {
return TransactionState::InMemory;
}
if self.storage.contains_transaction(hash) {
return TransactionState::Stored;
}
TransactionState::Unknown
}
/// Get transactions hashes with given state
pub fn transactions_hashes_with_state(&self, state: TransactionState) -> Vec<H256> {
match state {
TransactionState::InMemory => self.memory_pool.get_transactions_ids(),
TransactionState::Verifying => self.verifying_transactions.keys().cloned().collect(),
_ => panic!("wrong argument"),
}
}
/// Add transaction to verifying queue
pub fn verify_transaction(&mut self, hash: H256, tx: Transaction) {
self.verifying_transactions.insert(hash, tx);
}
/// Remove verifying transaction
pub fn forget_verifying_transaction(&mut self, hash: &H256) -> bool {
self.verifying_transactions.remove(hash).is_some()
}
/// Remove verifying transaction + all dependent transactions currently verifying
pub fn forget_verifying_transaction_with_children(&mut self, hash: &H256) {
self.forget_verifying_transaction(hash);
// TODO: suboptimal
let mut queue: VecDeque<H256> = VecDeque::new();
queue.push_back(hash.clone());
while let Some(hash) = queue.pop_front() {
let all_keys: Vec<_> = self.verifying_transactions.keys().cloned().collect();
for h in all_keys {
let remove_verifying_transaction = {
if let Some(entry) = self.verifying_transactions.get(&h) {
if entry.inputs.iter().any(|i| i.previous_output.hash == hash) {
queue.push_back(h.clone());
true
} else {
false
}
} else {
// iterating by previously read keys
unreachable!()
}
};
if remove_verifying_transaction {
self.verifying_transactions.remove(&h);
}
}
}
}
/// Insert transaction to memory pool
pub fn insert_verified_transaction(&mut self, transaction: Transaction) {
self.memory_pool.insert_verified(transaction);
}
/// Calculate block locator hashes for hash queue
fn block_locator_hashes_for_queue(&self, hashes: &mut Vec<H256>) -> (u32, u32) {
let queue_len = self.hash_chain.len();
@@ -491,11 +659,12 @@ impl fmt::Debug for Chain {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chain::RepresentH256;
use chain::{Transaction, RepresentH256};
use hash_queue::HashPosition;
use super::{Chain, BlockState, HeadersIntersection};
use super::{Chain, BlockState, TransactionState, HeadersIntersection};
use db::{self, Store, BestBlock};
use primitives::hash::H256;
use devtools::RandomTempPath;
use test_data;
#[test]
@@ -507,10 +676,10 @@
assert_eq!(chain.information().requested, 0);
assert_eq!(chain.information().verifying, 0);
assert_eq!(chain.information().stored, 1);
assert_eq!(chain.length_of_state(BlockState::Scheduled), 0);
assert_eq!(chain.length_of_state(BlockState::Requested), 0);
assert_eq!(chain.length_of_state(BlockState::Verifying), 0);
assert_eq!(chain.length_of_state(BlockState::Stored), 1);
assert_eq!(chain.length_of_blocks_state(BlockState::Scheduled), 0);
assert_eq!(chain.length_of_blocks_state(BlockState::Requested), 0);
assert_eq!(chain.length_of_blocks_state(BlockState::Verifying), 0);
assert_eq!(chain.length_of_blocks_state(BlockState::Stored), 1);
assert_eq!(&chain.best_block(), &db_best_block);
assert_eq!(chain.block_state(&db_best_block.hash), BlockState::Stored);
assert_eq!(chain.block_state(&H256::from(0)), BlockState::Unknown);
@@ -543,12 +712,12 @@
&& chain.information().verifying == 0 && chain.information().stored == 1);
// try to remove block 0 from scheduled queue => missing
assert_eq!(chain.forget_with_state(&hashes[0], BlockState::Scheduled), HashPosition::Missing);
assert_eq!(chain.forget_block_with_state(&hashes[0], BlockState::Scheduled), HashPosition::Missing);
assert!(chain.information().scheduled == 3 && chain.information().requested == 3
&& chain.information().verifying == 0 && chain.information().stored == 1);
// remove blocks 0 & 1 from requested queue
assert_eq!(chain.forget_with_state(&hashes[1], BlockState::Requested), HashPosition::Inside(1));
assert_eq!(chain.forget_with_state(&hashes[0], BlockState::Requested), HashPosition::Front);
assert_eq!(chain.forget_block_with_state(&hashes[1], BlockState::Requested), HashPosition::Inside(1));
assert_eq!(chain.forget_block_with_state(&hashes[0], BlockState::Requested), HashPosition::Front);
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 0 && chain.information().stored == 1);
// mark 0 & 1 as verifying
@ -558,11 +727,11 @@ mod tests {
&& chain.information().verifying == 2 && chain.information().stored == 1);
// mark block 0 as verified
assert_eq!(chain.forget_with_state(&hashes[1], BlockState::Verifying), HashPosition::Front);
assert_eq!(chain.forget_block_with_state(&hashes[1], BlockState::Verifying), HashPosition::Front);
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 1 && chain.information().stored == 1);
// insert new best block to the chain
chain.insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db error");
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db error");
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 1 && chain.information().stored == 2);
assert_eq!(db.best_block().expect("storage with genesis block is required").number, 1);
@ -577,13 +746,13 @@ mod tests {
let block1 = test_data::block_h1();
let block1_hash = block1.hash();
chain.insert_best_block(block1_hash.clone(), block1).expect("Error inserting new block");
chain.insert_best_block(block1_hash.clone(), &block1).expect("Error inserting new block");
assert_eq!(chain.block_locator_hashes(), vec![block1_hash.clone(), genesis_hash.clone()]);
let block2 = test_data::block_h2();
let block2_hash = block2.hash();
chain.insert_best_block(block2_hash.clone(), block2).expect("Error inserting new block");
chain.insert_best_block(block2_hash.clone(), &block2).expect("Error inserting new block");
assert_eq!(chain.block_locator_hashes(), vec![block2_hash.clone(), block1_hash.clone(), genesis_hash.clone()]);
let blocks0 = test_data::build_n_empty_blocks_from_genesis(11, 0);
@ -656,8 +825,8 @@ mod tests {
fn chain_intersect_with_inventory() {
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
// append 2 db blocks
chain.insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Error inserting new block");
chain.insert_best_block(test_data::block_h2().hash(), test_data::block_h2()).expect("Error inserting new block");
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Error inserting new block");
chain.insert_best_block(test_data::block_h2().hash(), &test_data::block_h2()).expect("Error inserting new block");
// prepare blocks
let blocks0 = test_data::build_n_empty_blocks_from(9, 0, &test_data::block_h2().block_header);
@ -671,9 +840,9 @@ mod tests {
let blocks1 = test_data::build_n_empty_blocks(2, 0);
let headers1: Vec<_> = blocks1.into_iter().map(|b| b.block_header).collect();
let hashes1: Vec<_> = headers1.iter().map(|h| h.hash()).collect();
assert_eq!(chain.intersect_with_headers(&hashes1, &headers1), HeadersIntersection::NoKnownBlocks(0));
assert_eq!(chain.intersect_with_blocks_headers(&hashes1, &headers1), HeadersIntersection::NoKnownBlocks(0));
assert_eq!(chain.intersect_with_headers(&vec![
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes0[2].clone(),
hashes0[3].clone(),
hashes0[4].clone(),
@ -687,7 +856,7 @@ mod tests {
headers0[6].clone(),
]), HeadersIntersection::InMemoryNoNewBlocks);
assert_eq!(chain.intersect_with_headers(&vec![
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes0[7].clone(),
hashes0[8].clone(),
hashes1[0].clone(),
@ -699,7 +868,7 @@ mod tests {
headers1[1].clone(),
]), HeadersIntersection::InMemoryMainNewBlocks(2));
assert_eq!(chain.intersect_with_headers(&vec![
assert_eq!(chain.intersect_with_blocks_headers(&vec![
hashes0[6].clone(),
hashes0[7].clone(),
hashes1[0].clone(),
@ -711,7 +880,7 @@ mod tests {
headers1[1].clone(),
]), HeadersIntersection::InMemoryForkNewBlocks(2));
assert_eq!(chain.intersect_with_headers(&vec![
assert_eq!(chain.intersect_with_blocks_headers(&vec![
test_data::block_h1().hash(),
test_data::block_h2().hash(),
], &vec![
@ -719,7 +888,7 @@ mod tests {
test_data::block_h2().block_header,
]), HeadersIntersection::DbAllBlocksKnown);
assert_eq!(chain.intersect_with_headers(&vec![
assert_eq!(chain.intersect_with_blocks_headers(&vec![
test_data::block_h2().hash(),
hashes1[0].clone(),
], &vec![
@ -727,4 +896,130 @@ mod tests {
headers1[0].clone(),
]), HeadersIntersection::DbForkNewBlocks(1));
}
#[test]
fn chain_transaction_state() {
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
let genesis_block = test_data::genesis();
let block1 = test_data::block_h1();
let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into();
let tx2: Transaction = test_data::TransactionBuilder::with_version(2).into();
let tx1_hash = tx1.hash();
let tx2_hash = tx2.hash();
chain.verify_transaction(tx1_hash.clone(), tx1);
chain.insert_verified_transaction(tx2);
assert_eq!(chain.transaction_state(&genesis_block.transactions[0].hash()), TransactionState::Stored);
assert_eq!(chain.transaction_state(&block1.transactions[0].hash()), TransactionState::Unknown);
assert_eq!(chain.transaction_state(&tx1_hash), TransactionState::Verifying);
assert_eq!(chain.transaction_state(&tx2_hash), TransactionState::InMemory);
}
#[test]
fn chain_block_transaction_is_removed_from_on_block_insert() {
let b0 = test_data::block_builder().header().build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build()
.transaction().coinbase()
.output().value(10).build()
.build()
.transaction()
.input().hash(H256::from(1)).index(1).build()
.build()
.build();
let tx1 = b1.transactions[0].clone();
let tx1_hash = tx1.hash();
let tx2 = b1.transactions[1].clone();
let tx2_hash = tx2.hash();
let mut chain = Chain::new(Arc::new(db::TestStorage::with_blocks(&vec![b0])));
chain.verify_transaction(tx1_hash.clone(), tx1);
chain.insert_verified_transaction(tx2);
// only one transaction is in the memory pool
assert_eq!(chain.information().transactions.transactions_count, 1);
// when block is inserted into the database => all accepted transactions are removed from the mempool && verifying queue
chain.insert_best_block(b1.hash(), &b1).expect("block accepted");
assert_eq!(chain.information().transactions.transactions_count, 0);
assert!(!chain.forget_verifying_transaction(&tx1_hash));
assert!(!chain.forget_verifying_transaction(&tx2_hash));
}
#[test]
fn chain_forget_verifying_transaction_with_children() {
let test_chain = &mut test_data::ChainBuilder::new();
test_data::TransactionBuilder::with_output(100).store(test_chain) // t1
.into_input(0).add_output(200).store(test_chain) // t1 -> t2
.into_input(0).add_output(300).store(test_chain) // t1 -> t2 -> t3
.set_default_input(0).set_output(400).store(test_chain); // t4
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
chain.verify_transaction(test_chain.at(0).hash(), test_chain.at(0));
chain.verify_transaction(test_chain.at(1).hash(), test_chain.at(1));
chain.verify_transaction(test_chain.at(2).hash(), test_chain.at(2));
chain.verify_transaction(test_chain.at(3).hash(), test_chain.at(3));
chain.forget_verifying_transaction_with_children(&test_chain.at(0).hash());
assert!(!chain.forget_verifying_transaction(&test_chain.at(0).hash()));
assert!(!chain.forget_verifying_transaction(&test_chain.at(1).hash()));
assert!(!chain.forget_verifying_transaction(&test_chain.at(2).hash()));
assert!(chain.forget_verifying_transaction(&test_chain.at(3).hash()));
}
#[test]
fn chain_transactions_hashes_with_state() {
let test_chain = &mut test_data::ChainBuilder::new();
test_data::TransactionBuilder::with_output(100).store(test_chain) // t1
.into_input(0).add_output(200).store(test_chain) // t1 -> t2
.into_input(0).add_output(300).store(test_chain) // t1 -> t2 -> t3
.set_default_input(0).set_output(400).store(test_chain); // t4
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
chain.insert_verified_transaction(test_chain.at(0));
chain.insert_verified_transaction(test_chain.at(1));
chain.insert_verified_transaction(test_chain.at(2));
chain.insert_verified_transaction(test_chain.at(3));
let chain_transactions = chain.transactions_hashes_with_state(TransactionState::InMemory);
assert!(chain_transactions.contains(&test_chain.at(0).hash()));
assert!(chain_transactions.contains(&test_chain.at(1).hash()));
assert!(chain_transactions.contains(&test_chain.at(2).hash()));
assert!(chain_transactions.contains(&test_chain.at(3).hash()));
}
#[test]
fn memory_pool_transactions_are_reverified_after_reorganization() {
let b0 = test_data::block_builder().header().build().build();
let b1 = test_data::block_builder().header().nonce(1).parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().nonce(2).parent(b0.hash()).build().build();
let b3 = test_data::block_builder().header().parent(b2.hash()).build().build();
let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into();
let tx1_hash = tx1.hash();
let tx2: Transaction = test_data::TransactionBuilder::with_version(2).into();
let tx2_hash = tx2.hash();
let path = RandomTempPath::create_dir();
let storage = Arc::new(db::Storage::new(path.as_path()).unwrap());
storage.insert_block(&b0).expect("no db error");
let mut chain = Chain::new(storage);
chain.verify_transaction(tx1_hash.clone(), tx1);
chain.insert_verified_transaction(tx2);
// no reorg
let result = chain.insert_best_block(b1.hash(), &b1).expect("no error");
assert_eq!(result.transactions_to_reverify.len(), 0);
// no reorg
let result = chain.insert_best_block(b2.hash(), &b2).expect("no error");
assert_eq!(result.transactions_to_reverify.len(), 0);
// reorg
let result = chain.insert_best_block(b3.hash(), &b3).expect("no error");
assert_eq!(result.transactions_to_reverify.len(), 2);
assert!(result.transactions_to_reverify.iter().any(|&(ref h, _)| h == &tx1_hash));
assert!(result.transactions_to_reverify.iter().any(|&(ref h, _)| h == &tx2_hash));
}
}

View File

@ -9,20 +9,22 @@ use futures::{BoxFuture, Future, finished};
use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use linked_hash_map::LinkedHashMap;
use db;
use chain::{Block, BlockHeader, RepresentH256};
use chain::{Block, BlockHeader, Transaction, RepresentH256};
use message::common::ConsensusParams;
use primitives::hash::H256;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection};
use synchronization_chain::{ChainRef, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Verify};
use synchronization_executor::{Task, TaskExecutor};
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig};
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use hash_queue::HashPosition;
use time;
use std::time::Duration;
@ -172,13 +174,17 @@ pub struct Information {
/// Current synchronization chain information.
pub chain: ChainInformation,
/// Number of currently orphaned blocks.
pub orphaned: usize,
pub orphaned_blocks: usize,
/// Number of currently orphaned transactions.
pub orphaned_transactions: usize,
}
/// Verification thread tasks
enum VerificationTask {
/// Verify single block
VerifyBlock(Block),
/// Verify single transaction
VerifyTransaction(Transaction),
/// Stop verification thread
Stop,
}
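// Editor's sketch (not part of this commit): these tasks are drained by a
// dedicated verification thread over an mpsc channel, with `Stop` ending the
// loop (see the worker `match` further below). A minimal stand-alone version
// with a hypothetical payload type:
use std::sync::mpsc::{channel, Sender};
use std::thread;

enum WorkerTask {
    Verify(u32),
    Stop,
}

fn spawn_verifier() -> (Sender<WorkerTask>, thread::JoinHandle<()>) {
    let (sender, receiver) = channel();
    let worker = thread::spawn(move || {
        while let Ok(task) = receiver.recv() {
            match task {
                WorkerTask::Verify(item) => println!("verifying {}", item),
                WorkerTask::Stop => break,
            }
        }
    });
    (sender, worker)
}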
@ -188,13 +194,17 @@ pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>);
fn on_block_verification_success(&mut self, block: Block);
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
fn on_transaction_verification_success(&mut self, transaction: Transaction);
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256);
}
/// Synchronization peer blocks waiter
@ -232,10 +242,10 @@ pub struct SynchronizationClient<T: TaskExecutor> {
executor: Arc<Mutex<T>>,
/// Chain reference.
chain: ChainRef,
/// Blocks from requested_hashes, but received out-of-order.
orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
/// Blocks that we have received without requesting with receiving time.
unknown_blocks: LinkedHashMap<H256, f64>,
/// Orphaned blocks pool.
orphaned_blocks_pool: OrphanBlocksPool,
/// Orphaned transactions pool.
orphaned_transactions_pool: OrphanTransactionsPool,
/// Verification work transmission channel.
verification_work_sender: Option<Sender<VerificationTask>>,
/// Verification thread.
@ -258,8 +268,8 @@ impl Config {
impl State {
pub fn is_saturated(&self) -> bool {
match self {
&State::Saturated => true,
match *self {
State::Saturated => true,
_ => false,
}
}
@ -272,8 +282,8 @@ impl State {
}
pub fn is_nearly_saturated(&self) -> bool {
match self {
&State::NearlySaturated => true,
match *self {
State::NearlySaturated => true,
_ => false,
}
}
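// Editor's note: on later Rust (1.42+) these predicates shrink to
// `matches!(*self, State::Saturated)` and `matches!(*self, State::NearlySaturated)`.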
@ -300,7 +310,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Get synchronization state
fn state(&self) -> State {
self.state.clone()
self.state
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
@ -318,24 +328,52 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
let unknown_blocks_hashes: Vec<_> = {
let chain = self.chain.read();
blocks_hashes.into_iter()
.filter(|h| chain.block_state(&h) == BlockState::Unknown)
.filter(|h| !self.unknown_blocks.contains_key(h))
.filter(|h| chain.block_state(h) == BlockState::Unknown)
.filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h))
.collect()
};
let mut executor = self.executor.lock();
executor.execute(Task::RequestBlocks(peer_index, unknown_blocks_hashes))
if !unknown_blocks_hashes.is_empty() {
let mut executor = self.executor.lock();
executor.execute(Task::RequestBlocks(peer_index, unknown_blocks_hashes));
}
}
/// Add new transactions to the memory pool
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>) {
// if we are in synchronization state, we will ignore this message
if self.state.is_synchronizing() {
return;
}
// else => request all unknown transactions
let unknown_transactions_hashes: Vec<_> = {
let chain = self.chain.read();
transactions_hashes.into_iter()
.filter(|h| chain.transaction_state(h) == TransactionState::Unknown)
.collect()
};
if !unknown_transactions_hashes.is_empty() {
let mut executor = self.executor.lock();
executor.execute(Task::RequestTransactions(peer_index, unknown_transactions_hashes));
}
}
/// Try to queue synchronization of unknown blocks when blocks headers are received.
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>) {
let blocks_hashes = {
// we can't process headers message if it has no link to our headers
let ref header0 = blocks_headers[0];
if {
self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown
} {
warn!(target: "sync", "Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}", peer_index, header0.hash(), header0.previous_header_hash);
let header0 = &blocks_headers[0];
let unknown_state = self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown;
if unknown_state {
warn!(
target: "sync",
"Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}",
peer_index,
header0.hash().to_reversed_str(),
header0.previous_header_hash.to_reversed_str()
);
return;
}
@ -343,7 +381,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
// validate blocks headers before scheduling
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
let mut prev_block_hash = header0.previous_header_hash.clone();
for block_header in blocks_headers.iter() {
for block_header in &blocks_headers {
let block_header_hash = block_header.hash();
if block_header.previous_header_hash != prev_block_hash {
warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash);
@ -395,6 +433,11 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
self.execute_synchronization_tasks(None);
}
/// Process new transaction.
fn on_peer_transaction(&mut self, _peer_index: usize, transaction: Transaction) {
self.process_peer_transaction(transaction);
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
@ -436,19 +479,29 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
// remove block from verification queue
// header is removed in `insert_best_block` call
// or it is removed earlier, when block was removed from the verifying queue
if chain.forget_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing {
if chain.forget_block_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing {
// block was in verification queue => insert to storage
chain.insert_best_block(hash.clone(), block)
chain.insert_best_block(hash.clone(), &block)
} else {
Ok(())
Ok(BlockInsertionResult::default())
}
} {
Ok(_) => {
Ok(insert_result) => {
// awake threads, waiting for this block insertion
self.awake_waiting_threads(&hash);
// continue with synchronization
self.execute_synchronization_tasks(None);
// relay block to our peers
if self.state.is_saturated() {
// TODO: Task::BroadcastBlock
}
// deal with block transactions
for (_, tx) in insert_result.transactions_to_reverify {
self.process_peer_transaction(tx)
}
},
Err(db::Error::Consistency(e)) => {
// process as verification error
@ -463,22 +516,50 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Process failed block verification
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
{
let mut chain = self.chain.write();
// forget this block and all its children
// headers are also removed, as they are all invalid
chain.forget_with_children(&hash);
chain.forget_block_with_children(hash);
}
// awake threads, waiting for this block insertion
self.awake_waiting_threads(&hash);
self.awake_waiting_threads(hash);
// start new tasks
self.execute_synchronization_tasks(None);
}
/// Process successful transaction verification
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
let hash = transaction.hash();
// insert transaction to the memory pool
let mut chain = self.chain.write();
// remove transaction from verification queue
// if it is not in the queue => it was removed due to error or reorganization
if !chain.forget_verifying_transaction(&hash) {
return;
}
// transaction was in verification queue => insert to memory pool
chain.insert_verified_transaction(transaction);
}
/// Process failed transaction verification
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
{
let mut chain = self.chain.write();
// forget this transaction and all its children
chain.forget_verifying_transaction_with_children(hash);
}
}
}
impl<T> SynchronizationClient<T> where T: TaskExecutor {
@ -493,8 +574,8 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
management_worker: None,
executor: executor,
chain: chain.clone(),
orphaned_blocks: HashMap::new(),
unknown_blocks: LinkedHashMap::new(),
orphaned_blocks_pool: OrphanBlocksPool::new(),
orphaned_transactions_pool: OrphanTransactionsPool::new(),
verification_work_sender: None,
verification_worker_thread: None,
verifying_blocks_by_peer: HashMap::new(),
@ -523,6 +604,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
{
let peers_config = ManagePeersConfig::default();
let unknown_config = ManageUnknownBlocksConfig::default();
let orphan_config = ManageOrphanTransactionsConfig::default();
let csync = Arc::downgrade(&sync);
let mut sync = sync.lock();
let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle)
@ -539,8 +621,12 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
client.execute_synchronization_tasks(blocks_to_request);
manage_synchronization_peers_inventory(&peers_config, &mut client.peers);
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.unknown_blocks) {
client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect());
manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool);
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.orphaned_blocks_pool) {
let mut chain = client.chain.write();
for orphan_to_remove in orphans_to_remove {
chain.forget_block(&orphan_to_remove);
}
}
}
Ok(())
@ -561,12 +647,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
state: self.state,
peers: self.peers.information(),
chain: self.chain.read().information(),
orphaned: self.orphaned_blocks.len(),
orphaned_blocks: self.orphaned_blocks_pool.len(),
orphaned_transactions: self.orphaned_transactions_pool.len(),
}
}
/// Get configuration parameters.
pub fn config<'a>(&'a self) -> &'a Config {
pub fn config(&self) -> &Config {
&self.config
}
@ -575,7 +662,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
assert_eq!(hashes.len(), headers.len());
let mut chain = self.chain.write();
match chain.intersect_with_headers(&hashes, &headers) {
match chain.intersect_with_blocks_headers(&hashes, &headers) {
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => {
warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index);
},
@ -599,12 +686,19 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let new_blocks_hashes = hashes.split_off(new_block_index);
let new_blocks_headers = headers.split_off(new_block_index);
let new_blocks_hashes_len = new_blocks_hashes.len();
trace!(target: "sync", "Sch. {} headers from peer#{}. First {:?}, last: {:?}", new_blocks_hashes_len, peer_index, new_blocks_hashes[0], new_blocks_hashes[new_blocks_hashes_len - 1]);
trace!(
target: "sync", "Sch. {} headers from peer#{}. First {:?}, last: {:?}",
new_blocks_hashes_len,
peer_index,
new_blocks_hashes[0].to_reversed_str(),
new_blocks_hashes[new_blocks_hashes_len - 1].to_reversed_str()
);
chain.schedule_blocks_headers(new_blocks_hashes, new_blocks_headers);
// remember peer as useful
self.peers.useful_peer(peer_index);
// switch to synchronization state
if !self.state.is_synchronizing() {
// TODO: NearlySaturated should start when we are in Saturated state && count(new_blocks_headers) is < LIMIT (LIMIT > 1)
if new_blocks_hashes_len == 1 && !self.state.is_nearly_saturated() {
self.state = State::NearlySaturated;
}
@ -631,31 +725,34 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
BlockState::Unknown => {
if self.state.is_synchronizing() {
// when synchronizing, we tend to receive all blocks in-order
trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index);
trace!(
target: "sync",
"Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing",
block_hash.to_reversed_str(),
peer_index
);
// remove block from current queue
chain.forget(&block_hash);
chain.forget_block(&block_hash);
// remove orphaned blocks
SynchronizationClient::<T>::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash);
let removed_blocks_hashes: Vec<_> = self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash).into_iter().map(|t| t.0).collect();
chain.forget_blocks_leave_header(&removed_blocks_hashes);
} else {
// remove this block from the queue
chain.forget_leave_header(&block_hash);
chain.forget_block_leave_header(&block_hash);
// remember this block as unknown
self.unknown_blocks.insert(block_hash.clone(), time::precise_time_s());
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert_with(HashMap::new)
.insert(block_hash, block);
self.orphaned_blocks_pool.insert_unknown_block(block_hash, block);
}
},
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.useful_peer(peer_index);
// forget block
chain.forget_leave_header(&block_hash);
// schedule verification
let mut blocks: VecDeque<(H256, Block)> = VecDeque::new();
blocks.push_back((block_hash.clone(), block));
blocks.extend(SynchronizationClient::<T>::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash));
blocks.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash));
// forget blocks we are going to process
let blocks_hashes_to_forget: Vec<_> = blocks.iter().map(|t| t.0.clone()).collect();
chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
while let Some((block_hash, block)) = blocks.pop_front() {
match self.verification_work_sender {
Some(ref verification_work_sender) => {
@ -680,7 +777,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
},
None => {
// insert to the storage + forget block header
chain.insert_best_block(block_hash.clone(), block)
chain.insert_best_block(block_hash.clone(), &block)
.expect("Error inserting to db.");
},
}
@ -690,18 +787,15 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// remember peer as useful
self.peers.useful_peer(peer_index);
// remember as orphan block
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert_with(HashMap::new)
.insert(block_hash, block);
self.orphaned_blocks_pool.insert_orphaned_block(block_hash, block);
}
}
},
}
// requested block is received => move to saturated state if there are no more blocks
chain.length_of_state(BlockState::Scheduled) == 0
&& chain.length_of_state(BlockState::Requested) == 0
chain.length_of_blocks_state(BlockState::Scheduled) == 0
&& chain.length_of_blocks_state(BlockState::Requested) == 0
};
if switch_to_saturated {
@ -709,6 +803,49 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Process new peer transaction
fn process_peer_transaction(&mut self, transaction: Transaction) {
// if we are in synchronization state, we will ignore this message
if self.state.is_synchronizing() {
return;
}
// else => verify transaction + its orphans and then add to the memory pool
let hash = transaction.hash();
let mut chain = self.chain.write();
// if any parent transaction is unknown => we have orphan transaction => remember in orphan pool
let unknown_parents: HashSet<H256> = transaction.inputs.iter()
.filter(|input| chain.transaction_state(&input.previous_output.hash) == TransactionState::Unknown)
.map(|input| input.previous_output.hash.clone())
.collect();
if !unknown_parents.is_empty() {
self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents);
return;
}
// else verify && insert this transaction && all dependent orphans
let mut transactions: VecDeque<(H256, Transaction)> = VecDeque::new();
transactions.push_back((hash.clone(), transaction));
transactions.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash));
while let Some((tx_hash, tx)) = transactions.pop_front() {
match self.verification_work_sender {
Some(ref verification_work_sender) => {
// append to verifying queue
chain.verify_transaction(tx_hash.clone(), tx.clone());
// schedule verification
verification_work_sender
.send(VerificationTask::VerifyTransaction(tx))
.expect("Verification thread have the same lifetime as `Synchronization`");
},
None => {
// insert to the memory pool
chain.insert_verified_transaction(tx);
},
}
}
}
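// Editor's sketch (not part of this commit): the drain above - once a parent is
// accepted, every orphan waiting on it (and, transitively, orphans waiting on
// those) becomes eligible. With a hypothetical pool mapping parent id -> ids of
// waiting children:
use std::collections::{HashMap, VecDeque};

fn accept_with_orphans(pool: &mut HashMap<u64, Vec<u64>>, parent: u64) -> Vec<u64> {
    let mut accepted = Vec::new();
    let mut queue = VecDeque::new();
    queue.push_back(parent);
    while let Some(tx) = queue.pop_front() {
        accepted.push(tx);
        if let Some(children) = pool.remove(&tx) {
            queue.extend(children); // children may in turn unblock their own orphans
        }
    }
    accepted
}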
/// Schedule new synchronization tasks, if any.
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>) {
let mut tasks: Vec<Task> = Vec::new();
@ -736,13 +873,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// check if we can query some blocks hashes
let inventory_idle_peers = self.peers.idle_peers_for_inventory();
if !inventory_idle_peers.is_empty() {
let scheduled_hashes_len = { self.chain.read().length_of_state(BlockState::Scheduled) };
let scheduled_hashes_len = { self.chain.read().length_of_blocks_state(BlockState::Scheduled) };
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
for inventory_peer in inventory_idle_peers.iter() {
for inventory_peer in &inventory_idle_peers {
self.peers.on_inventory_requested(*inventory_peer);
}
let inventory_tasks = inventory_idle_peers.into_iter().map(|p| Task::RequestBlocksHeaders(p));
let inventory_tasks = inventory_idle_peers.into_iter().map(Task::RequestBlocksHeaders);
tasks.extend(inventory_tasks);
}
}
@ -751,9 +888,9 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let blocks_idle_peers_len = blocks_idle_peers.len() as u32;
if blocks_idle_peers_len != 0 {
let mut chain = self.chain.write();
let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled);
let requested_hashes_len = chain.length_of_state(BlockState::Requested);
let verifying_hashes_len = chain.length_of_state(BlockState::Verifying);
let scheduled_hashes_len = chain.length_of_blocks_state(BlockState::Scheduled);
let requested_hashes_len = chain.length_of_blocks_state(BlockState::Requested);
let verifying_hashes_len = chain.length_of_blocks_state(BlockState::Verifying);
if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 {
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let hashes_to_request_len = chunk_size * blocks_idle_peers_len;
@ -808,17 +945,14 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
self.peers.reset();
// remove sync orphans, but leave unknown orphans until they are removed by the management thread
let orphans_to_remove: HashSet<_> = self.orphaned_blocks.values()
.flat_map(|v| v.iter().map(|e| e.0.clone()))
.filter(|h| !self.unknown_blocks.contains_key(h))
.collect();
self.remove_orphaned_blocks(orphans_to_remove);
let removed_orphans = self.orphaned_blocks_pool.remove_known_blocks();
// leave currently verifying blocks
{
let mut chain = self.chain.write();
chain.forget_all_with_state(BlockState::Requested);
chain.forget_all_with_state(BlockState::Scheduled);
chain.forget_blocks(&removed_orphans);
chain.forget_all_blocks_with_state(BlockState::Requested);
chain.forget_all_blocks_with_state(BlockState::Scheduled);
use time;
info!(target: "sync", "{:?} @ Switched to saturated state. Chain information: {:?}",
@ -826,61 +960,18 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
chain.information());
}
// finally - ask all known peers for their best blocks inventory, in case some peer
// has led us to a fork
// + ask all peers for their memory pool
{
let mut executor = self.executor.lock();
for peer in self.peers.all_peers() {
executor.execute(Task::RequestBlocksHeaders(peer));
executor.execute(Task::RequestMemoryPool(peer));
}
}
}
/// Remove orphaned blocks for given parent
fn remove_orphaned_blocks_for_parent(unknown_blocks: &mut LinkedHashMap<H256, f64>, orphaned_blocks: &mut HashMap<H256, HashMap<H256, Block>>, chain: &mut Chain, parent: &H256) -> VecDeque<(H256, Block)> {
let mut queue: VecDeque<H256> = VecDeque::new();
queue.push_back(parent.clone());
let mut removed: VecDeque<(H256, Block)> = VecDeque::new();
while let Some(parent_hash) = queue.pop_front() {
chain.forget_leave_header(&parent_hash);
if let Entry::Occupied(entry) = orphaned_blocks.entry(parent_hash) {
let (_, orphaned) = entry.remove_entry();
for orphaned_hash in orphaned.keys() {
unknown_blocks.remove(&orphaned_hash);
}
queue.extend(orphaned.keys().cloned());
removed.extend(orphaned.into_iter());
}
}
removed
}
/// Remove given orphaned blocks
fn remove_orphaned_blocks(&mut self, orphans_to_remove: HashSet<H256>) {
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
for parent_orphan_key in parent_orphan_keys.into_iter() {
if let Entry::Occupied(mut orphan_entry) = self.orphaned_blocks.entry(parent_orphan_key.clone()) {
if {
let mut orphans = orphan_entry.get_mut();
let orphans_keys: HashSet<H256> = orphans.keys().cloned().collect();
for orphan_to_remove in orphans_keys.intersection(&orphans_to_remove) {
orphans.remove(orphan_to_remove);
}
orphans.is_empty()
} {
orphan_entry.remove_entry();
}
}
}
let mut chain = self.chain.write();
for orphan_to_remove in orphans_to_remove {
chain.forget(&orphan_to_remove);
}
}
/// Awake threads, waiting for this block
fn awake_waiting_threads(&mut self, hash: &H256) {
// find the peer which has supplied us with this block
@ -888,7 +979,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let peer_index = *block_entry.get();
// find the number of blocks which this peer has supplied
if let Entry::Occupied(mut entry) = self.verifying_blocks_waiters.entry(peer_index) {
if {
let is_last_block = {
let &mut (ref mut waiting, ref waiter) = entry.get_mut();
waiting.remove(hash);
// if this is the last block => awake waiting threads
@ -899,7 +990,9 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
is_last_block
} {
};
if is_last_block {
entry.remove_entry();
}
}
@ -977,7 +1070,11 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
},
_ => break,
VerificationTask::VerifyTransaction(transaction) => {
// TODO: add verification here
sync.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
}
VerificationTask::Stop => break,
}
}
}
@ -1005,12 +1102,13 @@ pub mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, RepresentH256};
use chain::{Block, Transaction, RepresentH256};
use message::common::{Magic, ConsensusParams};
use super::{Client, Config, SynchronizationClient};
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::tests::DummyTaskExecutor;
use primitives::hash::H256;
use p2p::event_loop;
use test_data;
use db;
@ -1034,7 +1132,7 @@ pub mod tests {
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
(event_loop, handle, executor, chain, client)
}
}
#[test]
fn synchronization_saturated_on_start() {
@ -1042,7 +1140,8 @@ pub mod tests {
let sync = sync.lock();
let info = sync.information();
assert!(!info.state.is_synchronizing());
assert_eq!(info.orphaned, 0);
assert_eq!(info.orphaned_blocks, 0);
assert_eq!(info.orphaned_transactions, 0);
}
#[test]
@ -1057,7 +1156,7 @@ pub mod tests {
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(5), Task::RequestBlocks(5, vec![block1.hash()])]);
assert!(sync.information().state.is_nearly_saturated());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().orphaned_blocks, 0);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 1);
assert_eq!(sync.information().chain.stored, 1);
@ -1067,7 +1166,7 @@ pub mod tests {
// push unknown block => will be queued as orphan
sync.on_peer_block(5, block2);
assert!(sync.information().state.is_nearly_saturated());
assert_eq!(sync.information().orphaned, 1);
assert_eq!(sync.information().orphaned_blocks, 1);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 1);
assert_eq!(sync.information().chain.stored, 1);
@ -1077,7 +1176,7 @@ pub mod tests {
// push requested block => should be moved to the test storage && orphan should be moved
sync.on_peer_block(5, block1);
assert!(sync.information().state.is_saturated());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().orphaned_blocks, 0);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 0);
assert_eq!(sync.information().chain.stored, 3);
@ -1096,7 +1195,7 @@ pub mod tests {
// out-of-order block was presented by the peer
assert!(sync.information().state.is_synchronizing());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().orphaned_blocks, 0);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 2);
assert_eq!(sync.information().chain.stored, 1);
@ -1142,12 +1241,12 @@ pub mod tests {
// receive block from peer#2
sync.on_peer_block(2, block2);
assert!(sync.information().chain.requested == 2
&& sync.information().orphaned == 1);
&& sync.information().orphaned_blocks == 1);
// receive block from peer#1
sync.on_peer_block(1, block1);
assert!(sync.information().chain.requested == 0
&& sync.information().orphaned == 0
&& sync.information().orphaned_blocks == 0
&& sync.information().chain.stored == 3);
}
}
@ -1195,9 +1294,11 @@ pub mod tests {
sync.on_peer_block(2, block);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 2);
assert_eq!(tasks.len(), 4);
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
}
#[test]
@ -1234,7 +1335,7 @@ pub mod tests {
sync.on_peer_block(1, b2);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1)]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestMemoryPool(1)]);
{
let chain = chain.read();
@ -1277,7 +1378,7 @@ pub mod tests {
sync.on_peer_block(1, b1);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1)]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestMemoryPool(1)]);
{
let chain = chain.read();
@ -1410,7 +1511,7 @@ pub mod tests {
let mut sync = sync.lock();
sync.on_peer_block(1, test_data::block_h2());
assert_eq!(sync.information().orphaned, 1);
assert_eq!(sync.information().orphaned_blocks, 1);
{
let chain = chain.read();
@ -1418,7 +1519,7 @@ pub mod tests {
}
sync.on_peer_block(1, test_data::block_h1());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().orphaned_blocks, 0);
{
let chain = chain.read();
@ -1497,7 +1598,7 @@ pub mod tests {
sync.on_peer_blocks_notfound(1, vec![b1.hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1)]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestMemoryPool(1)]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 1);
@ -1529,4 +1630,136 @@ pub mod tests {
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
}
#[test]
fn transaction_is_not_requested_when_synchronizing() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
assert!(sync.information().state.is_synchronizing());
{ executor.lock().take_tasks(); } // forget tasks
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![]);
}
#[test]
fn transaction_is_requested_when_not_synchronizing() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestTransactions(0, vec![H256::from(0)])]);
}
let b1 = test_data::block_h1();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone()]);
assert!(sync.information().state.is_nearly_saturated());
{ executor.lock().take_tasks(); } // forget tasks
sync.on_new_transactions_inventory(0, vec![H256::from(1)]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestTransactions(0, vec![H256::from(1)])]);
}
#[test]
fn same_transaction_can_be_requested_twice() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestTransactions(0, vec![H256::from(0)])]);
}
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestTransactions(0, vec![H256::from(0)])]);
}
}
#[test]
fn known_transaction_is_not_requested() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.on_new_transactions_inventory(0, vec![test_data::genesis().transactions[0].hash(), H256::from(0)]);
assert_eq!(executor.lock().take_tasks(), vec![Task::RequestTransactions(0, vec![H256::from(0)])]);
}
#[test]
fn transaction_is_not_accepted_when_synchronizing() {
let (_, _, _, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
assert!(sync.information().state.is_synchronizing());
sync.process_peer_transaction(Transaction::default());
assert_eq!(sync.information().chain.transactions.transactions_count, 0);
}
#[test]
fn transaction_is_accepted_when_not_synchronizing() {
let (_, _, _, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.process_peer_transaction(test_data::TransactionBuilder::with_version(1).into());
assert_eq!(sync.information().chain.transactions.transactions_count, 1);
let b1 = test_data::block_h1();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone()]);
assert!(sync.information().state.is_nearly_saturated());
sync.process_peer_transaction(test_data::TransactionBuilder::with_version(2).into());
assert_eq!(sync.information().chain.transactions.transactions_count, 2);
}
#[test]
fn transaction_is_orphaned_when_input_is_unknown() {
let (_, _, _, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.process_peer_transaction(test_data::TransactionBuilder::with_default_input(0).into());
assert_eq!(sync.information().chain.transactions.transactions_count, 0);
assert_eq!(sync.information().orphaned_transactions, 1);
}
#[test]
fn orphaned_transaction_is_verified_when_input_is_received() {
let chain = &mut test_data::ChainBuilder::new();
test_data::TransactionBuilder::with_output(10).store(chain) // t0
.set_input(&chain.at(0), 0).set_output(20).store(chain); // t0 -> t1
let (_, _, _, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.process_peer_transaction(chain.at(1));
assert_eq!(sync.information().chain.transactions.transactions_count, 0);
assert_eq!(sync.information().orphaned_transactions, 1);
sync.process_peer_transaction(chain.at(0));
assert_eq!(sync.information().chain.transactions.transactions_count, 2);
assert_eq!(sync.information().orphaned_transactions, 0);
}
}

View File

@ -24,6 +24,10 @@ pub enum Task {
RequestBlocks(usize, Vec<H256>),
/// Request blocks headers using full getheaders.block_locator_hashes.
RequestBlocksHeaders(usize),
/// Request transactions with given hashes
RequestTransactions(usize, Vec<H256>),
/// Request memory pool contents
RequestMemoryPool(usize),
/// Send block.
SendBlock(usize, Block, ServerTaskIndex),
/// Send notfound
@ -81,7 +85,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
trace!(target: "sync", "Querying {} unknown blocks from peer#{}", getdata.inventory.len(), peer_index);
connection.send_getdata(&getdata);
}
}
},
Task::RequestBlocksHeaders(peer_index) => {
let block_locator_hashes = self.chain.read().block_locator_hashes();
let getheaders = types::GetHeaders {
@ -95,6 +99,28 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getheaders(&getheaders);
}
},
Task::RequestMemoryPool(peer_index) => {
let mempool = types::MemPool;
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Querying memory pool contents from peer#{}", peer_index);
connection.send_mempool(&mempool);
}
},
Task::RequestTransactions(peer_index, transactions_hashes) => {
let getdata = types::GetData {
inventory: transactions_hashes.into_iter()
.map(|hash| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: hash,
}).collect()
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Querying {} unknown transactions from peer#{}", getdata.inventory.len(), peer_index);
connection.send_getdata(&getdata);
}
},
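// Editor's note (not part of this commit): this follows the BIP 35 round-trip -
// a peer sends `mempool`, the serving side answers with an `inv` of transaction
// hashes, and interesting hashes are then fetched with `getdata` carrying
// `MessageTx` inventory entries, exactly as `RequestTransactions` builds above.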
Task::SendBlock(peer_index, block, id) => {
let block_message = types::Block {
block: block,

View File

@ -1,5 +1,7 @@
use std::collections::HashSet;
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
use synchronization_peers::Peers;
use primitives::hash::H256;
@ -13,6 +15,10 @@ const DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
/// Maximal number of orphaned blocks
const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Unknown orphan transaction removal time
const DEFAULT_ORPHAN_TRANSACTION_REMOVAL_TIME_MS: u32 = 10 * 60 * 1000;
/// Maximal number of orphaned transactions
const DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN: usize = 10000;
/// Peers management configuration
pub struct ManagePeersConfig {
@ -48,6 +54,23 @@ impl Default for ManageUnknownBlocksConfig {
}
}
/// Orphan transactions management configuration
pub struct ManageOrphanTransactionsConfig {
/// Time interval (in milliseconds) to wait before removing orphan transactions from orphan pool
pub removal_time_ms: u32,
/// Maximal # of unknown transactions in the orphan pool
pub max_number: usize,
}
impl Default for ManageOrphanTransactionsConfig {
fn default() -> Self {
ManageOrphanTransactionsConfig {
removal_time_ms: DEFAULT_ORPHAN_TRANSACTION_REMOVAL_TIME_MS,
max_number: DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN,
}
}
}
/// Manage stalled synchronization peers blocks tasks
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
@ -90,41 +113,84 @@ pub fn manage_synchronization_peers_inventory(config: &ManagePeersConfig, peers:
}
/// Manage unknown orphaned blocks
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
let mut unknown_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 };
let now = precise_time_s();
for (hash, time) in unknown_blocks.iter() {
// remove oldest blocks if there are more unknown blocks than we can hold in memory
if remove_num > 0 {
unknown_to_remove.push(hash.clone());
remove_num -= 1;
continue;
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, orphaned_blocks_pool: &mut OrphanBlocksPool) -> Option<Vec<H256>> {
let unknown_to_remove = {
let unknown_blocks = orphaned_blocks_pool.unknown_blocks();
let mut unknown_to_remove: HashSet<H256> = HashSet::new();
let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 };
let now = precise_time_s();
for (hash, time) in unknown_blocks {
// remove oldest blocks if there are more unknown blocks than we can hold in memory
if remove_num > 0 {
unknown_to_remove.insert(hash.clone());
remove_num -= 1;
continue;
}
// check if block is unknown for too long
let time_diff = now - time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
unknown_to_remove.insert(hash.clone());
}
// check if block is unknown for too long
let time_diff = now - time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
unknown_to_remove.push(hash.clone());
}
unknown_to_remove
};
// remove unknown blocks
for unknown_block in unknown_to_remove.iter() {
unknown_blocks.remove(unknown_block);
}
let unknown_to_remove: Vec<H256> = orphaned_blocks_pool.remove_blocks(&unknown_to_remove).into_iter()
.map(|t| t.0)
.collect();
if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) }
}
/// Manage orphaned transactions
pub fn manage_orphaned_transactions(config: &ManageOrphanTransactionsConfig, orphaned_transactions_pool: &mut OrphanTransactionsPool) -> Option<Vec<H256>> {
let orphans_to_remove = {
let unknown_transactions = orphaned_transactions_pool.transactions();
let mut orphans_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_transactions.len() > config.max_number { unknown_transactions.len() - config.max_number } else { 0 };
let now = precise_time_s();
for (hash, orphan_tx) in unknown_transactions {
// remove oldest transactions if there are more orphaned transactions than we can hold in memory
if remove_num > 0 {
orphans_to_remove.push(hash.clone());
remove_num -= 1;
continue;
}
// check if transaction has been orphaned for too long
let time_diff = now - orphan_tx.insertion_time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
orphans_to_remove.push(hash.clone());
}
orphans_to_remove
};
// remove orphaned transactions
let orphans_to_remove: Vec<H256> = orphaned_transactions_pool.remove_transactions(&orphans_to_remove).into_iter()
.map(|t| t.0)
.collect();
if orphans_to_remove.is_empty() { None } else { Some(orphans_to_remove) }
}
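// Editor's sketch (not part of this commit): both managers above share one
// eviction policy over an insertion-ordered collection - drop the oldest entries
// beyond `max_number`, then drop entries older than `removal_time_ms`. With a
// BTreeMap keyed by a hypothetical monotonic insertion index standing in for
// LinkedHashMap:
use std::collections::BTreeMap;

fn evict(entries: &mut BTreeMap<u64, f64>, max_number: usize, removal_time_s: f64, now: f64) -> Vec<u64> {
    let mut to_remove = Vec::new();
    let mut overflow = entries.len().saturating_sub(max_number);
    for (&id, &inserted_at) in entries.iter() {
        if overflow > 0 {
            to_remove.push(id); // over capacity => evict oldest first
            overflow -= 1;
            continue;
        }
        if now - inserted_at <= removal_time_s {
            break; // entries are ordered oldest-first, so the rest are newer
        }
        to_remove.push(id);
    }
    for id in &to_remove {
        entries.remove(id);
    }
    to_remove
}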
#[cfg(test)]
mod tests {
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, manage_synchronization_peers_blocks, manage_unknown_orphaned_blocks};
use std::collections::HashSet;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks,
manage_unknown_orphaned_blocks, manage_orphaned_transactions};
use chain::RepresentH256;
use synchronization_peers::Peers;
use primitives::hash::H256;
use test_data;
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
#[test]
fn manage_good_peer() {
@ -158,10 +224,11 @@ mod tests {
#[test]
fn manage_unknown_blocks_good() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 1000, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), None);
assert_eq!(unknown_blocks.len(), 1);
let mut pool = OrphanBlocksPool::new();
let block = test_data::genesis();
pool.insert_unknown_block(block.hash(), block);
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), None);
assert_eq!(pool.len(), 1);
}
#[test]
@ -169,21 +236,70 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
let config = ManageUnknownBlocksConfig { removal_time_ms: 0, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
let mut pool = OrphanBlocksPool::new();
let block = test_data::genesis();
let block_hash = block.hash();
pool.insert_unknown_block(block_hash.clone(), block);
sleep(Duration::from_millis(1));
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 0);
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), Some(vec![block_hash]));
assert_eq!(pool.len(), 0);
}
#[test]
fn manage_unknown_blocks_by_max_number() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 100, max_number: 1 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
unknown_blocks.insert(H256::from(1), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 1);
let mut pool = OrphanBlocksPool::new();
let block1 = test_data::genesis();
let block1_hash = block1.hash();
let block2 = test_data::block_h2();
let block2_hash = block2.hash();
pool.insert_unknown_block(block1_hash.clone(), block1);
pool.insert_unknown_block(block2_hash.clone(), block2);
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), Some(vec![block1_hash]));
assert_eq!(pool.len(), 1);
}
#[test]
fn manage_orphan_transactions_good() {
let config = ManageOrphanTransactionsConfig { removal_time_ms: 1000, max_number: 100 };
let mut pool = OrphanTransactionsPool::new();
let transaction = test_data::block_h170().transactions[1].clone();
let unknown_inputs: HashSet<H256> = transaction.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
pool.insert(transaction.hash(), transaction, unknown_inputs);
assert_eq!(manage_orphaned_transactions(&config, &mut pool), None);
assert_eq!(pool.len(), 1);
}
#[test]
fn manage_orphan_transactions_by_time() {
use std::thread::sleep;
use std::time::Duration;
let config = ManageOrphanTransactionsConfig { removal_time_ms: 0, max_number: 100 };
let mut pool = OrphanTransactionsPool::new();
let transaction = test_data::block_h170().transactions[1].clone();
let unknown_inputs: HashSet<H256> = transaction.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction_hash = transaction.hash();
pool.insert(transaction_hash.clone(), transaction, unknown_inputs);
sleep(Duration::from_millis(1));
assert_eq!(manage_orphaned_transactions(&config, &mut pool), Some(vec![transaction_hash]));
assert_eq!(pool.len(), 0);
}
#[test]
fn manage_orphan_transactions_by_max_number() {
let config = ManageOrphanTransactionsConfig { removal_time_ms: 100, max_number: 1 };
let mut pool = OrphanTransactionsPool::new();
let transaction1 = test_data::block_h170().transactions[1].clone();
let unknown_inputs1: HashSet<H256> = transaction1.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction1_hash = transaction1.hash();
let transaction2 = test_data::block_h182().transactions[1].clone();
let unknown_inputs2: HashSet<H256> = transaction2.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction2_hash = transaction2.hash();
pool.insert(transaction1_hash.clone(), transaction1, unknown_inputs1);
pool.insert(transaction2_hash.clone(), transaction2, unknown_inputs2);
assert_eq!(manage_orphaned_transactions(&config, &mut pool), Some(vec![transaction1_hash]));
assert_eq!(pool.len(), 1);
}
}

View File

@ -8,7 +8,7 @@ use message::common::{InventoryVector, InventoryType};
use db;
use chain::BlockHeader;
use primitives::hash::H256;
use synchronization_chain::ChainRef;
use synchronization_chain::{ChainRef, TransactionState};
use synchronization_executor::{Task, TaskExecutor};
use message::types;
@ -58,8 +58,7 @@ pub enum ServerTaskIndex {
impl ServerTaskIndex {
pub fn raw(&self) -> u32 {
match *self {
ServerTaskIndex::Partial(id) => id,
ServerTaskIndex::Final(id) => id,
ServerTaskIndex::Partial(id) | ServerTaskIndex::Final(id) => id,
}
}
@ -230,8 +229,7 @@ impl SynchronizationServer {
// `mempool` => `inventory`
ServerTask::ServeMempool => {
let inventory: Vec<_> = chain.read()
.memory_pool()
.get_transactions_ids()
.transactions_hashes_with_state(TransactionState::InMemory)
.into_iter()
.map(|hash| InventoryVector {
inv_type: InventoryType::MessageTx,
@ -282,9 +280,9 @@ impl SynchronizationServer {
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
(first_block_number..last_block_number).into_iter()
.map(|number| chain.block_hash(number))
.take_while(|ref hash| hash.is_some())
.take_while(|hash| hash.is_some())
.map(|hash| hash.unwrap())
.take_while(|ref hash| *hash != hash_stop)
.take_while(|hash| hash != hash_stop)
.collect()
}
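
The take_while/map/take_while chain above collects hashes until the first gap in the chain or until hash_stop is reached (hash_stop itself excluded). An equivalent explicit loop, shown only to spell out those stop conditions (assuming the same accessors as above):

    let mut hashes = Vec::new();
    for number in first_block_number..last_block_number {
        match chain.block_hash(number) {
            // stop at the blockchain end (None) or once hash_stop is reached
            Some(ref hash) if hash != hash_stop => hashes.push(hash.clone()),
            _ => break,
        }
    }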
@ -300,16 +298,16 @@ impl SynchronizationServer {
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
(first_block_number..last_block_number).into_iter()
.map(|number| chain.block_header_by_number(number))
.take_while(|ref header| header.is_some())
.take_while(|header| header.is_some())
.map(|header| header.unwrap())
.take_while(|ref header| &header.hash() != hash_stop)
.take_while(|header| &header.hash() != hash_stop)
.collect()
}
fn locate_best_known_block_hash(chain: &ChainRef, hash: &H256) -> Option<db::BestBlock> {
let chain = chain.read();
match chain.block_number(&hash) {
match chain.block_number(hash) {
Some(number) => Some(db::BestBlock {
number: number,
hash: hash.clone(),
@ -317,7 +315,7 @@ impl SynchronizationServer {
// block with hash is not in the main chain (block_number has returned None)
// but maybe it is in some fork? if so => we should find intersection with main chain
// and this would be our best common block
None => chain.block_header_by_hash(&hash)
None => chain.block_header_by_hash(hash)
.and_then(|block| {
let mut current_block_hash = block.previous_header_hash;
loop {
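
The hunk cuts off inside this loop, but the comment above states its job: walk previous_header_hash back from the fork block until an ancestor on the main chain is found. A sketch of that walk, using only the accessors visible in this hunk (types simplified; the real loop's details may differ):

    fn best_common_with_fork(chain: &Chain, hash: &H256) -> Option<db::BestBlock> {
        // start from the fork block's parent and walk back towards the main chain
        let mut current_block_hash = chain.block_header_by_hash(hash)?.previous_header_hash;
        loop {
            if let Some(number) = chain.block_number(&current_block_hash) {
                // the first ancestor on the main chain is the best common block
                return Some(db::BestBlock { number: number, hash: current_block_hash });
            }
            current_block_hash = chain.block_header_by_hash(&current_block_hash)?.previous_header_hash;
        }
    }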
@ -368,7 +366,7 @@ impl Server for SynchronizationServer {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
@ -633,7 +631,7 @@ pub mod tests {
#[test]
fn server_getblocks_responds_inventory_when_have_unknown_blocks() {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
@ -669,7 +667,7 @@ pub mod tests {
#[test]
fn server_getheaders_responds_headers_when_have_unknown_blocks() {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getheaders(0, types::GetHeaders {
@ -702,7 +700,7 @@ pub mod tests {
// when memory pool is non-empty
let transaction = Transaction::default();
let transaction_hash = transaction.hash();
chain.write().memory_pool_mut().insert_verified(transaction);
chain.write().insert_verified_transaction(transaction);
// when asking for memory pool transactions ids
let dummy_id = 0;
server.serve_mempool(0, dummy_id);


@ -14,3 +14,6 @@ dot -Tpng > tools/graph.png tools/graph.dot
# Finally let's bring back old Cargo.toml file
patch Cargo.toml tools/workspace.diff
# Now let's revert Cargo.lock to previous state
cargo update -p pbtc


@ -121,6 +121,7 @@ digraph dependencies {
N3 -> N46[label="",style=dashed];
N3 -> N47[label="",style=dashed];
N4 -> N2[label="",style=dashed];
N4 -> N8[label="",style=dashed];
N4 -> N30[label="",style=dashed];
N4 -> N32[label="",style=dashed];
N4 -> N36[label="",style=dashed];
@ -181,6 +182,7 @@ digraph dependencies {
N13 -> N14[label="",style=dashed];
N13 -> N16[label="",style=dashed];
N13 -> N32[label="",style=dashed];
N13 -> N51[label="",style=dashed];
N13 -> N52[label="",style=dashed];
N13 -> N54[label="",style=dashed];
N13 -> N57[label="",style=dashed];
@ -189,6 +191,7 @@ digraph dependencies {
N13 -> N80[label="",style=dashed];
N14 -> N2[label="",style=dashed];
N14 -> N4[label="",style=dashed];
N14 -> N8[label="",style=dashed];
N14 -> N12[label="",style=dashed];
N14 -> N32[label="",style=dashed];
N14 -> N36[label="",style=dashed];
@ -229,6 +232,7 @@ digraph dependencies {
N43 -> N65[label="",style=dashed];
N49 -> N36[label="",style=dashed];
N49 -> N39[label="",style=dashed];
N51 -> N8[label="",style=dashed];
N51 -> N18[label="",style=dashed];
N52 -> N79[label="",style=dashed];
N52 -> N81[label="",style=dashed];

Binary file not shown (848 KiB before, 858 KiB after; presumably the regenerated tools/graph.png).


@ -157,7 +157,7 @@ impl Queue {
items.push_front(hash, ScheduleItem::Continued(item.block(), num));
},
Err(e) => {
trace!(target: "verification", "Verification of block {} failed: {:?}", &hash, e);
trace!(target: "verification", "Verification of block {} failed: {:?}", hash.to_reversed_str(), e);
let mut invalid = self.invalid.write();
let mut processing = self.processing.write();


@ -50,14 +50,14 @@ pub fn age(protocol_time: u32) -> i64 {
pub fn block_reward_satoshi(block_height: u32) -> u64 {
let mut res = 50 * 100 * 1000 * 1000;
for _ in 0..block_height / 210000 { res = res / 2 }
for _ in 0..block_height / 210000 { res /= 2 }
res
}
pub fn transaction_sigops(transaction: &chain::Transaction) -> Result<usize, script::Error> {
let mut result = 0usize;
for output in transaction.outputs.iter() {
for output in &transaction.outputs {
let output_script: Script = output.script_pubkey.to_vec().into();
// todo: not always allow malformed output?
result += output_script.sigop_count(false).unwrap_or(0);
@ -65,7 +65,7 @@ pub fn transaction_sigops(transaction: &chain::Transaction) -> Result<usize, scr
if transaction.is_coinbase() { return Ok(result); }
for input in transaction.inputs.iter() {
for input in &transaction.inputs {
let input_script: Script = input.script_sig().to_vec().into();
result += try!(input_script.sigop_count(false));
}
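
For reference, the halving schedule block_reward_satoshi implements: the subsidy starts at 5,000,000,000 satoshi (50 BTC) and halves every 210,000 blocks. A standalone check whose values follow directly from the code above:

    fn block_reward_satoshi(block_height: u32) -> u64 {
        let mut res = 50 * 100 * 1000 * 1000;
        for _ in 0..block_height / 210000 { res /= 2 }
        res
    }

    fn main() {
        assert_eq!(block_reward_satoshi(0), 5_000_000_000);       // 50 BTC
        assert_eq!(block_reward_satoshi(209_999), 5_000_000_000); // last block before the first halving
        assert_eq!(block_reward_satoshi(210_000), 2_500_000_000); // first halving
        assert_eq!(block_reward_satoshi(420_000), 1_250_000_000); // second halving
    }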