2020-09-01 14:28:54 -07:00
use std ::net ::SocketAddr ;
2019-10-10 18:15:24 -07:00
use std ::{
collections ::HashMap ,
2020-02-14 13:38:33 -08:00
convert ::TryInto ,
2019-10-10 18:15:24 -07:00
fmt ::Debug ,
2019-12-13 14:25:14 -08:00
future ::Future ,
2019-10-10 18:15:24 -07:00
marker ::PhantomData ,
pin ::Pin ,
task ::{ Context , Poll } ,
2020-11-29 22:41:14 -08:00
time ::Instant ,
2019-10-10 18:15:24 -07:00
} ;
2019-10-21 15:24:17 -07:00
use futures ::{
channel ::{ mpsc , oneshot } ,
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
future ::TryFutureExt ,
2019-12-13 14:25:14 -08:00
prelude ::* ,
2019-10-21 15:24:17 -07:00
stream ::FuturesUnordered ,
} ;
2019-10-10 18:15:24 -07:00
use indexmap ::IndexMap ;
2020-09-01 14:28:54 -07:00
use tokio ::sync ::{ broadcast , oneshot ::error ::TryRecvError } ;
2020-06-09 12:24:28 -07:00
use tokio ::task ::JoinHandle ;
2019-10-10 18:15:24 -07:00
use tower ::{
discover ::{ Change , Discover } ,
2020-09-21 14:00:20 -07:00
load ::Load ,
2019-10-10 18:15:24 -07:00
Service ,
} ;
use crate ::{
2020-09-01 14:28:54 -07:00
protocol ::{
external ::InventoryHash ,
internal ::{ Request , Response } ,
} ,
2020-09-18 11:20:55 -07:00
BoxError ,
2019-10-10 18:15:24 -07:00
} ;
2020-09-01 14:28:54 -07:00
use super ::{
unready_service ::{ Error as UnreadyError , UnreadyService } ,
InventoryRegistry ,
} ;
2019-10-10 18:15:24 -07:00
/// A [`tower::Service`] that abstractly represents "the rest of the network".
///
/// This implementation is adapted from the one in `tower-balance`, and as
/// described in that crate's documentation, it
///
/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
/// >
/// > As described in the [Finagle Guide][finagle]:
/// >
/// > > The algorithm randomly picks two services from the set of ready endpoints and
/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > > expect a manageable upper bound on the maximum load of any server.
/// > >
/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > > `n` is the number of servers in the cluster.
///
/// This should work well for many network requests, but not all of them: some
/// requests, e.g., a request for some particular inventory item, can only be
/// made to a subset of connected peers, e.g., the ones that have recently
/// advertised that inventory hash, and other requests require specialized logic
/// (e.g., transaction diffusion).
///
/// Implementing this specialized routing logic inside the `PeerSet` -- so that
/// it continues to abstract away "the rest of the network" into one endpoint --
/// is not a problem, as the `PeerSet` can simply maintain more information on
/// its peers and route requests appropriately. However, there is a problem with
/// maintaining accurate backpressure information, because the `Service` trait
/// requires that service readiness is independent of the data in the request.
///
/// For this reason, in the future, this code will probably be refactored to
/// address this backpressure mismatch. One possibility is to refactor the code
/// so that one entity holds and maintains the peer set and metadata on the
/// peers, and each "backpressure category" of request is assigned to different
/// `Service` impls with specialized `poll_ready()` implementations. Another
/// less-elegant solution (which might be useful as an intermediate step for the
/// inventory case) is to provide a way to borrow a particular backing service,
/// say by address.
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
pub struct PeerSet < D >
where
2020-09-01 14:28:54 -07:00
D : Discover < Key = SocketAddr > ,
2019-10-10 18:15:24 -07:00
{
discover : D ,
2020-11-23 23:02:48 -08:00
/// A preselected index for a ready service.
2020-11-23 23:52:40 -08:00
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index : Option < usize > ,
2019-10-10 18:15:24 -07:00
ready_services : IndexMap < D ::Key , D ::Service > ,
cancel_handles : HashMap < D ::Key , oneshot ::Sender < ( ) > > ,
unready_services : FuturesUnordered < UnreadyService < D ::Key , D ::Service , Request > > ,
2019-10-21 15:24:17 -07:00
demand_signal : mpsc ::Sender < ( ) > ,
2020-06-09 12:24:28 -07:00
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
///
/// The join handles passed into the PeerSet are used populate the `guards` member
2020-09-18 11:20:55 -07:00
handle_rx : tokio ::sync ::oneshot ::Receiver < Vec < JoinHandle < Result < ( ) , BoxError > > > > ,
2020-06-09 12:24:28 -07:00
/// Unordered set of handles to background tasks associated with the `PeerSet`
///
/// These guards are checked for errors as part of `poll_ready` which lets
/// the `PeerSet` propagate errors from background tasks back to the user
2020-09-18 11:20:55 -07:00
guards : futures ::stream ::FuturesUnordered < JoinHandle < Result < ( ) , BoxError > > > ,
2020-09-01 14:28:54 -07:00
inventory_registry : InventoryRegistry ,
2020-11-29 22:41:14 -08:00
/// The last time we logged a message about the peer set size
last_peer_log : Option < Instant > ,
2019-10-10 18:15:24 -07:00
}
impl < D > PeerSet < D >
where
2020-09-01 14:28:54 -07:00
D : Discover < Key = SocketAddr > + Unpin ,
2019-10-10 18:15:24 -07:00
D ::Service : Service < Request , Response = Response > + Load ,
2020-09-18 11:20:55 -07:00
D ::Error : Into < BoxError > ,
< D ::Service as Service < Request > > ::Error : Into < BoxError > + 'static ,
2019-10-10 18:15:24 -07:00
< D ::Service as Service < Request > > ::Future : Send + 'static ,
< D ::Service as Load > ::Metric : Debug ,
{
/// Construct a peerset which uses `discover` internally.
2020-06-09 12:24:28 -07:00
pub fn new (
discover : D ,
demand_signal : mpsc ::Sender < ( ) > ,
2020-09-18 11:20:55 -07:00
handle_rx : tokio ::sync ::oneshot ::Receiver < Vec < JoinHandle < Result < ( ) , BoxError > > > > ,
2020-09-01 14:28:54 -07:00
inv_stream : broadcast ::Receiver < ( InventoryHash , SocketAddr ) > ,
2020-06-09 12:24:28 -07:00
) -> Self {
2019-10-10 18:15:24 -07:00
Self {
discover ,
2020-11-23 23:52:40 -08:00
preselected_p2c_index : None ,
2019-10-10 18:15:24 -07:00
ready_services : IndexMap ::new ( ) ,
cancel_handles : HashMap ::new ( ) ,
unready_services : FuturesUnordered ::new ( ) ,
2019-10-21 15:24:17 -07:00
demand_signal ,
2020-06-09 12:24:28 -07:00
guards : futures ::stream ::FuturesUnordered ::new ( ) ,
handle_rx ,
2020-09-01 14:28:54 -07:00
inventory_registry : InventoryRegistry ::new ( inv_stream ) ,
2020-11-29 22:41:14 -08:00
last_peer_log : None ,
2019-10-10 18:15:24 -07:00
}
}
2020-11-23 23:52:40 -08:00
fn poll_background_errors ( & mut self , cx : & mut Context ) -> Result < ( ) , BoxError > {
2020-06-09 12:24:28 -07:00
if self . guards . is_empty ( ) {
match self . handle_rx . try_recv ( ) {
Ok ( handles ) = > {
for handle in handles {
self . guards . push ( handle ) ;
}
}
Err ( TryRecvError ::Closed ) = > unreachable! (
" try_recv will never be called if the futures have already been received "
) ,
Err ( TryRecvError ::Empty ) = > return Ok ( ( ) ) ,
}
}
match Pin ::new ( & mut self . guards ) . poll_next ( cx ) {
Poll ::Pending = > { }
Poll ::Ready ( Some ( res ) ) = > res ? ? ,
Poll ::Ready ( None ) = > Err ( " all background tasks have exited " ) ? ,
}
Ok ( ( ) )
}
2019-10-10 18:15:24 -07:00
fn poll_unready ( & mut self , cx : & mut Context < '_ > ) {
loop {
match Pin ::new ( & mut self . unready_services ) . poll_next ( cx ) {
Poll ::Pending | Poll ::Ready ( None ) = > return ,
Poll ::Ready ( Some ( Ok ( ( key , svc ) ) ) ) = > {
trace! ( ? key , " service became ready " ) ;
let _cancel = self . cancel_handles . remove ( & key ) ;
2020-06-16 13:12:09 -07:00
assert! ( _cancel . is_some ( ) , " missing cancel handle " ) ;
2019-10-10 18:15:24 -07:00
self . ready_services . insert ( key , svc ) ;
}
Poll ::Ready ( Some ( Err ( ( key , UnreadyError ::Canceled ) ) ) ) = > {
fix panic in seed subcommand (#401)
Co-authored-by: Jane Lusby <jane@zfnd.org>
Prior to this change, the seed subcommand would consistently encounter a panic in one of the background tasks, but would continue running after the panic. This is indicative of two bugs.
First, zebrad was not configured to treat panics as non recoverable and instead defaulted to the tokio defaults, which are to catch panics in tasks and return them via the join handle if available, or to print them if the join handle has been discarded. This is likely a poor fit for zebrad as an application, we do not need to maximize uptime or minimize the extent of an outage should one of our tasks / services start encountering panics. Ignoring a panic increases our risk of observing invalid state, causing all sorts of wild and bad bugs. To deal with this we've switched the default panic behavior from `unwind` to `abort`. This makes panics fail immediately and take down the entire application, regardless of where they occur, which is consistent with our treatment of misbehaving connections.
The second bug is the panic itself. This was triggered by a duplicate entry in the initial_peers set. To fix this we've switched the storage for the peers from a `Vec` to a `HashSet`, which has similar properties but guarantees uniqueness of its keys.
2020-05-27 17:40:12 -07:00
trace! ( ? key , " service was canceled " ) ;
2020-06-16 13:12:09 -07:00
// This debug assert is invalid because we can have a
// service be canceled due us connecting to the same service
// twice.
//
// assert!(!self.cancel_handles.contains_key(&key))
2019-10-10 18:15:24 -07:00
}
Poll ::Ready ( Some ( Err ( ( key , UnreadyError ::Inner ( e ) ) ) ) ) = > {
let error = e . into ( ) ;
debug! ( % error , " service failed while unready, dropped " ) ;
let _cancel = self . cancel_handles . remove ( & key ) ;
2020-06-16 13:12:09 -07:00
assert! ( _cancel . is_some ( ) , " missing cancel handle " ) ;
2019-10-10 18:15:24 -07:00
}
}
}
}
2020-11-23 23:52:40 -08:00
fn poll_discover ( & mut self , cx : & mut Context < '_ > ) -> Poll < Result < ( ) , BoxError > > {
use futures ::ready ;
loop {
match ready! ( Pin ::new ( & mut self . discover ) . poll_discover ( cx ) )
. ok_or ( " discovery stream closed " ) ?
. map_err ( Into ::into ) ?
{
Change ::Remove ( key ) = > {
trace! ( ? key , " got Change::Remove from Discover " ) ;
self . remove ( & key ) ;
}
Change ::Insert ( key , svc ) = > {
trace! ( ? key , " got Change::Insert from Discover " ) ;
self . remove ( & key ) ;
self . push_unready ( key , svc ) ;
}
}
}
}
/// Removes the ready service for `key` and returns it with its key, keeping
/// `preselected_p2c_index` valid where possible.
///
/// Returns `None` if there is no ready service for `key`.
///
/// # Panics
///
/// Panics if the removed service also has an entry in `cancel_handles`.
fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> {
    let (removed_idx, owned_key, svc) = self.ready_services.swap_remove_full(key)?;

    // `swap_remove_full` moves the last element into the removed slot and
    // then shrinks the map by one. The new length is therefore the index the
    // moved element had before the removal.
    let moved_from_idx = self.ready_services.len();

    self.preselected_p2c_index = self.preselected_p2c_index.and_then(|preselected| {
        if preselected == removed_idx {
            // The preselected service is the one we just removed.
            None
        } else if preselected == moved_from_idx {
            // The preselected service was last, and now sits in the removed slot.
            Some(removed_idx)
        } else {
            // The preselected service was not affected by the swap.
            Some(preselected)
        }
    });

    // No Heisenservices: they must be ready or unready.
    assert!(!self.cancel_handles.contains_key(&owned_key));

    Some((owned_key, svc))
}
/// Removes the service for `key` from the peer set.
///
/// A ready service is taken out of `ready_services` and dropped. Otherwise,
/// if a cancel handle is registered for `key`, it is removed and a cancel
/// signal is sent on it. A failed send is ignored.
fn remove(&mut self, key: &D::Key) {
    if self.take_ready_service(key).is_none() {
        if let Some(handle) = self.cancel_handles.remove(key) {
            let _ = handle.send(());
        }
    }
}
/// Adds `svc` to the unready services under `key`, and registers the cancel
/// handle that can be used to drop it before it becomes ready.
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
    let (cancel_tx, cancel_rx) = oneshot::channel();

    let pending = UnreadyService {
        key: Some(key),
        service: Some(svc),
        cancel: cancel_rx,
        _req: PhantomData,
    };

    self.cancel_handles.insert(key, cancel_tx);
    self.unready_services.push(pending);
}
2019-10-10 18:15:24 -07:00
/// Performs P2C on inner services to select a ready service.
2020-11-23 23:52:40 -08:00
fn preselect_p2c_index ( & mut self ) -> Option < usize > {
2019-10-10 18:15:24 -07:00
match self . ready_services . len ( ) {
0 = > None ,
1 = > Some ( 0 ) ,
len = > {
let ( a , b ) = {
let idxs = rand ::seq ::index ::sample ( & mut rand ::thread_rng ( ) , len , 2 ) ;
( idxs . index ( 0 ) , idxs . index ( 1 ) )
} ;
2020-11-23 23:52:40 -08:00
let a_load = self . query_load ( a ) ;
let b_load = self . query_load ( b ) ;
2019-10-10 18:15:24 -07:00
let selected = if a_load < = b_load { a } else { b } ;
trace! ( a . idx = a , a . load = ? a_load , b . idx = b , b . load = ? b_load , selected , " selected service by p2c " ) ;
Some ( selected )
}
}
}
/// Accesses a ready endpoint by index and returns its current load.
2020-11-23 23:52:40 -08:00
fn query_load ( & self , index : usize ) -> < D ::Service as Load > ::Metric {
2019-10-10 18:15:24 -07:00
let ( _ , svc ) = self . ready_services . get_index ( index ) . expect ( " invalid index " ) ;
svc . load ( )
}
2020-09-01 14:28:54 -07:00
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
/// Routes a request using P2C load-balancing.
fn route_p2c ( & mut self , req : Request ) -> < Self as tower ::Service < Request > > ::Future {
2020-09-01 14:28:54 -07:00
let index = self
2020-11-23 23:52:40 -08:00
. preselected_p2c_index
2020-09-01 14:28:54 -07:00
. take ( )
. expect ( " ready service must have valid preselected index " ) ;
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
let ( key , mut svc ) = self
. ready_services
2020-09-01 14:28:54 -07:00
. swap_remove_index ( index )
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
. expect ( " preselected index must be valid " ) ;
let fut = svc . call ( req ) ;
self . push_unready ( key , svc ) ;
fut . map_err ( Into ::into ) . boxed ( )
}
/// Tries to route a request to a peer that advertised that inventory,
/// falling back to P2C if there is no ready peer.
fn route_inv (
& mut self ,
req : Request ,
hash : InventoryHash ,
) -> < Self as tower ::Service < Request > > ::Future {
2020-11-23 23:52:40 -08:00
let peer = self
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
. inventory_registry
. peers ( & hash )
2020-11-23 23:52:40 -08:00
. find ( | & key | self . ready_services . contains_key ( key ) )
. cloned ( ) ;
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
2020-11-23 23:52:40 -08:00
match peer . and_then ( | key | self . take_ready_service ( & key ) ) {
Some ( ( key , mut svc ) ) = > {
2020-11-23 23:02:48 -08:00
tracing ::debug! ( ? hash , ? key , " routing based on inventory " ) ;
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
let fut = svc . call ( req ) ;
self . push_unready ( key , svc ) ;
fut . map_err ( Into ::into ) . boxed ( )
}
None = > {
2020-11-23 23:52:40 -08:00
tracing ::debug! ( ? hash , " no ready peer for inventory, falling back to p2c " ) ;
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
self . route_p2c ( req )
}
}
}
// Routes a request to all ready peers, ignoring return values.
fn route_all ( & mut self , req : Request ) -> < Self as tower ::Service < Request > > ::Future {
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
// blocking us from passing &mut self to push_unready.
let ready_services = std ::mem ::take ( & mut self . ready_services ) ;
2020-11-23 23:52:40 -08:00
self . preselected_p2c_index = None ; // All services are now unready.
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
let futs = FuturesUnordered ::new ( ) ;
for ( key , mut svc ) in ready_services {
futs . push ( svc . call ( req . clone ( ) ) . map_err ( | _ | ( ) ) ) ;
self . push_unready ( key , svc ) ;
}
async move {
let results = futs . collect ::< Vec < Result < _ , _ > > > ( ) . await ;
tracing ::debug! (
ok . len = results . iter ( ) . filter ( | r | r . is_ok ( ) ) . count ( ) ,
err . len = results . iter ( ) . filter ( | r | r . is_err ( ) ) . count ( ) ,
) ;
Ok ( Response ::Nil )
}
. boxed ( )
2020-09-01 14:28:54 -07:00
}
2020-11-23 23:52:40 -08:00
2020-11-29 22:41:14 -08:00
fn log_peer_set_size ( & mut self ) {
let ready_services_len = self . ready_services . len ( ) ;
let unready_services_len = self . unready_services . len ( ) ;
trace! ( ready_peers = ? ready_services_len , unready_peers = ? unready_services_len ) ;
if ready_services_len > 0 {
return ;
}
// These logs are designed to be human-readable in a terminal, at the
// default Zebra log level. If you need to know the peer set size for
// every request, use the trace-level logs, or the metrics exporter.
if let Some ( last_peer_log ) = self . last_peer_log {
// Avoid duplicate peer set logs
if Instant ::now ( ) . duration_since ( last_peer_log ) . as_secs ( ) < 60 {
return ;
}
} else {
// Suppress initial logs until the peer set has started up.
// There can be multiple initial requests before the first peer is
// ready.
self . last_peer_log = Some ( Instant ::now ( ) ) ;
return ;
}
self . last_peer_log = Some ( Instant ::now ( ) ) ;
if unready_services_len = = 0 {
warn! ( " network request with no peer connections. Hint: check your network connection. Configure fast and reliable initial peers " ) ;
} else {
info! ( " network request with no ready peers: finding more peers, waiting for {} peers to answer requests. Hint: configure fast and reliable initial peers " ,
unready_services_len ) ;
}
}
2020-11-23 23:52:40 -08:00
/// Publishes the peer set size as three gauges: the number of ready
/// services, the number of unready services, and their sum.
///
/// # Panics
///
/// Panics if any of the counts cannot be converted into the value type
/// expected by the gauge macro.
fn update_metrics(&self) {
    let ready = self.ready_services.len();
    let unready = self.unready_services.len();
    // Every known peer is in exactly one of the two collections counted here.
    let total = ready + unready;

    metrics::gauge!(" pool.num_ready ", ready.try_into().unwrap());
    metrics::gauge!(" pool.num_unready ", unready.try_into().unwrap());
    metrics::gauge!(" pool.num_peers ", total.try_into().unwrap());
}
2019-10-10 18:15:24 -07:00
}
impl < D > Service < Request > for PeerSet < D >
where
2020-09-01 14:28:54 -07:00
D : Discover < Key = SocketAddr > + Unpin ,
2019-10-10 18:15:24 -07:00
D ::Service : Service < Request , Response = Response > + Load ,
2020-09-18 11:20:55 -07:00
D ::Error : Into < BoxError > ,
< D ::Service as Service < Request > > ::Error : Into < BoxError > + 'static ,
2019-10-10 18:15:24 -07:00
< D ::Service as Service < Request > > ::Future : Send + 'static ,
< D ::Service as Load > ::Metric : Debug ,
{
type Response = Response ;
2020-09-18 11:20:55 -07:00
type Error = BoxError ;
2019-10-16 17:06:21 -07:00
type Future =
Pin < Box < dyn Future < Output = Result < Self ::Response , Self ::Error > > + Send + 'static > > ;
2019-10-10 18:15:24 -07:00
fn poll_ready ( & mut self , cx : & mut Context < '_ > ) -> Poll < Result < ( ) , Self ::Error > > {
2020-11-23 23:52:40 -08:00
self . poll_background_errors ( cx ) ? ;
2019-10-10 18:15:24 -07:00
// Process peer discovery updates.
let _ = self . poll_discover ( cx ) ? ;
2020-09-01 14:28:54 -07:00
self . inventory_registry . poll_inventory ( cx ) ? ;
2019-10-10 18:15:24 -07:00
self . poll_unready ( cx ) ;
2020-11-23 23:52:40 -08:00
2020-11-29 22:41:14 -08:00
self . log_peer_set_size ( ) ;
2020-11-23 23:52:40 -08:00
self . update_metrics ( ) ;
2019-10-10 18:15:24 -07:00
loop {
// Re-check that the pre-selected service is ready, in case
// something has happened since (e.g., it failed, peer closed
// connection, ...)
2020-11-23 23:52:40 -08:00
if let Some ( index ) = self . preselected_p2c_index {
2019-10-10 18:15:24 -07:00
let ( key , service ) = self
. ready_services
. get_index_mut ( index )
. expect ( " preselected index must be valid " ) ;
trace! ( preselected_index = index , ? key ) ;
match service . poll_ready ( cx ) {
Poll ::Ready ( Ok ( ( ) ) ) = > return Poll ::Ready ( Ok ( ( ) ) ) ,
Poll ::Pending = > {
trace! ( " preselected service is no longer ready " ) ;
let ( key , service ) = self
. ready_services
. swap_remove_index ( index )
. expect ( " preselected index must be valid " ) ;
self . push_unready ( key , service ) ;
}
Poll ::Ready ( Err ( e ) ) = > {
let error = e . into ( ) ;
trace! ( % error , " preselected service failed, dropping it " ) ;
self . ready_services
. swap_remove_index ( index )
. expect ( " preselected index must be valid " ) ;
}
}
}
trace! ( " preselected service was not ready, reselecting " ) ;
2020-11-23 23:52:40 -08:00
self . preselected_p2c_index = self . preselect_p2c_index ( ) ;
2019-10-10 18:15:24 -07:00
2020-11-23 23:52:40 -08:00
if self . preselected_p2c_index . is_none ( ) {
2019-10-21 15:24:17 -07:00
trace! ( " no ready services, sending demand signal " ) ;
let _ = self . demand_signal . try_send ( ( ) ) ;
2019-10-10 18:15:24 -07:00
return Poll ::Pending ;
}
}
}
fn call ( & mut self , req : Request ) -> Self ::Future {
network: implement transaction request handling. (#1016)
This commit makes several related changes to the network code:
- adds a `TransactionsByHash(HashSet<transaction::Hash>)` request and
`Transactions(Vec<Arc<Transaction>>)` response pair that allows
fetching transactions from a remote peer;
- adds a `PushTransaction(Arc<Transaction>)` request that pushes an
unsolicited transaction to a remote peer;
- adds an `AdvertiseTransactions(HashSet<transaction::Hash>)` request
that advertises transactions by hash to a remote peer;
- adds an `AdvertiseBlock(block::Hash)` request that advertises a block
by hash to a remote peer;
Then, it modifies the connection state machine so that outbound
requests to remote peers are handled properly:
- `TransactionsByHash` generates a `getdata` message and collects the
results, like the existing `BlocksByHash` request.
- `PushTransaction` generates a `tx` message, and returns `Nil` immediately.
- `AdvertiseTransactions` and `AdvertiseBlock` generate an `inv`
message, and return `Nil` immediately.
Next, it modifies the connection state machine so that messages
from remote peers generate requests to the inbound service:
- `getdata` messages generate `BlocksByHash` or `TransactionsByHash`
requests, depending on the content of the message;
- `tx` messages generate `PushTransaction` requests;
- `inv` messages generate `AdvertiseBlock` or `AdvertiseTransactions`
requests.
Finally, it refactors the request routing logic for the peer set to
handle advertisement messages, providing three routing methods:
- `route_p2c`, which uses p2c as normal (default);
- `route_inv`, which uses the inventory registry and falls back to p2c
(used for `BlocksByHash` or `TransactionsByHash`);
- `route_all`, which broadcasts a request to all ready peers (used for
`AdvertiseBlock` and `AdvertiseTransactions`).
2020-09-08 10:16:29 -07:00
match req {
// Only do inventory-aware routing on individual items.
Request ::BlocksByHash ( ref hashes ) if hashes . len ( ) = = 1 = > {
let hash = InventoryHash ::from ( * hashes . iter ( ) . next ( ) . unwrap ( ) ) ;
self . route_inv ( req , hash )
}
Request ::TransactionsByHash ( ref hashes ) if hashes . len ( ) = = 1 = > {
let hash = InventoryHash ::from ( * hashes . iter ( ) . next ( ) . unwrap ( ) ) ;
self . route_inv ( req , hash )
}
Request ::AdvertiseTransactions ( _ ) = > self . route_all ( req ) ,
Request ::AdvertiseBlock ( _ ) = > self . route_all ( req ) ,
_ = > self . route_p2c ( req ) ,
}
2019-10-10 18:15:24 -07:00
}
}