Implement Async Script Verification RFC (#961)

This commit begins the process of integrating `zcash_script` with the rest of the system for verifying scripts while syncing the block chain. It does so by adding the necessary support for looking up UTXOs from the state service and implements the first parts of the `script::Verifier` for looking up the necessary UTXOs in the state and then generating the necessary call to `zcash_script` to verify the script itself.

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Jane Lusby 2020-10-14 14:06:32 -07:00 committed by GitHub
parent 2d3c3bcc23
commit e05103323e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 330 additions and 32 deletions

1
Cargo.lock generated
View File

@ -3203,6 +3203,7 @@ dependencies = [
"tracing-futures",
"tracing-subscriber 0.2.13",
"zebra-chain",
"zebra-script",
"zebra-state",
"zebra-test",
]

View File

@ -27,6 +27,7 @@ tower-fallback = { path = "../tower-fallback/" }
tower-batch = { path = "../tower-batch/" }
zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }
zebra-script = { path = "../zebra-script" }
displaydoc = "0.1.7"
[dev-dependencies]

View File

@ -23,10 +23,10 @@ pub mod config;
pub mod error;
pub mod mempool;
pub mod parameters;
pub mod script;
#[allow(dead_code)] // Remove this once transaction verification is implemented
mod primitives;
mod script;
mod transaction;
pub use crate::config::Config;

View File

@ -12,11 +12,82 @@
//! This is an internal module. Use `verify::BlockVerifier` for blocks and their
//! transactions, or `mempool::MempoolTransactionVerifier` for mempool transactions.
use std::{pin::Pin, sync::Arc};
use std::future::Future;
use zebra_chain::{parameters::ConsensusBranchId, transaction::Transaction, transparent};
use crate::BoxError;
/// Internal script verification service.
///
/// After verification, the script future completes. State changes are handled by
/// `BlockVerifier` or `MempoolTransactionVerifier`.
///
/// `ScriptVerifier` is not yet implemented.
#[derive(Default)]
pub(crate) struct ScriptVerifier {}
pub struct Verifier<ZS> {
state: ZS,
branch: ConsensusBranchId,
}
impl<ZS> Verifier<ZS> {
pub fn new(state: ZS, branch: ConsensusBranchId) -> Self {
Self { state, branch }
}
}
#[derive(Debug)]
struct Request {
transaction: Arc<Transaction>,
input_index: usize,
}
impl<ZS> tower::Service<Request> for Verifier<ZS>
where
ZS: tower::Service<zebra_state::Request, Response = zebra_state::Response, Error = BoxError>,
ZS::Future: Send + 'static,
{
type Response = ();
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.state.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
use futures_util::FutureExt;
let input = &req.transaction.inputs()[req.input_index];
match input {
transparent::Input::PrevOut { outpoint, .. } => {
let output = self.state.call(zebra_state::Request::AwaitUtxo(*outpoint));
let transaction = req.transaction;
let branch_id = self.branch;
let input_index = req.input_index;
async move {
let previous_output = match output.await? {
zebra_state::Response::Utxo(output) => output,
_ => unreachable!("AwaitUtxo always responds with Utxo"),
};
zebra_script::is_valid(
transaction,
branch_id,
(input_index as u32, previous_output),
)?;
Ok(())
}
.boxed()
}
transparent::Input::Coinbase { .. } => {
async { Err("unexpected coinbase input".into()) }.boxed()
}
}
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use zebra_chain::{
block::{self, Block},
transaction,
transaction, transparent,
};
// Allow *only* this unused import, so that rustdoc link resolution
@ -107,4 +107,7 @@ pub enum Request {
/// Note: the [`HashOrHeight`] can be constructed from a [`block::Hash`] or
/// [`block::Height`] using `.into()`.
Block(HashOrHeight),
/// Request a UTXO identified by the given Outpoint
AwaitUtxo(transparent::OutPoint),
}

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use zebra_chain::{
block::{self, Block},
transaction::Transaction,
transparent,
};
// Allow *only* this unused import, so that rustdoc link resolution
@ -30,4 +31,7 @@ pub enum Response {
/// Response to [`Request::Block`] with the specified block.
Block(Option<Arc<Block>>),
/// The response to a `AwaitUtxo` request
Utxo(transparent::Output),
}

View File

@ -3,6 +3,8 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
time::Instant,
};
use futures::future::{FutureExt, TryFutureExt};
@ -21,6 +23,7 @@ use crate::{
};
mod memory_state;
mod utxo;
// todo: put this somewhere
#[derive(Debug)]
@ -39,18 +42,27 @@ struct StateService {
mem: NonFinalizedState,
/// Blocks awaiting their parent blocks for contextual verification.
queued_blocks: QueuedBlocks,
/// The set of outpoints with pending requests for their associated transparent::Output
pending_utxos: utxo::PendingUtxos,
/// Instant tracking the last time `pending_utxos` was pruned
last_prune: Instant,
}
impl StateService {
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
pub fn new(config: Config, network: Network) -> Self {
let sled = FinalizedState::new(&config, network);
let mem = NonFinalizedState::default();
let queued_blocks = QueuedBlocks::default();
let pending_utxos = utxo::PendingUtxos::default();
Self {
sled,
mem,
queued_blocks,
pending_utxos,
last_prune: Instant::now(),
}
}
@ -154,6 +166,13 @@ impl Service<Request> for StateService {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let now = Instant::now();
if self.last_prune + Self::PRUNE_INTERVAL < now {
self.pending_utxos.prune();
self.last_prune = now;
}
Poll::Ready(Ok(()))
}
@ -162,6 +181,7 @@ impl Service<Request> for StateService {
Request::CommitBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
self.pending_utxos.check_block(&block);
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
async move {
@ -177,6 +197,7 @@ impl Service<Request> for StateService {
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
self.pending_utxos.check_block(&block);
self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });
@ -213,6 +234,17 @@ impl Service<Request> for StateService {
.map_ok(Response::Block)
.boxed()
}
Request::AwaitUtxo(outpoint) => {
let fut = self.pending_utxos.queue(outpoint);
if let Some(finalized_utxo) = self.sled.utxo(&outpoint).unwrap() {
self.pending_utxos.respond(outpoint, finalized_utxo);
} else if let Some(non_finalized_utxo) = self.mem.utxo(&outpoint) {
self.pending_utxos.respond(outpoint, non_finalized_utxo);
}
fut.boxed()
}
}
}
}

