cleanup(consensus): Avoid blocking threads by awaiting proof verification results from rayon in async context (#6887)

* Replaces rayon::iter::once with spawn_fifo

* Removes spawn_blocking in flush_spawning methods

* Logs warning and returns error for RecvErrors

* Uses BoxError in proof verifiers

* Adds async spawn_fifo fns

* Updates verify_single_spawning and flush_spawning methods to use new async spawn_fifo fns

* Removes outdated TODOs and docs.

* Removes outdated TODO

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Arya 2023-07-12 17:30:07 -04:00 committed by GitHub
parent f9a48266ad
commit 797df674cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 135 additions and 166 deletions

View File

@ -1,5 +1,9 @@
//! Asynchronous verification of cryptographic primitives.
use tokio::sync::oneshot::error::RecvError;
use crate::BoxError;
pub mod ed25519;
pub mod groth16;
pub mod halo2;
@ -11,3 +15,37 @@ const MAX_BATCH_SIZE: usize = 64;
/// The maximum latency bound for any of the batch verifiers.
const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100);
/// Runs `f` on the Rayon threadpool via [`spawn_fifo`], waits for its result,
/// and flattens both failure modes into a single [`BoxError`].
///
/// # Errors
///
/// - If the response channel is closed before a result arrives, returns a
///   [`BoxError`] built from a fixed message asking whether Zebra is shutting down.
/// - If `f` itself returns `Err(e)`, returns `e` converted into a [`BoxError`].
pub async fn spawn_fifo_and_convert<E, F>(f: F) -> Result<(), BoxError>
where
    E: 'static + std::error::Error + Into<BoxError> + Sync + Send,
    F: 'static + FnOnce() -> Result<(), E> + Send,
{
    match spawn_fifo(f).await {
        // The task ran to completion: only its own error (if any) needs converting.
        Ok(task_result) => task_result.map_err(BoxError::from),
        // The sender went away without sending: the task's result is unavailable.
        Err(_recv_error) => Err(BoxError::from(
            "threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?",
        )),
    }
}
/// Queues `f` on the Rayon threadpool with `rayon::spawn_fifo` and asynchronously
/// waits for the value it returns.
///
/// The outer `Result` reports whether a response was received at all; the inner
/// `Result` is whatever `f` returned.
///
/// # Errors
///
/// Returns [`RecvError`] if the response sender is dropped before it sends a value.
pub async fn spawn_fifo<E, F>(f: F) -> Result<Result<(), E>, RecvError>
where
    E: 'static + std::error::Error + Sync + Send,
    F: 'static + FnOnce() -> Result<(), E> + Send,
{
    // `rayon::spawn_fifo` has no return value, so the closure's output is
    // handed back to this future through a oneshot channel.
    let (result_sender, result_receiver) = tokio::sync::oneshot::channel();

    rayon::spawn_fifo(move || {
        // A failed send is deliberately ignored: the result is simply discarded.
        let _ = result_sender.send(f());
    });

    result_receiver.await
}

View File

@ -11,13 +11,16 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::{batch, *};
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
@ -43,7 +46,10 @@ pub type Item = batch::Item;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -120,43 +126,22 @@ impl Verifier {
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single()).await
}
}
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -174,7 +159,8 @@ impl Service<BatchControl<Item>> for Verifier {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
if result.is_ok() {
tracing::trace!(?result, "validated ed25519 signature");
@ -183,7 +169,7 @@ impl Service<BatchControl<Item>> for Verifier {
tracing::trace!(?result, "invalid ed25519 signature");
metrics::counter!("signatures.ed25519.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
}

View File

@ -18,7 +18,6 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
@ -34,6 +33,10 @@ use zebra_chain::{
sprout::{JoinSplit, Nullifier, RandomSeed},
};
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
mod params;
#[cfg(test)]
mod tests;
@ -74,7 +77,10 @@ pub type ItemVerifyingKey = PreparedVerifyingKey<Bls12>;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static SPEND_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -113,7 +119,10 @@ pub static SPEND_VERIFIER: Lazy<
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static OUTPUT_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -417,43 +426,22 @@ impl Verifier {
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in groth16 batch verifier"))
let _ = tx.send(
spawn_fifo(move || batch.verify(thread_rng(), vk))
.await
.ok(),
);
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
async fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk))
.collect()
})
.map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single(pvk)).await
}
}
@ -470,8 +458,8 @@ impl fmt::Debug for Verifier {
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = VerificationError;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -492,7 +480,7 @@ impl Service<BatchControl<Item>> for Verifier {
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?
.clone();
if result.is_ok() {
@ -503,7 +491,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("proofs.groth16.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}

View File

@ -13,13 +13,16 @@ use once_cell::sync::Lazy;
use orchard::circuit::VerifyingKey;
use rand::{thread_rng, CryptoRng, RngCore};
use rayon::prelude::*;
use thiserror::Error;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
@ -199,7 +202,10 @@ impl From<halo2::plonk::Error> for Halo2Error {
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -284,43 +290,22 @@ impl Verifier {
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in halo2 batch verifier"))
let _ = tx.send(
spawn_fifo(move || batch.verify(thread_rng(), vk).map_err(Halo2Error::from))
.await
.ok(),
);
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
async fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk).map_err(Halo2Error::from))
.collect()
})
.map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single(pvk).map_err(Halo2Error::from)).await
}
}
@ -337,8 +322,8 @@ impl fmt::Debug for Verifier {
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Halo2Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -358,7 +343,7 @@ impl Service<BatchControl<Item>> for Verifier {
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?
.clone();
if result.is_ok() {
@ -369,7 +354,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("proofs.halo2.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}

View File

@ -11,7 +11,6 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
@ -19,6 +18,10 @@ use tower_fallback::Fallback;
use zebra_chain::primitives::redjubjub::{batch, *};
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
@ -44,7 +47,10 @@ pub type Item = batch::Item;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -121,43 +127,22 @@ impl Verifier {
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single()).await
}
}
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -175,7 +160,8 @@ impl Service<BatchControl<Item>> for Verifier {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
if result.is_ok() {
tracing::trace!(?result, "validated redjubjub signature");
@ -185,7 +171,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("signatures.redjubjub.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}

View File

@ -11,7 +11,6 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
@ -19,6 +18,10 @@ use tower_fallback::Fallback;
use zebra_chain::primitives::reddsa::{batch, orchard, Error};
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
@ -44,7 +47,10 @@ pub type Item = batch::Item<orchard::SpendAuth, orchard::Binding>;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -121,43 +127,22 @@ impl Verifier {
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single()).await
}
}
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -174,7 +159,8 @@ impl Service<BatchControl<Item>> for Verifier {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
if result.is_ok() {
tracing::trace!(?result, "validated redpallas signature");
@ -184,7 +170,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("signatures.redpallas.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}