Add events socket instead of modifying the existing socket
This commit is contained in:
parent
4fdd891b54
commit
4f629dd982
|
@ -54,7 +54,7 @@ fn main() {
|
||||||
let serve_addr = format!("0.0.0.0:{}", port);
|
let serve_addr = format!("0.0.0.0:{}", port);
|
||||||
let gossip_addr = format!("0.0.0.0:{}", port + 1);
|
let gossip_addr = format!("0.0.0.0:{}", port + 1);
|
||||||
let replicate_addr = format!("0.0.0.0:{}", port + 2);
|
let replicate_addr = format!("0.0.0.0:{}", port + 2);
|
||||||
let skinny_addr = format!("0.0.0.0:{}", port + 3);
|
let events_addr = format!("0.0.0.0:{}", port + 3);
|
||||||
|
|
||||||
if stdin_isatty() {
|
if stdin_isatty() {
|
||||||
eprintln!("nothing found on stdin, expected a log file");
|
eprintln!("nothing found on stdin, expected a log file");
|
||||||
|
@ -121,7 +121,7 @@ fn main() {
|
||||||
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
||||||
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
||||||
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
||||||
let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap();
|
let events_sock = UdpSocket::bind(&events_addr).unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
pubkey,
|
pubkey,
|
||||||
|
@ -134,7 +134,7 @@ fn main() {
|
||||||
&tpu,
|
&tpu,
|
||||||
d,
|
d,
|
||||||
serve_sock,
|
serve_sock,
|
||||||
skinny_sock,
|
events_sock,
|
||||||
gossip_sock,
|
gossip_sock,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
stdout(),
|
stdout(),
|
||||||
|
|
|
@ -168,7 +168,7 @@ mod tests {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let addr = serve.local_addr().unwrap();
|
let addr = serve.local_addr().unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
|
@ -184,8 +184,15 @@ mod tests {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||||
let accountant = Arc::new(Tpu::new(accounting_stage));
|
let accountant = Arc::new(Tpu::new(accounting_stage));
|
||||||
let threads =
|
let threads = Tpu::serve(
|
||||||
Tpu::serve(&accountant, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
|
&accountant,
|
||||||
|
d,
|
||||||
|
serve,
|
||||||
|
events_socket,
|
||||||
|
gossip,
|
||||||
|
exit.clone(),
|
||||||
|
sink(),
|
||||||
|
).unwrap();
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
@ -215,7 +222,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bad_sig() {
|
fn test_bad_sig() {
|
||||||
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = tpu::test_node();
|
let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node();
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let accountant = Accountant::new(&alice);
|
let accountant = Accountant::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
|
@ -227,7 +234,7 @@ mod tests {
|
||||||
&tpu,
|
&tpu,
|
||||||
leader_data,
|
leader_data,
|
||||||
leader_serve,
|
leader_serve,
|
||||||
leader_skinny,
|
leader_events,
|
||||||
leader_gossip,
|
leader_gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sink(),
|
sink(),
|
||||||
|
@ -264,7 +271,7 @@ mod tests {
|
||||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let leader = ReplicatedData::new(
|
let leader = ReplicatedData::new(
|
||||||
|
@ -273,7 +280,7 @@ mod tests {
|
||||||
replicate.local_addr().unwrap(),
|
replicate.local_addr().unwrap(),
|
||||||
serve.local_addr().unwrap(),
|
serve.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
(leader, gossip, serve, replicate, skinny)
|
(leader, gossip, serve, replicate, events_socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
32
src/tpu.rs
32
src/tpu.rs
|
@ -1,7 +1,6 @@
|
||||||
//! The `tpu` module implements the Transaction Processing Unit, a
|
//! The `tpu` module implements the Transaction Processing Unit, a
|
||||||
//! 5-stage transaction processing pipeline in software.
|
//! 5-stage transaction processing pipeline in software.
|
||||||
|
|
||||||
use accountant::Accountant;
|
|
||||||
use accounting_stage::AccountingStage;
|
use accounting_stage::AccountingStage;
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
use crdt::{Crdt, ReplicatedData};
|
use crdt::{Crdt, ReplicatedData};
|
||||||
|
@ -109,24 +108,6 @@ impl Tpu {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_thin_client_requests(_acc: &Arc<Accountant>, _socket: &UdpSocket) -> Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn thin_client_service(
|
|
||||||
accountant: Arc<Accountant>,
|
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
socket: UdpSocket,
|
|
||||||
) -> JoinHandle<()> {
|
|
||||||
spawn(move || loop {
|
|
||||||
let _ = Self::process_thin_client_requests(&accountant, &socket);
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
info!("sync_service exiting");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process any Entry items that have been published by the Historian.
|
/// Process any Entry items that have been published by the Historian.
|
||||||
/// continuosly broadcast blobs of entries out
|
/// continuosly broadcast blobs of entries out
|
||||||
fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> {
|
fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> {
|
||||||
|
@ -367,7 +348,7 @@ impl Tpu {
|
||||||
obj: &SharedTpu,
|
obj: &SharedTpu,
|
||||||
me: ReplicatedData,
|
me: ReplicatedData,
|
||||||
serve: UdpSocket,
|
serve: UdpSocket,
|
||||||
skinny: UdpSocket,
|
_events_socket: UdpSocket,
|
||||||
gossip: UdpSocket,
|
gossip: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
writer: W,
|
writer: W,
|
||||||
|
@ -430,12 +411,6 @@ impl Tpu {
|
||||||
Mutex::new(writer),
|
Mutex::new(writer),
|
||||||
);
|
);
|
||||||
|
|
||||||
let t_skinny = Self::thin_client_service(
|
|
||||||
obj.accounting_stage.accountant.clone(),
|
|
||||||
exit.clone(),
|
|
||||||
skinny,
|
|
||||||
);
|
|
||||||
|
|
||||||
let tpu = obj.clone();
|
let tpu = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = Self::process_request_packets(
|
let e = Self::process_request_packets(
|
||||||
|
@ -457,7 +432,6 @@ impl Tpu {
|
||||||
t_responder,
|
t_responder,
|
||||||
t_server,
|
t_server,
|
||||||
t_sync,
|
t_sync,
|
||||||
t_skinny,
|
|
||||||
t_gossip,
|
t_gossip,
|
||||||
t_listen,
|
t_listen,
|
||||||
t_broadcast,
|
t_broadcast,
|
||||||
|
@ -642,7 +616,7 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec
|
||||||
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
|
|
||||||
let skinny = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
@ -653,7 +627,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
|
||||||
replicate.local_addr().unwrap(),
|
replicate.local_addr().unwrap(),
|
||||||
serve.local_addr().unwrap(),
|
serve.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
(d, gossip, replicate, serve, skinny)
|
(d, gossip, replicate, serve, events_socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
Loading…
Reference in New Issue