Use crossbeam-channel instead of std::sync::mpsc.

See https://github.com/zcash/zcash/pull/6088#issuecomment-1192806433 for rationale.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2022-07-22 19:04:26 +01:00
parent ee4a427519
commit 7d1e14ac3d
3 changed files with 7 additions and 5 deletions

1
Cargo.lock generated
View File

@ -956,6 +956,7 @@ dependencies = [
"bls12_381", "bls12_381",
"byteorder", "byteorder",
"clearscreen", "clearscreen",
"crossbeam-channel",
"cxx", "cxx",
"ed25519-zebra", "ed25519-zebra",
"group", "group",

View File

@ -38,6 +38,7 @@ blake2b_simd = "1"
blake2s_simd = "1" blake2s_simd = "1"
bls12_381 = "0.7" bls12_381 = "0.7"
byteorder = "1" byteorder = "1"
crossbeam-channel = "0.5"
group = "0.12" group = "0.12"
incrementalmerkletree = "0.3" incrementalmerkletree = "0.3"
libc = "0.2" libc = "0.2"

View File

@ -2,8 +2,8 @@ use core::fmt;
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::mem; use std::mem;
use std::sync::mpsc;
use crossbeam_channel as channel;
use group::GroupEncoding; use group::GroupEncoding;
use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, ENC_CIPHERTEXT_SIZE}; use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, ENC_CIPHERTEXT_SIZE};
use zcash_primitives::{ use zcash_primitives::{
@ -222,7 +222,7 @@ struct OutputIndex<V> {
value: V, value: V,
} }
type OutputReplier<D> = OutputIndex<mpsc::Sender<OutputIndex<Option<DecryptedNote<D>>>>>; type OutputReplier<D> = OutputIndex<channel::Sender<OutputIndex<Option<DecryptedNote<D>>>>>;
/// A batch of outputs to trial decrypt. /// A batch of outputs to trial decrypt.
struct Batch<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> { struct Batch<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> {
@ -296,7 +296,7 @@ impl<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE> + Clone> Bat
&mut self, &mut self,
domain: impl Fn() -> D, domain: impl Fn() -> D,
outputs: &[Output], outputs: &[Output],
replier: mpsc::Sender<OutputIndex<Option<DecryptedNote<D>>>>, replier: channel::Sender<OutputIndex<Option<DecryptedNote<D>>>>,
) { ) {
self.outputs self.outputs
.extend(outputs.iter().cloned().map(|output| (domain(), output))); .extend(outputs.iter().cloned().map(|output| (domain(), output)));
@ -313,7 +313,7 @@ type ResultKey = (BlockHash, TxId);
/// Logic to run batches of trial decryptions on the global threadpool. /// Logic to run batches of trial decryptions on the global threadpool.
struct BatchRunner<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> { struct BatchRunner<D: BatchDomain, Output: ShieldedOutput<D, ENC_CIPHERTEXT_SIZE>> {
acc: Batch<D, Output>, acc: Batch<D, Output>,
pending_results: HashMap<ResultKey, mpsc::Receiver<OutputIndex<Option<DecryptedNote<D>>>>>, pending_results: HashMap<ResultKey, channel::Receiver<OutputIndex<Option<DecryptedNote<D>>>>>,
} }
impl<D, Output> BatchRunner<D, Output> impl<D, Output> BatchRunner<D, Output>
@ -356,7 +356,7 @@ where
domain: impl Fn() -> D, domain: impl Fn() -> D,
outputs: &[Output], outputs: &[Output],
) { ) {
let (tx, rx) = mpsc::channel(); let (tx, rx) = channel::unbounded();
self.acc.add_outputs(domain, outputs, tx); self.acc.add_outputs(domain, outputs, tx);
self.pending_results.insert((block_tag, txid), rx); self.pending_results.insert((block_tag, txid), rx);