Pass sync_status to mempool (#2754)

* Pass sync_status to mempool

* Update zebrad/src/components/mempool.rs

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>

* Remove enabled flag for now; will be handled in #2723

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
This commit is contained in:
Conrado Gouvea 2021-09-15 19:13:29 -03:00 committed by GitHub
parent 495316ac06
commit 957e12e4ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 8 deletions

View File

@ -85,12 +85,17 @@ impl StartCmd {
let (peer_set, address_book) = let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await; zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await;
info!("initializing syncer");
let (syncer, sync_status) =
ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier);
info!("initializing mempool"); info!("initializing mempool");
let mempool_service = BoxService::new(Mempool::new( let mempool_service = BoxService::new(Mempool::new(
config.network.network, config.network.network,
peer_set.clone(), peer_set.clone(),
state.clone(), state,
tx_verifier, tx_verifier,
sync_status.clone(),
)); ));
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service); let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
@ -98,11 +103,6 @@ impl StartCmd {
.send((peer_set.clone(), address_book, mempool)) .send((peer_set.clone(), address_book, mempool))
.map_err(|_| eyre!("could not send setup data to inbound service"))?; .map_err(|_| eyre!("could not send setup data to inbound service"))?;
info!("initializing syncer");
// TODO: use sync_status to activate the mempool (#2592)
let (syncer, sync_status) =
ChainSync::new(&config, peer_set.clone(), state, chain_verifier);
select! { select! {
result = syncer.sync().fuse() => result, result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => { _ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {

View File

@ -1,7 +1,7 @@
use std::{collections::HashSet, net::SocketAddr, str::FromStr, sync::Arc}; use std::{collections::HashSet, net::SocketAddr, str::FromStr, sync::Arc};
use super::mempool::{unmined_transactions_in_blocks, Mempool}; use super::mempool::{unmined_transactions_in_blocks, Mempool};
use crate::components::tests::mock_peer_set; use crate::components::{sync::SyncStatus, tests::mock_peer_set};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt}; use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
@ -23,6 +23,7 @@ async fn mempool_requests_for_transactions() {
let (peer_set, _) = mock_peer_set(); let (peer_set, _) = mock_peer_set();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book)); let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (state, _, _) = zebra_state::init(state_config, network); let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state); let state_service = ServiceBuilder::new().buffer(1).service(state);
@ -36,6 +37,7 @@ async fn mempool_requests_for_transactions() {
peer_set.clone(), peer_set.clone(),
state_service.clone(), state_service.clone(),
transaction_verifier, transaction_verifier,
sync_status,
); );
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network); let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);

View File

@ -37,6 +37,8 @@ use self::downloads::{
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
}; };
use super::sync::SyncStatus;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>; type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>; type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type TxVerifier = Buffer< type TxVerifier = Buffer<
@ -76,6 +78,10 @@ pub struct Mempool {
/// The transaction dowload and verify stream. /// The transaction dowload and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>, tx_downloads: Pin<Box<InboundTxDownloads>>,
/// Allows checking if we are near the tip to enable/disable the mempool.
#[allow(dead_code)]
sync_status: SyncStatus,
} }
impl Mempool { impl Mempool {
@ -85,6 +91,7 @@ impl Mempool {
outbound: Outbound, outbound: Outbound,
state: State, state: State,
tx_verifier: TxVerifier, tx_verifier: TxVerifier,
sync_status: SyncStatus,
) -> Self { ) -> Self {
let tx_downloads = Box::pin(TxDownloads::new( let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT), Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
@ -94,6 +101,7 @@ impl Mempool {
Mempool { Mempool {
storage: Default::default(), storage: Default::default(),
tx_downloads, tx_downloads,
sync_status,
} }
} }

View File

@ -16,6 +16,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
let consensus_config = ConsensusConfig::default(); let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral(); let state_config = StateConfig::ephemeral();
let (peer_set, _) = mock_peer_set(); let (peer_set, _) = mock_peer_set();
let (sync_status, _recent_syncs) = SyncStatus::new();
let (state, _, _) = zebra_state::init(state_config, network); let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state); let state_service = ServiceBuilder::new().buffer(1).service(state);
@ -26,7 +27,13 @@ async fn mempool_service_basic() -> Result<(), Report> {
// get the genesis block transactions from the Zcash blockchain. // get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network); let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Start the mempool service // Start the mempool service
let mut service = Mempool::new(network, peer_set, state_service.clone(), tx_verifier); let mut service = Mempool::new(
network,
peer_set,
state_service.clone(),
tx_verifier,
sync_status,
);
// Insert the genesis block coinbase transaction into the mempool storage. // Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?; service.storage.insert(genesis_transactions.1[0].clone())?;