zcash_client_backend: Only store successes in batch scanner

Previously we were sending an `Option<DecryptedNote>` from each `Batch`
back to its parent `BatchRunner`. However, this requires allocating
sufficient space in the channel to handle the case where every output
can be decrypted. In general this will not be the case, and we can
instead signal "nothing decrypted" by just dropping the channel sender.
This reduces the post-batch-scanning memory usage of `BatchRunner` from
being linear in the number of on-chain outputs, to being linear in the
number of outputs for the wallet.

Ported from zcash/zcash@f7f6c2070d.
This commit is contained in:
Jack Grigg 2022-09-21 23:51:40 +00:00
parent 533722b70e
commit 9a1d61cb4e
1 changed files with 37 additions and 20 deletions

View File

@ -46,7 +46,7 @@ struct OutputIndex<V> {
value: V,
}
type OutputItem<A, D> = OutputIndex<Option<DecryptedNote<A, D>>>;
type OutputItem<A, D> = OutputIndex<DecryptedNote<A, D>>;
/// The sender for the result of batch scanning a specific transaction output.
struct OutputReplier<A, D: Domain>(OutputIndex<channel::Sender<OutputItem<A, D>>>);
@ -176,29 +176,42 @@ where
let own_usage = std::mem::size_of_val(&self) + self.dynamic_usage();
self.running_usage.fetch_add(own_usage, Ordering::SeqCst);
assert_eq!(self.outputs.len(), self.repliers.len());
// Deconstruct self so we can consume the pieces individually.
let Self {
tags,
ivks,
outputs,
repliers,
running_usage,
} = self;
let decryption_results = batch::try_compact_note_decryption(&self.ivks, &self.outputs);
assert_eq!(outputs.len(), repliers.len());
let decryption_results = batch::try_compact_note_decryption(&ivks, &outputs);
for (decryption_result, OutputReplier(replier)) in
decryption_results.into_iter().zip(self.repliers.iter())
decryption_results.into_iter().zip(repliers.into_iter())
{
let result = OutputIndex {
output_index: replier.output_index,
value: decryption_result.map(|((note, recipient), ivk_idx)| DecryptedNote {
ivk_tag: self.tags[ivk_idx].clone(),
recipient,
note,
}),
};
// If `decryption_result` is `None` then we will just drop `replier`,
// indicating to the parent `BatchRunner` that this output was not for us.
if let Some(((note, recipient), ivk_idx)) = decryption_result {
let result = OutputIndex {
output_index: replier.output_index,
value: DecryptedNote {
ivk_tag: tags[ivk_idx].clone(),
recipient,
note,
},
};
if replier.value.send(result).is_err() {
tracing::debug!("BatchRunner was dropped before batch finished");
break;
if replier.value.send(result).is_err() {
tracing::debug!("BatchRunner was dropped before batch finished");
break;
}
}
}
// Signal that the heap memory for this batch is about to be freed.
self.running_usage.fetch_sub(own_usage, Ordering::SeqCst);
running_usage.fetch_sub(own_usage, Ordering::SeqCst);
}
}
@ -367,14 +380,18 @@ where
// We won't have a pending result if the transaction didn't have outputs of
// this runner's kind.
.map(|BatchReceiver(rx)| {
// This iterator will end once the channel becomes empty and disconnected.
// We created one sender per output, and each sender is dropped after the
// batch it is in completes (and in the case of successful decryptions,
// after the decrypted note has been sent to the channel). Completion of
// the iterator therefore corresponds to complete knowledge of the outputs
// of this transaction that could be decrypted.
rx.into_iter()
.filter_map(
.map(
|OutputIndex {
output_index,
value,
}| {
value.map(|decrypted_note| ((txid, output_index), decrypted_note))
},
}| { ((txid, output_index), value) },
)
.collect()
})