From c98f04330d4b6ed0d23a7a3d35db44af5b64b2bc Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Sat, 24 Sep 2022 16:51:53 +0000 Subject: [PATCH] zcash_client_backend: Move heap tracking of batch tasks behind a trait This enables the heap usage measurements to be conditionally enabled by the `BatchRunner` user. Importantly, when heap usage measurements are not enabled, the `DynamicUsage` bound on `Batch` is not required. This refactor also fixes a bug in the prior implementation. We were counting the heap usage of a task when it started to run, but the item may have been in the `rayon` work-stealing queues for a non-negligible period before then. We now count the heap usage immediately before spawning the task into the `rayon` thread pool. --- zcash_client_backend/src/data_api/chain.rs | 2 +- zcash_client_backend/src/scan.rs | 174 ++++++++++++++++----- zcash_client_backend/src/welding_rig.rs | 26 +-- 3 files changed, 155 insertions(+), 47 deletions(-) diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index 784c949f5..f51bf7e58 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -232,7 +232,7 @@ where // Get the nullifiers for the notes we are tracking let mut nullifiers = data.get_nullifiers()?; - let mut batch_runner = BatchRunner::new( + let mut batch_runner = BatchRunner::<_, _, _, ()>::new( 100, dfvks .iter() diff --git a/zcash_client_backend/src/scan.rs b/zcash_client_backend/src/scan.rs index b6aaec4ac..30fc0841f 100644 --- a/zcash_client_backend/src/scan.rs +++ b/zcash_client_backend/src/scan.rs @@ -97,8 +97,113 @@ impl DynamicUsage for BatchReceiver { } } +/// A tracker for the batch scanning tasks that are currently running. +/// +/// This enables a [`BatchRunner`] to be optionally configured to track heap memory usage. +pub(crate) trait Tasks { + type Task: Task; + fn new() -> Self; + fn add_task(&self, item: Item) -> Self::Task; + fn run_task(&self, item: Item) { + let task = self.add_task(item); + rayon::spawn_fifo(|| task.run()); + } +} + +/// A batch scanning task. +pub(crate) trait Task: Send + 'static { + fn run(self); +} + +impl Tasks for () { + type Task = Item; + fn new() -> Self {} + fn add_task(&self, item: Item) -> Self::Task { + // Return the item itself as the task; we aren't tracking anything about it, so + // there is no need to wrap it in a newtype. + item + } +} + +/// A task tracker that measures heap usage. +/// +/// This struct implements `DynamicUsage` without any item bounds, but that works because +/// it only implements `Tasks` for items that implement `DynamicUsage`. +pub(crate) struct WithUsage { + // The current heap usage for all running tasks. + running_usage: Arc, +} + +impl DynamicUsage for WithUsage { + fn dynamic_usage(&self) -> usize { + self.running_usage.load(Ordering::Relaxed) + } + + fn dynamic_usage_bounds(&self) -> (usize, Option) { + // Tasks are relatively short-lived, so we accept the inaccuracy of treating the + // tasks's approximate usage as its bounds. + let usage = self.dynamic_usage(); + (usage, Some(usage)) + } +} + +impl Tasks for WithUsage { + type Task = WithUsageTask; + + fn new() -> Self { + Self { + running_usage: Arc::new(AtomicUsize::new(0)), + } + } + + fn add_task(&self, item: Item) -> Self::Task { + // Create the task that will move onto the heap with the batch item. + let mut task = WithUsageTask { + item, + own_usage: 0, + running_usage: self.running_usage.clone(), + }; + + // We use the size of `self` as a lower bound on the actual heap memory allocated + // by the rayon threadpool to store this `Batch`. + task.own_usage = mem::size_of_val(&task) + task.item.dynamic_usage(); + + // Approximate now as when the heap cost of this running batch begins. In practice + // this is fine, because `Self::add_task` is called from `Self::run_task` which + // immediately moves the task to the heap. + self.running_usage + .fetch_add(task.own_usage, Ordering::SeqCst); + + task + } +} + +/// A task that will clean up its own heap usage from the overall running usage once it is +/// complete. +pub(crate) struct WithUsageTask { + /// The item being run. + item: Item, + /// Size of this task on the heap. We assume that the size of the task does not change + /// once it has been created, to avoid needing to maintain bidirectional channels + /// between [`WithUsage`] and its tasks. + own_usage: usize, + /// Pointer to the parent [`WithUsage`]'s heap usage tracker for running tasks. + running_usage: Arc, +} + +impl Task for WithUsageTask { + fn run(self) { + // Run the item. + self.item.run(); + + // Signal that the heap memory for this task has been freed. + self.running_usage + .fetch_sub(self.own_usage, Ordering::SeqCst); + } +} + /// A batch of outputs to trial decrypt. -struct Batch> { +pub(crate) struct Batch> { tags: Vec, ivks: Vec, /// We currently store outputs and repliers as parallel vectors, because @@ -110,8 +215,6 @@ struct Batch> { /// (that is captured in the outer `OutputIndex` of each `OutputReplier`). outputs: Vec<(D, Output)>, repliers: Vec>, - // Pointer to the parent `BatchRunner`'s heap usage tracker for running batches. - running_usage: Arc, } fn base_vec_usage(c: &Vec) -> usize { @@ -152,18 +255,13 @@ where Output: ShieldedOutput, { /// Constructs a new batch. - fn new( - tags: Vec, - ivks: Vec, - running_usage: Arc, - ) -> Self { + fn new(tags: Vec, ivks: Vec) -> Self { assert_eq!(tags.len(), ivks.len()); Self { tags, ivks, outputs: vec![], repliers: vec![], - running_usage, } } @@ -171,22 +269,26 @@ where fn is_empty(&self) -> bool { self.outputs.is_empty() } +} +impl Task for Batch +where + A: Clone + Send + 'static, + D: BatchDomain + Send + 'static, + D::IncomingViewingKey: Send, + D::Memo: Send, + D::Note: Send, + D::Recipient: Send, + Output: ShieldedOutput + Send + 'static, +{ /// Runs the batch of trial decryptions, and reports the results. fn run(self) { - // Approximate now as when the heap cost of this running batch begins. We use the - // size of `self` as a lower bound on the actual heap memory allocated by the - // rayon threadpool to store this `Batch`. - let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage(); - self.running_usage.fetch_add(own_usage, Ordering::SeqCst); - // Deconstruct self so we can consume the pieces individually. let Self { tags, ivks, outputs, repliers, - running_usage, } = self; assert_eq!(outputs.len(), repliers.len()); @@ -213,9 +315,6 @@ where } } } - - // Signal that the heap memory for this batch is about to be freed. - running_usage.fetch_sub(own_usage, Ordering::SeqCst); } } @@ -257,29 +356,35 @@ impl DynamicUsage for ResultKey { } /// Logic to run batches of trial decryptions on the global threadpool. -pub(crate) struct BatchRunner> { +pub(crate) struct BatchRunner +where + D: BatchDomain, + Output: ShieldedOutput, + T: Tasks>, +{ batch_size_threshold: usize, // The batch currently being accumulated. acc: Batch, - // The dynamic memory usage of the running batches. - running_usage: Arc, + // The running batches. + running_tasks: T, // Receivers for the results of the running batches. pending_results: HashMap>, } -impl DynamicUsage for BatchRunner +impl DynamicUsage for BatchRunner where D: BatchDomain, Output: ShieldedOutput, + T: Tasks> + DynamicUsage, { fn dynamic_usage(&self) -> usize { self.acc.dynamic_usage() - + self.running_usage.load(Ordering::Relaxed) + + self.running_tasks.dynamic_usage() + self.pending_results.dynamic_usage() } fn dynamic_usage_bounds(&self) -> (usize, Option) { - let running_usage = self.running_usage.load(Ordering::Relaxed); + let running_usage = self.running_tasks.dynamic_usage(); let bounds = ( self.acc.dynamic_usage_bounds(), @@ -296,11 +401,12 @@ where } } -impl BatchRunner +impl BatchRunner where A: Clone, D: BatchDomain, Output: ShieldedOutput, + T: Tasks>, { /// Constructs a new batch runner for the given incoming viewing keys. pub(crate) fn new( @@ -308,17 +414,16 @@ where ivks: impl Iterator, ) -> Self { let (tags, ivks) = ivks.unzip(); - let running_usage = Arc::new(AtomicUsize::new(0)); Self { batch_size_threshold, - acc: Batch::new(tags, ivks, running_usage.clone()), - running_usage, + acc: Batch::new(tags, ivks), + running_tasks: T::new(), pending_results: HashMap::default(), } } } -impl BatchRunner +impl BatchRunner where A: Clone + Send + 'static, D: BatchDomain + Send + 'static, @@ -327,6 +432,7 @@ where D::Note: Send, D::Recipient: Send, Output: ShieldedOutput + Clone + Send + 'static, + T: Tasks>, { /// Batches the given outputs for trial decryption. /// @@ -359,13 +465,9 @@ where /// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch. pub(crate) fn flush(&mut self) { if !self.acc.is_empty() { - let mut batch = Batch::new( - self.acc.tags.clone(), - self.acc.ivks.clone(), - self.running_usage.clone(), - ); + let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone()); mem::swap(&mut batch, &mut self.acc); - rayon::spawn_fifo(|| batch.run()); + self.running_tasks.run_task(batch); } } diff --git a/zcash_client_backend/src/welding_rig.rs b/zcash_client_backend/src/welding_rig.rs index d31ef959c..e79fe3044 100644 --- a/zcash_client_backend/src/welding_rig.rs +++ b/zcash_client_backend/src/welding_rig.rs @@ -20,7 +20,7 @@ use zcash_primitives::{ use crate::{ proto::compact_formats::CompactBlock, - scan::BatchRunner, + scan::{Batch, BatchRunner, Tasks}, wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx}, }; @@ -166,7 +166,7 @@ pub fn scan_block( tree: &mut CommitmentTree, existing_witnesses: &mut [&mut IncrementalWitness], ) -> Vec> { - scan_block_with_runner( + scan_block_with_runner::<_, _, ()>( params, block, vks, @@ -177,16 +177,18 @@ pub fn scan_block( ) } -type TaggedBatchRunner = - BatchRunner<(AccountId, S), SaplingDomain

