Merge pull request #608 from nuttycom/wallet/batch_trial_decryption
Implement parallel, batched trial decryption in wallet scanning.
This commit is contained in:
commit
58f34426d4
|
@ -13,10 +13,11 @@ license = "MIT OR Apache-2.0"
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
base64 = "0.13"
|
||||||
bech32 = "0.8"
|
bech32 = "0.8"
|
||||||
bls12_381 = "0.7"
|
bls12_381 = "0.7"
|
||||||
bs58 = { version = "0.4", features = ["check"] }
|
bs58 = { version = "0.4", features = ["check"] }
|
||||||
base64 = "0.13"
|
crossbeam-channel = "0.5"
|
||||||
ff = "0.12"
|
ff = "0.12"
|
||||||
group = "0.12"
|
group = "0.12"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
|
@ -29,11 +30,13 @@ percent-encoding = "2.1.0"
|
||||||
proptest = { version = "1.0.0", optional = true }
|
proptest = { version = "1.0.0", optional = true }
|
||||||
protobuf = "~2.27.1" # MSRV 1.52.1
|
protobuf = "~2.27.1" # MSRV 1.52.1
|
||||||
rand_core = "0.6"
|
rand_core = "0.6"
|
||||||
|
rayon = "1.5"
|
||||||
ripemd = { version = "0.1", optional = true }
|
ripemd = { version = "0.1", optional = true }
|
||||||
secp256k1 = { version = "0.21", optional = true }
|
secp256k1 = { version = "0.21", optional = true }
|
||||||
sha2 = { version = "0.10.1", optional = true }
|
sha2 = { version = "0.10.1", optional = true }
|
||||||
subtle = "2.2.3"
|
subtle = "2.2.3"
|
||||||
time = "0.2"
|
time = "0.2"
|
||||||
|
tracing = "0.1"
|
||||||
zcash_address = { version = "0.1", path = "../components/zcash_address" }
|
zcash_address = { version = "0.1", path = "../components/zcash_address" }
|
||||||
zcash_note_encryption = { version = "0.1", path = "../components/zcash_note_encryption" }
|
zcash_note_encryption = { version = "0.1", path = "../components/zcash_note_encryption" }
|
||||||
zcash_primitives = { version = "0.7", path = "../zcash_primitives" }
|
zcash_primitives = { version = "0.7", path = "../zcash_primitives" }
|
||||||
|
|
|
@ -83,7 +83,7 @@ use zcash_primitives::{
|
||||||
block::BlockHash,
|
block::BlockHash,
|
||||||
consensus::{self, BlockHeight, NetworkUpgrade},
|
consensus::{self, BlockHeight, NetworkUpgrade},
|
||||||
merkle_tree::CommitmentTree,
|
merkle_tree::CommitmentTree,
|
||||||
sapling::Nullifier,
|
sapling::{keys::Scope, Nullifier},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -92,8 +92,9 @@ use crate::{
|
||||||
BlockSource, PrunedBlock, WalletWrite,
|
BlockSource, PrunedBlock, WalletWrite,
|
||||||
},
|
},
|
||||||
proto::compact_formats::CompactBlock,
|
proto::compact_formats::CompactBlock,
|
||||||
|
scan::BatchRunner,
|
||||||
wallet::WalletTx,
|
wallet::WalletTx,
|
||||||
welding_rig::scan_block,
|
welding_rig::{add_block_to_runner, scan_block_with_runner},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Checks that the scanned blocks in the data database, when combined with the recent
|
/// Checks that the scanned blocks in the data database, when combined with the recent
|
||||||
|
@ -192,7 +193,7 @@ pub fn scan_cached_blocks<E, N, P, C, D>(
|
||||||
limit: Option<u32>,
|
limit: Option<u32>,
|
||||||
) -> Result<(), E>
|
) -> Result<(), E>
|
||||||
where
|
where
|
||||||
P: consensus::Parameters,
|
P: consensus::Parameters + Send + 'static,
|
||||||
C: BlockSource<Error = E>,
|
C: BlockSource<Error = E>,
|
||||||
D: WalletWrite<Error = E, NoteRef = N>,
|
D: WalletWrite<Error = E, NoteRef = N>,
|
||||||
N: Copy + Debug,
|
N: Copy + Debug,
|
||||||
|
@ -229,6 +230,21 @@ where
|
||||||
// Get the nullifiers for the notes we are tracking
|
// Get the nullifiers for the notes we are tracking
|
||||||
let mut nullifiers = data.get_nullifiers()?;
|
let mut nullifiers = data.get_nullifiers()?;
|
||||||
|
|
||||||
|
let mut batch_runner = BatchRunner::new(
|
||||||
|
100,
|
||||||
|
dfvks
|
||||||
|
.iter()
|
||||||
|
.flat_map(|(_, dfvk)| [dfvk.to_ivk(Scope::External), dfvk.to_ivk(Scope::Internal)])
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
|
||||||
|
cache.with_blocks(last_height, limit, |block: CompactBlock| {
|
||||||
|
add_block_to_runner(params, block, &mut batch_runner);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
batch_runner.flush();
|
||||||
|
|
||||||
cache.with_blocks(last_height, limit, |block: CompactBlock| {
|
cache.with_blocks(last_height, limit, |block: CompactBlock| {
|
||||||
let current_height = block.height();
|
let current_height = block.height();
|
||||||
|
|
||||||
|
@ -245,13 +261,14 @@ where
|
||||||
let txs: Vec<WalletTx<Nullifier>> = {
|
let txs: Vec<WalletTx<Nullifier>> = {
|
||||||
let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect();
|
let mut witness_refs: Vec<_> = witnesses.iter_mut().map(|w| &mut w.1).collect();
|
||||||
|
|
||||||
scan_block(
|
scan_block_with_runner(
|
||||||
params,
|
params,
|
||||||
block,
|
block,
|
||||||
&dfvks,
|
&dfvks,
|
||||||
&nullifiers,
|
&nullifiers,
|
||||||
&mut tree,
|
&mut tree,
|
||||||
&mut witness_refs[..],
|
&mut witness_refs[..],
|
||||||
|
Some(&mut batch_runner),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ mod decrypt;
|
||||||
pub mod encoding;
|
pub mod encoding;
|
||||||
pub mod keys;
|
pub mod keys;
|
||||||
pub mod proto;
|
pub mod proto;
|
||||||
|
pub mod scan;
|
||||||
pub mod wallet;
|
pub mod wallet;
|
||||||
pub mod welding_rig;
|
pub mod welding_rig;
|
||||||
pub mod zip321;
|
pub mod zip321;
|
||||||
|
|
|
@ -0,0 +1,221 @@
|
||||||
|
use crossbeam_channel as channel;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::fmt;
|
||||||
|
use std::mem;
|
||||||
|
|
||||||
|
use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE};
|
||||||
|
use zcash_primitives::{block::BlockHash, transaction::TxId};
|
||||||
|
|
||||||
|
/// A decrypted note.
|
||||||
|
pub(crate) struct DecryptedNote<D: Domain> {
|
||||||
|
/// The incoming viewing key used to decrypt the note.
|
||||||
|
pub(crate) ivk: D::IncomingViewingKey,
|
||||||
|
/// The recipient of the note.
|
||||||
|
pub(crate) recipient: D::Recipient,
|
||||||
|
/// The note!
|
||||||
|
pub(crate) note: D::Note,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: Domain> fmt::Debug for DecryptedNote<D>
|
||||||
|
where
|
||||||
|
D::IncomingViewingKey: fmt::Debug,
|
||||||
|
D::Recipient: fmt::Debug,
|
||||||
|
D::Note: fmt::Debug,
|
||||||
|
D::Memo: fmt::Debug,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("DecryptedNote")
|
||||||
|
.field("ivk", &self.ivk)
|
||||||
|
.field("recipient", &self.recipient)
|
||||||
|
.field("note", &self.note)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A value correlated with an output index.
|
||||||
|
struct OutputIndex<V> {
|
||||||
|
/// The index of the output within the corresponding shielded bundle.
|
||||||
|
output_index: usize,
|
||||||
|
/// The value for the output index.
|
||||||
|
value: V,
|
||||||
|
}
|
||||||
|
|
||||||
|
type OutputReplier<D> = OutputIndex<channel::Sender<OutputIndex<Option<DecryptedNote<D>>>>>;
|
||||||
|
|
||||||
|
/// A batch of outputs to trial decrypt.
|
||||||
|
struct Batch<D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
||||||
|
ivks: Vec<D::IncomingViewingKey>,
|
||||||
|
/// We currently store outputs and repliers as parallel vectors, because
|
||||||
|
/// [`batch::try_note_decryption`] accepts a slice of domain/output pairs
|
||||||
|
/// rather than a value that implements `IntoIterator`, and therefore we
|
||||||
|
/// can't just use `map` to select the parts we need in order to perform
|
||||||
|
/// batch decryption. Ideally the domain, output, and output replier would
|
||||||
|
/// all be part of the same struct, which would also track the output index
|
||||||
|
/// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
|
||||||
|
outputs: Vec<(D, Output)>,
|
||||||
|
repliers: Vec<OutputReplier<D>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D, Output> Batch<D, Output>
|
||||||
|
where
|
||||||
|
D: BatchDomain,
|
||||||
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
|
D::IncomingViewingKey: Clone,
|
||||||
|
{
|
||||||
|
/// Constructs a new batch.
|
||||||
|
fn new(ivks: Vec<D::IncomingViewingKey>) -> Self {
|
||||||
|
Self {
|
||||||
|
ivks,
|
||||||
|
outputs: vec![],
|
||||||
|
repliers: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the batch is currently empty.
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.outputs.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs the batch of trial decryptions, and reports the results.
|
||||||
|
fn run(self) {
|
||||||
|
assert_eq!(self.outputs.len(), self.repliers.len());
|
||||||
|
|
||||||
|
let decryption_results = batch::try_compact_note_decryption(&self.ivks, &self.outputs);
|
||||||
|
for (decryption_result, replier) in decryption_results.into_iter().zip(self.repliers.iter())
|
||||||
|
{
|
||||||
|
let result = OutputIndex {
|
||||||
|
output_index: replier.output_index,
|
||||||
|
value: decryption_result.map(|((note, recipient), ivk_idx)| DecryptedNote {
|
||||||
|
ivk: self.ivks[ivk_idx].clone(),
|
||||||
|
recipient,
|
||||||
|
note,
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
if replier.value.send(result).is_err() {
|
||||||
|
tracing::debug!("BatchRunner was dropped before batch finished");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone> Batch<D, Output> {
|
||||||
|
/// Adds the given outputs to this batch.
|
||||||
|
///
|
||||||
|
/// `replier` will be called with the result of every output.
|
||||||
|
fn add_outputs(
|
||||||
|
&mut self,
|
||||||
|
domain: impl Fn() -> D,
|
||||||
|
outputs: &[Output],
|
||||||
|
replier: channel::Sender<OutputIndex<Option<DecryptedNote<D>>>>,
|
||||||
|
) {
|
||||||
|
self.outputs
|
||||||
|
.extend(outputs.iter().cloned().map(|output| (domain(), output)));
|
||||||
|
self.repliers
|
||||||
|
.extend((0..outputs.len()).map(|output_index| OutputIndex {
|
||||||
|
output_index,
|
||||||
|
value: replier.clone(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ResultKey = (BlockHash, TxId);
|
||||||
|
|
||||||
|
/// Logic to run batches of trial decryptions on the global threadpool.
|
||||||
|
pub(crate) struct BatchRunner<D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
||||||
|
batch_size_threshold: usize,
|
||||||
|
acc: Batch<D, Output>,
|
||||||
|
pending_results: HashMap<ResultKey, channel::Receiver<OutputIndex<Option<DecryptedNote<D>>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D, Output> BatchRunner<D, Output>
|
||||||
|
where
|
||||||
|
D: BatchDomain,
|
||||||
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
|
D::IncomingViewingKey: Clone,
|
||||||
|
{
|
||||||
|
/// Constructs a new batch runner for the given incoming viewing keys.
|
||||||
|
pub(crate) fn new(batch_size_threshold: usize, ivks: Vec<D::IncomingViewingKey>) -> Self {
|
||||||
|
Self {
|
||||||
|
batch_size_threshold,
|
||||||
|
acc: Batch::new(ivks),
|
||||||
|
pending_results: HashMap::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D, Output> BatchRunner<D, Output>
|
||||||
|
where
|
||||||
|
D: BatchDomain + Send + 'static,
|
||||||
|
D::IncomingViewingKey: Clone + Send,
|
||||||
|
D::Memo: Send,
|
||||||
|
D::Note: Send,
|
||||||
|
D::Recipient: Send,
|
||||||
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone + Send + 'static,
|
||||||
|
{
|
||||||
|
/// Batches the given outputs for trial decryption.
|
||||||
|
///
|
||||||
|
/// `block_tag` is the hash of the block that triggered this txid being added to the
|
||||||
|
/// batch, or the all-zeros hash to indicate that no block triggered it (i.e. it was a
|
||||||
|
/// mempool change).
|
||||||
|
///
|
||||||
|
/// If after adding the given outputs, the accumulated batch size is at least
|
||||||
|
/// `BATCH_SIZE_THRESHOLD`, `Self::flush` is called. Subsequent calls to
|
||||||
|
/// `Self::add_outputs` will be accumulated into a new batch.
|
||||||
|
pub(crate) fn add_outputs(
|
||||||
|
&mut self,
|
||||||
|
block_tag: BlockHash,
|
||||||
|
txid: TxId,
|
||||||
|
domain: impl Fn() -> D,
|
||||||
|
outputs: &[Output],
|
||||||
|
) {
|
||||||
|
let (tx, rx) = channel::unbounded();
|
||||||
|
self.acc.add_outputs(domain, outputs, tx);
|
||||||
|
self.pending_results.insert((block_tag, txid), rx);
|
||||||
|
|
||||||
|
if self.acc.outputs.len() >= self.batch_size_threshold {
|
||||||
|
self.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs the currently accumulated batch on the global threadpool.
|
||||||
|
///
|
||||||
|
/// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch.
|
||||||
|
pub(crate) fn flush(&mut self) {
|
||||||
|
if !self.acc.is_empty() {
|
||||||
|
let mut batch = Batch::new(self.acc.ivks.clone());
|
||||||
|
mem::swap(&mut batch, &mut self.acc);
|
||||||
|
rayon::spawn_fifo(|| batch.run());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collects the pending decryption results for the given transaction.
|
||||||
|
///
|
||||||
|
/// `block_tag` is the hash of the block that triggered this txid being added to the
|
||||||
|
/// batch, or the all-zeros hash to indicate that no block triggered it (i.e. it was a
|
||||||
|
/// mempool change).
|
||||||
|
pub(crate) fn collect_results(
|
||||||
|
&mut self,
|
||||||
|
block_tag: BlockHash,
|
||||||
|
txid: TxId,
|
||||||
|
) -> HashMap<(TxId, usize), DecryptedNote<D>> {
|
||||||
|
self.pending_results
|
||||||
|
.remove(&(block_tag, txid))
|
||||||
|
// We won't have a pending result if the transaction didn't have outputs of
|
||||||
|
// this runner's kind.
|
||||||
|
.map(|rx| {
|
||||||
|
rx.into_iter()
|
||||||
|
.filter_map(
|
||||||
|
|OutputIndex {
|
||||||
|
output_index,
|
||||||
|
value,
|
||||||
|
}| {
|
||||||
|
value.map(|decrypted_note| ((txid, output_index), decrypted_note))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,7 @@
|
||||||
//! Tools for scanning a compact representation of the Zcash block chain.
|
//! Tools for scanning a compact representation of the Zcash block chain.
|
||||||
|
|
||||||
use ff::PrimeField;
|
use ff::PrimeField;
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use subtle::{ConditionallySelectable, ConstantTimeEq, CtOption};
|
use subtle::{ConditionallySelectable, ConstantTimeEq, CtOption};
|
||||||
use zcash_note_encryption::batch;
|
use zcash_note_encryption::batch;
|
||||||
|
@ -18,8 +18,11 @@ use zcash_primitives::{
|
||||||
zip32::{AccountId, ExtendedFullViewingKey},
|
zip32::{AccountId, ExtendedFullViewingKey},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::proto::compact_formats::CompactBlock;
|
use crate::{
|
||||||
use crate::wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx};
|
proto::compact_formats::CompactBlock,
|
||||||
|
scan::BatchRunner,
|
||||||
|
wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx},
|
||||||
|
};
|
||||||
|
|
||||||
/// A key that can be used to perform trial decryption and nullifier
|
/// A key that can be used to perform trial decryption and nullifier
|
||||||
/// computation for a Sapling [`CompactSaplingOutput`]
|
/// computation for a Sapling [`CompactSaplingOutput`]
|
||||||
|
@ -36,7 +39,7 @@ use crate::wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx};
|
||||||
/// [`scan_block`]: crate::welding_rig::scan_block
|
/// [`scan_block`]: crate::welding_rig::scan_block
|
||||||
pub trait ScanningKey {
|
pub trait ScanningKey {
|
||||||
/// The type of key that is used to decrypt Sapling outputs;
|
/// The type of key that is used to decrypt Sapling outputs;
|
||||||
type SaplingNk;
|
type SaplingNk: Clone;
|
||||||
|
|
||||||
type SaplingKeys: IntoIterator<Item = (SaplingIvk, Self::SaplingNk)>;
|
type SaplingKeys: IntoIterator<Item = (SaplingIvk, Self::SaplingNk)>;
|
||||||
|
|
||||||
|
@ -141,16 +144,65 @@ impl ScanningKey for SaplingIvk {
|
||||||
/// [`IncrementalWitness`]: zcash_primitives::merkle_tree::IncrementalWitness
|
/// [`IncrementalWitness`]: zcash_primitives::merkle_tree::IncrementalWitness
|
||||||
/// [`WalletShieldedOutput`]: crate::wallet::WalletShieldedOutput
|
/// [`WalletShieldedOutput`]: crate::wallet::WalletShieldedOutput
|
||||||
/// [`WalletTx`]: crate::wallet::WalletTx
|
/// [`WalletTx`]: crate::wallet::WalletTx
|
||||||
pub fn scan_block<P: consensus::Parameters, K: ScanningKey>(
|
pub fn scan_block<P: consensus::Parameters + Send + 'static, K: ScanningKey>(
|
||||||
params: &P,
|
params: &P,
|
||||||
block: CompactBlock,
|
block: CompactBlock,
|
||||||
vks: &[(&AccountId, &K)],
|
vks: &[(&AccountId, &K)],
|
||||||
nullifiers: &[(AccountId, Nullifier)],
|
nullifiers: &[(AccountId, Nullifier)],
|
||||||
tree: &mut CommitmentTree<Node>,
|
tree: &mut CommitmentTree<Node>,
|
||||||
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||||
|
) -> Vec<WalletTx<K::Nf>> {
|
||||||
|
scan_block_with_runner(
|
||||||
|
params,
|
||||||
|
block,
|
||||||
|
vks,
|
||||||
|
nullifiers,
|
||||||
|
tree,
|
||||||
|
existing_witnesses,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn add_block_to_runner<P: consensus::Parameters + Send + 'static>(
|
||||||
|
params: &P,
|
||||||
|
block: CompactBlock,
|
||||||
|
batch_runner: &mut BatchRunner<SaplingDomain<P>, CompactOutputDescription>,
|
||||||
|
) {
|
||||||
|
let block_hash = block.hash();
|
||||||
|
let block_height = block.height();
|
||||||
|
|
||||||
|
for tx in block.vtx.into_iter() {
|
||||||
|
let txid = tx.txid();
|
||||||
|
let outputs = tx
|
||||||
|
.outputs
|
||||||
|
.into_iter()
|
||||||
|
.map(|output| {
|
||||||
|
CompactOutputDescription::try_from(output)
|
||||||
|
.expect("Invalid output found in compact block decoding.")
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
batch_runner.add_outputs(
|
||||||
|
block_hash,
|
||||||
|
txid,
|
||||||
|
|| SaplingDomain::for_height(params.clone(), block_height),
|
||||||
|
&outputs,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn scan_block_with_runner<P: consensus::Parameters + Send + 'static, K: ScanningKey>(
|
||||||
|
params: &P,
|
||||||
|
block: CompactBlock,
|
||||||
|
vks: &[(&AccountId, &K)],
|
||||||
|
nullifiers: &[(AccountId, Nullifier)],
|
||||||
|
tree: &mut CommitmentTree<Node>,
|
||||||
|
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||||
|
mut batch_runner: Option<&mut BatchRunner<SaplingDomain<P>, CompactOutputDescription>>,
|
||||||
) -> Vec<WalletTx<K::Nf>> {
|
) -> Vec<WalletTx<K::Nf>> {
|
||||||
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
|
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
|
||||||
let block_height = block.height();
|
let block_height = block.height();
|
||||||
|
let block_hash = block.hash();
|
||||||
|
|
||||||
for tx in block.vtx.into_iter() {
|
for tx in block.vtx.into_iter() {
|
||||||
let txid = tx.txid();
|
let txid = tx.txid();
|
||||||
|
@ -218,21 +270,53 @@ pub fn scan_block<P: consensus::Parameters, K: ScanningKey>(
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let vks = vks
|
let decrypted: Vec<_> = if let Some(runner) = batch_runner.as_mut() {
|
||||||
.iter()
|
let vks = vks
|
||||||
.flat_map(|(a, k)| {
|
.iter()
|
||||||
k.to_sapling_keys()
|
.flat_map(|(a, k)| {
|
||||||
.into_iter()
|
k.to_sapling_keys()
|
||||||
.map(move |(ivk, nk)| (**a, ivk, nk))
|
.into_iter()
|
||||||
})
|
.map(move |(ivk, nk)| (ivk.to_repr(), (**a, nk)))
|
||||||
.collect::<Vec<_>>();
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
let ivks = vks
|
let mut decrypted = runner.collect_results(block_hash, txid);
|
||||||
.iter()
|
(0..decoded.len())
|
||||||
.map(|(_, ivk, _)| (*ivk).clone())
|
.map(|i| {
|
||||||
.collect::<Vec<_>>();
|
decrypted.remove(&(txid, i)).map(|d_note| {
|
||||||
|
let (a, nk) = vks.get(&d_note.ivk.to_repr()).expect(
|
||||||
|
"The batch runner and scan_block must use the same set of IVKs.",
|
||||||
|
);
|
||||||
|
|
||||||
let decrypted = batch::try_compact_note_decryption(&ivks, decoded);
|
((d_note.note, d_note.recipient), *a, (*nk).clone())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
let vks = vks
|
||||||
|
.iter()
|
||||||
|
.flat_map(|(a, k)| {
|
||||||
|
k.to_sapling_keys()
|
||||||
|
.into_iter()
|
||||||
|
.map(move |(ivk, nk)| (**a, ivk, nk))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let ivks = vks
|
||||||
|
.iter()
|
||||||
|
.map(|(_, ivk, _)| (*ivk).clone())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
batch::try_compact_note_decryption(&ivks, decoded)
|
||||||
|
.into_iter()
|
||||||
|
.map(|v| {
|
||||||
|
v.map(|(note_data, ivk_idx)| {
|
||||||
|
let (account, _, nk) = &vks[ivk_idx];
|
||||||
|
(note_data, *account, (*nk).clone())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
for (index, ((_, output), dec_output)) in decoded.iter().zip(decrypted).enumerate() {
|
for (index, ((_, output), dec_output)) in decoded.iter().zip(decrypted).enumerate() {
|
||||||
// Grab mutable references to new witnesses from previous outputs
|
// Grab mutable references to new witnesses from previous outputs
|
||||||
|
@ -256,23 +340,22 @@ pub fn scan_block<P: consensus::Parameters, K: ScanningKey>(
|
||||||
}
|
}
|
||||||
tree.append(node).unwrap();
|
tree.append(node).unwrap();
|
||||||
|
|
||||||
if let Some(((note, to), ivk_idx)) = dec_output {
|
if let Some(((note, to), account, nk)) = dec_output {
|
||||||
// A note is marked as "change" if the account that received it
|
// A note is marked as "change" if the account that received it
|
||||||
// also spent notes in the same transaction. This will catch,
|
// also spent notes in the same transaction. This will catch,
|
||||||
// for instance:
|
// for instance:
|
||||||
// - Change created by spending fractions of notes.
|
// - Change created by spending fractions of notes.
|
||||||
// - Notes created by consolidation transactions.
|
// - Notes created by consolidation transactions.
|
||||||
// - Notes sent from one account to itself.
|
// - Notes sent from one account to itself.
|
||||||
let (account, _, nk) = &vks[ivk_idx];
|
let is_change = spent_from_accounts.contains(&account);
|
||||||
let is_change = spent_from_accounts.contains(account);
|
|
||||||
let witness = IncrementalWitness::from_tree(tree);
|
let witness = IncrementalWitness::from_tree(tree);
|
||||||
let nf = K::sapling_nf(nk, ¬e, &witness);
|
let nf = K::sapling_nf(&nk, ¬e, &witness);
|
||||||
|
|
||||||
shielded_outputs.push(WalletShieldedOutput {
|
shielded_outputs.push(WalletShieldedOutput {
|
||||||
index,
|
index,
|
||||||
cmu: output.cmu,
|
cmu: output.cmu,
|
||||||
ephemeral_key: output.ephemeral_key.clone(),
|
ephemeral_key: output.ephemeral_key.clone(),
|
||||||
account: *account,
|
account,
|
||||||
note,
|
note,
|
||||||
to,
|
to,
|
||||||
is_change,
|
is_change,
|
||||||
|
@ -316,11 +399,15 @@ mod tests {
|
||||||
zip32::{AccountId, ExtendedFullViewingKey, ExtendedSpendingKey},
|
zip32::{AccountId, ExtendedFullViewingKey, ExtendedSpendingKey},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::scan_block;
|
use crate::{
|
||||||
use crate::proto::compact_formats::{
|
proto::compact_formats::{
|
||||||
CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
|
CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
|
||||||
|
},
|
||||||
|
scan::BatchRunner,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use super::{add_block_to_runner, scan_block, scan_block_with_runner, ScanningKey};
|
||||||
|
|
||||||
fn random_compact_tx(mut rng: impl RngCore) -> CompactTx {
|
fn random_compact_tx(mut rng: impl RngCore) -> CompactTx {
|
||||||
let fake_nf = {
|
let fake_nf = {
|
||||||
let mut nf = vec![0; 32];
|
let mut nf = vec![0; 32];
|
||||||
|
@ -387,6 +474,11 @@ mod tests {
|
||||||
|
|
||||||
// Create a fake CompactBlock containing the note
|
// Create a fake CompactBlock containing the note
|
||||||
let mut cb = CompactBlock::new();
|
let mut cb = CompactBlock::new();
|
||||||
|
cb.set_hash({
|
||||||
|
let mut hash = vec![0; 32];
|
||||||
|
rng.fill_bytes(&mut hash);
|
||||||
|
hash
|
||||||
|
});
|
||||||
cb.set_height(height.into());
|
cb.set_height(height.into());
|
||||||
|
|
||||||
// Add a random Sapling tx before ours
|
// Add a random Sapling tx before ours
|
||||||
|
@ -423,80 +515,128 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn scan_block_with_my_tx() {
|
fn scan_block_with_my_tx() {
|
||||||
let extsk = ExtendedSpendingKey::master(&[]);
|
fn go(scan_multithreaded: bool) {
|
||||||
let extfvk = ExtendedFullViewingKey::from(&extsk);
|
let extsk = ExtendedSpendingKey::master(&[]);
|
||||||
|
let extfvk = ExtendedFullViewingKey::from(&extsk);
|
||||||
|
|
||||||
let cb = fake_compact_block(
|
let cb = fake_compact_block(
|
||||||
1u32.into(),
|
1u32.into(),
|
||||||
Nullifier([0; 32]),
|
Nullifier([0; 32]),
|
||||||
extfvk.clone(),
|
extfvk.clone(),
|
||||||
Amount::from_u64(5).unwrap(),
|
Amount::from_u64(5).unwrap(),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
assert_eq!(cb.vtx.len(), 2);
|
assert_eq!(cb.vtx.len(), 2);
|
||||||
|
|
||||||
let mut tree = CommitmentTree::empty();
|
let mut tree = CommitmentTree::empty();
|
||||||
let txs = scan_block(
|
let mut batch_runner = if scan_multithreaded {
|
||||||
&Network::TestNetwork,
|
let mut runner = BatchRunner::new(
|
||||||
cb,
|
10,
|
||||||
&[(&AccountId::from(0), &extfvk)],
|
extfvk
|
||||||
&[],
|
.to_sapling_keys()
|
||||||
&mut tree,
|
.iter()
|
||||||
&mut [],
|
.map(|(k, _)| k.clone())
|
||||||
);
|
.collect(),
|
||||||
assert_eq!(txs.len(), 1);
|
);
|
||||||
|
|
||||||
let tx = &txs[0];
|
add_block_to_runner(&Network::TestNetwork, cb.clone(), &mut runner);
|
||||||
assert_eq!(tx.index, 1);
|
runner.flush();
|
||||||
assert_eq!(tx.num_spends, 1);
|
|
||||||
assert_eq!(tx.num_outputs, 1);
|
|
||||||
assert_eq!(tx.shielded_spends.len(), 0);
|
|
||||||
assert_eq!(tx.shielded_outputs.len(), 1);
|
|
||||||
assert_eq!(tx.shielded_outputs[0].index, 0);
|
|
||||||
assert_eq!(tx.shielded_outputs[0].account, AccountId::from(0));
|
|
||||||
assert_eq!(tx.shielded_outputs[0].note.value, 5);
|
|
||||||
|
|
||||||
// Check that the witness root matches
|
Some(runner)
|
||||||
assert_eq!(tx.shielded_outputs[0].witness.root(), tree.root());
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let txs = scan_block_with_runner(
|
||||||
|
&Network::TestNetwork,
|
||||||
|
cb,
|
||||||
|
&[(&AccountId::from(0), &extfvk)],
|
||||||
|
&[],
|
||||||
|
&mut tree,
|
||||||
|
&mut [],
|
||||||
|
batch_runner.as_mut(),
|
||||||
|
);
|
||||||
|
assert_eq!(txs.len(), 1);
|
||||||
|
|
||||||
|
let tx = &txs[0];
|
||||||
|
assert_eq!(tx.index, 1);
|
||||||
|
assert_eq!(tx.num_spends, 1);
|
||||||
|
assert_eq!(tx.num_outputs, 1);
|
||||||
|
assert_eq!(tx.shielded_spends.len(), 0);
|
||||||
|
assert_eq!(tx.shielded_outputs.len(), 1);
|
||||||
|
assert_eq!(tx.shielded_outputs[0].index, 0);
|
||||||
|
assert_eq!(tx.shielded_outputs[0].account, AccountId::from(0));
|
||||||
|
assert_eq!(tx.shielded_outputs[0].note.value, 5);
|
||||||
|
|
||||||
|
// Check that the witness root matches
|
||||||
|
assert_eq!(tx.shielded_outputs[0].witness.root(), tree.root());
|
||||||
|
}
|
||||||
|
|
||||||
|
go(false);
|
||||||
|
go(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn scan_block_with_txs_after_my_tx() {
|
fn scan_block_with_txs_after_my_tx() {
|
||||||
let extsk = ExtendedSpendingKey::master(&[]);
|
fn go(scan_multithreaded: bool) {
|
||||||
let extfvk = ExtendedFullViewingKey::from(&extsk);
|
let extsk = ExtendedSpendingKey::master(&[]);
|
||||||
|
let extfvk = ExtendedFullViewingKey::from(&extsk);
|
||||||
|
|
||||||
let cb = fake_compact_block(
|
let cb = fake_compact_block(
|
||||||
1u32.into(),
|
1u32.into(),
|
||||||
Nullifier([0; 32]),
|
Nullifier([0; 32]),
|
||||||
extfvk.clone(),
|
extfvk.clone(),
|
||||||
Amount::from_u64(5).unwrap(),
|
Amount::from_u64(5).unwrap(),
|
||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
assert_eq!(cb.vtx.len(), 3);
|
assert_eq!(cb.vtx.len(), 3);
|
||||||
|
|
||||||
let mut tree = CommitmentTree::empty();
|
let mut tree = CommitmentTree::empty();
|
||||||
let txs = scan_block(
|
let mut batch_runner = if scan_multithreaded {
|
||||||
&Network::TestNetwork,
|
let mut runner = BatchRunner::new(
|
||||||
cb,
|
10,
|
||||||
&[(&AccountId::from(0), &extfvk)],
|
extfvk
|
||||||
&[],
|
.to_sapling_keys()
|
||||||
&mut tree,
|
.iter()
|
||||||
&mut [],
|
.map(|(k, _)| k.clone())
|
||||||
);
|
.collect(),
|
||||||
assert_eq!(txs.len(), 1);
|
);
|
||||||
|
|
||||||
let tx = &txs[0];
|
add_block_to_runner(&Network::TestNetwork, cb.clone(), &mut runner);
|
||||||
assert_eq!(tx.index, 1);
|
runner.flush();
|
||||||
assert_eq!(tx.num_spends, 1);
|
|
||||||
assert_eq!(tx.num_outputs, 1);
|
|
||||||
assert_eq!(tx.shielded_spends.len(), 0);
|
|
||||||
assert_eq!(tx.shielded_outputs.len(), 1);
|
|
||||||
assert_eq!(tx.shielded_outputs[0].index, 0);
|
|
||||||
assert_eq!(tx.shielded_outputs[0].account, AccountId::from(0));
|
|
||||||
assert_eq!(tx.shielded_outputs[0].note.value, 5);
|
|
||||||
|
|
||||||
// Check that the witness root matches
|
Some(runner)
|
||||||
assert_eq!(tx.shielded_outputs[0].witness.root(), tree.root());
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let txs = scan_block_with_runner(
|
||||||
|
&Network::TestNetwork,
|
||||||
|
cb,
|
||||||
|
&[(&AccountId::from(0), &extfvk)],
|
||||||
|
&[],
|
||||||
|
&mut tree,
|
||||||
|
&mut [],
|
||||||
|
batch_runner.as_mut(),
|
||||||
|
);
|
||||||
|
assert_eq!(txs.len(), 1);
|
||||||
|
|
||||||
|
let tx = &txs[0];
|
||||||
|
assert_eq!(tx.index, 1);
|
||||||
|
assert_eq!(tx.num_spends, 1);
|
||||||
|
assert_eq!(tx.num_outputs, 1);
|
||||||
|
assert_eq!(tx.shielded_spends.len(), 0);
|
||||||
|
assert_eq!(tx.shielded_outputs.len(), 1);
|
||||||
|
assert_eq!(tx.shielded_outputs[0].index, 0);
|
||||||
|
assert_eq!(tx.shielded_outputs[0].account, AccountId::from(0));
|
||||||
|
assert_eq!(tx.shielded_outputs[0].note.value, 5);
|
||||||
|
|
||||||
|
// Check that the witness root matches
|
||||||
|
assert_eq!(tx.shielded_outputs[0].witness.root(), tree.root());
|
||||||
|
}
|
||||||
|
|
||||||
|
go(false);
|
||||||
|
go(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -389,6 +389,7 @@ impl OutputDescriptionV5 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct CompactOutputDescription {
|
pub struct CompactOutputDescription {
|
||||||
pub ephemeral_key: EphemeralKeyBytes,
|
pub ephemeral_key: EphemeralKeyBytes,
|
||||||
pub cmu: bls12_381::Scalar,
|
pub cmu: bls12_381::Scalar,
|
||||||
|
|
Loading…
Reference in New Issue