//! Async Groth16 batch verifier service use std::{ fmt, future::Future, mem, pin::Pin, task::{Context, Poll}, }; use bellman::{ groth16::{batch, prepare_verifying_key, VerifyingKey}, VerificationError, }; use bls12_381::Bls12; use futures::future::{ready, Ready}; use once_cell::sync::Lazy; use rand::thread_rng; use tokio::sync::broadcast::{channel, error::RecvError, Sender}; use tower::{util::ServiceFn, Service}; use tower_batch::{Batch, BatchControl}; use tower_fallback::Fallback; use zebra_chain::sapling::{Output, Spend}; mod hash_reader; mod params; #[cfg(test)] mod tests; use self::hash_reader::HashReader; use params::PARAMS; /// Global batch verification context for Groth16 proofs of Spend statements. /// /// This service transparently batches contemporaneous proof verifications, /// handling batch failures by falling back to individual verification. /// /// Note that making a `Service` call requires mutable access to the service, so /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static SPEND_VERIFIER: Lazy< Fallback, ServiceFn Ready>>>, > = Lazy::new(|| { Fallback::new( Batch::new( Verifier::new(&PARAMS.sapling.spend.vk), super::MAX_BATCH_SIZE, super::MAX_BATCH_LATENCY, ), // We want to fallback to individual verification if batch verification // fails, so we need a Service to use. The obvious way to do this would // be to write a closure that returns an async block. But because we // have to specify the type of a static, we need to be able to write the // type of the closure and its return value, and both closures and async // blocks have eldritch types whose names cannot be written. So instead, // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). tower::service_fn( (|item: Item| { ready(item.verify_single(&prepare_verifying_key(&PARAMS.sapling.spend.vk))) }) as fn(_) -> _, ), ) }); /// Global batch verification context for Groth16 proofs of Output statements. /// /// This service transparently batches contemporaneous proof verifications, /// handling batch failures by falling back to individual verification. /// /// Note that making a `Service` call requires mutable access to the service, so /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static OUTPUT_VERIFIER: Lazy< Fallback, ServiceFn Ready>>>, > = Lazy::new(|| { Fallback::new( Batch::new( Verifier::new(&PARAMS.sapling.output.vk), super::MAX_BATCH_SIZE, super::MAX_BATCH_LATENCY, ), // We want to fallback to individual verification if batch verification // fails, so we need a Service to use. The obvious way to do this would // be to write a closure that returns an async block. But because we // have to specify the type of a static, we need to be able to write the // type of the closure and its return value, and both closures and async // blocks have eldritch types whose names cannot be written. So instead, // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). tower::service_fn( (|item: Item| { ready(item.verify_single(&prepare_verifying_key(&PARAMS.sapling.output.vk))) }) as fn(_) -> _, ), ) }); /// A Groth16 verification item, used as the request type of the service. pub type Item = batch::Item; pub struct ItemWrapper(Item); impl From<&Spend> for ItemWrapper { fn from(spend: &Spend) -> Self { Self(Item::from(( bellman::groth16::Proof::read(&spend.zkproof.0[..]).unwrap(), spend.primary_inputs(), ))) } } impl From<&Output> for ItemWrapper { fn from(output: &Output) -> Self { Self(Item::from(( bellman::groth16::Proof::read(&output.zkproof.0[..]).unwrap(), output.primary_inputs(), ))) } } impl From for Item { fn from(item_wrapper: ItemWrapper) -> Self { item_wrapper.0 } } /// Groth16 signature verifier implementation /// /// This is the core implementation for the batch verification logic of the groth /// verifier. It handles batching incoming requests, driving batches to /// completion, and reporting results. pub struct Verifier { batch: batch::Verifier, // Making this 'static makes managing lifetimes much easier. vk: &'static VerifyingKey, /// Broadcast sender used to send the result of a batch verification to each /// request source in the batch. tx: Sender>, } impl Verifier { fn new(vk: &'static VerifyingKey) -> Self { let batch = batch::Verifier::default(); let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); Self { batch, vk, tx } } } impl fmt::Debug for Verifier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let name = "Verifier"; f.debug_struct(name) .field("batch", &"..") .field("vk", &"..") .field("tx", &self.tx) .finish() } } impl Service> for Verifier { type Response = (); type Error = VerificationError; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: BatchControl) -> Self::Future { match req { BatchControl::Item(item) => { tracing::trace!("got item"); self.batch.queue(item); let mut rx = self.tx.subscribe(); Box::pin(async move { match rx.recv().await { Ok(result) => { if result.is_ok() { tracing::trace!(?result, "verified groth16 proof"); metrics::counter!("proofs.groth16.verified", 1); } else { tracing::trace!(?result, "invalid groth16 proof"); metrics::counter!("proofs.groth16.invalid", 1); } result } Err(RecvError::Lagged(_)) => { tracing::error!( "missed channel updates, BROADCAST_BUFFER_SIZE is too low!!" ); Err(VerificationError::InvalidProof) } Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), } }) } BatchControl::Flush => { tracing::trace!("got flush command"); let batch = mem::take(&mut self.batch); let _ = self.tx.send(batch.verify(thread_rng(), self.vk)); Box::pin(async { Ok(()) }) } } } } impl Drop for Verifier { fn drop(&mut self) { // We need to flush the current batch in case there are still any pending futures. let batch = mem::take(&mut self.batch); let _ = self.tx.send(batch.verify(thread_rng(), self.vk)); } }