fixing sync TODOs

This commit is contained in:
Svyatoslav Nikolsky 2016-12-01 11:00:09 +03:00
parent 4df6d03bbc
commit 9e53289b5a
7 changed files with 194 additions and 46 deletions

View File

@ -24,7 +24,7 @@ pub struct ConnectionFilter {
/// Filter update type. /// Filter update type.
filter_flags: types::FilterFlags, filter_flags: types::FilterFlags,
/// Last blocks from peer. /// Last blocks from peer.
last_blocks: LinkedHashMap<H256, ()>, last_blocks: LinkedHashMap<H256, bool>,
/// Last transactions from peer. /// Last transactions from peer.
last_transactions: LinkedHashMap<H256, ()>, last_transactions: LinkedHashMap<H256, ()>,
/// Minimal fee in satoshis per 1000 bytes /// Minimal fee in satoshis per 1000 bytes
@ -91,14 +91,14 @@ impl ConnectionFilter {
} }
/// We have a knowledge that block with given hash is known to this connection /// We have a knowledge that block with given hash is known to this connection
pub fn known_block(&mut self, block_hash: &H256) { pub fn known_block(&mut self, block_hash: &H256, is_sent_compact: bool) {
// remember that peer knows about this block // remember that peer knows about this block
if !self.last_blocks.contains_key(block_hash) { if !self.last_blocks.contains_key(block_hash) {
if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE { if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE {
self.last_blocks.pop_front(); self.last_blocks.pop_front();
} }
self.last_blocks.insert(block_hash.clone(), ()); self.last_blocks.insert(block_hash.clone(), is_sent_compact);
} }
} }
@ -114,6 +114,11 @@ impl ConnectionFilter {
} }
} }
/// Is compact block with this hash has been sent recently
pub fn is_known_compact_block(&self, block_hash: &H256) -> bool {
self.last_blocks.get(block_hash).cloned().unwrap_or(false)
}
/// Check if block should be sent to this connection /// Check if block should be sent to this connection
pub fn filter_block(&self, block_hash: &H256) -> bool { pub fn filter_block(&self, block_hash: &H256) -> bool {
// check if block is known // check if block is known
@ -662,15 +667,15 @@ pub mod tests {
let mut filter = ConnectionFilter::default(); let mut filter = ConnectionFilter::default();
assert!(filter.filter_block(&blocks[0].hash())); assert!(filter.filter_block(&blocks[0].hash()));
filter.known_block(&blocks[0].hash()); filter.known_block(&blocks[0].hash(), false);
assert!(!filter.filter_block(&blocks[0].hash())); assert!(!filter.filter_block(&blocks[0].hash()));
for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) { for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) {
filter.known_block(&block.hash()); filter.known_block(&block.hash(), false);
assert!(!filter.filter_block(&blocks[0].hash())); assert!(!filter.filter_block(&blocks[0].hash()));
} }
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash()); filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash(), false);
assert!(filter.filter_block(&blocks[0].hash())); assert!(filter.filter_block(&blocks[0].hash()));
} }
@ -694,4 +699,13 @@ pub mod tests {
filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash()); filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash());
assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None)); assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
} }
#[test]
fn known_compact_block() {
let mut filter = ConnectionFilter::default();
filter.known_block(&test_data::block_h1().hash(), true);
filter.known_block(&test_data::block_h2().hash(), false);
assert!(filter.is_known_compact_block(&test_data::block_h1().hash()));
assert!(!filter.is_known_compact_block(&test_data::block_h2().hash()));
}
} }

View File