, CompactOutputDescription>; +type TaggedBatch = Batch<(AccountId, S), SaplingDomain

, CompactOutputDescription>; +type TaggedBatchRunner = + BatchRunner<(AccountId, S), SaplingDomain

, CompactOutputDescription, T>; -pub(crate) fn add_block_to_runner( +pub(crate) fn add_block_to_runner( params: &P, block: CompactBlock, - batch_runner: &mut TaggedBatchRunner, + batch_runner: &mut TaggedBatchRunner, ) where P: consensus::Parameters + Send + 'static, S: Clone + Send + 'static, + T: Tasks>, { let block_hash = block.hash(); let block_height = block.height(); @@ -211,14 +213,18 @@ pub(crate) fn add_block_to_runner( } } -pub(crate) fn scan_block_with_runner( +pub(crate) fn scan_block_with_runner< + P: consensus::Parameters + Send + 'static, + K: ScanningKey, + T: Tasks> + Sync, +>( params: &P, block: CompactBlock, vks: &[(&AccountId, &K)], nullifiers: &[(AccountId, Nullifier)], tree: &mut CommitmentTree, existing_witnesses: &mut [&mut IncrementalWitness], - mut batch_runner: Option<&mut TaggedBatchRunner>, + mut batch_runner: Option<&mut TaggedBatchRunner>, ) -> Vec> { let mut wtxs: Vec> = vec![]; let block_height = block.height(); @@ -554,7 +560,7 @@ mod tests { let mut tree = CommitmentTree::empty(); let mut batch_runner = if scan_multithreaded { - let mut runner = BatchRunner::new( + let mut runner = BatchRunner::<_, _, _, ()>::new( 10, extfvk .to_sapling_keys() @@ -618,7 +624,7 @@ mod tests { let mut tree = CommitmentTree::empty(); let mut batch_runner = if scan_multithreaded { - let mut runner = BatchRunner::new( + let mut runner = BatchRunner::<_, _, _, ()>::new( 10, extfvk .to_sapling_keys()