From 4aedd3f1b620e7b7a2bf2f6db9acc0cc8caa9abe Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 27 Jun 2018 12:33:56 -0600 Subject: [PATCH] Cleanup type aliases and imports --- src/banking_stage.rs | 9 ++++----- src/blob_fetch_stage.rs | 14 +++++--------- src/entry_writer.rs | 8 ++++---- src/fetch_stage.rs | 14 +++++--------- src/ncp.rs | 14 +++++++------- src/packet.rs | 11 ++++------- src/recorder.rs | 4 ++-- src/replicate_stage.rs | 12 ++++++------ src/request_stage.rs | 21 ++++++++++----------- src/rpu.rs | 6 +++--- src/server.rs | 4 ++-- src/sigverify_stage.rs | 8 ++++---- src/streamer.rs | 14 ++++++++------ src/tvu.rs | 12 ++++++------ src/window_stage.rs | 12 ++++++------ src/write_stage.rs | 8 ++++---- 16 files changed, 80 insertions(+), 91 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 9b40bffee..fa1e98990 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -5,8 +5,7 @@ use bank::Bank; use bincode::deserialize; use counter::Counter; -use packet; -use packet::SharedPackets; +use packet::{PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; use result::Result; @@ -38,7 +37,7 @@ impl BankingStage { bank: Arc, exit: Arc, verified_receiver: Receiver)>>, - packet_recycler: packet::PacketRecycler, + packet_recycler: PacketRecycler, ) -> Self { let (signal_sender, signal_receiver) = channel(); let thread_hdl = Builder::new() @@ -65,7 +64,7 @@ impl BankingStage { /// Convert the transactions from a blob of binary data to a vector of transactions and /// an unused `SocketAddr` that could be used to send a response. - fn deserialize_transactions(p: &packet::Packets) -> Vec> { + fn deserialize_transactions(p: &Packets) -> Vec> { p.packets .par_iter() .map(|x| { @@ -82,7 +81,7 @@ impl BankingStage { bank: Arc, verified_receiver: &Receiver)>>, signal_sender: &Sender, - packet_recycler: &packet::PacketRecycler, + packet_recycler: &PacketRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let recv_start = Instant::now(); diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index 82a88bd3d..f67839b98 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -1,30 +1,26 @@ //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. -use packet; +use packet::BlobRecycler; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::JoinHandle; -use streamer; +use streamer::{self, BlobReceiver}; pub struct BlobFetchStage { - pub blob_receiver: streamer::BlobReceiver, + pub blob_receiver: BlobReceiver, pub thread_hdls: Vec>, } impl BlobFetchStage { - pub fn new( - socket: UdpSocket, - exit: Arc, - blob_recycler: packet::BlobRecycler, - ) -> Self { + pub fn new(socket: UdpSocket, exit: Arc, blob_recycler: BlobRecycler) -> Self { Self::new_multi_socket(vec![socket], exit, blob_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - blob_recycler: packet::BlobRecycler, + blob_recycler: BlobRecycler, ) -> Self { let (blob_sender, blob_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 1693fa6d2..ad52aa087 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -5,7 +5,7 @@ use bank::Bank; use entry::Entry; use ledger::Block; -use packet; +use packet::BlobRecycler; use result::Result; use serde_json; use std::collections::VecDeque; @@ -14,7 +14,7 @@ use std::io::Write; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; use std::time::Duration; -use streamer; +use streamer::BlobSender; pub struct EntryWriter<'a> { bank: &'a Bank, @@ -57,8 +57,8 @@ impl<'a> EntryWriter<'a> { /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( &self, - broadcast: &streamer::BlobSender, - blob_recycler: &packet::BlobRecycler, + broadcast: &BlobSender, + blob_recycler: &BlobRecycler, writer: &Mutex, entry_receiver: &Receiver, ) -> Result<()> { diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index 4eb5f8102..c73962eae 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -1,30 +1,26 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. -use packet; +use packet::PacketRecycler; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::JoinHandle; -use streamer; +use streamer::{self, PacketReceiver}; pub struct FetchStage { - pub packet_receiver: streamer::PacketReceiver, + pub packet_receiver: PacketReceiver, pub thread_hdls: Vec>, } impl FetchStage { - pub fn new( - socket: UdpSocket, - exit: Arc, - packet_recycler: packet::PacketRecycler, - ) -> Self { + pub fn new(socket: UdpSocket, exit: Arc, packet_recycler: PacketRecycler) -> Self { Self::new_multi_socket(vec![socket], exit, packet_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - packet_recycler: packet::PacketRecycler, + packet_recycler: PacketRecycler, ) -> Self { let (packet_sender, packet_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/ncp.rs b/src/ncp.rs index b4f8b759f..883f1e17a 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -1,7 +1,7 @@ //! The `ncp` module implements the network control plane. -use crdt; -use packet; +use crdt::Crdt; +use packet::{BlobRecycler, SharedBlob}; use result::Result; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; @@ -16,13 +16,13 @@ pub struct Ncp { impl Ncp { pub fn new( - crdt: Arc>, - window: Arc>>>, + crdt: Arc>, + window: Arc>>>, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, exit: Arc, ) -> Result { - let blob_recycler = packet::BlobRecycler::default(); + let blob_recycler = BlobRecycler::default(); let (request_sender, request_receiver) = channel(); trace!( "Ncp: id: {:?}, listening on: {:?}", @@ -42,7 +42,7 @@ impl Ncp { blob_recycler.clone(), response_receiver, ); - let t_listen = crdt::Crdt::listen( + let t_listen = Crdt::listen( crdt.clone(), window, blob_recycler.clone(), @@ -50,7 +50,7 @@ impl Ncp { response_sender.clone(), exit.clone(), ); - let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); + let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Ok(Ncp { thread_hdls }) } diff --git a/src/packet.rs b/src/packet.rs index 7f117bd33..99382caee 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -16,6 +16,7 @@ use std::time::Instant; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; +pub type SharedBlobs = VecDeque; pub type PacketRecycler = Recycler; pub type BlobRecycler = Recycler; @@ -274,7 +275,7 @@ pub fn to_blob( pub fn to_blobs( rsps: Vec<(T, SocketAddr)>, blob_recycler: &BlobRecycler, -) -> Result> { +) -> Result { let mut blobs = VecDeque::new(); for (resp, rsp_addr) in rsps { blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?); @@ -367,7 +368,7 @@ impl Blob { self.meta.size = new_size; self.set_data_size(new_size as u64).unwrap(); } - pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { + pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result { let mut v = VecDeque::new(); //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -405,11 +406,7 @@ impl Blob { } Ok(v) } - pub fn send_to( - re: &BlobRecycler, - socket: &UdpSocket, - v: &mut VecDeque, - ) -> Result<()> { + pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: &mut SharedBlobs) -> Result<()> { while let Some(r) = v.pop_front() { { let p = r.read().expect("'r' read lock in pub fn send_to"); diff --git a/src/recorder.rs b/src/recorder.rs index 68293a3d7..8e5f46560 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -3,7 +3,7 @@ use entry::Entry; use hash::{hash, Hash}; -use ledger::next_entries_mut; +use ledger; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -28,7 +28,7 @@ impl Recorder { } pub fn record(&mut self, transactions: Vec) -> Vec { - next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions) + ledger::next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions) } pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option { diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index f0564fa41..d23d57be2 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,13 +2,13 @@ use bank::Bank; use ledger; -use packet; +use packet::BlobRecycler; use result::Result; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::{Builder, JoinHandle}; use std::time::Duration; -use streamer; +use streamer::BlobReceiver; pub struct ReplicateStage { pub thread_hdl: JoinHandle<()>, @@ -18,8 +18,8 @@ impl ReplicateStage { /// Process entry blobs, already in order fn replicate_requests( bank: &Arc, - blob_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, + blob_receiver: &BlobReceiver, + blob_recycler: &BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = blob_receiver.recv_timeout(timer)?; @@ -36,8 +36,8 @@ impl ReplicateStage { pub fn new( bank: Arc, exit: Arc, - window_receiver: streamer::BlobReceiver, - blob_recycler: packet::BlobRecycler, + window_receiver: BlobReceiver, + blob_recycler: BlobRecycler, ) -> Self { let thread_hdl = Builder::new() .name("solana-replicate-stage".to_string()) diff --git a/src/request_stage.rs b/src/request_stage.rs index 520a095d6..878cc97fc 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,8 +1,7 @@ //! The `request_stage` processes thin client Request messages. use bincode::deserialize; -use packet; -use packet::SharedPackets; +use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; @@ -13,17 +12,17 @@ use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; use std::thread::{Builder, JoinHandle}; use std::time::Instant; -use streamer; +use streamer::{self, BlobReceiver, BlobSender}; use timing; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, - pub blob_receiver: streamer::BlobReceiver, + pub blob_receiver: BlobReceiver, pub request_processor: Arc, } impl RequestStage { - pub fn deserialize_requests(p: &packet::Packets) -> Vec> { + pub fn deserialize_requests(p: &Packets) -> Vec> { p.packets .par_iter() .map(|x| { @@ -37,9 +36,9 @@ impl RequestStage { pub fn process_request_packets( request_processor: &RequestProcessor, packet_receiver: &Receiver, - blob_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, + blob_sender: &BlobSender, + packet_recycler: &PacketRecycler, + blob_recycler: &BlobRecycler, ) -> Result<()> { let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; @@ -60,7 +59,7 @@ impl RequestStage { let rsps = request_processor.process_requests(reqs); - let blobs = packet::to_blobs(rsps, blob_recycler)?; + let blobs = to_blobs(rsps, blob_recycler)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing @@ -84,8 +83,8 @@ impl RequestStage { request_processor: RequestProcessor, exit: Arc, packet_receiver: Receiver, - packet_recycler: packet::PacketRecycler, - blob_recycler: packet::BlobRecycler, + packet_recycler: PacketRecycler, + blob_recycler: BlobRecycler, ) -> Self { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); diff --git a/src/rpu.rs b/src/rpu.rs index a42544de6..507b91fea 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -24,7 +24,7 @@ //! ``` use bank::Bank; -use packet; +use packet::{BlobRecycler, PacketRecycler}; use request_processor::RequestProcessor; use request_stage::RequestStage; use std::net::UdpSocket; @@ -45,7 +45,7 @@ impl Rpu { respond_socket: UdpSocket, exit: Arc, ) -> Self { - let packet_recycler = packet::PacketRecycler::default(); + let packet_recycler = PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( requests_socket, @@ -54,7 +54,7 @@ impl Rpu { packet_sender, ); - let blob_recycler = packet::BlobRecycler::default(); + let blob_recycler = BlobRecycler::default(); let request_processor = RequestProcessor::new(bank.clone()); let request_stage = RequestStage::new( request_processor, diff --git a/src/server.rs b/src/server.rs index 65752ef1b..ff9cad087 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,7 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; use ncp::Ncp; -use packet; +use packet::BlobRecycler; use rpu::Rpu; use std::io::Write; use std::net::UdpSocket; @@ -61,7 +61,7 @@ impl Server { let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); thread_hdls.extend(rpu.thread_hdls); - let blob_recycler = packet::BlobRecycler::default(); + let blob_recycler = BlobRecycler::default(); let tpu = Tpu::new( bank.clone(), tick_duration, diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index 7876e0c5b..563b9dd14 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -14,7 +14,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Instant; -use streamer; +use streamer::{self, PacketReceiver}; use timing; pub struct SigVerifyStage { @@ -38,7 +38,7 @@ impl SigVerifyStage { } fn verifier( - recvr: &Arc>, + recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { let (batch, len) = @@ -76,7 +76,7 @@ impl SigVerifyStage { fn verifier_service( exit: Arc, - packet_receiver: Arc>, + packet_receiver: Arc>, verified_sender: Arc)>>>>, ) -> JoinHandle<()> { spawn(move || loop { @@ -89,7 +89,7 @@ impl SigVerifyStage { fn verifier_services( exit: Arc, - packet_receiver: streamer::PacketReceiver, + packet_receiver: PacketReceiver, verified_sender: Sender)>>, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); diff --git a/src/streamer.rs b/src/streamer.rs index e3ac1e542..039647bff 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -3,22 +3,24 @@ use crdt::Crdt; #[cfg(feature = "erasure")] use erasure; -use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; +use packet::{ + Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, +}; use result::{Error, Result}; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; pub const WINDOW_SIZE: usize = 2 * 1024; -pub type PacketReceiver = mpsc::Receiver; -pub type PacketSender = mpsc::Sender; -pub type BlobSender = mpsc::Sender>; -pub type BlobReceiver = mpsc::Receiver>; +pub type PacketReceiver = Receiver; +pub type PacketSender = Sender; +pub type BlobSender = Sender; +pub type BlobReceiver = Receiver; pub type Window = Arc>>>; fn recv_loop( diff --git a/src/tvu.rs b/src/tvu.rs index cd9822d04..6671169bb 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -37,13 +37,13 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use crdt::Crdt; -use packet; +use packet::BlobRecycler; use replicate_stage::ReplicateStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; -use streamer; +use streamer::Window; use window_stage::WindowStage; pub struct Tvu { @@ -64,13 +64,13 @@ impl Tvu { pub fn new( bank: Arc, crdt: Arc>, - window: streamer::Window, + window: Window, replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, exit: Arc, ) -> Self { - let blob_recycler = packet::BlobRecycler::default(); + let blob_recycler = BlobRecycler::default(); let fetch_stage = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], exit.clone(), @@ -120,7 +120,7 @@ pub mod tests { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer; + use streamer::{self, Window}; use transaction::Transaction; use tvu::Tvu; @@ -128,7 +128,7 @@ pub mod tests { crdt: Arc>, listen: UdpSocket, exit: Arc, - ) -> Result<(Ncp, streamer::Window)> { + ) -> Result<(Ncp, Window)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?; diff --git a/src/window_stage.rs b/src/window_stage.rs index 4cc7eceb6..3142279ee 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -1,27 +1,27 @@ //! The `window_stage` maintains the blob window use crdt::Crdt; -use packet; +use packet::BlobRecycler; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; -use streamer; +use streamer::{self, BlobReceiver, Window}; pub struct WindowStage { - pub blob_receiver: streamer::BlobReceiver, + pub blob_receiver: BlobReceiver, pub thread_hdls: Vec>, } impl WindowStage { pub fn new( crdt: Arc>, - window: streamer::Window, + window: Window, retransmit_socket: UdpSocket, exit: Arc, - blob_recycler: packet::BlobRecycler, - fetch_stage_receiver: streamer::BlobReceiver, + blob_recycler: BlobRecycler, + fetch_stage_receiver: BlobReceiver, entry_count: usize, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); diff --git a/src/write_stage.rs b/src/write_stage.rs index 490cd8a29..125f5b606 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -5,17 +5,17 @@ use bank::Bank; use entry::Entry; use entry_writer::EntryWriter; -use packet; +use packet::BlobRecycler; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex}; use std::thread::{Builder, JoinHandle}; -use streamer; +use streamer::BlobReceiver; pub struct WriteStage { pub thread_hdl: JoinHandle<()>, - pub blob_receiver: streamer::BlobReceiver, + pub blob_receiver: BlobReceiver, } impl WriteStage { @@ -23,7 +23,7 @@ impl WriteStage { pub fn new( bank: Arc, exit: Arc, - blob_recycler: packet::BlobRecycler, + blob_recycler: BlobRecycler, writer: Mutex, entry_receiver: Receiver, ) -> Self {