diff --git a/p2p/src/net/peer_context.rs b/p2p/src/net/peer_context.rs index 61618064..74046fe0 100644 --- a/p2p/src/net/peer_context.rs +++ b/p2p/src/net/peer_context.rs @@ -3,6 +3,7 @@ use parking_lot::Mutex; use message::{Payload, Message}; use p2p::Context; use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses}; +use futures::{lazy, finished}; pub struct PeerContext { context: Arc, @@ -104,6 +105,17 @@ impl PeerContext { } } + /// Closes this context + pub fn close(&self) { + let context = self.context.clone(); + let peer_id = self.info.id; + let close = lazy(move || { + context.close_channel(peer_id); + finished::<(), ()>(()) + }); + self.context.spawn(close); + } + pub fn info(&self) -> &PeerInfo { &self.info } diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index f3b5f701..8a921368 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -60,6 +60,7 @@ pub trait OutboundSyncConnection : Send + Sync { fn send_block_txn(&self, message: &types::BlockTxn); fn send_notfound(&self, message: &types::NotFound); fn ignored(&self, id: u32); + fn close(&self); } struct OutboundSync { @@ -162,6 +163,10 @@ impl OutboundSyncConnection for OutboundSync { fn ignored(&self, id: u32) { self.context.ignore_response(id); } + + fn close(&self) { + self.context.close() + } } pub struct SyncProtocol { diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 2a0ae7c4..8dd68875 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -74,10 +74,17 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; + let sync_client_config = SynchronizationConfig { + // during regtests, peer is providing us with bad blocks => we shouldn't close connection because of this + close_connection_on_bad_block: network != Magic::Regtest, + // TODO: remove me + threads_num: 4, + }; + let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); - let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network); + let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), network); let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone()); let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 02a049c8..0743056e 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -305,6 +305,7 @@ mod tests { fn send_block_txn(&self, _message: &types::BlockTxn) {} fn send_notfound(&self, _message: &types::NotFound) {} fn ignored(&self, _id: u32) {} + fn close(&self) {} } fn create_local_node() -> (Core, Handle, Arc>, Arc, LocalNode>) { @@ -313,7 +314,7 @@ mod tests { let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); let executor = DummyTaskExecutor::new(); let server = Arc::new(DummyServer::new()); - let config = Config { threads_num: 1 }; + let config = Config { threads_num: 1, close_connection_on_bad_block: true }; let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet); let mut verifier = DummyVerifier::default(); verifier.set_sink(client_core.clone()); diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 171362fa..0e049481 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -1,6 +1,6 @@ use std::fmt; use std::sync::Arc; -use std::collections::VecDeque; +use std::collections::{VecDeque, HashSet}; use linked_hash_map::LinkedHashMap; use parking_lot::RwLock; use chain::{BlockHeader, Transaction}; @@ -55,6 +55,8 @@ pub enum BlockState { Verifying, /// In storage Stored, + /// This block has been marked as dead-end block + DeadEnd, } /// Transactions synchronization state @@ -101,6 +103,8 @@ pub enum HeadersIntersection { DbAllBlocksKnown, /// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db DbForkNewBlocks(usize), + /// Dead-end blocks are starting from given index + DeadEnd(usize), } /// Blockchain from synchroniation point of view, consisting of: @@ -123,6 +127,8 @@ pub struct Chain { verifying_transactions: LinkedHashMap, /// Transactions memory pool memory_pool: MemoryPool, + /// Blocks that have been marked as dead-ends + dead_end_blocks: HashSet, } impl BlockState { @@ -163,6 +169,7 @@ impl Chain { headers_chain: BestHeadersChain::new(best_storage_block_hash), verifying_transactions: LinkedHashMap::new(), memory_pool: MemoryPool::new(), + dead_end_blocks: HashSet::new(), } } @@ -276,7 +283,11 @@ impl Chain { None => if self.storage.contains_block(db::BlockRef::Hash(hash.clone())) { BlockState::Stored } else { - BlockState::Unknown + if self.dead_end_blocks.contains(hash) { + BlockState::DeadEnd + } else { + BlockState::Unknown + } }, } } @@ -334,6 +345,11 @@ impl Chain { requested } + /// Mark this block as dead end, so these tasks won't be synchronized + pub fn mark_dead_end_block(&mut self, hash: &H256) { + self.dead_end_blocks.insert(hash.clone()); + } + /// Insert new best block to storage pub fn insert_best_block(&mut self, hash: H256, block: &IndexedBlock) -> Result { let is_appending_to_main_branch = self.best_storage_block.hash == block.header().previous_header_hash; @@ -516,6 +532,10 @@ impl Chain { state => (true, state), }; match first_state { + // if first block of inventory is dead-end, then all other blocks are also dead-end blocks + BlockState::DeadEnd => { + HeadersIntersection::DeadEnd(0) + }, // if first block of inventory is unknown && its parent is unknonw => all other blocks are also unknown BlockState::Unknown => { HeadersIntersection::NoKnownBlocks(0) @@ -542,14 +562,18 @@ impl Chain { } }, // if first block is known && last block is unknown => intersection with queue or with db - BlockState::Unknown if is_first_known => { + BlockState::Unknown | BlockState::DeadEnd if is_first_known => { // find last known block let mut previous_state = first_block_state; for (index, hash) in hashes.iter().enumerate().take(hashes_len).skip(1) { let state = self.block_state(hash); - if state == BlockState::Unknown { + if state == BlockState::Unknown || state == BlockState::DeadEnd { + // if state is dead end => there are no useful blocks + if state == BlockState::DeadEnd { + return HeadersIntersection::DeadEnd(index); + } // previous block is stored => fork from stored block - if previous_state == BlockState::Stored { + else if previous_state == BlockState::Stored { return HeadersIntersection::DbForkNewBlocks(index); } // previous block is best block => no fork @@ -1194,4 +1218,49 @@ mod tests { // => tx2 is removed from memory pool, but tx3 remains assert_eq!(chain.information().transactions.transactions_count, 1); } + + #[test] + fn chain_dead_end() { + let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); + + let blocks = test_data::build_n_empty_blocks_from(5, 0, &test_data::genesis().block_header); + let headers: Vec<_> = blocks.iter().map(|b| b.block_header.clone()).collect(); + let hashes: Vec<_> = headers.iter().map(|h| h.hash()).collect(); + + chain.insert_best_block(blocks[0].hash(), &blocks[0].clone().into()).expect("no error"); + chain.insert_best_block(blocks[1].hash(), &blocks[1].clone().into()).expect("no error"); + chain.mark_dead_end_block(&blocks[2].hash()); + + assert_eq!(chain.intersect_with_blocks_headers(&vec![ + hashes[0].clone(), + hashes[1].clone(), + hashes[2].clone(), + hashes[3].clone(), + hashes[4].clone(), + ], &vec![ + headers[0].clone(), + headers[1].clone(), + headers[2].clone(), + headers[3].clone(), + headers[4].clone(), + ]), HeadersIntersection::DeadEnd(2)); + + assert_eq!(chain.intersect_with_blocks_headers(&vec![ + hashes[2].clone(), + hashes[3].clone(), + hashes[4].clone(), + ], &vec![ + headers[2].clone(), + headers[3].clone(), + headers[4].clone(), + ]), HeadersIntersection::DeadEnd(0)); + + assert_eq!(chain.intersect_with_blocks_headers(&vec![ + hashes[3].clone(), + hashes[4].clone(), + ], &vec![ + headers[3].clone(), + headers[4].clone(), + ]), HeadersIntersection::DeadEnd(0)); + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 53860d29..6c3a1a35 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -242,6 +242,8 @@ pub trait ClientCore : VerificationSink { /// Synchronization client configuration options. #[derive(Debug)] pub struct Config { + /// If true, connection to peer who has provided us with bad block is closed + pub close_connection_on_bad_block: bool, /// Number of threads to allocate in synchronization CpuPool. pub threads_num: usize, } @@ -310,6 +312,8 @@ pub struct SynchronizationClientCore { block_speed_meter: AverageSpeedMeter, /// Block synchronization speed meter sync_speed_meter: AverageSpeedMeter, + /// Configuration + config: Config, } /// Block headers provider from `headers` message @@ -336,14 +340,6 @@ struct AverageSpeedMeter { last_timestamp: Option, } -impl Config { - pub fn new() -> Self { - Config { - threads_num: 4, - } - } -} - impl FilteredInventory { #[cfg(test)] pub fn with_unfiltered(unfiltered: Vec) -> Self { @@ -603,7 +599,12 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { let unknown_blocks_hashes: Vec<_> = { let chain = self.chain.read(); blocks_hashes.into_iter() - .filter(|h| chain.block_state(h) == BlockState::Unknown) + .filter(|h| { + // if we haven't closed connection after receiving dead-end block + // => also process dead-end blocks + let block_state = chain.block_state(h); + block_state == BlockState::Unknown || (block_state == BlockState::DeadEnd && !self.config.close_connection_on_bad_block) + }) .filter(|h| !self.orphaned_blocks_pool.contains_unknown_block(h)) .collect() }; @@ -1016,12 +1017,23 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor self.do_not_relay.remove(hash); + // close connection with this peer + if let Some(peer_index) = self.verifying_blocks_by_peer.get(hash) { + warn!(target: "sync", "Peer#{} has provided us with wrong block {:?}", peer_index, hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(*peer_index)); + } + } + { let mut chain = self.chain.write(); // forget for this block and all its children // headers are also removed as they all are invalid chain.forget_block_with_children(hash); + + // mark failed block as dead end (this branch won't be synchronized) + chain.mark_dead_end_block(hash); } // awake threads, waiting for this block insertion @@ -1094,6 +1106,7 @@ impl SynchronizationClientCore where T: TaskExecutor { do_not_relay: HashSet::new(), block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT), sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT), + config: config, } )); @@ -1258,7 +1271,8 @@ impl SynchronizationClientCore where T: TaskExecutor { assert_eq!(hashes.len(), headers.len()); let mut chain = self.chain.write(); - match chain.intersect_with_blocks_headers(&hashes, &headers) { + let intersection_result = chain.intersect_with_blocks_headers(&hashes, &headers); + match intersection_result { HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => { warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index); }, @@ -1277,11 +1291,35 @@ impl SynchronizationClientCore where T: TaskExecutor { HeadersIntersection::InMemoryMainNewBlocks(new_block_index) | HeadersIntersection::InMemoryForkNewBlocks(new_block_index) | HeadersIntersection::DbForkNewBlocks(new_block_index) - | HeadersIntersection::NoKnownBlocks(new_block_index) => { + | HeadersIntersection::NoKnownBlocks(new_block_index) + | HeadersIntersection::DeadEnd(new_block_index) => { + if let HeadersIntersection::DeadEnd(dead_block_index) = intersection_result { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, hashes[dead_block_index].to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + return; + } + } + // check that we do not know all blocks in range [new_block_index..] // if we know some block => there has been verification error => all headers should be ignored // see when_previous_block_verification_failed_fork_is_not_requested for details - if hashes.iter().skip(new_block_index).any(|h| chain.block_state(h) != BlockState::Unknown) { + if hashes.iter().skip(new_block_index).any(|h| { + let block_state = chain.block_state(h); + match block_state { + BlockState::Unknown => false, + BlockState::DeadEnd => { + warn!(target: "sync", "Peer#{} has provided us with blocks leading to dead-end block {:?}", peer_index, h.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + true + } else { + false + } + }, + _ => true, + } + }) { return; } @@ -1318,15 +1356,32 @@ impl SynchronizationClientCore where T: TaskExecutor { // prepare list of blocks to verify + make all required changes to the chain let mut result: Option> = None; let mut chain = self.chain.write(); - match chain.block_state(&block_hash) { + let block_state = chain.block_state(&block_hash); + match block_state { BlockState::Verifying | BlockState::Stored => { // remember peer as useful self.peers.useful_peer(peer_index); }, - BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { + BlockState::Unknown | BlockState::Scheduled | BlockState::Requested | BlockState::DeadEnd => { + if block_state == BlockState::DeadEnd { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + } + } + // check parent block state - match chain.block_state(&block.header().previous_header_hash) { - BlockState::Unknown => { + let parent_block_state = chain.block_state(&block.header().previous_header_hash); + match parent_block_state { + BlockState::Unknown | BlockState::DeadEnd => { + if parent_block_state == BlockState::DeadEnd { + warn!(target: "sync", "Peer#{} has provided us with dead-end block {:?}", peer_index, block_hash.to_reversed_str()); + if self.config.close_connection_on_bad_block { + self.executor.lock().execute(Task::Close(peer_index)); + return result; + } + } + if self.state.is_synchronizing() { // when synchronizing, we tend to receive all blocks in-order trace!( @@ -1648,7 +1703,7 @@ pub mod tests { }; let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); let executor = DummyTaskExecutor::new(); - let config = Config { threads_num: 1 }; + let config = Config { threads_num: 1, close_connection_on_bad_block: true }; let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet); { @@ -2605,4 +2660,85 @@ pub mod tests { assert_eq!(headers_provider.block_header(db::BlockRef::Hash(H256::from(1))), None); assert_eq!(headers_provider.block_header(db::BlockRef::Number(2)), None); } + + #[test] + fn collection_closed_on_block_verification_error() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + + // simulate verification error during b0 verification + let mut dummy_verifier = DummyVerifier::default(); + dummy_verifier.error_when_verifying(b0.hash(), "simulated"); + + let (_, _, executor, _, sync) = create_sync(None, Some(dummy_verifier)); + sync.lock().on_peer_block(0, b0.into()); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::Close(0)]); + } + + #[test] + fn collection_closed_on_begin_dead_end_block_header() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); + let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); + + let (_, _, executor, chain, sync) = create_sync(None, None); + { + chain.write().mark_dead_end_block(&b0.hash()); + } + sync.lock().on_new_blocks_headers(0, vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::Close(0)]); + } + + #[test] + fn collection_closed_on_in_middle_dead_end_block_header() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); + let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); + + let (_, _, executor, chain, sync) = create_sync(None, None); + { + chain.write().mark_dead_end_block(&b1.hash()); + } + sync.lock().on_new_blocks_headers(0, vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::Close(0)]); + } + + #[test] + fn collection_closed_on_providing_dead_end_block() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + + let (_, _, executor, chain, sync) = create_sync(None, None); + { + chain.write().mark_dead_end_block(&b0.hash()); + } + sync.lock().on_peer_block(0, b0.into()); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::Close(0)]); + } + + #[test] + fn collection_closed_on_providing_child_dead_end_block() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); + + let (_, _, executor, chain, sync) = create_sync(None, None); + { + chain.write().mark_dead_end_block(&b0.hash()); + } + sync.lock().on_peer_block(0, b1.into()); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::Close(0)]); + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 08289c8c..46ed9efe 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -46,6 +46,8 @@ pub enum Task { SendCompactBlocks(usize, Vec), /// Notify io about ignored request Ignore(usize, u32), + /// Close connection with this peer + Close(usize), } /// Synchronization tasks executor @@ -56,6 +58,28 @@ pub struct LocalSynchronizationTaskExecutor { chain: ChainRef, } +impl Task { + #[cfg(test)] + pub fn peer_index(&self) -> usize { + match *self { + Task::RequestBlocks(peer_index, _) => peer_index, + Task::RequestBlocksHeaders(peer_index) => peer_index, + Task::RequestTransactions(peer_index, _) => peer_index, + Task::RequestMemoryPool(peer_index) => peer_index, + Task::SendBlock(peer_index, _) => peer_index, + Task::SendMerkleBlock(peer_index, _) => peer_index, + Task::SendTransaction(peer_index, _) => peer_index, + Task::SendBlockTxn(peer_index, _, _) => peer_index, + Task::SendNotFound(peer_index, _) => peer_index, + Task::SendInventory(peer_index, _) => peer_index, + Task::SendHeaders(peer_index, _, _) => peer_index, + Task::SendCompactBlocks(peer_index, _) => peer_index, + Task::Ignore(peer_index, _) => peer_index, + Task::Close(peer_index) => peer_index, + } + } +} + impl LocalSynchronizationTaskExecutor { pub fn new(chain: ChainRef) -> Arc> { Arc::new(Mutex::new(LocalSynchronizationTaskExecutor { @@ -215,6 +239,12 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.ignored(id); } }, + Task::Close(peer_index) => { + if let Some(connection) = self.peers.get_mut(&peer_index) { + trace!(target: "sync", "Closing request with peer#{}", peer_index); + connection.close(); + } + }, } } } @@ -228,10 +258,12 @@ pub mod tests { use parking_lot::{Mutex, Condvar}; use local_node::PeersConnections; use p2p::OutboundSyncConnectionRef; + use std::collections::HashSet; pub struct DummyTaskExecutor { tasks: Vec, waiter: Arc, + closed: HashSet, } impl DummyTaskExecutor { @@ -239,6 +271,7 @@ pub mod tests { Arc::new(Mutex::new(DummyTaskExecutor { tasks: Vec::new(), waiter: Arc::new(Condvar::new()), + closed: HashSet::new(), })) } @@ -267,6 +300,13 @@ pub mod tests { impl TaskExecutor for DummyTaskExecutor { fn execute(&mut self, task: Task) { + match task { + Task::Close(id) => { + self.closed.insert(id); + () + }, + _ => if self.closed.contains(&task.peer_index()) { return }, + } self.tasks.push(task); self.waiter.notify_one(); } diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index bc57665e..e0850e6a 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -280,11 +280,10 @@ impl SynchronizationServer { }, // `getblocktxn` => `blocktxn` ServerTask::ServeGetBlockTxn(block_hash, indexes) => { - let transactions = { + let (close, transactions) = { let chain = chain.read(); let storage = chain.storage(); if let Some(block) = storage.block(db::BlockRef::Hash(block_hash.clone())) { - let requested_len = indexes.len(); let transactions_len = block.transactions.len(); let mut read_indexes = HashSet::new(); @@ -302,20 +301,23 @@ impl SynchronizationServer { .map(Option::unwrap) // take_while above .collect(); if transactions.len() == requested_len { - Some(transactions) + (false, Some(transactions)) } else { - // TODO: malformed - None + warn!(target: "sync", "Closing connection with peer#{} as it has requested unknown block_txn from block {:?}", peer_index, block_hash.to_reversed_str()); + (true, None) } } else { - // TODO: else malformed - None + warn!(target: "sync", "Closing connection with peer#{} as it has requested block_txn from unknown block {:?}", peer_index, block_hash.to_reversed_str()); + (true, None) } }; if let Some(transactions) = transactions { trace!(target: "sync", "Going to respond with {} blocktxn transactions to peer#{}", transactions.len(), peer_index); executor.lock().execute(Task::SendBlockTxn(peer_index, block_hash, transactions)); } + if close { + executor.lock().execute(Task::Close(peer_index)); + } }, // `mempool` => `inventory` ServerTask::ServeMempool => { @@ -766,7 +768,7 @@ pub mod tests { server.serve_get_block_txn(0, test_data::genesis().hash(), vec![1]).map(|t| server.add_task(0, t)); // server responds with transactions let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![]); + assert_eq!(tasks, vec![Task::Close(0)]); } #[test]