From 785e97169879b585d8b10b2e6b7db626f2d61862 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 8 May 2018 17:32:50 -0600 Subject: [PATCH 1/2] AccountantSkel -> Tpu The terms Stub and Skel come from OMG IDL and only made sense while the Stub was acting as an RPC client for the the Accountant object. Nowadays, the Stub interface looks nothing like the Accountant and meanwhile we've recognized the multithreaded implementation is more reminiscent of a pipelined CPU. Thus, we finally bite the bullet and rename our modules. AccountantSkel -> Tpu AccountantStub -> ThinClient Up next will be moving much of the TPU code into separate modules, each representing a stage of the pipeline. The interface of each will follow the precedent set by the Historian object. --- src/bin/client-demo.rs | 6 +- src/bin/testnode.rs | 8 +- src/ecdsa.rs | 2 +- src/lib.rs | 6 +- src/{accountant_stub.rs => thin_client.rs} | 53 +++++----- src/{accountant_skel.rs => tpu.rs} | 107 ++++++++++----------- 6 files changed, 90 insertions(+), 92 deletions(-) rename src/{accountant_stub.rs => thin_client.rs} (88%) rename src/{accountant_skel.rs => tpu.rs} (93%) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 826e2fac6..82cd42b2c 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,7 +10,7 @@ use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; -use solana::accountant_stub::AccountantStub; +use solana::thin_client::ThinClient; use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; @@ -87,7 +87,7 @@ fn main() { println!("Binding to {}", client_addr); let socket = UdpSocket::bind(&client_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut acc = AccountantStub::new(addr.parse().unwrap(), socket); + let mut acc = ThinClient::new(addr.parse().unwrap(), socket); println!("Get last ID..."); let last_id = acc.get_last_id().wait().unwrap(); @@ -129,7 +129,7 @@ fn main() { let mut client_addr: SocketAddr = client_addr.parse().unwrap(); client_addr.set_port(0); let socket = UdpSocket::bind(client_addr).unwrap(); - let acc = AccountantStub::new(addr.parse().unwrap(), socket); + let acc = ThinClient::new(addr.parse().unwrap(), socket); for tr in trs { acc.transfer_signed(tr.clone()).unwrap(); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 5aede0c3f..736acb017 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,7 +7,7 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; -use solana::accountant_skel::AccountantSkel; +use solana::tpu::Tpu; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; @@ -119,7 +119,7 @@ fn main() { let (input, event_receiver) = sync_channel(10_000); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(AccountantSkel::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(acc, input, historian)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); @@ -132,8 +132,8 @@ fn main() { serve_sock.local_addr().unwrap(), ); eprintln!("starting server..."); - let threads = AccountantSkel::serve( - &skel, + let threads = Tpu::serve( + &tpu, d, serve_sock, skinny_sock, diff --git a/src/ecdsa.rs b/src/ecdsa.rs index c0a06646d..c029f1e1f 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -130,7 +130,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(test)] mod tests { - use accountant_skel::Request; + use tpu::Request; use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; diff --git a/src/lib.rs b/src/lib.rs index b316a79c9..5c7568c54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,5 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod accountant; -pub mod accountant_skel; -pub mod accountant_stub; pub mod crdt; pub mod ecdsa; pub mod entry; @@ -19,8 +17,10 @@ pub mod recorder; pub mod result; pub mod signature; pub mod streamer; -pub mod transaction; +pub mod thin_client; pub mod timing; +pub mod transaction; +pub mod tpu; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/accountant_stub.rs b/src/thin_client.rs similarity index 88% rename from src/accountant_stub.rs rename to src/thin_client.rs index 274bbb0d3..920081a4f 100644 --- a/src/accountant_stub.rs +++ b/src/thin_client.rs @@ -1,9 +1,9 @@ -//! The `accountant_stub` module is a client-side object that interfaces with a server-side Accountant -//! object via the network interface exposed by AccountantSkel. Client code should use -//! this object instead of writing messages to the network directly. The binary -//! encoding of its messages are unstable and may change in future releases. +//! The `thin_client` module is a client-side object that interfaces with +//! a server-side TPU. Client code should use this object instead of writing +//! messages to the network directly. The binary encoding of its messages are +//! unstable and may change in future releases. -use accountant_skel::{Request, Response, Subscription}; +use tpu::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; @@ -13,7 +13,7 @@ use std::io; use std::net::{SocketAddr, UdpSocket}; use transaction::Transaction; -pub struct AccountantStub { +pub struct ThinClient { pub addr: SocketAddr, pub socket: UdpSocket, last_id: Option, @@ -21,20 +21,20 @@ pub struct AccountantStub { balances: HashMap>, } -impl AccountantStub { - /// Create a new AccountantStub that will interface with AccountantSkel +impl ThinClient { + /// Create a new ThinClient that will interface with Tpu /// over `socket`. To receive responses, the caller must bind `socket` - /// to a public address before invoking AccountantStub methods. + /// to a public address before invoking ThinClient methods. pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self { - let stub = AccountantStub { + let client = ThinClient { addr: addr, socket, last_id: None, num_events: 0, balances: HashMap::new(), }; - stub.init(); - stub + client.init(); + client } pub fn init(&self) { @@ -119,7 +119,7 @@ impl AccountantStub { } /// Return the number of transactions the server processed since creating - /// this stub instance. + /// this client instance. pub fn transaction_count(&mut self) -> u64 { // Wait for at least one EntryInfo. let mut done = false; @@ -148,7 +148,7 @@ impl AccountantStub { mod tests { use super::*; use accountant::Accountant; - use accountant_skel::AccountantSkel; + use tpu::Tpu; use crdt::{Crdt, ReplicatedData}; use futures::Future; use historian::Historian; @@ -165,7 +165,7 @@ mod tests { // TODO: Figure out why this test sometimes hangs on TravisCI. #[test] - fn test_accountant_stub() { + fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -185,14 +185,13 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, input, historian)); - let threads = - AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); + let acc = Arc::new(Tpu::new(acc, input, historian)); + let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut acc = AccountantStub::new(addr, socket); + let mut acc = ThinClient::new(addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); @@ -230,9 +229,9 @@ mod tests { } #[test] - fn test_multi_accountant_stub() { + fn test_multi_node() { logger::setup(); - info!("test_multi_accountant_stub"); + info!("test_multi_node"); let leader = test_node(); let replicant = test_node(); let alice = Mint::new(10_000); @@ -243,17 +242,17 @@ mod tests { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(AccountantSkel::new(acc, input, historian)) + Arc::new(Tpu::new(acc, input, historian)) }; let replicant_acc = { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); - Arc::new(AccountantSkel::new(acc, input, historian)) + Arc::new(Tpu::new(acc, input, historian)) }; - let leader_threads = AccountantSkel::serve( + let leader_threads = Tpu::serve( &leader_acc, leader.0.clone(), leader.2, @@ -262,7 +261,7 @@ mod tests { exit.clone(), sink(), ).unwrap(); - let replicant_threads = AccountantSkel::replicate( + let replicant_threads = Tpu::replicate( &replicant_acc, replicant.0.clone(), replicant.1, @@ -314,7 +313,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let mut acc = AccountantStub::new(leader.0.serve_addr, socket); + let mut acc = ThinClient::new(leader.0.serve_addr, socket); info!("getting leader last_id"); let last_id = acc.get_last_id().wait().unwrap(); info!("executing leader transer"); @@ -330,7 +329,7 @@ mod tests { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let mut acc = AccountantStub::new(replicant.0.serve_addr, socket); + let mut acc = ThinClient::new(replicant.0.serve_addr, socket); info!("getting replicant balance"); if let Ok(bal) = acc.get_balance(&bob_pubkey) { replicant_balance = bal; diff --git a/src/accountant_skel.rs b/src/tpu.rs similarity index 93% rename from src/accountant_skel.rs rename to src/tpu.rs index d30975ae8..32142e6ed 100644 --- a/src/accountant_skel.rs +++ b/src/tpu.rs @@ -1,6 +1,5 @@ -//! The `accountant_skel` module is a microservice that exposes the high-level -//! Accountant API to the network. Its message encoding is currently -//! in flux. Clients should use AccountantStub to interact with it. +//! The `tpu` module implements the Transaction Processing Unit, a +//! 5-stage transaction processing pipeline in software. use accountant::Accountant; use bincode::{deserialize, serialize, serialize_into}; @@ -33,7 +32,7 @@ use timing; use std::time::Instant; use rand::{thread_rng, Rng}; -pub struct AccountantSkel { +pub struct Tpu { acc: Mutex, historian_input: Mutex>, historian: Historian, @@ -70,7 +69,7 @@ impl Request { } } -type SharedSkel = Arc; +type SharedTpu = Arc; #[derive(Serialize, Deserialize, Debug)] pub enum Response { @@ -78,10 +77,10 @@ pub enum Response { EntryInfo(EntryInfo), } -impl AccountantSkel { - /// Create a new AccountantSkel that wraps the given Accountant. +impl Tpu { + /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { - AccountantSkel { + Tpu { acc: Mutex::new(acc), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), @@ -89,7 +88,7 @@ impl AccountantSkel { } } - fn notify_entry_info_subscribers(obj: &SharedSkel, entry: &Entry) { + fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) { // TODO: No need to bind(). let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -112,7 +111,7 @@ impl AccountantSkel { } } - fn update_entry(obj: &SharedSkel, writer: &Arc>, entry: &Entry) { + fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); obj.acc.lock().unwrap().register_entry_id(&entry.id); writeln!( @@ -123,7 +122,7 @@ impl AccountantSkel { Self::notify_entry_info_subscribers(obj, &entry); } - fn receive_all(obj: &SharedSkel, writer: &Arc>) -> Result> { + fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; let entry = obj.historian @@ -182,7 +181,7 @@ impl AccountantSkel { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( - obj: SharedSkel, + obj: SharedTpu, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Arc>, @@ -198,7 +197,7 @@ impl AccountantSkel { } pub fn sync_service( - obj: SharedSkel, + obj: SharedTpu, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -213,12 +212,12 @@ impl AccountantSkel { }) } - fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> { + fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> { Ok(()) } fn thin_client_service( - obj: SharedSkel, + obj: SharedTpu, exit: Arc, socket: UdpSocket, ) -> JoinHandle<()> { @@ -233,12 +232,12 @@ impl AccountantSkel { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> { + fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> { Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?; Ok(()) } - pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc) -> JoinHandle<()> { + pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { spawn(move || loop { let _ = Self::run_sync_no_broadcast(obj.clone()); if exit.load(Ordering::Relaxed) { @@ -420,7 +419,7 @@ impl AccountantSkel { } fn process( - obj: &SharedSkel, + obj: &SharedTpu, verified_receiver: &Receiver)>>, responder_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, @@ -485,7 +484,7 @@ impl AccountantSkel { /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( - obj: &SharedSkel, + obj: &SharedTpu, verified_receiver: &streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -510,11 +509,11 @@ impl AccountantSkel { Ok(()) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( - obj: &SharedSkel, + obj: &SharedTpu, me: ReplicatedData, serve: UdpSocket, skinny: UdpSocket, @@ -582,10 +581,10 @@ impl AccountantSkel { let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny); - let skel = obj.clone(); + let tpu = obj.clone(); let t_server = spawn(move || loop { let e = Self::process( - &mut skel.clone(), + &mut tpu.clone(), &verified_receiver, &responder_sender, &packet_recycler, @@ -631,7 +630,7 @@ impl AccountantSkel { /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader pub fn replicate( - obj: &SharedSkel, + obj: &SharedTpu, me: ReplicatedData, gossip: UdpSocket, serve: UdpSocket, @@ -682,10 +681,10 @@ impl AccountantSkel { retransmit_sender, ); - let skel = obj.clone(); + let tpu = obj.clone(); let s_exit = exit.clone(); let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); + let e = Self::replicate_state(&tpu, &window_receiver, &blob_recycler); if e.is_err() && s_exit.load(Ordering::Relaxed) { break; } @@ -728,11 +727,11 @@ impl AccountantSkel { } let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); - let skel = obj.clone(); + let tpu = obj.clone(); let s_exit = exit.clone(); let t_server = spawn(move || loop { let e = Self::process( - &mut skel.clone(), + &mut tpu.clone(), &verified_receiver, &responder_sender, &packet_recycler, @@ -786,15 +785,15 @@ pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec = skel.historian.output.lock().unwrap().iter().collect(); + drop(tpu.historian_input); + let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -901,10 +900,10 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(acc, input, historian)); let serve_addr = leader_serve.local_addr().unwrap(); - let threads = AccountantSkel::serve( - &acc_skel, + let threads = Tpu::serve( + &tpu, leader_data, leader_serve, leader_skinny, @@ -916,23 +915,23 @@ mod tests { let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut acc_stub = AccountantStub::new(serve_addr, socket); - let last_id = acc_stub.get_last_id().wait().unwrap(); + let mut client = ThinClient::new(serve_addr, socket); + let last_id = client.get_last_id().wait().unwrap(); trace!("doing stuff"); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); - let _sig = acc_stub.transfer_signed(tr).unwrap(); + let _sig = client.transfer_signed(tr).unwrap(); - let last_id = acc_stub.get_last_id().wait().unwrap(); + let last_id = client.get_last_id().wait().unwrap(); let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id); tr2.data.tokens = 502; tr2.data.plan = Plan::new_payment(502, bob_pubkey); - let _sig = acc_stub.transfer_signed(tr2).unwrap(); + let _sig = client.transfer_signed(tr2).unwrap(); - assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500); + assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500); trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); @@ -1009,9 +1008,9 @@ mod tests { let acc = Accountant::new(&alice); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, input, historian)); + let acc = Arc::new(Tpu::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr; - let threads = AccountantSkel::replicate( + let threads = Tpu::replicate( &acc, target1_data, target1_gossip, @@ -1111,7 +1110,7 @@ mod tests { let entry_list = vec![e0; 1000]; let blob_recycler = BlobRecycler::default(); let mut blob_q = VecDeque::new(); - AccountantSkel::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); + Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); let serialized_entry_list = serialize(&entry_list).unwrap(); let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE; if serialized_entry_list.len() % BLOB_SIZE != 0 { @@ -1127,7 +1126,7 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::{Accountant, MAX_ENTRY_IDS}; - use accountant_skel::*; + use tpu::*; use bincode::serialize; use hash::hash; use mint::Mint; @@ -1180,17 +1179,17 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, input, historian); + let tpu = Tpu::new(acc, input, historian); let now = Instant::now(); - assert!(skel.process_events(req_vers).is_ok()); + assert!(tpu.process_events(req_vers).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(skel.historian_input); - let entries: Vec = skel.historian.output.lock().unwrap().iter().collect(); + drop(tpu.historian_input); + let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); From 1dca17fdb45903a62a3704c4bcdaacc59c3bf7db Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 8 May 2018 18:59:01 -0600 Subject: [PATCH 2/2] cargo +nightly fmt --- src/bin/client-demo.rs | 2 +- src/bin/testnode.rs | 2 +- src/ecdsa.rs | 2 +- src/lib.rs | 2 +- src/thin_client.rs | 4 ++-- src/timing.rs | 2 +- src/tpu.rs | 16 ++++++++-------- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 82cd42b2c..013e02930 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,9 +10,9 @@ use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; -use solana::thin_client::ThinClient; use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; +use solana::thin_client::ThinClient; use solana::transaction::Transaction; use std::env; use std::io::{stdin, Read}; diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 736acb017..cc4ad246a 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,12 +7,12 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; -use solana::tpu::Tpu; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; use solana::historian::Historian; use solana::signature::{KeyPair, KeyPairUtil}; +use solana::tpu::Tpu; use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; diff --git a/src/ecdsa.rs b/src/ecdsa.rs index c029f1e1f..4d7abbdbb 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -130,11 +130,11 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(test)] mod tests { - use tpu::Request; use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; + use tpu::Request; use transaction::test_tx; use transaction::Transaction; diff --git a/src/lib.rs b/src/lib.rs index 5c7568c54..ab3fc2ff5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,8 +19,8 @@ pub mod signature; pub mod streamer; pub mod thin_client; pub mod timing; -pub mod transaction; pub mod tpu; +pub mod transaction; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/thin_client.rs b/src/thin_client.rs index 920081a4f..3ae436ef8 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -3,7 +3,6 @@ //! messages to the network directly. The binary encoding of its messages are //! unstable and may change in future releases. -use tpu::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; @@ -11,6 +10,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; +use tpu::{Request, Response, Subscription}; use transaction::Transaction; pub struct ThinClient { @@ -148,7 +148,6 @@ impl ThinClient { mod tests { use super::*; use accountant::Accountant; - use tpu::Tpu; use crdt::{Crdt, ReplicatedData}; use futures::Future; use historian::Historian; @@ -162,6 +161,7 @@ mod tests { use std::thread::sleep; use std::time::Duration; use std::time::Instant; + use tpu::Tpu; // TODO: Figure out why this test sometimes hangs on TravisCI. #[test] diff --git a/src/timing.rs b/src/timing.rs index 5c36fad80..0d3c38383 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -1,5 +1,5 @@ -use std::time::{SystemTime, UNIX_EPOCH}; use std::time::Duration; +use std::time::{SystemTime, UNIX_EPOCH}; pub fn duration_as_ms(d: &Duration) -> u64 { return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000); diff --git a/src/tpu.rs b/src/tpu.rs index 32142e6ed..6da34e133 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -11,6 +11,7 @@ use hash::Hash; use historian::Historian; use packet; use packet::{SharedBlob, SharedPackets, BLOB_SIZE}; +use rand::{thread_rng, Rng}; use rayon::prelude::*; use recorder::Signal; use result::Result; @@ -26,11 +27,10 @@ use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use streamer; -use transaction::Transaction; -use timing; use std::time::Instant; -use rand::{thread_rng, Rng}; +use streamer; +use timing; +use transaction::Transaction; pub struct Tpu { acc: Mutex, @@ -785,15 +785,13 @@ pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec