wallet: Move heap tracking of batch tasks behind a trait

This enables the heap usage measurements to be conditionally enabled by
the `BatchRunner` user. Importantly, when heap usage measurements are
not enabled, the `DynamicUsage` bound on `Batch` is not required.

This refactor also fixes a bug in the prior implementation. We were
counting the heap usage of a task when it started to run, but the item
may have been in the `rayon` work-stealing queues for a non-negligible
period before then. We now count the heap usage immediately before
spawning the task into the `rayon` thread pool.

Ported from zcash/librustzcash@c98f04330d.
This commit is contained in:
Jack Grigg 2022-09-25 22:27:58 +00:00
parent 3139559ee9
commit 0ba43dc714
1 changed files with 141 additions and 37 deletions

View File

@ -272,6 +272,111 @@ impl<A, D: Domain> DynamicUsage for OutputReplier<A, D> {
}
}
/// A tracker for the batch scanning tasks that are currently running.
///
/// This enables a [`BatchRunner`] to be optionally configured to track heap memory usage.
pub(crate) trait Tasks<Item> {
/// The runnable task type that [`Tasks::add_task`] produces from an `Item`.
type Task: Task;
/// Constructs a new tracker.
fn new() -> Self;
/// Registers `item` with this tracker and returns the task that should be run for it.
fn add_task(&self, item: Item) -> Self::Task;
/// Registers `item` via [`Tasks::add_task`], then spawns the resulting task with
/// `rayon::spawn_fifo`.
fn run_task(&self, item: Item) {
// Register on the calling thread, before the task is handed to `rayon`, so that
// any bookkeeping done by `add_task` also covers the time the task spends queued.
let task = self.add_task(item);
rayon::spawn_fifo(|| task.run());
}
}
/// A batch scanning task.
///
/// The `Send + 'static` bounds allow a task to be moved into the closure that
/// [`Tasks::run_task`] passes to `rayon::spawn_fifo`.
pub(crate) trait Task: Send + 'static {
/// Runs the task, consuming it.
fn run(self);
}
/// The unit type acts as a tracker that records nothing about the tasks it is given.
impl<Item> Tasks<Item> for ()
where
    Item: Task,
{
    // Nothing is tracked, so the item is its own task and needs no wrapper type.
    type Task = Item;

    /// Constructs the (stateless) tracker.
    fn new() -> Self {}

    /// Hands `item` straight back, unchanged, as the task to run.
    fn add_task(&self, item: Item) -> Self::Task {
        item
    }
}
/// A task tracker that measures heap usage.
///
/// This struct implements `DynamicUsage` without any item bounds, but that works because
/// it only implements `Tasks` for items that implement `DynamicUsage`.
pub(crate) struct WithUsage {
// The current heap usage for all running tasks. The counter is shared with every
// `WithUsageTask` created by this tracker: `add_task` adds a task's usage to it, and
// the task subtracts that same amount once it has finished running.
running_usage: Arc<AtomicUsize>,
}
impl DynamicUsage for WithUsage {
    /// Returns the heap usage currently attributed to this tracker's tasks.
    fn dynamic_usage(&self) -> usize {
        self.running_usage.load(Ordering::Relaxed)
    }

    /// Returns the current usage as both the lower and the upper bound.
    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
        // Tasks are relatively short-lived, so we accept the inaccuracy of treating a
        // task's approximate usage as its bounds.
        let current = self.dynamic_usage();
        (current, Some(current))
    }
}
impl<Item: Task + DynamicUsage> Tasks<Item> for WithUsage {
    type Task = WithUsageTask<Item>;

    /// Constructs a tracker whose running usage starts at zero.
    fn new() -> Self {
        let running_usage = Arc::new(AtomicUsize::new(0));
        Self { running_usage }
    }

