diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index a4de9e45c..0eb38e195 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -54,7 +54,7 @@ fn main() { let serve_addr = format!("0.0.0.0:{}", port); let gossip_addr = format!("0.0.0.0:{}", port + 1); let replicate_addr = format!("0.0.0.0:{}", port + 2); - let skinny_addr = format!("0.0.0.0:{}", port + 3); + let events_addr = format!("0.0.0.0:{}", port + 3); if stdin_isatty() { eprintln!("nothing found on stdin, expected a log file"); @@ -121,7 +121,7 @@ fn main() { let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap(); + let events_sock = UdpSocket::bind(&events_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -134,7 +134,7 @@ fn main() { &tpu, d, serve_sock, - skinny_sock, + events_sock, gossip_sock, exit.clone(), stdout(), diff --git a/src/streamer.rs b/src/streamer.rs index 1a607a12f..13a8d04a2 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -64,6 +64,25 @@ fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Res Ok(()) } +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> { + let timer = Duration::new(1, 0); + let msgs = recvr.recv_timeout(timer)?; + debug!("got msgs"); + let mut len = msgs.read().unwrap().packets.len(); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + trace!("got more msgs"); + len += more.read().unwrap().packets.len(); + batch.push(more); + + if len > 100_000 { + break; + } + } + debug!("batch len {}", batch.len()); + Ok((batch, len)) +} + pub fn responder( sock: UdpSocket, exit: Arc, diff --git a/src/thin_client.rs b/src/thin_client.rs index 4db2056c1..274ced7e8 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -168,7 +168,7 @@ mod tests { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( @@ -184,8 +184,15 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); let accountant = Arc::new(Tpu::new(accounting_stage)); - let threads = - Tpu::serve(&accountant, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); + let threads = Tpu::serve( + &accountant, + d, + serve, + events_socket, + gossip, + exit.clone(), + sink(), + ).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -215,7 +222,7 @@ mod tests { #[test] fn test_bad_sig() { - let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = tpu::test_node(); + let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node(); let alice = Mint::new(10_000); let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -227,7 +234,7 @@ mod tests { &tpu, leader_data, leader_serve, - leader_skinny, + leader_events, leader_gossip, exit.clone(), sink(), @@ -264,7 +271,7 @@ mod tests { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let leader = ReplicatedData::new( @@ -273,7 +280,7 @@ mod tests { replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (leader, gossip, serve, replicate, skinny) + (leader, gossip, serve, replicate, events_socket) } #[test] diff --git a/src/thin_client_service.rs b/src/thin_client_service.rs index df46b15b1..f86ff0142 100644 --- a/src/thin_client_service.rs +++ b/src/thin_client_service.rs @@ -2,14 +2,65 @@ //! on behalf of thing clients. use accountant::Accountant; -use bincode::serialize; +use accounting_stage::AccountingStage; +use bincode::{deserialize, serialize}; use entry::Entry; +use event::Event; use hash::Hash; +use packet; +use packet::SharedPackets; +use rayon::prelude::*; +use result::Result; use signature::PublicKey; +use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; -//use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; use transaction::Transaction; +//use std::io::{Cursor, Write}; +//use std::sync::atomic::{AtomicBool, Ordering}; +//use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::Receiver; +use std::sync::{Arc, Mutex}; +//use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use std::time::Instant; +use streamer; +use timing; + +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Transaction(Transaction), + GetBalance { key: PublicKey }, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, +} + +impl Request { + /// Verify the request is valid. + pub fn verify(&self) -> bool { + match *self { + Request::Transaction(ref tr) => tr.verify_plan(), + _ => true, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Response { + Balance { key: PublicKey, val: Option }, + EntryInfo(EntryInfo), +} pub struct ThinClientService { //pub output: Mutex>, @@ -88,40 +139,196 @@ impl ThinClientService { } } } -} -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Request { - Transaction(Transaction), - GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} + fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} + // Copy-paste of deserialize_requests() because I can't figure out how to + // route the lifetimes in a generic version. + pub fn deserialize_events(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, + /// Split Request list into verified transactions and the rest + fn partition_requests( + req_vers: Vec<(Request, SocketAddr, u8)>, + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut events = vec![]; + let mut reqs = vec![]; + for (msg, rsp_addr, verify) in req_vers { + match msg { + Request::Transaction(tr) => { + if verify != 0 { + events.push(Event::Transaction(tr)); + } + } + _ => reqs.push((msg, rsp_addr)), + } } + (events, reqs) + } + + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) + } + + pub fn process_request_packets( + &self, + accounting_stage: &AccountingStage, + verified_receiver: &Receiver)>>, + responder_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let recv_start = Instant::now(); + let mms = verified_receiver.recv_timeout(timer)?; + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); + for (msgs, vers) in mms { + let reqs = Self::deserialize_requests(&msgs.read().unwrap()); + reqs_len += reqs.len(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| { + let v = x.0.verify(); + v + }) + .collect(); + + debug!("partitioning"); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); + + debug!("process_events"); + accounting_stage.process_events(events)?; + debug!("done process_events"); + + debug!("process_requests"); + let rsps = self.process_requests(reqs); + debug!("done process_requests"); + + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); + //don't wake up the other side if there is nothing + responder_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) } } -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), +#[cfg(test)] +pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { + let mut out = vec![]; + for rrs in reqs.chunks(packet::NUM_PACKETS) { + let p = r.allocate(); + p.write() + .unwrap() + .packets + .resize(rrs.len(), Default::default()); + for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { + let v = serialize(&i).expect("serialize request"); + let len = v.len(); + o.data[..len].copy_from_slice(&v); + o.meta.size = len; + } + out.push(p); + } + return out; +} + +#[cfg(test)] +mod tests { + use bincode::serialize; + use ecdsa; + use packet::{PacketRecycler, NUM_PACKETS}; + use thin_client_service::{to_request_packets, Request}; + use transaction::{memfind, test_tx}; + + #[test] + fn test_layout() { + let tr = test_tx(); + let tx = serialize(&tr).unwrap(); + let packet = serialize(&Request::Transaction(tr)).unwrap(); + assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } + + #[test] + fn test_to_packets() { + let tr = Request::Transaction(test_tx()); + let re = PacketRecycler::default(); + let rv = to_request_packets(&re, vec![tr.clone(); 1]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), 1); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); + assert_eq!(rv.len(), 2); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[1].read().unwrap().packets.len(), 1); + } } diff --git a/src/tpu.rs b/src/tpu.rs index cc01f957e..1cc373fc9 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -1,32 +1,28 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! 5-stage transaction processing pipeline in software. -use accountant::Accountant; use accounting_stage::AccountingStage; -use bincode::{deserialize, serialize}; use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; -use event::Event; use ledger; use packet; use packet::SharedPackets; use rand::{thread_rng, Rng}; -use rayon::prelude::*; use result::Result; use serde_json; use std::collections::VecDeque; use std::io::Write; use std::io::sink; -use std::net::{SocketAddr, UdpSocket}; +use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer; -use thin_client_service::{Request, Response, ThinClientService}; +use thin_client_service::ThinClientService; use timing; pub struct Tpu { @@ -46,30 +42,32 @@ impl Tpu { } } - fn update_entry(obj: &Tpu, writer: &Mutex, entry: &Entry) { - trace!("update_entry entry"); - obj.accounting_stage.accountant.register_entry_id(&entry.id); + fn write_entry(&self, writer: &Mutex, entry: &Entry) { + trace!("write_entry entry"); + self.accounting_stage + .accountant + .register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", serde_json::to_string(&entry).unwrap() ).unwrap(); - obj.thin_client_service + self.thin_client_service .notify_entry_info_subscribers(&entry); } - fn receive_all(obj: &Tpu, writer: &Mutex) -> Result> { + fn write_entries(&self, writer: &Mutex) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.accounting_stage + let entry = self.accounting_stage .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; - Self::update_entry(obj, writer, &entry); + self.write_entry(writer, &entry); l.push(entry); - while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() { - Self::update_entry(obj, writer, &entry); + while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() { + self.write_entry(writer, &entry); l.push(entry); } Ok(l) @@ -78,13 +76,13 @@ impl Tpu { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( - obj: SharedTpu, + &self, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Mutex, ) -> Result<()> { let mut q = VecDeque::new(); - let list = Self::receive_all(&obj, writer)?; + let list = self.write_entries(writer)?; trace!("New blobs? {}", list.len()); ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if !q.is_empty() { @@ -101,25 +99,7 @@ impl Tpu { writer: Mutex, ) -> JoinHandle<()> { spawn(move || loop { - let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer); - if exit.load(Ordering::Relaxed) { - info!("sync_service exiting"); - break; - } - }) - } - - fn process_thin_client_requests(_acc: &Arc, _socket: &UdpSocket) -> Result<()> { - Ok(()) - } - - fn thin_client_service( - accountant: Arc, - exit: Arc, - socket: UdpSocket, - ) -> JoinHandle<()> { - spawn(move || loop { - let _ = Self::process_thin_client_requests(&accountant, &socket); + let _ = obj.run_sync(&broadcast, &blob_recycler, &writer); if exit.load(Ordering::Relaxed) { info!("sync_service exiting"); break; @@ -129,14 +109,14 @@ impl Tpu { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> { - Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?; + fn run_sync_no_broadcast(&self) -> Result<()> { + self.write_entries(&Arc::new(Mutex::new(sink())))?; Ok(()) } pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { spawn(move || loop { - let _ = Self::run_sync_no_broadcast(obj.clone()); + let _ = obj.run_sync_no_broadcast(); if exit.load(Ordering::Relaxed) { info!("sync_no_broadcast_service exiting"); break; @@ -144,25 +124,6 @@ impl Tpu { }) } - fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec, usize)> { - let timer = Duration::new(1, 0); - let msgs = recvr.recv_timeout(timer)?; - debug!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); - let mut batch = vec![msgs]; - while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); - len += more.read().unwrap().packets.len(); - batch.push(more); - - if len > 100_000 { - break; - } - } - debug!("batch len {}", batch.len()); - Ok((batch, len)) - } - fn verify_batch( batch: Vec, sendr: &Arc)>>>>, @@ -178,7 +139,7 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?; + let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?; let now = Instant::now(); let batch_len = batch.len(); let rand_id = thread_rng().gen_range(0, 100); @@ -205,128 +166,6 @@ impl Tpu { Ok(()) } - pub fn deserialize_packets(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - /// Split Request list into verified transactions and the rest - fn partition_requests( - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut events = vec![]; - let mut reqs = vec![]; - for (msg, rsp_addr, verify) in req_vers { - match msg { - Request::Transaction(tr) => { - if verify != 0 { - events.push(Event::Transaction(tr)); - } - } - _ => reqs.push((msg, rsp_addr)), - } - } - (events, reqs) - } - - fn serialize_response( - resp: Response, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - - fn process( - obj: &Tpu, - verified_receiver: &Receiver)>>, - responder_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - let mut reqs_len = 0; - let mms_len = mms.len(); - info!( - "@{:?} process start stalled for: {:?}ms batches: {}", - timing::timestamp(), - timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), - ); - let proc_start = Instant::now(); - for (msgs, vers) in mms { - let reqs = Self::deserialize_packets(&msgs.read().unwrap()); - reqs_len += reqs.len(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| { - let v = x.0.verify(); - v - }) - .collect(); - - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - debug!("process_events"); - obj.accounting_stage.process_events(events)?; - debug!("done process_events"); - - debug!("process_requests"); - let rsps = obj.thin_client_service.process_requests(reqs); - debug!("done process_requests"); - - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - info!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - responder_sender.send(blobs)?; - } - packet_recycler.recycle(msgs); - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - info!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - mms_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } - /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -354,7 +193,7 @@ impl Tpu { obj: &SharedTpu, me: ReplicatedData, serve: UdpSocket, - skinny: UdpSocket, + _events_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, @@ -417,16 +256,10 @@ impl Tpu { Mutex::new(writer), ); - let t_skinny = Self::thin_client_service( - obj.accounting_stage.accountant.clone(), - exit.clone(), - skinny, - ); - let tpu = obj.clone(); let t_server = spawn(move || loop { - let e = Self::process( - &mut tpu.clone(), + let e = tpu.thin_client_service.process_request_packets( + &tpu.accounting_stage, &verified_receiver, &responder_sender, &packet_recycler, @@ -444,7 +277,6 @@ impl Tpu { t_responder, t_server, t_sync, - t_skinny, t_gossip, t_listen, t_broadcast, @@ -572,8 +404,8 @@ impl Tpu { let tpu = obj.clone(); let s_exit = exit.clone(); let t_server = spawn(move || loop { - let e = Self::process( - &mut tpu.clone(), + let e = tpu.thin_client_service.process_request_packets( + &tpu.accounting_stage, &verified_receiver, &responder_sender, &packet_recycler, @@ -605,31 +437,11 @@ impl Tpu { } } -#[cfg(test)] -pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { - let mut out = vec![]; - for rrs in reqs.chunks(packet::NUM_PACKETS) { - let p = r.allocate(); - p.write() - .unwrap() - .packets - .resize(rrs.len(), Default::default()); - for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { - let v = serialize(&i).expect("serialize request"); - let len = v.len(); - o.data[..len].copy_from_slice(&v); - o.meta.size = len; - } - out.push(p); - } - return out; -} - #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; - let skinny = UdpSocket::bind("127.0.0.1:0").unwrap(); + let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -640,7 +452,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (d, gossip, replicate, serve, skinny) + (d, gossip, replicate, serve, events_socket) } #[cfg(test)] @@ -650,13 +462,12 @@ mod tests { use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; - use ecdsa; use entry; use event::Event; use hash::{hash, Hash}; use logger; use mint::Mint; - use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; + use packet::BlobRecycler; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; @@ -664,34 +475,8 @@ mod tests { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; - use tpu::{test_node, to_packets, Request, Tpu}; - use transaction::{memfind, test_tx, Transaction}; - - #[test] - fn test_layout() { - let tr = test_tx(); - let tx = serialize(&tr).unwrap(); - let packet = serialize(&Request::Transaction(tr)).unwrap(); - assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } - #[test] - fn test_to_packets() { - let tr = Request::Transaction(test_tx()); - let re = PacketRecycler::default(); - let rv = to_packets(&re, vec![tr.clone(); 1]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), 1); - - let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - - let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); - assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().unwrap().packets.len(), 1); - } + use tpu::{test_node, Tpu}; + use transaction::Transaction; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test]