commit
2bfa20ff85
|
@ -213,6 +213,24 @@ impl AccountantSkel {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn thin_client_service(
|
||||||
|
obj: SharedSkel,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
socket: UdpSocket,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
spawn(move || loop {
|
||||||
|
let _ = Self::process_thin_client_requests(obj.clone(), &socket);
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
info!("sync_service exiting");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Process any Entry items that have been published by the Historian.
|
/// Process any Entry items that have been published by the Historian.
|
||||||
/// continuosly broadcast blobs of entries out
|
/// continuosly broadcast blobs of entries out
|
||||||
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
|
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
|
||||||
|
@ -499,6 +517,7 @@ impl AccountantSkel {
|
||||||
obj: &SharedSkel,
|
obj: &SharedSkel,
|
||||||
me: ReplicatedData,
|
me: ReplicatedData,
|
||||||
serve: UdpSocket,
|
serve: UdpSocket,
|
||||||
|
skinny: UdpSocket,
|
||||||
gossip: UdpSocket,
|
gossip: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
writer: W,
|
writer: W,
|
||||||
|
@ -561,6 +580,8 @@ impl AccountantSkel {
|
||||||
Arc::new(Mutex::new(writer)),
|
Arc::new(Mutex::new(writer)),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny);
|
||||||
|
|
||||||
let skel = obj.clone();
|
let skel = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = Self::process(
|
let e = Self::process(
|
||||||
|
@ -582,6 +603,7 @@ impl AccountantSkel {
|
||||||
t_responder,
|
t_responder,
|
||||||
t_server,
|
t_server,
|
||||||
t_sync,
|
t_sync,
|
||||||
|
t_skinny,
|
||||||
t_gossip,
|
t_gossip,
|
||||||
t_listen,
|
t_listen,
|
||||||
t_broadcast,
|
t_broadcast,
|
||||||
|
@ -872,7 +894,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_accountant_bad_sig() {
|
fn test_accountant_bad_sig() {
|
||||||
let (leader_data, leader_gossip, _, leader_serve) = test_node();
|
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
|
@ -885,6 +907,7 @@ mod tests {
|
||||||
&acc_skel,
|
&acc_skel,
|
||||||
leader_data,
|
leader_data,
|
||||||
leader_serve,
|
leader_serve,
|
||||||
|
leader_skinny,
|
||||||
leader_gossip,
|
leader_gossip,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sink(),
|
sink(),
|
||||||
|
@ -918,7 +941,8 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
|
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
|
let skinny = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
@ -929,16 +953,16 @@ mod tests {
|
||||||
replicate.local_addr().unwrap(),
|
replicate.local_addr().unwrap(),
|
||||||
serve.local_addr().unwrap(),
|
serve.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
(d, gossip, replicate, serve)
|
(d, gossip, replicate, serve, skinny)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test that mesasge sent from leader to target1 and repliated to target2
|
/// Test that mesasge sent from leader to target1 and repliated to target2
|
||||||
#[test]
|
#[test]
|
||||||
fn test_replicate() {
|
fn test_replicate() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let (leader_data, leader_gossip, _, leader_serve) = test_node();
|
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
|
||||||
let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node();
|
let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node();
|
||||||
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
|
let (target2_data, target2_gossip, target2_replicate, _, _) = test_node();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
//start crdt_leader
|
//start crdt_leader
|
||||||
|
|
|
@ -169,6 +169,7 @@ mod tests {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let addr = serve.local_addr().unwrap();
|
let addr = serve.local_addr().unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
|
@ -185,7 +186,8 @@ mod tests {
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
|
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
|
||||||
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
|
let threads =
|
||||||
|
AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
@ -212,9 +214,10 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
|
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let leader = ReplicatedData::new(
|
let leader = ReplicatedData::new(
|
||||||
|
@ -223,7 +226,7 @@ mod tests {
|
||||||
replicate.local_addr().unwrap(),
|
replicate.local_addr().unwrap(),
|
||||||
serve.local_addr().unwrap(),
|
serve.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
(leader, gossip, serve, replicate)
|
(leader, gossip, serve, replicate, skinny)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -254,6 +257,7 @@ mod tests {
|
||||||
&leader_acc,
|
&leader_acc,
|
||||||
leader.0.clone(),
|
leader.0.clone(),
|
||||||
leader.2,
|
leader.2,
|
||||||
|
leader.4,
|
||||||
leader.1,
|
leader.1,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
sink(),
|
sink(),
|
||||||
|
@ -269,7 +273,7 @@ mod tests {
|
||||||
).unwrap();
|
).unwrap();
|
||||||
|
|
||||||
//lets spy on the network
|
//lets spy on the network
|
||||||
let (mut spy, spy_gossip, _, _) = test_node();
|
let (mut spy, spy_gossip, _, _, _) = test_node();
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
spy.replicate_addr = daddr;
|
spy.replicate_addr = daddr;
|
||||||
spy.serve_addr = daddr;
|
spy.serve_addr = daddr;
|
||||||
|
|
|
@ -55,6 +55,7 @@ fn main() {
|
||||||
let serve_addr = format!("0.0.0.0:{}", port);
|
let serve_addr = format!("0.0.0.0:{}", port);
|
||||||
let gossip_addr = format!("0.0.0.0:{}", port + 1);
|
let gossip_addr = format!("0.0.0.0:{}", port + 1);
|
||||||
let replicate_addr = format!("0.0.0.0:{}", port + 2);
|
let replicate_addr = format!("0.0.0.0:{}", port + 2);
|
||||||
|
let skinny_addr = format!("0.0.0.0:{}", port + 3);
|
||||||
|
|
||||||
if stdin_isatty() {
|
if stdin_isatty() {
|
||||||
eprintln!("nothing found on stdin, expected a log file");
|
eprintln!("nothing found on stdin, expected a log file");
|
||||||
|
@ -122,6 +123,7 @@ fn main() {
|
||||||
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
||||||
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
||||||
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
||||||
|
let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap();
|
||||||
let pubkey = KeyPair::new().pubkey();
|
let pubkey = KeyPair::new().pubkey();
|
||||||
let d = ReplicatedData::new(
|
let d = ReplicatedData::new(
|
||||||
pubkey,
|
pubkey,
|
||||||
|
@ -130,8 +132,15 @@ fn main() {
|
||||||
serve_sock.local_addr().unwrap(),
|
serve_sock.local_addr().unwrap(),
|
||||||
);
|
);
|
||||||
eprintln!("starting server...");
|
eprintln!("starting server...");
|
||||||
let threads =
|
let threads = AccountantSkel::serve(
|
||||||
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
|
&skel,
|
||||||
|
d,
|
||||||
|
serve_sock,
|
||||||
|
skinny_sock,
|
||||||
|
gossip_sock,
|
||||||
|
exit.clone(),
|
||||||
|
stdout(),
|
||||||
|
).unwrap();
|
||||||
eprintln!("Ready. Listening on {}", serve_addr);
|
eprintln!("Ready. Listening on {}", serve_addr);
|
||||||
for t in threads {
|
for t in threads {
|
||||||
t.join().expect("join");
|
t.join().expect("join");
|
||||||
|
|
Loading…
Reference in New Issue