More thiserror (#7183)

* Less solana_core::result. Module now private.

* Drop solana_core::result dependency from a few more modules

* Fix warning

* Cleanup

* Fix typo
This commit is contained in:
Greg Fitzgerald 2020-01-02 20:50:43 -07:00 committed by GitHub
parent a956bb08d8
commit a707c9410e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 140 additions and 124 deletions

1
Cargo.lock generated
View File

@ -3412,6 +3412,7 @@ dependencies = [
"sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
"systemstat 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,6 +1,5 @@
use clap::{crate_description, crate_name, App, Arg};
use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE};
use solana_core::result::Result;
use solana_core::streamer::{receiver, PacketReceiver};
use std::cmp::max;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@ -8,7 +7,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{spawn, JoinHandle};
use std::thread::{spawn, JoinHandle, Result};
use std::time::Duration;
use std::time::SystemTime;

View File

@ -64,6 +64,7 @@ solana-sys-tuner = { path = "../sys-tuner", version = "0.23.0" }
symlink = "0.1.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
thiserror = "1.0"
tokio = "0.1"
tokio-codec = "0.1"
tokio-fs = "0.1"

View File

@ -6,7 +6,6 @@ use crate::{
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_service::PohService,
result::{Error, Result},
thread_mem_usage,
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
@ -167,7 +166,7 @@ impl BankingStage {
buffered_packets: &mut Vec<PacketsAndOffsets>,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<UnprocessedPackets> {
) -> UnprocessedPackets {
let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0;
let mut new_tx_count = 0;
@ -251,7 +250,7 @@ impl BankingStage {
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
Ok(unprocessed_packets)
unprocessed_packets
}
fn consume_or_forward_packets(
@ -291,7 +290,7 @@ impl BankingStage {
enable_forwarding: bool,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<()> {
) {
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
let poh = poh_recorder.lock().unwrap();
(
@ -318,9 +317,8 @@ impl BankingStage {
buffered_packets,
batch_limit,
transaction_status_sender,
)?;
);
buffered_packets.append(&mut unprocessed);
Ok(())
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
@ -328,7 +326,7 @@ impl BankingStage {
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
next_leader.map_or(Ok(()), |leader_pubkey| {
next_leader.map_or((), |leader_pubkey| {
let leader_addr = {
cluster_info
.read()
@ -337,22 +335,20 @@ impl BankingStage {
.map(|leader| leader.tpu_forwards)
};
leader_addr.map_or(Ok(()), |leader_addr| {
leader_addr.map_or((), |leader_addr| {
let _ = Self::forward_buffered_packets(
&socket,
&leader_addr,
&buffered_packets,
);
buffered_packets.clear();
Ok(())
})
})
} else {
buffered_packets.clear();
Ok(())
}
}
_ => Ok(()),
_ => (),
}
}
@ -380,8 +376,7 @@ impl BankingStage {
enable_forwarding,
batch_limit,
transaction_status_sender.clone(),
)
.unwrap_or_else(|_| buffered_packets.clear());
);
}
let recv_timeout = if !buffered_packets.is_empty() {
@ -404,8 +399,8 @@ impl BankingStage {
batch_limit,
transaction_status_sender.clone(),
) {
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Ok(mut unprocessed_packets) => {
if unprocessed_packets.is_empty() {
continue;
@ -417,9 +412,6 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-buffered_packets", num);
buffered_packets.append(&mut unprocessed_packets);
}
Err(err) => {
debug!("solana-banking-stage-tx error: {:?}", err);
}
}
}
}
@ -449,7 +441,7 @@ impl BankingStage {
txs: &[Transaction],
results: &[TransactionProcessResult],
poh: &Arc<Mutex<PohRecorder>>,
) -> (Result<usize>, Vec<usize>) {
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
.iter()
@ -484,11 +476,11 @@ impl BankingStage {
match res {
Ok(()) => (),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
Err(PohRecorderError::MaxHeightReached) => {
// If record errors, add all the committable transactions (the ones
// we just attempted to record) as retryable
return (
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)),
Err(PohRecorderError::MaxHeightReached),
processed_transactions_indexes,
);
}
@ -504,7 +496,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Result<usize>, Vec<usize>) {
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
@ -580,7 +572,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Result<usize>, Vec<usize>) {
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
@ -641,7 +633,7 @@ impl BankingStage {
// Add the retryable txs (transactions that errored in a way that warrants a retry)
// to the list of unprocessed txs.
unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk);
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
if let Err(PohRecorderError::MaxHeightReached) = result {
info!(
"process transactions: max height reached slot: {} height: {}",
bank.slot(),
@ -861,7 +853,7 @@ impl BankingStage {
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<UnprocessedPackets> {
) -> Result<UnprocessedPackets, RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
recv_time.stop();
@ -1436,10 +1428,7 @@ mod tests {
&results,
&poh_recorder,
);
assert_matches!(
res,
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
);
assert_matches!(res, Err(PohRecorderError::MaxHeightReached));
// The first result was an error so it's filtered out. The second result was Ok(),
// so it should be marked as retryable
assert_eq!(retryable, vec![1]);
@ -1757,7 +1746,7 @@ mod tests {
None,
)
.0,
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
Err(PohRecorderError::MaxHeightReached)
);
assert_eq!(bank.get_balance(&pubkey), 1);

View File

@ -2,13 +2,13 @@
//! local unix socket, to provide client services such as a block explorer with
//! real-time access to entries.
use crate::result::Result;
use bincode::serialize;
use chrono::{SecondsFormat, Utc};
use serde_json::json;
use solana_ledger::entry::Entry;
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::cell::RefCell;
use std::io::Result;
use std::path::{Path, PathBuf};
pub trait EntryWriter: std::fmt::Debug {
@ -61,10 +61,10 @@ impl EntryWriter for EntrySocket {
}
#[cfg(windows)]
fn write(&self, _payload: String) -> Result<()> {
Err(crate::result::Error::from(std::io::Error::new(
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"EntryWriter::write() not implemented for windows",
)))
))
}
}

View File

@ -1,4 +1,3 @@
use crate::result::{Error, Result};
use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot;
use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY};
@ -113,15 +112,10 @@ impl AggregateCommitmentService {
break;
}
if let Err(e) = Self::run(&receiver, &block_commitment_cache, &exit_) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!(
"Unexpected error from AggregateCommitmentService: {:?}",
e
),
}
if let Err(RecvTimeoutError::Disconnected) =
Self::run(&receiver, &block_commitment_cache, &exit_)
{
break;
}
})
.unwrap(),
@ -133,7 +127,7 @@ impl AggregateCommitmentService {
receiver: &Receiver<CommitmentAggregationData>,
block_commitment_cache: &RwLock<BlockCommitmentCache>,
exit: &Arc<AtomicBool>,
) -> Result<()> {
) -> Result<(), RecvTimeoutError> {
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());

View File

@ -1,6 +1,5 @@
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
use crate::result::{Error, Result};
use solana_ledger::blocktree::Blocktree;
use solana_metrics::datapoint_debug;
use solana_sdk::clock::Slot;
@ -51,9 +50,8 @@ impl LedgerCleanupService {
&mut next_purge_batch,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from cleanup_ledger: {:?}", e),
RecvTimeoutError::Disconnected => break,
RecvTimeoutError::Timeout => (),
}
}
})
@ -66,7 +64,7 @@ impl LedgerCleanupService {
blocktree: &Arc<Blocktree>,
max_ledger_slots: u64,
next_purge_batch: &mut u64,
) -> Result<()> {
) -> Result<(), RecvTimeoutError> {
let disk_utilization_pre = blocktree.storage_size();
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;

View File

@ -40,7 +40,7 @@ pub mod poh_service;
pub mod recvmmsg;
pub mod repair_service;
pub mod replay_stage;
pub mod result;
mod result;
pub mod retransmit_stage;
pub mod rpc;
pub mod rpc_pubsub;

View File

@ -1,8 +1,5 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::{
recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
result::{Error, Result},
};
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
pub use solana_perf::packet::{
limited_deserialize, to_packets, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
PACKETS_BATCH_SIZE, PACKETS_PER_BATCH,
@ -10,7 +7,7 @@ pub use solana_perf::packet::{
use solana_metrics::inc_new_counter_debug;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{net::UdpSocket, time::Instant};
use std::{io::Result, net::UdpSocket, time::Instant};
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
let mut i = 0;
@ -34,7 +31,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
}
Err(e) => {
trace!("recv_from err {:?}", e);
return Err(Error::IO(e));
return Err(e);
}
Ok((size, npkts)) => {
if i == 0 {

View File

@ -10,7 +10,6 @@
//! For Entries:
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
//!
use crate::result::{Error, Result};
use solana_ledger::blocktree::Blocktree;
use solana_ledger::entry::Entry;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
@ -27,17 +26,28 @@ use std::cmp;
use std::sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use thiserror::Error;
const GRACE_TICKS_FACTOR: u64 = 2;
const MAX_GRACE_SLOTS: u64 = 3;
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Error, Debug, Clone)]
pub enum PohRecorderError {
#[error("invalid calling object")]
InvalidCallingObject,
#[error("max height reached")]
MaxHeightReached,
#[error("min height not reached")]
MinHeightNotReached,
#[error("send WorkingBankEntry error")]
SendError(#[from] SendError<WorkingBankEntry>),
}
type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
#[derive(Clone)]
@ -248,16 +258,12 @@ impl PohRecorder {
let working_bank = self
.working_bank
.as_ref()
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
.ok_or(PohRecorderError::MaxHeightReached)?;
if self.tick_height < working_bank.min_tick_height {
return Err(Error::PohRecorderError(
PohRecorderError::MinHeightNotReached,
));
return Err(PohRecorderError::MinHeightNotReached);
}
if tick && self.tick_height == working_bank.min_tick_height {
return Err(Error::PohRecorderError(
PohRecorderError::MinHeightNotReached,
));
return Err(PohRecorderError::MinHeightNotReached);
}
let entry_count = self
@ -358,9 +364,9 @@ impl PohRecorder {
let working_bank = self
.working_bank
.as_ref()
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
.ok_or(PohRecorderError::MaxHeightReached)?;
if bank_slot != working_bank.bank.slot() {
return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached));
return Err(PohRecorderError::MaxHeightReached);
}
{
@ -757,7 +763,7 @@ mod tests {
let h1 = hash(b"hello world!");
assert_matches!(
poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
Err(PohRecorderError::MaxHeightReached)
);
}
Blocktree::destroy(&ledger_path).unwrap();

View File

@ -122,7 +122,6 @@ mod tests {
use super::*;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::poh_recorder::WorkingBank;
use crate::result::Result;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path};
use solana_perf::test_tx::test_tx;
@ -164,7 +163,7 @@ mod tests {
max_tick_height: std::u64::MAX,
};
let entry_producer: JoinHandle<Result<()>> = {
let entry_producer = {
let poh_recorder = poh_recorder.clone();
let exit = exit.clone();
@ -181,7 +180,7 @@ mod tests {
.record(bank.slot(), h1, vec![tx]);
if exit.load(Ordering::Relaxed) {
break Ok(());
break;
}
}
})

View File

@ -6,10 +6,9 @@
//! if perf-libs are available
use crate::packet::Packets;
use crate::result::{Error, Result};
use crate::sigverify;
use crate::streamer::{self, PacketReceiver};
use crossbeam_channel::Sender as CrossbeamSender;
use crate::streamer::{self, PacketReceiver, StreamerError};
use crossbeam_channel::{SendError, Sender as CrossbeamSender};
use solana_measure::measure::Measure;
use solana_metrics::datapoint_debug;
use solana_perf::perf_libs;
@ -17,10 +16,22 @@ use solana_sdk::timing;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle};
use thiserror::Error;
const RECV_BATCH_MAX_CPU: usize = 1_000;
const RECV_BATCH_MAX_GPU: usize = 5_000;
#[derive(Error, Debug)]
pub enum SigVerifyServiceError {
#[error("send packets batch error")]
SendError(#[from] SendError<Vec<Packets>>),
#[error("streamer error")]
StreamerError(#[from] StreamerError),
}
type Result<T> = std::result::Result<T, SigVerifyServiceError>;
pub struct SigVerifyStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@ -78,9 +89,7 @@ impl SigVerifyStage {
let verified_batch = verifier.verify_batch(batch);
for v in verified_batch {
if sendr.send(vec![v]).is_err() {
return Err(Error::SendError);
}
sendr.send(vec![v])?;
}
verify_batch_time.stop();
@ -118,9 +127,13 @@ impl SigVerifyStage {
.spawn(move || loop {
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => {
SigVerifyServiceError::StreamerError(StreamerError::RecvTimeoutError(
RecvTimeoutError::Disconnected,
)) => break,
SigVerifyServiceError::StreamerError(StreamerError::RecvTimeoutError(
RecvTimeoutError::Timeout,
)) => (),
SigVerifyServiceError::SendError(_) => {
break;
}
_ => error!("{:?}", e),

View File

@ -1,14 +1,14 @@
use crate::result::{Error, Result};
use bincode::serialize_into;
use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver};
use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
use solana_ledger::snapshot_utils::{self, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use solana_runtime::status_cache::SlotDelta;
use solana_sdk::transaction::Result as TransactionResult;
use std::fs;
use std::fs::File;
use std::io::{BufWriter, Error as IOError, ErrorKind};
use std::io::BufWriter;
use std::process::ExitStatus;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::Arc;
@ -16,6 +16,30 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use symlink;
use tempfile::TempDir;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SnapshotServiceError {
#[error("I/O error")]
IO(#[from] std::io::Error),
#[error("serialization error")]
Serialize(#[from] Box<bincode::ErrorKind>),
#[error("receive timeout error")]
RecvTimeoutError(#[from] RecvTimeoutError),
#[error("snapshot error")]
SnapshotError(#[from] SnapshotError),
#[error("archive generation failure {0}")]
ArchiveGenerationFailure(ExitStatus),
#[error("storage path symlink is invalid")]
StoragePathSymlinkInvalid,
}
type Result<T> = std::result::Result<T, SnapshotServiceError>;
pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>,
@ -32,8 +56,10 @@ impl SnapshotPackagerService {
}
if let Err(e) = Self::run(&snapshot_package_receiver) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break
}
SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from package_snapshots: {:?}", e),
}
}
@ -91,9 +117,7 @@ impl SnapshotPackagerService {
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_dir(storage_path, &output_path)?;
if !output_path.is_file() {
return Err(Self::get_io_error(
"Error trying to generate snapshot archive: storage path symlink is invalid",
));
return Err(SnapshotServiceError::StoragePathSymlinkInvalid);
}
}
@ -115,10 +139,9 @@ impl SnapshotPackagerService {
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
return Err(Self::get_io_error(&format!(
"Error trying to generate snapshot archive: {}",
output.status
)));
return Err(SnapshotServiceError::ArchiveGenerationFailure(
output.status,
));
}
// Once everything is successful, overwrite the previous tarball so that other validators
@ -152,11 +175,6 @@ impl SnapshotPackagerService {
Ok(())
}
fn get_io_error(error: &str) -> Error {
warn!("Snapshot Packaging Error: {:?}", error);
Error::IO(IOError::new(ErrorKind::Other, error))
}
fn serialize_status_cache(
slot_deltas: &[SlotDelta<TransactionResult<()>>],
snapshot_links: &TempDir,
@ -172,8 +190,7 @@ impl SnapshotPackagerService {
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
// write the status cache
serialize_into(&mut status_cache_stream, slot_deltas)
.map_err(|_| Self::get_io_error("serialize status cache error"))?;
serialize_into(&mut status_cache_stream, slot_deltas)?;
status_cache_serialize.stop();
inc_new_counter_info!(
"serialize-status-cache-ms",

View File

@ -3,19 +3,33 @@
use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH};
use crate::recvmmsg::NUM_RCVMMSGS;
use crate::result::{Error, Result};
use crate::thread_mem_usage;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use thiserror::Error;
pub type PacketReceiver = Receiver<Packets>;
pub type PacketSender = Sender<Packets>;
#[derive(Error, Debug)]
pub enum StreamerError {
#[error("I/O error")]
IO(#[from] std::io::Error),
#[error("receive timeout error")]
RecvTimeoutError(#[from] RecvTimeoutError),
#[error("send packets error")]
SendError(#[from] SendError<Packets>),
}
pub type Result<T> = std::result::Result<T, StreamerError>;
fn recv_loop(
sock: &UdpSocket,
exit: Arc<AtomicBool>,
@ -117,8 +131,8 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) ->
thread_mem_usage::datapoint(name);
if let Err(e) = recv_send(&sock, &r) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => warn!("{} responder error: {:?}", name, e),
}
}

View File

@ -1,4 +1,3 @@
use crate::result::{Error, Result};
use crossbeam_channel::{Receiver, RecvTimeoutError};
use solana_client::rpc_request::RpcTransactionStatus;
use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch};
@ -30,15 +29,11 @@ impl TransactionStatusService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::write_transaction_status_batch(
if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch(
&write_transaction_status_receiver,
&blocktree,
) {
match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from write_transaction_status_batch: {:?}", e),
}
break;
}
})
.unwrap();
@ -48,7 +43,7 @@ impl TransactionStatusService {
fn write_transaction_status_batch(
write_transaction_status_receiver: &Receiver<TransactionStatusBatch>,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
) -> Result<(), RecvTimeoutError> {
let TransactionStatusBatch {
bank,
transactions,

View File

@ -6,7 +6,6 @@ use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::gossip_service::GossipService;
use solana_core::packet::Packet;
use solana_core::result;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
use std::net::UdpSocket;
@ -66,7 +65,7 @@ where
}
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_ring() -> result::Result<()> {
fn gossip_ring() {
solana_logger::setup();
run_gossip_topo(50, |listen| {
let num = listen.len();
@ -80,14 +79,12 @@ fn gossip_ring() -> result::Result<()> {
xv.insert_info(d);
}
});
Ok(())
}
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_large() -> result::Result<()> {
fn gossip_ring_large() {
solana_logger::setup();
run_gossip_topo(600, |listen| {
let num = listen.len();
@ -101,8 +98,6 @@ fn gossip_ring_large() -> result::Result<()> {
xv.insert_info(d);
}
});
Ok(())
}
/// star a -> (b,c,d,e)
#[test]
@ -144,7 +139,7 @@ fn gossip_rstar() {
}
#[test]
pub fn cluster_info_retransmit() -> result::Result<()> {
pub fn cluster_info_retransmit() {
solana_logger::setup();
let exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
@ -177,7 +172,7 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
p.meta.size = 10;
let peers = c1.read().unwrap().retransmit_peers();
let retransmit_peers: Vec<_> = peers.iter().collect();
ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false)?;
ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap();
let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter()
.map(|s| {
@ -194,6 +189,4 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
dr1.join().unwrap();
dr2.join().unwrap();
dr3.join().unwrap();
Ok(())
}