This commit is contained in:
Anatoly Yakovenko 2018-04-18 12:29:33 -07:00 committed by Stephen Akridge
parent 828b9d6717
commit ad6303f031
1 changed files with 29 additions and 1 deletions

View File

@ -245,6 +245,34 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
Ok(())
}
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &BlobReceiver,
blob_sender: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
for msgs in blobs {
let entries = b.read().unwrap().data.deserialize();
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| x.0.verify())
.collect();
let rsps = obj.lock().unwrap().process_packets(req_vers)?;
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
Ok(())
}
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
@ -307,7 +335,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// 3. reconstruct contiguous window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs
/// c. ask the network for missing blobs, if erasure coding is insufficient
/// d. make sure that the blobs PoH sequences connect (TODO)
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader