From 5bcef64514b41d5a5393225fbaa0292fc1b731ae Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 15 Jun 2020 23:40:11 -0700 Subject: [PATCH] Add Ed25519 batch verification example test. This test doesn't compile in a way that reveals a problem with the design. The verification service takes a `Request<'msg>` parameterized by the message lifetime, and returns a future unconstrained by the message lifetime (it hashes upfront to avoid requiring that `'msg` outlive `call`). But the `Batch` middleware has the verification service working on its own task, so how can we ensure that the message lives long enough to be read by the worker task? --- Cargo.lock | 2 + tower-batch/Cargo.toml | 5 ++ tower-batch/tests/ed25519.rs | 142 +++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+) create mode 100644 tower-batch/tests/ed25519.rs diff --git a/Cargo.lock b/Cargo.lock index 45ac6fdde..311101b63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1927,9 +1927,11 @@ dependencies = [ name = "tower-batch" version = "0.1.0" dependencies = [ + "ed25519-zebra", "futures", "futures-core", "pin-project", + "rand 0.7.3", "tokio", "tower", "tracing", diff --git a/tower-batch/Cargo.toml b/tower-batch/Cargo.toml index 4419b90e9..e54a07cf0 100644 --- a/tower-batch/Cargo.toml +++ b/tower-batch/Cargo.toml @@ -13,3 +13,8 @@ pin-project = "0.4.20" tracing = "0.1.15" tracing-futures = "0.2.4" futures = "0.3.5" + +[dev-dependencies] +ed25519-zebra = "0.3" +rand = "0.7" +tokio = { version = "0.2", features = ["full"]} diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs new file mode 100644 index 000000000..fc20e6d90 --- /dev/null +++ b/tower-batch/tests/ed25519.rs @@ -0,0 +1,142 @@ +use std::{ + convert::TryFrom, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use ed25519_zebra::*; +use futures::stream::{FuturesUnordered, StreamExt}; +use rand::thread_rng; +use tokio::sync::broadcast::{channel, RecvError, Sender}; +use tower::{Service, ServiceExt}; +use tower_batch::{Batch, BatchControl}; + +// ============ service impl ============ + +pub struct Ed25519Verifier { + batch: BatchVerifier, + // This uses a "broadcast" channel, which is an mpmc channel. Tokio also + // provides a spmc channel, "watch", but it only keeps the latest value, so + // using it would require thinking through whether it was possible for + // results from one batch to be mixed with another. + tx: Sender>, +} + +impl Ed25519Verifier { + pub fn new() -> Self { + let batch = BatchVerifier::default(); + let (tx, _) = channel(1); + Self { tx, batch } + } +} + +type Request<'msg> = (VerificationKeyBytes, Signature, &'msg [u8]); + +impl<'msg> Service>> for Ed25519Verifier { + type Response = (); + type Error = Error; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BatchControl>) -> Self::Future { + match req { + BatchControl::Item((vk_bytes, sig, msg)) => { + self.batch.queue(vk_bytes, sig, msg); + let mut rx = self.tx.subscribe(); + Box::pin(async move { + match rx.recv().await { + Ok(result) => result, + // this would be bad + Err(RecvError::Lagged(_)) => Err(Error::InvalidSignature), + Err(RecvError::Closed) => panic!("verifier was dropped without flushing"), + } + }) + } + BatchControl::Flush => { + let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); + let _ = self.tx.send(batch.verify(thread_rng())); + Box::pin(async { Ok(()) }) + } + } + } +} + +impl Drop for Ed25519Verifier { + fn drop(&mut self) { + // We need to flush the current batch in case there are still any pending futures. + let batch = std::mem::replace(&mut self.batch, BatchVerifier::default()); + let _ = self.tx.send(batch.verify(thread_rng())); + } +} + +// =============== testing code ======== + +async fn sign_and_verify(mut verifier: V, n: usize) +where + for<'msg> V: Service>, + for<'msg> >>::Error: + Into>, +{ + let mut results = FuturesUnordered::new(); + for _ in 0..n { + let sk = SigningKey::new(thread_rng()); + let vk_bytes = VerificationKeyBytes::from(&sk); + let msg = b"BatchVerifyTest"; + let sig = sk.sign(&msg[..]); + results.push( + verifier + .ready_and() + .await + .map_err(|e| e.into()) + .unwrap() + .call((vk_bytes, sig, &msg[..])), + ) + } + + while let Some(result) = results.next().await { + assert!(result.is_ok()); + } +} + +#[tokio::test] +async fn individual_verification_with_service_fn() { + let verifier = tower::service_fn(|(vk_bytes, sig, msg): Request| { + let result = VerificationKey::try_from(vk_bytes).and_then(|vk| vk.verify(&sig, msg)); + async move { result } + }); + + sign_and_verify(verifier, 100).await; +} + +#[tokio::test] +async fn batch_flushes_on_max_items() { + use tokio::time::timeout; + + // Use a very long max_latency and a short timeout to check that + // flushing is happening based on hitting max_items. + let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000)); + assert!( + timeout(Duration::from_secs(1), sign_and_verify(verifier, 100)) + .await + .is_ok() + ) +} + +#[tokio::test] +async fn batch_flushes_on_max_latency() { + use tokio::time::timeout; + + // Use a very high max_items and a short timeout to check that + // flushing is happening based on hitting max_latency. + let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500)); + assert!( + timeout(Duration::from_secs(1), sign_and_verify(verifier, 10)) + .await + .is_ok() + ) +}