Add RedJubjub signature verifier service (#460)

Using tower-batch-based async pattern.

Now the Verifier is agnostic of redjubjub SigTypes.  Updated tests to
generate sigs of both types and batch verifies the whole batch.

Resolves #407
This commit is contained in:
Deirdre Connolly 2020-07-08 17:23:00 -04:00 committed by GitHub
parent 83f0747490
commit 2cd58c8325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 225 additions and 23 deletions

View File

@ -1,16 +1,18 @@
name: Coverage
name: CI
on:
pull_request:
paths:
- '**.rs'
- '**.rs'
push:
branches:
- main
paths:
- '**.rs'
- '**.rs'
jobs:
codecov:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

54
Cargo.lock generated
View File

@ -186,7 +186,7 @@ dependencies = [
"block-padding",
"byte-tools",
"byteorder",
"generic-array",
"generic-array 0.12.3",
]
[[package]]
@ -371,7 +371,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d85653f070353a16313d0046f173f70d1aadd5b42600a14de626f0dfb3473a5"
dependencies = [
"byteorder",
"digest",
"digest 0.8.1",
"rand_core 0.5.1",
"serde",
"subtle",
@ -419,7 +419,16 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array",
"generic-array 0.12.3",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.2",
]
[[package]]
@ -652,6 +661,16 @@ dependencies = [
"typenum",
]
[[package]]
name = "generic-array"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac746a5f3bbfdadd6106868134545e684693d54d9d44f6e9588a7d54af0bf980"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.1.14"
@ -1438,6 +1457,20 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redjubjub"
version = "0.1.1"
source = "git+https://github.com/ZcashFoundation/redjubjub.git?branch=main#ba256655ddf84dac7ea6281a1b398495e46d232a"
dependencies = [
"blake2b_simd",
"byteorder",
"digest 0.9.0",
"jubjub",
"rand_core 0.5.1",
"serde",
"thiserror",
]
[[package]]
name = "redjubjub"
version = "0.1.1"
@ -1501,7 +1534,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad5112e0dbbb87577bfbc56c42450235e3012ce336e29c5befd7807bd626da4a"
dependencies = [
"block-buffer",
"digest",
"digest 0.8.1",
"opaque-debug",
]
@ -1664,7 +1697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69"
dependencies = [
"block-buffer",
"digest",
"digest 0.8.1",
"fake-simd",
"opaque-debug",
]
@ -2226,6 +2259,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
[[package]]
name = "version_check"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "wait-timeout"
version = "0.2.0"
@ -2346,7 +2385,7 @@ dependencies = [
"proptest",
"proptest-derive",
"rand_core 0.5.1",
"redjubjub",
"redjubjub 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"ripemd160",
"secp256k1",
"serde",
@ -2369,9 +2408,12 @@ dependencies = [
"color-eyre",
"futures",
"futures-util",
"rand 0.7.3",
"redjubjub 0.1.1 (git+https://github.com/ZcashFoundation/redjubjub.git?branch=main)",
"spandoc",
"tokio",
"tower",
"tower-batch",
"tracing",
"tracing-error",
"tracing-futures",

View File

@ -5,25 +5,27 @@ authors = ["Zcash Foundation <zebra@zfnd.org>"]
license = "MIT OR Apache-2.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4.13"
futures = "0.3.5"
futures-util = "0.3.5"
rand = "0.7"
redjubjub = { git = "https://github.com/ZcashFoundation/redjubjub.git", branch = "main"}
tokio = { version = "0.2", features = ["time", "sync", "stream"] }
tower = "0.3"
tracing = "0.1.15"
tracing-futures = "0.2.4"
tower-batch = { path = "../tower-batch/" }
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }
chrono = "0.4.13"
futures-util = "0.3.5"
tokio = { version = "0.2", features = ["sync"] }
tower = "0.3.1"
[dev-dependencies]
zebra-test = { path = "../zebra-test/" }
color-eyre = "0.5"
futures = "0.3.5"
rand = "0.7"
spandoc = "0.1"
tokio = { version = "0.2", features = ["macros", "time"] }
tracing = "0.1.15"
tokio = { version = "0.2", features = ["full"] }
tracing-error = "0.1.2"
tracing-futures = "0.2"
tracing-subscriber = "0.2.7"
zebra-test = { path = "../zebra-test/" }

View File

@ -4,6 +4,7 @@
//! verification.
mod block;
pub mod redjubjub;
mod script;
mod transaction;

View File

@ -0,0 +1,155 @@
//! Async RedJubjub batch verifier service
use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use rand::thread_rng;
use redjubjub::{batch, *};
use tokio::sync::broadcast::{channel, RecvError, Sender};
use tower::Service;
use tower_batch::BatchControl;
/// RedJubjub signature verifier service
pub struct Verifier {
batch: batch::Verifier,
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
// provides a spmc channel, "watch", but it only keeps the latest value, so
// using it would require thinking through whether it was possible for
// results from one batch to be mixed with another.
tx: Sender<Result<(), Error>>,
}
#[allow(clippy::new_without_default)]
impl Verifier {
/// Create a new RedJubjubVerifier instance
pub fn new() -> Self {
let batch = batch::Verifier::default();
// XXX(hdevalence) what's a reasonable choice here?
let (tx, _) = channel(10);
Self { tx, batch }
}
}
/// Type alias to clarify that this batch::Item is a RedJubjubItem
pub type Item = batch::Item;
impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Box::pin(async move {
match rx.recv().await {
Ok(result) => result,
Err(RecvError::Lagged(_)) => {
tracing::warn!(
"missed channel updates for the correct signature batch!"
);
Err(Error::InvalidSignature)
}
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
}
})
}
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
Box::pin(async { Ok(()) })
}
}
}
}
impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use color_eyre::eyre::Result;
use futures::stream::{FuturesUnordered, StreamExt};
use tower::ServiceExt;
use tower_batch::Batch;
async fn sign_and_verify<V>(mut verifier: V, n: usize) -> Result<(), V::Error>
where
V: Service<Item, Response = ()>,
{
let rng = thread_rng();
let mut results = FuturesUnordered::new();
for i in 0..n {
let span = tracing::trace_span!("sig", i);
let msg = b"BatchVerifyTest";
match i % 2 {
0 => {
let sk = SigningKey::<SpendAuth>::new(rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(rng, &msg[..]);
verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
1 => {
let sk = SigningKey::<Binding>::new(rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(rng, &msg[..]);
verifier.ready_and().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
_ => panic!(),
}
}
while let Some(result) = results.next().await {
result?;
}
Ok(())
}
#[tokio::test]
#[spandoc::spandoc]
async fn batch_flushes_on_max_items() -> Result<()> {
use tokio::time::timeout;
// Use a very long max_latency and a short timeout to check that
// flushing is happening based on hitting max_items.
let verifier = Batch::new(Verifier::new(), 10, Duration::from_secs(1000));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 100)).await?
}
#[tokio::test]
#[spandoc::spandoc]
async fn batch_flushes_on_max_latency() -> Result<()> {
use tokio::time::timeout;
// Use a very high max_items and a short timeout to check that
// flushing is happening based on hitting max_latency.
let verifier = Batch::new(Verifier::new(), 100, Duration::from_millis(500));
timeout(Duration::from_secs(5), sign_and_verify(verifier, 10)).await?
}
}