fixing sync TODOs

This commit is contained in:
Svyatoslav Nikolsky 2016-12-01 11:00:09 +03:00
parent 4df6d03bbc
commit 9e53289b5a
7 changed files with 194 additions and 46 deletions

View File

@ -24,7 +24,7 @@ pub struct ConnectionFilter {
/// Filter update type.
filter_flags: types::FilterFlags,
/// Last blocks from peer.
last_blocks: LinkedHashMap<H256, ()>,
last_blocks: LinkedHashMap<H256, bool>,
/// Last transactions from peer.
last_transactions: LinkedHashMap<H256, ()>,
/// Minimal fee in satoshis per 1000 bytes
@ -91,14 +91,14 @@ impl ConnectionFilter {
}
/// We have a knowledge that block with given hash is known to this connection
pub fn known_block(&mut self, block_hash: &H256) {
pub fn known_block(&mut self, block_hash: &H256, is_sent_compact: bool) {
// remember that peer knows about this block
if !self.last_blocks.contains_key(block_hash) {
if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE {
self.last_blocks.pop_front();
}
self.last_blocks.insert(block_hash.clone(), ());
self.last_blocks.insert(block_hash.clone(), is_sent_compact);
}
}
@ -114,6 +114,11 @@ impl ConnectionFilter {
}
}
/// Is compact block with this hash has been sent recently
pub fn is_known_compact_block(&self, block_hash: &H256) -> bool {
self.last_blocks.get(block_hash).cloned().unwrap_or(false)
}
/// Check if block should be sent to this connection
pub fn filter_block(&self, block_hash: &H256) -> bool {
// check if block is known
@ -662,15 +667,15 @@ pub mod tests {
let mut filter = ConnectionFilter::default();
assert!(filter.filter_block(&blocks[0].hash()));
filter.known_block(&blocks[0].hash());
filter.known_block(&blocks[0].hash(), false);
assert!(!filter.filter_block(&blocks[0].hash()));
for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) {
filter.known_block(&block.hash());
filter.known_block(&block.hash(), false);
assert!(!filter.filter_block(&blocks[0].hash()));
}
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash());
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash(), false);
assert!(filter.filter_block(&blocks[0].hash()));
}
@ -694,4 +699,13 @@ pub mod tests {
filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash());
assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
}
#[test]
fn known_compact_block() {
let mut filter = ConnectionFilter::default();
filter.known_block(&test_data::block_h1().hash(), true);
filter.known_block(&test_data::block_h2().hash(), false);
assert!(filter.is_known_compact_block(&test_data::block_h1().hash()));
assert!(!filter.is_known_compact_block(&test_data::block_h2().hash()));
}
}

View File

@ -216,7 +216,15 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) {
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
// Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
// => we should check if we have send cmpctblock before
if {
let mut client = self.client.lock();
client.is_compact_block_sent_recently(peer_index, &message.request.blockhash)
} {
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
}
}
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
@ -256,7 +264,7 @@ mod tests {
use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{InventoryVector, InventoryType};
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
use db;
use super::LocalNode;
use test_data;
@ -475,4 +483,67 @@ mod tests {
_ => panic!("unexpected"),
}
}
#[test]
fn local_node_serves_get_block_txn_when_recently_sent_compact_block() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b1_hash = b1.hash();
// Append block
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
// Request compact block
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
]});
// forget tasks
server.take_tasks();
// Request compact transaction from this block
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
request: BlockTransactionsRequest {
blockhash: b1_hash.clone(),
indexes: vec![0],
}
});
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(2, ServerTask::ServeGetBlockTxn(b1_hash, vec![0]))]);
}
#[test]
fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b1_hash = b1.hash();
// Append block
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
// Request compact transaction from this block
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
request: BlockTransactionsRequest {
blockhash: b1_hash,
indexes: vec![0],
}
});
let tasks = server.take_tasks();
assert_eq!(tasks, vec![]);
}
}

View File

