diff --git a/Cargo.lock b/Cargo.lock index 2c72d92563..1dc9c110e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,6 +497,15 @@ dependencies = [ "cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-channel" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-deque" version = "0.6.3" @@ -2168,6 +2177,7 @@ dependencies = [ "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)", "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3551,6 +3561,7 @@ dependencies = [ "checksum core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6d162c6e463c31dbf78fefa99d042156c1c74d404e299cfe3df2923cb857595b" "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +"checksum crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0ed1a4de2235cabda8558ff5840bffb97fcb64c97827f354a451307df5f72b" "checksum crossbeam-deque 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "05e44b8cf3e1a625844d1750e1f7820da46044ff6d28f4d43e455ba3e5bb2c13" "checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71" "checksum crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04c9e3102cc2d69cd681412141b390abd55a362afc1540965dad0ad4d34280b4" diff --git a/core/Cargo.toml b/core/Cargo.toml index af3ecf3854..d0b00a9cf9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,6 +24,7 @@ byteorder = "1.3.2" chrono = { version = "0.4.7", features = ["serde"] } core_affinity = "0.5.9" crc = { version = "1.8.1", optional = true } +crossbeam-channel = "0.3" hashbrown = "0.2.0" indexmap = "1.0" itertools = "0.8.0" diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index e36f22d58d..93f5ad0c39 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -4,6 +4,7 @@ extern crate test; #[macro_use] extern crate solana; +use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; @@ -26,7 +27,7 @@ use solana_sdk::timing::{ }; use std::iter; use std::sync::atomic::Ordering; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use test::Bencher; @@ -104,8 +105,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { // during the benchmark genesis_block.ticks_per_slot = 10_000; - let (verified_sender, verified_receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); let bank = Arc::new(Bank::new(&genesis_block)); let to_pubkey = Pubkey::new_rand(); let dummy = system_transaction::transfer(&mint_keypair, &to_pubkey, 1, genesis_block.hash()); @@ -230,8 +231,8 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { .. } = create_genesis_block(mint_total); - let (verified_sender, verified_receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); let bank = Arc::new(Bank::new(&genesis_block)); let to_pubkey = Pubkey::new_rand(); let dummy = system_transaction::transfer(&mint_keypair, &to_pubkey, 1, genesis_block.hash()); diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 7511b002f9..9fa8e7e79f 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -3,6 +3,7 @@ extern crate solana; extern crate test; +use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; use solana::packet::to_packets_chunked; @@ -21,7 +22,7 @@ use test::Bencher; fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); let (packet_s, packet_r) = channel(); - let (verified_s, verified_r) = channel(); + let (verified_s, verified_r) = unbounded(); let sigverify_disabled = false; let stage = SigVerifyStage::new(packet_r, sigverify_disabled, verified_s); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d59455a5bf..ccc3841527 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -14,6 +14,7 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; +use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; use solana_runtime::accounts_db::ErrorCounters; @@ -28,7 +29,7 @@ use solana_sdk::timing::{ use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError}; +use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -62,8 +63,8 @@ impl BankingStage { pub fn new( cluster_info: &Arc>, poh_recorder: &Arc>, - verified_receiver: Receiver, - verified_vote_receiver: Receiver, + verified_receiver: CrossbeamReceiver, + verified_vote_receiver: CrossbeamReceiver, ) -> Self { Self::new_num_threads( cluster_info, @@ -77,13 +78,10 @@ impl BankingStage { fn new_num_threads( cluster_info: &Arc>, poh_recorder: &Arc>, - verified_receiver: Receiver, - verified_vote_receiver: Receiver, + verified_receiver: CrossbeamReceiver, + verified_vote_receiver: CrossbeamReceiver, num_threads: u32, ) -> Self { - let verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver)); - // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. @@ -306,7 +304,7 @@ impl BankingStage { pub fn process_loop( my_pubkey: Pubkey, - verified_receiver: &Arc>>, + verified_receiver: &CrossbeamReceiver, poh_recorder: &Arc>, cluster_info: &Arc>, recv_start: &mut Instant, @@ -346,7 +344,8 @@ impl BankingStage { recv_timeout, id, ) { - Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), + Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (), + Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break, Ok(mut unprocessed_packets) => { if unprocessed_packets.is_empty() { continue; @@ -714,16 +713,13 @@ impl BankingStage { /// Process the incoming packets pub fn process_packets( my_pubkey: &Pubkey, - verified_receiver: &Arc>>, + verified_receiver: &CrossbeamReceiver, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, id: u32, ) -> Result { - let mms = verified_receiver - .lock() - .unwrap() - .recv_timeout(recv_timeout)?; + let mms = verified_receiver.recv_timeout(recv_timeout)?; let mms_len = mms.len(); let count: usize = mms.iter().map(|x| x.1.len()).sum(); @@ -860,20 +856,21 @@ mod tests { use crate::packet::to_packets; use crate::poh_recorder::WorkingBank; use crate::{get_tmp_ledger_path, tmp_ledger_name}; + use crossbeam_channel::unbounded; use itertools::Itertools; use solana_sdk::instruction::InstructionError; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use solana_sdk::transaction::TransactionError; - use std::sync::mpsc::channel; + use std::sync::atomic::Ordering; use std::thread::sleep; #[test] fn test_banking_stage_shutdown1() { let genesis_block = create_genesis_block(2).genesis_block; let bank = Arc::new(Bank::new(&genesis_block)); - let (verified_sender, verified_receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -907,8 +904,8 @@ mod tests { genesis_block.ticks_per_slot = 4; let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); - let (verified_sender, verified_receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -956,8 +953,8 @@ mod tests { } = create_genesis_block(10); let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); - let (verified_sender, verified_receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -1065,7 +1062,7 @@ mod tests { mint_keypair, .. } = create_genesis_block(2); - let (verified_sender, verified_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -1097,7 +1094,7 @@ mod tests { .collect(); verified_sender.send(packets).unwrap(); - let (vote_sender, vote_receiver) = channel(); + let (vote_sender, vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let entry_receiver = { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index bf256a7d22..3f0b992d1a 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -4,9 +4,9 @@ use crate::result::Result; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; use crate::{packet, sigverify}; +use crossbeam_channel::Sender as CrossbeamSender; use solana_metrics::inc_new_counter_debug; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; @@ -20,7 +20,7 @@ impl ClusterInfoVoteListener { exit: &Arc, cluster_info: Arc>, sigverify_disabled: bool, - sender: Sender, + sender: CrossbeamSender, poh_recorder: &Arc>, ) -> Self { let exit = exit.clone(); @@ -45,7 +45,7 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: &Arc>, sigverify_disabled: bool, - sender: &Sender, + sender: &CrossbeamSender, poh_recorder: Arc>, ) -> Result<()> { let mut last_ts = 0; diff --git a/core/src/lib.rs b/core/src/lib.rs index ddb25e87b3..460ae5d51d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -96,3 +96,5 @@ extern crate solana_metrics; #[cfg(test)] #[macro_use] extern crate matches; + +extern crate crossbeam_channel; diff --git a/core/src/result.rs b/core/src/result.rs index 05eb4c84d5..0f9ce2a8b4 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -17,7 +17,10 @@ pub enum Error { AddrParse(std::net::AddrParseError), JoinError(Box), RecvError(std::sync::mpsc::RecvError), + TryCrossbeamRecvError(crossbeam_channel::TryRecvError), + CrossbeamRecvTimeoutError(crossbeam_channel::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), + CrossbeamSendError, TryRecvError(std::sync::mpsc::TryRecvError), Serialize(std::boxed::Box), TransactionError(transaction::TransactionError), @@ -44,11 +47,21 @@ impl std::convert::From for Error { Error::RecvError(e) } } +impl std::convert::From for Error { + fn from(e: crossbeam_channel::TryRecvError) -> Error { + Error::TryCrossbeamRecvError(e) + } +} impl std::convert::From for Error { fn from(e: std::sync::mpsc::TryRecvError) -> Error { Error::TryRecvError(e) } } +impl std::convert::From for Error { + fn from(e: crossbeam_channel::RecvTimeoutError) -> Error { + Error::CrossbeamRecvTimeoutError(e) + } +} impl std::convert::From for Error { fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error { Error::RecvTimeoutError(e) @@ -69,6 +82,11 @@ impl std::convert::From for Error { Error::ErasureError(e) } } +impl std::convert::From> for Error { + fn from(_e: crossbeam_channel::SendError) -> Error { + Error::CrossbeamSendError + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index c1d6fd058d..a83075dc8f 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -10,9 +10,10 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify; use crate::streamer::{self, PacketReceiver}; +use crossbeam_channel::Sender as CrossbeamSender; use solana_metrics::{datapoint_info, inc_new_counter_info}; use solana_sdk::timing; -use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; @@ -34,7 +35,7 @@ impl SigVerifyStage { pub fn new( packet_receiver: Receiver, sigverify_disabled: bool, - verified_sender: Sender, + verified_sender: CrossbeamSender, ) -> Self { sigverify::init(); let thread_hdls = @@ -53,7 +54,7 @@ impl SigVerifyStage { fn verifier( recvr: &Arc>, - sendr: &Sender, + sendr: &CrossbeamSender, sigverify_disabled: bool, id: usize, ) -> Result<()> { @@ -107,7 +108,7 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: Arc>, - verified_sender: Sender, + verified_sender: CrossbeamSender, sigverify_disabled: bool, id: usize, ) -> JoinHandle<()> { @@ -132,7 +133,7 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: PacketReceiver, - verified_sender: Sender, + verified_sender: CrossbeamSender, sigverify_disabled: bool, ) -> Vec> { let receiver = Arc::new(Mutex::new(packet_receiver)); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 24d25953b6..4b0b416801 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -10,6 +10,7 @@ use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; +use crossbeam_channel::unbounded; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; @@ -50,12 +51,12 @@ impl Tpu { &packet_sender, &poh_recorder, ); - let (verified_sender, verified_receiver) = channel(); + let (verified_sender, verified_receiver) = unbounded(); let sigverify_stage = SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone()); - let (verified_vote_sender, verified_vote_receiver) = channel(); + let (verified_vote_sender, verified_vote_receiver) = unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( &exit, cluster_info.clone(),