implemented blocknotify option

This commit is contained in:
Svyatoslav Nikolsky 2017-01-10 11:58:55 +03:00
parent d60820ca57
commit 07d4f00241
12 changed files with 231 additions and 5 deletions

2
Cargo.lock generated
View File

@ -694,12 +694,14 @@ dependencies = [
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"import 0.1.0",
"keys 0.1.0",
"libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"logs 0.1.0",
"message 0.1.0",
"miner 0.1.0",
"network 0.1.0",
"p2p 0.1.0",
"primitives 0.1.0",
"rpc 0.1.0",
"script 0.1.0",
"sync 0.1.0",

View File

@ -9,6 +9,7 @@ description = "Parity bitcoin client."
log = "0.3"
env_logger = "0.3"
app_dirs = "^1.1.1"
libc = "0.2"
clap = { version = "2", features = ["yaml"] }
chain = { path = "chain" }
keys = { path = "keys" }
@ -24,6 +25,7 @@ import = { path = "import" }
bencher = { path = "bencher" }
logs = { path = "logs" }
rpc = { path = "rpc" }
primitives = { path = "primitives" }
[[bin]]
path = "pbtc/main.rs"

View File

@ -71,6 +71,11 @@ args:
help: List of allowed Host header values
takes_value: true
value_name: HOSTS
- blocknotify:
long: blocknotify
help: Execute command when the best block changes (%s in cmd is replaced by block hash)
takes_value: true
value_name: command
subcommands:
- import:
about: Import blocks from bitcoin core database

View File

@ -1,10 +1,87 @@
use std::net::SocketAddr;
use sync::{create_sync_peers, create_local_sync_node, create_sync_connection_factory};
use std::thread;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::{AtomicBool, Ordering};
use sync::{create_sync_peers, create_local_sync_node, create_sync_connection_factory, SyncListener};
use message::Services;
use primitives::hash::H256;
use util::{open_db, init_db, node_table_path};
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM};
use super::super::rpc;
enum BlockNotifierTask {
NewBlock(H256),
Stop,
}
struct BlockNotifier {
tx: Sender<BlockNotifierTask>,
is_synchronizing: Arc<AtomicBool>,
worker_thread: Option<thread::JoinHandle<()>>,
}
impl BlockNotifier {
pub fn new(block_notify_command: String) -> Self {
let (tx, rx) = channel();
let is_synchronizing = Arc::new(AtomicBool::default());
BlockNotifier {
tx: tx,
is_synchronizing: is_synchronizing.clone(),
worker_thread: Some(thread::Builder::new()
.name("Block notification thread".to_owned())
.spawn(move || BlockNotifier::worker(rx, block_notify_command))
.expect("Error creating block notification thread"))
}
}
fn worker(rx: Receiver<BlockNotifierTask>, block_notify_command: String) {
for cmd in rx {
match cmd {
BlockNotifierTask::NewBlock(new_block_hash) => {
let new_block_hash = new_block_hash.to_reversed_str();
let command = block_notify_command.replace("%s", &new_block_hash);
let c_command = ::std::ffi::CString::new(command.clone()).unwrap();
unsafe {
use libc::system;
let err = system(c_command.as_ptr());
if err != 0 {
error!(target: "pbtc", "Block notification command {} exited with error code {}", command, err);
}
}
},
BlockNotifierTask::Stop => {
break
}
}
}
trace!(target: "pbtc", "Block notification thread stopped");
}
}
impl SyncListener for BlockNotifier {
fn synchronization_state_switched(&self, is_synchronizing: bool) {
self.is_synchronizing.store(is_synchronizing, Ordering::SeqCst);
}
fn best_storage_block_inserted(&self, block_hash: &H256) {
if !self.is_synchronizing.load(Ordering::SeqCst) {
self.tx.send(BlockNotifierTask::NewBlock(block_hash.clone()))
.expect("Block notification thread have the same lifetime as `BlockNotifier`")
}
}
}
impl Drop for BlockNotifier {
fn drop(&mut self) {
if let Some(join_handle) = self.worker_thread.take() {
let _ = self.tx.send(BlockNotifierTask::Stop);
join_handle.join().expect("Clean shutdown.");
}
}
}
pub fn start(cfg: config::Config) -> Result<(), String> {
let mut el = p2p::event_loop();
@ -38,6 +115,10 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
let local_sync_node = create_local_sync_node(&sync_handle, cfg.magic, db.clone(), sync_peers.clone());
let sync_connection_factory = create_sync_connection_factory(sync_peers.clone(), local_sync_node.clone());
if let Some(block_notify_command) = cfg.block_notify_command {
local_sync_node.install_sync_listener(Box::new(BlockNotifier::new(block_notify_command)));
}
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
let rpc_deps = rpc::Dependencies {
network: cfg.magic,

View File

@ -21,6 +21,7 @@ pub struct Config {
pub user_agent: String,
pub internet_protocol: InternetProtocol,
pub rpc_config: RpcHttpConfig,
pub block_notify_command: Option<String>,
}
pub const DEFAULT_DB_CACHE: usize = 512;
@ -91,6 +92,11 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
let rpc_config = try!(parse_rpc_config(magic, matches));
let block_notify_command = match matches.value_of("blocknotify") {
Some(s) => Some(try!(s.parse().map_err(|_| "Invalid blocknotify commmand".to_owned()))),
None => None,
};
let config = Config {
print_to_console: print_to_console,
magic: magic,
@ -105,6 +111,7 @@ pub fn parse(matches: &clap::ArgMatches) -> Result<Config, String> {
user_agent: user_agent.to_string(),
internet_protocol: only_net,
rpc_config: rpc_config,
block_notify_command: block_notify_command,
};
Ok(config)

View File

@ -6,6 +6,7 @@ extern crate clap;
extern crate log;
extern crate env_logger;
extern crate app_dirs;
extern crate libc;
extern crate db;
extern crate chain;
@ -18,6 +19,7 @@ extern crate p2p;
extern crate sync;
extern crate import;
extern crate rpc as ethcore_rpc;
extern crate primitives;
mod commands;
mod config;

View File

@ -48,6 +48,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use tokio_core::reactor::Handle;
use network::Magic;
use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
/// Sync errors.
@ -61,6 +62,14 @@ pub enum Error {
Verification(String),
}
/// Synchronization events listener
pub trait SyncListener: Send + 'static {
/// Called when node switches to synchronization state
fn synchronization_state_switched(&self, is_synchronizing: bool);
/// Called when new best storage block is inserted
fn best_storage_block_inserted(&self, block_hash: &H256);
}
/// Create blocks writer.
pub fn create_sync_blocks_writer(db: db::SharedStore, network: Magic, verification: bool) -> blocks_writer::BlocksWriter {
blocks_writer::BlocksWriter::new(db, network, verification)

View File

@ -14,7 +14,7 @@ use primitives::hash::H256;
use miner::BlockTemplate;
use synchronization_peers::{TransactionAnnouncementType, BlockAnnouncementType};
use types::{PeerIndex, RequestId, StorageRef, MemoryPoolRef, PeersRef, ExecutorRef,
ClientRef, ServerRef, SynchronizationStateRef};
ClientRef, ServerRef, SynchronizationStateRef, SyncListenerRef};
/// Local synchronization node
pub struct LocalNode<T: TaskExecutor, U: Server, V: Client> {
@ -261,6 +261,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
self.peers.misbehaving(peer_index, "Got unrequested 'blocktxn' message");
}
/// Verify and then schedule new transaction
pub fn accept_transaction(&self, transaction: Transaction) -> Result<H256, String> {
let sink_data = Arc::new(TransactionAcceptSinkData::default());
let sink = TransactionAcceptSink::new(sink_data.clone()).boxed();
@ -272,11 +273,17 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
sink_data.wait()
}
/// Get block template for mining
pub fn get_block_template(&self) -> BlockTemplate {
let block_assembler = BlockAssembler::default();
let memory_pool = &*self.memory_pool.read();
block_assembler.create_new_block(&self.storage, memory_pool, time::get_time().sec as u32, self.network)
}
/// Install synchronization events listener
pub fn install_sync_listener(&self, listener: SyncListenerRef) {
self.client.install_sync_listener(listener);
}
}
impl TransactionAcceptSink {

View File

@ -5,7 +5,7 @@ use message::types;
use synchronization_executor::TaskExecutor;
use synchronization_verifier::{Verifier, TransactionVerificationSink};
use synchronization_client_core::{ClientCore, SynchronizationClientCore};
use types::{PeerIndex, ClientCoreRef, SynchronizationStateRef, EmptyBoxFuture};
use types::{PeerIndex, ClientCoreRef, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
#[cfg_attr(feature="cargo-clippy", allow(doc_markdown))]
///! TODO: update with headers-first corrections
@ -130,6 +130,7 @@ pub trait Client : Send + Sync + 'static {
fn on_notfound(&self, peer_index: PeerIndex, message: types::NotFound);
fn after_peer_nearly_blocks_verified(&self, peer_index: PeerIndex, future: EmptyBoxFuture);
fn accept_transaction(&self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<(), String>;
fn install_sync_listener(&self, listener: SyncListenerRef);
}
/// Synchronization client facade
@ -222,6 +223,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
}
Ok(())
}
fn install_sync_listener(&self, listener: SyncListenerRef) {
self.core.lock().install_sync_listener(listener);
}
}
impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {

View File

@ -24,7 +24,7 @@ use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchr
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_peers_tasks::PeersTasks;
use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture};
use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
use utils::{AverageSpeedMeter, MessageBlockHeadersProvider, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
#[cfg(test)] use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers_tasks::{Information as PeersTasksInformation};
@ -76,6 +76,7 @@ pub trait ClientCore {
fn on_notfound(&mut self, peer_index: PeerIndex, message: types::NotFound);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: PeerIndex, future: EmptyBoxFuture);
fn accept_transaction(&mut self, transaction: Transaction, sink: Box<TransactionVerificationSink>) -> Result<VecDeque<IndexedTransaction>, String>;
fn install_sync_listener(&mut self, listener: SyncListenerRef);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>);
fn try_switch_to_saturated_state(&mut self) -> bool;
}
@ -131,6 +132,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
sync_speed_meter: AverageSpeedMeter,
/// Configuration
config: Config,
/// Synchronization events listener
listener: Option<SyncListenerRef>,
}
/// Verification sink for synchronization client core
@ -511,6 +514,12 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
fn install_sync_listener(&mut self, listener: SyncListenerRef) {
// currently single, single-setup listener is supported
assert!(self.listener.is_none());
self.listener = Some(listener);
}
/// Schedule new synchronization tasks, if any.
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>) {
let mut tasks: Vec<Task> = Vec::new();
@ -726,6 +735,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
config: config,
listener: None,
}
));
@ -939,6 +949,10 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
if let Some(ref listener) = self.listener {
listener.synchronization_state_switched(true);
}
self.shared_state.update_synchronizing(true);
self.state = State::Synchronizing(time::precise_time_s(), self.chain.best_storage_block().number);
}
@ -949,6 +963,10 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
if let Some(ref listener) = self.listener {
listener.synchronization_state_switched(false);
}
self.shared_state.update_synchronizing(false);
self.state = State::NearlySaturated;
}
@ -959,6 +977,10 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
if let Some(ref listener) = self.listener {
listener.synchronization_state_switched(false);
}
self.shared_state.update_synchronizing(false);
self.state = State::Saturated;
self.peers_tasks.reset();
@ -1013,6 +1035,13 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// update shared state
self.shared_state.update_best_storage_block_height(self.chain.best_storage_block().number);
// notify listener
if let Some(best_block_hash) = insert_result.canonized_blocks_hashes.last() {
if let Some(ref listener) = self.listener {
listener.best_storage_block_inserted(best_block_hash);
}
}
// awake threads, waiting for this block insertion
self.awake_waiting_threads(block.hash());
@ -1175,7 +1204,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, Transaction};
use db;
@ -1198,6 +1227,35 @@ pub mod tests {
use utils::SynchronizationState;
use types::{PeerIndex, StorageRef, SynchronizationStateRef, ClientCoreRef};
use super::{Config, SynchronizationClientCore, ClientCore, CoreVerificationSink};
use super::super::SyncListener;
#[derive(Default)]
struct DummySyncListenerData {
pub is_synchronizing: bool,
pub best_blocks: Vec<H256>,
}
struct DummySyncListener {
data: Arc<Mutex<DummySyncListenerData>>,
}
impl DummySyncListener {
pub fn new(data: Arc<Mutex<DummySyncListenerData>>) -> Self {
DummySyncListener {
data: data,
}
}
}
impl SyncListener for DummySyncListener {
fn synchronization_state_switched(&self, is_synchronizing: bool) {
self.data.lock().is_synchronizing = is_synchronizing;
}
fn best_storage_block_inserted(&self, block_hash: &H256) {
self.data.lock().best_blocks.push(block_hash.clone());
}
}
fn create_disk_storage() -> StorageRef {
let path = RandomTempPath::create_dir();
@ -2262,4 +2320,42 @@ pub mod tests {
assert!(mempool.write().remove_by_hash(&b1.transactions[2].hash()).is_some());
assert!(mempool.write().remove_by_hash(&tx3.hash()).is_some());
}
#[test]
fn sync_listener_calls() {
let (_, _, _, _, sync) = create_sync(None, None);
// install sync listener
let data = Arc::new(Mutex::new(DummySyncListenerData::default()));
sync.install_sync_listener(Box::new(DummySyncListener::new(data.clone())));
// at the beginning, is_synchronizing must be equal to false
assert_eq!(data.lock().is_synchronizing, false);
assert_eq!(data.lock().best_blocks.len(), 0);
// supply with new block header => is_synchronizing is still false
sync.on_headers(0, types::Headers::with_headers(vec![test_data::block_h1().block_header]));
assert_eq!(data.lock().is_synchronizing, false);
assert_eq!(data.lock().best_blocks.len(), 0);
// supply with 2 new blocks headers => is_synchronizing is true
sync.on_headers(0, types::Headers::with_headers(vec![test_data::block_h2().block_header, test_data::block_h3().block_header]));
assert_eq!(data.lock().is_synchronizing, true);
assert_eq!(data.lock().best_blocks.len(), 0);
// supply with block 3 => no new best block is informed
sync.on_block(0, test_data::block_h3().into());
assert_eq!(data.lock().is_synchronizing, true);
assert_eq!(data.lock().best_blocks.len(), 0);
// supply with block 1 => new best block is informed
sync.on_block(0, test_data::block_h1().into());
assert_eq!(data.lock().is_synchronizing, true);
assert_eq!(data.lock().best_blocks.len(), 1);
// supply with block 2 => 2 new best block is informed
sync.on_block(0, test_data::block_h2().into());
assert_eq!(data.lock().is_synchronizing, false);
assert_eq!(data.lock().best_blocks.len(), 3);
}
}

View File

@ -4,6 +4,7 @@ use parking_lot::{Mutex, RwLock};
use db;
use local_node::LocalNode;
use miner::MemoryPool;
use super::SyncListener;
use synchronization_client::SynchronizationClient;
use synchronization_executor::LocalSynchronizationTaskExecutor;
use synchronization_peers::Peers;
@ -48,3 +49,6 @@ pub type ServerRef<T> = Arc<T>;
/// Reference to local node
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, ServerImpl, SynchronizationClient<LocalSynchronizationTaskExecutor, AsyncVerifier>>>;
/// Synchronization events listener reference
pub type SyncListenerRef = Box<SyncListener>;

View File

@ -32,6 +32,12 @@ pub fn block_h2() -> Block {
"010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into()
}
// https://webbtc.com/block/0000000082b5015589a3fdf2d4baff403e6f0be035a5d9742c1cae6295464449.hex
// height 3
pub fn block_h3() -> Block {
"01000000bddd99ccfda39da1b108ce1a5d70038d0a967bacb68b6b63065f626a0000000044f672226090d85db9a9f2fbfe5f0f9609b387af7be5b7fbb7a1767c831c9e995dbe6649ffff001d05e0ed6d0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010effffffff0100f2052a0100000043410494b9d3e76c5b1629ecf97fff95d7a4bbdac87cc26099ada28066c6ff1eb9191223cd897194a08d0c2726c5747f1db49e8cf90e75dc3e3550ae9b30086f3cd5aaac00000000".into()
}
pub fn genesis() -> Block {
"0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into()
}