@ -216,7 +216,15 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) { pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) {
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index); trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t)); // Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
// => we should check if we have send cmpctblock before
if {
let mut client = self.client.lock();
client.is_compact_block_sent_recently(peer_index, &message.request.blockhash)
} {
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
}
} }
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) { pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
@ -256,7 +264,7 @@ mod tests {
use synchronization_chain::Chain; use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types; use message::types;
use message::common::{InventoryVector, InventoryType}; use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
use db; use db;
use super::LocalNode; use super::LocalNode;
use test_data; use test_data;
@ -475,4 +483,67 @@ mod tests {
_ => panic!("unexpected"), _ => panic!("unexpected"),
} }
} }
#[test]
fn local_node_serves_get_block_txn_when_recently_sent_compact_block() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b1_hash = b1.hash();
// Append block
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
// Request compact block
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
]});
// forget tasks
server.take_tasks();
// Request compact transaction from this block
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
request: BlockTransactionsRequest {
blockhash: b1_hash.clone(),
indexes: vec![0],
}
});
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(2, ServerTask::ServeGetBlockTxn(b1_hash, vec![0]))]);
}
#[test]
fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b1_hash = b1.hash();
// Append block
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
// Request compact transaction from this block
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
request: BlockTransactionsRequest {
blockhash: b1_hash,
indexes: vec![0],
}
});
let tasks = server.take_tasks();
assert_eq!(tasks, vec![]);
}
} }

View File

@ -185,6 +185,7 @@ pub struct Information {
pub trait Client : Send + 'static { pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock; fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State; fn state(&self) -> State;
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory; fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize); fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
@ -206,6 +207,7 @@ pub trait Client : Send + 'static {
pub trait ClientCore : VerificationSink { pub trait ClientCore : VerificationSink {
fn best_block(&self) -> db::BestBlock; fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State; fn state(&self) -> State;
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory; fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize); fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
@ -353,6 +355,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().state() self.core.lock().state()
} }
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
self.core.lock().is_compact_block_sent_recently(peer_index, hash)
}
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory { fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
self.core.lock().filter_getdata_inventory(peer_index, inventory) self.core.lock().filter_getdata_inventory(peer_index, inventory)
} }
@ -469,6 +475,11 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.state self.state
} }
/// Returns true if compactblock with this hash has been sent to this peer recently
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
self.peers.filter(peer_index).is_known_compact_block(hash)
}
/// Filter inventory from `getdata` message for given peer /// Filter inventory from `getdata` message for given peer
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory { fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
let storage = { self.chain.read().storage() }; let storage = { self.chain.read().storage() };
@ -489,7 +500,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
None => notfound.push(item), None => notfound.push(item),
Some(block) => match filter.build_merkle_block(block) { Some(block) => match filter.build_merkle_block(block) {
None => notfound.push(item), None => notfound.push(item),
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)), Some(merkleblock) => {
filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions));
filter.known_block(&item.hash, false);
},
}, },
} }
}, },
@ -508,6 +522,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
header: build_compact_block(indexed_block, prefilled_transactions_indexes), header: build_compact_block(indexed_block, prefilled_transactions_indexes),
}; };
compacted.push(compact_block); compacted.push(compact_block);
filter.known_block(&item.hash, true);
}, },
} }
}, },

View File

