2020-09-01 14:28:54 -07:00
|
|
|
use std::net::SocketAddr;
|
2019-10-10 18:15:24 -07:00
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
|
|
|
fmt::Debug,
|
2019-12-13 14:25:14 -08:00
|
|
|
future::Future,
|
2019-10-10 18:15:24 -07:00
|
|
|
marker::PhantomData,
|
|
|
|
pin::Pin,
|
2021-04-18 23:04:24 -07:00
|
|
|
sync::Arc,
|
2019-10-10 18:15:24 -07:00
|
|
|
task::{Context, Poll},
|
2020-11-29 22:41:14 -08:00
|
|
|
time::Instant,
|
2019-10-10 18:15:24 -07:00
|
|
|
};
|
|
|
|
|
2019-10-21 15:24:17 -07:00
|
|
|
use futures::{
|
|
|
|
channel::{mpsc, oneshot},
|
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
|
|
|
future::TryFutureExt,
|
2019-12-13 14:25:14 -08:00
|
|
|
prelude::*,
|
2019-10-21 15:24:17 -07:00
|
|
|
stream::FuturesUnordered,
|
|
|
|
};
|
2019-10-10 18:15:24 -07:00
|
|
|
use indexmap::IndexMap;
|
2020-09-01 14:28:54 -07:00
|
|
|
use tokio::sync::{broadcast, oneshot::error::TryRecvError};
|
2020-06-09 12:24:28 -07:00
|
|
|
use tokio::task::JoinHandle;
|
2019-10-10 18:15:24 -07:00
|
|
|
use tower::{
|
|
|
|
discover::{Change, Discover},
|
2020-09-21 14:00:20 -07:00
|
|
|
load::Load,
|
2019-10-10 18:15:24 -07:00
|
|
|
Service,
|
|
|
|
};
|
|
|
|
|
|
|
|
use crate::{
|
2020-09-01 14:28:54 -07:00
|
|
|
protocol::{
|
|
|
|
external::InventoryHash,
|
|
|
|
internal::{Request, Response},
|
|
|
|
},
|
2021-03-15 05:02:12 -07:00
|
|
|
AddressBook, BoxError,
|
2019-10-10 18:15:24 -07:00
|
|
|
};
|
|
|
|
|
2020-09-01 14:28:54 -07:00
|
|
|
use super::{
|
|
|
|
unready_service::{Error as UnreadyError, UnreadyService},
|
|
|
|
InventoryRegistry,
|
|
|
|
};
|
2019-10-10 18:15:24 -07:00
|
|
|
|
|
|
|
/// A [`tower::Service`] that abstractly represents "the rest of the network".
|
|
|
|
///
|
2021-05-06 17:50:04 -07:00
|
|
|
/// # Security
|
|
|
|
///
|
|
|
|
/// The `Discover::Key` must be the transient remote address of each peer. This
|
|
|
|
/// address may only be valid for the duration of a single connection. (For
|
|
|
|
/// example, inbound connections have an ephemeral remote port, and proxy
|
|
|
|
/// connections have an ephemeral local or proxy port.)
|
|
|
|
///
|
|
|
|
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
|
|
|
|
///
|
|
|
|
/// # Implementation
|
|
|
|
///
|
2019-10-10 18:15:24 -07:00
|
|
|
/// This implementation is adapted from the one in `tower-balance`, and as
|
|
|
|
/// described in that crate's documentation, it
|
|
|
|
///
|
|
|
|
/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
|
|
|
|
/// >
|
|
|
|
/// > As described in the [Finagle Guide][finagle]:
|
|
|
|
/// >
|
|
|
|
/// > > The algorithm randomly picks two services from the set of ready endpoints and
|
|
|
|
/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
|
|
|
|
/// > > expect a manageable upper bound on the maximum load of any server.
|
|
|
|
/// > >
|
|
|
|
/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
|
|
|
|
/// > > `n` is the number of servers in the cluster.
|
|
|
|
///
|
|
|
|
/// This should work well for many network requests, but not all of them: some
|
|
|
|
/// requests, e.g., a request for some particular inventory item, can only be
|
|
|
|
/// made to a subset of connected peers, e.g., the ones that have recently
|
|
|
|
/// advertised that inventory hash, and other requests require specialized logic
|
|
|
|
/// (e.g., transaction diffusion).
|
|
|
|
///
|
|
|
|
/// Implementing this specialized routing logic inside the `PeerSet` -- so that
|
|
|
|
/// it continues to abstract away "the rest of the network" into one endpoint --
|
|
|
|
/// is not a problem, as the `PeerSet` can simply maintain more information on
|
|
|
|
/// its peers and route requests appropriately. However, there is a problem with
|
|
|
|
/// maintaining accurate backpressure information, because the `Service` trait
|
|
|
|
/// requires that service readiness is independent of the data in the request.
|
|
|
|
///
|
|
|
|
/// For this reason, in the future, this code will probably be refactored to
|
|
|
|
/// address this backpressure mismatch. One possibility is to refactor the code
|
|
|
|
/// so that one entity holds and maintains the peer set and metadata on the
|
|
|
|
/// peers, and each "backpressure category" of request is assigned to different
|
|
|
|
/// `Service` impls with specialized `poll_ready()` implementations. Another
|
|
|
|
/// less-elegant solution (which might be useful as an intermediate step for the
|
|
|
|
/// inventory case) is to provide a way to borrow a particular backing service,
|
|
|
|
/// say by address.
|
|
|
|
///
|
|
|
|
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
|
|
|
|
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
|
|
|
|
pub struct PeerSet<D>
|
|
|
|
where
|
2020-09-01 14:28:54 -07:00
|
|
|
D: Discover<Key = SocketAddr>,
|
2019-10-10 18:15:24 -07:00
|
|
|
{
|
|
|
|
discover: D,
|
2020-11-23 23:02:48 -08:00
|
|
|
/// A preselected index for a ready service.
|
2020-11-23 23:52:40 -08:00
|
|
|
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
|
|
|
|
/// This means that every change to `ready_services` must invalidate or correct it.
|
|
|
|
preselected_p2c_index: Option<usize>,
|
2019-10-10 18:15:24 -07:00
|
|
|
ready_services: IndexMap<D::Key, D::Service>,
|
|
|
|
cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
|
|
|
|
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
|
2019-10-21 15:24:17 -07:00
|
|
|
demand_signal: mpsc::Sender<()>,
|
2020-06-09 12:24:28 -07:00
|
|
|
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
|
|
|
|
///
|
|
|
|
/// The join handles passed into the PeerSet are used to populate the `guards` member.
|
2020-09-18 11:20:55 -07:00
|
|
|
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
|
2020-06-09 12:24:28 -07:00
|
|
|
/// Unordered set of handles to background tasks associated with the `PeerSet`
|
|
|
|
///
|
|
|
|
/// These guards are checked for errors as part of `poll_ready` which lets
|
|
|
|
/// the `PeerSet` propagate errors from background tasks back to the user
|
2020-09-18 11:20:55 -07:00
|
|
|
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
|
2020-09-01 14:28:54 -07:00
|
|
|
inventory_registry: InventoryRegistry,
|
2020-11-29 22:41:14 -08:00
|
|
|
/// The last time we logged a message about the peer set size
|
|
|
|
last_peer_log: Option<Instant>,
|
2021-03-15 05:02:12 -07:00
|
|
|
/// A shared list of peer addresses.
|
|
|
|
///
|
|
|
|
/// Used for logging diagnostics.
|
2021-04-18 23:04:24 -07:00
|
|
|
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
2019-10-10 18:15:24 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<D> PeerSet<D>
|
|
|
|
where
|
2020-09-01 14:28:54 -07:00
|
|
|
D: Discover<Key = SocketAddr> + Unpin,
|
2019-10-10 18:15:24 -07:00
|
|
|
D::Service: Service<Request, Response = Response> + Load,
|
2020-09-18 11:20:55 -07:00
|
|
|
D::Error: Into<BoxError>,
|
|
|
|
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
|
2019-10-10 18:15:24 -07:00
|
|
|
<D::Service as Service<Request>>::Future: Send + 'static,
|
|
|
|
<D::Service as Load>::Metric: Debug,
|
|
|
|
{
|
|
|
|
/// Constructs a `PeerSet` that uses `discover` internally.
|
2020-06-09 12:24:28 -07:00
|
|
|
pub fn new(
|
|
|
|
discover: D,
|
|
|
|
demand_signal: mpsc::Sender<()>,
|
2020-09-18 11:20:55 -07:00
|
|
|
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
|
2020-09-01 14:28:54 -07:00
|
|
|
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
|
2021-04-18 23:04:24 -07:00
|
|
|
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
2020-06-09 12:24:28 -07:00
|
|
|
) -> Self {
|
2019-10-10 18:15:24 -07:00
|
|
|
Self {
|
|
|
|
discover,
|
2020-11-23 23:52:40 -08:00
|
|
|
preselected_p2c_index: None,
|
2019-10-10 18:15:24 -07:00
|
|
|
ready_services: IndexMap::new(),
|
|
|
|
cancel_handles: HashMap::new(),
|
|
|
|
unready_services: FuturesUnordered::new(),
|
2019-10-21 15:24:17 -07:00
|
|
|
demand_signal,
|
2020-06-09 12:24:28 -07:00
|
|
|
guards: futures::stream::FuturesUnordered::new(),
|
|
|
|
handle_rx,
|
2020-09-01 14:28:54 -07:00
|
|
|
inventory_registry: InventoryRegistry::new(inv_stream),
|
2020-11-29 22:41:14 -08:00
|
|
|
last_peer_log: None,
|
2021-03-15 05:02:12 -07:00
|
|
|
address_book,
|
2019-10-10 18:15:24 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-23 23:52:40 -08:00
|
|
|
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
|
2020-06-09 12:24:28 -07:00
|
|
|
if self.guards.is_empty() {
|
|
|
|
match self.handle_rx.try_recv() {
|
|
|
|
Ok(handles) => {
|
|
|
|
for handle in handles {
|
|
|
|
self.guards.push(handle);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Closed) => unreachable!(
|
|
|
|
"try_recv will never be called if the futures have already been received"
|
|
|
|
),
|
|
|
|
Err(TryRecvError::Empty) => return Ok(()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match Pin::new(&mut self.guards).poll_next(cx) {
|
|
|
|
Poll::Pending => {}
|
|
|
|
Poll::Ready(Some(res)) => res??,
|
|
|
|
Poll::Ready(None) => Err("all background tasks have exited")?,
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-10-10 18:15:24 -07:00
|
|
|
fn poll_unready(&mut self, cx: &mut Context<'_>) {
|
|
|
|
loop {
|
|
|
|
match Pin::new(&mut self.unready_services).poll_next(cx) {
|
|
|
|
Poll::Pending | Poll::Ready(None) => return,
|
|
|
|
Poll::Ready(Some(Ok((key, svc)))) => {
|
|
|
|
trace!(?key, "service became ready");
|
|
|
|
let _cancel = self.cancel_handles.remove(&key);
|
2020-06-16 13:12:09 -07:00
|
|
|
assert!(_cancel.is_some(), "missing cancel handle");
|
2019-10-10 18:15:24 -07:00
|
|
|
self.ready_services.insert(key, svc);
|
|
|
|
}
|
|
|
|
Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => {
|
fix panic in seed subcommand (#401)
Co-authored-by: Jane Lusby <jane@zfnd.org>
Prior to this change, the seed subcommand would consistently encounter a panic in one of the background tasks, but would continue running after the panic. This is indicative of two bugs.
First, zebrad was not configured to treat panics as non-recoverable and instead used the tokio defaults, which are to catch panics in tasks and return them via the join handle if available, or to print them if the join handle has been discarded. This is likely a poor fit for zebrad as an application: we do not need to maximize uptime or minimize the extent of an outage should one of our tasks or services start encountering panics. Ignoring a panic increases our risk of observing invalid state, which can cause all sorts of bugs. To deal with this, we've switched the default panic behavior from `unwind` to `abort`. This makes panics fail immediately and take down the entire application, regardless of where they occur, which is consistent with our treatment of misbehaving connections.
The second bug is the panic itself. This was triggered by a duplicate entry in the initial_peers set. To fix this we've switched the storage for the peers from a `Vec` to a `HashSet`, which has similar properties but guarantees uniqueness of its keys.
2020-05-27 17:40:12 -07:00
|
|
|
trace!(?key, "service was canceled");
|
2020-06-16 13:12:09 -07:00
|
|
|
// This debug assert is invalid because we can have a
|
|
|
|
// service be canceled due to us connecting to the same service
|
|
|
|
// twice.
|
|
|
|
//
|
|
|
|
// assert!(!self.cancel_handles.contains_key(&key))
|
2019-10-10 18:15:24 -07:00
|
|
|
}
|
|
|
|
Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => {
|
|
|
|
let error = e.into();
|
|
|
|
debug!(%error, "service failed while unready, dropped");
|
|
|
|
let _cancel = self.cancel_handles.remove(&key);
|
2020-06-16 13:12:09 -07:00
|
|
|
assert!(_cancel.is_some(), "missing cancel handle");
|
2019-10-10 18:15:24 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-23 23:52:40 -08:00
|
|
|
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
|
|
|
|
use futures::ready;
|
|
|
|
loop {
|
|
|
|
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
|
|
|
|
.ok_or("discovery stream closed")?
|
|
|
|
.map_err(Into::into)?
|
|
|
|
{
|
|
|
|
Change::Remove(key) => {
|
|
|
|
trace!(?key, "got Change::Remove from Discover");
|
|
|
|
self.remove(&key);
|
|
|
|
}
|
|
|
|
Change::Insert(key, svc) => {
|
|
|
|
trace!(?key, "got Change::Insert from Discover");
|
|
|
|
self.remove(&key);
|
|
|
|
self.push_unready(key, svc);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Takes a ready service by key, preserving `preselected_p2c_index` if possible.
|
|
|
|
fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> {
|
|
|
|
if let Some((i, key, svc)) = self.ready_services.swap_remove_full(key) {
|
|
|
|
// swap_remove perturbs the position of the last element of
|
|
|
|
// ready_services, so we may have invalidated self.preselected_p2c_index, in
|
|
|
|
// which case we need to fix it. Specifically, swap_remove swaps the
|
|
|
|
// position of the removee and the last element, then drops the
|
|
|
|
// removee from the end, so we compare the active and removed indices:
|
|
|
|
//
|
|
|
|
// We just removed one element, so this was the index of the last element.
|
|
|
|
let last_idx = self.ready_services.len();
|
|
|
|
self.preselected_p2c_index = match self.preselected_p2c_index {
|
|
|
|
None => None, // No active index
|
|
|
|
Some(j) if j == i => None, // We removed j
|
|
|
|
Some(j) if j == last_idx => Some(i), // We swapped i and j
|
|
|
|
Some(j) => Some(j), // We swapped an unrelated service.
|
|
|
|
};
|
|
|
|
// No Heisenservices: they must be ready or unready.
|
|
|
|
assert!(!self.cancel_handles.contains_key(&key));
|
|
|
|
Some((key, svc))
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
}
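The index fix-up performed by `take_ready_service` can be illustrated in isolation. The following is a minimal sketch, assuming a plain `Vec` in place of the `IndexMap`; the helper names `fix_preselected` and `remove_and_fix` are hypothetical and do not appear in the source. `Vec::swap_remove` has the same "move the last element into the hole" behavior that the comments above describe.

```rust
/// Returns the corrected preselected index after a `swap_remove(removed)`.
///
/// `new_len` is the length of the collection *after* the removal, which is
/// also the old index of the element that was moved into slot `removed`.
/// (Hypothetical helper, mirroring the match in `take_ready_service`.)
fn fix_preselected(
    preselected: Option<usize>,
    removed: usize,
    new_len: usize,
) -> Option<usize> {
    match preselected {
        // No active index, nothing to fix.
        None => None,
        // The preselected element itself was removed.
        Some(j) if j == removed => None,
        // The preselected element was last, so it moved into `removed`.
        Some(j) if j == new_len => Some(removed),
        // An unrelated element was moved; the index is still valid.
        Some(j) => Some(j),
    }
}

/// Removes `items[removed]` and returns it with the fixed-up index.
fn remove_and_fix<T>(
    items: &mut Vec<T>,
    removed: usize,
    preselected: Option<usize>,
) -> (T, Option<usize>) {
    let item = items.swap_remove(removed);
    (item, fix_preselected(preselected, removed, items.len()))
}
```

The order of the match arms matters: when the removed element is also the last one, the "removed" arm must win, otherwise the index would point one past the end of the shortened collection.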
|
|
|
|
|
|
|
|
fn remove(&mut self, key: &D::Key) {
|
2020-11-24 01:19:50 -08:00
|
|
|
if self.take_ready_service(key).is_some() {
|
2020-11-23 23:52:40 -08:00
|
|
|
} else if let Some(handle) = self.cancel_handles.remove(key) {
|
|
|
|
let _ = handle.send(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
self.cancel_handles.insert(key, tx);
|
|
|
|
self.unready_services.push(UnreadyService {
|
|
|
|
key: Some(key),
|
|
|
|
service: Some(svc),
|
|
|
|
cancel: rx,
|
|
|
|
_req: PhantomData,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2019-10-10 18:15:24 -07:00
|
|
|
/// Performs P2C on inner services to select a ready service.
|
2020-11-23 23:52:40 -08:00
|
|
|
fn preselect_p2c_index(&mut self) -> Option<usize> {
|
2019-10-10 18:15:24 -07:00
|
|
|
match self.ready_services.len() {
|
|
|
|
0 => None,
|
|
|
|
1 => Some(0),
|
|
|
|
len => {
|
|
|
|
let (a, b) = {
|
|
|
|
let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
|
|
|
|
(idxs.index(0), idxs.index(1))
|
|
|
|
};
|
|
|
|
|
2020-11-23 23:52:40 -08:00
|
|
|
let a_load = self.query_load(a);
|
|
|
|
let b_load = self.query_load(b);
|
2019-10-10 18:15:24 -07:00
|
|
|
|
|
|
|
let selected = if a_load <= b_load { a } else { b };
|
|
|
|
|
|
|
|
trace!(a.idx = a, a.load = ?a_load, b.idx = b, b.load = ?b_load, selected, "selected service by p2c");
|
|
|
|
|
|
|
|
Some(selected)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
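As a standalone illustration of the selection rule in `preselect_p2c_index`, here is a minimal sketch that uses only the standard library. It assumes loads are plain `u32` values and takes the random source as a caller-supplied closure instead of `rand::thread_rng()`; the names `sample_two` and `p2c_select` are hypothetical.

```rust
/// Picks two distinct indices in `0..len` from a caller-supplied random
/// source. Requires `len >= 2`. (Hypothetical stand-in for
/// `rand::seq::index::sample`.)
fn sample_two(len: usize, mut next_random: impl FnMut() -> usize) -> (usize, usize) {
    assert!(len >= 2, "need at least two candidates");
    let a = next_random() % len;
    // Draw from the remaining `len - 1` slots, then skip over `a`
    // so that the two indices are always distinct.
    let mut b = next_random() % (len - 1);
    if b >= a {
        b += 1;
    }
    (a, b)
}

/// Power of Two Choices: sample two candidates and keep the less loaded one.
/// Ties go to the first candidate, as in the `a_load <= b_load` comparison.
fn p2c_select(loads: &[u32], next_random: impl FnMut() -> usize) -> Option<usize> {
    match loads.len() {
        0 => None,
        1 => Some(0),
        len => {
            let (a, b) = sample_two(len, next_random);
            Some(if loads[a] <= loads[b] { a } else { b })
        }
    }
}
```

Passing the random source as a closure keeps the sketch deterministic under test; a real caller would presumably supply a proper random number generator.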
|
|
|
|
|
|
|
|
/// Accesses a ready endpoint by index and returns its current load.
|
2020-11-23 23:52:40 -08:00
|
|
|
fn query_load(&self, index: usize) -> <D::Service as Load>::Metric {
|
2019-10-10 18:15:24 -07:00
|
|
|
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
|
|
|
|
svc.load()
|
|
|
|
}
|
2020-09-01 14:28:54 -07:00
|
|
|
|
2020-09-08 10:16:29 -07:00
|
|
|
/// Routes a request using P2C load-balancing.
|
|
|
|
fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
|
2020-09-01 14:28:54 -07:00
|
|
|
let index = self
|
2020-11-23 23:52:40 -08:00
|
|
|
.preselected_p2c_index
|
2020-09-01 14:28:54 -07:00
|
|
|
.take()
|
|
|
|
.expect("ready service must have valid preselected index");
|
|
|
|
|
2020-09-08 10:16:29 -07:00
|
|
|
let (key, mut svc) = self
|
|
|
|
.ready_services
|
2020-09-01 14:28:54 -07:00
|
|
|
.swap_remove_index(index)
|
2020-09-08 10:16:29 -07:00
|
|
|
.expect("preselected index must be valid");
|
|
|
|
|
|
|
|
let fut = svc.call(req);
|
|
|
|
self.push_unready(key, svc);
|
|
|
|
fut.map_err(Into::into).boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Tries to route a request to a peer that advertised that inventory,
|
|
|
|
/// falling back to P2C if there is no ready peer.
|
|
|
|
fn route_inv(
|
|
|
|
&mut self,
|
|
|
|
req: Request,
|
|
|
|
hash: InventoryHash,
|
|
|
|
) -> <Self as tower::Service<Request>>::Future {
|
2020-11-23 23:52:40 -08:00
|
|
|
let peer = self
|
2020-09-08 10:16:29 -07:00
|
|
|
.inventory_registry
|
|
|
|
.peers(&hash)
|
2020-11-23 23:52:40 -08:00
|
|
|
.find(|&key| self.ready_services.contains_key(key))
|
|
|
|
.cloned();
|
2020-09-08 10:16:29 -07:00
|
|
|
|
2020-11-23 23:52:40 -08:00
|
|
|
match peer.and_then(|key| self.take_ready_service(&key)) {
|
|
|
|
Some((key, mut svc)) => {
|
2020-11-23 23:02:48 -08:00
|
|
|
tracing::debug!(?hash, ?key, "routing based on inventory");
|
2020-09-08 10:16:29 -07:00
|
|
|
let fut = svc.call(req);
|
|
|
|
self.push_unready(key, svc);
|
|
|
|
fut.map_err(Into::into).boxed()
|
|
|
|
}
|
|
|
|
None => {
|
2020-11-23 23:52:40 -08:00
|
|
|
tracing::debug!(?hash, "no ready peer for inventory, falling back to p2c");
|
2020-09-08 10:16:29 -07:00
|
|
|
self.route_p2c(req)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Routes a request to all ready peers, ignoring return values.
    fn route_all(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
        // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
        // blocking us from passing &mut self to push_unready.
        let ready_services = std::mem::take(&mut self.ready_services);
        self.preselected_p2c_index = None; // All services are now unready.

        let futs = FuturesUnordered::new();
        for (key, mut svc) in ready_services {
            futs.push(svc.call(req.clone()).map_err(|_| ()));
            self.push_unready(key, svc);
        }

        async move {
            let results = futs.collect::<Vec<Result<_, _>>>().await;
            tracing::debug!(
                ok.len = results.iter().filter(|r| r.is_ok()).count(),
                err.len = results.iter().filter(|r| r.is_err()).count(),
            );
            Ok(Response::Nil)
        }
        .boxed()
    }

    fn log_peer_set_size(&mut self) {
        let ready_services_len = self.ready_services.len();
        let unready_services_len = self.unready_services.len();
        trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);

        if ready_services_len > 0 {
            return;
        }

        // These logs are designed to be human-readable in a terminal, at the
        // default Zebra log level. If you need to know the peer set size for
        // every request, use the trace-level logs, or the metrics exporter.
        if let Some(last_peer_log) = self.last_peer_log {
            // Avoid duplicate peer set logs
            if Instant::now().duration_since(last_peer_log).as_secs() < 60 {
                return;
            }
        } else {
            // Suppress initial logs until the peer set has started up.
            // There can be multiple initial requests before the first peer is
            // ready.
            self.last_peer_log = Some(Instant::now());
            return;
        }

        self.last_peer_log = Some(Instant::now());

        // # Correctness
        //
        // Only log address metrics in exceptional circumstances, to avoid lock contention.
        //
        // TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`,
        //       or turn the address book into a service (#1976)
        let address_metrics = self.address_book.lock().unwrap().address_metrics();
        if unready_services_len == 0 {
            warn!(
                ?address_metrics,
                "network request with no peer connections. Hint: check your network connection"
            );
        } else {
            info!(
                ?address_metrics,
                "network request with no ready peers: finding more peers, waiting for {} peers to answer requests",
                unready_services_len
            );
        }
    }

    fn update_metrics(&self) {
        let num_ready = self.ready_services.len();
        let num_unready = self.unready_services.len();
        let num_peers = num_ready + num_unready;
        metrics::gauge!("pool.num_ready", num_ready as f64);
        metrics::gauge!("pool.num_unready", num_unready as f64);
        metrics::gauge!("zcash.net.peers", num_peers as f64);
    }
}

impl<D> Service<Request> for PeerSet<D>
where
    D: Discover<Key = SocketAddr> + Unpin,
    D::Service: Service<Request, Response = Response> + Load,
    D::Error: Into<BoxError>,
    <D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
    <D::Service as Service<Request>>::Future: Send + 'static,
    <D::Service as Load>::Metric: Debug,
{
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_background_errors(cx)?;
        // Process peer discovery updates.
        let _ = self.poll_discover(cx)?;
        self.inventory_registry.poll_inventory(cx)?;
        self.poll_unready(cx);

        self.log_peer_set_size();
        self.update_metrics();

        loop {
            // Re-check that the pre-selected service is ready, in case
            // something has happened since (e.g., it failed, peer closed
            // connection, ...)
            if let Some(index) = self.preselected_p2c_index {
                let (key, service) = self
                    .ready_services
                    .get_index_mut(index)
                    .expect("preselected index must be valid");
                trace!(preselected_index = index, ?key);
                match service.poll_ready(cx) {
                    Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
                    Poll::Pending => {
                        trace!("preselected service is no longer ready");
                        let (key, service) = self
                            .ready_services
                            .swap_remove_index(index)
                            .expect("preselected index must be valid");
                        self.push_unready(key, service);
                    }
                    Poll::Ready(Err(e)) => {
                        let error = e.into();
                        trace!(%error, "preselected service failed, dropping it");
                        self.ready_services
                            .swap_remove_index(index)
                            .expect("preselected index must be valid");
                    }
                }
            }

            trace!("preselected service was not ready, reselecting");
            self.preselected_p2c_index = self.preselect_p2c_index();
            self.update_metrics();

            if self.preselected_p2c_index.is_none() {
                // CORRECTNESS
                //
                // If the channel is full, drop the demand signal rather than waiting.
                // If we waited here, the crawler could deadlock sending a request to
                // fetch more peers, because it also empties the channel.
                trace!("no ready services, sending demand signal");
                let _ = self.demand_signal.try_send(());

                // CORRECTNESS
                //
                // The current task must be scheduled for wakeup every time we
                // return `Poll::Pending`.
                //
                // As long as there are unready or new peers, this task will run,
                // because:
                // - `poll_discover` schedules this task for wakeup when new
                //   peers arrive.
                // - if there are unready peers, `poll_unready` schedules this
                //   task for wakeup when peer services become ready.
                // - if the preselected peer is not ready, `service.poll_ready`
                //   schedules this task for wakeup when that service becomes
                //   ready.
                //
                // To avoid peers blocking on a full background error channel:
                // - if no background tasks have exited since the last poll,
                //   `poll_background_errors` schedules this task for wakeup when
                //   the next task exits.
                return Poll::Pending;
            }
        }
    }

    fn call(&mut self, req: Request) -> Self::Future {
        let fut = match req {
            // Only do inventory-aware routing on individual items.
            Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
                self.route_inv(req, hash)
            }
            Request::TransactionsById(ref hashes) if hashes.len() == 1 => {
                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
                self.route_inv(req, hash)
            }

            // Broadcast advertisements to all peers
            Request::AdvertiseTransactionIds(_) => self.route_all(req),
            Request::AdvertiseBlock(_) => self.route_all(req),

            // Choose a random less-loaded peer for all other requests
            _ => self.route_p2c(req),
        };
        self.update_metrics();

        fut
    }
}