Merge pull request #267 from ethcore/sync_sendrawtransaction
`sendrawtransaction` RPC method implemented
This commit is contained in:
commit
db0e4f3bd1
|
@ -783,14 +783,23 @@ dependencies = [
|
||||||
name = "rpc"
|
name = "rpc"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"chain 0.1.0",
|
||||||
|
"db 0.1.0",
|
||||||
"jsonrpc-core 4.0.0 (git+https://github.com/ethcore/jsonrpc.git)",
|
"jsonrpc-core 4.0.0 (git+https://github.com/ethcore/jsonrpc.git)",
|
||||||
"jsonrpc-http-server 6.1.1 (git+https://github.com/ethcore/jsonrpc.git)",
|
"jsonrpc-http-server 6.1.1 (git+https://github.com/ethcore/jsonrpc.git)",
|
||||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"network 0.1.0",
|
||||||
|
"p2p 0.1.0",
|
||||||
|
"primitives 0.1.0",
|
||||||
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde 0.8.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde 0.8.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde_codegen 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde_codegen 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde_json 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde_json 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde_macros 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde_macros 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"serialization 0.1.0",
|
||||||
|
"sync 0.1.0",
|
||||||
|
"test-data 0.1.0",
|
||||||
|
"tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use sync::create_sync_connection_factory;
|
use sync::{create_local_sync_node, create_sync_connection_factory};
|
||||||
use message::Services;
|
use message::Services;
|
||||||
use util::{open_db, init_db, node_table_path};
|
use util::{open_db, init_db, node_table_path};
|
||||||
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM};
|
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM};
|
||||||
|
@ -34,9 +34,13 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
|
||||||
};
|
};
|
||||||
|
|
||||||
let sync_handle = el.handle();
|
let sync_handle = el.handle();
|
||||||
let sync_connection_factory = create_sync_connection_factory(&sync_handle, cfg.magic, db);
|
let local_sync_node = create_local_sync_node(&sync_handle, cfg.magic, db);
|
||||||
|
let sync_connection_factory = create_sync_connection_factory(local_sync_node.clone());
|
||||||
|
|
||||||
let _http_server = try!(rpc::new_http(cfg.rpc_config));
|
let rpc_deps = rpc::Dependencies {
|
||||||
|
local_sync_node: local_sync_node,
|
||||||
|
};
|
||||||
|
let _rpc_server = try!(rpc::new_http(cfg.rpc_config, rpc_deps));
|
||||||
|
|
||||||
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
|
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
|
||||||
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
|
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
|
||||||
|
|
18
pbtc/rpc.rs
18
pbtc/rpc.rs
|
@ -2,6 +2,11 @@ use std::net::SocketAddr;
|
||||||
use rpc_apis::{self, ApiSet};
|
use rpc_apis::{self, ApiSet};
|
||||||
use ethcore_rpc::{Server, RpcServer, RpcServerError};
|
use ethcore_rpc::{Server, RpcServer, RpcServerError};
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use sync;
|
||||||
|
|
||||||
|
pub struct Dependencies {
|
||||||
|
pub local_sync_node: sync::LocalNodeRef,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub struct HttpConfiguration {
|
pub struct HttpConfiguration {
|
||||||
|
@ -26,23 +31,24 @@ impl HttpConfiguration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_http(conf: HttpConfiguration) -> Result<Option<Server>, String> {
|
pub fn new_http(conf: HttpConfiguration, deps: Dependencies) -> Result<Option<Server>, String> {
|
||||||
if !conf.enabled {
|
if !conf.enabled {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let url = format!("{}:{}", conf.interface, conf.port);
|
let url = format!("{}:{}", conf.interface, conf.port);
|
||||||
let addr = try!(url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url)));
|
let addr = try!(url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url)));
|
||||||
Ok(Some(try!(setup_http_rpc_server(&addr, conf.cors, conf.hosts, conf.apis))))
|
Ok(Some(try!(setup_http_rpc_server(&addr, conf.cors, conf.hosts, conf.apis, deps))))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setup_http_rpc_server(
|
pub fn setup_http_rpc_server(
|
||||||
url: &SocketAddr,
|
url: &SocketAddr,
|
||||||
cors_domains: Option<Vec<String>>,
|
cors_domains: Option<Vec<String>>,
|
||||||
allowed_hosts: Option<Vec<String>>,
|
allowed_hosts: Option<Vec<String>>,
|
||||||
apis: ApiSet
|
apis: ApiSet,
|
||||||
|
deps: Dependencies,
|
||||||
) -> Result<Server, String> {
|
) -> Result<Server, String> {
|
||||||
let server = try!(setup_rpc_server(apis));
|
let server = try!(setup_rpc_server(apis, deps));
|
||||||
// TODO: PanicsHandler
|
// TODO: PanicsHandler
|
||||||
let start_result = server.start_http(url, cors_domains, allowed_hosts);
|
let start_result = server.start_http(url, cors_domains, allowed_hosts);
|
||||||
match start_result {
|
match start_result {
|
||||||
|
@ -55,7 +61,7 @@ pub fn setup_http_rpc_server(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup_rpc_server(apis: ApiSet) -> Result<RpcServer, String> {
|
fn setup_rpc_server(apis: ApiSet, deps: Dependencies) -> Result<RpcServer, String> {
|
||||||
let server = RpcServer::new();
|
let server = RpcServer::new();
|
||||||
Ok(rpc_apis::setup_rpc(server, apis))
|
Ok(rpc_apis::setup_rpc(server, apis, deps))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use rpc::Dependencies;
|
||||||
use ethcore_rpc::Extendable;
|
use ethcore_rpc::Extendable;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
|
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
|
||||||
|
@ -38,12 +39,12 @@ impl ApiSet {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setup_rpc<T: Extendable>(server: T, apis: ApiSet) -> T {
|
pub fn setup_rpc<T: Extendable>(server: T, apis: ApiSet, deps: Dependencies) -> T {
|
||||||
use ethcore_rpc::v1::*;
|
use ethcore_rpc::v1::*;
|
||||||
|
|
||||||
for api in apis.list_apis() {
|
for api in apis.list_apis() {
|
||||||
match api {
|
match api {
|
||||||
Api::Raw => server.add_delegate(RawClient::new().to_delegate()),
|
Api::Raw => server.add_delegate(RawClient::new(RawClientCore::new(deps.local_sync_node.clone())).to_delegate()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
server
|
server
|
||||||
|
|
|
@ -124,6 +124,10 @@ macro_rules! impl_hash {
|
||||||
impl Eq for $name { }
|
impl Eq for $name { }
|
||||||
|
|
||||||
impl $name {
|
impl $name {
|
||||||
|
pub fn take(self) -> [u8; $size] {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
|
||||||
pub fn reversed(&self) -> Self {
|
pub fn reversed(&self) -> Self {
|
||||||
let mut result = self.clone();
|
let mut result = self.clone();
|
||||||
result.reverse();
|
result.reverse();
|
||||||
|
|
|
@ -11,9 +11,18 @@ log = "0.3"
|
||||||
serde = "0.8"
|
serde = "0.8"
|
||||||
serde_json = "0.8"
|
serde_json = "0.8"
|
||||||
rustc-serialize = "0.3"
|
rustc-serialize = "0.3"
|
||||||
|
tokio-core = "0.1.1"
|
||||||
serde_macros = { version = "0.8.0", optional = true }
|
serde_macros = { version = "0.8.0", optional = true }
|
||||||
jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" }
|
jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" }
|
||||||
jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git" }
|
jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git" }
|
||||||
|
sync = { path = "../sync" }
|
||||||
|
serialization = { path = "../serialization" }
|
||||||
|
chain = { path = "../chain" }
|
||||||
|
primitives = { path = "../primitives" }
|
||||||
|
p2p = { path = "../p2p" }
|
||||||
|
network = { path = "../network" }
|
||||||
|
db = { path = "../db" }
|
||||||
|
test-data = { path = "../test-data" }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
serde_codegen = { version = "0.8.0", optional = true }
|
serde_codegen = { version = "0.8.0", optional = true }
|
||||||
|
|
|
@ -7,6 +7,15 @@ extern crate serde;
|
||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
extern crate jsonrpc_core;
|
extern crate jsonrpc_core;
|
||||||
extern crate jsonrpc_http_server;
|
extern crate jsonrpc_http_server;
|
||||||
|
extern crate tokio_core;
|
||||||
|
extern crate sync;
|
||||||
|
extern crate chain;
|
||||||
|
extern crate serialization as ser;
|
||||||
|
extern crate primitives;
|
||||||
|
extern crate p2p;
|
||||||
|
extern crate network;
|
||||||
|
extern crate db;
|
||||||
|
extern crate test_data;
|
||||||
|
|
||||||
pub mod v1;
|
pub mod v1;
|
||||||
pub mod rpc_server;
|
pub mod rpc_server;
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
///! RPC Error codes and error objects
|
///! RPC Error codes and error objects
|
||||||
|
|
||||||
|
mod codes {
|
||||||
|
// NOTE [ToDr] Codes from [-32099, -32000]
|
||||||
|
pub const EXECUTION_ERROR: i64 = -32015;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
macro_rules! rpc_unimplemented {
|
macro_rules! rpc_unimplemented {
|
||||||
() => (Err(::v1::helpers::errors::unimplemented(None)))
|
() => (Err(::v1::helpers::errors::unimplemented(None)))
|
||||||
}
|
}
|
||||||
|
@ -22,3 +28,11 @@ pub fn invalid_params<T: fmt::Debug>(param: &str, details: T) -> Error {
|
||||||
data: Some(Value::String(format!("{:?}", details))),
|
data: Some(Value::String(format!("{:?}", details))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn execution<T: fmt::Debug>(data: T) -> Error {
|
||||||
|
Error {
|
||||||
|
code: ErrorCode::ServerError(codes::EXECUTION_ERROR),
|
||||||
|
message: "Execution error.".into(),
|
||||||
|
data: Some(Value::String(format!("{:?}", data))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
mod raw;
|
mod raw;
|
||||||
|
|
||||||
pub use self::raw::RawClient;
|
pub use self::raw::{RawClient, RawClientCore};
|
||||||
|
|
|
@ -1,17 +1,117 @@
|
||||||
use v1::traits::Raw;
|
use v1::traits::Raw;
|
||||||
use v1::types::RawTransaction;
|
use v1::types::RawTransaction;
|
||||||
|
use v1::types::H256;
|
||||||
|
use v1::helpers::errors::{execution, invalid_params};
|
||||||
use jsonrpc_core::Error;
|
use jsonrpc_core::Error;
|
||||||
|
use chain::Transaction;
|
||||||
|
use sync;
|
||||||
|
use ser::{Reader, deserialize};
|
||||||
|
use primitives::hash::H256 as GlobalH256;
|
||||||
|
|
||||||
pub struct RawClient;
|
pub struct RawClient<T: RawClientCoreApi> {
|
||||||
|
core: T,
|
||||||
|
}
|
||||||
|
|
||||||
impl RawClient {
|
pub trait RawClientCoreApi: Send + Sync + 'static {
|
||||||
pub fn new() -> Self {
|
fn accept_transaction(&self, transaction: Transaction) -> Result<GlobalH256, String>;
|
||||||
RawClient { }
|
}
|
||||||
|
|
||||||
|
pub struct RawClientCore {
|
||||||
|
local_sync_node: sync::LocalNodeRef,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawClientCore {
|
||||||
|
pub fn new(local_sync_node: sync::LocalNodeRef) -> Self {
|
||||||
|
RawClientCore {
|
||||||
|
local_sync_node: local_sync_node,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Raw for RawClient {
|
impl RawClientCoreApi for RawClientCore {
|
||||||
fn send_raw_transaction(&self, _tx: RawTransaction) -> Result<(), Error> {
|
fn accept_transaction(&self, transaction: Transaction) -> Result<GlobalH256, String> {
|
||||||
rpc_unimplemented!()
|
self.local_sync_node.accept_transaction(transaction)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RawClient<T> where T: RawClientCoreApi {
|
||||||
|
pub fn new(core: T) -> Self {
|
||||||
|
RawClient {
|
||||||
|
core: core,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Raw for RawClient<T> where T: RawClientCoreApi {
|
||||||
|
fn send_raw_transaction(&self, raw_transaction: RawTransaction) -> Result<H256, Error> {
|
||||||
|
let raw_transaction_data: Vec<u8> = raw_transaction.into();
|
||||||
|
let transaction = try!(deserialize(Reader::new(&raw_transaction_data)).map_err(|e| invalid_params("tx", e)));
|
||||||
|
self.core.accept_transaction(transaction)
|
||||||
|
.map(|h| h.reversed().into())
|
||||||
|
.map_err(|e| execution(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod tests {
|
||||||
|
use jsonrpc_core::{IoHandler, GenericIoHandler};
|
||||||
|
use chain::Transaction;
|
||||||
|
use primitives::hash::H256 as GlobalH256;
|
||||||
|
use v1::traits::Raw;
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct SuccessRawClientCore;
|
||||||
|
#[derive(Default)]
|
||||||
|
struct ErrorRawClientCore;
|
||||||
|
|
||||||
|
impl RawClientCoreApi for SuccessRawClientCore {
|
||||||
|
fn accept_transaction(&self, transaction: Transaction) -> Result<GlobalH256, String> {
|
||||||
|
Ok(transaction.hash())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawClientCoreApi for ErrorRawClientCore {
|
||||||
|
fn accept_transaction(&self, _transaction: Transaction) -> Result<GlobalH256, String> {
|
||||||
|
Err("error".to_owned())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn sendrawtransaction_accepted() {
|
||||||
|
let client = RawClient::new(SuccessRawClientCore::default());
|
||||||
|
let handler = IoHandler::new();
|
||||||
|
handler.add_delegate(client.to_delegate());
|
||||||
|
|
||||||
|
let sample = handler.handle_request_sync(&(r#"
|
||||||
|
{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "sendrawtransaction",
|
||||||
|
"params": ["00000000013ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a0000000000000000000101000000000000000000000000"],
|
||||||
|
"id": 1
|
||||||
|
}"#)
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
// direct hash is 0791efccd035c5fe501023ff888106eba5eff533965de4a6e06400f623bcac34
|
||||||
|
// but client expects reverse hash
|
||||||
|
assert_eq!(r#"{"jsonrpc":"2.0","result":"34acbc23f60064e0a6e45d9633f5efa5eb068188ff231050fec535d0ccef9107","id":1}"#, &sample);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn sendrawtransaction_rejected() {
|
||||||
|
let client = RawClient::new(ErrorRawClientCore::default());
|
||||||
|
let handler = IoHandler::new();
|
||||||
|
handler.add_delegate(client.to_delegate());
|
||||||
|
|
||||||
|
let sample = handler.handle_request_sync(&(r#"
|
||||||
|
{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "sendrawtransaction",
|
||||||
|
"params": ["00000000013ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a0000000000000000000101000000000000000000000000"],
|
||||||
|
"id": 1
|
||||||
|
}"#)
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(r#"{"jsonrpc":"2.0","error":{"code":-32015,"message":"Execution error.","data":"\"error\""},"id":1}"#, &sample);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,4 +5,4 @@ pub mod traits;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
|
||||||
pub use self::traits::Raw;
|
pub use self::traits::Raw;
|
||||||
pub use self::impls::RawClient;
|
pub use self::impls::{RawClient, RawClientCore};
|
||||||
|
|
|
@ -2,12 +2,13 @@ use jsonrpc_core::Error;
|
||||||
|
|
||||||
use v1::helpers::auto_args::Wrap;
|
use v1::helpers::auto_args::Wrap;
|
||||||
use v1::types::RawTransaction;
|
use v1::types::RawTransaction;
|
||||||
|
use v1::types::H256;
|
||||||
|
|
||||||
build_rpc_trait! {
|
build_rpc_trait! {
|
||||||
/// Partiy-bitcoin raw data interface.
|
/// Partiy-bitcoin raw data interface.
|
||||||
pub trait Raw {
|
pub trait Raw {
|
||||||
/// Adds transaction to the memory pool && relays it to the peers.
|
/// Adds transaction to the memory pool && relays it to the peers.
|
||||||
#[rpc(name = "sendrawtransaction")]
|
#[rpc(name = "sendrawtransaction")]
|
||||||
fn send_raw_transaction(&self, RawTransaction) -> Result<(), Error>;
|
fn send_raw_transaction(&self, RawTransaction) -> Result<H256, Error>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,168 @@
|
||||||
|
use std::fmt;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::cmp::Ordering;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
use serde;
|
||||||
|
use rustc_serialize::hex::{ToHex, FromHex};
|
||||||
|
use primitives::hash::H256 as GlobalH256;
|
||||||
|
|
||||||
|
macro_rules! impl_hash {
|
||||||
|
($name: ident, $other: ident, $size: expr) => {
|
||||||
|
/// Hash serialization
|
||||||
|
#[derive(Eq)]
|
||||||
|
pub struct $name([u8; $size]);
|
||||||
|
|
||||||
|
impl Default for $name {
|
||||||
|
fn default() -> Self {
|
||||||
|
$name([0; $size])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for $name {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
write!(f, "{}", $other::from(self.0.clone()).to_hex())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<T> for $name where $other: From<T> {
|
||||||
|
fn from(o: T) -> Self {
|
||||||
|
$name($other::from(o).take())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for $name {
|
||||||
|
type Err = <$other as FromStr>::Err;
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
let other = try!($other::from_str(s));
|
||||||
|
Ok($name(other.take()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<$other> for $name {
|
||||||
|
fn into(self) -> $other {
|
||||||
|
$other::from(self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for $name {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
let self_ref: &[u8] = &self.0;
|
||||||
|
let other_ref: &[u8] = &other.0;
|
||||||
|
self_ref == other_ref
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for $name {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||||
|
let self_ref: &[u8] = &self.0;
|
||||||
|
let other_ref: &[u8] = &other.0;
|
||||||
|
self_ref.partial_cmp(other_ref)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for $name {
|
||||||
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
|
let self_ref: &[u8] = &self.0;
|
||||||
|
let other_ref: &[u8] = &other.0;
|
||||||
|
self_ref.cmp(other_ref)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Hash for $name {
|
||||||
|
fn hash<H>(&self, state: &mut H) where H: Hasher {
|
||||||
|
$other::from(self.0.clone()).hash(state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for $name {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let mut r = [0; $size];
|
||||||
|
r.copy_from_slice(&self.0);
|
||||||
|
$name(r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl serde::Serialize for $name {
|
||||||
|
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
|
||||||
|
where S: serde::Serializer {
|
||||||
|
let mut hex = String::new();
|
||||||
|
hex.push_str(&$other::from(self.0.clone()).to_hex());
|
||||||
|
serializer.serialize_str(&hex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl serde::Deserialize for $name {
|
||||||
|
fn deserialize<D>(deserializer: &mut D) -> Result<$name, D::Error> where D: serde::Deserializer {
|
||||||
|
struct HashVisitor;
|
||||||
|
|
||||||
|
impl serde::de::Visitor for HashVisitor {
|
||||||
|
type Value = $name;
|
||||||
|
|
||||||
|
fn visit_str<E>(&mut self, value: &str) -> Result<Self::Value, E> where E: serde::Error {
|
||||||
|
|
||||||
|
if value.len() != $size * 2 {
|
||||||
|
return Err(serde::Error::custom("Invalid length."));
|
||||||
|
}
|
||||||
|
|
||||||
|
match value[..].from_hex() {
|
||||||
|
Ok(ref v) => {
|
||||||
|
let mut result = [0u8; $size];
|
||||||
|
result.copy_from_slice(v);
|
||||||
|
Ok($name($other::from(result).take()))
|
||||||
|
},
|
||||||
|
_ => Err(serde::Error::custom("Invalid hex value."))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn visit_string<E>(&mut self, value: String) -> Result<Self::Value, E> where E: serde::Error {
|
||||||
|
self.visit_str(value.as_ref())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
deserializer.deserialize(HashVisitor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl_hash!(H256, GlobalH256, 32);
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::H256;
|
||||||
|
use primitives::hash::H256 as GlobalH256;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hash_debug() {
|
||||||
|
let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048";
|
||||||
|
let reversed_hash = H256::from(str_reversed);
|
||||||
|
let debug_result = format!("{:?}", reversed_hash);
|
||||||
|
assert_eq!(debug_result, str_reversed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hash_from_str() {
|
||||||
|
let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048";
|
||||||
|
match H256::from_str(str_reversed) {
|
||||||
|
Ok(reversed_hash) => assert_eq!(format!("{:?}", reversed_hash), str_reversed),
|
||||||
|
_ => panic!("unexpected"),
|
||||||
|
}
|
||||||
|
|
||||||
|
let str_reversed = "XXXYYY";
|
||||||
|
match H256::from_str(str_reversed) {
|
||||||
|
Ok(_) => panic!("unexpected"),
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hash_to_global_hash() {
|
||||||
|
let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048";
|
||||||
|
let reversed_hash = H256::from(str_reversed);
|
||||||
|
let global_hash = GlobalH256::from(str_reversed);
|
||||||
|
let global_converted: GlobalH256 = reversed_hash.into();
|
||||||
|
assert_eq!(global_converted, global_hash);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,7 @@
|
||||||
mod bytes;
|
mod bytes;
|
||||||
|
mod hash;
|
||||||
mod raw_transaction;
|
mod raw_transaction;
|
||||||
|
|
||||||
|
pub use self::bytes::Bytes;
|
||||||
|
pub use self::hash::H256;
|
||||||
pub use self::raw_transaction::RawTransaction;
|
pub use self::raw_transaction::RawTransaction;
|
||||||
|
|
|
@ -5,7 +5,8 @@ use chain;
|
||||||
use db;
|
use db;
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use orphan_blocks_pool::OrphanBlocksPool;
|
use orphan_blocks_pool::OrphanBlocksPool;
|
||||||
use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, VerificationTask};
|
use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask,
|
||||||
|
VerificationSink, BlockVerificationSink, TransactionVerificationSink};
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use super::Error;
|
use super::Error;
|
||||||
|
|
||||||
|
@ -15,23 +16,28 @@ pub struct BlocksWriter {
|
||||||
storage: db::SharedStore,
|
storage: db::SharedStore,
|
||||||
orphaned_blocks_pool: OrphanBlocksPool,
|
orphaned_blocks_pool: OrphanBlocksPool,
|
||||||
verifier: SyncVerifier<BlocksWriterSink>,
|
verifier: SyncVerifier<BlocksWriterSink>,
|
||||||
sink: Arc<Mutex<BlocksWriterSink>>,
|
sink: Arc<BlocksWriterSinkData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BlocksWriterSink {
|
struct BlocksWriterSink {
|
||||||
|
data: Arc<BlocksWriterSinkData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BlocksWriterSinkData {
|
||||||
storage: db::SharedStore,
|
storage: db::SharedStore,
|
||||||
err: Option<Error>,
|
err: Mutex<Option<Error>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlocksWriter {
|
impl BlocksWriter {
|
||||||
pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter {
|
pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter {
|
||||||
let sink = Arc::new(Mutex::new(BlocksWriterSink::new(storage.clone())));
|
let sink_data = Arc::new(BlocksWriterSinkData::new(storage.clone()));
|
||||||
let verifier = SyncVerifier::new(network, storage.clone(), sink.clone());
|
let sink = Arc::new(BlocksWriterSink::new(sink_data.clone()));
|
||||||
|
let verifier = SyncVerifier::new(network, storage.clone(), sink);
|
||||||
BlocksWriter {
|
BlocksWriter {
|
||||||
storage: storage,
|
storage: storage,
|
||||||
orphaned_blocks_pool: OrphanBlocksPool::new(),
|
orphaned_blocks_pool: OrphanBlocksPool::new(),
|
||||||
verifier: verifier,
|
verifier: verifier,
|
||||||
sink: sink,
|
sink: sink_data,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +62,8 @@ impl BlocksWriter {
|
||||||
verification_queue.push_front(indexed_block);
|
verification_queue.push_front(indexed_block);
|
||||||
while let Some(block) = verification_queue.pop_front() {
|
while let Some(block) = verification_queue.pop_front() {
|
||||||
self.verifier.verify_block(block);
|
self.verifier.verify_block(block);
|
||||||
if let Some(err) = self.sink.lock().error() {
|
|
||||||
|
if let Some(err) = self.sink.error() {
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,35 +73,48 @@ impl BlocksWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlocksWriterSink {
|
impl BlocksWriterSink {
|
||||||
pub fn new(storage: db::SharedStore) -> Self {
|
pub fn new(data: Arc<BlocksWriterSinkData>) -> Self {
|
||||||
BlocksWriterSink {
|
BlocksWriterSink {
|
||||||
storage: storage,
|
data: data,
|
||||||
err: None,
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn error(&mut self) -> Option<Error> {
|
impl BlocksWriterSinkData {
|
||||||
self.err.take()
|
pub fn new(storage: db::SharedStore) -> Self {
|
||||||
|
BlocksWriterSinkData {
|
||||||
|
storage: storage,
|
||||||
|
err: Mutex::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn error(&self) -> Option<Error> {
|
||||||
|
self.err.lock().take()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VerificationSink for BlocksWriterSink {
|
impl VerificationSink for BlocksWriterSink {
|
||||||
fn on_block_verification_success(&mut self, block: db::IndexedBlock) -> Option<Vec<VerificationTask>> {
|
}
|
||||||
if let Err(err) = self.storage.insert_indexed_block(&block) {
|
|
||||||
self.err = Some(Error::Database(err));
|
impl BlockVerificationSink for BlocksWriterSink {
|
||||||
|
fn on_block_verification_success(&self, block: db::IndexedBlock) -> Option<Vec<VerificationTask>> {
|
||||||
|
if let Err(err) = self.data.storage.insert_indexed_block(&block) {
|
||||||
|
*self.data.err.lock() = Some(Error::Database(err));
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_block_verification_error(&mut self, err: &str, _hash: &H256) {
|
fn on_block_verification_error(&self, err: &str, _hash: &H256) {
|
||||||
self.err = Some(Error::Verification(err.into()));
|
*self.data.err.lock() = Some(Error::Verification(err.into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_transaction_verification_success(&mut self, _transaction: chain::Transaction) {
|
impl TransactionVerificationSink for BlocksWriterSink {
|
||||||
|
fn on_transaction_verification_success(&self, _transaction: chain::Transaction) {
|
||||||
unreachable!("not intended to verify transactions")
|
unreachable!("not intended to verify transactions")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_transaction_verification_error(&mut self, _err: &str, _hash: &H256) {
|
fn on_transaction_verification_error(&self, _err: &str, _hash: &H256) {
|
||||||
unreachable!("not intended to verify transactions")
|
unreachable!("not intended to verify transactions")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,8 @@ mod synchronization_peers;
|
||||||
mod synchronization_server;
|
mod synchronization_server;
|
||||||
mod synchronization_verifier;
|
mod synchronization_verifier;
|
||||||
|
|
||||||
|
pub use local_node::LocalNodeRef;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
|
@ -65,14 +67,13 @@ pub fn create_sync_blocks_writer(db: db::SharedStore, network: Magic) -> blocks_
|
||||||
blocks_writer::BlocksWriter::new(db, network)
|
blocks_writer::BlocksWriter::new(db, network)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create inbound synchronization connections factory for given `db`.
|
/// Creates local sync node for given `db`
|
||||||
pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::SharedStore) -> p2p::LocalSyncNodeRef {
|
pub fn create_local_sync_node(handle: &Handle, network: Magic, db: db::SharedStore) -> LocalNodeRef {
|
||||||
use synchronization_chain::Chain as SyncChain;
|
use synchronization_chain::Chain as SyncChain;
|
||||||
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
|
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
|
||||||
use local_node::LocalNode as SyncNode;
|
use local_node::LocalNode as SyncNode;
|
||||||
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
|
|
||||||
use synchronization_server::SynchronizationServer;
|
use synchronization_server::SynchronizationServer;
|
||||||
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
|
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, CoreVerificationSink, Config as SynchronizationConfig};
|
||||||
use synchronization_verifier::AsyncVerifier;
|
use synchronization_verifier::AsyncVerifier;
|
||||||
|
|
||||||
let sync_client_config = SynchronizationConfig {
|
let sync_client_config = SynchronizationConfig {
|
||||||
|
@ -87,8 +88,15 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
|
||||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||||
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
||||||
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), chain_verifier.clone());
|
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), chain_verifier.clone());
|
||||||
let verifier = AsyncVerifier::new(chain_verifier, sync_chain, sync_client_core.clone());
|
let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone()));
|
||||||
|
let verifier = AsyncVerifier::new(chain_verifier, sync_chain, verifier_sink);
|
||||||
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
||||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
Arc::new(SyncNode::new(sync_server, sync_client, sync_executor))
|
||||||
SyncConnectionFactory::with_local_node(sync_node)
|
}
|
||||||
|
|
||||||
|
/// Create inbound synchronization connections factory for given local sync node.
|
||||||
|
pub fn create_sync_connection_factory(local_sync_node: LocalNodeRef) -> p2p::LocalSyncNodeRef {
|
||||||
|
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
|
||||||
|
|
||||||
|
SyncConnectionFactory::with_local_node(local_sync_node)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::{Mutex, Condvar};
|
||||||
use db;
|
use db;
|
||||||
|
use chain::Transaction;
|
||||||
use p2p::OutboundSyncConnectionRef;
|
use p2p::OutboundSyncConnectionRef;
|
||||||
use message::common::{InventoryType, InventoryVector};
|
use message::common::{InventoryType, InventoryVector};
|
||||||
use message::types;
|
use message::types;
|
||||||
use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType};
|
use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType};
|
||||||
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
|
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
|
||||||
use synchronization_server::{Server, SynchronizationServer};
|
use synchronization_server::{Server, SynchronizationServer};
|
||||||
use synchronization_verifier::AsyncVerifier;
|
use synchronization_verifier::{AsyncVerifier, TransactionVerificationSink};
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
|
|
||||||
// TODO: check messages before processing (filterload' filter is max 36000, nHashFunc is <= 50, etc)
|
// TODO: check messages before processing (filterload' filter is max 36000, nHashFunc is <= 50, etc)
|
||||||
|
@ -35,6 +36,17 @@ pub trait PeersConnections {
|
||||||
fn remove_peer_connection(&mut self, peer_index: usize);
|
fn remove_peer_connection(&mut self, peer_index: usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transaction accept verification sink
|
||||||
|
struct TransactionAcceptSink {
|
||||||
|
data: Arc<TransactionAcceptSinkData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TransactionAcceptSinkData {
|
||||||
|
result: Mutex<Option<Result<H256, String>>>,
|
||||||
|
waiter: Condvar,
|
||||||
|
}
|
||||||
|
|
||||||
impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersConnections,
|
impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersConnections,
|
||||||
U: Server,
|
U: Server,
|
||||||
V: Client {
|
V: Client {
|
||||||
|
@ -238,6 +250,18 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
||||||
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
|
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn accept_transaction(&self, transaction: Transaction) -> Result<H256, String> {
|
||||||
|
let sink_data = Arc::new(TransactionAcceptSinkData::default());
|
||||||
|
let sink = TransactionAcceptSink::new(sink_data.clone()).boxed();
|
||||||
|
{
|
||||||
|
let mut client = self.client.lock();
|
||||||
|
if let Err(err) = client.accept_transaction(transaction, sink) {
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sink_data.wait()
|
||||||
|
}
|
||||||
|
|
||||||
fn transactions_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
|
fn transactions_inventory(&self, inventory: &[InventoryVector]) -> Vec<H256> {
|
||||||
inventory.iter()
|
inventory.iter()
|
||||||
.filter(|item| item.inv_type == InventoryType::MessageTx)
|
.filter(|item| item.inv_type == InventoryType::MessageTx)
|
||||||
|
@ -253,6 +277,42 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TransactionAcceptSink {
|
||||||
|
pub fn new(data: Arc<TransactionAcceptSinkData>) -> Self {
|
||||||
|
TransactionAcceptSink {
|
||||||
|
data: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn boxed(self) -> Box<Self> {
|
||||||
|
Box::new(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransactionAcceptSinkData {
|
||||||
|
pub fn wait(&self) -> Result<H256, String> {
|
||||||
|
let mut lock = self.result.lock();
|
||||||
|
if lock.is_some() {
|
||||||
|
return lock.take().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.waiter.wait(&mut lock);
|
||||||
|
return lock.take().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransactionVerificationSink for TransactionAcceptSink {
|
||||||
|
fn on_transaction_verification_success(&self, tx: Transaction) {
|
||||||
|
*self.data.result.lock() = Some(Ok(tx.hash()));
|
||||||
|
self.data.waiter.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_transaction_verification_error(&self, err: &str, _hash: &H256) {
|
||||||
|
*self.data.result.lock() = Some(Err(err.to_owned()));
|
||||||
|
self.data.waiter.notify_all();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -260,12 +320,13 @@ mod tests {
|
||||||
use connection_filter::tests::{default_filterload, make_filteradd};
|
use connection_filter::tests::{default_filterload, make_filteradd};
|
||||||
use synchronization_executor::Task;
|
use synchronization_executor::Task;
|
||||||
use synchronization_executor::tests::DummyTaskExecutor;
|
use synchronization_executor::tests::DummyTaskExecutor;
|
||||||
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, FilteredInventory};
|
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, CoreVerificationSink, FilteredInventory};
|
||||||
use synchronization_chain::Chain;
|
use synchronization_chain::Chain;
|
||||||
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
|
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
|
||||||
use message::types;
|
use message::types;
|
||||||
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
|
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
|
use chain::Transaction;
|
||||||
use db;
|
use db;
|
||||||
use super::LocalNode;
|
use super::LocalNode;
|
||||||
use test_data;
|
use test_data;
|
||||||
|
@ -309,7 +370,7 @@ mod tests {
|
||||||
fn close(&self) {}
|
fn close(&self) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
|
fn create_local_node(verifier: Option<DummyVerifier>) -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
|
||||||
let event_loop = event_loop();
|
let event_loop = event_loop();
|
||||||
let handle = event_loop.handle();
|
let handle = event_loop.handle();
|
||||||
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
|
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
|
||||||
|
@ -318,8 +379,11 @@ mod tests {
|
||||||
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
||||||
let chain_verifier = Arc::new(ChainVerifier::new(chain.read().storage(), Magic::Mainnet));
|
let chain_verifier = Arc::new(ChainVerifier::new(chain.read().storage(), Magic::Mainnet));
|
||||||
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), chain_verifier);
|
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), chain_verifier);
|
||||||
let mut verifier = DummyVerifier::default();
|
let mut verifier = match verifier {
|
||||||
verifier.set_sink(client_core.clone());
|
Some(verifier) => verifier,
|
||||||
|
None => DummyVerifier::default(),
|
||||||
|
};
|
||||||
|
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
|
||||||
let client = SynchronizationClient::new(client_core, verifier);
|
let client = SynchronizationClient::new(client_core, verifier);
|
||||||
let local_node = LocalNode::new(server.clone(), client, executor.clone());
|
let local_node = LocalNode::new(server.clone(), client, executor.clone());
|
||||||
(event_loop, handle, executor, server, local_node)
|
(event_loop, handle, executor, server, local_node)
|
||||||
|
@ -327,7 +391,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_request_inventory_on_sync_start() {
|
fn local_node_request_inventory_on_sync_start() {
|
||||||
let (_, _, executor, _, local_node) = create_local_node();
|
let (_, _, executor, _, local_node) = create_local_node(None);
|
||||||
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
// start sync session
|
// start sync session
|
||||||
local_node.start_sync_session(peer_index, 0);
|
local_node.start_sync_session(peer_index, 0);
|
||||||
|
@ -338,7 +402,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_serves_block() {
|
fn local_node_serves_block() {
|
||||||
let (_, _, _, server, local_node) = create_local_node();
|
let (_, _, _, server, local_node) = create_local_node(None);
|
||||||
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
// peer requests genesis block
|
// peer requests genesis block
|
||||||
let genesis_block_hash = test_data::genesis().hash();
|
let genesis_block_hash = test_data::genesis().hash();
|
||||||
|
@ -358,7 +422,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_serves_merkleblock() {
|
fn local_node_serves_merkleblock() {
|
||||||
let (_, _, _, server, local_node) = create_local_node();
|
let (_, _, _, server, local_node) = create_local_node(None);
|
||||||
|
|
||||||
let genesis = test_data::genesis();
|
let genesis = test_data::genesis();
|
||||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||||
|
@ -458,7 +522,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_serves_compactblock() {
|
fn local_node_serves_compactblock() {
|
||||||
let (_, _, _, server, local_node) = create_local_node();
|
let (_, _, _, server, local_node) = create_local_node(None);
|
||||||
|
|
||||||
let genesis = test_data::genesis();
|
let genesis = test_data::genesis();
|
||||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||||
|
@ -490,7 +554,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_serves_get_block_txn_when_recently_sent_compact_block() {
|
fn local_node_serves_get_block_txn_when_recently_sent_compact_block() {
|
||||||
let (_, _, _, server, local_node) = create_local_node();
|
let (_, _, _, server, local_node) = create_local_node(None);
|
||||||
|
|
||||||
let genesis = test_data::genesis();
|
let genesis = test_data::genesis();
|
||||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||||
|
@ -526,7 +590,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() {
|
fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() {
|
||||||
let (_, _, _, server, local_node) = create_local_node();
|
let (_, _, _, server, local_node) = create_local_node(None);
|
||||||
|
|
||||||
let genesis = test_data::genesis();
|
let genesis = test_data::genesis();
|
||||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||||
|
@ -550,4 +614,49 @@ mod tests {
|
||||||
let tasks = server.take_tasks();
|
let tasks = server.take_tasks();
|
||||||
assert_eq!(tasks, vec![]);
|
assert_eq!(tasks, vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn local_node_accepts_local_transaction() {
|
||||||
|
let (_, _, executor, _, local_node) = create_local_node(None);
|
||||||
|
|
||||||
|
// transaction will be relayed to this peer
|
||||||
|
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
|
{ executor.lock().take_tasks(); }
|
||||||
|
|
||||||
|
let genesis = test_data::genesis();
|
||||||
|
let transaction: Transaction = test_data::TransactionBuilder::with_output(1).add_input(&genesis.transactions[0], 0).into();
|
||||||
|
let transaction_hash = transaction.hash();
|
||||||
|
|
||||||
|
let result = local_node.accept_transaction(transaction);
|
||||||
|
assert_eq!(result, Ok(transaction_hash));
|
||||||
|
|
||||||
|
assert_eq!(executor.lock().take_tasks(), vec![Task::SendInventory(peer_index1,
|
||||||
|
vec![InventoryVector {
|
||||||
|
inv_type: InventoryType::MessageTx,
|
||||||
|
hash: "0791efccd035c5fe501023ff888106eba5eff533965de4a6e06400f623bcac34".into(),
|
||||||
|
}]
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn local_node_discards_local_transaction() {
|
||||||
|
let genesis = test_data::genesis();
|
||||||
|
let transaction: Transaction = test_data::TransactionBuilder::with_output(1).add_input(&genesis.transactions[0], 0).into();
|
||||||
|
let transaction_hash = transaction.hash();
|
||||||
|
|
||||||
|
// simulate transaction verification fail
|
||||||
|
let mut verifier = DummyVerifier::default();
|
||||||
|
verifier.error_when_verifying(transaction_hash.clone(), "simulated");
|
||||||
|
|
||||||
|
let (_, _, executor, _, local_node) = create_local_node(Some(verifier));
|
||||||
|
|
||||||
|
let _peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
|
{ executor.lock().take_tasks(); }
|
||||||
|
|
||||||
|
let result = local_node.accept_transaction(transaction);
|
||||||
|
assert_eq!(result, Err("simulated".to_owned()));
|
||||||
|
|
||||||
|
assert_eq!(executor.lock().take_tasks(), vec![]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ use synchronization_server::ServerTaskIndex;
|
||||||
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
|
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
|
||||||
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
|
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
|
||||||
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
|
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
|
||||||
use synchronization_verifier::{Verifier, VerificationSink, VerificationTask};
|
use synchronization_verifier::{Verifier, VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
|
||||||
use compact_block_builder::build_compact_block;
|
use compact_block_builder::build_compact_block;
|
||||||
use hash_queue::HashPosition;
|
use hash_queue::HashPosition;
|
||||||
use miner::transaction_fee_rate;
|
use miner::transaction_fee_rate;
|
||||||
|
@ -164,6 +164,9 @@ const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512;
|
||||||
/// Number of blocks to inspect when calculating average blocks speed
|
/// Number of blocks to inspect when calculating average blocks speed
|
||||||
const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512;
|
const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512;
|
||||||
|
|
||||||
|
// No-error, no-result future
|
||||||
|
type EmptyBoxFuture = BoxFuture<(), ()>;
|
||||||
|
|
||||||
/// Synchronization state
|
/// Synchronization state
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub enum State {
|
pub enum State {
|
||||||
|
@ -210,11 +213,12 @@ pub trait Client : Send + 'static {
|
||||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
||||||
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
||||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
|
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture);
|
||||||
|
fn accept_transaction(&mut self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<(), String>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Synchronization client trait
|
/// Synchronization client trait
|
||||||
pub trait ClientCore : VerificationSink {
|
pub trait ClientCore {
|
||||||
fn best_block(&self) -> db::BestBlock;
|
fn best_block(&self) -> db::BestBlock;
|
||||||
fn state(&self) -> State;
|
fn state(&self) -> State;
|
||||||
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
|
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
|
||||||
|
@ -232,9 +236,14 @@ pub trait ClientCore : VerificationSink {
|
||||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
||||||
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
||||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
|
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture);
|
||||||
|
fn accept_transaction(&mut self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<VecDeque<(H256, Transaction)>, String>;
|
||||||
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>);
|
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>);
|
||||||
fn try_switch_to_saturated_state(&mut self) -> bool;
|
fn try_switch_to_saturated_state(&mut self) -> bool;
|
||||||
|
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>>;
|
||||||
|
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
|
||||||
|
fn on_transaction_verification_success(&mut self, transaction: Transaction);
|
||||||
|
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -286,7 +295,7 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
|
||||||
/// Cpu pool.
|
/// Cpu pool.
|
||||||
pool: CpuPool,
|
pool: CpuPool,
|
||||||
/// Sync management worker.
|
/// Sync management worker.
|
||||||
management_worker: Option<BoxFuture<(), ()>>,
|
management_worker: Option<EmptyBoxFuture>,
|
||||||
/// Synchronization peers.
|
/// Synchronization peers.
|
||||||
peers: Peers,
|
peers: Peers,
|
||||||
/// Task executor.
|
/// Task executor.
|
||||||
|
@ -304,7 +313,9 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
|
||||||
/// Verifying blocks by peer
|
/// Verifying blocks by peer
|
||||||
verifying_blocks_by_peer: HashMap<H256, usize>,
|
verifying_blocks_by_peer: HashMap<H256, usize>,
|
||||||
/// Verifying blocks futures
|
/// Verifying blocks futures
|
||||||
verifying_blocks_futures: HashMap<usize, (HashSet<H256>, Vec<BoxFuture<(), ()>>)>,
|
verifying_blocks_futures: HashMap<usize, (HashSet<H256>, Vec<EmptyBoxFuture>)>,
|
||||||
|
/// Verifying transactions futures
|
||||||
|
verifying_transactions_sinks: HashMap<H256, Box<TransactionVerificationSink>>,
|
||||||
/// Hashes of items we do not want to relay after verification is completed
|
/// Hashes of items we do not want to relay after verification is completed
|
||||||
do_not_relay: HashSet<H256>,
|
do_not_relay: HashSet<H256>,
|
||||||
/// Block processing speed meter
|
/// Block processing speed meter
|
||||||
|
@ -315,6 +326,12 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
|
||||||
config: Config,
|
config: Config,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Verification sink for synchronization client core
|
||||||
|
pub struct CoreVerificationSink<T: TaskExecutor> {
|
||||||
|
/// Client core reference
|
||||||
|
core: Arc<Mutex<SynchronizationClientCore<T>>>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Block headers provider from `headers` message
|
/// Block headers provider from `headers` message
|
||||||
pub struct MessageBlockHeadersProvider<'a> {
|
pub struct MessageBlockHeadersProvider<'a> {
|
||||||
/// sync chain
|
/// sync chain
|
||||||
|
@ -339,6 +356,12 @@ struct AverageSpeedMeter {
|
||||||
last_timestamp: Option<f64>,
|
last_timestamp: Option<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transaction append error
|
||||||
|
enum AppendTransactionError {
|
||||||
|
Synchronizing,
|
||||||
|
Orphan(HashSet<H256>),
|
||||||
|
}
|
||||||
|
|
||||||
impl FilteredInventory {
|
impl FilteredInventory {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
|
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
|
||||||
|
@ -479,9 +502,18 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
|
||||||
self.core.lock().on_peer_disconnected(peer_index);
|
self.core.lock().on_peer_disconnected(peer_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
|
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture) {
|
||||||
self.core.lock().after_peer_nearly_blocks_verified(peer_index, future);
|
self.core.lock().after_peer_nearly_blocks_verified(peer_index, future);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn accept_transaction(&mut self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<(), String> {
|
||||||
|
let mut transactions_to_verify = try!(self.core.lock().accept_transaction(transaction, sink));
|
||||||
|
let next_block_height = self.best_block().number + 1;
|
||||||
|
while let Some((_, tx)) = transactions_to_verify.pop_front() {
|
||||||
|
self.verifier.verify_transaction(next_block_height, tx);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
|
impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
|
||||||
|
@ -778,7 +810,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
/// Execute after last block from this peer in NearlySaturated state is verified.
|
/// Execute after last block from this peer in NearlySaturated state is verified.
|
||||||
/// If there are no verifying blocks from this peer or we are not in the NearlySaturated state => execute immediately.
|
/// If there are no verifying blocks from this peer or we are not in the NearlySaturated state => execute immediately.
|
||||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) {
|
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture) {
|
||||||
// if we are currently synchronizing => no need to wait
|
// if we are currently synchronizing => no need to wait
|
||||||
if self.state.is_synchronizing() {
|
if self.state.is_synchronizing() {
|
||||||
future.wait().expect("no-error future");
|
future.wait().expect("no-error future");
|
||||||
|
@ -794,6 +826,18 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn accept_transaction(&mut self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<VecDeque<(H256, Transaction)>, String> {
|
||||||
|
let hash = transaction.hash();
|
||||||
|
match self.try_append_transaction(hash.clone(), transaction, true) {
|
||||||
|
Err(AppendTransactionError::Orphan(_)) => Err("Cannot append transaction as its inputs are unknown".to_owned()),
|
||||||
|
Err(AppendTransactionError::Synchronizing) => Err("Cannot append transaction as node is not yet fully synchronized".to_owned()),
|
||||||
|
Ok(transactions) => {
|
||||||
|
self.verifying_transactions_sinks.insert(hash, sink);
|
||||||
|
Ok(transactions)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Schedule new synchronization tasks, if any.
|
/// Schedule new synchronization tasks, if any.
|
||||||
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>) {
|
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>) {
|
||||||
let mut tasks: Vec<Task> = Vec::new();
|
let mut tasks: Vec<Task> = Vec::new();
|
||||||
|
@ -951,10 +995,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
switch_to_saturated
|
switch_to_saturated
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
|
|
||||||
/// Process successful block verification
|
|
||||||
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
|
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
|
||||||
// update block processing speed
|
// update block processing speed
|
||||||
self.block_speed_meter.checkpoint();
|
self.block_speed_meter.checkpoint();
|
||||||
|
@ -1018,7 +1059,6 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process failed block verification
|
|
||||||
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
|
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
|
||||||
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
|
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
|
||||||
|
|
||||||
|
@ -1050,7 +1090,6 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
self.execute_synchronization_tasks(None, None);
|
self.execute_synchronization_tasks(None, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process successful transaction verification
|
|
||||||
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
|
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
|
||||||
let hash = transaction.hash();
|
let hash = transaction.hash();
|
||||||
let needs_relay = !self.do_not_relay.remove(&hash);
|
let needs_relay = !self.do_not_relay.remove(&hash);
|
||||||
|
@ -1074,11 +1113,15 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
|
|
||||||
// relay transaction to peers
|
// relay transaction to peers
|
||||||
if needs_relay {
|
if needs_relay {
|
||||||
self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]);
|
self.relay_new_transactions(vec![(hash.clone(), &transaction, transaction_fee_rate)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// call verification future, if any
|
||||||
|
if let Some(mut future_sink) = self.verifying_transactions_sinks.remove(&hash) {
|
||||||
|
(&mut future_sink).on_transaction_verification_success(transaction);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process failed transaction verification
|
|
||||||
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) {
|
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) {
|
||||||
warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
|
warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
|
||||||
|
|
||||||
|
@ -1090,6 +1133,46 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
// forget for this transaction and all its children
|
// forget for this transaction and all its children
|
||||||
chain.forget_verifying_transaction_with_children(hash);
|
chain.forget_verifying_transaction_with_children(hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// call verification future, if any
|
||||||
|
if let Some(mut future_sink) = self.verifying_transactions_sinks.remove(hash) {
|
||||||
|
(&mut future_sink).on_transaction_verification_error(err, hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> CoreVerificationSink<T> where T: TaskExecutor {
|
||||||
|
pub fn new(core: Arc<Mutex<SynchronizationClientCore<T>>>) -> Self {
|
||||||
|
CoreVerificationSink {
|
||||||
|
core: core,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> VerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> BlockVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
|
||||||
|
/// Process successful block verification
|
||||||
|
fn on_block_verification_success(&self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
|
||||||
|
self.core.lock().on_block_verification_success(block)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process failed block verification
|
||||||
|
fn on_block_verification_error(&self, err: &str, hash: &H256) {
|
||||||
|
self.core.lock().on_block_verification_error(err, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> TransactionVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
|
||||||
|
/// Process successful transaction verification
|
||||||
|
fn on_transaction_verification_success(&self, transaction: Transaction) {
|
||||||
|
self.core.lock().on_transaction_verification_success(transaction)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process failed transaction verification
|
||||||
|
fn on_transaction_verification_error(&self, err: &str, hash: &H256) {
|
||||||
|
self.core.lock().on_transaction_verification_error(err, hash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1110,6 +1193,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
verify_headers: true,
|
verify_headers: true,
|
||||||
verifying_blocks_by_peer: HashMap::new(),
|
verifying_blocks_by_peer: HashMap::new(),
|
||||||
verifying_blocks_futures: HashMap::new(),
|
verifying_blocks_futures: HashMap::new(),
|
||||||
|
verifying_transactions_sinks: HashMap::new(),
|
||||||
do_not_relay: HashSet::new(),
|
do_not_relay: HashSet::new(),
|
||||||
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
|
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
|
||||||
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
|
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
|
||||||
|
@ -1458,9 +1542,20 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
/// Process new peer transaction
|
/// Process new peer transaction
|
||||||
fn process_peer_transaction(&mut self, _peer_index: Option<usize>, hash: H256, transaction: Transaction, relay: bool) -> Option<VecDeque<(H256, Transaction)>> {
|
fn process_peer_transaction(&mut self, _peer_index: Option<usize>, hash: H256, transaction: Transaction, relay: bool) -> Option<VecDeque<(H256, Transaction)>> {
|
||||||
|
match self.try_append_transaction(hash.clone(), transaction.clone(), relay) {
|
||||||
|
Err(AppendTransactionError::Orphan(unknown_parents)) => {
|
||||||
|
self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Err(AppendTransactionError::Synchronizing) => None,
|
||||||
|
Ok(transactions) => Some(transactions),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_append_transaction(&mut self, hash: H256, transaction: Transaction, relay: bool) -> Result<VecDeque<(H256, Transaction)>, AppendTransactionError> {
|
||||||
// if we are in synchronization state, we will ignore this message
|
// if we are in synchronization state, we will ignore this message
|
||||||
if self.state.is_synchronizing() {
|
if self.state.is_synchronizing() {
|
||||||
return None;
|
return Err(AppendTransactionError::Synchronizing);
|
||||||
}
|
}
|
||||||
|
|
||||||
// else => verify transaction + it's orphans and then add to the memory pool
|
// else => verify transaction + it's orphans and then add to the memory pool
|
||||||
|
@ -1472,8 +1567,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
.map(|input| input.previous_output.hash.clone())
|
.map(|input| input.previous_output.hash.clone())
|
||||||
.collect();
|
.collect();
|
||||||
if !unknown_parents.is_empty() {
|
if !unknown_parents.is_empty() {
|
||||||
self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents);
|
return Err(AppendTransactionError::Orphan(unknown_parents));
|
||||||
return None;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// else verify && insert this transaction && all dependent orphans
|
// else verify && insert this transaction && all dependent orphans
|
||||||
|
@ -1487,7 +1581,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
self.do_not_relay.insert(h.clone());
|
self.do_not_relay.insert(h.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(transactions)
|
Ok(transactions)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) {
|
fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) {
|
||||||
|
@ -1563,7 +1657,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Awake threads, waiting for this block
|
/// Execute futures, which were waiting for this block verification
|
||||||
fn awake_waiting_threads(&mut self, hash: &H256) {
|
fn awake_waiting_threads(&mut self, hash: &H256) {
|
||||||
// find a peer, which has supplied us with this block
|
// find a peer, which has supplied us with this block
|
||||||
if let Entry::Occupied(block_entry) = self.verifying_blocks_by_peer.entry(hash.clone()) {
|
if let Entry::Occupied(block_entry) = self.verifying_blocks_by_peer.entry(hash.clone()) {
|
||||||
|
@ -1698,7 +1792,8 @@ pub mod tests {
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use message::common::{InventoryVector, InventoryType};
|
use message::common::{InventoryVector, InventoryType};
|
||||||
use message::types;
|
use message::types;
|
||||||
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType, MessageBlockHeadersProvider};
|
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, CoreVerificationSink,
|
||||||
|
BlockAnnouncementType, MessageBlockHeadersProvider};
|
||||||
use connection_filter::tests::*;
|
use connection_filter::tests::*;
|
||||||
use synchronization_executor::Task;
|
use synchronization_executor::Task;
|
||||||
use synchronization_chain::{Chain, ChainRef};
|
use synchronization_chain::{Chain, ChainRef};
|
||||||
|
@ -1735,7 +1830,7 @@ pub mod tests {
|
||||||
client_core.lock().verify_headers(false);
|
client_core.lock().verify_headers(false);
|
||||||
}
|
}
|
||||||
let mut verifier = verifier.unwrap_or_default();
|
let mut verifier = verifier.unwrap_or_default();
|
||||||
verifier.set_sink(client_core.clone());
|
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
|
||||||
let client = SynchronizationClient::new(client_core.clone(), verifier);
|
let client = SynchronizationClient::new(client_core.clone(), verifier);
|
||||||
(event_loop, handle, executor, chain, client_core, client)
|
(event_loop, handle, executor, chain, client_core, client)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ use std::thread;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||||
use parking_lot::Mutex;
|
|
||||||
use chain::{Transaction, OutPoint, TransactionOutput};
|
use chain::{Transaction, OutPoint, TransactionOutput};
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
|
@ -11,16 +10,24 @@ use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
|
||||||
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
|
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
|
||||||
use time::get_time;
|
use time::get_time;
|
||||||
|
|
||||||
/// Verification events sink
|
/// Block verification events sink
|
||||||
pub trait VerificationSink : Send + 'static {
|
pub trait BlockVerificationSink : Send + Sync + 'static {
|
||||||
/// When block verification has completed successfully.
|
/// When block verification has completed successfully.
|
||||||
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>>;
|
fn on_block_verification_success(&self, block: IndexedBlock) -> Option<Vec<VerificationTask>>;
|
||||||
/// When block verification has failed.
|
/// When block verification has failed.
|
||||||
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
|
fn on_block_verification_error(&self, err: &str, hash: &H256);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transaction verification events sink
|
||||||
|
pub trait TransactionVerificationSink : Send + Sync + 'static {
|
||||||
/// When transaction verification has completed successfully.
|
/// When transaction verification has completed successfully.
|
||||||
fn on_transaction_verification_success(&mut self, transaction: Transaction);
|
fn on_transaction_verification_success(&self, transaction: Transaction);
|
||||||
/// When transaction verification has failed.
|
/// When transaction verification has failed.
|
||||||
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256);
|
fn on_transaction_verification_error(&self, err: &str, hash: &H256);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verification events sink
|
||||||
|
pub trait VerificationSink : BlockVerificationSink + TransactionVerificationSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Verification thread tasks
|
/// Verification thread tasks
|
||||||
|
@ -60,7 +67,7 @@ struct EmptyTransactionOutputProvider {
|
||||||
|
|
||||||
impl AsyncVerifier {
|
impl AsyncVerifier {
|
||||||
/// Create new async verifier
|
/// Create new async verifier
|
||||||
pub fn new<T: VerificationSink>(verifier: Arc<ChainVerifier>, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
|
pub fn new<T: VerificationSink>(verifier: Arc<ChainVerifier>, chain: ChainRef, sink: Arc<T>) -> Self {
|
||||||
let (verification_work_sender, verification_work_receiver) = channel();
|
let (verification_work_sender, verification_work_receiver) = channel();
|
||||||
AsyncVerifier {
|
AsyncVerifier {
|
||||||
verification_work_sender: verification_work_sender,
|
verification_work_sender: verification_work_sender,
|
||||||
|
@ -74,7 +81,7 @@ impl AsyncVerifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Thread procedure for handling verification tasks
|
/// Thread procedure for handling verification tasks
|
||||||
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
|
fn verification_worker_proc<T: VerificationSink>(sink: Arc<T>, chain: ChainRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
|
||||||
while let Ok(task) = work_receiver.recv() {
|
while let Ok(task) = work_receiver.recv() {
|
||||||
match task {
|
match task {
|
||||||
VerificationTask::Stop => break,
|
VerificationTask::Stop => break,
|
||||||
|
@ -119,12 +126,12 @@ pub struct SyncVerifier<T: VerificationSink> {
|
||||||
/// Verifier
|
/// Verifier
|
||||||
verifier: ChainVerifier,
|
verifier: ChainVerifier,
|
||||||
/// Verification sink
|
/// Verification sink
|
||||||
sink: Arc<Mutex<T>>,
|
sink: Arc<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> SyncVerifier<T> where T: VerificationSink {
|
impl<T> SyncVerifier<T> where T: VerificationSink {
|
||||||
/// Create new sync verifier
|
/// Create new sync verifier
|
||||||
pub fn new(network: Magic, storage: SharedStore, sink: Arc<Mutex<T>>) -> Self {
|
pub fn new(network: Magic, storage: SharedStore, sink: Arc<T>) -> Self {
|
||||||
let verifier = ChainVerifier::new(storage, network);
|
let verifier = ChainVerifier::new(storage, network);
|
||||||
SyncVerifier {
|
SyncVerifier {
|
||||||
verifier: verifier,
|
verifier: verifier,
|
||||||
|
@ -146,7 +153,7 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute single verification task
|
/// Execute single verification task
|
||||||
fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputProvider>(sink: &Arc<Mutex<T>>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) {
|
fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputProvider>(sink: &Arc<T>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) {
|
||||||
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
|
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
|
||||||
tasks_queue.push_back(task);
|
tasks_queue.push_back(task);
|
||||||
|
|
||||||
|
@ -156,24 +163,24 @@ fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputPr
|
||||||
// verify block
|
// verify block
|
||||||
match verifier.verify(&block) {
|
match verifier.verify(&block) {
|
||||||
Ok(Chain::Main) | Ok(Chain::Side) => {
|
Ok(Chain::Main) | Ok(Chain::Side) => {
|
||||||
if let Some(tasks) = sink.lock().on_block_verification_success(block) {
|
if let Some(tasks) = sink.on_block_verification_success(block) {
|
||||||
tasks_queue.extend(tasks);
|
tasks_queue.extend(tasks);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(Chain::Orphan) => {
|
Ok(Chain::Orphan) => {
|
||||||
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
|
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
|
||||||
sink.lock().on_block_verification_error(&format!("orphaned block because parent block verification has failed"), &block.hash())
|
sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), &block.hash())
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
|
sink.on_block_verification_error(&format!("{:?}", e), &block.hash())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
VerificationTask::VerifyTransaction(height, transaction) => {
|
VerificationTask::VerifyTransaction(height, transaction) => {
|
||||||
let time: u32 = get_time().sec as u32;
|
let time: u32 = get_time().sec as u32;
|
||||||
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) {
|
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) {
|
||||||
Ok(_) => sink.lock().on_transaction_verification_success(transaction),
|
Ok(_) => sink.on_transaction_verification_success(transaction),
|
||||||
Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
|
Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => unreachable!("must be checked by caller"),
|
_ => unreachable!("must be checked by caller"),
|
||||||
|
@ -213,22 +220,21 @@ impl PreviousTransactionOutputProvider for EmptyTransactionOutputProvider {
|
||||||
pub mod tests {
|
pub mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use parking_lot::Mutex;
|
|
||||||
use chain::Transaction;
|
use chain::Transaction;
|
||||||
use synchronization_client::SynchronizationClientCore;
|
use synchronization_client::CoreVerificationSink;
|
||||||
use synchronization_executor::tests::DummyTaskExecutor;
|
use synchronization_executor::tests::DummyTaskExecutor;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use super::{Verifier, VerificationSink};
|
use super::{Verifier, BlockVerificationSink, TransactionVerificationSink};
|
||||||
use db::IndexedBlock;
|
use db::IndexedBlock;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct DummyVerifier {
|
pub struct DummyVerifier {
|
||||||
sink: Option<Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>>,
|
sink: Option<Arc<CoreVerificationSink<DummyTaskExecutor>>>,
|
||||||
errors: HashMap<H256, String>
|
errors: HashMap<H256, String>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DummyVerifier {
|
impl DummyVerifier {
|
||||||
pub fn set_sink(&mut self, sink: Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>) {
|
pub fn set_sink(&mut self, sink: Arc<CoreVerificationSink<DummyTaskExecutor>>) {
|
||||||
self.sink = Some(sink);
|
self.sink = Some(sink);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,9 +247,9 @@ pub mod tests {
|
||||||
fn verify_block(&self, block: IndexedBlock) {
|
fn verify_block(&self, block: IndexedBlock) {
|
||||||
match self.sink {
|
match self.sink {
|
||||||
Some(ref sink) => match self.errors.get(&block.hash()) {
|
Some(ref sink) => match self.errors.get(&block.hash()) {
|
||||||
Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()),
|
Some(err) => sink.on_block_verification_error(&err, &block.hash()),
|
||||||
None => {
|
None => {
|
||||||
sink.lock().on_block_verification_success(block);
|
sink.on_block_verification_success(block);
|
||||||
()
|
()
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -254,8 +260,8 @@ pub mod tests {
|
||||||
fn verify_transaction(&self, _height: u32, transaction: Transaction) {
|
fn verify_transaction(&self, _height: u32, transaction: Transaction) {
|
||||||
match self.sink {
|
match self.sink {
|
||||||
Some(ref sink) => match self.errors.get(&transaction.hash()) {
|
Some(ref sink) => match self.errors.get(&transaction.hash()) {
|
||||||
Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()),
|
Some(err) => sink.on_transaction_verification_error(&err, &transaction.hash()),
|
||||||
None => sink.lock().on_transaction_verification_success(transaction),
|
None => sink.on_transaction_verification_success(transaction),
|
||||||
},
|
},
|
||||||
None => panic!("call set_sink"),
|
None => panic!("call set_sink"),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue