diff --git a/zcash_client_backend/Cargo.toml b/zcash_client_backend/Cargo.toml index 4f2f002fd..2a36b8f14 100644 --- a/zcash_client_backend/Cargo.toml +++ b/zcash_client_backend/Cargo.toml @@ -26,6 +26,7 @@ hex = "0.4" hdwallet = { version = "0.3.1", optional = true } jubjub = "0.9" log = "0.4" +memuse = "0.2" nom = "7" orchard = "0.2" percent-encoding = "2.1.0" diff --git a/zcash_client_backend/src/scan.rs b/zcash_client_backend/src/scan.rs index e628850cf..0752f99b1 100644 --- a/zcash_client_backend/src/scan.rs +++ b/zcash_client_backend/src/scan.rs @@ -2,7 +2,12 @@ use crossbeam_channel as channel; use std::collections::HashMap; use std::fmt; use std::mem; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use memuse::DynamicUsage; use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE}; use zcash_primitives::{block::BlockHash, transaction::TxId}; @@ -41,8 +46,56 @@ struct OutputIndex { value: V, } -type OutputReplier = OutputIndex>>>>; -type OutputResult = channel::Receiver>>>; +type OutputItem = OutputIndex>; + +/// The sender for the result of batch scanning a specific transaction output. +struct OutputReplier(OutputIndex>>); + +impl DynamicUsage for OutputReplier { + #[inline(always)] + fn dynamic_usage(&self) -> usize { + // We count the memory usage of items in the channel on the receiver side. + 0 + } + + #[inline(always)] + fn dynamic_usage_bounds(&self) -> (usize, Option) { + (0, Some(0)) + } +} + +/// The receiver for the result of batch scanning a specific transaction. +struct BatchReceiver(channel::Receiver>); + +impl DynamicUsage for BatchReceiver { + fn dynamic_usage(&self) -> usize { + // We count the memory usage of items in the channel on the receiver side. + let num_items = self.0.len(); + + // We know we use unbounded channels, so the items in the channel are stored as a + // linked list. `crossbeam_channel` allocates memory for the linked list in blocks + // of 31 items. + const ITEMS_PER_BLOCK: usize = 31; + let num_blocks = (num_items + ITEMS_PER_BLOCK - 1) / ITEMS_PER_BLOCK; + + // The structure of a block is: + // - A pointer to the next block. + // - For each slot in the block: + // - Space for an item. + // - The state of the slot, stored as an AtomicUsize. + const PTR_SIZE: usize = std::mem::size_of::(); + let item_size = std::mem::size_of::>(); + const ATOMIC_USIZE_SIZE: usize = std::mem::size_of::(); + let block_size = PTR_SIZE + ITEMS_PER_BLOCK * (item_size + ATOMIC_USIZE_SIZE); + + num_blocks * block_size + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let usage = self.dynamic_usage(); + (usage, Some(usage)) + } +} /// A batch of outputs to trial decrypt. struct Batch> { @@ -57,6 +110,35 @@ struct Batch> { /// (that is captured in the outer `OutputIndex` of each `OutputReplier`). outputs: Vec<(D, Output)>, repliers: Vec>, + // Pointer to the parent `BatchRunner`'s heap usage tracker for running batches. + running_usage: Arc, +} + +fn base_vec_usage(c: &Vec) -> usize { + c.capacity() * mem::size_of::() +} + +impl DynamicUsage for Batch +where + D: BatchDomain, + Output: ShieldedOutput, +{ + fn dynamic_usage(&self) -> usize { + // We don't have a `DynamicUsage` bound on `D::IncomingViewingKey`, `D`, or + // `Output`, and we can't use newtypes because the batch decryption API takes + // slices. But we know that we don't allocate memory inside either of these, so we + // just compute the size directly. + base_vec_usage(&self.ivks) + base_vec_usage(&self.outputs) + self.repliers.dynamic_usage() + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let base_usage = base_vec_usage(&self.ivks) + base_vec_usage(&self.outputs); + let bounds = self.repliers.dynamic_usage_bounds(); + ( + base_usage + bounds.0, + bounds.1.map(|upper| base_usage + upper), + ) + } } impl Batch @@ -66,13 +148,18 @@ where Output: ShieldedOutput, { /// Constructs a new batch. - fn new(tags: Vec, ivks: Vec) -> Self { + fn new( + tags: Vec, + ivks: Vec, + running_usage: Arc, + ) -> Self { assert_eq!(tags.len(), ivks.len()); Self { tags, ivks, outputs: vec![], repliers: vec![], + running_usage, } } @@ -83,25 +170,48 @@ where /// Runs the batch of trial decryptions, and reports the results. fn run(self) { - assert_eq!(self.outputs.len(), self.repliers.len()); + // Approximate now as when the heap cost of this running batch begins. We use the + // size of `self` as a lower bound on the actual heap memory allocated by the + // rayon threadpool to store this `Batch`. + let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage(); + self.running_usage.fetch_add(own_usage, Ordering::SeqCst); - let decryption_results = batch::try_compact_note_decryption(&self.ivks, &self.outputs); - for (decryption_result, replier) in decryption_results.into_iter().zip(self.repliers.iter()) + // Deconstruct self so we can consume the pieces individually. + let Self { + tags, + ivks, + outputs, + repliers, + running_usage, + } = self; + + assert_eq!(outputs.len(), repliers.len()); + + let decryption_results = batch::try_compact_note_decryption(&ivks, &outputs); + for (decryption_result, OutputReplier(replier)) in + decryption_results.into_iter().zip(repliers.into_iter()) { - let result = OutputIndex { - output_index: replier.output_index, - value: decryption_result.map(|((note, recipient), ivk_idx)| DecryptedNote { - ivk_tag: self.tags[ivk_idx].clone(), - recipient, - note, - }), - }; + // If `decryption_result` is `None` then we will just drop `replier`, + // indicating to the parent `BatchRunner` that this output was not for us. + if let Some(((note, recipient), ivk_idx)) = decryption_result { + let result = OutputIndex { + output_index: replier.output_index, + value: DecryptedNote { + ivk_tag: tags[ivk_idx].clone(), + recipient, + note, + }, + }; - if replier.value.send(result).is_err() { - tracing::debug!("BatchRunner was dropped before batch finished"); - return; + if replier.value.send(result).is_err() { + tracing::debug!("BatchRunner was dropped before batch finished"); + break; + } } } + + // Signal that the heap memory for this batch is about to be freed. + running_usage.fetch_sub(own_usage, Ordering::SeqCst); } } @@ -113,25 +223,73 @@ impl + Clone> Ba &mut self, domain: impl Fn() -> D, outputs: &[Output], - replier: channel::Sender>>>, + replier: channel::Sender>, ) { self.outputs .extend(outputs.iter().cloned().map(|output| (domain(), output))); - self.repliers - .extend((0..outputs.len()).map(|output_index| OutputIndex { + self.repliers.extend((0..outputs.len()).map(|output_index| { + OutputReplier(OutputIndex { output_index, value: replier.clone(), - })); + }) + })); } } -type ResultKey = (BlockHash, TxId); +/// A `HashMap` key for looking up the result of a batch scanning a specific transaction. +#[derive(PartialEq, Eq, Hash)] +struct ResultKey(BlockHash, TxId); + +impl DynamicUsage for ResultKey { + #[inline(always)] + fn dynamic_usage(&self) -> usize { + 0 + } + + #[inline(always)] + fn dynamic_usage_bounds(&self) -> (usize, Option) { + (0, Some(0)) + } +} /// Logic to run batches of trial decryptions on the global threadpool. pub(crate) struct BatchRunner> { batch_size_threshold: usize, + // The batch currently being accumulated. acc: Batch, - pending_results: HashMap>, + // The dynamic memory usage of the running batches. + running_usage: Arc, + // Receivers for the results of the running batches. + pending_results: HashMap>, +} + +impl DynamicUsage for BatchRunner +where + D: BatchDomain, + Output: ShieldedOutput, +{ + fn dynamic_usage(&self) -> usize { + self.acc.dynamic_usage() + + self.running_usage.load(Ordering::Relaxed) + + self.pending_results.dynamic_usage() + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let running_usage = self.running_usage.load(Ordering::Relaxed); + + let bounds = ( + self.acc.dynamic_usage_bounds(), + self.pending_results.dynamic_usage_bounds(), + ); + ( + bounds.0 .0 + running_usage + bounds.1 .0, + bounds + .0 + .1 + .zip(bounds.1 .1) + .map(|(a, b)| a + running_usage + b), + ) + } } impl BatchRunner @@ -146,9 +304,11 @@ where ivks: impl Iterator, ) -> Self { let (tags, ivks) = ivks.unzip(); + let running_usage = Arc::new(AtomicUsize::new(0)); Self { batch_size_threshold, - acc: Batch::new(tags, ivks), + acc: Batch::new(tags, ivks, running_usage.clone()), + running_usage, pending_results: HashMap::default(), } } @@ -182,7 +342,8 @@ where ) { let (tx, rx) = channel::unbounded(); self.acc.add_outputs(domain, outputs, tx); - self.pending_results.insert((block_tag, txid), rx); + self.pending_results + .insert(ResultKey(block_tag, txid), BatchReceiver(rx)); if self.acc.outputs.len() >= self.batch_size_threshold { self.flush(); @@ -194,7 +355,11 @@ where /// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch. pub(crate) fn flush(&mut self) { if !self.acc.is_empty() { - let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone()); + let mut batch = Batch::new( + self.acc.tags.clone(), + self.acc.ivks.clone(), + self.running_usage.clone(), + ); mem::swap(&mut batch, &mut self.acc); rayon::spawn_fifo(|| batch.run()); } @@ -211,18 +376,22 @@ where txid: TxId, ) -> HashMap<(TxId, usize), DecryptedNote> { self.pending_results - .remove(&(block_tag, txid)) + .remove(&ResultKey(block_tag, txid)) // We won't have a pending result if the transaction didn't have outputs of // this runner's kind. - .map(|rx| { + .map(|BatchReceiver(rx)| { + // This iterator will end once the channel becomes empty and disconnected. + // We created one sender per output, and each sender is dropped after the + // batch it is in completes (and in the case of successful decryptions, + // after the decrypted note has been sent to the channel). Completion of + // the iterator therefore corresponds to complete knowledge of the outputs + // of this transaction that could be decrypted. rx.into_iter() - .filter_map( + .map( |OutputIndex { output_index, value, - }| { - value.map(|decrypted_note| ((txid, output_index), decrypted_note)) - }, + }| { ((txid, output_index), value) }, ) .collect() })