sync is now connected to transactions verifier

This commit is contained in:
Svyatoslav Nikolsky 2016-11-30 15:15:48 +03:00
parent 4c85a3c3b4
commit 2fa409dc4a
4 changed files with 99 additions and 34 deletions

View File

@ -74,11 +74,11 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
use synchronization_verifier::AsyncVerifier;
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db.clone())));
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
let verifier = AsyncVerifier::new(network, db, sync_client_core.clone());
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
SyncConnectionFactory::with_local_node(sync_node)

View File

@ -183,6 +183,11 @@ impl Chain {
self.storage.clone()
}
/// Get memory pool
pub fn memory_pool(&self) -> &MemoryPool {
&self.memory_pool
}
/// Get number of blocks in given state
pub fn length_of_blocks_state(&self, state: BlockState) -> u32 {
match state {

View File

@ -285,6 +285,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
verifying_blocks_by_peer: HashMap<H256, usize>,
/// Verifying blocks futures
verifying_blocks_futures: HashMap<usize, (HashSet<H256>, Vec<BoxFuture<(), ()>>)>,
/// Hashes of items we do not want to relay after verification is completed
do_not_relay: HashSet<H256>,
}
impl Config {
@ -394,8 +396,13 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
let transactions_to_verify = { self.core.lock().on_peer_transaction(peer_index, transaction) };
if let Some(mut transactions_to_verify) = transactions_to_verify {
// it is not actual height of block this transaction will be included to
// => it possibly will be invalid if included in later blocks
// => mined block can be rejected
// => we should verify blocks we mine
let next_block_height = self.best_block().number + 1;
while let Some((_, tx)) = transactions_to_verify.pop_front() {
self.verifier.verify_transaction(tx);
self.verifier.verify_transaction(next_block_height, tx);
}
}
}
@ -625,7 +632,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// remember that peer has this transaction
self.peers.on_transaction_received(peer_index, &transaction_hash);
self.process_peer_transaction(Some(peer_index), transaction_hash, transaction)
self.process_peer_transaction(Some(peer_index), transaction_hash, transaction, true)
}
/// Peer wants to set bloom filter for the connection
@ -778,6 +785,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
/// Process successful block verification
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
let hash = block.hash();
let needs_relay = !self.do_not_relay.remove(hash);
// insert block to the storage
match {
let mut chain = self.chain.write();
@ -806,15 +814,18 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
// Being able to opt-out of inv messages until the filter is set prevents a client being flooded with traffic in
// the brief window of time between finishing version handshaking and setting the filter.
if self.state.is_saturated() || self.state.is_nearly_saturated() {
self.relay_new_blocks(insert_result.canonized_blocks_hashes);
if needs_relay {
self.relay_new_blocks(insert_result.canonized_blocks_hashes);
}
}
// deal with block transactions
let mut verification_tasks: Vec<VerificationTask> = Vec::with_capacity(insert_result.transactions_to_reverify.len());
let next_block_height = self.best_block().number + 1;
for (hash, tx) in insert_result.transactions_to_reverify {
// TODO: transactions from this blocks will be relayed. Do we need this?
if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx) {
let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(tx));
// do not relay resurrected transactions again
if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx, false) {
let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(next_block_height, tx));
verification_tasks.extend(tx_tasks);
};
}
@ -836,6 +847,8 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
fn on_block_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
self.do_not_relay.remove(hash);
{
let mut chain = self.chain.write();
@ -854,6 +867,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
/// Process successful transaction verification
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
let hash = transaction.hash();
let needs_relay = !self.do_not_relay.remove(&hash);
let transaction_fee_rate = {
// insert transaction to the memory pool
@ -873,13 +887,17 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
};
// relay transaction to peers
self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]);
if needs_relay {
self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]);
}
}
/// Process failed transaction verification
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
self.do_not_relay.remove(hash);
{
let mut chain = self.chain.write();
@ -904,6 +922,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
orphaned_transactions_pool: OrphanTransactionsPool::new(),
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
do_not_relay: HashSet::new(),
}
));
@ -1197,7 +1216,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Process new peer transaction
fn process_peer_transaction(&mut self, peer_index: Option<usize>, hash: H256, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
fn process_peer_transaction(&mut self, peer_index: Option<usize>, hash: H256, transaction: Transaction, relay: bool) -> Option<VecDeque<(H256, Transaction)>> {
// if we are in synchronization state, we will ignore this message
if self.state.is_synchronizing() {
return None;
@ -1222,14 +1241,17 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
// else verify && insert this transaction && all dependent orphans
let mut transactons: VecDeque<(H256, Transaction)> = VecDeque::new();
transactons.push_back((hash.clone(), transaction));
transactons.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash));
let mut transactions: VecDeque<(H256, Transaction)> = VecDeque::new();
transactions.push_back((hash.clone(), transaction));
transactions.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash));
// remember that we are verifying these transactions
for &(ref h, ref tx) in &transactons {
for &(ref h, ref tx) in &transactions {
chain.verify_transaction(h.clone(), tx.clone());
if !relay {
self.do_not_relay.insert(h.clone());
}
}
Some(transactons)
Some(transactions)
}
fn prepare_blocks_requests_tasks(&mut self, peers: Vec<usize>, mut hashes: Vec<H256>) -> Vec<Task> {

View File

@ -3,11 +3,12 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use parking_lot::Mutex;
use chain::Transaction;
use chain::{Transaction, OutPoint, TransactionOutput};
use network::Magic;
use primitives::hash::H256;
use synchronization_chain::ChainRef;
use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
use db::{SharedStore, IndexedBlock};
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
/// Verification events sink
pub trait VerificationSink : Send + 'static {
@ -27,7 +28,7 @@ pub enum VerificationTask {
/// Verify single block
VerifyBlock(IndexedBlock),
/// Verify single transaction
VerifyTransaction(Transaction),
VerifyTransaction(u32, Transaction),
/// Stop verification thread
Stop,
}
@ -37,7 +38,7 @@ pub trait Verifier : Send + 'static {
/// Verify block
fn verify_block(&self, block: IndexedBlock);
/// Verify transaction
fn verify_transaction(&self, transaction: Transaction);
fn verify_transaction(&self, height: u32, transaction: Transaction);
}
/// Asynchronous synchronization verifier
@ -48,28 +49,39 @@ pub struct AsyncVerifier {
verification_worker_thread: Option<thread::JoinHandle<()>>,
}
struct ChainMemoryPoolTransactionOutputProvider {
chain: ChainRef,
}
#[derive(Default)]
struct EmptyTransactionOutputProvider {
}
impl AsyncVerifier {
/// Create new async verifier
pub fn new<T: VerificationSink>(network: Magic, storage: SharedStore, sink: Arc<Mutex<T>>) -> Self {
pub fn new<T: VerificationSink>(network: Magic, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
let (verification_work_sender, verification_work_receiver) = channel();
let verifier = ChainVerifier::new(storage, network);
let verifier = ChainVerifier::new(chain.read().storage(), network);
AsyncVerifier {
verification_work_sender: verification_work_sender,
verification_worker_thread: Some(thread::Builder::new()
.name("Sync verification thread".to_string())
.spawn(move || {
AsyncVerifier::verification_worker_proc(sink, verifier, verification_work_receiver)
AsyncVerifier::verification_worker_proc(sink, chain, verifier, verification_work_receiver)
})
.expect("Error creating verification thread"))
}
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
while let Ok(task) = work_receiver.recv() {
match task {
VerificationTask::Stop => break,
_ => execute_verification_task(&sink, &verifier, task),
_ => {
let prevout_provider = ChainMemoryPoolTransactionOutputProvider::with_chain(chain.clone());
execute_verification_task(&sink, &prevout_provider, &verifier, task)
},
}
}
}
@ -95,9 +107,9 @@ impl Verifier for AsyncVerifier {
}
/// Verify transaction
fn verify_transaction(&self, transaction: Transaction) {
fn verify_transaction(&self, height: u32, transaction: Transaction) {
self.verification_work_sender
.send(VerificationTask::VerifyTransaction(transaction))
.send(VerificationTask::VerifyTransaction(height, transaction))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
}
@ -124,17 +136,17 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
/// Verify block
fn verify_block(&self, block: IndexedBlock) {
execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyBlock(block))
execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyBlock(block))
}
/// Verify transaction
fn verify_transaction(&self, transaction: Transaction) {
execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyTransaction(transaction))
fn verify_transaction(&self, height: u32, transaction: Transaction) {
execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyTransaction(height, transaction))
}
}
/// Execute single verification task
fn execute_verification_task<T: VerificationSink>(sink: &Arc<Mutex<T>>, verifier: &ChainVerifier, task: VerificationTask) {
fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputProvider>(sink: &Arc<Mutex<T>>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) {
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
@ -156,15 +168,41 @@ fn execute_verification_task<T: VerificationSink>(sink: &Arc<Mutex<T>>, verifier
}
}
},
VerificationTask::VerifyTransaction(transaction) => {
// TODO: add verification here
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
VerificationTask::VerifyTransaction(height, transaction) => {
// bitcoin: AcceptToMemoryPoolWorker
let time: u32 = 0; // TODO
let sequence: usize = 1; // TODO: change to bool
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) {
Ok(_) => sink.lock().on_transaction_verification_success(transaction),
Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
}
},
_ => unreachable!("must be checked by caller"),
}
}
}
impl ChainMemoryPoolTransactionOutputProvider {
pub fn with_chain(chain: ChainRef) -> Self {
ChainMemoryPoolTransactionOutputProvider {
chain: chain,
}
}
}
impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvider {
fn previous_transaction_output(&self, prevout: &OutPoint) -> Option<TransactionOutput> {
self.chain.read().memory_pool().previous_transaction_output(prevout)
}
}
impl PreviousTransactionOutputProvider for EmptyTransactionOutputProvider {
fn previous_transaction_output(&self, _prevout: &OutPoint) -> Option<TransactionOutput> {
None
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
@ -207,7 +245,7 @@ pub mod tests {
}
}
fn verify_transaction(&self, transaction: Transaction) {
fn verify_transaction(&self, _height: u32, transaction: Transaction) {
match self.sink {
Some(ref sink) => match self.errors.get(&transaction.hash()) {
Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()),