View File

@ -26,7 +26,7 @@ struct Chain {
height_by_hash: HashMap<block::Hash, block::Height>,
tx_by_hash: HashMap<transaction::Hash, (block::Height, usize)>,
created_utxos: HashSet<transparent::OutPoint>,
created_utxos: HashMap<transparent::OutPoint, transparent::Output>,
spent_utxos: HashSet<transparent::OutPoint>,
sprout_anchors: HashSet<sprout::tree::Root>,
sapling_anchors: HashSet<sapling::tree::Root>,
@ -258,11 +258,14 @@ impl UpdateWith<(transaction::Hash, &Vec<transparent::Output>)> for Chain {
&mut self,
(transaction_hash, outputs): &(transaction::Hash, &Vec<transparent::Output>),
) {
for (utxo_index, _) in outputs.iter().enumerate() {
self.created_utxos.insert(transparent::OutPoint {
hash: *transaction_hash,
index: utxo_index as u32,
});
for (utxo_index, output) in outputs.iter().enumerate() {
self.created_utxos.insert(
transparent::OutPoint {
hash: *transaction_hash,
index: utxo_index as u32,
},
output.clone(),
);
}
}
@ -272,10 +275,12 @@ impl UpdateWith<(transaction::Hash, &Vec<transparent::Output>)> for Chain {
) {
for (utxo_index, _) in outputs.iter().enumerate() {
assert!(
self.created_utxos.remove(&transparent::OutPoint {
hash: *transaction_hash,
index: utxo_index as u32,
}),
self.created_utxos
.remove(&transparent::OutPoint {
hash: *transaction_hash,
index: utxo_index as u32,
})
.is_some(),
"created_utxos must be present if block was"
);
}
@ -530,6 +535,18 @@ impl NonFinalizedState {
None
}
/// Returns the `transparent::Output` pointed to by the given
/// `transparent::OutPoint` if it is present.
pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Output> {
for chain in self.chain_set.iter().rev() {
if let Some(output) = chain.created_utxos.get(outpoint) {
return Some(output.clone());
}
}
None
}
}
/// A queue of blocks, awaiting the arrival of parent blocks.

View File

@ -0,0 +1,69 @@
#![allow(dead_code)]
use crate::{BoxError, Response};
use std::collections::HashMap;
use std::future::Future;
use tokio::sync::broadcast;
use zebra_chain::{block::Block, transparent};
#[derive(Debug, Default)]
pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);
impl PendingUtxos {
/// Returns a future that will resolve to the `transparent::Output` pointed
/// to by the given `transparent::OutPoint` when it is available.
pub fn queue(
&mut self,
outpoint: transparent::OutPoint,
) -> impl Future<Output = Result<Response, BoxError>> {
let mut receiver = self
.0
.entry(outpoint)
.or_insert_with(|| {
let (sender, _) = broadcast::channel(1);
sender
})
.subscribe();
async move {
receiver
.recv()
.await
.map(Response::Utxo)
.map_err(BoxError::from)
}
}
/// Notify all utxo requests waiting for the `transparent::Output` pointed to
/// by the given `transparent::OutPoint` that the `Output` has arrived.
pub fn respond(&mut self, outpoint: transparent::OutPoint, output: transparent::Output) {
if let Some(sender) = self.0.remove(&outpoint) {
let _ = sender.send(output);
}
}
/// For each notifies waiting utxo requests for each `transparent::Output` in
/// `block` that the output has arrived.
pub fn check_block(&mut self, block: &Block) {
if self.0.is_empty() {
return;
}
for transaction in block.transactions.iter() {
let transaction_hash = transaction.hash();
for (index, output) in transaction.outputs().iter().enumerate() {
let outpoint = transparent::OutPoint {
hash: transaction_hash,
index: index as _,
};
self.respond(outpoint, output.clone());
}
}
}
/// Scan the set of waiting utxo requests for channels where all recievers
/// have been dropped and remove the corresponding sender.
pub fn prune(&mut self) {
self.0.retain(|_, chan| chan.receiver_count() > 0);
}
}

View File

@ -3,11 +3,14 @@
use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc};
use tracing::trace;
use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize};
use zebra_chain::{
block::{self, Block},
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
};
use zebra_chain::{
serialization::{ZcashDeserialize, ZcashSerialize},
transparent,
};
use crate::{BoxError, Config, HashOrHeight, QueuedBlock};
@ -36,13 +39,83 @@ pub struct FinalizedState {
height_by_hash: sled::Tree,
block_by_height: sled::Tree,
// tx_by_hash: sled::Tree,
// utxo_by_outpoint: sled::Tree,
utxo_by_outpoint: sled::Tree,
// sprout_nullifiers: sled::Tree,
// sapling_nullifiers: sled::Tree,
// sprout_anchors: sled::Tree,
// sapling_anchors: sled::Tree,
}
/// Helper trait for inserting (Key, Value) pairs into sled when both the key and
/// value implement ZcashSerialize.
trait SledSerialize {
/// Serialize and insert the given key and value into a sled tree.
fn zs_insert<K, V>(
&self,
key: &K,
value: &V,
) -> Result<(), sled::transaction::UnabortableTransactionError>
where
K: ZcashSerialize,
V: ZcashSerialize;
}
/// Helper trait for retrieving values from sled trees when the key and value
/// implement ZcashSerialize/ZcashDeserialize.
trait SledDeserialize {
/// Serialize the given key and use that to get and deserialize the
/// corresponding value from a sled tree, if it is present.
fn zs_get<K, V>(&self, key: &K) -> Result<Option<V>, BoxError>
where
K: ZcashSerialize,
V: ZcashDeserialize;
}
impl SledSerialize for sled::transaction::TransactionalTree {
fn zs_insert<K, V>(
&self,
key: &K,
value: &V,
) -> Result<(), sled::transaction::UnabortableTransactionError>
where
K: ZcashSerialize,
V: ZcashSerialize,
{
let key_bytes = key
.zcash_serialize_to_vec()
.expect("serializing into a vec won't fail");
let value_bytes = value
.zcash_serialize_to_vec()
.expect("serializing into a vec won't fail");
self.insert(key_bytes, value_bytes)?;
Ok(())
}
}
impl SledDeserialize for sled::Tree {
fn zs_get<K, V>(&self, key: &K) -> Result<Option<V>, BoxError>
where
K: ZcashSerialize,
V: ZcashDeserialize,
{
let key_bytes = key
.zcash_serialize_to_vec()
.expect("serializing into a vec won't fail");
let value_bytes = self.get(&key_bytes)?;
let value = value_bytes
.as_deref()
.map(ZcashDeserialize::zcash_deserialize)
.transpose()?;
Ok(value)
}
}
impl FinalizedState {
pub fn new(config: &Config, network: Network) -> Self {
let db = config.sled_config(network).open().unwrap();
@ -53,7 +126,7 @@ impl FinalizedState {
height_by_hash: db.open_tree(b"height_by_hash").unwrap(),
block_by_height: db.open_tree(b"block_by_height").unwrap(),
// tx_by_hash: db.open_tree(b"tx_by_hash").unwrap(),
// utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(),
utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(),
// sprout_nullifiers: db.open_tree(b"sprout_nullifiers").unwrap(),
// sapling_nullifiers: db.open_tree(b"sapling_nullifiers").unwrap(),
}
@ -115,23 +188,41 @@ impl FinalizedState {
&self.hash_by_height,
&self.height_by_hash,
&self.block_by_height,
&self.utxo_by_outpoint,
)
.transaction(move |(hash_by_height, height_by_hash, block_by_height)| {
// TODO: do serialization above
// for some reason this wouldn't move into the closure (??)
let block_bytes = block
.zcash_serialize_to_vec()
.expect("zcash_serialize_to_vec has wrong return type");
.transaction(
move |(hash_by_height, height_by_hash, block_by_height, utxo_by_outpoint)| {
// TODO: do serialization above
// for some reason this wouldn't move into the closure (??)
let block_bytes = block
.zcash_serialize_to_vec()
.expect("zcash_serialize_to_vec has wrong return type");
// TODO: check highest entry of hash_by_height as in RFC
// TODO: check highest entry of hash_by_height as in RFC
hash_by_height.insert(&height_bytes, &hash.0)?;
height_by_hash.insert(&hash.0, &height_bytes)?;
block_by_height.insert(&height_bytes, block_bytes)?;
hash_by_height.insert(&height_bytes, &hash.0)?;
height_by_hash.insert(&hash.0, &height_bytes)?;
block_by_height.insert(&height_bytes, block_bytes)?;
// tx_by_hash
// for some reason type inference fails here
Ok::<_, sled::transaction::ConflictableTransactionError>(hash)
})
for transaction in block.transactions.iter() {
let transaction_hash = transaction.hash();
for (index, output) in transaction.outputs().iter().enumerate() {
let outpoint = transparent::OutPoint {
hash: transaction_hash,
index: index as _,
};
utxo_by_outpoint.zs_insert(&outpoint, output)?;
}
}
// sprout_nullifiers
// sapling_nullifiers
// for some reason type inference fails here
Ok::<_, sled::transaction::ConflictableTransactionError>(hash)
},
)
.map_err(Into::into)
}
@ -222,6 +313,15 @@ impl FinalizedState {
}
}
}
/// Returns the `transparent::Output` pointed to by the given
/// `transparent::OutPoint` if it is present.
pub fn utxo(
&self,
outpoint: &transparent::OutPoint,
) -> Result<Option<transparent::Output>, BoxError> {
self.utxo_by_outpoint.zs_get(outpoint)
}
}
// Split into a helper function to be called synchronously or asynchronously.