From c965a110f2764cec10a6ff4c5d8ba0998afc4651 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 21 Nov 2019 14:23:40 -0700 Subject: [PATCH] Use unbounded channel (#7081) --- Cargo.lock | 1 + core/src/banking_stage.rs | 7 ++----- core/src/replay_stage.rs | 3 ++- core/src/transaction_status_service.rs | 8 ++++---- core/src/validator.rs | 5 +++-- ledger/Cargo.toml | 1 + ledger/src/blocktree_processor.rs | 3 ++- 7 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22a8ad63b..e62a7c92b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3539,6 +3539,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "dir-diff 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d5b7110ea..96a683b17 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1028,10 +1028,7 @@ mod tests { system_transaction, transaction::TransactionError, }; - use std::{ - sync::{atomic::Ordering, mpsc::channel}, - thread::sleep, - }; + use std::{sync::atomic::Ordering, thread::sleep}; #[test] fn test_banking_stage_shutdown1() { @@ -1950,7 +1947,7 @@ mod tests { blocktree.insert_shreds(shreds, None, false).unwrap(); blocktree.set_roots(&[bank.slot()]).unwrap(); - let (transaction_status_sender, transaction_status_receiver) = channel(); + let (transaction_status_sender, transaction_status_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, blocktree.clone(), diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d9872a6c1..b086f17d2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1155,6 +1155,7 @@ mod test { replay_stage::ReplayStage, transaction_status_service::TransactionStatusService, }; + use crossbeam_channel::unbounded; use solana_ledger::{ blocktree::make_slot_entries, blocktree::{entries_to_test_shreds, BlocktreeError}, @@ -1759,7 +1760,7 @@ mod test { blocktree.insert_shreds(shreds, None, false).unwrap(); blocktree.set_roots(&[slot]).unwrap(); - let (transaction_status_sender, transaction_status_receiver) = channel(); + let (transaction_status_sender, transaction_status_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, blocktree.clone(), diff --git a/core/src/transaction_status_service.rs b/core/src/transaction_status_service.rs index da8a24a21..58ea8b09c 100644 --- a/core/src/transaction_status_service.rs +++ b/core/src/transaction_status_service.rs @@ -1,11 +1,11 @@ use crate::result::{Error, Result}; +use crossbeam_channel::{Receiver, RecvTimeoutError}; use solana_client::rpc_request::RpcTransactionStatus; use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch}; use solana_runtime::bank::Bank; use std::{ sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError}, Arc, }, thread::{self, Builder, JoinHandle}, @@ -35,9 +35,9 @@ impl TransactionStatusService { &blocktree, ) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from write_transaction_statuses: {:?}", e), + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => info!("Error from write_transaction_status_batch: {:?}", e), } } }) diff --git a/core/src/validator.rs b/core/src/validator.rs index 700d122ac..3d24df448 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -19,6 +19,7 @@ use crate::{ transaction_status_service::TransactionStatusService, tvu::{Sockets, Tvu}, }; +use crossbeam_channel::unbounded; use solana_ledger::{ bank_forks::{BankForks, SnapshotConfig}, bank_forks_utils, @@ -44,7 +45,7 @@ use std::{ path::{Path, PathBuf}, process, sync::atomic::{AtomicBool, Ordering}, - sync::mpsc::{channel, Receiver}, + sync::mpsc::Receiver, sync::{Arc, Mutex, RwLock}, thread::Result, }; @@ -244,7 +245,7 @@ impl Validator { let (transaction_status_sender, transaction_status_service) = if rpc_service.is_some() && !config.transaction_status_service_disabled { - let (transaction_status_sender, transaction_status_receiver) = channel(); + let (transaction_status_sender, transaction_status_receiver) = unbounded(); ( Some(transaction_status_sender), Some(TransactionStatusService::new( diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index fa5013cb8..171815864 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -13,6 +13,7 @@ bincode = "1.2.0" byteorder = "1.3.2" bzip2 = "0.3.3" chrono = { version = "0.4.9", features = ["serde"] } +crossbeam-channel = "0.3" dir-diff = "0.3.2" dlopen = "0.1.8" dlopen_derive = "0.1.4" diff --git a/ledger/src/blocktree_processor.rs b/ledger/src/blocktree_processor.rs index bce8c5afe..de6c42786 100644 --- a/ledger/src/blocktree_processor.rs +++ b/ledger/src/blocktree_processor.rs @@ -6,6 +6,7 @@ use crate::{ entry::{create_ticks, Entry, EntrySlice}, leader_schedule_cache::LeaderScheduleCache, }; +use crossbeam_channel::Sender; use itertools::Itertools; use log::*; use rand::{seq::SliceRandom, thread_rng}; @@ -27,7 +28,7 @@ use solana_sdk::{ use std::{ cell::RefCell, result, - sync::{mpsc::Sender, Arc}, + sync::Arc, time::{Duration, Instant}, };