diff --git a/Cargo.lock b/Cargo.lock index 8213da8588..87a44e0259 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3412,6 +3412,7 @@ dependencies = [ "sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", "systemstat 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 0285be1b15..7b0a80cc7b 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,6 +1,5 @@ use clap::{crate_description, crate_name, App, Arg}; use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; -use solana_core::result::Result; use solana_core::streamer::{receiver, PacketReceiver}; use std::cmp::max; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; @@ -8,7 +7,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::sleep; -use std::thread::{spawn, JoinHandle}; +use std::thread::{spawn, JoinHandle, Result}; use std::time::Duration; use std::time::SystemTime; diff --git a/core/Cargo.toml b/core/Cargo.toml index fed9d54226..4d76adda92 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -64,6 +64,7 @@ solana-sys-tuner = { path = "../sys-tuner", version = "0.23.0" } symlink = "0.1.0" sys-info = "0.5.8" tempfile = "3.1.0" +thiserror = "1.0" tokio = "0.1" tokio-codec = "0.1" tokio-fs = "0.1" diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1fe91cc953..2eaa952728 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,7 +6,6 @@ use crate::{ packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_service::PohService, - result::{Error, Result}, thread_mem_usage, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; @@ -167,7 +166,7 @@ impl BankingStage { buffered_packets: &mut Vec, batch_limit: usize, transaction_status_sender: Option, - ) -> Result { + ) -> UnprocessedPackets { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; let mut new_tx_count = 0; @@ -251,7 +250,7 @@ impl BankingStage { inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); - Ok(unprocessed_packets) + unprocessed_packets } fn consume_or_forward_packets( @@ -291,7 +290,7 @@ impl BankingStage { enable_forwarding: bool, batch_limit: usize, transaction_status_sender: Option, - ) -> Result<()> { + ) { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); ( @@ -318,9 +317,8 @@ impl BankingStage { buffered_packets, batch_limit, transaction_status_sender, - )?; + ); buffered_packets.append(&mut unprocessed); - Ok(()) } BufferedPacketsDecision::Forward => { if enable_forwarding { @@ -328,7 +326,7 @@ impl BankingStage { .lock() .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); - next_leader.map_or(Ok(()), |leader_pubkey| { + next_leader.map_or((), |leader_pubkey| { let leader_addr = { cluster_info .read() @@ -337,22 +335,20 @@ impl BankingStage { .map(|leader| leader.tpu_forwards) }; - leader_addr.map_or(Ok(()), |leader_addr| { + leader_addr.map_or((), |leader_addr| { let _ = Self::forward_buffered_packets( &socket, &leader_addr, &buffered_packets, ); buffered_packets.clear(); - Ok(()) }) }) } else { buffered_packets.clear(); - Ok(()) } } - _ => Ok(()), + _ => (), } } @@ -380,8 +376,7 @@ impl BankingStage { enable_forwarding, batch_limit, transaction_status_sender.clone(), - ) - .unwrap_or_else(|_| buffered_packets.clear()); + ); } let recv_timeout = if !buffered_packets.is_empty() { @@ -404,8 +399,8 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), ) { - Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (), - Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break, + Err(RecvTimeoutError::Timeout) => (), + Err(RecvTimeoutError::Disconnected) => break, Ok(mut unprocessed_packets) => { if unprocessed_packets.is_empty() { continue; @@ -417,9 +412,6 @@ impl BankingStage { inc_new_counter_info!("banking_stage-buffered_packets", num); buffered_packets.append(&mut unprocessed_packets); } - Err(err) => { - debug!("solana-banking-stage-tx error: {:?}", err); - } } } } @@ -449,7 +441,7 @@ impl BankingStage { txs: &[Transaction], results: &[TransactionProcessResult], poh: &Arc>, - ) -> (Result, Vec) { + ) -> (Result, Vec) { let mut processed_generation = Measure::start("record::process_generation"); let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results .iter() @@ -484,11 +476,11 @@ impl BankingStage { match res { Ok(()) => (), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { + Err(PohRecorderError::MaxHeightReached) => { // If record errors, add all the committable transactions (the ones // we just attempted to record) as retryable return ( - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)), + Err(PohRecorderError::MaxHeightReached), processed_transactions_indexes, ); } @@ -504,7 +496,7 @@ impl BankingStage { poh: &Arc>, batch: &TransactionBatch, transaction_status_sender: Option, - ) -> (Result, Vec) { + ) -> (Result, Vec) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. @@ -580,7 +572,7 @@ impl BankingStage { poh: &Arc>, chunk_offset: usize, transaction_status_sender: Option, - ) -> (Result, Vec) { + ) -> (Result, Vec) { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state @@ -641,7 +633,7 @@ impl BankingStage { // Add the retryable txs (transactions that errored in a way that warrants a retry) // to the list of unprocessed txs. unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk); - if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { + if let Err(PohRecorderError::MaxHeightReached) = result { info!( "process transactions: max height reached slot: {} height: {}", bank.slot(), @@ -861,7 +853,7 @@ impl BankingStage { id: u32, batch_limit: usize, transaction_status_sender: Option, - ) -> Result { + ) -> Result { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; recv_time.stop(); @@ -1436,10 +1428,7 @@ mod tests { &results, &poh_recorder, ); - assert_matches!( - res, - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - ); + assert_matches!(res, Err(PohRecorderError::MaxHeightReached)); // The first result was an error so it's filtered out. The second result was Ok(), // so it should be marked as retryable assert_eq!(retryable, vec![1]); @@ -1757,7 +1746,7 @@ mod tests { None, ) .0, - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + Err(PohRecorderError::MaxHeightReached) ); assert_eq!(bank.get_balance(&pubkey), 1); diff --git a/core/src/blockstream.rs b/core/src/blockstream.rs index 31869c84d9..25e72b229e 100644 --- a/core/src/blockstream.rs +++ b/core/src/blockstream.rs @@ -2,13 +2,13 @@ //! local unix socket, to provide client services such as a block explorer with //! real-time access to entries. -use crate::result::Result; use bincode::serialize; use chrono::{SecondsFormat, Utc}; use serde_json::json; use solana_ledger::entry::Entry; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use std::cell::RefCell; +use std::io::Result; use std::path::{Path, PathBuf}; pub trait EntryWriter: std::fmt::Debug { @@ -61,10 +61,10 @@ impl EntryWriter for EntrySocket { } #[cfg(windows)] fn write(&self, _payload: String) -> Result<()> { - Err(crate::result::Error::from(std::io::Error::new( + Err(std::io::Error::new( std::io::ErrorKind::Other, "EntryWriter::write() not implemented for windows", - ))) + )) } } diff --git a/core/src/commitment.rs b/core/src/commitment.rs index 9903fd57ac..d59edf69e1 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,4 +1,3 @@ -use crate::result::{Error, Result}; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; @@ -113,15 +112,10 @@ impl AggregateCommitmentService { break; } - if let Err(e) = Self::run(&receiver, &block_commitment_cache, &exit_) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!( - "Unexpected error from AggregateCommitmentService: {:?}", - e - ), - } + if let Err(RecvTimeoutError::Disconnected) = + Self::run(&receiver, &block_commitment_cache, &exit_) + { + break; } }) .unwrap(), @@ -133,7 +127,7 @@ impl AggregateCommitmentService { receiver: &Receiver, block_commitment_cache: &RwLock, exit: &Arc, - ) -> Result<()> { + ) -> Result<(), RecvTimeoutError> { loop { if exit.load(Ordering::Relaxed) { return Ok(()); diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 553e3942ea..2b430343c3 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -1,6 +1,5 @@ //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage -use crate::result::{Error, Result}; use solana_ledger::blocktree::Blocktree; use solana_metrics::datapoint_debug; use solana_sdk::clock::Slot; @@ -51,9 +50,8 @@ impl LedgerCleanupService { &mut next_purge_batch, ) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from cleanup_ledger: {:?}", e), + RecvTimeoutError::Disconnected => break, + RecvTimeoutError::Timeout => (), } } }) @@ -66,7 +64,7 @@ impl LedgerCleanupService { blocktree: &Arc, max_ledger_slots: u64, next_purge_batch: &mut u64, - ) -> Result<()> { + ) -> Result<(), RecvTimeoutError> { let disk_utilization_pre = blocktree.storage_size(); let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?; diff --git a/core/src/lib.rs b/core/src/lib.rs index febbb40b61..faedb4ea1b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -40,7 +40,7 @@ pub mod poh_service; pub mod recvmmsg; pub mod repair_service; pub mod replay_stage; -pub mod result; +mod result; pub mod retransmit_stage; pub mod rpc; pub mod rpc_pubsub; diff --git a/core/src/packet.rs b/core/src/packet.rs index 43f723e294..4fee75f5bd 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,8 +1,5 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use crate::{ - recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, - result::{Error, Result}, -}; +use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; pub use solana_perf::packet::{ limited_deserialize, to_packets, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, PACKETS_BATCH_SIZE, PACKETS_PER_BATCH, @@ -10,7 +7,7 @@ pub use solana_perf::packet::{ use solana_metrics::inc_new_counter_debug; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use std::{net::UdpSocket, time::Instant}; +use std::{io::Result, net::UdpSocket, time::Instant}; pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result { let mut i = 0; @@ -34,7 +31,7 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result { } Err(e) => { trace!("recv_from err {:?}", e); - return Err(Error::IO(e)); + return Err(e); } Ok((size, npkts)) => { if i == 0 { diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 423383a17e..0d2cebaf0f 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -10,7 +10,6 @@ //! For Entries: //! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height //! -use crate::result::{Error, Result}; use solana_ledger::blocktree::Blocktree; use solana_ledger::entry::Entry; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; @@ -27,17 +26,28 @@ use std::cmp; use std::sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender}; use std::sync::{Arc, Mutex}; use std::time::Instant; +use thiserror::Error; const GRACE_TICKS_FACTOR: u64 = 2; const MAX_GRACE_SLOTS: u64 = 3; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Error, Debug, Clone)] pub enum PohRecorderError { + #[error("invalid calling object")] InvalidCallingObject, + + #[error("max height reached")] MaxHeightReached, + + #[error("min height not reached")] MinHeightNotReached, + + #[error("send WorkingBankEntry error")] + SendError(#[from] SendError), } +type Result = std::result::Result; + pub type WorkingBankEntry = (Arc, (Entry, u64)); #[derive(Clone)] @@ -248,16 +258,12 @@ impl PohRecorder { let working_bank = self .working_bank .as_ref() - .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; + .ok_or(PohRecorderError::MaxHeightReached)?; if self.tick_height < working_bank.min_tick_height { - return Err(Error::PohRecorderError( - PohRecorderError::MinHeightNotReached, - )); + return Err(PohRecorderError::MinHeightNotReached); } if tick && self.tick_height == working_bank.min_tick_height { - return Err(Error::PohRecorderError( - PohRecorderError::MinHeightNotReached, - )); + return Err(PohRecorderError::MinHeightNotReached); } let entry_count = self @@ -358,9 +364,9 @@ impl PohRecorder { let working_bank = self .working_bank .as_ref() - .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; + .ok_or(PohRecorderError::MaxHeightReached)?; if bank_slot != working_bank.bank.slot() { - return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)); + return Err(PohRecorderError::MaxHeightReached); } { @@ -757,7 +763,7 @@ mod tests { let h1 = hash(b"hello world!"); assert_matches!( poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + Err(PohRecorderError::MaxHeightReached) ); } Blocktree::destroy(&ledger_path).unwrap(); diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 10136ef805..554ac05346 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -122,7 +122,6 @@ mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use crate::poh_recorder::WorkingBank; - use crate::result::Result; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_perf::test_tx::test_tx; @@ -164,7 +163,7 @@ mod tests { max_tick_height: std::u64::MAX, }; - let entry_producer: JoinHandle> = { + let entry_producer = { let poh_recorder = poh_recorder.clone(); let exit = exit.clone(); @@ -181,7 +180,7 @@ mod tests { .record(bank.slot(), h1, vec![tx]); if exit.load(Ordering::Relaxed) { - break Ok(()); + break; } } }) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 469c5a7178..31091ce15a 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -6,10 +6,9 @@ //! if perf-libs are available use crate::packet::Packets; -use crate::result::{Error, Result}; use crate::sigverify; -use crate::streamer::{self, PacketReceiver}; -use crossbeam_channel::Sender as CrossbeamSender; +use crate::streamer::{self, PacketReceiver, StreamerError}; +use crossbeam_channel::{SendError, Sender as CrossbeamSender}; use solana_measure::measure::Measure; use solana_metrics::datapoint_debug; use solana_perf::perf_libs; @@ -17,10 +16,22 @@ use solana_sdk::timing; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; +use thiserror::Error; const RECV_BATCH_MAX_CPU: usize = 1_000; const RECV_BATCH_MAX_GPU: usize = 5_000; +#[derive(Error, Debug)] +pub enum SigVerifyServiceError { + #[error("send packets batch error")] + SendError(#[from] SendError>), + + #[error("streamer error")] + StreamerError(#[from] StreamerError), +} + +type Result = std::result::Result; + pub struct SigVerifyStage { thread_hdls: Vec>, } @@ -78,9 +89,7 @@ impl SigVerifyStage { let verified_batch = verifier.verify_batch(batch); for v in verified_batch { - if sendr.send(vec![v]).is_err() { - return Err(Error::SendError); - } + sendr.send(vec![v])?; } verify_batch_time.stop(); @@ -118,9 +127,13 @@ impl SigVerifyStage { .spawn(move || loop { if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => { + SigVerifyServiceError::StreamerError(StreamerError::RecvTimeoutError( + RecvTimeoutError::Disconnected, + )) => break, + SigVerifyServiceError::StreamerError(StreamerError::RecvTimeoutError( + RecvTimeoutError::Timeout, + )) => (), + SigVerifyServiceError::SendError(_) => { break; } _ => error!("{:?}", e), diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index c0f6312249..2be2932d46 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,14 +1,14 @@ -use crate::result::{Error, Result}; use bincode::serialize_into; use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver}; -use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; +use solana_ledger::snapshot_utils::{self, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use solana_measure::measure::Measure; use solana_metrics::datapoint_info; use solana_runtime::status_cache::SlotDelta; use solana_sdk::transaction::Result as TransactionResult; use std::fs; use std::fs::File; -use std::io::{BufWriter, Error as IOError, ErrorKind}; +use std::io::BufWriter; +use std::process::ExitStatus; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; @@ -16,6 +16,30 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use symlink; use tempfile::TempDir; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SnapshotServiceError { + #[error("I/O error")] + IO(#[from] std::io::Error), + + #[error("serialization error")] + Serialize(#[from] Box), + + #[error("receive timeout error")] + RecvTimeoutError(#[from] RecvTimeoutError), + + #[error("snapshot error")] + SnapshotError(#[from] SnapshotError), + + #[error("archive generation failure {0}")] + ArchiveGenerationFailure(ExitStatus), + + #[error("storage path symlink is invalid")] + StoragePathSymlinkInvalid, +} + +type Result = std::result::Result; pub struct SnapshotPackagerService { t_snapshot_packager: JoinHandle<()>, @@ -32,8 +56,10 @@ impl SnapshotPackagerService { } if let Err(e) = Self::run(&snapshot_package_receiver) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Disconnected) => { + break + } + SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => info!("Error from package_snapshots: {:?}", e), } } @@ -91,9 +117,7 @@ impl SnapshotPackagerService { fs::canonicalize(storage_path).expect("Could not get absolute path for accounts"); symlink::symlink_dir(storage_path, &output_path)?; if !output_path.is_file() { - return Err(Self::get_io_error( - "Error trying to generate snapshot archive: storage path symlink is invalid", - )); + return Err(SnapshotServiceError::StoragePathSymlinkInvalid); } } @@ -115,10 +139,9 @@ impl SnapshotPackagerService { info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?")); info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?")); - return Err(Self::get_io_error(&format!( - "Error trying to generate snapshot archive: {}", - output.status - ))); + return Err(SnapshotServiceError::ArchiveGenerationFailure( + output.status, + )); } // Once everything is successful, overwrite the previous tarball so that other validators @@ -152,11 +175,6 @@ impl SnapshotPackagerService { Ok(()) } - fn get_io_error(error: &str) -> Error { - warn!("Snapshot Packaging Error: {:?}", error); - Error::IO(IOError::new(ErrorKind::Other, error)) - } - fn serialize_status_cache( slot_deltas: &[SlotDelta>], snapshot_links: &TempDir, @@ -172,8 +190,7 @@ impl SnapshotPackagerService { let mut status_cache_serialize = Measure::start("status_cache_serialize-ms"); // write the status cache - serialize_into(&mut status_cache_stream, slot_deltas) - .map_err(|_| Self::get_io_error("serialize status cache error"))?; + serialize_into(&mut status_cache_stream, slot_deltas)?; status_cache_serialize.stop(); inc_new_counter_info!( "serialize-status-cache-ms", diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 8224417604..be67fcdfc8 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -3,19 +3,33 @@ use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}; use crate::recvmmsg::NUM_RCVMMSGS; -use crate::result::{Error, Result}; use crate::thread_mem_usage; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; use std::sync::Arc; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; +use thiserror::Error; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; +#[derive(Error, Debug)] +pub enum StreamerError { + #[error("I/O error")] + IO(#[from] std::io::Error), + + #[error("receive timeout error")] + RecvTimeoutError(#[from] RecvTimeoutError), + + #[error("send packets error")] + SendError(#[from] SendError), +} + +pub type Result = std::result::Result; + fn recv_loop( sock: &UdpSocket, exit: Arc, @@ -117,8 +131,8 @@ pub fn responder(name: &'static str, sock: Arc, r: PacketReceiver) -> thread_mem_usage::datapoint(name); if let Err(e) = recv_send(&sock, &r) { match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => warn!("{} responder error: {:?}", name, e), } } diff --git a/core/src/transaction_status_service.rs b/core/src/transaction_status_service.rs index ba6c06a52e..bcc795b7f2 100644 --- a/core/src/transaction_status_service.rs +++ b/core/src/transaction_status_service.rs @@ -1,4 +1,3 @@ -use crate::result::{Error, Result}; use crossbeam_channel::{Receiver, RecvTimeoutError}; use solana_client::rpc_request::RpcTransactionStatus; use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch}; @@ -30,15 +29,11 @@ impl TransactionStatusService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = Self::write_transaction_status_batch( + if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch( &write_transaction_status_receiver, &blocktree, ) { - match e { - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from write_transaction_status_batch: {:?}", e), - } + break; } }) .unwrap(); @@ -48,7 +43,7 @@ impl TransactionStatusService { fn write_transaction_status_batch( write_transaction_status_receiver: &Receiver, blocktree: &Arc, - ) -> Result<()> { + ) -> Result<(), RecvTimeoutError> { let TransactionStatusBatch { bank, transactions, diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index e2193f939b..0a4911bf10 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -6,7 +6,6 @@ use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::gossip_service::GossipService; use solana_core::packet::Packet; -use solana_core::result; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; @@ -66,7 +65,7 @@ where } /// ring a -> b -> c -> d -> e -> a #[test] -fn gossip_ring() -> result::Result<()> { +fn gossip_ring() { solana_logger::setup(); run_gossip_topo(50, |listen| { let num = listen.len(); @@ -80,14 +79,12 @@ fn gossip_ring() -> result::Result<()> { xv.insert_info(d); } }); - - Ok(()) } /// ring a -> b -> c -> d -> e -> a #[test] #[ignore] -fn gossip_ring_large() -> result::Result<()> { +fn gossip_ring_large() { solana_logger::setup(); run_gossip_topo(600, |listen| { let num = listen.len(); @@ -101,8 +98,6 @@ fn gossip_ring_large() -> result::Result<()> { xv.insert_info(d); } }); - - Ok(()) } /// star a -> (b,c,d,e) #[test] @@ -144,7 +139,7 @@ fn gossip_rstar() { } #[test] -pub fn cluster_info_retransmit() -> result::Result<()> { +pub fn cluster_info_retransmit() { solana_logger::setup(); let exit = Arc::new(AtomicBool::new(false)); trace!("c1:"); @@ -177,7 +172,7 @@ pub fn cluster_info_retransmit() -> result::Result<()> { p.meta.size = 10; let peers = c1.read().unwrap().retransmit_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false)?; + ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap(); let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| { @@ -194,6 +189,4 @@ pub fn cluster_info_retransmit() -> result::Result<()> { dr1.join().unwrap(); dr2.join().unwrap(); dr3.join().unwrap(); - - Ok(()) }