feat(verify): Concurrently verify proof and signature batches (#4776)

* Initialize the rayon threadpool with a new config for CPU-bound threads

* Verify proofs and signatures on the rayon thread pool

* Only spawn one concurrent batch per verifier, for now

* Allow tower-batch to queue multiple batches

* Fix up a potentially incorrect comment

* Rename some variables for concurrent batches

* Spawn multiple batches concurrently, without any limits

* Simplify batch worker loop using OptionFuture

* Clear pending batches once they finish

* Stop accepting new items when we're at the concurrent batch limit

* Fail queued requests on drop

* Move pending_items and the batch timer into the worker struct

* Add worker fields to batch trace logs

* Run docker tests on PR series

* During full verification, process 20 blocks concurrently

* Remove an outdated comment about yielding to other tasks
teor 2022-07-18 08:43:29 +10:00 committed by GitHub
parent 9b9cd55097
commit cf4b2f7a67
25 changed files with 539 additions and 259 deletions
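
For reviewers, the headline API change: `Batch::new` and `BatchLayer::new` gain a `max_batches` argument between the existing item-count and latency parameters. The call shape, as exercised by the ed25519 tests below:

// 10 items per batch, at most 5 queued/concurrent batches, 1000s max latency.
// Passing `None` for `max_batches` uses the rayon thread count instead.
let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000));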

View File

@ -7,8 +7,6 @@ name: CI Docker
on:
pull_request:
branches:
- main
jobs:
regenerate-stateful-disks:

View File

@ -4,8 +4,6 @@ name: CI Docker
# so they can be skipped when the modified files make the actual workflow run.
on:
pull_request:
branches:
- main
paths-ignore:
# code and tests
- '**/*.rs'

View File

@ -23,8 +23,6 @@ on:
required: true
pull_request:
branches:
- main
paths:
# code and tests
- '**/*.rs'

Cargo.lock (generated, 12 lines changed)
View File

@ -3792,9 +3792,9 @@ dependencies = [
[[package]]
name = "rayon"
version = "1.5.1"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
dependencies = [
"autocfg 1.1.0",
"crossbeam-deque",
@ -3804,14 +3804,13 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.9.1"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
@ -5447,6 +5446,7 @@ dependencies = [
"futures-core",
"pin-project 1.0.11",
"rand 0.8.5",
"rayon",
"tokio",
"tokio-test",
"tokio-util 0.7.3",
@ -6368,6 +6368,7 @@ dependencies = [
"proptest-derive",
"rand 0.7.3",
"rand 0.8.5",
"rayon",
"serde",
"spandoc",
"thiserror",
@ -6576,6 +6577,7 @@ dependencies = [
"proptest-derive",
"prost",
"rand 0.8.5",
"rayon",
"regex",
"reqwest",
"semver 1.0.11",

View File

@ -9,6 +9,7 @@ edition = "2021"
futures = "0.3.21"
futures-core = "0.3.21"
pin-project = "1.0.10"
rayon = "1.5.3"
tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "macros"] }
tokio-util = "0.7.3"
tower = { version = "0.4.13", features = ["util", "buffer"] }
@ -24,7 +25,6 @@ tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
tokio-test = "0.4.2"
tower-fallback = { path = "../tower-fallback/" }
tower-test = "0.4.0"
tracing = "0.1.31"
zebra-consensus = { path = "../zebra-consensus/" }
zebra-test = { path = "../zebra-test/" }

View File

@ -1,8 +1,12 @@
use super::{service::Batch, BatchControl};
//! Tower service layer for batch processing.
use std::{fmt, marker::PhantomData};
use tower::layer::Layer;
use tower::Service;
use super::{service::Batch, BatchControl};
/// Adds a layer performing batch processing of requests.
///
/// The default Tokio executor is used to run the given service,
@ -10,24 +14,31 @@ use tower::Service;
///
/// See the module documentation for more details.
pub struct BatchLayer<Request> {
max_items: usize,
max_items_in_batch: usize,
max_batches: Option<usize>,
max_latency: std::time::Duration,
_p: PhantomData<fn(Request)>,
// TODO: is the variance correct here?
// https://doc.rust-lang.org/1.33.0/nomicon/subtyping.html#variance
// https://doc.rust-lang.org/nomicon/phantom-data.html#table-of-phantomdata-patterns
_handles_requests: PhantomData<fn(Request)>,
}
impl<Request> BatchLayer<Request> {
/// Creates a new `BatchLayer`.
///
/// The wrapper is responsible for telling the inner service when to flush a
/// batch of requests. Two parameters control this policy:
///
/// * `max_items` gives the maximum number of items per batch.
/// * `max_latency` gives the maximum latency for a batch item.
pub fn new(max_items: usize, max_latency: std::time::Duration) -> Self {
/// batch of requests. See [`Batch::new()`] for details.
pub fn new(
max_items_in_batch: usize,
max_batches: impl Into<Option<usize>>,
max_latency: std::time::Duration,
) -> Self {
BatchLayer {
max_items,
max_items_in_batch,
max_batches: max_batches.into(),
max_latency,
_p: PhantomData,
_handles_requests: PhantomData,
}
}
}
@ -36,20 +47,27 @@ impl<S, Request> Layer<S> for BatchLayer<Request>
where
S: Service<BatchControl<Request>> + Send + 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<crate::BoxError> + Send + Sync,
Request: Send + 'static,
{
type Service = Batch<S, Request>;
fn layer(&self, service: S) -> Self::Service {
Batch::new(service, self.max_items, self.max_latency)
Batch::new(
service,
self.max_items_in_batch,
self.max_batches,
self.max_latency,
)
}
}
impl<Request> fmt::Debug for BatchLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("max_items", &self.max_items)
.field("max_items_in_batch", &self.max_items_in_batch)
.field("max_batches", &self.max_batches)
.field("max_latency", &self.max_latency)
.finish()
}
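
A sketch of wiring the new layer into a tower stack, assuming `tower-batch` exports the `BatchLayer`, `BatchControl`, and `BoxError` types used above; the inner `service_fn` is an illustrative stand-in for a real batch verifier:

use std::time::Duration;
use tower::{service_fn, ServiceBuilder, ServiceExt};
use tower_batch::{BatchControl, BatchLayer};

#[tokio::main]
async fn main() -> Result<(), tower_batch::BoxError> {
    // Stand-in inner service: real verifiers accumulate items, then verify on flush.
    let inner = service_fn(|ctl: BatchControl<u32>| async move {
        match ctl {
            BatchControl::Item(n) => println!("queued item {}", n),
            BatchControl::Flush => println!("flushing batch"),
        }
        Ok::<(), tower_batch::BoxError>(())
    });

    let batched = ServiceBuilder::new()
        // 2 items per batch, at most 4 concurrent batches, 50ms max latency.
        .layer(BatchLayer::new(2, 4, Duration::from_millis(50)))
        .service(inner);

    batched.oneshot(7).await
}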

View File

@ -1,6 +1,7 @@
//! Wrapper service for batching items to an underlying service.
use std::{
cmp::max,
fmt,
future::Future,
pin::Pin,
@ -25,6 +26,11 @@ use super::{
BatchControl,
};
/// The maximum number of batches in the queue.
///
/// This avoids having very large queues on machines with hundreds or thousands of cores.
pub const QUEUE_BATCH_LIMIT: usize = 64;
/// Allows batch processing of requests.
///
/// See the crate documentation for more details.
@ -32,6 +38,8 @@ pub struct Batch<T, Request>
where
T: Service<BatchControl<Request>>,
{
// Batch management
//
/// A custom-bounded channel for sending requests to the batch worker.
///
/// Note: this actually _is_ bounded, but rather than using Tokio's unbounded
@ -53,6 +61,8 @@ where
/// A semaphore permit that allows this service to send one message on `tx`.
permit: Option<OwnedSemaphorePermit>,
// Errors
//
/// An error handle shared between all service clones for the same worker.
error_handle: ErrorHandle,
@ -71,6 +81,7 @@ where
f.debug_struct(name)
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
.field("permit", &self.permit)
.field("error_handle", &self.error_handle)
.field("worker_handle", &self.worker_handle)
.finish()
@ -80,26 +91,37 @@ where
impl<T, Request> Batch<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
T::Error: Into<crate::BoxError>,
{
/// Creates a new `Batch` wrapping `service`.
///
/// The wrapper is responsible for telling the inner service when to flush a
/// batch of requests. Two parameters control this policy:
/// batch of requests. These parameters control this policy:
///
/// * `max_items` gives the maximum number of items per batch.
/// * `max_latency` gives the maximum latency for a batch item.
/// * `max_items_in_batch` gives the maximum number of items per batch.
/// * `max_batches` is an upper bound on the number of batches in the queue,
/// and the number of concurrently executing batches.
/// If this is `None`, we use the current number of [`rayon`] threads.
/// The number of batches in the queue is also limited by [`QUEUE_BATCH_LIMIT`].
/// * `max_latency` gives the maximum latency for a batch item to start verifying.
///
/// The default Tokio executor is used to run the given service, which means
/// that this method must be called while on the Tokio runtime.
pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
pub fn new(
service: T,
max_items_in_batch: usize,
max_batches: impl Into<Option<usize>>,
max_latency: std::time::Duration,
) -> Self
where
T: Send + 'static,
T::Future: Send,
T::Response: Send,
T::Error: Send + Sync,
Request: Send + 'static,
{
let (mut batch, worker) = Self::pair(service, max_items, max_latency);
let (mut batch, worker) = Self::pair(service, max_items_in_batch, max_batches, max_latency);
let span = info_span!("batch worker", kind = std::any::type_name::<T>());
@ -131,7 +153,8 @@ where
/// `Batch` and the background `Worker` that you can then spawn.
pub fn pair(
service: T,
max_items: usize,
max_items_in_batch: usize,
max_batches: impl Into<Option<usize>>,
max_latency: std::time::Duration,
) -> (Self, Worker<T, Request>)
where
@ -141,16 +164,32 @@ where
{
let (tx, rx) = mpsc::unbounded_channel();
// Clamp config to sensible values.
let max_items_in_batch = max(max_items_in_batch, 1);
let max_batches = max_batches
.into()
.unwrap_or_else(rayon::current_num_threads);
let max_batches_in_queue = max_batches.clamp(1, QUEUE_BATCH_LIMIT);
// The semaphore bound limits the maximum number of concurrent requests
// (specifically, requests which got a `Ready` from `poll_ready`, but haven't
// used their semaphore reservation in a `call` yet).
// We choose a bound that allows callers to check readiness for every item in
// a batch, then actually submit those items.
let semaphore = Semaphore::new(max_items);
//
// We choose a bound that allows callers to check readiness for one batch per rayon CPU thread.
// This helps keep all CPUs filled with work: there is one batch executing, and another ready to go.
// Often there is only one verifier running; when that happens, we want it to take all the cores.
let semaphore = Semaphore::new(max_items_in_batch * max_batches_in_queue);
let semaphore = PollSemaphore::new(Arc::new(semaphore));
let (error_handle, worker) =
Worker::new(service, rx, max_items, max_latency, semaphore.clone());
let (error_handle, worker) = Worker::new(
service,
rx,
max_items_in_batch,
max_batches,
max_latency,
semaphore.clone(),
);
let batch = Batch {
tx,
semaphore,
@ -182,6 +221,7 @@ where
impl<T, Request> Service<Request> for Batch<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
T::Error: Into<crate::BoxError>,
{
type Response = T::Response;
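
A worked example of the semaphore bound computed in `pair` above, with hypothetical numbers following its clamping logic:

fn main() {
    let max_items_in_batch = 50_usize.max(1); // clamp to at least one item
    // `max_batches: None` falls back to the rayon thread count, say 8 cores.
    let max_batches: usize = 8;
    let max_batches_in_queue = max_batches.clamp(1, 64); // QUEUE_BATCH_LIMIT = 64
    // Callers can reserve 8 full batches of 50 items before `poll_ready` pends.
    assert_eq!(max_items_in_batch * max_batches_in_queue, 400);
}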

View File

@ -5,7 +5,11 @@ use std::{
sync::{Arc, Mutex},
};
use futures::future::TryFutureExt;
use futures::{
future::{BoxFuture, OptionFuture},
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use pin_project::pin_project;
use tokio::{
sync::mpsc,
@ -33,28 +37,52 @@ use super::{
pub struct Worker<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
T::Error: Into<crate::BoxError>,
{
// Batch management
//
/// A semaphore-bounded channel for receiving requests from the batch wrapper service.
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
/// The wrapped service that processes batches.
service: T,
/// The number of pending items sent to `service`, since the last batch flush.
pending_items: usize,
/// The timer for the pending batch, if it has any items.
///
/// The timer is started when the first entry of a new batch is
/// submitted, so that the batch latency of all entries is at most
/// self.max_latency. However, we don't keep the timer running unless
/// there is a pending request to prevent wakeups on idle services.
pending_batch_timer: Option<Pin<Box<Sleep>>>,
/// The batches that the worker is concurrently executing.
concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
// Errors and termination
//
/// An error that's populated on permanent service failure.
failed: Option<ServiceError>,
/// A shared error handle that's populated on permanent service failure.
error_handle: ErrorHandle,
/// The maximum number of items allowed in a batch.
max_items: usize,
/// The maximum delay before processing a batch with fewer than `max_items`.
max_latency: std::time::Duration,
/// A cloned copy of the wrapper service's semaphore, used to close the semaphore.
close: PollSemaphore,
// Config
//
/// The maximum number of items allowed in a batch.
max_items_in_batch: usize,
/// The maximum number of batches that are allowed to run concurrently.
max_concurrent_batches: usize,
/// The maximum delay before processing a batch with fewer than `max_items_in_batch`.
max_latency: std::time::Duration,
}
/// Get the error out of a failed batch worker.
@ -66,12 +94,17 @@ pub(crate) struct ErrorHandle {
impl<T, Request> Worker<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
T::Error: Into<crate::BoxError>,
{
/// Creates a new batch worker.
///
/// See [`Service::new()`](crate::Service::new) for details.
pub(crate) fn new(
service: T,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
max_items: usize,
max_items_in_batch: usize,
max_concurrent_batches: usize,
max_latency: std::time::Duration,
close: PollSemaphore,
) -> (ErrorHandle, Worker<T, Request>) {
@ -82,134 +115,184 @@ where
let worker = Worker {
rx,
service,
error_handle: error_handle.clone(),
pending_items: 0,
pending_batch_timer: None,
concurrent_batches: FuturesUnordered::new(),
failed: None,
max_items,
max_latency,
error_handle: error_handle.clone(),
close,
max_items_in_batch,
max_concurrent_batches,
max_latency,
};
(error_handle, worker)
}
/// Process a single worker request.
async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
if let Some(ref failed) = self.failed {
tracing::trace!("notifying caller about worker failure");
let _ = tx.send(Err(failed.clone()));
} else {
match self.service.ready().await {
Ok(svc) => {
let rsp = svc.call(req.into());
let _ = tx.send(Ok(rsp));
}
Err(e) => {
self.failed(e.into());
let _ = tx.send(Err(self
.failed
.as_ref()
.expect("Worker::failed did not set self.failed?")
.clone()));
if let Some(ref error) = self.failed {
tracing::trace!(
?error,
"notifying batch request caller about worker failure",
);
let _ = tx.send(Err(error.clone()));
return;
}
// Wake any tasks waiting on channel capacity.
tracing::debug!("waking pending tasks");
self.close.close();
}
match self.service.ready().await {
Ok(svc) => {
let rsp = svc.call(req.into());
let _ = tx.send(Ok(rsp));
self.pending_items += 1;
}
Err(e) => {
self.failed(e.into());
let _ = tx.send(Err(self
.failed
.as_ref()
.expect("Worker::failed did not set self.failed?")
.clone()));
}
}
}
/// Tell the inner service to flush the current batch.
///
/// Waits until the inner service is ready,
/// then stores a future which resolves when the batch finishes.
async fn flush_service(&mut self) {
if let Err(e) = self
.service
.ready()
.and_then(|svc| svc.call(BatchControl::Flush))
.await
{
self.failed(e.into());
if self.failed.is_some() {
tracing::trace!("worker failure: skipping flush");
return;
}
// Correctness: allow other tasks to run at the end of every batch.
tokio::task::yield_now().await;
match self.service.ready().await {
Ok(ready_service) => {
let flush_future = ready_service.call(BatchControl::Flush);
self.concurrent_batches.push(flush_future.boxed());
// Now we have an empty batch.
self.pending_items = 0;
self.pending_batch_timer = None;
}
Err(error) => {
self.failed(error.into());
}
}
}
/// Is the current number of concurrent batches still below the configured limit?
fn can_spawn_new_batches(&self) -> bool {
self.concurrent_batches.len() < self.max_concurrent_batches
}
/// Run loop for batch requests, which implements the batch policies.
///
/// See [`Service::new()`](crate::Service::new) for details.
pub async fn run(mut self) {
// The timer is started when the first entry of a new batch is
// submitted, so that the batch latency of all entries is at most
// self.max_latency. However, we don't keep the timer running unless
// there is a pending request to prevent wakeups on idle services.
let mut timer: Option<Pin<Box<Sleep>>> = None;
let mut pending_items = 0usize;
loop {
match timer.as_mut() {
None => match self.rx.recv().await {
// The first message in a new batch.
// Wait on either a new message or the batch timer.
//
// If both are ready, end the batch now, because the timer has elapsed.
// If the timer elapses, any pending messages are preserved:
// https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
tokio::select! {
biased;
batch_result = self.concurrent_batches.next(), if !self.concurrent_batches.is_empty() => match batch_result.expect("only returns None when empty") {
Ok(_response) => {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"batch finished executing",
);
}
Err(error) => {
let error = error.into();
tracing::trace!(?error, "batch execution failed");
self.failed(error);
}
},
Some(()) = OptionFuture::from(self.pending_batch_timer.as_mut()), if self.pending_batch_timer.as_ref().is_some() => {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"batch timer expired",
);
// TODO: use a batch-specific span to instrument this future.
self.flush_service().await;
},
maybe_msg = self.rx.recv(), if self.can_spawn_new_batches() => match maybe_msg {
Some(msg) => {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"batch message received",
);
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing
// Apply the provided span to request processing.
.instrument(span)
.await;
timer = Some(Box::pin(sleep(self.max_latency)));
pending_items = 1;
}
// No more messages, ever.
None => return,
},
Some(sleep) => {
// Wait on either a new message or the batch timer.
//
// If both are ready, end the batch now, because the timer has elapsed.
// If the timer elapses, any pending messages are preserved:
// https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
tokio::select! {
biased;
// The batch timer elapsed.
() = sleep => {
// Check whether we have too many pending items.
if self.pending_items >= self.max_items_in_batch {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"batch is full",
);
// TODO: use a batch-specific span to instrument this future.
self.flush_service().await;
} else if self.pending_items == 1 {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"batch is new, starting timer",
);
// Now we have an empty batch.
timer = None;
pending_items = 0;
// The first message in a new batch.
self.pending_batch_timer = Some(Box::pin(sleep(self.max_latency)));
} else {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
"waiting for full batch or batch timer",
);
}
maybe_msg = self.rx.recv() => match maybe_msg {
Some(msg) => {
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing.
.instrument(span)
.await;
pending_items += 1;
// Check whether we have too many pending items.
if pending_items >= self.max_items {
// TODO: use a batch-specific span to instrument this future.
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
} else {
// The timer is still running.
}
}
None => {
// No more messages, ever.
return;
}
},
}
}
None => {
tracing::trace!("batch channel closed and emptied, exiting worker task");
return;
}
},
}
}
}
/// Register an inner service failure.
///
/// The underlying service failed when we called `poll_ready` on it with the given `error`. We
/// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
/// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
/// requests will also fail with the same error.
fn failed(&mut self, error: crate::BoxError) {
// The underlying service failed when we called `poll_ready` on it with the given `error`. We
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
// requests will also fail with the same error.
tracing::debug!(?error, "batch worker error");
// Note that we need to handle the case where some error_handle is concurrently trying to send us
// a request. We need to make sure that *either* the send of the request fails *or* it
@ -222,15 +305,23 @@ where
let mut inner = self.error_handle.inner.lock().unwrap();
// Ignore duplicate failures
if inner.is_some() {
// Future::poll was called after we've already errored out!
return;
}
*inner = Some(error.clone());
drop(inner);
tracing::trace!(
?error,
"worker failure: waking pending requests so they can be failed",
);
self.rx.close();
self.close.close();
// We don't schedule any batches on an errored service
self.pending_batch_timer = None;
// By closing the mpsc::Receiver, we know that the run() loop will
// drain all pending requests. We just need to make sure that any
@ -263,16 +354,39 @@ impl Clone for ErrorHandle {
impl<T, Request> PinnedDrop for Worker<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
T::Error: Into<crate::BoxError>,
{
fn drop(mut self: Pin<&mut Self>) {
tracing::trace!(
pending_items = self.pending_items,
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
running_batches = self.concurrent_batches.len(),
error = ?self.failed,
"dropping batch worker",
);
// Fail pending tasks
self.failed(Closed::new().into());
// Clear queued requests
while self.rx.try_recv().is_ok() {}
// Fail queued requests
while let Ok(msg) = self.rx.try_recv() {
let _ = msg
.tx
.send(Err(self.failed.as_ref().expect("just set failed").clone()));
}
// Stop accepting reservations
self.close.close();
// Clear any finished batches, ignoring any errors.
// Ignore any batches that are still executing, because we can't cancel them.
//
// now_or_never() can stop futures waking up, but that's ok here,
// because we're manually polling, then dropping the stream.
while let Some(Some(_)) = self
.as_mut()
.project()
.concurrent_batches
.next()
.now_or_never()
{}
}
}
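
The rewritten run loop above collapses the old nested `match timer` structure into one `tokio::select!` with conditional arms. A self-contained sketch of the `OptionFuture` timer pattern it relies on (names here are illustrative, not the worker's own):

use std::pin::Pin;

use futures::future::OptionFuture;
use tokio::time::{sleep, Duration, Sleep};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
    tx.send(1).expect("channel open");
    drop(tx); // close the channel: one item, then `None`

    // No timer while the batch is empty, so an idle loop never wakes up.
    let mut timer: Option<Pin<Box<Sleep>>> = None;

    loop {
        tokio::select! {
            biased;

            // Disabled while `timer` is `None`: `OptionFuture::from(None)`
            // resolves to `None`, which fails the `Some(())` pattern.
            Some(()) = OptionFuture::from(timer.as_mut()) => {
                println!("batch timer expired: flush");
                timer = None;
            }

            // Mirrors the worker's conditional arms: stop taking new items
            // while a batch is pending (here, while the timer is running).
            maybe_msg = rx.recv(), if timer.is_none() => match maybe_msg {
                Some(item) => {
                    println!("first item {item}: start the batch timer");
                    timer = Some(Box::pin(sleep(Duration::from_millis(10))));
                }
                None => break, // no more messages, ever
            },
        }
    }
}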

View File

@ -61,7 +61,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
// flushing is happening based on hitting max_items.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 10, Duration::from_secs(1000));
let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
.await
.map_err(|e| eyre!(e))?
@ -79,7 +79,12 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
// flushing is happening based on hitting max_latency.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 100, Duration::from_millis(500));
let verifier = Batch::new(
Ed25519Verifier::default(),
100,
10,
Duration::from_millis(500),
);
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
.await
.map_err(|e| eyre!(e))?
@ -94,7 +99,12 @@ async fn fallback_verification() -> Result<(), Report> {
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Fallback::new(
Batch::new(Ed25519Verifier::default(), 10, Duration::from_millis(100)),
Batch::new(
Ed25519Verifier::default(),
10,
1,
Duration::from_millis(100),
),
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
);

View File

@ -13,7 +13,7 @@ async fn wakes_pending_waiters_on_close() {
let (service, mut handle) = mock::pair::<_, ()>();
let (mut service, worker) = Batch::pair(service, 1, Duration::from_secs(1));
let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
let mut worker = task::spawn(worker.run());
// keep the request in the worker
@ -72,7 +72,7 @@ async fn wakes_pending_waiters_on_failure() {
let (service, mut handle) = mock::pair::<_, ()>();
let (mut service, worker) = Batch::pair(service, 1, Duration::from_secs(1));
let (mut service, worker) = Batch::pair(service, 1, 1, Duration::from_secs(1));
let mut worker = task::spawn(worker.run());
// keep the request in the worker

View File

@ -13,10 +13,10 @@ proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl", "ze
blake2b_simd = "1.0.0"
bellman = "0.13.0"
bls12_381 = "0.7.0"
halo2 = { package = "halo2_proofs", version = "0.2.0" }
jubjub = "0.9.0"
rand = { version = "0.8.5", package = "rand" }
halo2 = { package = "halo2_proofs", version = "0.2.0" }
rayon = "1.5.3"
chrono = "0.4.19"
dirs = "4.0.0"

View File

@ -11,6 +11,7 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
@ -48,6 +49,7 @@ pub static VERIFIER: Lazy<
Batch::new(
Verifier::default(),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification fails,
@ -106,38 +108,48 @@ impl Verifier {
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
}
}
@ -192,9 +204,7 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
// This returns immediately, usually before the batch is completed.
self.flush_blocking();
}
}
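
A minimal sketch of the tokio-to-rayon bridge that `flush_spawning` uses above: `spawn_blocking` moves the work off the async worker threads, and `scope_fifo` blocks that thread until the spawned rayon task finishes, so the returned future resolves when the CPU-bound work completes. The summing workload is a stand-in for `Self::verify`:

use futures::FutureExt;

async fn verify_on_rayon(batch: Vec<u64>) -> u64 {
    tokio::task::spawn_blocking(move || {
        let mut sum = 0;
        // `scope_fifo` waits for every task spawned inside it.
        rayon::scope_fifo(|s| {
            s.spawn_fifo(|_s| {
                // Stand-in for the CPU-bound batch verification.
                sum = batch.iter().sum();
            })
        });
        sum
    })
    .map(|join_result| join_result.expect("panic in rayon task"))
    .await
}

#[tokio::main]
async fn main() {
    assert_eq!(verify_on_rayon(vec![1, 2, 3]).await, 6);
}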

View File

@ -44,7 +44,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
.await?
.map_err(|e| eyre!(e))?;
@ -63,7 +63,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
.await?
.map_err(|e| eyre!(e))?;

View File

@ -17,6 +17,8 @@ use bls12_381::Bls12;
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
@ -78,6 +80,7 @@ pub static SPEND_VERIFIER: Lazy<
Batch::new(
Verifier::new(&GROTH16_PARAMETERS.sapling.spend.vk),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification fails,
@ -116,6 +119,7 @@ pub static OUTPUT_VERIFIER: Lazy<
Batch::new(
Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
@ -401,16 +405,14 @@ impl Verifier {
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, vk, tx) = self.take();
// # Correctness
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
}
/// Flush the batch using a thread pool, and return the result via the channel.
@ -420,13 +422,16 @@ impl Verifier {
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
.map(|join_result| join_result.expect("panic in groth16 batch verifier"))
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in groth16 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
@ -436,10 +441,19 @@ impl Verifier {
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || item.verify_single(pvk))
.map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk))
.collect()
})
.map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
}
}
@ -510,9 +524,7 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
// This returns immediately, usually before the batch is completed.
self.flush_blocking()
}
}
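
The `rayon::iter::once(item).map(...).collect()` idiom in the fallback paths works because rayon, like the standard library, can collect an iterator of `Result`s into a single `Result`, short-circuiting on the first error. A standalone sketch:

use rayon::prelude::*;

fn check(n: u32) -> Result<(), String> {
    if n % 2 == 0 { Ok(()) } else { Err(format!("{} is odd", n)) }
}

fn main() {
    // A one-element parallel iterator, collected back into a `Result`.
    let ok: Result<(), String> = rayon::iter::once(2).map(check).collect();
    assert!(ok.is_ok());

    let err: Result<(), String> = rayon::iter::once(3).map(check).collect();
    assert_eq!(err.unwrap_err(), "3 is odd");
}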

View File

@ -75,6 +75,7 @@ async fn verify_sapling_groth16() {
Batch::new(
Verifier::new(&GROTH16_PARAMETERS.sapling.spend.vk),
crate::primitives::MAX_BATCH_SIZE,
None,
crate::primitives::MAX_BATCH_LATENCY,
),
tower::service_fn(
@ -87,6 +88,7 @@ async fn verify_sapling_groth16() {
Batch::new(
Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
crate::primitives::MAX_BATCH_SIZE,
None,
crate::primitives::MAX_BATCH_LATENCY,
),
tower::service_fn(
@ -179,6 +181,7 @@ async fn correctly_err_on_invalid_output_proof() {
Batch::new(
Verifier::new(&GROTH16_PARAMETERS.sapling.output.vk),
crate::primitives::MAX_BATCH_SIZE,
None,
crate::primitives::MAX_BATCH_LATENCY,
),
tower::service_fn(

View File

@ -12,6 +12,8 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use orchard::circuit::VerifyingKey;
use rand::{thread_rng, CryptoRng, RngCore};
use rayon::prelude::*;
use thiserror::Error;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
@ -203,6 +205,7 @@ pub static VERIFIER: Lazy<
Batch::new(
Verifier::new(&VERIFYING_KEY),
HALO2_MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification fails,
@ -269,16 +272,14 @@ impl Verifier {
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, vk, tx) = self.take();
// # Correctness
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
}
/// Flush the batch using a thread pool, and return the result via the channel.
@ -288,13 +289,16 @@ impl Verifier {
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
.map(|join_result| join_result.expect("panic in halo2 batch verifier"))
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in halo2 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
@ -304,10 +308,19 @@ impl Verifier {
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || item.verify_single(pvk).map_err(Halo2Error::from))
.map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk).map_err(Halo2Error::from))
.collect()
})
.map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
}
}
@ -377,9 +390,7 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
// This returns immediately, usually before the batch is completed.
self.flush_blocking()
}
}

View File

@ -151,6 +151,7 @@ async fn verify_generated_halo2_proofs() {
Batch::new(
Verifier::new(&VERIFYING_KEY),
crate::primitives::MAX_BATCH_SIZE,
None,
crate::primitives::MAX_BATCH_LATENCY,
),
tower::service_fn(
@ -217,6 +218,7 @@ async fn correctly_err_on_invalid_halo2_proofs() {
Batch::new(
Verifier::new(&VERIFYING_KEY),
crate::primitives::MAX_BATCH_SIZE,
None,
crate::primitives::MAX_BATCH_LATENCY,
),
tower::service_fn(

View File

@ -11,6 +11,7 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
@ -49,6 +50,7 @@ pub static VERIFIER: Lazy<
Batch::new(
Verifier::default(),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification fails,
@ -107,38 +109,48 @@ impl Verifier {
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
}
}
@ -194,9 +206,7 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
// This returns immediately, usually before the batch is completed.
self.flush_blocking();
}
}

View File

@ -56,7 +56,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
.await?
.map_err(|e| eyre!(e))?;
@ -75,7 +75,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
.await?
.map_err(|e| eyre!(e))?;

View File

@ -10,6 +10,8 @@ use std::{
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
@ -48,6 +50,7 @@ pub static VERIFIER: Lazy<
Batch::new(
Verifier::default(),
super::MAX_BATCH_SIZE,
None,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification fails,
@ -106,38 +109,48 @@ impl Verifier {
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in redpallas batch verifier"))
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in redpallas batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
}
}
@ -192,9 +205,7 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
// This returns immediately, usually before the batch is completed.
self.flush_blocking();
}
}

View File

@ -51,7 +51,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Verifier::default(), 10, Duration::from_secs(1000));
let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100))
.await?
.map_err(|e| eyre!(e))?;
@ -65,7 +65,7 @@ async fn batch_flushes_on_max_latency() -> Result<()> {
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Verifier::default(), 100, Duration::from_millis(500));
let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10))
.await?
.map_err(|e| eyre!(e))?;

View File

@ -86,6 +86,7 @@ serde = { version = "1.0.137", features = ["serde_derive"] }
toml = "0.5.9"
futures = "0.3.21"
rayon = "1.5.3"
tokio = { version = "1.19.2", features = ["time", "rt-multi-thread", "macros", "tracing", "signal"] }
tower = { version = "0.4.13", features = ["hedge", "limit"] }
pin-project = "1.0.10"

View File

@ -343,6 +343,19 @@ impl Application for ZebradApp {
}
}));
// Apply the configured number of threads to the thread pool.
//
// TODO:
// - set rayon panic handler to a function that takes `Box<dyn Any + Send + 'static>`,
// which forwards to sentry. If possible, use eyre's panic report for formatting.
// - do we also need to call this code in `zebra_consensus::init()`,
// when that crate is being used by itself?
rayon::ThreadPoolBuilder::new()
.num_threads(config.sync.parallel_cpu_threads)
.thread_name(|thread_index| format!("rayon {}", thread_index))
.build_global()
.expect("unable to initialize rayon thread pool");
self.config = Some(config);
let cfg_ref = self
@ -389,6 +402,11 @@ impl Application for ZebradApp {
// leak the global span, to make sure it stays active
std::mem::forget(global_guard);
tracing::info!(
num_threads = rayon::current_num_threads(),
"initialized rayon thread pool for CPU-bound tasks",
);
// Launch network and async endpoints only for long-running commands.
if is_server {
components.push(Box::new(TokioComponent::new()?));
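
A standalone sketch of the initialization above. Per the rayon documentation, `num_threads(0)` selects the automatic thread count, which is why the config default of `0` means one thread per available core:

fn main() {
    rayon::ThreadPoolBuilder::new()
        .num_threads(0) // 0 = automatic: about one thread per logical CPU
        .thread_name(|index| format!("rayon {}", index))
        .build_global()
        .expect("unable to initialize rayon thread pool");

    println!("rayon threads: {}", rayon::current_num_threads());
}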

View File

@ -1,11 +1,20 @@
//! A component owning the Tokio runtime.
//!
//! The tokio runtime is used for:
//! - non-blocking async tasks, via [`Future`]s and
//! - blocking network and file tasks, via [`spawn_blocking`](tokio::task::spawn_blocking).
//!
//! The rayon thread pool is used for:
//! - long-running CPU-bound tasks like cryptography, via [`rayon::spawn_fifo`].
use std::{future::Future, time::Duration};
use crate::prelude::*;
use abscissa_core::{Application, Component, FrameworkError, Shutdown};
use color_eyre::Report;
use std::{future::Future, time::Duration};
use tokio::runtime::Runtime;
use crate::prelude::*;
/// When Zebra is shutting down, wait this long for tokio tasks to finish.
const TOKIO_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);

View File

@ -209,6 +209,12 @@ pub struct SyncSection {
/// This is set to a low value by default, to avoid verification timeouts on large blocks.
/// Increasing this value may improve performance on machines with many cores.
pub full_verify_concurrency_limit: usize,
/// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
///
/// Set to `0` by default, which uses one thread per available CPU core.
/// For details, see [the rayon documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
pub parallel_cpu_threads: usize,
}
impl Default for SyncSection {
@ -223,11 +229,20 @@ impl Default for SyncSection {
// This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
// even on machines with only a few cores.
//
// This lets users see the committed block height changing in every progress log.
// This lets users see the committed block height changing in every progress log,
// and avoids hangs due to out-of-order verifications flooding the CPUs.
//
// TODO: when we implement orchard proof batching, try increasing to 20 or more
// limit full verification concurrency based on block transaction counts?
full_verify_concurrency_limit: 5,
// TODO:
// - limit full verification concurrency based on block transaction counts?
// - move more disk work to blocking tokio threads,
// and CPU work to the rayon thread pool inside blocking tokio threads
full_verify_concurrency_limit: 20,
// Use one thread per CPU.
//
// If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
// or reserve a few cores for tokio threads, based on `num_cpus()`.
parallel_cpu_threads: 0,
}
}
}
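
With these defaults, the relevant part of a zebrad config file would look roughly like this (a sketch: the `[sync]` section name comes from `SyncSection`, and all other fields are omitted):

[sync]
# Verify up to 20 blocks concurrently during full verification.
full_verify_concurrency_limit = 20
# 0 means one rayon thread per available CPU core.
parallel_cpu_threads = 0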