@ -77,8 +77,6 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
impl TaskExecutor for LocalSynchronizationTaskExecutor { impl TaskExecutor for LocalSynchronizationTaskExecutor {
fn execute(&mut self, task: Task) { fn execute(&mut self, task: Task) {
// TODO: what is types::GetBlocks::version here? (@ PR#37)
match task { match task {
Task::RequestBlocks(peer_index, blocks_hashes) => { Task::RequestBlocks(peer_index, blocks_hashes) => {
let getdata = types::GetData { let getdata = types::GetData {
@ -97,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
Task::RequestBlocksHeaders(peer_index) => { Task::RequestBlocksHeaders(peer_index) => {
let block_locator_hashes = self.chain.read().block_locator_hashes(); let block_locator_hashes = self.chain.read().block_locator_hashes();
let getheaders = types::GetHeaders { let getheaders = types::GetHeaders {
version: 0, version: 0, // this field is ignored by clients
block_locator_hashes: block_locator_hashes, block_locator_hashes: block_locator_hashes,
hash_stop: H256::default(), hash_stop: H256::default(),
}; };

View File

@ -244,7 +244,7 @@ impl Peers {
} }
// remember that peer knows about this block // remember that peer knows about this block
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash); self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash, false);
} }
/// Transaction is received from peer. /// Transaction is received from peer.

View File

@ -123,14 +123,27 @@ impl SynchronizationServer {
server server
} }
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> { fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
block_locator_hashes.into_iter() block_locator_hashes.into_iter()
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash)) .filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash))
.nth(0) .nth(0)
.or_else(|| if stop_hash != &H256::default() {
if let Some(stop_hash_number) = chain.storage().block_number(stop_hash) {
Some(db::BestBlock {
number: stop_hash_number,
hash: stop_hash.clone(),
})
} else {
None
}
} else {
None
}
)
} }
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> { fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes) SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes, stop_hash)
} }
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) { fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
@ -223,22 +236,23 @@ impl SynchronizationServer {
assert_eq!(indexed_task.id, ServerTaskIndex::None); assert_eq!(indexed_task.id, ServerTaskIndex::None);
let chain = chain.read(); let chain = chain.read();
if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) { let blocks_hashes = match SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes, &hash_stop) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); Some(best_common_block) => {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500)
},
None => {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
Vec::new()
},
};
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500); trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
if !blocks_hashes.is_empty() { let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); inv_type: InventoryType::MessageBlock,
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { hash: hash,
inv_type: InventoryType::MessageBlock, }).collect();
hash: hash, executor.lock().execute(Task::SendInventory(peer_index, inventory));
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
// inform that we have processed task for peer // inform that we have processed task for peer
queue.lock().task_processed(peer_index); queue.lock().task_processed(peer_index);
}, },
@ -246,19 +260,15 @@ impl SynchronizationServer {
ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => { ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => {
let chain = chain.read(); let chain = chain.read();
// TODO: if block_locator_hashes is empty => return hash_stop let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes, &hash_stop) {
let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) {
Some(best_common_block) => { Some(best_common_block) => {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
// TODO: add test for this case
// we must respond with empty headers message even if we have no common blocks with this peer
SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000) SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000)
}, },
None => { None => {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
Vec::new() Vec::new()
} },
}; };
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
@ -447,8 +457,6 @@ impl Server for SynchronizationServer {
} }
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> { fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
// TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None); let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None);
Some(task) Some(task)
} }
@ -650,9 +658,9 @@ pub mod tests {
block_locator_hashes: vec![genesis_block_hash.clone()], block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(), hash_stop: H256::default(),
}).map(|t| server.add_task(0, t)); }).map(|t| server.add_task(0, t));
// => no response // => empty response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]); assert_eq!(tasks, vec![Task::SendInventory(0, vec![])]);
} }
#[test] #[test]
@ -815,4 +823,48 @@ pub mod tests {
Task::SendTransaction(0, tx_verified), Task::SendTransaction(0, tx_verified),
]); ]);
} }
#[test]
fn server_responds_with_nonempty_inventory_when_getdata_stop_hash_filled() {
let (chain, executor, server) = create_synchronization_server();
{
let mut chain = chain.write();
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
}
// when asking with stop_hash
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![],
hash_stop: test_data::genesis().hash(),
}).map(|t| server.add_task(0, t));
// => respond with next block
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
}
#[test]
fn server_responds_with_nonempty_headers_when_getdata_stop_hash_filled() {
let (chain, executor, server) = create_synchronization_server();
{
let mut chain = chain.write();
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
}
// when asking with stop_hash
let dummy_id = 6;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![],
hash_stop: test_data::genesis().hash(),
}, Some(dummy_id)).map(|t| server.add_task(0, t));
// => respond with next block
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
}
} }

View File

@ -9,6 +9,7 @@ use primitives::hash::H256;
use synchronization_chain::ChainRef; use synchronization_chain::ChainRef;
use verification::{ChainVerifier, Verify as VerificationVerify, Chain}; use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider}; use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
use time::get_time;
/// Verification events sink /// Verification events sink
pub trait VerificationSink : Send + 'static { pub trait VerificationSink : Send + 'static {
@ -169,11 +170,8 @@ fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputPr
} }
}, },
VerificationTask::VerifyTransaction(height, transaction) => { VerificationTask::VerifyTransaction(height, transaction) => {
// bitcoin: AcceptToMemoryPoolWorker let time: u32 = get_time().sec as u32;
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) {
let time: u32 = 0; // TODO
let sequence: usize = 1; // TODO: change to bool
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) {
Ok(_) => sink.lock().on_transaction_verification_success(transaction), Ok(_) => sink.lock().on_transaction_verification_success(transaction),
Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
} }