Fix some compilation issues
This commit is contained in:
parent
f752e55929
commit
1b6cdd5637
|
@ -27,7 +27,9 @@ use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
use subscribers::Subscribers;
|
|
||||||
|
use subscribers;
|
||||||
|
use std::mem::size_of;
|
||||||
|
|
||||||
pub struct AccountantSkel<W: Write + Send + 'static> {
|
pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||||
acc: Accountant,
|
acc: Accountant,
|
||||||
|
@ -250,20 +252,26 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
/// Respond with a signed hash of the state
|
/// Respond with a signed hash of the state
|
||||||
fn replicate_state(
|
fn replicate_state(
|
||||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
verified_receiver: &BlobReceiver,
|
verified_receiver: &streamer::BlobReceiver,
|
||||||
blob_sender: &streamer::BlobSender,
|
blob_sender: &streamer::BlobSender,
|
||||||
blob_recycler: &packet::BlobRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||||
for msgs in blobs {
|
for msgs in &blobs {
|
||||||
let entries:Vec<Entry> = b.read().unwrap().data.deserialize()?;
|
let blob = msgs.read().unwrap();
|
||||||
|
let mut entries:Vec<Entry> = Vec::new();
|
||||||
|
for i in 0..blob.meta.size/size_of::<Entry>() {
|
||||||
|
entries.push(deserialize(&blob.data[i..i+size_of::<Entry>()]).unwrap());
|
||||||
|
}
|
||||||
for e in entries {
|
for e in entries {
|
||||||
obj.lock().unwrap().acc.process_verified_events(e.events)?;
|
obj.lock().unwrap().acc.process_verified_events(e.events)?;
|
||||||
}
|
}
|
||||||
//TODO respond back to leader with hash of the state
|
//TODO respond back to leader with hash of the state
|
||||||
}
|
}
|
||||||
blob_recycler.recycle(msgs);
|
for blob in blobs {
|
||||||
|
blob_recycler.recycle(blob);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,7 +343,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
/// 5. respond with the hash of the state back to the leader
|
/// 5. respond with the hash of the state back to the leader
|
||||||
pub fn replicate(
|
pub fn replicate(
|
||||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
rsubs: Subscribers,
|
rsubs: subscribers::Subscribers,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<Vec<JoinHandle<()>>> {
|
) -> Result<Vec<JoinHandle<()>>> {
|
||||||
let read = UdpSocket::bind(rsubs.me.addr)?;
|
let read = UdpSocket::bind(rsubs.me.addr)?;
|
||||||
|
@ -347,7 +355,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
let blob_recycler = packet::BlobRecycler::default();
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
let (blob_sender, blob_receiver) = channel();
|
let (blob_sender, blob_receiver) = channel();
|
||||||
let t_blob_receiver =
|
let t_blob_receiver =
|
||||||
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
|
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?;
|
||||||
let (window_sender, window_receiver) = channel();
|
let (window_sender, window_receiver) = channel();
|
||||||
let (retransmit_sender, retransmit_receiver) = channel();
|
let (retransmit_sender, retransmit_receiver) = channel();
|
||||||
|
|
||||||
|
@ -355,7 +363,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
let t_retransmit = streamer::retransmitter(
|
let t_retransmit = streamer::retransmitter(
|
||||||
write,
|
write,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
subs,
|
subs.clone(),
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
retransmit_receiver,
|
retransmit_receiver,
|
||||||
);
|
);
|
||||||
|
@ -373,7 +381,8 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
let skel = obj.clone();
|
let skel = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
|
let e = Self::replicate_state(&skel, &window_receiver,
|
||||||
|
&blob_sender, &blob_recycler);
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -428,6 +437,8 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
|
use subscribers::{Node, Subscribers};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_layout() {
|
fn test_layout() {
|
||||||
let tr = test_tx();
|
let tr = test_tx();
|
||||||
|
@ -532,6 +543,30 @@ mod tests {
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_replicate() {
|
||||||
|
let serve_port = 9004;
|
||||||
|
let send_port = 9005;
|
||||||
|
let addr = format!("127.0.0.1:{}", serve_port);
|
||||||
|
let send_addr = format!("127.0.0.1:{}", send_port);
|
||||||
|
let alice = Mint::new(10_000);
|
||||||
|
let acc = Accountant::new(&alice);
|
||||||
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let historian = Historian::new(&alice.last_id(), Some(30));
|
||||||
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
|
acc,
|
||||||
|
alice.last_id(),
|
||||||
|
sink(),
|
||||||
|
historian,
|
||||||
|
)));
|
||||||
|
let node_me = Node::default();
|
||||||
|
let node_leader = Node::default();
|
||||||
|
let subs = Subscribers::new(node_me, node_leader, &[]);
|
||||||
|
let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "unstable", test))]
|
#[cfg(all(feature = "unstable", test))]
|
||||||
|
|
Loading…
Reference in New Issue