Use unbounded channel (#7081)
This commit is contained in:
parent
8a879faac7
commit
c965a110f2
|
@ -3539,6 +3539,7 @@ dependencies = [
|
||||||
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
"bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
"chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dir-diff 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dir-diff 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1028,10 +1028,7 @@ mod tests {
|
||||||
system_transaction,
|
system_transaction,
|
||||||
transaction::TransactionError,
|
transaction::TransactionError,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{sync::atomic::Ordering, thread::sleep};
|
||||||
sync::{atomic::Ordering, mpsc::channel},
|
|
||||||
thread::sleep,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_banking_stage_shutdown1() {
|
fn test_banking_stage_shutdown1() {
|
||||||
|
@ -1950,7 +1947,7 @@ mod tests {
|
||||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||||
blocktree.set_roots(&[bank.slot()]).unwrap();
|
blocktree.set_roots(&[bank.slot()]).unwrap();
|
||||||
|
|
||||||
let (transaction_status_sender, transaction_status_receiver) = channel();
|
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||||
let transaction_status_service = TransactionStatusService::new(
|
let transaction_status_service = TransactionStatusService::new(
|
||||||
transaction_status_receiver,
|
transaction_status_receiver,
|
||||||
blocktree.clone(),
|
blocktree.clone(),
|
||||||
|
|
|
@ -1155,6 +1155,7 @@ mod test {
|
||||||
replay_stage::ReplayStage,
|
replay_stage::ReplayStage,
|
||||||
transaction_status_service::TransactionStatusService,
|
transaction_status_service::TransactionStatusService,
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::unbounded;
|
||||||
use solana_ledger::{
|
use solana_ledger::{
|
||||||
blocktree::make_slot_entries,
|
blocktree::make_slot_entries,
|
||||||
blocktree::{entries_to_test_shreds, BlocktreeError},
|
blocktree::{entries_to_test_shreds, BlocktreeError},
|
||||||
|
@ -1759,7 +1760,7 @@ mod test {
|
||||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||||
blocktree.set_roots(&[slot]).unwrap();
|
blocktree.set_roots(&[slot]).unwrap();
|
||||||
|
|
||||||
let (transaction_status_sender, transaction_status_receiver) = channel();
|
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||||
let transaction_status_service = TransactionStatusService::new(
|
let transaction_status_service = TransactionStatusService::new(
|
||||||
transaction_status_receiver,
|
transaction_status_receiver,
|
||||||
blocktree.clone(),
|
blocktree.clone(),
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
|
use crossbeam_channel::{Receiver, RecvTimeoutError};
|
||||||
use solana_client::rpc_request::RpcTransactionStatus;
|
use solana_client::rpc_request::RpcTransactionStatus;
|
||||||
use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch};
|
use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use std::{
|
use std::{
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
mpsc::{Receiver, RecvTimeoutError},
|
|
||||||
Arc,
|
Arc,
|
||||||
},
|
},
|
||||||
thread::{self, Builder, JoinHandle},
|
thread::{self, Builder, JoinHandle},
|
||||||
|
@ -35,9 +35,9 @@ impl TransactionStatusService {
|
||||||
&blocktree,
|
&blocktree,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
_ => info!("Error from write_transaction_statuses: {:?}", e),
|
_ => info!("Error from write_transaction_status_batch: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -19,6 +19,7 @@ use crate::{
|
||||||
transaction_status_service::TransactionStatusService,
|
transaction_status_service::TransactionStatusService,
|
||||||
tvu::{Sockets, Tvu},
|
tvu::{Sockets, Tvu},
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::unbounded;
|
||||||
use solana_ledger::{
|
use solana_ledger::{
|
||||||
bank_forks::{BankForks, SnapshotConfig},
|
bank_forks::{BankForks, SnapshotConfig},
|
||||||
bank_forks_utils,
|
bank_forks_utils,
|
||||||
|
@ -44,7 +45,7 @@ use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
process,
|
process,
|
||||||
sync::atomic::{AtomicBool, Ordering},
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
sync::mpsc::{channel, Receiver},
|
sync::mpsc::Receiver,
|
||||||
sync::{Arc, Mutex, RwLock},
|
sync::{Arc, Mutex, RwLock},
|
||||||
thread::Result,
|
thread::Result,
|
||||||
};
|
};
|
||||||
|
@ -244,7 +245,7 @@ impl Validator {
|
||||||
|
|
||||||
let (transaction_status_sender, transaction_status_service) =
|
let (transaction_status_sender, transaction_status_service) =
|
||||||
if rpc_service.is_some() && !config.transaction_status_service_disabled {
|
if rpc_service.is_some() && !config.transaction_status_service_disabled {
|
||||||
let (transaction_status_sender, transaction_status_receiver) = channel();
|
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||||
(
|
(
|
||||||
Some(transaction_status_sender),
|
Some(transaction_status_sender),
|
||||||
Some(TransactionStatusService::new(
|
Some(TransactionStatusService::new(
|
||||||
|
|
|
@ -13,6 +13,7 @@ bincode = "1.2.0"
|
||||||
byteorder = "1.3.2"
|
byteorder = "1.3.2"
|
||||||
bzip2 = "0.3.3"
|
bzip2 = "0.3.3"
|
||||||
chrono = { version = "0.4.9", features = ["serde"] }
|
chrono = { version = "0.4.9", features = ["serde"] }
|
||||||
|
crossbeam-channel = "0.3"
|
||||||
dir-diff = "0.3.2"
|
dir-diff = "0.3.2"
|
||||||
dlopen = "0.1.8"
|
dlopen = "0.1.8"
|
||||||
dlopen_derive = "0.1.4"
|
dlopen_derive = "0.1.4"
|
||||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
||||||
entry::{create_ticks, Entry, EntrySlice},
|
entry::{create_ticks, Entry, EntrySlice},
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::Sender;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::*;
|
use log::*;
|
||||||
use rand::{seq::SliceRandom, thread_rng};
|
use rand::{seq::SliceRandom, thread_rng};
|
||||||
|
@ -27,7 +28,7 @@ use solana_sdk::{
|
||||||
use std::{
|
use std::{
|
||||||
cell::RefCell,
|
cell::RefCell,
|
||||||
result,
|
result,
|
||||||
sync::{mpsc::Sender, Arc},
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue