zcash_client_backend: Move heap tracking of batch tasks behind a trait
This enables the heap usage measurements to be conditionally enabled by the `BatchRunner` user. Importantly, when heap usage measurements are not enabled, the `DynamicUsage` bound on `Batch` is not required. This refactor also fixes a bug in the prior implementation. We were counting the heap usage of a task when it started to run, but the item may have been in the `rayon` work-stealing queues for a non-negligible period before then. We now count the heap usage immediately before spawning the task into the `rayon` thread pool.
This commit is contained in:
parent
fe258ca120
commit
c98f04330d
|
@ -232,7 +232,7 @@ where
|
||||||
// Get the nullifiers for the notes we are tracking
|
// Get the nullifiers for the notes we are tracking
|
||||||
let mut nullifiers = data.get_nullifiers()?;
|
let mut nullifiers = data.get_nullifiers()?;
|
||||||
|
|
||||||
let mut batch_runner = BatchRunner::new(
|
let mut batch_runner = BatchRunner::<_, _, _, ()>::new(
|
||||||
100,
|
100,
|
||||||
dfvks
|
dfvks
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
@ -97,8 +97,113 @@ impl<A, D: Domain> DynamicUsage for BatchReceiver<A, D> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A tracker for the batch scanning tasks that are currently running.
|
||||||
|
///
|
||||||
|
/// This enables a [`BatchRunner`] to be optionally configured to track heap memory usage.
|
||||||
|
pub(crate) trait Tasks<Item> {
|
||||||
|
type Task: Task;
|
||||||
|
fn new() -> Self;
|
||||||
|
fn add_task(&self, item: Item) -> Self::Task;
|
||||||
|
fn run_task(&self, item: Item) {
|
||||||
|
let task = self.add_task(item);
|
||||||
|
rayon::spawn_fifo(|| task.run());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A batch scanning task.
|
||||||
|
pub(crate) trait Task: Send + 'static {
|
||||||
|
fn run(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Item: Task> Tasks<Item> for () {
|
||||||
|
type Task = Item;
|
||||||
|
fn new() -> Self {}
|
||||||
|
fn add_task(&self, item: Item) -> Self::Task {
|
||||||
|
// Return the item itself as the task; we aren't tracking anything about it, so
|
||||||
|
// there is no need to wrap it in a newtype.
|
||||||
|
item
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A task tracker that measures heap usage.
|
||||||
|
///
|
||||||
|
/// This struct implements `DynamicUsage` without any item bounds, but that works because
|
||||||
|
/// it only implements `Tasks` for items that implement `DynamicUsage`.
|
||||||
|
pub(crate) struct WithUsage {
|
||||||
|
// The current heap usage for all running tasks.
|
||||||
|
running_usage: Arc<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DynamicUsage for WithUsage {
|
||||||
|
fn dynamic_usage(&self) -> usize {
|
||||||
|
self.running_usage.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
||||||
|
// Tasks are relatively short-lived, so we accept the inaccuracy of treating the
|
||||||
|
// tasks's approximate usage as its bounds.
|
||||||
|
let usage = self.dynamic_usage();
|
||||||
|
(usage, Some(usage))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Item: Task + DynamicUsage> Tasks<Item> for WithUsage {
|
||||||
|
type Task = WithUsageTask<Item>;
|
||||||
|
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
running_usage: Arc::new(AtomicUsize::new(0)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_task(&self, item: Item) -> Self::Task {
|
||||||
|
// Create the task that will move onto the heap with the batch item.
|
||||||
|
let mut task = WithUsageTask {
|
||||||
|
item,
|
||||||
|
own_usage: 0,
|
||||||
|
running_usage: self.running_usage.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// We use the size of `self` as a lower bound on the actual heap memory allocated
|
||||||
|
// by the rayon threadpool to store this `Batch`.
|
||||||
|
task.own_usage = mem::size_of_val(&task) + task.item.dynamic_usage();
|
||||||
|
|
||||||
|
// Approximate now as when the heap cost of this running batch begins. In practice
|
||||||
|
// this is fine, because `Self::add_task` is called from `Self::run_task` which
|
||||||
|
// immediately moves the task to the heap.
|
||||||
|
self.running_usage
|
||||||
|
.fetch_add(task.own_usage, Ordering::SeqCst);
|
||||||
|
|
||||||
|
task
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A task that will clean up its own heap usage from the overall running usage once it is
|
||||||
|
/// complete.
|
||||||
|
pub(crate) struct WithUsageTask<Item> {
|
||||||
|
/// The item being run.
|
||||||
|
item: Item,
|
||||||
|
/// Size of this task on the heap. We assume that the size of the task does not change
|
||||||
|
/// once it has been created, to avoid needing to maintain bidirectional channels
|
||||||
|
/// between [`WithUsage`] and its tasks.
|
||||||
|
own_usage: usize,
|
||||||
|
/// Pointer to the parent [`WithUsage`]'s heap usage tracker for running tasks.
|
||||||
|
running_usage: Arc<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Item: Task> Task for WithUsageTask<Item> {
|
||||||
|
fn run(self) {
|
||||||
|
// Run the item.
|
||||||
|
self.item.run();
|
||||||
|
|
||||||
|
// Signal that the heap memory for this task has been freed.
|
||||||
|
self.running_usage
|
||||||
|
.fetch_sub(self.own_usage, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A batch of outputs to trial decrypt.
|
/// A batch of outputs to trial decrypt.
|
||||||
struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
pub(crate) struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
||||||
tags: Vec<A>,
|
tags: Vec<A>,
|
||||||
ivks: Vec<D::IncomingViewingKey>,
|
ivks: Vec<D::IncomingViewingKey>,
|
||||||
/// We currently store outputs and repliers as parallel vectors, because
|
/// We currently store outputs and repliers as parallel vectors, because
|
||||||
|
@ -110,8 +215,6 @@ struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
||||||
/// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
|
/// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
|
||||||
outputs: Vec<(D, Output)>,
|
outputs: Vec<(D, Output)>,
|
||||||
repliers: Vec<OutputReplier<A, D>>,
|
repliers: Vec<OutputReplier<A, D>>,
|
||||||
// Pointer to the parent `BatchRunner`'s heap usage tracker for running batches.
|
|
||||||
running_usage: Arc<AtomicUsize>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn base_vec_usage<T>(c: &Vec<T>) -> usize {
|
fn base_vec_usage<T>(c: &Vec<T>) -> usize {
|
||||||
|
@ -152,18 +255,13 @@ where
|
||||||
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
{
|
{
|
||||||
/// Constructs a new batch.
|
/// Constructs a new batch.
|
||||||
fn new(
|
fn new(tags: Vec<A>, ivks: Vec<D::IncomingViewingKey>) -> Self {
|
||||||
tags: Vec<A>,
|
|
||||||
ivks: Vec<D::IncomingViewingKey>,
|
|
||||||
running_usage: Arc<AtomicUsize>,
|
|
||||||
) -> Self {
|
|
||||||
assert_eq!(tags.len(), ivks.len());
|
assert_eq!(tags.len(), ivks.len());
|
||||||
Self {
|
Self {
|
||||||
tags,
|
tags,
|
||||||
ivks,
|
ivks,
|
||||||
outputs: vec![],
|
outputs: vec![],
|
||||||
repliers: vec![],
|
repliers: vec![],
|
||||||
running_usage,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,22 +269,26 @@ where
|
||||||
fn is_empty(&self) -> bool {
|
fn is_empty(&self) -> bool {
|
||||||
self.outputs.is_empty()
|
self.outputs.is_empty()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, D, Output> Task for Batch<A, D, Output>
|
||||||
|
where
|
||||||
|
A: Clone + Send + 'static,
|
||||||
|
D: BatchDomain + Send + 'static,
|
||||||
|
D::IncomingViewingKey: Send,
|
||||||
|
D::Memo: Send,
|
||||||
|
D::Note: Send,
|
||||||
|
D::Recipient: Send,
|
||||||
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Send + 'static,
|
||||||
|
{
|
||||||
/// Runs the batch of trial decryptions, and reports the results.
|
/// Runs the batch of trial decryptions, and reports the results.
|
||||||
fn run(self) {
|
fn run(self) {
|
||||||
// Approximate now as when the heap cost of this running batch begins. We use the
|
|
||||||
// size of `self` as a lower bound on the actual heap memory allocated by the
|
|
||||||
// rayon threadpool to store this `Batch`.
|
|
||||||
let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage();
|
|
||||||
self.running_usage.fetch_add(own_usage, Ordering::SeqCst);
|
|
||||||
|
|
||||||
// Deconstruct self so we can consume the pieces individually.
|
// Deconstruct self so we can consume the pieces individually.
|
||||||
let Self {
|
let Self {
|
||||||
tags,
|
tags,
|
||||||
ivks,
|
ivks,
|
||||||
outputs,
|
outputs,
|
||||||
repliers,
|
repliers,
|
||||||
running_usage,
|
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
assert_eq!(outputs.len(), repliers.len());
|
assert_eq!(outputs.len(), repliers.len());
|
||||||
|
@ -213,9 +315,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Signal that the heap memory for this batch is about to be freed.
|
|
||||||
running_usage.fetch_sub(own_usage, Ordering::SeqCst);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,29 +356,35 @@ impl DynamicUsage for ResultKey {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Logic to run batches of trial decryptions on the global threadpool.
|
/// Logic to run batches of trial decryptions on the global threadpool.
|
||||||
pub(crate) struct BatchRunner<A, D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> {
|
pub(crate) struct BatchRunner<A, D, Output, T>
|
||||||
|
where
|
||||||
|
D: BatchDomain,
|
||||||
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
|
T: Tasks<Batch<A, D, Output>>,
|
||||||
|
{
|
||||||
batch_size_threshold: usize,
|
batch_size_threshold: usize,
|
||||||
// The batch currently being accumulated.
|
// The batch currently being accumulated.
|
||||||
acc: Batch<A, D, Output>,
|
acc: Batch<A, D, Output>,
|
||||||
// The dynamic memory usage of the running batches.
|
// The running batches.
|
||||||
running_usage: Arc<AtomicUsize>,
|
running_tasks: T,
|
||||||
// Receivers for the results of the running batches.
|
// Receivers for the results of the running batches.
|
||||||
pending_results: HashMap<ResultKey, BatchReceiver<A, D>>,
|
pending_results: HashMap<ResultKey, BatchReceiver<A, D>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, D, Output> DynamicUsage for BatchRunner<A, D, Output>
|
impl<A, D, Output, T> DynamicUsage for BatchRunner<A, D, Output, T>
|
||||||
where
|
where
|
||||||
D: BatchDomain,
|
D: BatchDomain,
|
||||||
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
|
T: Tasks<Batch<A, D, Output>> + DynamicUsage,
|
||||||
{
|
{
|
||||||
fn dynamic_usage(&self) -> usize {
|
fn dynamic_usage(&self) -> usize {
|
||||||
self.acc.dynamic_usage()
|
self.acc.dynamic_usage()
|
||||||
+ self.running_usage.load(Ordering::Relaxed)
|
+ self.running_tasks.dynamic_usage()
|
||||||
+ self.pending_results.dynamic_usage()
|
+ self.pending_results.dynamic_usage()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
|
||||||
let running_usage = self.running_usage.load(Ordering::Relaxed);
|
let running_usage = self.running_tasks.dynamic_usage();
|
||||||
|
|
||||||
let bounds = (
|
let bounds = (
|
||||||
self.acc.dynamic_usage_bounds(),
|
self.acc.dynamic_usage_bounds(),
|
||||||
|
@ -296,11 +401,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, D, Output> BatchRunner<A, D, Output>
|
impl<A, D, Output, T> BatchRunner<A, D, Output, T>
|
||||||
where
|
where
|
||||||
A: Clone,
|
A: Clone,
|
||||||
D: BatchDomain,
|
D: BatchDomain,
|
||||||
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>,
|
||||||
|
T: Tasks<Batch<A, D, Output>>,
|
||||||
{
|
{
|
||||||
/// Constructs a new batch runner for the given incoming viewing keys.
|
/// Constructs a new batch runner for the given incoming viewing keys.
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
|
@ -308,17 +414,16 @@ where
|
||||||
ivks: impl Iterator<Item = (A, D::IncomingViewingKey)>,
|
ivks: impl Iterator<Item = (A, D::IncomingViewingKey)>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (tags, ivks) = ivks.unzip();
|
let (tags, ivks) = ivks.unzip();
|
||||||
let running_usage = Arc::new(AtomicUsize::new(0));
|
|
||||||
Self {
|
Self {
|
||||||
batch_size_threshold,
|
batch_size_threshold,
|
||||||
acc: Batch::new(tags, ivks, running_usage.clone()),
|
acc: Batch::new(tags, ivks),
|
||||||
running_usage,
|
running_tasks: T::new(),
|
||||||
pending_results: HashMap::default(),
|
pending_results: HashMap::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, D, Output> BatchRunner<A, D, Output>
|
impl<A, D, Output, T> BatchRunner<A, D, Output, T>
|
||||||
where
|
where
|
||||||
A: Clone + Send + 'static,
|
A: Clone + Send + 'static,
|
||||||
D: BatchDomain + Send + 'static,
|
D: BatchDomain + Send + 'static,
|
||||||
|
@ -327,6 +432,7 @@ where
|
||||||
D::Note: Send,
|
D::Note: Send,
|
||||||
D::Recipient: Send,
|
D::Recipient: Send,
|
||||||
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone + Send + 'static,
|
Output: ShieldedOutput<D, COMPACT_NOTE_SIZE> + Clone + Send + 'static,
|
||||||
|
T: Tasks<Batch<A, D, Output>>,
|
||||||
{
|
{
|
||||||
/// Batches the given outputs for trial decryption.
|
/// Batches the given outputs for trial decryption.
|
||||||
///
|
///
|
||||||
|
@ -359,13 +465,9 @@ where
|
||||||
/// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch.
|
/// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch.
|
||||||
pub(crate) fn flush(&mut self) {
|
pub(crate) fn flush(&mut self) {
|
||||||
if !self.acc.is_empty() {
|
if !self.acc.is_empty() {
|
||||||
let mut batch = Batch::new(
|
let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone());
|
||||||
self.acc.tags.clone(),
|
|
||||||
self.acc.ivks.clone(),
|
|
||||||
self.running_usage.clone(),
|
|
||||||
);
|
|
||||||
mem::swap(&mut batch, &mut self.acc);
|
mem::swap(&mut batch, &mut self.acc);
|
||||||
rayon::spawn_fifo(|| batch.run());
|
self.running_tasks.run_task(batch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ use zcash_primitives::{
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
proto::compact_formats::CompactBlock,
|
proto::compact_formats::CompactBlock,
|
||||||
scan::BatchRunner,
|
scan::{Batch, BatchRunner, Tasks},
|
||||||
wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx},
|
wallet::{WalletShieldedOutput, WalletShieldedSpend, WalletTx},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -166,7 +166,7 @@ pub fn scan_block<P: consensus::Parameters + Send + 'static, K: ScanningKey>(
|
||||||
tree: &mut CommitmentTree<Node>,
|
tree: &mut CommitmentTree<Node>,
|
||||||
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||||
) -> Vec<WalletTx<K::Nf>> {
|
) -> Vec<WalletTx<K::Nf>> {
|
||||||
scan_block_with_runner(
|
scan_block_with_runner::<_, _, ()>(
|
||||||
params,
|
params,
|
||||||
block,
|
block,
|
||||||
vks,
|
vks,
|
||||||
|
@ -177,16 +177,18 @@ pub fn scan_block<P: consensus::Parameters + Send + 'static, K: ScanningKey>(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaggedBatchRunner<P, S> =
|
type TaggedBatch<P, S> = Batch<(AccountId, S), SaplingDomain<P>, CompactOutputDescription>;
|
||||||
BatchRunner<(AccountId, S), SaplingDomain<P>, CompactOutputDescription>;
|
type TaggedBatchRunner<P, S, T> =
|
||||||
|
BatchRunner<(AccountId, S), SaplingDomain<P>, CompactOutputDescription, T>;
|
||||||
|
|
||||||
pub(crate) fn add_block_to_runner<P, S>(
|
pub(crate) fn add_block_to_runner<P, S, T>(
|
||||||
params: &P,
|
params: &P,
|
||||||
block: CompactBlock,
|
block: CompactBlock,
|
||||||
batch_runner: &mut TaggedBatchRunner<P, S>,
|
batch_runner: &mut TaggedBatchRunner<P, S, T>,
|
||||||
) where
|
) where
|
||||||
P: consensus::Parameters + Send + 'static,
|
P: consensus::Parameters + Send + 'static,
|
||||||
S: Clone + Send + 'static,
|
S: Clone + Send + 'static,
|
||||||
|
T: Tasks<TaggedBatch<P, S>>,
|
||||||
{
|
{
|
||||||
let block_hash = block.hash();
|
let block_hash = block.hash();
|
||||||
let block_height = block.height();
|
let block_height = block.height();
|
||||||
|
@ -211,14 +213,18 @@ pub(crate) fn add_block_to_runner<P, S>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn scan_block_with_runner<P: consensus::Parameters + Send + 'static, K: ScanningKey>(
|
pub(crate) fn scan_block_with_runner<
|
||||||
|
P: consensus::Parameters + Send + 'static,
|
||||||
|
K: ScanningKey,
|
||||||
|
T: Tasks<TaggedBatch<P, K::Scope>> + Sync,
|
||||||
|
>(
|
||||||
params: &P,
|
params: &P,
|
||||||
block: CompactBlock,
|
block: CompactBlock,
|
||||||
vks: &[(&AccountId, &K)],
|
vks: &[(&AccountId, &K)],
|
||||||
nullifiers: &[(AccountId, Nullifier)],
|
nullifiers: &[(AccountId, Nullifier)],
|
||||||
tree: &mut CommitmentTree<Node>,
|
tree: &mut CommitmentTree<Node>,
|
||||||
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
existing_witnesses: &mut [&mut IncrementalWitness<Node>],
|
||||||
mut batch_runner: Option<&mut TaggedBatchRunner<P, K::Scope>>,
|
mut batch_runner: Option<&mut TaggedBatchRunner<P, K::Scope, T>>,
|
||||||
) -> Vec<WalletTx<K::Nf>> {
|
) -> Vec<WalletTx<K::Nf>> {
|
||||||
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
|
let mut wtxs: Vec<WalletTx<K::Nf>> = vec![];
|
||||||
let block_height = block.height();
|
let block_height = block.height();
|
||||||
|
@ -554,7 +560,7 @@ mod tests {
|
||||||
|
|
||||||
let mut tree = CommitmentTree::empty();
|
let mut tree = CommitmentTree::empty();
|
||||||
let mut batch_runner = if scan_multithreaded {
|
let mut batch_runner = if scan_multithreaded {
|
||||||
let mut runner = BatchRunner::new(
|
let mut runner = BatchRunner::<_, _, _, ()>::new(
|
||||||
10,
|
10,
|
||||||
extfvk
|
extfvk
|
||||||
.to_sapling_keys()
|
.to_sapling_keys()
|
||||||
|
@ -618,7 +624,7 @@ mod tests {
|
||||||
|
|
||||||
let mut tree = CommitmentTree::empty();
|
let mut tree = CommitmentTree::empty();
|
||||||
let mut batch_runner = if scan_multithreaded {
|
let mut batch_runner = if scan_multithreaded {
|
||||||
let mut runner = BatchRunner::new(
|
let mut runner = BatchRunner::<_, _, _, ()>::new(
|
||||||
10,
|
10,
|
||||||
extfvk
|
extfvk
|
||||||
.to_sapling_keys()
|
.to_sapling_keys()
|
||||||
|
|
Loading…
Reference in New Issue