Rework initial crawler logic.

This splits out the connection handling code into a try_connect closure, which
could be refactored into a Service of its own.

On creation, when we are likely to have very few peers, launch many concurrent
connections to the first few candidates in the initial candidate set, before
continuing to grow the peer set according to demand signals.
Henry de Valence 2019-10-21 22:07:57 -07:00
parent e1a35490af
commit 027bdc8465
2 changed files with 84 additions and 22 deletions
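
The core of the change is easier to see outside the diff. Below is a minimal,
self-contained analogue of the new dialing logic, using a hypothetical
`Candidate` type and `handshake` function as stand-ins for zebra-network's real
`MetaAddr` and `PeerConnector` service: connection handling lives in a single
`try_connect` closure whose `map_err` tags each failure with its candidate, and
an initial burst of dials runs concurrently through `FuturesUnordered`.

    use std::net::SocketAddr;

    use futures::future::TryFutureExt;
    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio::net::TcpStream;

    #[derive(Clone, Copy, Debug)]
    struct Candidate {
        addr: SocketAddr,
    }

    // Hypothetical stand-in for the handshake performed by the real
    // PeerConnector service.
    async fn handshake(stream: TcpStream, addr: SocketAddr) -> Result<(), std::io::Error> {
        let _ = (stream, addr);
        Ok(())
    }

    async fn dial_initial(candidates: Vec<Candidate>) {
        // All connection handling in one closure; map_err tags a failure
        // with its candidate so the caller can report it.
        let try_connect = |candidate: Candidate| {
            async move {
                let stream = TcpStream::connect(candidate.addr).await?;
                handshake(stream, candidate.addr).await
            }
            .map_err(move |_| candidate)
        };

        // Launch every dial at once and handle completions in finish order.
        let mut handshakes: FuturesUnordered<_> =
            candidates.into_iter().map(try_connect).collect();
        while let Some(outcome) = handshakes.next().await {
            match outcome {
                Ok(()) => println!("handshake succeeded"),
                Err(candidate) => println!("dial to {} failed", candidate.addr),
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let candidate = Candidate {
            addr: "127.0.0.1:8233".parse().unwrap(),
        };
        dial_initial(vec![candidate]).await;
    }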


@@ -54,11 +54,18 @@ pub type BoxedZebraService = Box<
 type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>;
 
 /// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
-pub fn init<S>(
+pub async fn init<S>(
     config: Config,
     inbound_service: S,
 ) -> (
-    impl Service<Request, Response = Response, Error = BoxedStdError> + Send + Clone + 'static,
+    impl Service<
+        Request,
+        Response = Response,
+        Error = BoxedStdError,
+        Future = impl Future<Output = Result<Response, BoxedStdError>> + Send,
+    > + Send
+        + Clone
+        + 'static,
     Arc<Mutex<AddressBook>>,
 )
 where
@@ -77,7 +84,7 @@ where
     let (demand_tx, demand_rx) = mpsc::channel::<()>(100);
 
     // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
-    let peer_set = Buffer::new(
+    let mut peer_set = Buffer::new(
         PeerSet::new(
             PeakEwmaDiscover::new(
                 ServiceStream::new(
@@ -118,7 +125,16 @@ where
     );
 
     // 3. Outgoing peers we connect to in response to load.
     let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
+
+    // We need to await candidates.update() here, because Zcashd only sends one
+    // `addr` message per connection, and if we only have one initial peer we
+    // need to ensure that its `addr` message is used by the crawler.
+    // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>
+    let _ = candidates.update().await;
+
     info!("Sending initial request for peers");
     tokio::spawn(
         crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| {
             if let Err(e) = result {
@@ -127,7 +143,7 @@ where
         }),
     );
 
-    (Box::new(peer_set), address_book)
+    (peer_set, address_book)
 }
 
 /// Use the provided `peer_connector` to connect to `initial_peers`, then send
@@ -192,16 +208,11 @@ where
 /// Given a channel that signals a need for new peers, try to connect to a peer
 /// and send the resulting `PeerClient` through a channel.
 ///
-#[instrument(skip(
-    demand_signal,
-    candidates,
-    peer_connector,
-    success_tx
-))]
+#[instrument(skip(demand_signal, candidates, peer_connector, success_tx))]
 async fn crawl_and_dial<C, S>(
     mut demand_signal: mpsc::Receiver<()>,
     mut candidates: CandidateSet<S>,
-    mut peer_connector: C,
+    peer_connector: C,
     mut success_tx: mpsc::Sender<PeerChange>,
 ) -> Result<(), BoxedStdError>
 where
@@ -210,25 +221,76 @@ where
     S: Service<Request, Response = Response, Error = BoxedStdError>,
     S::Future: Send + 'static,
 {
+    // XXX this kind of boilerplate didn't exist before we made PeerConnector
+    // take (TcpStream, SocketAddr), which made it so that we could share code
+    // between inbound and outbound handshakes. Probably the cleanest way to
+    // make it go away again is to rename "Connector" to "Handshake" (since it
+    // is really responsible just for the handshake) and to have a "Connector"
+    // Service wrapper around "Handshake" that opens a TCP stream.
+    // We could also probably make the Handshake service `Clone` directly,
+    // which might be more efficient than using a Buffer wrapper.
+    use crate::types::MetaAddr;
+    use futures::TryFutureExt;
+
+    let try_connect = |candidate: MetaAddr| {
+        let mut pc = peer_connector.clone();
+        async move {
+            let stream = TcpStream::connect(candidate.addr).await?;
+            pc.ready().await?;
+            pc.call((stream, candidate.addr))
+                .await
+                .map(|client| Change::Insert(candidate.addr, client))
+        }
+        // Use map_err to tag failed connections with the MetaAddr,
+        // so they can be reported to the CandidateSet.
+        .map_err(move |_| candidate)
+    };
+
+    // On creation, we are likely to have very few peers, so try to get more
+    // connections quickly by concurrently connecting to a large number of
+    // candidates.
+    let mut handshakes = FuturesUnordered::new();
+    for _ in 0..50usize {
+        if let Some(candidate) = candidates.next() {
+            handshakes.push(try_connect(candidate))
+        }
+    }
+
+    while let Some(handshake) = handshakes.next().await {
+        match handshake {
+            Ok(change) => {
+                debug!("Successfully dialed new peer, sending to peerset");
+                success_tx.send(Ok(change)).await?;
+            }
+            Err(candidate) => {
+                debug!(?candidate.addr, "marking address as failed");
+                candidates.report_failed(candidate);
+            }
+        }
+    }
+
+    // XXX instead of just responding to demand, we could respond to demand *or*
+    // to an interval timer (to continuously grow the peer set).
     while let Some(()) = demand_signal.next().await {
-        debug!("Got demand signal from peer set");
-        loop {
-            candidates.update().await?;
-            // If we were unable to get a candidate, keep looping to crawl more.
-            let addr = match candidates.next() {
-                Some(candidate) => candidate.addr,
-                None => continue,
-            };
-            if let Ok(stream) = TcpStream::connect(addr).await {
-                peer_connector.ready().await?;
-                if let Ok(client) = peer_connector.call((stream, addr)).await {
-                    success_tx.send(Ok(Change::Insert(addr, client))).await?;
-                    break;
-                }
-            }
-        }
+        debug!("got demand signal from peer set, updating candidates");
+        candidates.update().await?;
+
+        let candidate = match candidates.next() {
+            Some(candidate) => candidate,
+            None => {
+                warn!("got demand for more peers but no available candidates");
+                break;
+            }
+        };
+
+        match try_connect(candidate).await {
+            Ok(change) => {
+                debug!("Successfully dialed new peer, sending to peerset");
+                success_tx.send(Ok(change)).await?;
+            }
+            Err(candidate) => {
+                debug!(?candidate.addr, "marking address as failed");
+                candidates.report_failed(candidate);
+            }
+        }
     }
 }

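A note on the return-type change in the first hunk of this file: the old
`impl Service<...> + Send + Clone + 'static` bound left the service's `Future`
associated type opaque, so callers could not prove that the futures returned by
`call` were `Send`, and therefore could not await them inside spawned tasks.
Writing `Future = impl Future<Output = ...> + Send` names the future and
exposes exactly that bound. A minimal sketch of the same trick, using a
hypothetical `Fetch` trait in place of tower's `Service`:

    use std::future::{ready, Future, Ready};

    // A stand-in with the same shape as tower's Service: calling it
    // yields a value of an associated Future type.
    trait Fetch {
        type Fut: Future<Output = u32>;
        fn fetch(&mut self) -> Self::Fut;
    }

    struct Fixed;

    impl Fetch for Fixed {
        type Fut = Ready<u32>;
        fn fetch(&mut self) -> Self::Fut {
            ready(42)
        }
    }

    // Naming the associated type as `Fut = impl Future<...> + Send`
    // exposes the Send bound through the otherwise-opaque return type.
    fn make_fetcher() -> impl Fetch<Fut = impl Future<Output = u32> + Send> {
        Fixed
    }

    fn assert_send<T: Send>(_: &T) {}

    fn main() {
        let mut fetcher = make_fetcher();
        // Compiles only because the signature names the future as Send;
        // with a bare `-> impl Fetch` return type this check would fail.
        assert_send(&fetcher.fetch());
    }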

@@ -70,7 +70,7 @@ impl ConnectCmd
         config.initial_peers = vec![self.addr];
 
-        let (mut peer_set, address_book) = zebra_network::init(config, node);
+        let (mut peer_set, address_book) = zebra_network::init(config, node).await;
 
         info!("waiting for peer_set ready");
         peer_set.ready().await.map_err(Error::from_boxed_compat)?;
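
The XXX comment in crawl_and_dial suggests responding to demand *or* to an
interval timer, so the peer set keeps growing even when no demand signal
arrives. One possible shape for that idea, sketched against current
futures/tokio APIs and not part of this commit, is to merge the demand
channel with a periodic tick stream:

    use std::time::Duration;

    use futures::channel::mpsc;
    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        let (_demand_tx, demand_rx) = mpsc::channel::<()>(100);

        // A stream that yields () once per second, standing in for a crawl timer.
        let ticks = stream::unfold((), |()| async {
            tokio::time::sleep(Duration::from_secs(1)).await;
            Some(((), ()))
        });

        // A tick wakes the crawler exactly like a demand signal would;
        // take(3) just bounds the example so it terminates.
        let mut wakeups = stream::select(demand_rx, ticks).take(3);
        while let Some(()) = wakeups.next().await {
            // ...update candidates and dial one, as crawl_and_dial does...
            println!("crawl wakeup");
        }
    }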