diff --git a/Cargo.lock b/Cargo.lock index 9ff27fd2..75e82fe4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,14 +783,23 @@ dependencies = [ name = "rpc" version = "0.1.0" dependencies = [ + "chain 0.1.0", + "db 0.1.0", "jsonrpc-core 4.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 6.1.1 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "network 0.1.0", + "p2p 0.1.0", + "primitives 0.1.0", "rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.8.19 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde_macros 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)", + "serialization 0.1.0", + "sync 0.1.0", + "test-data 0.1.0", + "tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index 7067fe7d..3e865ca2 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use sync::create_sync_connection_factory; +use sync::{create_local_sync_node, create_sync_connection_factory}; use message::Services; use util::{open_db, init_db, node_table_path}; use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM}; @@ -34,9 +34,13 @@ pub fn start(cfg: config::Config) -> Result<(), String> { }; let sync_handle = el.handle(); - let sync_connection_factory = create_sync_connection_factory(&sync_handle, cfg.magic, db); + let local_sync_node = create_local_sync_node(&sync_handle, cfg.magic, db); + let sync_connection_factory = create_sync_connection_factory(local_sync_node.clone()); - let _http_server = try!(rpc::new_http(cfg.rpc_config)); + let rpc_deps = rpc::Dependencies { + local_sync_node: local_sync_node, + }; + let _rpc_server = try!(rpc::new_http(cfg.rpc_config, rpc_deps)); let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string())); try!(p2p.run().map_err(|_| "Failed to start p2p module")); diff --git a/pbtc/rpc.rs b/pbtc/rpc.rs index 30aae30e..e5fe5812 100644 --- a/pbtc/rpc.rs +++ b/pbtc/rpc.rs @@ -2,6 +2,11 @@ use std::net::SocketAddr; use rpc_apis::{self, ApiSet}; use ethcore_rpc::{Server, RpcServer, RpcServerError}; use std::io; +use sync; + +pub struct Dependencies { + pub local_sync_node: sync::LocalNodeRef, +} #[derive(Debug, PartialEq)] pub struct HttpConfiguration { @@ -26,23 +31,24 @@ impl HttpConfiguration { } } -pub fn new_http(conf: HttpConfiguration) -> Result, String> { +pub fn new_http(conf: HttpConfiguration, deps: Dependencies) -> Result, String> { if !conf.enabled { return Ok(None); } let url = format!("{}:{}", conf.interface, conf.port); let addr = try!(url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url))); - Ok(Some(try!(setup_http_rpc_server(&addr, conf.cors, conf.hosts, conf.apis)))) + Ok(Some(try!(setup_http_rpc_server(&addr, conf.cors, conf.hosts, conf.apis, deps)))) } pub fn setup_http_rpc_server( url: &SocketAddr, cors_domains: Option>, allowed_hosts: Option>, - apis: ApiSet + apis: ApiSet, + deps: Dependencies, ) -> Result { - let server = try!(setup_rpc_server(apis)); + let server = try!(setup_rpc_server(apis, deps)); // TODO: PanicsHandler let start_result = server.start_http(url, cors_domains, allowed_hosts); match start_result { @@ -55,7 +61,7 @@ pub fn setup_http_rpc_server( } } -fn setup_rpc_server(apis: ApiSet) -> Result { +fn setup_rpc_server(apis: ApiSet, deps: Dependencies) -> Result { let server = RpcServer::new(); - Ok(rpc_apis::setup_rpc(server, apis)) + Ok(rpc_apis::setup_rpc(server, apis, deps)) } diff --git a/pbtc/rpc_apis.rs b/pbtc/rpc_apis.rs index a191497d..5c64ad4d 100644 --- a/pbtc/rpc_apis.rs +++ b/pbtc/rpc_apis.rs @@ -1,5 +1,6 @@ use std::str::FromStr; use std::collections::HashSet; +use rpc::Dependencies; use ethcore_rpc::Extendable; #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)] @@ -38,12 +39,12 @@ impl ApiSet { } } -pub fn setup_rpc(server: T, apis: ApiSet) -> T { +pub fn setup_rpc(server: T, apis: ApiSet, deps: Dependencies) -> T { use ethcore_rpc::v1::*; for api in apis.list_apis() { match api { - Api::Raw => server.add_delegate(RawClient::new().to_delegate()), + Api::Raw => server.add_delegate(RawClient::new(RawClientCore::new(deps.local_sync_node.clone())).to_delegate()), } } server diff --git a/primitives/src/hash.rs b/primitives/src/hash.rs index 17999dd2..b2f20db6 100644 --- a/primitives/src/hash.rs +++ b/primitives/src/hash.rs @@ -124,6 +124,10 @@ macro_rules! impl_hash { impl Eq for $name { } impl $name { + pub fn take(self) -> [u8; $size] { + self.0 + } + pub fn reversed(&self) -> Self { let mut result = self.clone(); result.reverse(); diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 78fa2e5a..00ebf876 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -11,9 +11,18 @@ log = "0.3" serde = "0.8" serde_json = "0.8" rustc-serialize = "0.3" +tokio-core = "0.1.1" serde_macros = { version = "0.8.0", optional = true } jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git" } +sync = { path = "../sync" } +serialization = { path = "../serialization" } +chain = { path = "../chain" } +primitives = { path = "../primitives" } +p2p = { path = "../p2p" } +network = { path = "../network" } +db = { path = "../db" } +test-data = { path = "../test-data" } [build-dependencies] serde_codegen = { version = "0.8.0", optional = true } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 2ece41fc..945bf685 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -7,6 +7,15 @@ extern crate serde; extern crate serde_json; extern crate jsonrpc_core; extern crate jsonrpc_http_server; +extern crate tokio_core; +extern crate sync; +extern crate chain; +extern crate serialization as ser; +extern crate primitives; +extern crate p2p; +extern crate network; +extern crate db; +extern crate test_data; pub mod v1; pub mod rpc_server; diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index f3dccfb0..90ab238f 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -1,5 +1,11 @@ ///! RPC Error codes and error objects +mod codes { + // NOTE [ToDr] Codes from [-32099, -32000] + pub const EXECUTION_ERROR: i64 = -32015; +} + + macro_rules! rpc_unimplemented { () => (Err(::v1::helpers::errors::unimplemented(None))) } @@ -22,3 +28,11 @@ pub fn invalid_params(param: &str, details: T) -> Error { data: Some(Value::String(format!("{:?}", details))), } } + +pub fn execution(data: T) -> Error { + Error { + code: ErrorCode::ServerError(codes::EXECUTION_ERROR), + message: "Execution error.".into(), + data: Some(Value::String(format!("{:?}", data))), + } +} diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index f1627f3f..85256508 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -1,3 +1,3 @@ mod raw; -pub use self::raw::RawClient; +pub use self::raw::{RawClient, RawClientCore}; diff --git a/rpc/src/v1/impls/raw.rs b/rpc/src/v1/impls/raw.rs index 0c67dc4f..556e1001 100644 --- a/rpc/src/v1/impls/raw.rs +++ b/rpc/src/v1/impls/raw.rs @@ -1,17 +1,117 @@ use v1::traits::Raw; use v1::types::RawTransaction; +use v1::types::H256; +use v1::helpers::errors::{execution, invalid_params}; use jsonrpc_core::Error; +use chain::Transaction; +use sync; +use ser::{Reader, deserialize}; +use primitives::hash::H256 as GlobalH256; -pub struct RawClient; +pub struct RawClient { + core: T, +} -impl RawClient { - pub fn new() -> Self { - RawClient { } +pub trait RawClientCoreApi: Send + Sync + 'static { + fn accept_transaction(&self, transaction: Transaction) -> Result; +} + +pub struct RawClientCore { + local_sync_node: sync::LocalNodeRef, +} + +impl RawClientCore { + pub fn new(local_sync_node: sync::LocalNodeRef) -> Self { + RawClientCore { + local_sync_node: local_sync_node, + } } } -impl Raw for RawClient { - fn send_raw_transaction(&self, _tx: RawTransaction) -> Result<(), Error> { - rpc_unimplemented!() +impl RawClientCoreApi for RawClientCore { + fn accept_transaction(&self, transaction: Transaction) -> Result { + self.local_sync_node.accept_transaction(transaction) +} + } + +impl RawClient where T: RawClientCoreApi { + pub fn new(core: T) -> Self { + RawClient { + core: core, + } + } +} + +impl Raw for RawClient where T: RawClientCoreApi { + fn send_raw_transaction(&self, raw_transaction: RawTransaction) -> Result { + let raw_transaction_data: Vec = raw_transaction.into(); + let transaction = try!(deserialize(Reader::new(&raw_transaction_data)).map_err(|e| invalid_params("tx", e))); + self.core.accept_transaction(transaction) + .map(|h| h.reversed().into()) + .map_err(|e| execution(e)) + } +} + +#[cfg(test)] +pub mod tests { + use jsonrpc_core::{IoHandler, GenericIoHandler}; + use chain::Transaction; + use primitives::hash::H256 as GlobalH256; + use v1::traits::Raw; + use super::*; + + #[derive(Default)] + struct SuccessRawClientCore; + #[derive(Default)] + struct ErrorRawClientCore; + + impl RawClientCoreApi for SuccessRawClientCore { + fn accept_transaction(&self, transaction: Transaction) -> Result { + Ok(transaction.hash()) + } + } + + impl RawClientCoreApi for ErrorRawClientCore { + fn accept_transaction(&self, _transaction: Transaction) -> Result { + Err("error".to_owned()) + } + } + + #[test] + fn sendrawtransaction_accepted() { + let client = RawClient::new(SuccessRawClientCore::default()); + let handler = IoHandler::new(); + handler.add_delegate(client.to_delegate()); + + let sample = handler.handle_request_sync(&(r#" + { + "jsonrpc": "2.0", + "method": "sendrawtransaction", + "params": ["00000000013ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a0000000000000000000101000000000000000000000000"], + "id": 1 + }"#) + ).unwrap(); + + // direct hash is 0791efccd035c5fe501023ff888106eba5eff533965de4a6e06400f623bcac34 + // but client expects reverse hash + assert_eq!(r#"{"jsonrpc":"2.0","result":"34acbc23f60064e0a6e45d9633f5efa5eb068188ff231050fec535d0ccef9107","id":1}"#, &sample); + } + + #[test] + fn sendrawtransaction_rejected() { + let client = RawClient::new(ErrorRawClientCore::default()); + let handler = IoHandler::new(); + handler.add_delegate(client.to_delegate()); + + let sample = handler.handle_request_sync(&(r#" + { + "jsonrpc": "2.0", + "method": "sendrawtransaction", + "params": ["00000000013ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a0000000000000000000101000000000000000000000000"], + "id": 1 + }"#) + ).unwrap(); + + assert_eq!(r#"{"jsonrpc":"2.0","error":{"code":-32015,"message":"Execution error.","data":"\"error\""},"id":1}"#, &sample); } } diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index fc31be83..32ef8c22 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -5,4 +5,4 @@ pub mod traits; pub mod types; pub use self::traits::Raw; -pub use self::impls::RawClient; +pub use self::impls::{RawClient, RawClientCore}; diff --git a/rpc/src/v1/traits/raw.rs b/rpc/src/v1/traits/raw.rs index e4c208dd..3295353b 100644 --- a/rpc/src/v1/traits/raw.rs +++ b/rpc/src/v1/traits/raw.rs @@ -2,12 +2,13 @@ use jsonrpc_core::Error; use v1::helpers::auto_args::Wrap; use v1::types::RawTransaction; +use v1::types::H256; build_rpc_trait! { /// Partiy-bitcoin raw data interface. pub trait Raw { /// Adds transaction to the memory pool && relays it to the peers. #[rpc(name = "sendrawtransaction")] - fn send_raw_transaction(&self, RawTransaction) -> Result<(), Error>; + fn send_raw_transaction(&self, RawTransaction) -> Result; } } diff --git a/rpc/src/v1/types/hash.rs b/rpc/src/v1/types/hash.rs new file mode 100644 index 00000000..79e388e2 --- /dev/null +++ b/rpc/src/v1/types/hash.rs @@ -0,0 +1,168 @@ +use std::fmt; +use std::str::FromStr; +use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; +use serde; +use rustc_serialize::hex::{ToHex, FromHex}; +use primitives::hash::H256 as GlobalH256; + +macro_rules! impl_hash { + ($name: ident, $other: ident, $size: expr) => { + /// Hash serialization + #[derive(Eq)] + pub struct $name([u8; $size]); + + impl Default for $name { + fn default() -> Self { + $name([0; $size]) + } + } + + impl fmt::Debug for $name { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{}", $other::from(self.0.clone()).to_hex()) + } + } + + impl From for $name where $other: From { + fn from(o: T) -> Self { + $name($other::from(o).take()) + } + } + + impl FromStr for $name { + type Err = <$other as FromStr>::Err; + + fn from_str(s: &str) -> Result { + let other = try!($other::from_str(s)); + Ok($name(other.take())) + } + } + + impl Into<$other> for $name { + fn into(self) -> $other { + $other::from(self.0) + } + } + + impl PartialEq for $name { + fn eq(&self, other: &Self) -> bool { + let self_ref: &[u8] = &self.0; + let other_ref: &[u8] = &other.0; + self_ref == other_ref + } + } + + impl PartialOrd for $name { + fn partial_cmp(&self, other: &Self) -> Option { + let self_ref: &[u8] = &self.0; + let other_ref: &[u8] = &other.0; + self_ref.partial_cmp(other_ref) + } + } + + impl Ord for $name { + fn cmp(&self, other: &Self) -> Ordering { + let self_ref: &[u8] = &self.0; + let other_ref: &[u8] = &other.0; + self_ref.cmp(other_ref) + } + } + + impl Hash for $name { + fn hash(&self, state: &mut H) where H: Hasher { + $other::from(self.0.clone()).hash(state) + } + } + + impl Clone for $name { + fn clone(&self) -> Self { + let mut r = [0; $size]; + r.copy_from_slice(&self.0); + $name(r) + } + } + + impl serde::Serialize for $name { + fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> + where S: serde::Serializer { + let mut hex = String::new(); + hex.push_str(&$other::from(self.0.clone()).to_hex()); + serializer.serialize_str(&hex) + } + } + + impl serde::Deserialize for $name { + fn deserialize(deserializer: &mut D) -> Result<$name, D::Error> where D: serde::Deserializer { + struct HashVisitor; + + impl serde::de::Visitor for HashVisitor { + type Value = $name; + + fn visit_str(&mut self, value: &str) -> Result where E: serde::Error { + + if value.len() != $size * 2 { + return Err(serde::Error::custom("Invalid length.")); + } + + match value[..].from_hex() { + Ok(ref v) => { + let mut result = [0u8; $size]; + result.copy_from_slice(v); + Ok($name($other::from(result).take())) + }, + _ => Err(serde::Error::custom("Invalid hex value.")) + } + } + + fn visit_string(&mut self, value: String) -> Result where E: serde::Error { + self.visit_str(value.as_ref()) + } + } + + deserializer.deserialize(HashVisitor) + } + } + } +} + +impl_hash!(H256, GlobalH256, 32); + +#[cfg(test)] +mod tests { + use super::H256; + use primitives::hash::H256 as GlobalH256; + use std::str::FromStr; + + #[test] + fn hash_debug() { + let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048"; + let reversed_hash = H256::from(str_reversed); + let debug_result = format!("{:?}", reversed_hash); + assert_eq!(debug_result, str_reversed); + } + + #[test] + fn hash_from_str() { + let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048"; + match H256::from_str(str_reversed) { + Ok(reversed_hash) => assert_eq!(format!("{:?}", reversed_hash), str_reversed), + _ => panic!("unexpected"), + } + + let str_reversed = "XXXYYY"; + match H256::from_str(str_reversed) { + Ok(_) => panic!("unexpected"), + _ => (), + } + } + + #[test] + fn hash_to_global_hash() { + let str_reversed = "00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048"; + let reversed_hash = H256::from(str_reversed); + let global_hash = GlobalH256::from(str_reversed); + let global_converted: GlobalH256 = reversed_hash.into(); + assert_eq!(global_converted, global_hash); + } +} diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in index 61b3b4a6..82b9a583 100644 --- a/rpc/src/v1/types/mod.rs.in +++ b/rpc/src/v1/types/mod.rs.in @@ -1,4 +1,7 @@ mod bytes; +mod hash; mod raw_transaction; +pub use self::bytes::Bytes; +pub use self::hash::H256; pub use self::raw_transaction::RawTransaction; diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs index e3892c6c..a3c7985e 100644 --- a/sync/src/blocks_writer.rs +++ b/sync/src/blocks_writer.rs @@ -5,7 +5,8 @@ use chain; use db; use network::Magic; use orphan_blocks_pool::OrphanBlocksPool; -use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, VerificationTask}; +use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask, + VerificationSink, BlockVerificationSink, TransactionVerificationSink}; use primitives::hash::H256; use super::Error; @@ -15,23 +16,28 @@ pub struct BlocksWriter { storage: db::SharedStore, orphaned_blocks_pool: OrphanBlocksPool, verifier: SyncVerifier, - sink: Arc>, + sink: Arc, } struct BlocksWriterSink { + data: Arc, +} + +struct BlocksWriterSinkData { storage: db::SharedStore, - err: Option, + err: Mutex>, } impl BlocksWriter { pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter { - let sink = Arc::new(Mutex::new(BlocksWriterSink::new(storage.clone()))); - let verifier = SyncVerifier::new(network, storage.clone(), sink.clone()); + let sink_data = Arc::new(BlocksWriterSinkData::new(storage.clone())); + let sink = Arc::new(BlocksWriterSink::new(sink_data.clone())); + let verifier = SyncVerifier::new(network, storage.clone(), sink); BlocksWriter { storage: storage, orphaned_blocks_pool: OrphanBlocksPool::new(), verifier: verifier, - sink: sink, + sink: sink_data, } } @@ -56,7 +62,8 @@ impl BlocksWriter { verification_queue.push_front(indexed_block); while let Some(block) = verification_queue.pop_front() { self.verifier.verify_block(block); - if let Some(err) = self.sink.lock().error() { + + if let Some(err) = self.sink.error() { return Err(err); } } @@ -66,35 +73,48 @@ impl BlocksWriter { } impl BlocksWriterSink { - pub fn new(storage: db::SharedStore) -> Self { + pub fn new(data: Arc) -> Self { BlocksWriterSink { + data: data, + } + } +} + +impl BlocksWriterSinkData { + pub fn new(storage: db::SharedStore) -> Self { + BlocksWriterSinkData { storage: storage, - err: None, + err: Mutex::new(None), } } - pub fn error(&mut self) -> Option { - self.err.take() + pub fn error(&self) -> Option { + self.err.lock().take() } } impl VerificationSink for BlocksWriterSink { - fn on_block_verification_success(&mut self, block: db::IndexedBlock) -> Option> { - if let Err(err) = self.storage.insert_indexed_block(&block) { - self.err = Some(Error::Database(err)); +} + +impl BlockVerificationSink for BlocksWriterSink { + fn on_block_verification_success(&self, block: db::IndexedBlock) -> Option> { + if let Err(err) = self.data.storage.insert_indexed_block(&block) { + *self.data.err.lock() = Some(Error::Database(err)); } None } - fn on_block_verification_error(&mut self, err: &str, _hash: &H256) { - self.err = Some(Error::Verification(err.into())); + fn on_block_verification_error(&self, err: &str, _hash: &H256) { + *self.data.err.lock() = Some(Error::Verification(err.into())); } +} - fn on_transaction_verification_success(&mut self, _transaction: chain::Transaction) { +impl TransactionVerificationSink for BlocksWriterSink { + fn on_transaction_verification_success(&self, _transaction: chain::Transaction) { unreachable!("not intended to verify transactions") } - fn on_transaction_verification_error(&mut self, _err: &str, _hash: &H256) { + fn on_transaction_verification_error(&self, _err: &str, _hash: &H256) { unreachable!("not intended to verify transactions") } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 00338074..fa78b3ed 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -43,6 +43,8 @@ mod synchronization_peers; mod synchronization_server; mod synchronization_verifier; +pub use local_node::LocalNodeRef; + use std::sync::Arc; use parking_lot::RwLock; use tokio_core::reactor::Handle; @@ -65,14 +67,13 @@ pub fn create_sync_blocks_writer(db: db::SharedStore, network: Magic) -> blocks_ blocks_writer::BlocksWriter::new(db, network) } -/// Create inbound synchronization connections factory for given `db`. -pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::SharedStore) -> p2p::LocalSyncNodeRef { +/// Creates local sync node for given `db` +pub fn create_local_sync_node(handle: &Handle, network: Magic, db: db::SharedStore) -> LocalNodeRef { use synchronization_chain::Chain as SyncChain; use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor; use local_node::LocalNode as SyncNode; - use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; use synchronization_server::SynchronizationServer; - use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; + use synchronization_client::{SynchronizationClient, SynchronizationClientCore, CoreVerificationSink, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; let sync_client_config = SynchronizationConfig { @@ -87,8 +88,15 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), chain_verifier.clone()); - let verifier = AsyncVerifier::new(chain_verifier, sync_chain, sync_client_core.clone()); + let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone())); + let verifier = AsyncVerifier::new(chain_verifier, sync_chain, verifier_sink); let sync_client = SynchronizationClient::new(sync_client_core, verifier); - let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); - SyncConnectionFactory::with_local_node(sync_node) + Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)) +} + +/// Create inbound synchronization connections factory for given local sync node. +pub fn create_sync_connection_factory(local_sync_node: LocalNodeRef) -> p2p::LocalSyncNodeRef { + use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; + + SyncConnectionFactory::with_local_node(local_sync_node) } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 3b84192c..4c5e3d7f 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -1,14 +1,15 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use parking_lot::Mutex; +use parking_lot::{Mutex, Condvar}; use db; +use chain::Transaction; use p2p::OutboundSyncConnectionRef; use message::common::{InventoryType, InventoryVector}; use message::types; use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType}; use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor}; use synchronization_server::{Server, SynchronizationServer}; -use synchronization_verifier::AsyncVerifier; +use synchronization_verifier::{AsyncVerifier, TransactionVerificationSink}; use primitives::hash::H256; // TODO: check messages before processing (filterload' filter is max 36000, nHashFunc is <= 50, etc) @@ -35,6 +36,17 @@ pub trait PeersConnections { fn remove_peer_connection(&mut self, peer_index: usize); } +/// Transaction accept verification sink +struct TransactionAcceptSink { + data: Arc, +} + +#[derive(Default)] +struct TransactionAcceptSinkData { + result: Mutex>>, + waiter: Condvar, +} + impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections, U: Server, V: Client { @@ -238,6 +250,18 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory); } + pub fn accept_transaction(&self, transaction: Transaction) -> Result { + let sink_data = Arc::new(TransactionAcceptSinkData::default()); + let sink = TransactionAcceptSink::new(sink_data.clone()).boxed(); + { + let mut client = self.client.lock(); + if let Err(err) = client.accept_transaction(transaction, sink) { + return Err(err.into()); + } + } + sink_data.wait() + } + fn transactions_inventory(&self, inventory: &[InventoryVector]) -> Vec { inventory.iter() .filter(|item| item.inv_type == InventoryType::MessageTx) @@ -253,6 +277,42 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon } } +impl TransactionAcceptSink { + pub fn new(data: Arc) -> Self { + TransactionAcceptSink { + data: data, + } + } + + pub fn boxed(self) -> Box { + Box::new(self) + } +} + +impl TransactionAcceptSinkData { + pub fn wait(&self) -> Result { + let mut lock = self.result.lock(); + if lock.is_some() { + return lock.take().unwrap(); + } + + self.waiter.wait(&mut lock); + return lock.take().unwrap(); + } +} + +impl TransactionVerificationSink for TransactionAcceptSink { + fn on_transaction_verification_success(&self, tx: Transaction) { + *self.data.result.lock() = Some(Ok(tx.hash())); + self.data.waiter.notify_all(); + } + + fn on_transaction_verification_error(&self, err: &str, _hash: &H256) { + *self.data.result.lock() = Some(Err(err.to_owned())); + self.data.waiter.notify_all(); + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -260,12 +320,13 @@ mod tests { use connection_filter::tests::{default_filterload, make_filteradd}; use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; - use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, FilteredInventory}; + use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, CoreVerificationSink, FilteredInventory}; use synchronization_chain::Chain; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest}; use network::Magic; + use chain::Transaction; use db; use super::LocalNode; use test_data; @@ -309,7 +370,7 @@ mod tests { fn close(&self) {} } - fn create_local_node() -> (Core, Handle, Arc>, Arc, LocalNode>) { + fn create_local_node(verifier: Option) -> (Core, Handle, Arc>, Arc, LocalNode>) { let event_loop = event_loop(); let handle = event_loop.handle(); let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); @@ -318,8 +379,11 @@ mod tests { let config = Config { threads_num: 1, close_connection_on_bad_block: true }; let chain_verifier = Arc::new(ChainVerifier::new(chain.read().storage(), Magic::Mainnet)); let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), chain_verifier); - let mut verifier = DummyVerifier::default(); - verifier.set_sink(client_core.clone()); + let mut verifier = match verifier { + Some(verifier) => verifier, + None => DummyVerifier::default(), + }; + verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone()))); let client = SynchronizationClient::new(client_core, verifier); let local_node = LocalNode::new(server.clone(), client, executor.clone()); (event_loop, handle, executor, server, local_node) @@ -327,7 +391,7 @@ mod tests { #[test] fn local_node_request_inventory_on_sync_start() { - let (_, _, executor, _, local_node) = create_local_node(); + let (_, _, executor, _, local_node) = create_local_node(None); let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); // start sync session local_node.start_sync_session(peer_index, 0); @@ -338,7 +402,7 @@ mod tests { #[test] fn local_node_serves_block() { - let (_, _, _, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(None); let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); // peer requests genesis block let genesis_block_hash = test_data::genesis().hash(); @@ -358,7 +422,7 @@ mod tests { #[test] fn local_node_serves_merkleblock() { - let (_, _, _, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(None); let genesis = test_data::genesis(); let b1 = test_data::block_builder().header().parent(genesis.hash()).build() @@ -458,7 +522,7 @@ mod tests { #[test] fn local_node_serves_compactblock() { - let (_, _, _, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(None); let genesis = test_data::genesis(); let b1 = test_data::block_builder().header().parent(genesis.hash()).build() @@ -490,7 +554,7 @@ mod tests { #[test] fn local_node_serves_get_block_txn_when_recently_sent_compact_block() { - let (_, _, _, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(None); let genesis = test_data::genesis(); let b1 = test_data::block_builder().header().parent(genesis.hash()).build() @@ -526,7 +590,7 @@ mod tests { #[test] fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() { - let (_, _, _, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(None); let genesis = test_data::genesis(); let b1 = test_data::block_builder().header().parent(genesis.hash()).build() @@ -550,4 +614,49 @@ mod tests { let tasks = server.take_tasks(); assert_eq!(tasks, vec![]); } + + #[test] + fn local_node_accepts_local_transaction() { + let (_, _, executor, _, local_node) = create_local_node(None); + + // transaction will be relayed to this peer + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + { executor.lock().take_tasks(); } + + let genesis = test_data::genesis(); + let transaction: Transaction = test_data::TransactionBuilder::with_output(1).add_input(&genesis.transactions[0], 0).into(); + let transaction_hash = transaction.hash(); + + let result = local_node.accept_transaction(transaction); + assert_eq!(result, Ok(transaction_hash)); + + assert_eq!(executor.lock().take_tasks(), vec![Task::SendInventory(peer_index1, + vec![InventoryVector { + inv_type: InventoryType::MessageTx, + hash: "0791efccd035c5fe501023ff888106eba5eff533965de4a6e06400f623bcac34".into(), + }] + )] + ); + } + + #[test] + fn local_node_discards_local_transaction() { + let genesis = test_data::genesis(); + let transaction: Transaction = test_data::TransactionBuilder::with_output(1).add_input(&genesis.transactions[0], 0).into(); + let transaction_hash = transaction.hash(); + + // simulate transaction verification fail + let mut verifier = DummyVerifier::default(); + verifier.error_when_verifying(transaction_hash.clone(), "simulated"); + + let (_, _, executor, _, local_node) = create_local_node(Some(verifier)); + + let _peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + { executor.lock().take_tasks(); } + + let result = local_node.accept_transaction(transaction); + assert_eq!(result, Err("simulated".to_owned())); + + assert_eq!(executor.lock().take_tasks(), vec![]); + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 66077e4a..437a08ee 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -25,7 +25,7 @@ use synchronization_server::ServerTaskIndex; use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory, manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig}; -use synchronization_verifier::{Verifier, VerificationSink, VerificationTask}; +use synchronization_verifier::{Verifier, VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask}; use compact_block_builder::build_compact_block; use hash_queue::HashPosition; use miner::transaction_fee_rate; @@ -164,6 +164,9 @@ const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512; /// Number of blocks to inspect when calculating average blocks speed const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512; +// No-error, no-result future +type EmptyBoxFuture = BoxFuture<(), ()>; + /// Synchronization state #[derive(Debug, Clone, Copy)] pub enum State { @@ -210,11 +213,12 @@ pub trait Client : Send + 'static { fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType); fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); - fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); + fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture); + fn accept_transaction(&mut self, transaction: Transaction, sink: Box) -> Result<(), String>; } /// Synchronization client trait -pub trait ClientCore : VerificationSink { +pub trait ClientCore { fn best_block(&self) -> db::BestBlock; fn state(&self) -> State; fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool; @@ -232,9 +236,14 @@ pub trait ClientCore : VerificationSink { fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType); fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); - fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); + fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture); + fn accept_transaction(&mut self, transaction: Transaction, sink: Box) -> Result, String>; fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>, final_blocks_requests: Option>); fn try_switch_to_saturated_state(&mut self) -> bool; + fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option>; + fn on_block_verification_error(&mut self, err: &str, hash: &H256); + fn on_transaction_verification_success(&mut self, transaction: Transaction); + fn on_transaction_verification_error(&mut self, err: &str, hash: &H256); } @@ -286,7 +295,7 @@ pub struct SynchronizationClientCore { /// Cpu pool. pool: CpuPool, /// Sync management worker. - management_worker: Option>, + management_worker: Option, /// Synchronization peers. peers: Peers, /// Task executor. @@ -304,7 +313,9 @@ pub struct SynchronizationClientCore { /// Verifying blocks by peer verifying_blocks_by_peer: HashMap, /// Verifying blocks futures - verifying_blocks_futures: HashMap, Vec>)>, + verifying_blocks_futures: HashMap, Vec)>, + /// Verifying transactions futures + verifying_transactions_sinks: HashMap>, /// Hashes of items we do not want to relay after verification is completed do_not_relay: HashSet, /// Block processing speed meter @@ -315,6 +326,12 @@ pub struct SynchronizationClientCore { config: Config, } +/// Verification sink for synchronization client core +pub struct CoreVerificationSink { + /// Client core reference + core: Arc>>, +} + /// Block headers provider from `headers` message pub struct MessageBlockHeadersProvider<'a> { /// sync chain @@ -339,6 +356,12 @@ struct AverageSpeedMeter { last_timestamp: Option, } +/// Transaction append error +enum AppendTransactionError { + Synchronizing, + Orphan(HashSet), +} + impl FilteredInventory { #[cfg(test)] pub fn with_unfiltered(unfiltered: Vec) -> Self { @@ -479,9 +502,18 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri self.core.lock().on_peer_disconnected(peer_index); } - fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) { + fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture) { self.core.lock().after_peer_nearly_blocks_verified(peer_index, future); } + + fn accept_transaction(&mut self, transaction: Transaction, sink: Box) -> Result<(), String> { + let mut transactions_to_verify = try!(self.core.lock().accept_transaction(transaction, sink)); + let next_block_height = self.best_block().number + 1; + while let Some((_, tx)) = transactions_to_verify.pop_front() { + self.verifier.verify_transaction(next_block_height, tx); + } + Ok(()) + } } impl SynchronizationClient where T: TaskExecutor, U: Verifier { @@ -778,7 +810,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { /// Execute after last block from this peer in NearlySaturated state is verified. /// If there are no verifying blocks from this peer or we are not in the NearlySaturated state => execute immediately. - fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>) { + fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: EmptyBoxFuture) { // if we are currently synchronizing => no need to wait if self.state.is_synchronizing() { future.wait().expect("no-error future"); @@ -794,6 +826,18 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { } } + fn accept_transaction(&mut self, transaction: Transaction, sink: Box) -> Result, String> { + let hash = transaction.hash(); + match self.try_append_transaction(hash.clone(), transaction, true) { + Err(AppendTransactionError::Orphan(_)) => Err("Cannot append transaction as its inputs are unknown".to_owned()), + Err(AppendTransactionError::Synchronizing) => Err("Cannot append transaction as node is not yet fully synchronized".to_owned()), + Ok(transactions) => { + self.verifying_transactions_sinks.insert(hash, sink); + Ok(transactions) + }, + } + } + /// Schedule new synchronization tasks, if any. fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>, final_blocks_requests: Option>) { let mut tasks: Vec = Vec::new(); @@ -951,10 +995,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { switch_to_saturated } -} -impl VerificationSink for SynchronizationClientCore where T: TaskExecutor { - /// Process successful block verification fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option> { // update block processing speed self.block_speed_meter.checkpoint(); @@ -1018,7 +1059,6 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor } } - /// Process failed block verification fn on_block_verification_error(&mut self, err: &str, hash: &H256) { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err); @@ -1050,7 +1090,6 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor self.execute_synchronization_tasks(None, None); } - /// Process successful transaction verification fn on_transaction_verification_success(&mut self, transaction: Transaction) { let hash = transaction.hash(); let needs_relay = !self.do_not_relay.remove(&hash); @@ -1074,11 +1113,15 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // relay transaction to peers if needs_relay { - self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]); + self.relay_new_transactions(vec![(hash.clone(), &transaction, transaction_fee_rate)]); + } + + // call verification future, if any + if let Some(mut future_sink) = self.verifying_transactions_sinks.remove(&hash) { + (&mut future_sink).on_transaction_verification_success(transaction); } } - /// Process failed transaction verification fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) { warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err); @@ -1090,6 +1133,46 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // forget for this transaction and all its children chain.forget_verifying_transaction_with_children(hash); } + + // call verification future, if any + if let Some(mut future_sink) = self.verifying_transactions_sinks.remove(hash) { + (&mut future_sink).on_transaction_verification_error(err, hash); + } + } +} + +impl CoreVerificationSink where T: TaskExecutor { + pub fn new(core: Arc>>) -> Self { + CoreVerificationSink { + core: core, + } + } +} + +impl VerificationSink for CoreVerificationSink where T: TaskExecutor { +} + +impl BlockVerificationSink for CoreVerificationSink where T: TaskExecutor { + /// Process successful block verification + fn on_block_verification_success(&self, block: IndexedBlock) -> Option> { + self.core.lock().on_block_verification_success(block) + } + + /// Process failed block verification + fn on_block_verification_error(&self, err: &str, hash: &H256) { + self.core.lock().on_block_verification_error(err, hash) + } +} + +impl TransactionVerificationSink for CoreVerificationSink where T: TaskExecutor { + /// Process successful transaction verification + fn on_transaction_verification_success(&self, transaction: Transaction) { + self.core.lock().on_transaction_verification_success(transaction) + } + + /// Process failed transaction verification + fn on_transaction_verification_error(&self, err: &str, hash: &H256) { + self.core.lock().on_transaction_verification_error(err, hash) } } @@ -1110,6 +1193,7 @@ impl SynchronizationClientCore where T: TaskExecutor { verify_headers: true, verifying_blocks_by_peer: HashMap::new(), verifying_blocks_futures: HashMap::new(), + verifying_transactions_sinks: HashMap::new(), do_not_relay: HashSet::new(), block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT), sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT), @@ -1458,9 +1542,20 @@ impl SynchronizationClientCore where T: TaskExecutor { /// Process new peer transaction fn process_peer_transaction(&mut self, _peer_index: Option, hash: H256, transaction: Transaction, relay: bool) -> Option> { + match self.try_append_transaction(hash.clone(), transaction.clone(), relay) { + Err(AppendTransactionError::Orphan(unknown_parents)) => { + self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents); + None + }, + Err(AppendTransactionError::Synchronizing) => None, + Ok(transactions) => Some(transactions), + } + } + + fn try_append_transaction(&mut self, hash: H256, transaction: Transaction, relay: bool) -> Result, AppendTransactionError> { // if we are in synchronization state, we will ignore this message if self.state.is_synchronizing() { - return None; + return Err(AppendTransactionError::Synchronizing); } // else => verify transaction + it's orphans and then add to the memory pool @@ -1472,8 +1567,7 @@ impl SynchronizationClientCore where T: TaskExecutor { .map(|input| input.previous_output.hash.clone()) .collect(); if !unknown_parents.is_empty() { - self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents); - return None; + return Err(AppendTransactionError::Orphan(unknown_parents)); } // else verify && insert this transaction && all dependent orphans @@ -1487,7 +1581,7 @@ impl SynchronizationClientCore where T: TaskExecutor { self.do_not_relay.insert(h.clone()); } } - Some(transactions) + Ok(transactions) } fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) { @@ -1563,7 +1657,7 @@ impl SynchronizationClientCore where T: TaskExecutor { } } - /// Awake threads, waiting for this block + /// Execute futures, which were waiting for this block verification fn awake_waiting_threads(&mut self, hash: &H256) { // find a peer, which has supplied us with this block if let Entry::Occupied(block_entry) = self.verifying_blocks_by_peer.entry(hash.clone()) { @@ -1698,7 +1792,8 @@ pub mod tests { use network::Magic; use message::common::{InventoryVector, InventoryType}; use message::types; - use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType, MessageBlockHeadersProvider}; + use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, CoreVerificationSink, + BlockAnnouncementType, MessageBlockHeadersProvider}; use connection_filter::tests::*; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; @@ -1735,7 +1830,7 @@ pub mod tests { client_core.lock().verify_headers(false); } let mut verifier = verifier.unwrap_or_default(); - verifier.set_sink(client_core.clone()); + verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone()))); let client = SynchronizationClient::new(client_core.clone(), verifier); (event_loop, handle, executor, chain, client_core, client) } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index c2b2ae95..102bd85d 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -2,7 +2,6 @@ use std::thread; use std::collections::VecDeque; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; -use parking_lot::Mutex; use chain::{Transaction, OutPoint, TransactionOutput}; use network::Magic; use primitives::hash::H256; @@ -11,16 +10,24 @@ use verification::{ChainVerifier, Verify as VerificationVerify, Chain}; use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider}; use time::get_time; -/// Verification events sink -pub trait VerificationSink : Send + 'static { +/// Block verification events sink +pub trait BlockVerificationSink : Send + Sync + 'static { /// When block verification has completed successfully. - fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option>; + fn on_block_verification_success(&self, block: IndexedBlock) -> Option>; /// When block verification has failed. - fn on_block_verification_error(&mut self, err: &str, hash: &H256); + fn on_block_verification_error(&self, err: &str, hash: &H256); +} + +/// Transaction verification events sink +pub trait TransactionVerificationSink : Send + Sync + 'static { /// When transaction verification has completed successfully. - fn on_transaction_verification_success(&mut self, transaction: Transaction); + fn on_transaction_verification_success(&self, transaction: Transaction); /// When transaction verification has failed. - fn on_transaction_verification_error(&mut self, err: &str, hash: &H256); + fn on_transaction_verification_error(&self, err: &str, hash: &H256); +} + +/// Verification events sink +pub trait VerificationSink : BlockVerificationSink + TransactionVerificationSink { } /// Verification thread tasks @@ -60,7 +67,7 @@ struct EmptyTransactionOutputProvider { impl AsyncVerifier { /// Create new async verifier - pub fn new(verifier: Arc, chain: ChainRef, sink: Arc>) -> Self { + pub fn new(verifier: Arc, chain: ChainRef, sink: Arc) -> Self { let (verification_work_sender, verification_work_receiver) = channel(); AsyncVerifier { verification_work_sender: verification_work_sender, @@ -74,7 +81,7 @@ impl AsyncVerifier { } /// Thread procedure for handling verification tasks - fn verification_worker_proc(sink: Arc>, chain: ChainRef, verifier: Arc, work_receiver: Receiver) { + fn verification_worker_proc(sink: Arc, chain: ChainRef, verifier: Arc, work_receiver: Receiver) { while let Ok(task) = work_receiver.recv() { match task { VerificationTask::Stop => break, @@ -119,12 +126,12 @@ pub struct SyncVerifier { /// Verifier verifier: ChainVerifier, /// Verification sink - sink: Arc>, + sink: Arc, } impl SyncVerifier where T: VerificationSink { /// Create new sync verifier - pub fn new(network: Magic, storage: SharedStore, sink: Arc>) -> Self { + pub fn new(network: Magic, storage: SharedStore, sink: Arc) -> Self { let verifier = ChainVerifier::new(storage, network); SyncVerifier { verifier: verifier, @@ -146,7 +153,7 @@ impl Verifier for SyncVerifier where T: VerificationSink { } /// Execute single verification task -fn execute_verification_task(sink: &Arc>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) { +fn execute_verification_task(sink: &Arc, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) { let mut tasks_queue: VecDeque = VecDeque::new(); tasks_queue.push_back(task); @@ -156,24 +163,24 @@ fn execute_verification_task { - if let Some(tasks) = sink.lock().on_block_verification_success(block) { + if let Some(tasks) = sink.on_block_verification_success(block) { tasks_queue.extend(tasks); } }, Ok(Chain::Orphan) => { // this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0 - sink.lock().on_block_verification_error(&format!("orphaned block because parent block verification has failed"), &block.hash()) + sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), &block.hash()) }, Err(e) => { - sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) + sink.on_block_verification_error(&format!("{:?}", e), &block.hash()) } } }, VerificationTask::VerifyTransaction(height, transaction) => { let time: u32 = get_time().sec as u32; match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) { - Ok(_) => sink.lock().on_transaction_verification_success(transaction), - Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), + Ok(_) => sink.on_transaction_verification_success(transaction), + Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), } }, _ => unreachable!("must be checked by caller"), @@ -213,22 +220,21 @@ impl PreviousTransactionOutputProvider for EmptyTransactionOutputProvider { pub mod tests { use std::sync::Arc; use std::collections::HashMap; - use parking_lot::Mutex; use chain::Transaction; - use synchronization_client::SynchronizationClientCore; + use synchronization_client::CoreVerificationSink; use synchronization_executor::tests::DummyTaskExecutor; use primitives::hash::H256; - use super::{Verifier, VerificationSink}; + use super::{Verifier, BlockVerificationSink, TransactionVerificationSink}; use db::IndexedBlock; #[derive(Default)] pub struct DummyVerifier { - sink: Option>>>, + sink: Option>>, errors: HashMap } impl DummyVerifier { - pub fn set_sink(&mut self, sink: Arc>>) { + pub fn set_sink(&mut self, sink: Arc>) { self.sink = Some(sink); } @@ -241,9 +247,9 @@ pub mod tests { fn verify_block(&self, block: IndexedBlock) { match self.sink { Some(ref sink) => match self.errors.get(&block.hash()) { - Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()), + Some(err) => sink.on_block_verification_error(&err, &block.hash()), None => { - sink.lock().on_block_verification_success(block); + sink.on_block_verification_success(block); () }, }, @@ -254,8 +260,8 @@ pub mod tests { fn verify_transaction(&self, _height: u32, transaction: Transaction) { match self.sink { Some(ref sink) => match self.errors.get(&transaction.hash()) { - Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()), - None => sink.lock().on_transaction_verification_success(transaction), + Some(err) => sink.on_transaction_verification_error(&err, &transaction.hash()), + None => sink.on_transaction_verification_success(transaction), }, None => panic!("call set_sink"), }