    /// Wraps `item` in a [`WithUsageTask`] and adds its heap usage to the running total.
    fn add_task(&self, item: Item) -> Self::Task {
        // We use the size of the task wrapper as a lower bound on the actual heap
        // memory allocated by the rayon threadpool to store it, and add whatever the
        // item itself reports as heap-allocated.
        let own_usage = mem::size_of::<WithUsageTask<Item>>() + item.dynamic_usage();

        // Create the task that will move onto the heap with the batch item.
        let task = WithUsageTask {
            item,
            own_usage,
            running_usage: Arc::clone(&self.running_usage),
        };

        // Approximate now as when the heap cost of this running batch begins. In
        // practice this is fine, because `Self::add_task` is called from
        // `Self::run_task`, which immediately moves the task to the heap.
        self.running_usage.fetch_add(own_usage, Ordering::SeqCst);

        task
    }
}
/// A task that will clean up its own heap usage from the overall running usage once it is
/// complete.
pub(crate) struct WithUsageTask<Item> {
/// The item being run.
item: Item,
/// Size of this task on the heap. We assume that the size of the task does not change
/// once it has been created, to avoid needing to maintain bidirectional channels
/// between [`WithUsage`] and its tasks.
///
/// [`WithUsage`] computes this once, when it creates the task, as the size of this
/// struct plus the item's reported `dynamic_usage()`.
own_usage: usize,
/// Pointer to the parent [`WithUsage`]'s heap usage tracker for running tasks.
running_usage: Arc<AtomicUsize>,
}
impl<Item: Task> Task for WithUsageTask<Item> {
    /// Runs the wrapped item, then removes this task's usage from the shared counter.
    fn run(self) {
        // Take the task apart so each piece can be consumed on its own.
        let Self {
            item,
            own_usage,
            running_usage,
        } = self;

        // Run the item.
        item.run();

        // Signal that the heap memory for this task has been freed.
        running_usage.fetch_sub(own_usage, Ordering::SeqCst);
    }
}
/// A batch of outputs to trial decrypt.
struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> {
tags: Vec<A>,
@ -285,8 +390,6 @@ struct Batch<A, D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>>
/// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
outputs: Vec<(D, Output)>,
repliers: Vec<OutputReplier<A, D>>,
// Pointer to the parent `BatchRunner`'s heap usage tracker for running batches.
running_usage: Arc<AtomicUsize>,
}
fn base_vec_usage<T>(c: &Vec<T>) -> usize {
@ -326,18 +429,13 @@ where
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>,
{
/// Constructs a new batch.
fn new(
tags: Vec<A>,
ivks: Vec<D::IncomingViewingKey>,
running_usage: Arc<AtomicUsize>,
) -> Self {
fn new(tags: Vec<A>, ivks: Vec<D::IncomingViewingKey>) -> Self {
assert_eq!(tags.len(), ivks.len());
Self {
tags,
ivks,
outputs: vec![],
repliers: vec![],
running_usage,
}
}
@ -345,22 +443,26 @@ where
fn is_empty(&self) -> bool {
self.outputs.is_empty()
}
}
impl<A, D, Output> Task for Batch<A, D, Output>
where
A: Clone + Send + 'static,
D: OutputDomain + Send + 'static,
D::IncomingViewingKey: Send,
D::Memo: Send,
D::Note: Send,
D::Recipient: Send,
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE> + Send + 'static,
{
/// Runs the batch of trial decryptions, and reports the results.
fn run(self) {
// Approximate now as when the heap cost of this running batch begins. We use the
// size of `self` as a lower bound on the actual heap memory allocated by the
// rayon threadpool to store this `Batch`.
let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage();
self.running_usage.fetch_add(own_usage, Ordering::SeqCst);
// Deconstruct self so we can consume the pieces individually.
let Self {
tags,
ivks,
outputs,
repliers,
running_usage,
} = self;
assert_eq!(outputs.len(), repliers.len());
@ -393,9 +495,6 @@ where
}
}
}
// Signal that the heap memory for this batch is about to be freed.
running_usage.fetch_sub(own_usage, Ordering::SeqCst);
}
}
@ -472,28 +571,34 @@ impl<A, D: Domain> DynamicUsage for BatchReceiver<A, D> {
}
/// Logic to run batches of trial decryptions on the global threadpool.
struct BatchRunner<A, D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> {
struct BatchRunner<A, D, Output, T>
where
D: BatchDomain,
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>,
T: Tasks<Batch<A, D, Output>>,
{
// The batch currently being accumulated.
acc: Batch<A, D, Output>,
// The dynamic memory usage of the running batches.
running_usage: Arc<AtomicUsize>,
// The running batches.
running_tasks: T,
// Receivers for the results of the running batches.
pending_results: HashMap<ResultKey, BatchReceiver<A, D>>,
}
impl<A, D, Output> DynamicUsage for BatchRunner<A, D, Output>
impl<A, D, Output, T> DynamicUsage for BatchRunner<A, D, Output, T>
where
D: BatchDomain,
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>,
T: Tasks<Batch<A, D, Output>> + DynamicUsage,
{
fn dynamic_usage(&self) -> usize {
self.acc.dynamic_usage()
+ self.running_usage.load(Ordering::Relaxed)
+ self.running_tasks.dynamic_usage()
+ self.pending_results.dynamic_usage()
}
fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
let running_usage = self.running_usage.load(Ordering::Relaxed);
let running_usage = self.running_tasks.dynamic_usage();
let bounds = (
self.acc.dynamic_usage_bounds(),
@ -510,25 +615,25 @@ where
}
}
impl<A, D, Output> BatchRunner<A, D, Output>
impl<A, D, Output, T> BatchRunner<A, D, Output, T>
where
A: Clone,
D: OutputDomain,
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>,
T: Tasks<Batch<A, D, Output>>,
{
/// Constructs a new batch runner for the given incoming viewing keys.
fn new(ivks: impl Iterator<Item = (A, D::IncomingViewingKey)>) -> Self {
let running_usage = Arc::new(AtomicUsize::new(0));
let (tags, ivks) = ivks.unzip();
Self {
acc: Batch::new(tags, ivks, running_usage.clone()),
running_usage,
acc: Batch::new(tags, ivks),
running_tasks: T::new(),
pending_results: HashMap::default(),
}
}
}
impl<A, D, Output> BatchRunner<A, D, Output>
impl<A, D, Output, T> BatchRunner<A, D, Output, T>
where
A: Clone + Send + 'static,
D: OutputDomain + Send + 'static,
@ -537,6 +642,7 @@ where
D::Note: Send,
D::Recipient: Send,
Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE> + Clone + Send + 'static,
T: Tasks<Batch<A, D, Output>>,
{
/// Batches the given outputs for trial decryption.
///
@ -569,13 +675,9 @@ where
/// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch.
fn flush(&mut self) {
if !self.acc.is_empty() {
let mut batch = Batch::new(
self.acc.tags.clone(),
self.acc.ivks.clone(),
self.running_usage.clone(),
);
let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone());
mem::swap(&mut batch, &mut self.acc);
rayon::spawn_fifo(|| batch.run());
self.running_tasks.run_task(batch);
}
}
@ -613,11 +715,13 @@ where
}
}
type SaplingRunner =
BatchRunner<[u8; 32], SaplingDomain<Network>, OutputDescription<GrothProofBytes>, WithUsage>;
/// A batch scanner for the `zcashd` wallet.
struct BatchScanner {
params: Network,
sapling_runner:
Option<BatchRunner<[u8; 32], SaplingDomain<Network>, OutputDescription<GrothProofBytes>>>,
sapling_runner: Option<SaplingRunner>,
}
impl DynamicUsage for BatchScanner {