fix(batch): Improve batch verifier async, correctness, and performance (#4750)

* Use a new channel for each batch

* Prefer the batch timer if there are also new batch requests

* Allow other tasks to run after each batch

* Label each batch worker with the verifier's type

* Rename Handle to ErrorHandle, and fix up some docs

* Check batch worker tasks for panics and task termination

* Use tokio's PollSemaphore instead of an outdated Semaphore impl

* Run all verifier cryptography on a blocking thread

Also use a new verifier channel for each batch.

* Make flush and drop behaviour consistent for all verifiers

* Partly fix an incorrect NU5 test

* Switch batch tests to the multi-threaded runtime

* Export all verifier primitive modules from zebra-consensus

* Remove outdated test code in tower-batch

* Use a watch channel to send batch verifier results

* Use spawn_blocking for batch fallback verifiers

* Spawn cryptography batches onto blocking tokio threads

* Use smaller batches for halo2

* Minor tower-batch cleanups

* Fix doc link in zebra-test

* Drop previous permit before acquiring another to avoid a deadlock edge case
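
Several of these bullets (a new channel per batch, watch-channel results, cryptography on blocking threads) describe one pattern. Below is a minimal sketch of it, with hypothetical `Batcher`, `queue`, and `flush` names and a toy parity check standing in for real signature cryptography:

```rust
use tokio::sync::watch;

/// The result type for a toy batch check.
type VerifyResult = Result<(), &'static str>;

/// A toy batcher: each batch owns a fresh watch channel, so results from one
/// batch can never be observed by items queued into the next one.
struct Batcher {
    items: Vec<u64>,
    tx: watch::Sender<Option<VerifyResult>>,
}

impl Batcher {
    fn new() -> Self {
        let (tx, _rx) = watch::channel(None);
        Self {
            items: Vec::new(),
            tx,
        }
    }

    /// Queue an item, returning a receiver for this batch's single result.
    fn queue(&mut self, item: u64) -> watch::Receiver<Option<VerifyResult>> {
        self.items.push(item);
        self.tx.subscribe()
    }

    /// End the batch: swap in a new channel and item list, then run the check
    /// on a blocking thread and broadcast its one result.
    fn flush(&mut self) {
        let (tx, _rx) = watch::channel(None);
        let tx = std::mem::replace(&mut self.tx, tx);
        let items = std::mem::take(&mut self.items);

        tokio::task::spawn_blocking(move || {
            // Stand-in for CPU-heavy batch cryptography.
            let result = if items.iter().all(|i| i % 2 == 0) {
                Ok(())
            } else {
                Err("bad item in batch")
            };
            let _ = tx.send(Some(result));
        });
    }
}
```

Because the channel is replaced on every flush, a receiver subscribed during one batch can never observe the next batch's result.
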
teor 2022-07-18 08:41:18 +10:00 committed by GitHub
parent 485bac819d
commit 9b9cd55097
26 changed files with 1035 additions and 649 deletions

Cargo.lock (generated)

@@ -5449,11 +5449,13 @@ dependencies = [
"rand 0.8.5",
"tokio",
"tokio-test",
"tokio-util 0.7.3",
"tower",
"tower-fallback",
"tower-test",
"tracing",
"tracing-futures",
"zebra-consensus",
"zebra-test",
]

@@ -10,6 +10,7 @@ futures = "0.3.21"
futures-core = "0.3.21"
pin-project = "1.0.10"
tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "macros"] }
tokio-util = "0.7.3"
tower = { version = "0.4.13", features = ["util", "buffer"] }
tracing = "0.1.31"
tracing-futures = "0.2.5"
@@ -25,4 +26,5 @@ tower-fallback = { path = "../tower-fallback/" }
tower-test = "0.4.0"
tracing = "0.1.31"
zebra-consensus = { path = "../zebra-consensus/" }
zebra-test = { path = "../zebra-test/" }

@@ -89,22 +89,23 @@ pub mod error;
pub mod future;
mod layer;
mod message;
mod semaphore;
mod service;
mod worker;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Signaling mechanism for batchable services that allows explicit flushing.
pub enum BatchControl<R> {
///
/// This request type is a generic wrapper for the inner `Req` type.
pub enum BatchControl<Req> {
/// A new batch item.
Item(R),
Item(Req),
/// The current batch should be flushed.
Flush,
}
impl<R> From<R> for BatchControl<R> {
fn from(req: R) -> BatchControl<R> {
impl<Req> From<Req> for BatchControl<Req> {
fn from(req: Req) -> BatchControl<Req> {
BatchControl::Item(req)
}
}
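
As a usage illustration of this enum, here is a minimal sketch of a batchable service (a hypothetical `Adder`, not part of this diff). Real verifiers return futures that resolve when the batch is flushed; this toy service completes immediately:

```rust
use std::{
    convert::Infallible,
    task::{Context, Poll},
};

use futures::future::{ready, Ready};
use tower::Service;
use tower_batch::BatchControl;

/// A toy batchable service: queues numbers on `Item`, sums them on `Flush`.
struct Adder {
    pending: Vec<u64>,
}

impl Service<BatchControl<u64>> for Adder {
    type Response = ();
    type Error = Infallible;
    type Future = Ready<Result<(), Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: BatchControl<u64>) -> Self::Future {
        match req {
            // A new item: queue it for the next flush.
            BatchControl::Item(n) => self.pending.push(n),
            // The current batch should be processed as a unit.
            BatchControl::Flush => {
                let total: u64 = self.pending.drain(..).sum();
                tracing::trace!(total, "flushed toy batch");
            }
        }
        ready(Ok(()))
    }
}
```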

@@ -1,5 +1,8 @@
//! Batch message types.
use tokio::sync::{oneshot, OwnedSemaphorePermit};
use super::error::ServiceError;
use tokio::sync::oneshot;
/// Message sent to the batch worker
#[derive(Debug)]
@@ -7,7 +10,7 @@ pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut>,
pub(crate) span: tracing::Span,
pub(super) _permit: crate::semaphore::Permit,
pub(super) _permit: OwnedSemaphorePermit,
}
/// Response sender

@@ -1,128 +0,0 @@
// Copied from tower/src/semaphore.rs, commit:
// d4d1c67 hedge: use auto-resizing histograms (#484)
//
// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's:
// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556)
// Ignore lints on this copied code
#![allow(dead_code)]
pub(crate) use self::sync::OwnedSemaphorePermit as Permit;
use futures::FutureExt;
use futures_core::ready;
use std::{
fmt,
future::Future,
mem,
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
};
use tokio::sync;
#[derive(Debug)]
pub(crate) struct Semaphore {
semaphore: Arc<sync::Semaphore>,
state: State,
}
#[derive(Debug)]
pub(crate) struct Close {
semaphore: Weak<sync::Semaphore>,
permits: usize,
}
enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
Ready(Permit),
Empty,
}
impl Semaphore {
pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
let semaphore = Arc::new(sync::Semaphore::new(permits));
let close = Close {
semaphore: Arc::downgrade(&semaphore),
permits,
};
let semaphore = Self {
semaphore,
state: State::Empty,
};
(semaphore, close)
}
pub(crate) fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(sync::Semaphore::new(permits)),
state: State::Empty,
}
}
pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
self.state = match self.state {
State::Ready(_) => return Poll::Ready(()),
State::Waiting(ref mut fut) => {
let permit = ready!(Pin::new(fut).poll(cx));
State::Ready(permit)
}
State::Empty => State::Waiting(Box::pin(
self.semaphore
.clone()
.acquire_owned()
.map(|result| result.expect("internal semaphore is never closed")),
)),
};
}
}
pub(crate) fn take_permit(&mut self) -> Option<Permit> {
if let State::Ready(permit) = mem::replace(&mut self.state, State::Empty) {
return Some(permit);
}
None
}
}
impl Clone for Semaphore {
fn clone(&self) -> Self {
Self {
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Waiting(_) => f
.debug_tuple("State::Waiting")
.field(&format_args!("..."))
.finish(),
State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(),
State::Empty => f.debug_tuple("State::Empty").finish(),
}
}
}
impl Close {
/// Close the semaphore, waking any remaining tasks currently awaiting a permit.
pub(crate) fn close(self) {
// The maximum number of permits that a `tokio::sync::Semaphore`
// can hold is usize::MAX >> 3. If we attempt to add more than that
// number of permits, the semaphore will panic.
// XXX(eliza): another shift is kinda janky but if we add (usize::MAX
// >> 3 - initial permits) the semaphore impl panics (I think due to a
// bug in tokio?).
// TODO(eliza): Tokio should _really_ just expose `Semaphore::close`
// publicly so we don't have to do this nonsense...
const MAX: usize = std::usize::MAX >> 4;
if let Some(semaphore) = self.semaphore.upgrade() {
// If we added `MAX - available_permits`, any tasks that are
// currently holding permits could drop them, overflowing the max.
semaphore.add_permits(MAX - self.permits);
}
}
}

@@ -1,40 +1,65 @@
//! Wrapper service for batching items to an underlying service.
use std::{
fmt,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures_core::ready;
use tokio::{
pin,
sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore},
task::JoinHandle,
};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use tracing::{info_span, Instrument};
use super::{
future::ResponseFuture,
message::Message,
worker::{Handle, Worker},
worker::{ErrorHandle, Worker},
BatchControl,
};
use crate::semaphore::Semaphore;
use futures_core::ready;
use std::{
fmt,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tower::Service;
/// Allows batch processing of requests.
///
/// See the module documentation for more details.
/// See the crate documentation for more details.
pub struct Batch<T, Request>
where
T: Service<BatchControl<Request>>,
{
// Note: this actually _is_ bounded, but rather than using Tokio's bounded
// channel, we use tokio's semaphore separately to implement the bound.
/// A custom-bounded channel for sending requests to the batch worker.
///
/// Note: this actually _is_ bounded, but rather than using Tokio's bounded
/// channel, we use tokio's semaphore separately to implement the bound.
tx: mpsc::UnboundedSender<Message<Request, T::Future>>,
// When the buffer's channel is full, we want to exert backpressure in
// `poll_ready`, so that callers such as load balancers could choose to call
// another service rather than waiting for buffer capacity.
//
// Unfortunately, this can't be done easily using Tokio's bounded MPSC
// channel, because it doesn't expose a polling-based interface, only an
// `async fn ready`, which borrows the sender. Therefore, we implement our
// own bounded MPSC on top of the unbounded channel, using a semaphore to
// limit how many items are in the channel.
semaphore: Semaphore,
handle: Handle,
/// A semaphore used to bound the channel.
///
/// When the buffer's channel is full, we want to exert backpressure in
/// `poll_ready`, so that callers such as load balancers could choose to call
/// another service rather than waiting for buffer capacity.
///
/// Unfortunately, this can't be done easily using Tokio's bounded MPSC
/// channel, because it doesn't wake pending tasks on close. Therefore, we implement our
/// own bounded MPSC on top of the unbounded channel, using a semaphore to
/// limit how many items are in the channel.
semaphore: PollSemaphore,
/// A semaphore permit that allows this service to send one message on `tx`.
permit: Option<OwnedSemaphorePermit>,
/// An error handle shared between all service clones for the same worker.
error_handle: ErrorHandle,
/// A worker task handle shared between all service clones for the same worker.
///
/// Only used when the worker is spawned on the tokio runtime.
worker_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl<T, Request> fmt::Debug for Batch<T, Request>
@@ -46,7 +71,8 @@ where
f.debug_struct(name)
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
.field("handle", &self.handle)
.field("error_handle", &self.error_handle)
.field("worker_handle", &self.worker_handle)
.finish()
}
}
@@ -73,8 +99,28 @@ where
T::Error: Send + Sync,
Request: Send + 'static,
{
let (batch, worker) = Self::pair(service, max_items, max_latency);
tokio::spawn(worker.run());
let (mut batch, worker) = Self::pair(service, max_items, max_latency);
let span = info_span!("batch worker", kind = std::any::type_name::<T>());
#[cfg(tokio_unstable)]
let worker_handle = {
let batch_kind = std::any::type_name::<T>();
// TODO: identify the unique part of the type name generically,
// or make it an argument to this method
let batch_kind = batch_kind.trim_start_matches("zebra_consensus::primitives::");
let batch_kind = batch_kind.trim_end_matches("::Verifier");
tokio::task::Builder::new()
.name(&format!("{} batch", batch_kind))
.spawn(worker.run().instrument(span))
};
#[cfg(not(tokio_unstable))]
let worker_handle = tokio::spawn(worker.run().instrument(span));
batch.register_worker(worker_handle);
batch
}
@@ -100,21 +146,36 @@ where
// used their semaphore reservation in a `call` yet).
// We choose a bound that allows callers to check readiness for every item in
// a batch, then actually submit those items.
let bound = max_items;
let (semaphore, close) = Semaphore::new_with_close(bound);
let semaphore = Semaphore::new(max_items);
let semaphore = PollSemaphore::new(Arc::new(semaphore));
let (handle, worker) = Worker::new(service, rx, max_items, max_latency, close);
let (error_handle, worker) =
Worker::new(service, rx, max_items, max_latency, semaphore.clone());
let batch = Batch {
tx,
semaphore,
handle,
permit: None,
error_handle,
worker_handle: Arc::new(Mutex::new(None)),
};
(batch, worker)
}
/// Ask the `Batch` to monitor the spawned worker task's [`JoinHandle`](tokio::task::JoinHandle).
///
/// Only used when the task is spawned on the tokio runtime.
pub fn register_worker(&mut self, worker_handle: JoinHandle<()>) {
*self
.worker_handle
.lock()
.expect("previous task panicked while holding the worker handle mutex") =
Some(worker_handle);
}
/// Returns the error from the batch worker's `error_handle`.
fn get_worker_error(&self) -> crate::BoxError {
self.handle.get_error_on_closed()
self.error_handle.get_error_on_closed()
}
}
@@ -128,26 +189,59 @@ where
type Future = ResponseFuture<T::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// First, check if the worker is still alive.
if self.tx.is_closed() {
// If the inner service has errored, then we error here.
// Check to see if the worker has returned or panicked.
//
// Correctness: Registers this task for wakeup when the worker finishes.
if let Some(worker_handle) = self
.worker_handle
.lock()
.expect("previous task panicked while holding the worker handle mutex")
.as_mut()
{
match Pin::new(worker_handle).poll(cx) {
Poll::Ready(Ok(())) => return Poll::Ready(Err(self.get_worker_error())),
Poll::Ready(task_panic) => {
task_panic.expect("unexpected panic in batch worker task")
}
Poll::Pending => {}
}
}
// Check if the worker has set an error and closed its channels.
//
// Correctness: Registers this task for wakeup when the channel is closed.
let tx = self.tx.clone();
let closed = tx.closed();
pin!(closed);
if closed.poll(cx).is_ready() {
return Poll::Ready(Err(self.get_worker_error()));
}
// Poll to acquire a semaphore permit.
//
// CORRECTNESS
//
// Poll to acquire a semaphore permit. If we acquire a permit, then
// there's enough buffer capacity to send a new request. Otherwise, we
// need to wait for capacity.
// If we acquire a permit, then there's enough buffer capacity to send a new request.
// Otherwise, we need to wait for capacity. When that happens, `poll_acquire()` registers
// this task for wakeup when the next permit is available, or when the semaphore is closed.
//
// In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an
// error, so we don't need to handle errors until we upgrade to
// tokio 1.0.
// When `poll_ready()` is called multiple times, and channel capacity is 1,
// avoid deadlocks by dropping any previous permit before acquiring another one.
// This also stops tasks holding a permit after an error.
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`. If it returns Pending, the semaphore also schedules
// the task for wakeup when the next permit is available.
ready!(self.semaphore.poll_acquire(cx));
// Calling `poll_ready()` multiple times can make tasks lose their previous permit
// to another concurrent task.
self.permit = None;
let permit = ready!(self.semaphore.poll_acquire(cx));
if let Some(permit) = permit {
// Calling poll_ready() more than once will drop any previous permit,
// releasing its capacity back to the semaphore.
self.permit = Some(permit);
} else {
// The semaphore has been closed.
return Poll::Ready(Err(self.get_worker_error()));
}
Poll::Ready(Ok(()))
}
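
The permit discipline in `poll_ready` above generalizes to any poll-based bounded sender built from an unbounded channel plus `tokio-util`'s `PollSemaphore`. A condensed sketch of the pattern, with a hypothetical `BoundedSender` wrapper:

```rust
use std::{
    sync::Arc,
    task::{Context, Poll},
};

use futures::ready;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;

/// A poll-friendly bounded sender: an unbounded channel whose capacity is
/// enforced by a separate semaphore, as in `Batch` above.
struct BoundedSender<T> {
    tx: mpsc::UnboundedSender<(T, OwnedSemaphorePermit)>,
    semaphore: PollSemaphore,
    permit: Option<OwnedSemaphorePermit>,
}

impl<T> BoundedSender<T> {
    fn new(tx: mpsc::UnboundedSender<(T, OwnedSemaphorePermit)>, bound: usize) -> Self {
        Self {
            tx,
            semaphore: PollSemaphore::new(Arc::new(Semaphore::new(bound))),
            permit: None,
        }
    }

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), &'static str>> {
        // Drop any previously stored permit first: with capacity 1, two tasks
        // that both hold stale permits while polling again can deadlock.
        self.permit = None;

        match ready!(self.semaphore.poll_acquire(cx)) {
            Some(permit) => {
                self.permit = Some(permit);
                Poll::Ready(Ok(()))
            }
            // `poll_acquire` returns `None` when the semaphore is closed.
            None => Poll::Ready(Err("worker shut down")),
        }
    }

    fn send(&mut self, value: T) -> Result<(), &'static str> {
        // The permit reserved in `poll_ready` travels with the message, so
        // capacity is only released when the receiver drops the message.
        let permit = self.permit.take().expect("poll_ready must be called first");
        self.tx.send((value, permit)).map_err(|_| "worker shut down")
    }
}
```
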
@@ -155,9 +249,9 @@ where
fn call(&mut self, request: Request) -> Self::Future {
tracing::trace!("sending request to buffer worker");
let _permit = self
.semaphore
.take_permit()
.expect("buffer full; poll_ready must be called first");
.permit
.take()
.expect("poll_ready must be called before a batch request");
// get the current Span so that we can explicitly propagate it to the worker
// if we didn't do this, events on the worker related to this span wouldn't be counted
@@ -187,8 +281,10 @@ where
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
handle: self.handle.clone(),
semaphore: self.semaphore.clone(),
permit: None,
error_handle: self.error_handle.clone(),
worker_handle: self.worker_handle.clone(),
}
}
}

@@ -1,3 +1,5 @@
//! Batch worker item handling and run loop implementation.
use std::{
pin::Pin,
sync::{Arc, Mutex},
@@ -9,11 +11,10 @@ use tokio::{
sync::mpsc,
time::{sleep, Sleep},
};
use tokio_util::sync::PollSemaphore;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use crate::semaphore;
use super::{
error::{Closed, ServiceError},
message::{self, Message},
@@ -34,18 +35,31 @@ where
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
/// A semaphore-bounded channel for receiving requests from the batch wrapper service.
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
/// The wrapped service that processes batches.
service: T,
/// An error that's populated on permanent service failure.
failed: Option<ServiceError>,
handle: Handle,
/// A shared error handle that's populated on permanent service failure.
error_handle: ErrorHandle,
/// The maximum number of items allowed in a batch.
max_items: usize,
/// The maximum delay before processing a batch with fewer than `max_items`.
max_latency: std::time::Duration,
close: Option<semaphore::Close>,
/// A cloned copy of the wrapper service's semaphore, used to close the semaphore.
close: PollSemaphore,
}
/// Get the error out
#[derive(Debug)]
pub(crate) struct Handle {
pub(crate) struct ErrorHandle {
inner: Arc<Mutex<Option<ServiceError>>>,
}
@@ -59,23 +73,23 @@ where
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
max_items: usize,
max_latency: std::time::Duration,
close: semaphore::Close,
) -> (Handle, Worker<T, Request>) {
let handle = Handle {
close: PollSemaphore,
) -> (ErrorHandle, Worker<T, Request>) {
let error_handle = ErrorHandle {
inner: Arc::new(Mutex::new(None)),
};
let worker = Worker {
rx,
service,
handle: handle.clone(),
error_handle: error_handle.clone(),
failed: None,
max_items,
max_latency,
close: Some(close),
close,
};
(handle, worker)
(error_handle, worker)
}
async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
@@ -97,10 +111,8 @@ where
.clone()));
// Wake any tasks waiting on channel capacity.
if let Some(close) = self.close.take() {
tracing::debug!("waking pending tasks");
close.close();
}
tracing::debug!("waking pending tasks");
self.close.close();
}
}
}
@@ -115,6 +127,9 @@ where
{
self.failed(e.into());
}
// Correctness: allow other tasks to run at the end of every batch.
tokio::task::yield_now().await;
}
pub async fn run(mut self) {
@@ -142,8 +157,23 @@ where
},
Some(sleep) => {
// Wait on either a new message or the batch timer.
// If both are ready, select! chooses one of them at random.
//
// If both are ready, end the batch now, because the timer has elapsed.
// If the timer elapses, any pending messages are preserved:
// https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
tokio::select! {
biased;
// The batch timer elapsed.
() = sleep => {
// TODO: use a batch-specific span to instrument this future.
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
}
maybe_msg = self.rx.recv() => match maybe_msg {
Some(msg) => {
let span = msg.span;
@@ -154,8 +184,9 @@
pending_items += 1;
// Check whether we have too many pending items.
if pending_items >= self.max_items {
// XXX(hdevalence): what span should instrument this?
// TODO: use a batch-specific span to instrument this future.
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
@@ -168,13 +199,6 @@
return;
}
},
() = sleep => {
// The batch timer elapsed.
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
timer = None;
pending_items = 0;
}
}
}
}
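
To make the `biased;` choice concrete, here is a standalone sketch of the same timer-versus-receiver loop (hypothetical `run_batches`; unlike the real worker, the timer here is armed at the start of each batch rather than on its first item):

```rust
use std::time::Duration;

use tokio::{sync::mpsc, time::sleep};

/// Drain a channel into latency-bounded batches. Under `biased;`, the timer
/// branch is checked first, so an elapsed timer always ends the batch, even
/// if new messages are ready at the same time.
async fn run_batches(mut rx: mpsc::UnboundedReceiver<u64>, max_latency: Duration) {
    loop {
        let mut batch = Vec::new();
        let timer = sleep(max_latency);
        tokio::pin!(timer);

        loop {
            tokio::select! {
                biased;

                // The timer elapsed: flush now.
                () = &mut timer => break,

                // `recv()` is cancel-safe: a message that loses the race
                // against the timer stays in the channel for the next batch.
                maybe_msg = rx.recv() => match maybe_msg {
                    Some(msg) => batch.push(msg),
                    // All senders are gone: flush and stop.
                    None => {
                        println!("final batch: {batch:?}");
                        return;
                    }
                },
            }
        }

        if !batch.is_empty() {
            println!("batch of {}: {batch:?}", batch.len());
        }
    }
}
```
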
@@ -187,7 +211,7 @@
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
// requests will also fail with the same error.
// Note that we need to handle the case where some handle is concurrently trying to send us
// Note that we need to handle the case where some error_handle is concurrently trying to send us
// a request. We need to make sure that *either* the send of the request fails *or* it
// receives an error on the `oneshot` it constructed. Specifically, we want to avoid the
// case where we send errors to all outstanding requests, and *then* the caller sends its
@@ -196,7 +220,7 @@
// sending the error to all outstanding requests.
let error = ServiceError::new(error);
let mut inner = self.handle.inner.lock().unwrap();
let mut inner = self.error_handle.inner.lock().unwrap();
if inner.is_some() {
// Future::poll was called after we've already errored out!
@@ -216,20 +240,20 @@
}
}
impl Handle {
impl ErrorHandle {
pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
self.inner
.lock()
.unwrap()
.expect("previous task panicked while holding the error handle mutex")
.as_ref()
.map(|svc_err| svc_err.clone().into())
.unwrap_or_else(|| Closed::new().into())
}
}
impl Clone for Handle {
fn clone(&self) -> Handle {
Handle {
impl Clone for ErrorHandle {
fn clone(&self) -> ErrorHandle {
ErrorHandle {
inner: self.inner.clone(),
}
}
@@ -242,8 +266,13 @@
T::Error: Into<crate::BoxError>,
{
fn drop(mut self: Pin<&mut Self>) {
if let Some(close) = self.as_mut().close.take() {
close.close();
}
// Fail pending tasks
self.failed(Closed::new().into());
// Clear queued requests
while self.rx.try_recv().is_ok() {}
// Stop accepting reservations
self.close.close();
}
}

@@ -1,88 +1,18 @@
use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
//! Test batching using ed25519 verification.
use std::time::Duration;
use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*;
use futures::stream::{FuturesUnordered, StreamExt};
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tower::{Service, ServiceExt};
use tower_batch::{Batch, BatchControl};
use tower_batch::Batch;
use tower_fallback::Fallback;
// ============ service impl ============
pub struct Ed25519Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
}
#[allow(clippy::new_without_default)]
impl Ed25519Verifier {
pub fn new() -> Self {
let batch = batch::Verifier::default();
// XXX(hdevalence) what's a reasonable choice here?
let (tx, _) = channel(10);
Self { batch, tx }
}
}
pub type Ed25519Item = batch::Item;
impl Service<BatchControl<Ed25519Item>> for Ed25519Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BatchControl<Ed25519Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => result,
Err(RecvError::Lagged(_)) => {
tracing::warn!(
"missed channel updates for the correct signature batch!"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
}
}
}
}
impl Drop for Ed25519Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
}
}
use zebra_consensus::ed25519::{Item as Ed25519Item, Verifier as Ed25519Verifier};
// =============== testing code ========
@@ -122,14 +52,16 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items() -> Result<(), Report> {
use tokio::time::timeout;
zebra_test::init();
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000));
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 10, Duration::from_secs(1000));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
.await
.map_err(|e| eyre!(e))?
@@ -138,14 +70,16 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency() -> Result<(), Report> {
use tokio::time::timeout;
zebra_test::init();
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500));
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 100, Duration::from_millis(500));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
.await
.map_err(|e| eyre!(e))?
@@ -154,12 +88,13 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn fallback_verification() -> Result<(), Report> {
zebra_test::init();
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Fallback::new(
Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)),
Batch::new(Ed25519Verifier::default(), 10, Duration::from_millis(100)),
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
);

@@ -1,4 +1,7 @@
//! Fixed test cases for batch worker tasks.
use std::time::Duration;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, task};
use tower::{Service, ServiceExt};
use tower_batch::{error, Batch};
@@ -37,29 +40,29 @@ async fn wakes_pending_waiters_on_close() {
assert!(
err.is::<error::Closed>(),
"response should fail with a Closed, got: {:?}",
err
err,
);
assert!(
ready1.is_woken(),
"dropping worker should wake ready task 1"
"dropping worker should wake ready task 1",
);
let err = assert_ready_err!(ready1.poll());
assert!(
err.is::<error::Closed>(),
"ready 1 should fail with a Closed, got: {:?}",
err
err.is::<error::ServiceError>(),
"ready 1 should fail with a ServiceError {{ Closed }}, got: {:?}",
err,
);
assert!(
ready2.is_woken(),
"dropping worker should wake ready task 2"
"dropping worker should wake ready task 2",
);
let err = assert_ready_err!(ready2.poll());
assert!(
err.is::<error::Closed>(),
"ready 2 should fail with a Closed, got: {:?}",
err
err.is::<error::ServiceError>(),
"ready 2 should fail with a ServiceError {{ Closed }}, got: {:?}",
err,
);
}

@@ -29,7 +29,7 @@ futures = "0.3.21"
futures-util = "0.3.21"
metrics = "0.18.1"
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["time", "sync", "tracing"] }
tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "rt-multi-thread"] }
tower = { version = "0.4.13", features = ["timeout", "util", "buffer"] }
tracing = "0.1.31"
tracing-futures = "0.2.5"

@@ -53,7 +53,7 @@ pub use checkpoint::{
};
pub use config::Config;
pub use error::BlockError;
pub use primitives::groth16;
pub use primitives::{ed25519, groth16, halo2, redjubjub, redpallas};
/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

@@ -11,10 +11,3 @@ const MAX_BATCH_SIZE: usize = 64;
/// The maximum latency bound for any of the batch verifiers.
const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100);
/// The size of the buffer in the broadcast channels used by batch verifiers.
///
/// This bound limits the number of concurrent batches for each verifier.
/// If tasks delay checking for verifier results, and the bound is too small,
/// new batches will be rejected with `RecvError`s.
const BROADCAST_BUFFER_SIZE: usize = 512;

@@ -1,8 +1,5 @@
//! Async Ed25519 batch verifier service
#[cfg(test)]
mod tests;
use std::{
future::Future,
mem,
@@ -10,16 +7,32 @@ use std::{
task::{Context, Poll},
};
use futures::future::{ready, Ready};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::{batch, *};
#[cfg(test)]
mod tests;
/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;
/// The type of verification results.
type VerifyResult = Result<(), Error>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The type of the batch item.
/// This is an `Ed25519Item`.
pub type Item = batch::Item;
/// Global batch verification context for Ed25519 signatures.
///
/// This service transparently batches contemporaneous signature verifications,
@@ -29,7 +42,7 @@ use zebra_chain::primitives::ed25519::{batch, *};
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@@ -37,43 +50,101 @@ pub static VERIFIER: Lazy<
super::MAX_BATCH_SIZE,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
// We want to fallback to individual verification if batch verification fails,
// so we need a Service to use.
//
// Because we have to specify the type of a static, we need to be able to
// write the type of the closure and its return value. But both closures and
// async blocks have unnameable types. So instead we cast the closure to a function
// (which is possible because it doesn't capture any state), and use a BoxFuture
// to erase the result type.
// (We can't use BoxCloneService to erase the service type, because it is !Sync.)
tower::service_fn(
(|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
),
)
});
/// Ed25519 signature verifier service
pub struct Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
/// A batch verifier for ed25519 signatures.
batch: BatchVerifier,
/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}
impl Default for Verifier {
fn default() -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}
/// Type alias to clarify that this `batch::Item` is a `Ed25519Item`
pub type Item = batch::Item;
impl Verifier {
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, tx)
}
/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
}
}
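
A note on the two flush paths above: `tokio::task::spawn_blocking` moves the closure to a dedicated blocking thread and returns a joinable future, while `tokio::task::block_in_place` converts the current worker thread and panics on a `current_thread` runtime, which is also why the tests in this PR switch to `flavor = "multi_thread"`. A condensed sketch of the contrast, with hypothetical helper names:

```rust
use tokio::task;

/// Async flush path (like `flush_spawning`): move CPU-heavy work to a
/// blocking thread and yield until it finishes.
async fn flush_async(cpu_heavy: impl FnOnce() + Send + 'static) {
    task::spawn_blocking(cpu_heavy)
        .await
        .expect("panic in batch verifier");
}

/// Sync flush path (like `flush_blocking`, used from `Drop`, where we can't
/// await): run the work in place. Panics on a `current_thread` runtime.
fn flush_sync(cpu_heavy: impl FnOnce()) {
    task::block_in_place(cpu_heavy);
}
```
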
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@@ -85,9 +156,14 @@ impl Service<BatchControl<Item>> for Verifier {
tracing::trace!("got ed25519 item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
if result.is_ok() {
tracing::trace!(?result, "validated ed25519 signature");
metrics::counter!("signatures.ed25519.validated", 1);
@@ -97,24 +173,17 @@ impl Service<BatchControl<Item>> for Verifier {
}
result
}
Err(RecvError::Lagged(_)) => {
tracing::error!(
"ed25519 batch verification receiver lagged and lost verification results"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => {
panic!("ed25519 verifier was dropped without flushing")
}
Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got ed25519 flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
let (batch, tx) = self.take();
Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
@@ -123,7 +192,9 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
self.flush_blocking();
}
}
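
For callers, the doc comment's advice to clone the global handle looks roughly like the sketch below (a hypothetical `check_signature` helper; it assumes the fallback service's boxed error type and uses `tower::ServiceExt::oneshot`):

```rust
use ed25519_zebra::{SigningKey, VerificationKeyBytes};
use rand::thread_rng;
use tower::ServiceExt;
use zebra_consensus::ed25519::VERIFIER;

/// Sign `msg` with a fresh key, then verify it through the shared batch verifier.
async fn check_signature(msg: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let signing_key = SigningKey::new(thread_rng());
    let signature = signing_key.sign(msg);
    let verification_key_bytes = VerificationKeyBytes::from(&signing_key);

    // Clone the shared handle, then drive a single request to completion.
    VERIFIER
        .clone()
        .oneshot((verification_key_bytes, signature, msg).into())
        .await?;

    Ok(())
}
```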

@@ -33,7 +33,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items_test() -> Result<()> {
batch_flushes_on_max_items().await
}
@@ -52,7 +52,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency_test() -> Result<()> {
batch_flushes_on_max_latency().await
}

@@ -1,7 +1,6 @@
//! Async Groth16 batch verifier service
use std::{
convert::{TryFrom, TryInto},
fmt,
future::Future,
mem,
@@ -11,14 +10,14 @@
use bellman::{
gadgets::multipack,
groth16::{batch, VerifyingKey},
groth16::{batch, PreparedVerifyingKey, VerifyingKey},
VerificationError,
};
use bls12_381::Bls12;
use futures::future::{ready, Ready};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
@@ -43,6 +42,27 @@ pub use params::{Groth16Parameters, GROTH16_PARAMETERS};
use crate::error::TransactionError;
/// The type of the batch verifier.
type BatchVerifier = batch::Verifier<Bls12>;
/// The type of verification results.
type VerifyResult = Result<(), VerificationError>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The type of the batch item.
/// This is a Groth16 verification item.
pub type Item = batch::Item<Bls12>;
/// The type of a raw verifying key.
/// This is the key used to verify batches.
pub type BatchVerifyingKey = VerifyingKey<Bls12>;
/// The type of a prepared verifying key.
/// This is the key used to verify individual items.
pub type ItemVerifyingKey = PreparedVerifyingKey<Bls12>;
/// Global batch verification context for Groth16 proofs of Spend statements.
///
/// This service transparently batches contemporaneous proof verifications,
@@ -52,7 +72,7 @@ use crate::error::TransactionError;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static SPEND_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), VerificationError>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@@ -60,17 +80,22 @@ pub static SPEND_VERIFIER: Lazy<
super::MAX_BATCH_SIZE,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
// We want to fallback to individual verification if batch verification fails,
// so we need a Service to use.
//
// Because we have to specify the type of a static, we need to be able to
// write the type of the closure and its return value. But both closures and
// async blocks have unnameable types. So instead we cast the closure to a function
// (which is possible because it doesn't capture any state), and use a BoxFuture
// to erase the result type.
// (We can't use BoxCloneService to erase the service type, because it is !Sync.)
tower::service_fn(
(|item: Item| {
ready(item.verify_single(&GROTH16_PARAMETERS.sapling.spend_prepared_verifying_key))
Verifier::verify_single_spawning(
item,
&GROTH16_PARAMETERS.sapling.spend_prepared_verifying_key,
)
.boxed()
}) as fn(_) -> _,
),
)
@@ -85,7 +110,7 @@ pub static SPEND_VERIFIER: Lazy<
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static OUTPUT_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), VerificationError>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@@ -94,16 +119,16 @@ pub static OUTPUT_VERIFIER: Lazy<
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
// fails, so we need a Service to use.
//
// See the note on [`SPEND_VERIFIER`] for details.
tower::service_fn(
(|item: Item| {
ready(item.verify_single(&GROTH16_PARAMETERS.sapling.output_prepared_verifying_key))
Verifier::verify_single_spawning(
item,
&GROTH16_PARAMETERS.sapling.output_prepared_verifying_key,
)
.boxed()
}) as fn(_) -> _,
),
)
@@ -117,25 +142,27 @@ pub static OUTPUT_VERIFIER: Lazy<
/// Note that making a `Service` call requires mutable access to the service, so
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static JOINSPLIT_VERIFIER: Lazy<ServiceFn<fn(Item) -> Ready<Result<(), BoxedError>>>> =
Lazy::new(|| {
// We need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn(
(|item: Item| {
ready(
item.verify_single(&GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key)
.map_err(|e| TransactionError::Groth16(e.to_string()))
.map_err(tower_fallback::BoxedError::from),
)
}) as fn(_) -> _,
)
});
pub static JOINSPLIT_VERIFIER: Lazy<
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxedError>>>,
> = Lazy::new(|| {
// We just need a Service to use: there is no batch verification for JoinSplits.
//
// See the note on [`SPEND_VERIFIER`] for details.
tower::service_fn(
(|item: Item| {
Verifier::verify_single_spawning(
item,
&GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key,
)
.map(|result| {
result
.map_err(|e| TransactionError::Groth16(e.to_string()))
.map_err(tower_fallback::BoxedError::from)
})
.boxed()
}) as fn(_) -> _,
)
});
/// A Groth16 Description (JoinSplit, Spend, or Output) with a Groth16 proof
/// and its inputs encoded as scalars.
@@ -297,9 +324,6 @@ impl Description for (&JoinSplit<Groth16Proof>, &ed25519::VerificationKeyBytes)
}
}
/// A Groth16 verification item, used as the request type of the service.
pub type Item = batch::Item<Bls12>;
/// A wrapper to allow a TryFrom blanket implementation of the [`Description`]
/// trait for the [`Item`] struct.
/// See <https://github.com/rust-lang/rust/issues/50133> for more details.
@@ -334,20 +358,89 @@ where
/// verifier. It handles batching incoming requests, driving batches to
/// completion, and reporting results.
pub struct Verifier {
batch: batch::Verifier<Bls12>,
// Making this 'static makes managing lifetimes much easier.
vk: &'static VerifyingKey<Bls12>,
/// Broadcast sender used to send the result of a batch verification to each
/// request source in the batch.
tx: Sender<Result<(), VerificationError>>,
/// A batch verifier for groth16 proofs.
batch: BatchVerifier,
/// The proof verification key.
///
/// Making this 'static makes managing lifetimes much easier.
vk: &'static BatchVerifyingKey,
/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}
impl Verifier {
fn new(vk: &'static VerifyingKey<Bls12>) -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
/// Create and return a new verifier using the verification key `vk`.
fn new(vk: &'static BatchVerifyingKey) -> Self {
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, vk, tx }
}
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, &'static BatchVerifyingKey, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, self.vk, tx)
}
/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
let result = batch.verify(thread_rng(), vk);
let _ = tx.send(Some(result));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
fn flush_blocking(&mut self) {
let (batch, vk, tx) = self.take();
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
.map(|join_result| join_result.expect("panic in groth16 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || item.verify_single(pvk))
.map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
}
}
impl fmt::Debug for Verifier {
@@ -364,7 +457,7 @@ impl fmt::Debug for Verifier {
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = VerificationError;
type Future = Pin<Box<dyn Future<Output = Result<(), VerificationError>> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@@ -376,9 +469,18 @@ impl Service<BatchControl<Item>> for Verifier {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.clone();
if result.is_ok() {
tracing::trace!(?result, "verified groth16 proof");
metrics::counter!("proofs.groth16.verified", 1);
@@ -389,22 +491,17 @@ impl Service<BatchControl<Item>> for Verifier {
result
}
Err(RecvError::Lagged(_)) => {
tracing::error!(
"missed channel updates, BROADCAST_BUFFER_SIZE is too low!!"
);
Err(VerificationError::InvalidProof)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng(), self.vk));
Box::pin(async { Ok(()) })
tracing::trace!("got groth16 flush command");
let (batch, vk, tx) = self.take();
Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
}
}
}
@@ -413,7 +510,9 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng(), self.vk));
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
self.flush_blocking()
}
}
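
One detail in the item future above: unlike the ed25519 verifier's `Copy` result, groth16's `VerificationError` has to be cloned out of the watch channel's borrow guard, which cannot be held across an `.await`. A minimal sketch of that access pattern, with a stand-in error type:

```rust
use tokio::sync::watch;

/// Stand-in for a non-`Copy` error like `VerificationError`.
#[derive(Clone, Debug)]
struct VerifyError(String);

/// Wait for a batch result, then clone it out of the watch channel.
async fn wait_for_batch_result(
    mut rx: watch::Receiver<Option<Result<(), VerifyError>>>,
) -> Result<(), VerifyError> {
    // Each batch uses a fresh channel, so the first change is our result.
    rx.changed().await.expect("verifier dropped without flushing");

    // `borrow()` returns a guard that can't be held across an `.await` or
    // returned, so the non-`Copy` result is cloned out of it.
    rx.borrow()
        .as_ref()
        .expect("completed batch must send a value")
        .clone()
}
```
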

@@ -1,8 +1,9 @@
//! Tests for transaction verification
use std::convert::TryInto;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::{
future::ready,
stream::{FuturesUnordered, StreamExt},
};
use hex::FromHex;
use tower::ServiceExt;
@@ -67,7 +68,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sapling_groth16() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390)
let mut spend_verifier = Fallback::new(
@@ -170,7 +171,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn correctly_err_on_invalid_output_proof() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390).
// Also, since we expect these to fail, we don't want to slow down the communal verifiers.
@@ -246,7 +247,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sprout_groth16() {
let mut verifier = tower::service_fn(
(|item: Item| {
@@ -309,7 +310,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sprout_groth16_vector() {
let mut verifier = tower::service_fn(
(|item: Item| {
@@ -431,7 +432,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn correctly_err_on_invalid_joinsplit_proof() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390).
// Also, since we expect these to fail, we don't want to slow down the communal verifiers.

@@ -1,7 +1,6 @@
//! Async Halo2 batch verifier service
use std::{
convert::TryFrom,
fmt,
future::Future,
mem,
@@ -9,12 +8,12 @@ use std::{
task::{Context, Poll},
};
use futures::future::{ready, Ready};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use orchard::circuit::VerifyingKey;
use rand::{thread_rng, CryptoRng, RngCore};
use thiserror::Error;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;
@@ -22,8 +21,56 @@ use tower_fallback::Fallback;
#[cfg(test)]
mod tests;
/// Adjusted batch size for halo2 batches.
///
/// Unlike other batch verifiers, halo2 has aggregate proofs.
/// This means that there can be hundreds of actions verified by some proofs,
/// but just one action in others.
///
/// To compensate for larger proofs, we decrease the batch size.
///
/// We also decrease the batch size for these reasons:
/// - the default number of actions in `zcashd` is 2,
/// - halo2 proofs take longer to verify than Sapling proofs, and
/// - transactions with many actions generate very large proofs.
///
/// # TODO
///
/// Count each halo2 action as a batch item.
/// We could increase the batch item count by the action count each time a batch request
/// is received, which would reduce batch size, but keep the batch queue size larger.
const HALO2_MAX_BATCH_SIZE: usize = 2;
/* TODO: implement batch verification
/// The type of the batch verifier.
type BatchVerifier = plonk::BatchVerifier<vesta::Affine>;
*/
/// The type of verification results.
type VerifyResult = Result<(), Halo2Error>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/* TODO: implement batch verification
/// The type of a raw verifying key.
/// This is the key used to verify batches.
pub type BatchVerifyingKey = VerifyingKey<vesta::Affine>;
*/
/// Temporary substitute type for fake batch verification.
///
/// TODO: implement batch verification
pub type BatchVerifyingKey = ItemVerifyingKey;
/// The type of a prepared verifying key.
/// This is the key used to verify individual items.
pub type ItemVerifyingKey = VerifyingKey;
lazy_static::lazy_static! {
pub static ref VERIFYING_KEY: VerifyingKey = VerifyingKey::build();
/// The halo2 proof verifying key.
pub static ref VERIFYING_KEY: ItemVerifyingKey = ItemVerifyingKey::build();
}
// === TEMPORARY BATCH HALO2 SUBSTITUTE ===
@@ -45,25 +92,28 @@ impl Item {
///
/// This is useful (in combination with `Item::clone`) for implementing
/// fallback logic when batch verification fails.
pub fn verify_single(&self, vk: &VerifyingKey) -> Result<(), halo2::plonk::Error> {
pub fn verify_single(&self, vk: &ItemVerifyingKey) -> Result<(), halo2::plonk::Error> {
self.proof.verify(vk, &self.instances[..])
}
}
/// A fake batch verifier that queues and verifies halo2 proofs.
#[derive(Default)]
pub struct BatchVerifier {
queue: Vec<Item>,
}
impl BatchVerifier {
/// Queues an item for fake batch verification.
pub fn queue(&mut self, item: Item) {
self.queue.push(item);
}
/// Verifies the current fake batch.
pub fn verify<R: RngCore + CryptoRng>(
self,
_rng: R,
vk: &VerifyingKey,
vk: &ItemVerifyingKey,
) -> Result<(), halo2::plonk::Error> {
for item in self.queue {
item.verify_single(vk)?;
@@ -121,6 +171,7 @@ impl From<&zebra_chain::orchard::ShieldedData> for Item {
// remove this and just wrap `halo2::plonk::Error` as an enum variant of
// `crate::transaction::Error`, which does the trait derivation via `thiserror`
#[derive(Clone, Debug, Error, Eq, PartialEq)]
#[allow(missing_docs)]
pub enum Halo2Error {
#[error("the constraint system is not satisfied")]
ConstraintSystemFailure,
@@ -145,26 +196,26 @@ impl From<halo2::plonk::Error> for Halo2Error {
/// Note that making a `Service` call requires mutable access to the service, so
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
#[allow(dead_code)]
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Halo2Error>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Verifier::new(&VERIFYING_KEY),
super::MAX_BATCH_SIZE,
HALO2_MAX_BATCH_SIZE,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
// We want to fallback to individual verification if batch verification fails,
// so we need a Service to use.
//
// Because we have to specify the type of a static, we need to be able to
// write the type of the closure and its return value. But both closures and
// async blocks have unnameable types. So instead we cast the closure to a function
// (which is possible because it doesn't capture any state), and use a BoxFuture
// to erase the result type.
// (We can't use BoxCloneService to erase the service type, because it is !Sync.)
tower::service_fn(
(|item: Item| ready(item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from)))
(|item: Item| Verifier::verify_single_spawning(item, &VERIFYING_KEY).boxed())
as fn(_) -> _,
),
)
@@ -176,22 +227,88 @@ pub static VERIFIER: Lazy<
/// Halo2 verifier. It handles batching incoming requests, driving batches to
/// completion, and reporting results.
pub struct Verifier {
/// The sync Halo2 batch verifier.
/// The synchronous Halo2 batch verifier.
batch: BatchVerifier,
// Making this 'static makes managing lifetimes much easier.
vk: &'static VerifyingKey,
/// Broadcast sender used to send the result of a batch verification to each
/// request source in the batch.
tx: Sender<Result<(), Halo2Error>>,
/// The halo2 proof verification key.
///
/// Making this 'static makes managing lifetimes much easier.
vk: &'static ItemVerifyingKey,
/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}
impl Verifier {
#[allow(dead_code)]
fn new(vk: &'static VerifyingKey) -> Self {
fn new(vk: &'static ItemVerifyingKey) -> Self {
let batch = BatchVerifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let (tx, _) = watch::channel(None);
Self { batch, vk, tx }
}
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, &'static BatchVerifyingKey, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, self.vk, tx)
}
/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
let result = batch.verify(thread_rng(), vk).map_err(Halo2Error::from);
let _ = tx.send(Some(result));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
fn flush_blocking(&mut self) {
let (batch, vk, tx) = self.take();
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, vk, tx));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || Self::verify(batch, vk, tx))
.map(|join_result| join_result.expect("panic in halo2 batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(move || item.verify_single(pvk).map_err(Halo2Error::from))
.map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
}
}
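The two flush paths above differ in how they get the CPU-heavy work off the async worker threads. A rough sketch of the distinction, with a hypothetical `expensive_verify` standing in for the batch work:

    use futures::FutureExt;

    // Stand-in for the CPU-intensive batch verification; illustrative only.
    fn expensive_verify() -> Result<(), ()> {
        Ok(())
    }

    async fn flush_examples() {
        // block_in_place runs the closure on the current worker thread and asks
        // the runtime to move other tasks elsewhere; it needs the multi-threaded
        // runtime and blocks the caller until the closure returns.
        let _blocking_result = tokio::task::block_in_place(expensive_verify);

        // spawn_blocking moves the closure onto the blocking thread pool and
        // returns a JoinHandle future, so the caller stays asynchronous; the
        // JoinHandle's Err case reports a panic in the blocking task.
        let _spawned_result = tokio::task::spawn_blocking(expensive_verify)
            .map(|join_result| join_result.expect("panic in blocking task"))
            .await;
    }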
impl fmt::Debug for Verifier {
@ -208,7 +325,7 @@ impl fmt::Debug for Verifier {
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Halo2Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Halo2Error>> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -221,8 +338,16 @@ impl Service<BatchControl<Item>> for Verifier {
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.clone();
if result.is_ok() {
tracing::trace!(?result, "verified halo2 proof");
metrics::counter!("proofs.halo2.verified", 1);
@ -233,29 +358,17 @@ impl Service<BatchControl<Item>> for Verifier {
result
}
Err(RecvError::Lagged(_)) => {
tracing::error!(
"missed channel updates, BROADCAST_BUFFER_SIZE is too low!!"
);
// This is the enum variant that
// orchard::circuit::Proof.verify() returns on
// evaluation failure.
Err(Halo2Error::ConstraintSystemFailure)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(
batch
.verify(thread_rng(), self.vk)
.map_err(Halo2Error::from),
);
Box::pin(async { Ok(()) })
tracing::trace!("got halo2 flush command");
let (batch, vk, tx) = self.take();
Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
}
}
}
@ -264,11 +377,9 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(
batch
.verify(thread_rng(), self.vk)
.map_err(Halo2Error::from),
);
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
self.flush_blocking()
}
}
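The `Drop` flush above also matters for the receive side: if a batch's sender were dropped without sending, every future waiting on it would see `changed()` return an error, which is the `panic!("verifier was dropped without flushing")` arm in the service. A minimal sketch of that failure mode:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel::<Option<()>>(None);

        // Dropping the sender without sending a value closes the channel...
        drop(tx);

        // ...so a waiting receiver gets an error instead of hanging forever.
        assert!(rx.changed().await.is_err());
    }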


@ -1,6 +1,6 @@
//! Tests for verifying simple Halo2 proofs with the async verifier
use std::convert::TryInto;
use std::future::ready;
use futures::stream::{FuturesUnordered, StreamExt};
use tower::ServiceExt;
@ -130,7 +130,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_generated_halo2_proofs() {
zebra_test::init();
@ -196,7 +196,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn correctly_err_on_invalid_halo2_proofs() {
zebra_test::init();


@ -1,8 +1,5 @@
//! Async RedJubjub batch verifier service
#[cfg(test)]
mod tests;
use std::{
future::Future,
mem,
@ -10,16 +7,33 @@ use std::{
task::{Context, Poll},
};
use futures::future::{ready, Ready};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::redjubjub::{batch, *};
#[cfg(test)]
mod tests;
/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;
/// The type of verification results.
type VerifyResult = Result<(), Error>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The type of the batch item.
/// This is a `RedJubjubItem`.
pub type Item = batch::Item;
/// Global batch verification context for RedJubjub signatures.
///
/// This service transparently batches contemporaneous signature verifications,
@ -29,7 +43,7 @@ use zebra_chain::primitives::redjubjub::{batch, *};
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -37,43 +51,101 @@ pub static VERIFIER: Lazy<
super::MAX_BATCH_SIZE,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
// We want to fall back to individual verification if batch verification fails,
// so we need a Service to use.
//
// Because we have to specify the type of a static, we need to be able to
// write the type of the closure and its return value. But both closures and
// async blocks have unnameable types. So instead we cast the closure to a function
// (which is possible because it doesn't capture any state), and use a BoxFuture
// to erase the result type.
// (We can't use BoxCloneService to erase the service type, because it is !Sync.)
tower::service_fn(
(|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
),
)
});
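The `as fn(_) -> _` cast in the comment above is what makes the static's type writable. A distilled sketch with a dummy `u32` item type (all names here are illustrative, not from the codebase):

    use futures::future::{BoxFuture, FutureExt};
    use once_cell::sync::Lazy;
    use tower::util::ServiceFn;

    type VerifyResult = Result<(), ()>;

    // The closure captures nothing, so it coerces to a plain function pointer,
    // and BoxFuture erases the async block's unnameable type. Together they
    // give the static a type that can actually be written down.
    static FALLBACK: Lazy<ServiceFn<fn(u32) -> BoxFuture<'static, VerifyResult>>> =
        Lazy::new(|| {
            tower::service_fn((|_item: u32| async { Ok(()) }.boxed()) as fn(_) -> _)
        });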
/// RedJubjub signature verifier service
pub struct Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
/// A batch verifier for RedJubjub signatures.
batch: BatchVerifier,
/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}
impl Default for Verifier {
fn default() -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}
/// Type alias to clarify that this batch::Item is a RedJubjubItem
pub type Item = batch::Item;
impl Verifier {
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, tx)
}
/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in redjubjub batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in redjubjub fallback verifier"))
}
}
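`take` leans on `std::mem` to move the batch out while leaving a valid replacement behind, so the verifier is immediately ready for the next batch. The same pattern in miniature, with an illustrative accumulator type:

    use std::mem;

    struct Accumulator {
        items: Vec<u32>,
    }

    impl Accumulator {
        // Moves the accumulated items out, leaving a fresh empty Vec behind.
        fn take_items(&mut self) -> Vec<u32> {
            mem::take(&mut self.items)
        }

        // mem::replace is the same idea with an explicit replacement value.
        fn swap_items(&mut self, new: Vec<u32>) -> Vec<u32> {
            mem::replace(&mut self.items, new)
        }
    }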
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -85,9 +157,14 @@ impl Service<BatchControl<Item>> for Verifier {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
if result.is_ok() {
tracing::trace!(?result, "validated redjubjub signature");
metrics::counter!("signatures.redjubjub.validated", 1);
@ -98,22 +175,17 @@ impl Service<BatchControl<Item>> for Verifier {
result
}
Err(RecvError::Lagged(_)) => {
tracing::error!(
"batch verification receiver lagged and lost verification results"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
tracing::trace!("got redjubjub flush command");
let (batch, tx) = self.take();
Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
@ -122,7 +194,9 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
self.flush_blocking();
}
}


@ -45,7 +45,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items_test() -> Result<()> {
batch_flushes_on_max_items().await
}
@ -64,7 +64,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency_test() -> Result<()> {
batch_flushes_on_max_latency().await
}


@ -1,8 +1,5 @@
//! Async RedPallas batch verifier service
#[cfg(test)]
mod tests;
use std::{
future::Future,
mem,
@ -10,16 +7,32 @@ use std::{
task::{Context, Poll},
};
use futures::future::{ready, Ready};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::redpallas::{batch, *};
#[cfg(test)]
mod tests;
/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;
/// The type of verification results.
type VerifyResult = Result<(), Error>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The type of the batch item.
/// This is a `RedPallasItem`.
pub type Item = batch::Item;
/// Global batch verification context for RedPallas signatures.
///
/// This service transparently batches contemporaneous signature verifications,
@ -28,9 +41,8 @@ use zebra_chain::primitives::redpallas::{batch, *};
/// Note that making a `Service` call requires mutable access to the service, so
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
#[allow(dead_code)]
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> Ready<Result<(), Error>>>>,
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
@ -38,43 +50,101 @@ pub static VERIFIER: Lazy<
super::MAX_BATCH_SIZE,
super::MAX_BATCH_LATENCY,
),
// We want to fallback to individual verification if batch verification
// fails, so we need a Service to use. The obvious way to do this would
// be to write a closure that returns an async block. But because we
// have to specify the type of a static, we need to be able to write the
// type of the closure and its return value, and both closures and async
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
// We want to fall back to individual verification if batch verification fails,
// so we need a Service to use.
//
// Because we have to specify the type of a static, we need to be able to
// write the type of the closure and its return value. But both closures and
// async blocks have unnameable types. So instead we cast the closure to a function
// (which is possible because it doesn't capture any state), and use a BoxFuture
// to erase the result type.
// (We can't use BoxCloneService to erase the service type, because it is !Sync.)
tower::service_fn(
(|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
),
)
});
/// RedPallas signature verifier service
pub struct Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
/// A batch verifier for RedPallas signatures.
batch: BatchVerifier,
/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}
impl Default for Verifier {
fn default() -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}
/// Type alias to clarify that this batch::Item is a RedPallasItem
pub type Item = batch::Item;
impl Verifier {
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);
let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);
(batch, tx)
}
/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function blocks until the batch is completed on the thread pool.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: replace with the rayon thread pool
tokio::task::block_in_place(|| Self::verify(batch, tx));
}
/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| Self::verify(batch, tx))
.map(|join_result| join_result.expect("panic in redpallas batch verifier"))
}
/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: spawn on the rayon thread pool inside spawn_blocking
tokio::task::spawn_blocking(|| item.verify_single())
.map(|join_result| join_result.expect("panic in redpallas fallback verifier"))
}
}
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -87,8 +157,12 @@ impl Service<BatchControl<Item>> for Verifier {
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
if result.is_ok() {
tracing::trace!(?result, "validated redpallas signature");
metrics::counter!("signatures.redpallas.validated", 1);
@ -99,22 +173,17 @@ impl Service<BatchControl<Item>> for Verifier {
result
}
Err(RecvError::Lagged(_)) => {
tracing::error!(
"batch verification receiver lagged and lost verification results"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
tracing::trace!("got redpallas flush command");
let (batch, tx) = self.take();
Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
@ -123,7 +192,9 @@ impl Service<BatchControl<Item>> for Verifier {
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
// This blocks the current thread and any futures running on it, until the batch is complete.
//
// TODO: move the batch onto the rayon thread pool, then drop the verifier immediately.
self.flush_blocking();
}
}


@ -45,7 +45,7 @@ where
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items() -> Result<()> {
use tokio::time::timeout;
@ -59,7 +59,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency() -> Result<()> {
use tokio::time::timeout;


@ -1,8 +1,7 @@
//! Asynchronous verification of transactions.
//!
use std::{
collections::HashMap,
convert::TryInto,
future::Future,
iter::FromIterator,
pin::Pin,


@ -278,61 +278,64 @@ async fn v5_transaction_is_rejected_before_nu5_activation() {
}
}
#[tokio::test]
async fn v5_transaction_is_accepted_after_nu5_activation_mainnet() {
v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Mainnet).await
#[test]
fn v5_transaction_is_accepted_after_nu5_activation_mainnet() {
v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Mainnet)
}
#[tokio::test]
async fn v5_transaction_is_accepted_after_nu5_activation_testnet() {
v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Testnet).await
#[test]
fn v5_transaction_is_accepted_after_nu5_activation_testnet() {
v5_transaction_is_accepted_after_nu5_activation_for_network(Network::Testnet)
}
async fn v5_transaction_is_accepted_after_nu5_activation_for_network(network: Network) {
let nu5 = NetworkUpgrade::Nu5;
let nu5_activation_height = nu5
.activation_height(network)
.expect("NU5 activation height is specified");
fn v5_transaction_is_accepted_after_nu5_activation_for_network(network: Network) {
zebra_test::init();
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let nu5 = NetworkUpgrade::Nu5;
let nu5_activation_height = nu5
.activation_height(network)
.expect("NU5 activation height is specified");
let blocks = match network {
Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
};
let blocks = match network {
Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
};
let state_service = service_fn(|_| async { unreachable!("Service should not be called") });
let verifier = Verifier::new(network, state_service);
let state_service = service_fn(|_| async { unreachable!("Service should not be called") });
let verifier = Verifier::new(network, state_service);
let mut transaction = fake_v5_transactions_for_network(network, blocks)
.rev()
.next()
.expect("At least one fake V5 transaction in the test vectors");
if transaction
.expiry_height()
.expect("V5 must have expiry_height")
< nu5_activation_height
{
let expiry_height = transaction.expiry_height_mut();
*expiry_height = nu5_activation_height;
}
let mut transaction = fake_v5_transactions_for_network(network, blocks)
.rev()
.next()
.expect("At least one fake V5 transaction in the test vectors");
if transaction
.expiry_height()
.expect("V5 must have expiry_height")
< nu5_activation_height
{
let expiry_height = transaction.expiry_height_mut();
*expiry_height = nu5_activation_height;
}
let expected_hash = transaction.unmined_id();
let expiry_height = transaction
.expiry_height()
.expect("V5 must have expiry_height");
let expected_hash = transaction.unmined_id();
let expiry_height = transaction
.expiry_height()
.expect("V5 must have expiry_height");
let result = verifier
.oneshot(Request::Block {
transaction: Arc::new(transaction),
known_utxos: Arc::new(HashMap::new()),
height: expiry_height,
time: chrono::MAX_DATETIME,
})
.await;
let result = verifier
.oneshot(Request::Block {
transaction: Arc::new(transaction),
known_utxos: Arc::new(HashMap::new()),
height: expiry_height,
time: chrono::MAX_DATETIME,
})
.await;
assert_eq!(
result.expect("unexpected error response").tx_id(),
expected_hash
);
assert_eq!(
result.expect("unexpected error response").tx_id(),
expected_hash
);
})
}
/// Test if V4 transaction with transparent funds is accepted.
@ -767,7 +770,7 @@ async fn v4_transaction_with_conflicting_transparent_spend_is_rejected() {
#[test]
fn v4_transaction_with_conflicting_sprout_nullifier_inside_joinsplit_is_rejected() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let network_upgrade = NetworkUpgrade::Canopy;
@ -832,7 +835,7 @@ fn v4_transaction_with_conflicting_sprout_nullifier_inside_joinsplit_is_rejected
#[test]
fn v4_transaction_with_conflicting_sprout_nullifier_across_joinsplits_is_rejected() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let network_upgrade = NetworkUpgrade::Canopy;
@ -1357,13 +1360,12 @@ async fn v5_transaction_with_conflicting_transparent_spend_is_rejected() {
}
/// Test if a signed V4 transaction with a dummy [`sprout::JoinSplit`] is accepted.
/// - Test if an unsigned V4 transaction with a dummy [`sprout::JoinSplit`] is rejected.
///
/// This test verifies that the transaction verifier correctly accepts a signed transaction.
#[test]
fn v4_with_signed_sprout_transfer_is_accepted() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let (height, transaction) = test_transactions(network)
@ -1396,7 +1398,7 @@ fn v4_with_signed_sprout_transfer_is_accepted() {
result.expect("unexpected error response").tx_id(),
expected_hash
);
});
})
}
/// Test if a V4 transaction with a modified [`sprout::JoinSplit`] is rejected.
@ -1406,7 +1408,7 @@ fn v4_with_signed_sprout_transfer_is_accepted() {
#[test]
fn v4_with_modified_joinsplit_is_rejected() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
v4_with_joinsplit_is_rejected_for_modification(
JoinSplitModification::CorruptSignature,
// TODO: Fix error downcast
@ -1417,17 +1419,19 @@ fn v4_with_modified_joinsplit_is_rejected() {
),
)
.await;
v4_with_joinsplit_is_rejected_for_modification(
JoinSplitModification::CorruptProof,
TransactionError::Groth16("proof verification failed".to_string()),
)
.await;
v4_with_joinsplit_is_rejected_for_modification(
JoinSplitModification::ZeroProof,
TransactionError::MalformedGroth16("invalid G1".to_string()),
)
.await;
});
})
}
async fn v4_with_joinsplit_is_rejected_for_modification(
@ -1470,7 +1474,7 @@ async fn v4_with_joinsplit_is_rejected_for_modification(
#[test]
fn v4_with_sapling_spends() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let (height, transaction) = test_transactions(network)
@ -1510,7 +1514,7 @@ fn v4_with_sapling_spends() {
#[test]
fn v4_with_duplicate_sapling_spends() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let (height, mut transaction) = test_transactions(network)
@ -1555,7 +1559,7 @@ fn v4_with_duplicate_sapling_spends() {
#[test]
fn v4_with_sapling_outputs_and_no_spends() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let (height, transaction) = test_transactions(network)
@ -1591,22 +1595,27 @@ fn v4_with_sapling_outputs_and_no_spends() {
result.expect("unexpected error response").tx_id(),
expected_hash
);
});
})
}
/// Test if a V5 transaction with Sapling spends is accepted by the verifier.
#[test]
// TODO: Remove `should_panic` once V5 transaction verification is complete.
// TODO: add NU5 mainnet test vectors with Sapling spends, then remove should_panic
#[should_panic]
fn v5_with_sapling_spends() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let nu5_activation = NetworkUpgrade::Nu5.activation_height(network);
let transaction =
fake_v5_transactions_for_network(network, zebra_test::vectors::MAINNET_BLOCKS.iter())
.rev()
.filter(|transaction| !transaction.is_coinbase() && transaction.inputs().is_empty())
.filter(|transaction| {
!transaction.is_coinbase()
&& transaction.inputs().is_empty()
&& transaction.expiry_height() >= nu5_activation
})
.find(|transaction| transaction.sapling_spends_per_anchor().next().is_some())
.expect("No transaction found with Sapling spends");
@ -1642,7 +1651,7 @@ fn v5_with_sapling_spends() {
#[test]
fn v5_with_duplicate_sapling_spends() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
let mut transaction =
@ -1688,7 +1697,7 @@ fn v5_with_duplicate_sapling_spends() {
#[test]
fn v5_with_duplicate_orchard_action() {
zebra_test::init();
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let network = Network::Mainnet;
// Find a transaction with no inputs or outputs to use as base


@ -1,3 +1,5 @@
//! Randomised property tests for transaction verification.
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use chrono::{DateTime, Duration, Utc};
@ -438,7 +440,7 @@ fn validate(
known_utxos: HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
network: Network,
) -> Result<transaction::Response, TransactionError> {
zebra_test::RUNTIME.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
// Initialize the verifier
let state_service =
tower::service_fn(|_| async { unreachable!("State service should not be called") });


@ -26,6 +26,7 @@ pub mod zip0243;
pub mod zip0244;
/// A single-threaded Tokio runtime that can be shared between tests.
/// This runtime should be used for tests that need a single thread for consistent timings.
///
/// This shared runtime should be used in tests that use shared background tasks. An example is
/// with shared global `Lazy<BatchVerifier>` types, because they spawn a background task when they
@ -40,13 +41,24 @@ pub mod zip0244;
/// at a time, there's a risk of a test finishing while the timer is paused (due to a test failure,
/// for example) and that means that the next test will already start with an incorrect timer
/// state.
pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
pub static SINGLE_THREADED_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
});
/// A multi-threaded Tokio runtime that can be shared between tests.
/// This runtime should be used for tests that spawn blocking threads.
///
/// See [`SINGLE_THREADED_RUNTIME`] for details.
pub static MULTI_THREADED_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
});
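A sketch of why the separate multi-threaded runtime matters here: `tokio::task::block_in_place`, which the verifiers' `flush_blocking` (and therefore their `Drop` impls) relies on, panics on a current-thread runtime. Illustrative test only:

    // Hypothetical test: block_in_place is only allowed on the multi-threaded
    // runtime, so verifier tests must use MULTI_THREADED_RUNTIME.
    #[test]
    #[should_panic]
    fn block_in_place_panics_on_a_current_thread_runtime() {
        SINGLE_THREADED_RUNTIME.block_on(async {
            tokio::task::block_in_place(|| ());
        });
    }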
static INIT: Once = Once::new();
/// Initialize global and thread-specific settings for tests,
@ -134,7 +146,8 @@ pub fn init() {
///
/// This is generally used in proptests, which don't support the `#[tokio::test]` attribute.
///
/// If a runtime needs to be shared between tests, use the [`RUNTIME`] instance instead.
/// If a runtime needs to be shared between tests, use the [`SINGLE_THREADED_RUNTIME`] or
/// [`MULTI_THREADED_RUNTIME`] instances instead.
///
/// See also the [`init`] function, which is called by this function.
pub fn init_async() -> tokio::runtime::Runtime {