doc(net): Explain how we prioritise peer messages, and why it is secure (#6488)

* Explain why we want the current peer message priority

* Fix missing space

* Explain cancellation better
teor 2023-04-13 14:58:03 +10:00 committed by GitHub
parent 541981e62a
commit e678423fd3
1 changed file with 48 additions and 17 deletions


@@ -595,22 +595,45 @@ where
 match self.state {
 State::AwaitingRequest => {
 trace!("awaiting client request or peer message");
-// CORRECTNESS
+// # Correctness
 //
-// Currently, select prefers the first future if multiple
-// futures are ready.
+// Currently, select prefers the first future if multiple futures are ready.
+// We use this behaviour to prioritise messages on each individual peer
+// connection in this order:
+// - incoming messages from the remote peer, then
+// - outgoing messages to the remote peer.
 //
-// The peer can starve client requests if it sends an
-// uninterrupted series of messages. But this is unlikely in
-// practice, due to network delays.
+// This improves the performance of peer responses to Zebra requests, and new
+// peer requests to Zebra's inbound service.
 //
-// If both futures are ready, there's no particular reason
-// to prefer one over the other.
+// `futures::StreamExt::next()` is cancel-safe:
+// <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
+// This means that messages from the future that isn't selected stay in the stream,
+// and they will be returned next time the future is checked.
 //
-// TODO: use `futures::select!`, which chooses a ready future
-// at random, avoiding starvation
-// (To use `select!`, we'll need to map the different
-// results to a new enum types.)
+// If an inbound peer message arrives at a ready peer that also has a pending
+// request from Zebra, we want to process the peer's message first.
+// If we process the Zebra request first:
+// - we could misinterpret the inbound peer message as a response to the Zebra
+//   request, or
+// - if the peer message is a request to Zebra, and we put the peer in the
+//   AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
+//   request. (Zebra services make multiple requests or retry, so this is ok.)
+//
+// # Security
+//
+// If a peer sends an uninterrupted series of messages, it will delay any new
+// requests from Zebra to that individual peer. This is behaviour we want,
+// because:
+// - any responses to Zebra's requests to that peer would be slow or time out,
+// - the peer will eventually fail a Zebra keepalive check and get disconnected,
+// - if there are too many inbound messages overall, the inbound service will
+//   return an overload error and the peer will be disconnected.
+//
+// Messages to other peers will continue to be processed concurrently. Some
+// Zebra services might be temporarily delayed until the peer times out, if a
+// request to that peer is sent by the service, and the service blocks until
+// the request completes (or times out).
 match future::select(peer_rx.next(), self.client_rx.next()).await {
 Either::Left((None, _)) => {
 self.fail_with(PeerError::ConnectionClosed);
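
The priority logic in this hunk rests on two properties of the `futures` crate: `future::select` resolves to its first argument when both futures are ready, and an item that `StreamExt::next()` has not yet yielded stays in the stream when the unselected `next()` future is dropped. Below is a minimal standalone sketch of both properties, assuming only the `futures` crate; the stream contents and variable names are invented to mirror the diff, and this is not Zebra code.

// Minimal sketch, not Zebra code: demonstrates the two `futures` properties
// the comment above relies on.
use futures::executor::block_on;
use futures::future::{self, Either};
use futures::{stream, StreamExt};

fn main() {
    block_on(async {
        // Both "channels" are ready at once, like an inbound peer message
        // arriving at the same time as a pending Zebra request.
        let mut peer_rx = stream::iter(["inbound peer message"]);
        let mut client_rx = stream::iter(["zebra request"]);

        // `future::select` polls its first argument first, so the peer
        // message is processed before the Zebra request.
        match future::select(peer_rx.next(), client_rx.next()).await {
            Either::Left((msg, _client_next)) => {
                assert_eq!(msg, Some("inbound peer message"));
            }
            Either::Right(_) => unreachable!("peer_rx is polled first"),
        }

        // Cancel-safety: dropping the unselected `next()` future did not
        // lose the Zebra request; it is still in the stream.
        assert_eq!(client_rx.next().await, Some("zebra request"));
    });
}

Because `peer_rx.next()` is the first argument, an inbound peer message always wins a tie, which is the per-connection priority the comment describes; the unselected Zebra request is deferred, not lost.
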
@@ -701,13 +724,21 @@ where
 .as_mut()
 .expect("timeout must be set while awaiting response");
 
-// CORRECTNESS
+// # Security
 //
-// Currently, select prefers the first future if multiple
-// futures are ready.
+// select() prefers the first future if multiple futures are ready.
 //
-// If multiple futures are ready, we want the cancellation
-// to take priority, then the timeout, then peer responses.
+// If multiple futures are ready, we want the priority for each individual
+// connection to be:
+// - cancellation, then
+// - timeout, then
+// - peer responses.
+//
+// (Messages to other peers are processed concurrently.)
+//
+// This makes sure a peer can't block disconnection or timeouts by sending too
+// many messages. It also avoids doing work to process messages after a
+// connection has failed.
 let cancel = future::select(tx.cancellation(), timer_ref);
 match future::select(cancel, peer_rx.next())
 .instrument(span.clone())
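
The nested select in this second hunk derives its ordering from the same first-argument preference, applied twice: the inner select makes cancellation beat the timeout, and the outer select makes that pair beat peer messages. A minimal sketch under the same assumptions (only the `futures` crate; the `future::ready` values stand in for the real cancellation handle and timeout timer; not Zebra code):

// Minimal sketch, not Zebra code: nesting `future::select` twice yields the
// priority order cancellation, then timeout, then peer responses, because
// `select` prefers its first argument when several futures are ready.
use futures::executor::block_on;
use futures::future::{self, Either};
use futures::{stream, StreamExt};

fn main() {
    block_on(async {
        // All three events are ready at the same time.
        let cancellation = future::ready("cancelled");
        let timer = future::ready("timed out");
        let mut peer_rx = stream::iter(["peer response"]);

        // Inner select: cancellation is polled before the timeout.
        let cancel = future::select(cancellation, timer);

        // Outer select: the cancellation/timeout pair is polled before any
        // peer message.
        match future::select(cancel, peer_rx.next()).await {
            Either::Left((Either::Left((event, _timer)), _peer_next)) => {
                assert_eq!(event, "cancelled");
            }
            Either::Left((Either::Right(_), _)) => {
                unreachable!("the cancellation future is polled first")
            }
            Either::Right(_) => unreachable!("the inner select is polled first"),
        }
    });
}

This determinism is the point of the change: the old TODO suggested switching to `futures::select!`, which picks among ready futures at random, but the rewritten comments argue that a fixed cancellation-first ordering is the behaviour the connection actually wants.
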