@ -185,6 +185,7 @@ pub struct Information {
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
@ -206,6 +207,7 @@ pub trait Client : Send + 'static {
pub trait ClientCore : VerificationSink {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
@ -353,6 +355,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().state()
}
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
self.core.lock().is_compact_block_sent_recently(peer_index, hash)
}
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
self.core.lock().filter_getdata_inventory(peer_index, inventory)
}
@ -469,6 +475,11 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.state
}
/// Returns true if compactblock with this hash has been sent to this peer recently
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
self.peers.filter(peer_index).is_known_compact_block(hash)
}
/// Filter inventory from `getdata` message for given peer
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
let storage = { self.chain.read().storage() };
@ -489,7 +500,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
None => notfound.push(item),
Some(block) => match filter.build_merkle_block(block) {
None => notfound.push(item),
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
Some(merkleblock) => {
filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions));
filter.known_block(&item.hash, false);
},
},
}
},
@ -508,6 +522,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
header: build_compact_block(indexed_block, prefilled_transactions_indexes),
};
compacted.push(compact_block);
filter.known_block(&item.hash, true);
},
}
},

View File

@ -77,8 +77,6 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
impl TaskExecutor for LocalSynchronizationTaskExecutor {
fn execute(&mut self, task: Task) {
// TODO: what is types::GetBlocks::version here? (@ PR#37)
match task {
Task::RequestBlocks(peer_index, blocks_hashes) => {
let getdata = types::GetData {
@ -97,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
Task::RequestBlocksHeaders(peer_index) => {
let block_locator_hashes = self.chain.read().block_locator_hashes();
let getheaders = types::GetHeaders {
version: 0,
version: 0, // this field is ignored by clients
block_locator_hashes: block_locator_hashes,
hash_stop: H256::default(),
};

View File

@ -244,7 +244,7 @@ impl Peers {
}
// remember that peer knows about this block
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash);
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash, false);
}
/// Transaction is received from peer.

View File

@ -123,14 +123,27 @@ impl SynchronizationServer {
server
}
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
block_locator_hashes.into_iter()
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash))
.nth(0)
.or_else(|| if stop_hash != &H256::default() {
if let Some(stop_hash_number) = chain.storage().block_number(stop_hash) {
Some(db::BestBlock {
number: stop_hash_number,
hash: stop_hash.clone(),
})
} else {
None
}
} else {
None
}
)
}
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes)
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes, stop_hash)
}
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
@ -223,22 +236,23 @@ impl SynchronizationServer {
assert_eq!(indexed_task.id, ServerTaskIndex::None);
let chain = chain.read();
if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
let blocks_hashes = match SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes, &hash_stop) {
Some(best_common_block) => {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500)
},
None => {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
Vec::new()
},
};
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
@ -246,19 +260,15 @@ impl SynchronizationServer {
ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => {
let chain = chain.read();
// TODO: if block_locator_hashes is empty => return hash_stop
let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) {
let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes, &hash_stop) {
Some(best_common_block) => {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
// TODO: add test for this case
// we must respond with empty headers message even if we have no common blocks with this peer
SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000)
},
None => {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
Vec::new()
}
},
};
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
@ -447,8 +457,6 @@ impl Server for SynchronizationServer {
}
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
// TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None);
Some(task)
}
@ -650,9 +658,9 @@ pub mod tests {
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
}).map(|t| server.add_task(0, t));
// => no response
// => empty response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::SendInventory(0, vec![])]);
}
#[test]
@ -815,4 +823,48 @@ pub mod tests {
Task::SendTransaction(0, tx_verified),
]);
}
#[test]
fn server_responds_with_nonempty_inventory_when_getdata_stop_hash_filled() {
let (chain, executor, server) = create_synchronization_server();
{
let mut chain = chain.write();
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
}
// when asking with stop_hash
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![],
hash_stop: test_data::genesis().hash(),
}).map(|t| server.add_task(0, t));
// => respond with next block
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
}
#[test]
fn server_responds_with_nonempty_headers_when_getdata_stop_hash_filled() {
let (chain, executor, server) = create_synchronization_server();
{
let mut chain = chain.write();
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
}
// when asking with stop_hash
let dummy_id = 6;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![],
hash_stop: test_data::genesis().hash(),
}, Some(dummy_id)).map(|t| server.add_task(0, t));
// => respond with next block
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
}
}

View File

@ -9,6 +9,7 @@ use primitives::hash::H256;
use synchronization_chain::ChainRef;
use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
use time::get_time;
/// Verification events sink
pub trait VerificationSink : Send + 'static {
@ -169,11 +170,8 @@ fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputPr
}
},
VerificationTask::VerifyTransaction(height, transaction) => {
// bitcoin: AcceptToMemoryPoolWorker
let time: u32 = 0; // TODO
let sequence: usize = 1; // TODO: change to bool
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) {
let time: u32 = get_time().sec as u32;
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) {
Ok(_) => sink.lock().on_transaction_verification_success(transaction),
Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
}