2020-06-15 23:40:11 -07:00
|
|
|
use std::{
|
|
|
|
future::Future,
|
2020-06-16 17:51:50 -07:00
|
|
|
mem,
|
2020-06-15 23:40:11 -07:00
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
time::Duration,
|
|
|
|
};
|
|
|
|
|
2020-07-15 13:27:39 -07:00
|
|
|
use color_eyre::{eyre::eyre, Report};
|
2020-06-15 23:40:11 -07:00
|
|
|
use ed25519_zebra::*;
|
|
|
|
use futures::stream::{FuturesUnordered, StreamExt};
|
|
|
|
use rand::thread_rng;
|
|
|
|
use tokio::sync::broadcast::{channel, RecvError, Sender};
|
|
|
|
use tower::{Service, ServiceExt};
|
|
|
|
use tower_batch::{Batch, BatchControl};
|
2020-07-15 13:27:39 -07:00
|
|
|
use tower_fallback::Fallback;
|
2020-06-15 23:40:11 -07:00
|
|
|
|
|
|
|
// ============ service impl ============
|
|
|
|
|
|
|
|
pub struct Ed25519Verifier {
|
2020-06-16 01:34:31 -07:00
|
|
|
batch: batch::Verifier,
|
2020-06-15 23:40:11 -07:00
|
|
|
// This uses a "broadcast" channel, which is an mpmc channel. Tokio also
|
|
|
|
// provides a spmc channel, "watch", but it only keeps the latest value, so
|
|
|
|
// using it would require thinking through whether it was possible for
|
|
|
|
// results from one batch to be mixed with another.
|
|
|
|
tx: Sender<Result<(), Error>>,
|
|
|
|
}
|
|
|
|
|
2020-06-16 17:51:50 -07:00
|
|
|
#[allow(clippy::new_without_default)]
|
2020-06-15 23:40:11 -07:00
|
|
|
impl Ed25519Verifier {
|
|
|
|
pub fn new() -> Self {
|
2020-06-16 01:34:31 -07:00
|
|
|
let batch = batch::Verifier::default();
|
|
|
|
// XXX(hdevalence) what's a reasonable choice here?
|
|
|
|
let (tx, _) = channel(10);
|
2020-06-15 23:40:11 -07:00
|
|
|
Self { tx, batch }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-16 01:34:31 -07:00
|
|
|
/// Item type submitted to `Ed25519Verifier`; an alias for `batch::Item`.
pub type Ed25519Item = batch::Item;
|
2020-06-15 23:40:11 -07:00
|
|
|
|
2020-06-16 01:34:31 -07:00
|
|
|
impl<'msg> Service<BatchControl<Ed25519Item>> for Ed25519Verifier {
|
2020-06-15 23:40:11 -07:00
|
|
|
type Response = ();
|
|
|
|
type Error = Error;
|
|
|
|
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
|
2020-06-16 01:34:31 -07:00
|
|
|
fn call(&mut self, req: BatchControl<Ed25519Item>) -> Self::Future {
|
2020-06-15 23:40:11 -07:00
|
|
|
match req {
|
2020-06-16 01:34:31 -07:00
|
|
|
BatchControl::Item(item) => {
|
|
|
|
tracing::trace!("got item");
|
|
|
|
self.batch.queue(item);
|
2020-06-15 23:40:11 -07:00
|
|
|
let mut rx = self.tx.subscribe();
|
|
|
|
Box::pin(async move {
|
|
|
|
match rx.recv().await {
|
|
|
|
Ok(result) => result,
|
2020-06-16 01:34:31 -07:00
|
|
|
Err(RecvError::Lagged(_)) => {
|
|
|
|
tracing::warn!(
|
|
|
|
"missed channel updates for the correct signature batch!"
|
|
|
|
);
|
|
|
|
Err(Error::InvalidSignature)
|
|
|
|
}
|
2020-06-15 23:40:11 -07:00
|
|
|
Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
BatchControl::Flush => {
|
2020-06-16 01:34:31 -07:00
|
|
|
tracing::trace!("got flush command");
|
2020-06-16 17:51:50 -07:00
|
|
|
let batch = mem::take(&mut self.batch);
|
2020-06-15 23:40:11 -07:00
|
|
|
let _ = self.tx.send(batch.verify(thread_rng()));
|
|
|
|
Box::pin(async { Ok(()) })
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for Ed25519Verifier {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
// We need to flush the current batch in case there are still any pending futures.
|
2020-06-16 17:51:50 -07:00
|
|
|
let batch = mem::take(&mut self.batch);
|
2020-06-15 23:40:11 -07:00
|
|
|
let _ = self.tx.send(batch.verify(thread_rng()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// =============== testing code ========
|
|
|
|
|
2020-07-15 13:27:39 -07:00
|
|
|
async fn sign_and_verify<V>(
|
|
|
|
mut verifier: V,
|
|
|
|
n: usize,
|
|
|
|
bad_index: Option<usize>,
|
|
|
|
) -> Result<(), V::Error>
|
2020-06-15 23:40:11 -07:00
|
|
|
where
|
2020-06-16 01:34:31 -07:00
|
|
|
V: Service<Ed25519Item, Response = ()>,
|
2020-06-15 23:40:11 -07:00
|
|
|
{
|
2020-07-15 13:27:39 -07:00
|
|
|
let results = FuturesUnordered::new();
|
2020-06-16 01:34:31 -07:00
|
|
|
for i in 0..n {
|
|
|
|
let span = tracing::trace_span!("sig", i);
|
2020-06-15 23:40:11 -07:00
|
|
|
let sk = SigningKey::new(thread_rng());
|
|
|
|
let vk_bytes = VerificationKeyBytes::from(&sk);
|
|
|
|
let msg = b"BatchVerifyTest";
|
2020-07-15 13:27:39 -07:00
|
|
|
let sig = if Some(i) == bad_index {
|
|
|
|
sk.sign(b"badmsg")
|
|
|
|
} else {
|
|
|
|
sk.sign(&msg[..])
|
|
|
|
};
|
2020-06-16 01:34:31 -07:00
|
|
|
|
2020-06-17 16:47:46 -07:00
|
|
|
verifier.ready_and().await?;
|
2020-06-16 01:34:31 -07:00
|
|
|
results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into())))
|
2020-06-15 23:40:11 -07:00
|
|
|
}
|
|
|
|
|
2020-07-15 13:27:39 -07:00
|
|
|
let mut numbered_results = results.enumerate();
|
|
|
|
while let Some((i, result)) = numbered_results.next().await {
|
|
|
|
if Some(i) == bad_index {
|
|
|
|
assert!(result.is_err());
|
|
|
|
} else {
|
|
|
|
result?;
|
|
|
|
}
|
2020-06-15 23:40:11 -07:00
|
|
|
}
|
2020-06-17 16:47:46 -07:00
|
|
|
|
|
|
|
Ok(())
|
2020-06-15 23:40:11 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
async fn batch_flushes_on_max_items() {
|
2020-06-15 23:40:11 -07:00
|
|
|
use tokio::time::timeout;
|
2020-06-24 11:30:20 -07:00
|
|
|
zebra_test::init();
|
2020-06-15 23:40:11 -07:00
|
|
|
|
|
|
|
// Use a very long max_latency and a short timeout to check that
|
|
|
|
// flushing is happening based on hitting max_items.
|
2020-06-17 16:47:46 -07:00
|
|
|
let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000));
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
assert!(
|
2020-07-15 13:27:39 -07:00
|
|
|
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
.await
|
|
|
|
.is_ok()
|
|
|
|
);
|
2020-06-15 23:40:11 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
async fn batch_flushes_on_max_latency() {
|
2020-06-15 23:40:11 -07:00
|
|
|
use tokio::time::timeout;
|
2020-06-24 11:30:20 -07:00
|
|
|
zebra_test::init();
|
2020-06-15 23:40:11 -07:00
|
|
|
|
|
|
|
// Use a very high max_items and a short timeout to check that
|
|
|
|
// flushing is happening based on hitting max_latency.
|
2020-06-17 16:47:46 -07:00
|
|
|
let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500));
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
assert!(
|
2020-07-15 13:27:39 -07:00
|
|
|
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
.await
|
|
|
|
.is_ok()
|
|
|
|
);
|
2020-06-15 23:40:11 -07:00
|
|
|
}
|
2020-07-15 13:27:39 -07:00
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn fallback_verification() -> Result<(), Report> {
|
|
|
|
zebra_test::init();
|
|
|
|
|
|
|
|
let verifier = Fallback::new(
|
|
|
|
Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)),
|
|
|
|
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
|
|
|
|
);
|
|
|
|
|
|
|
|
sign_and_verify(verifier, 100, Some(39))
|
|
|
|
.await
|
|
|
|
.map_err(|e| eyre!(e))?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|