Pacify clippy

This commit is contained in:
Greg Fitzgerald 2018-09-27 14:49:50 -06:00
parent f9fe6a0f72
commit 423e7ebc3f
7 changed files with 34 additions and 38 deletions

View File

@ -97,8 +97,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let len = x.read().unwrap().packets.len(); let len = x.read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}).collect(); }).collect();
let (_stage, signal_receiver) = let (_stage, signal_receiver) = BankingStage::new(&bank, verified_receiver, Default::default());
BankingStage::new(bank.clone(), verified_receiver, Default::default());
bencher.iter(move || { bencher.iter(move || {
for v in verified.chunks(verified.len() / NUM_THREADS) { for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();

View File

@ -88,6 +88,7 @@ pub enum BankError {
} }
pub type Result<T> = result::Result<T, BankError>; pub type Result<T> = result::Result<T, BankError>;
type SignatureStatusMap = HashMap<Signature, Result<()>>;
#[derive(Default)] #[derive(Default)]
struct ErrorCounters { struct ErrorCounters {
@ -95,6 +96,7 @@ struct ErrorCounters {
account_not_found_leader: usize, account_not_found_leader: usize,
account_not_found_vote: usize, account_not_found_vote: usize,
} }
/// The state of all accounts and contracts after processing its entries. /// The state of all accounts and contracts after processing its entries.
pub struct Bank { pub struct Bank {
/// A map of account public keys to the balance in that account. /// A map of account public keys to the balance in that account.
@ -107,7 +109,7 @@ pub struct Bank {
/// Mapping of hashes to signature sets along with timestamp. The bank uses this data to /// Mapping of hashes to signature sets along with timestamp. The bank uses this data to
/// reject transactions with signatures its seen before /// reject transactions with signatures its seen before
last_ids_sigs: RwLock<HashMap<Hash, (HashMap<Signature, Result<()>>, u64)>>, last_ids_sigs: RwLock<HashMap<Hash, (SignatureStatusMap, u64)>>,
/// The number of transactions the bank has processed without error since the /// The number of transactions the bank has processed without error since the
/// start of the ledger. /// start of the ledger.
@ -184,10 +186,7 @@ impl Bank {
} }
/// Store the given signature. The bank will reject any transaction with the same signature. /// Store the given signature. The bank will reject any transaction with the same signature.
fn reserve_signature( fn reserve_signature(signatures: &mut SignatureStatusMap, signature: &Signature) -> Result<()> {
signatures: &mut HashMap<Signature, Result<()>>,
signature: &Signature,
) -> Result<()> {
if let Some(_result) = signatures.get(signature) { if let Some(_result) = signatures.get(signature) {
return Err(BankError::DuplicateSignature); return Err(BankError::DuplicateSignature);
} }
@ -215,7 +214,7 @@ impl Bank {
} }
fn update_signature_status( fn update_signature_status(
signatures: &mut HashMap<Signature, Result<()>>, signatures: &mut SignatureStatusMap,
signature: &Signature, signature: &Signature,
result: &Result<()>, result: &Result<()>,
) { ) {

View File

@ -8,11 +8,12 @@ use budget_transaction::BudgetTransaction;
use counter::Counter; use counter::Counter;
use entry::Entry; use entry::Entry;
use log::Level; use log::Level;
use packet::{Packets, SharedPackets}; use packet::Packets;
use poh_recorder::PohRecorder; use poh_recorder::PohRecorder;
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use sigverify_stage::VerifiedPackets;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
@ -50,8 +51,8 @@ impl Default for Config {
impl BankingStage { impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped. /// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
pub fn new( pub fn new(
bank: Arc<Bank>, bank: &Arc<Bank>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<VerifiedPackets>,
config: Config, config: Config,
) -> (Self, Receiver<Vec<Entry>>) { ) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
@ -69,7 +70,7 @@ impl BankingStage {
let tick_producer = Builder::new() let tick_producer = Builder::new()
.name("solana-banking-stage-tick_producer".to_string()) .name("solana-banking-stage-tick_producer".to_string())
.spawn(move || { .spawn(move || {
if let Err(e) = Self::tick_producer(tick_poh, config, &poh_exit) { if let Err(e) = Self::tick_producer(&tick_poh, &config, &poh_exit) {
match e { match e {
Error::SendError => (), Error::SendError => (),
_ => error!( _ => error!(
@ -134,9 +135,9 @@ impl BankingStage {
}).collect() }).collect()
} }
fn tick_producer(poh: PohRecorder, config: Config, poh_exit: &AtomicBool) -> Result<()> { fn tick_producer(poh: &PohRecorder, config: &Config, poh_exit: &AtomicBool) -> Result<()> {
loop { loop {
match config { match *config {
Config::Tick(num) => { Config::Tick(num) => {
for _ in 0..num { for _ in 0..num {
poh.hash(); poh.hash();
@ -156,7 +157,7 @@ impl BankingStage {
fn process_transactions( fn process_transactions(
bank: &Arc<Bank>, bank: &Arc<Bank>,
transactions: Vec<Transaction>, transactions: &[Transaction],
poh: &PohRecorder, poh: &PohRecorder,
) -> Result<()> { ) -> Result<()> {
debug!("transactions: {}", transactions.len()); debug!("transactions: {}", transactions.len());
@ -192,7 +193,7 @@ impl BankingStage {
/// Discard packets via `packet_recycler`. /// Discard packets via `packet_recycler`.
pub fn process_packets( pub fn process_packets(
bank: &Arc<Bank>, bank: &Arc<Bank>,
verified_receiver: &Arc<Mutex<Receiver<Vec<(SharedPackets, Vec<u8>)>>>>, verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &PohRecorder, poh: &PohRecorder,
) -> Result<()> { ) -> Result<()> {
let recv_start = Instant::now(); let recv_start = Instant::now();
@ -230,7 +231,7 @@ impl BankingStage {
}, },
}).collect(); }).collect();
debug!("verified transactions {}", transactions.len()); debug!("verified transactions {}", transactions.len());
Self::process_transactions(bank, transactions, poh)?; Self::process_transactions(bank, &transactions, poh)?;
} }
inc_new_counter_info!( inc_new_counter_info!(
@ -284,7 +285,7 @@ mod tests {
let bank = Bank::new(&Mint::new(2)); let bank = Bank::new(&Mint::new(2));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (banking_stage, _entry_receiver) = let (banking_stage, _entry_receiver) =
BankingStage::new(Arc::new(bank), verified_receiver, Default::default()); BankingStage::new(&Arc::new(bank), verified_receiver, Default::default());
drop(verified_sender); drop(verified_sender);
assert_eq!(banking_stage.join().unwrap(), ()); assert_eq!(banking_stage.join().unwrap(), ());
} }
@ -294,7 +295,7 @@ mod tests {
let bank = Bank::new(&Mint::new(2)); let bank = Bank::new(&Mint::new(2));
let (_verified_sender, verified_receiver) = channel(); let (_verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = let (banking_stage, entry_receiver) =
BankingStage::new(Arc::new(bank), verified_receiver, Default::default()); BankingStage::new(&Arc::new(bank), verified_receiver, Default::default());
drop(entry_receiver); drop(entry_receiver);
assert_eq!(banking_stage.join().unwrap(), ()); assert_eq!(banking_stage.join().unwrap(), ());
} }
@ -305,7 +306,7 @@ mod tests {
let start_hash = bank.last_id(); let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new( let (banking_stage, entry_receiver) = BankingStage::new(
bank.clone(), &bank,
verified_receiver, verified_receiver,
Config::Sleep(Duration::from_millis(1)), Config::Sleep(Duration::from_millis(1)),
); );
@ -326,7 +327,7 @@ mod tests {
let start_hash = bank.last_id(); let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = let (banking_stage, entry_receiver) =
BankingStage::new(bank, verified_receiver, Default::default()); BankingStage::new(&bank, verified_receiver, Default::default());
// good tx // good tx
let keypair = mint.keypair(); let keypair = mint.keypair();
@ -372,7 +373,7 @@ mod tests {
let bank = Arc::new(Bank::new(&mint)); let bank = Arc::new(Bank::new(&mint));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, Default::default()); BankingStage::new(&bank, verified_receiver, Default::default());
// Process a batch that includes a transaction that receives two tokens. // Process a batch that includes a transaction that receives two tokens.
let alice = Keypair::new(); let alice = Keypair::new();

View File

@ -31,19 +31,15 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(());
} }
let result = msgs.write().unwrap().recv_from(sock); if msgs.write().unwrap().recv_from(sock).is_ok() {
match result { let len = msgs.read().unwrap().packets.len();
Ok(()) => { metrics::submit(
let len = msgs.read().unwrap().packets.len(); influxdb::Point::new(channel_tag)
metrics::submit( .add_field("count", influxdb::Value::Integer(len as i64))
influxdb::Point::new(channel_tag) .to_owned(),
.add_field("count", influxdb::Value::Integer(len as i64)) );
.to_owned(), channel.send(msgs)?;
); break;
channel.send(msgs)?;
break;
}
Err(_) => (),
} }
} }
} }

View File

@ -71,7 +71,7 @@ impl Tpu {
SigVerifyStage::new(packet_receiver, sigverify_disabled); SigVerifyStage::new(packet_receiver, sigverify_disabled);
let (banking_stage, entry_receiver) = let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, tick_duration); BankingStage::new(&bank, verified_receiver, tick_duration);
let (write_stage, entry_forwarder) = WriteStage::new( let (write_stage, entry_forwarder) = WriteStage::new(
keypair, keypair,

View File

@ -63,6 +63,7 @@ pub trait WindowUtil {
fn print(&self, id: &Pubkey, consumed: u64) -> String; fn print(&self, id: &Pubkey, consumed: u64) -> String;
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn process_blob( fn process_blob(
&mut self, &mut self,
id: &Pubkey, id: &Pubkey,

View File

@ -145,7 +145,7 @@ fn recv_window(
retransmit: &BlobSender, retransmit: &BlobSender,
pending_retransmits: &mut bool, pending_retransmits: &mut bool,
leader_rotation_interval: u64, leader_rotation_interval: u64,
done: Arc<AtomicBool>, done: &Arc<AtomicBool>,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
@ -292,7 +292,7 @@ pub fn window_service(
&retransmit, &retransmit,
&mut pending_retransmits, &mut pending_retransmits,
leader_rotation_interval, leader_rotation_interval,
done.clone(), &done,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,