// zebra/tower-batch/tests/ed25519.rs

use std::{
    future::Future,
    mem,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*;
use futures::stream::{FuturesUnordered, StreamExt};
use rand::thread_rng;
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tower::{Service, ServiceExt};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;

// ============ service impl ============
pub struct Ed25519Verifier {
    batch: batch::Verifier,
    // This uses a "broadcast" channel, which is an mpmc channel. Tokio also
    // provides an spmc channel, "watch", but it only keeps the latest value, so
    // using it would require thinking through whether it was possible for
    // results from one batch to be mixed with another. (The sketch after the
    // constructor below illustrates the broadcast behavior we rely on.)
    tx: Sender<Result<(), Error>>,
}

#[allow(clippy::new_without_default)]
impl Ed25519Verifier {
    pub fn new() -> Self {
        let batch = batch::Verifier::default();
        // XXX(hdevalence) what's a reasonable choice here?
        let (tx, _) = channel(10);
        Self { batch, tx }
    }
}

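// A minimal sketch (illustrative, not part of the original test set) of the
// broadcast behavior referenced above: every receiver that subscribed before
// a send gets its own copy of the sent value, so all futures waiting on one
// batch resolve with the same verification result.
#[tokio::test]
async fn broadcast_delivers_batch_result_to_all_subscribers() {
    let (tx, _) = channel::<Result<(), Error>>(10);
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    // Simulate a flush publishing one batch result.
    tx.send(Ok(())).expect("send succeeds while subscribers exist");

    // Both subscribers observe the same buffered result.
    assert!(rx1.recv().await.expect("channel is open").is_ok());
    assert!(rx2.recv().await.expect("channel is open").is_ok());
}
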
pub type Ed25519Item = batch::Item;

impl Service<BatchControl<Ed25519Item>> for Ed25519Verifier {
    type Response = ();
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: BatchControl<Ed25519Item>) -> Self::Future {
        match req {
            BatchControl::Item(item) => {
                tracing::trace!("got item");
                self.batch.queue(item);
                let mut rx = self.tx.subscribe();
                Box::pin(async move {
                    match rx.recv().await {
                        Ok(result) => result,
                        Err(RecvError::Lagged(_)) => {
                            tracing::warn!(
                                "missed channel updates for the correct signature batch!"
                            );
                            Err(Error::InvalidSignature)
                        }
                        Err(RecvError::Closed) => panic!("verifier was dropped without flushing"),
                    }
                })
            }
            BatchControl::Flush => {
                tracing::trace!("got flush command");
                let batch = mem::take(&mut self.batch);
                let _ = self.tx.send(batch.verify(thread_rng()));
                Box::pin(async { Ok(()) })
            }
        }
    }
}

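// A minimal sketch (illustrative, not part of the original test set) of the
// `BatchControl` protocol this service implements: an `Item` call queues a
// signature and returns a pending future, and a `Flush` call publishes the
// batch result, resolving that future.
#[tokio::test]
async fn manual_item_then_flush_resolves_pending_future() -> Result<(), Report> {
    zebra_test::init();

    let mut verifier = Ed25519Verifier::new();
    let sk = SigningKey::new(thread_rng());
    let vk_bytes = VerificationKeyBytes::from(&sk);
    let msg = b"ManualBatchControl";
    let sig = sk.sign(&msg[..]);

    verifier.ready().await?;
    let pending = verifier.call(BatchControl::Item((vk_bytes, sig, msg).into()));

    // Flushing verifies the queued batch and broadcasts the result,
    // resolving the pending item future.
    verifier.ready().await?;
    verifier.call(BatchControl::Flush).await?;
    pending.await?;

    Ok(())
}
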
impl Drop for Ed25519Verifier {
    fn drop(&mut self) {
        // We need to flush the current batch in case there are still any pending futures.
        let batch = mem::take(&mut self.batch);
        let _ = self.tx.send(batch.verify(thread_rng()));
    }
}

// =============== testing code ========
async fn sign_and_verify<V>(
    mut verifier: V,
    n: usize,
    bad_index: Option<usize>,
) -> Result<(), V::Error>
where
    V: Service<Ed25519Item, Response = ()>,
{
    let results = FuturesUnordered::new();
    for i in 0..n {
        let span = tracing::trace_span!("sig", i);
        let sk = SigningKey::new(thread_rng());
        let vk_bytes = VerificationKeyBytes::from(&sk);
        let msg = b"BatchVerifyTest";
        let sig = if Some(i) == bad_index {
            sk.sign(b"badmsg")
        } else {
            sk.sign(&msg[..])
        };

        verifier.ready().await?;
        results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into())))
    }

    let mut numbered_results = results.enumerate();
    while let Some((i, result)) = numbered_results.next().await {
        if Some(i) == bad_index {
            assert!(result.is_err());
        } else {
            result?;
        }
    }

    Ok(())
}

#[tokio::test]
async fn batch_flushes_on_max_items() -> Result<(), Report> {
    use tokio::time::timeout;
    zebra_test::init();

    // Use a very long max_latency and a short timeout to check that
    // flushing is happening based on hitting max_items.
    let verifier = Batch::new(Ed25519Verifier::new(), 10, Duration::from_secs(1000));
    timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
        .await
        .map_err(|e| eyre!(e))?
        .map_err(|e| eyre!(e))?;

    Ok(())
}

#[tokio::test]
async fn batch_flushes_on_max_latency() -> Result<(), Report> {
    use tokio::time::timeout;
    zebra_test::init();

    // Use a very high max_items and a short timeout to check that
    // flushing is happening based on hitting max_latency.
    let verifier = Batch::new(Ed25519Verifier::new(), 100, Duration::from_millis(500));
    timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
        .await
        .map_err(|e| eyre!(e))?
        .map_err(|e| eyre!(e))?;

    Ok(())
}

#[tokio::test]
async fn fallback_verification() -> Result<(), Report> {
    zebra_test::init();

    // When a batch contains an invalid signature, the whole batch fails, so
    // every item future in that batch errors. The Fallback service then
    // retries each failed item individually with `verify_single`, so only
    // the future for the genuinely bad item should resolve to an error.
    let verifier = Fallback::new(
        Batch::new(Ed25519Verifier::new(), 10, Duration::from_millis(100)),
        tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
    );

    sign_and_verify(verifier, 100, Some(39))
        .await
        .map_err(|e| eyre!(e))?;

    Ok(())
}
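
// A small sketch (illustrative, not part of the original test set) of the
// per-item behavior the fallback service above relies on: `verify_single`
// accepts a signature over the item's message and rejects a signature that
// was made over a different message.
#[test]
fn verify_single_accepts_good_and_rejects_bad_items() {
    zebra_test::init();

    let sk = SigningKey::new(thread_rng());
    let vk_bytes = VerificationKeyBytes::from(&sk);

    let good: Ed25519Item = (vk_bytes, sk.sign(b"msg"), b"msg").into();
    let bad: Ed25519Item = (vk_bytes, sk.sign(b"badmsg"), b"msg").into();

    assert!(good.verify_single().is_ok());
    assert!(bad.verify_single().is_err());
}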