From 533722b70e8babc1b9f1f0f28d78e36c84ce928d Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Wed, 21 Sep 2022 23:38:44 +0000 Subject: [PATCH 1/2] zcash_client_sqlite: Add dynamic memory usage tracking to `BatchRunner` Ported from zcash/zcash@e88ea11055b21aa704c193d7b847ebf92cfc30ee. --- zcash_client_backend/Cargo.toml | 1 + zcash_client_backend/src/scan.rs | 184 ++++++++++++++++++++++++++++--- 2 files changed, 169 insertions(+), 16 deletions(-) diff --git a/zcash_client_backend/Cargo.toml b/zcash_client_backend/Cargo.toml index 4f2f002fd..2a36b8f14 100644 --- a/zcash_client_backend/Cargo.toml +++ b/zcash_client_backend/Cargo.toml @@ -26,6 +26,7 @@ hex = "0.4" hdwallet = { version = "0.3.1", optional = true } jubjub = "0.9" log = "0.4" +memuse = "0.2" nom = "7" orchard = "0.2" percent-encoding = "2.1.0" diff --git a/zcash_client_backend/src/scan.rs b/zcash_client_backend/src/scan.rs index e628850cf..8338a07f7 100644 --- a/zcash_client_backend/src/scan.rs +++ b/zcash_client_backend/src/scan.rs @@ -2,7 +2,12 @@ use crossbeam_channel as channel; use std::collections::HashMap; use std::fmt; use std::mem; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use memuse::DynamicUsage; use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE}; use zcash_primitives::{block::BlockHash, transaction::TxId}; @@ -41,8 +46,56 @@ struct OutputIndex { value: V, } -type OutputReplier = OutputIndex>>>>; -type OutputResult = channel::Receiver>>>; +type OutputItem = OutputIndex>>; + +/// The sender for the result of batch scanning a specific transaction output. +struct OutputReplier(OutputIndex>>); + +impl DynamicUsage for OutputReplier { + #[inline(always)] + fn dynamic_usage(&self) -> usize { + // We count the memory usage of items in the channel on the receiver side. + 0 + } + + #[inline(always)] + fn dynamic_usage_bounds(&self) -> (usize, Option) { + (0, Some(0)) + } +} + +/// The receiver for the result of batch scanning a specific transaction. +struct BatchReceiver(channel::Receiver>); + +impl DynamicUsage for BatchReceiver { + fn dynamic_usage(&self) -> usize { + // We count the memory usage of items in the channel on the receiver side. + let num_items = self.0.len(); + + // We know we use unbounded channels, so the items in the channel are stored as a + // linked list. `crossbeam_channel` allocates memory for the linked list in blocks + // of 31 items. + const ITEMS_PER_BLOCK: usize = 31; + let num_blocks = (num_items + ITEMS_PER_BLOCK - 1) / ITEMS_PER_BLOCK; + + // The structure of a block is: + // - A pointer to the next block. + // - For each slot in the block: + // - Space for an item. + // - The state of the slot, stored as an AtomicUsize. + const PTR_SIZE: usize = std::mem::size_of::(); + let item_size = std::mem::size_of::>(); + const ATOMIC_USIZE_SIZE: usize = std::mem::size_of::(); + let block_size = PTR_SIZE + ITEMS_PER_BLOCK * (item_size + ATOMIC_USIZE_SIZE); + + num_blocks * block_size + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let usage = self.dynamic_usage(); + (usage, Some(usage)) + } +} /// A batch of outputs to trial decrypt. struct Batch> { @@ -57,6 +110,35 @@ struct Batch> { /// (that is captured in the outer `OutputIndex` of each `OutputReplier`). outputs: Vec<(D, Output)>, repliers: Vec>, + // Pointer to the parent `BatchRunner`'s heap usage tracker for running batches. + running_usage: Arc, +} + +fn base_vec_usage(c: &Vec) -> usize { + c.capacity() * mem::size_of::() +} + +impl DynamicUsage for Batch +where + D: BatchDomain, + Output: ShieldedOutput, +{ + fn dynamic_usage(&self) -> usize { + // We don't have a `DynamicUsage` bound on `D::IncomingViewingKey`, `D`, or + // `Output`, and we can't use newtypes because the batch decryption API takes + // slices. But we know that we don't allocate memory inside either of these, so we + // just compute the size directly. + base_vec_usage(&self.ivks) + base_vec_usage(&self.outputs) + self.repliers.dynamic_usage() + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let base_usage = base_vec_usage(&self.ivks) + base_vec_usage(&self.outputs); + let bounds = self.repliers.dynamic_usage_bounds(); + ( + base_usage + bounds.0, + bounds.1.map(|upper| base_usage + upper), + ) + } } impl Batch @@ -66,13 +148,18 @@ where Output: ShieldedOutput, { /// Constructs a new batch. - fn new(tags: Vec, ivks: Vec) -> Self { + fn new( + tags: Vec, + ivks: Vec, + running_usage: Arc, + ) -> Self { assert_eq!(tags.len(), ivks.len()); Self { tags, ivks, outputs: vec![], repliers: vec![], + running_usage, } } @@ -83,10 +170,17 @@ where /// Runs the batch of trial decryptions, and reports the results. fn run(self) { + // Approximate now as when the heap cost of this running batch begins. We use the + // size of `self` as a lower bound on the actual heap memory allocated by the + // rayon threadpool to store this `Batch`. + let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage(); + self.running_usage.fetch_add(own_usage, Ordering::SeqCst); + assert_eq!(self.outputs.len(), self.repliers.len()); let decryption_results = batch::try_compact_note_decryption(&self.ivks, &self.outputs); - for (decryption_result, replier) in decryption_results.into_iter().zip(self.repliers.iter()) + for (decryption_result, OutputReplier(replier)) in + decryption_results.into_iter().zip(self.repliers.iter()) { let result = OutputIndex { output_index: replier.output_index, @@ -99,9 +193,12 @@ where if replier.value.send(result).is_err() { tracing::debug!("BatchRunner was dropped before batch finished"); - return; + break; } } + + // Signal that the heap memory for this batch is about to be freed. + self.running_usage.fetch_sub(own_usage, Ordering::SeqCst); } } @@ -113,25 +210,73 @@ impl + Clone> Ba &mut self, domain: impl Fn() -> D, outputs: &[Output], - replier: channel::Sender>>>, + replier: channel::Sender>, ) { self.outputs .extend(outputs.iter().cloned().map(|output| (domain(), output))); - self.repliers - .extend((0..outputs.len()).map(|output_index| OutputIndex { + self.repliers.extend((0..outputs.len()).map(|output_index| { + OutputReplier(OutputIndex { output_index, value: replier.clone(), - })); + }) + })); } } -type ResultKey = (BlockHash, TxId); +/// A `HashMap` key for looking up the result of a batch scanning a specific transaction. +#[derive(PartialEq, Eq, Hash)] +struct ResultKey(BlockHash, TxId); + +impl DynamicUsage for ResultKey { + #[inline(always)] + fn dynamic_usage(&self) -> usize { + 0 + } + + #[inline(always)] + fn dynamic_usage_bounds(&self) -> (usize, Option) { + (0, Some(0)) + } +} /// Logic to run batches of trial decryptions on the global threadpool. pub(crate) struct BatchRunner> { batch_size_threshold: usize, + // The batch currently being accumulated. acc: Batch, - pending_results: HashMap>, + // The dynamic memory usage of the running batches. + running_usage: Arc, + // Receivers for the results of the running batches. + pending_results: HashMap>, +} + +impl DynamicUsage for BatchRunner +where + D: BatchDomain, + Output: ShieldedOutput, +{ + fn dynamic_usage(&self) -> usize { + self.acc.dynamic_usage() + + self.running_usage.load(Ordering::Relaxed) + + self.pending_results.dynamic_usage() + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + let running_usage = self.running_usage.load(Ordering::Relaxed); + + let bounds = ( + self.acc.dynamic_usage_bounds(), + self.pending_results.dynamic_usage_bounds(), + ); + ( + bounds.0 .0 + running_usage + bounds.1 .0, + bounds + .0 + .1 + .zip(bounds.1 .1) + .map(|(a, b)| a + running_usage + b), + ) + } } impl BatchRunner @@ -146,9 +291,11 @@ where ivks: impl Iterator, ) -> Self { let (tags, ivks) = ivks.unzip(); + let running_usage = Arc::new(AtomicUsize::new(0)); Self { batch_size_threshold, - acc: Batch::new(tags, ivks), + acc: Batch::new(tags, ivks, running_usage.clone()), + running_usage, pending_results: HashMap::default(), } } @@ -182,7 +329,8 @@ where ) { let (tx, rx) = channel::unbounded(); self.acc.add_outputs(domain, outputs, tx); - self.pending_results.insert((block_tag, txid), rx); + self.pending_results + .insert(ResultKey(block_tag, txid), BatchReceiver(rx)); if self.acc.outputs.len() >= self.batch_size_threshold { self.flush(); @@ -194,7 +342,11 @@ where /// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch. pub(crate) fn flush(&mut self) { if !self.acc.is_empty() { - let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone()); + let mut batch = Batch::new( + self.acc.tags.clone(), + self.acc.ivks.clone(), + self.running_usage.clone(), + ); mem::swap(&mut batch, &mut self.acc); rayon::spawn_fifo(|| batch.run()); } @@ -211,10 +363,10 @@ where txid: TxId, ) -> HashMap<(TxId, usize), DecryptedNote> { self.pending_results - .remove(&(block_tag, txid)) + .remove(&ResultKey(block_tag, txid)) // We won't have a pending result if the transaction didn't have outputs of // this runner's kind. - .map(|rx| { + .map(|BatchReceiver(rx)| { rx.into_iter() .filter_map( |OutputIndex { From 9a1d61cb4e9c4661c59079b4ae287d5c6c5ff3de Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Wed, 21 Sep 2022 23:51:40 +0000 Subject: [PATCH 2/2] zcash_client_backend: Only store successes in batch scanner Previously we were sending an `Option` from each `Batch` back to its parent `BatchRunner`. However, this requires allocating sufficient space in the channel to handle the case where every output can be decrypted. In general this will not be the case, and we can instead signal "nothing decrypted" by just dropping the channel sender. This reduces the post-batch-scanning memory usage of `BatchRunner` from being linear in the number of on-chain outputs, to being linear in the number of outputs for the wallet. Ported from zcash/zcash@f7f6c2070dd3bdae70e3ee9d6e98f9501ff61d41. --- zcash_client_backend/src/scan.rs | 57 +++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/zcash_client_backend/src/scan.rs b/zcash_client_backend/src/scan.rs index 8338a07f7..0752f99b1 100644 --- a/zcash_client_backend/src/scan.rs +++ b/zcash_client_backend/src/scan.rs @@ -46,7 +46,7 @@ struct OutputIndex { value: V, } -type OutputItem = OutputIndex>>; +type OutputItem = OutputIndex>; /// The sender for the result of batch scanning a specific transaction output. struct OutputReplier(OutputIndex>>); @@ -176,29 +176,42 @@ where let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage(); self.running_usage.fetch_add(own_usage, Ordering::SeqCst); - assert_eq!(self.outputs.len(), self.repliers.len()); + // Deconstruct self so we can consume the pieces individually. + let Self { + tags, + ivks, + outputs, + repliers, + running_usage, + } = self; - let decryption_results = batch::try_compact_note_decryption(&self.ivks, &self.outputs); + assert_eq!(outputs.len(), repliers.len()); + + let decryption_results = batch::try_compact_note_decryption(&ivks, &outputs); for (decryption_result, OutputReplier(replier)) in - decryption_results.into_iter().zip(self.repliers.iter()) + decryption_results.into_iter().zip(repliers.into_iter()) { - let result = OutputIndex { - output_index: replier.output_index, - value: decryption_result.map(|((note, recipient), ivk_idx)| DecryptedNote { - ivk_tag: self.tags[ivk_idx].clone(), - recipient, - note, - }), - }; + // If `decryption_result` is `None` then we will just drop `replier`, + // indicating to the parent `BatchRunner` that this output was not for us. + if let Some(((note, recipient), ivk_idx)) = decryption_result { + let result = OutputIndex { + output_index: replier.output_index, + value: DecryptedNote { + ivk_tag: tags[ivk_idx].clone(), + recipient, + note, + }, + }; - if replier.value.send(result).is_err() { - tracing::debug!("BatchRunner was dropped before batch finished"); - break; + if replier.value.send(result).is_err() { + tracing::debug!("BatchRunner was dropped before batch finished"); + break; + } } } // Signal that the heap memory for this batch is about to be freed. - self.running_usage.fetch_sub(own_usage, Ordering::SeqCst); + running_usage.fetch_sub(own_usage, Ordering::SeqCst); } } @@ -367,14 +380,18 @@ where // We won't have a pending result if the transaction didn't have outputs of // this runner's kind. .map(|BatchReceiver(rx)| { + // This iterator will end once the channel becomes empty and disconnected. + // We created one sender per output, and each sender is dropped after the + // batch it is in completes (and in the case of successful decryptions, + // after the decrypted note has been sent to the channel). Completion of + // the iterator therefore corresponds to complete knowledge of the outputs + // of this transaction that could be decrypted. rx.into_iter() - .filter_map( + .map( |OutputIndex { output_index, value, - }| { - value.map(|decrypted_note| ((txid, output_index), decrypted_note)) - }, + }| { ((txid, output_index), value) }, ) .collect() })