diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b8e7db7379..d3e8ab14b5 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,7 +27,9 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; -use subscribers::Subscribers; + +use subscribers; +use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -250,20 +252,26 @@ impl AccountantSkel { /// Respond with a signed hash of the state fn replicate_state( obj: &Arc>>, - verified_receiver: &BlobReceiver, + verified_receiver: &streamer::BlobReceiver, blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; - for msgs in blobs { - let entries:Vec = b.read().unwrap().data.deserialize()?; + for msgs in &blobs { + let blob = msgs.read().unwrap(); + let mut entries:Vec = Vec::new(); + for i in 0..blob.meta.size/size_of::() { + entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); + } for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state } - blob_recycler.recycle(msgs); + for blob in blobs { + blob_recycler.recycle(blob); + } Ok(()) } @@ -335,7 +343,7 @@ impl AccountantSkel { /// 5. respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, - rsubs: Subscribers, + rsubs: subscribers::Subscribers, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -347,7 +355,7 @@ impl AccountantSkel { let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -355,7 +363,7 @@ impl AccountantSkel { let t_retransmit = streamer::retransmitter( write, exit.clone(), - subs, + subs.clone(), blob_recycler.clone(), retransmit_receiver, ); @@ -373,7 +381,8 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, + &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -428,6 +437,8 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + #[test] fn test_layout() { let tr = test_tx(); @@ -532,6 +543,30 @@ mod tests { exit.store(true, Ordering::Relaxed); } + #[test] + fn test_replicate() { + let serve_port = 9004; + let send_port = 9005; + let addr = format!("127.0.0.1:{}", serve_port); + let send_addr = format!("127.0.0.1:{}", send_port); + let alice = Mint::new(10_000); + let acc = Accountant::new(&alice); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + let node_me = Node::default(); + let node_leader = Node::default(); + let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + exit.store(true, Ordering::Relaxed); + } + } #[cfg(all(feature = "unstable", test))]