diff --git a/src/rpu.rs b/src/rpu.rs index 4d6f5b993..ba4c4efc6 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -2,39 +2,27 @@ //! 5-stage transaction processing pipeline in software. use bank::Bank; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use sig_verify_stage::SigVerifyStage; -use std::io::Write; use std::net::UdpSocket; +use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Rpu { pub thread_hdls: Vec>, } impl Rpu { - pub fn new( + pub fn new( bank: Arc, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, requests_socket: UdpSocket, - broadcast_socket: UdpSocket, respond_socket: UdpSocket, - gossip: UdpSocket, exit: Arc, - writer: W, ) -> Self { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -57,31 +45,6 @@ impl Rpu { blob_recycler.clone(), ); - let record_stage = - RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration); - - let write_stage = WriteStage::new( - bank.clone(), - exit.clone(), - blob_recycler.clone(), - Mutex::new(writer), - record_stage.entry_receiver, - ); - - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt.clone(), - window, - blob_recycler.clone(), - write_stage.blob_receiver, - ); - let t_responder = streamer::responder( respond_socket, exit.clone(), @@ -89,15 +52,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut thread_hdls = vec![ - t_receiver, - t_responder, - request_stage.thread_hdl, - write_stage.thread_hdl, - t_gossip, - t_listen, - t_broadcast, - ]; + let mut thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); Rpu { thread_hdls } diff --git a/src/server.rs b/src/server.rs index 07b11e426..b250e86d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; -//use tpu::Tpu; +use tpu::Tpu; pub struct Server { pub thread_hdls: Vec>, @@ -23,7 +23,7 @@ impl Server { tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, - _events_socket: UdpSocket, + events_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, gossip: UdpSocket, @@ -32,32 +32,21 @@ impl Server { ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; - let rpu = Rpu::new( + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + + let tpu = Tpu::new( bank.clone(), start_hash, tick_duration, me, - requests_socket, + events_socket, broadcast_socket, - respond_socket, gossip, exit.clone(), writer, ); - thread_hdls.extend(rpu.thread_hdls); - - //let tpu = Tpu::new( - // bank.clone(), - // start_hash, - // tick_duration, - // me, - // events_socket, - // broadcast_socket, - // gossip, - // exit.clone(), - // writer, - //); - //thread_hdls.extend(tpu.thread_hdls); + thread_hdls.extend(tpu.thread_hdls); Server { thread_hdls } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 30036c340..6214ef520 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -69,7 +69,7 @@ impl ThinClient { pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); - self.requests_socket.send_to(&data, &self.addr) + self.events_socket.send_to(&data, &self.addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests.