Merge pull request #172 from sakridge/fix_entry_serialize
Fix entry serialize
This commit is contained in:
commit
588593f619
|
@ -18,7 +18,6 @@ use result::Result;
|
|||
use serde_json;
|
||||
use signature::PublicKey;
|
||||
use std::cmp::max;
|
||||
use std::collections::LinkedList;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::{Cursor, Write};
|
||||
use std::mem::size_of;
|
||||
|
@ -113,34 +112,22 @@ impl AccountantSkel {
|
|||
"{}",
|
||||
serde_json::to_string(&entry).unwrap()
|
||||
).unwrap();
|
||||
trace!("notify_entry_info entry");
|
||||
Self::notify_entry_info_subscribers(obj, &entry);
|
||||
trace!("notify_entry_info done");
|
||||
}
|
||||
|
||||
fn receive_to_list<W: Write>(
|
||||
obj: &SharedSkel,
|
||||
writer: &Arc<Mutex<W>>,
|
||||
max: usize,
|
||||
) -> Result<LinkedList<Entry>> {
|
||||
fn receive_all<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
|
||||
//TODO implement a serialize for channel that does this without allocations
|
||||
let mut num = 0;
|
||||
let mut l = LinkedList::new();
|
||||
let mut l = vec![];
|
||||
let entry = obj.historian
|
||||
.output
|
||||
.lock()
|
||||
.unwrap()
|
||||
.recv_timeout(Duration::new(1, 0))?;
|
||||
Self::update_entry(obj, writer, &entry);
|
||||
l.push_back(entry);
|
||||
l.push(entry);
|
||||
while let Ok(entry) = obj.historian.receive() {
|
||||
Self::update_entry(obj, writer, &entry);
|
||||
l.push_back(entry);
|
||||
num += 1;
|
||||
if num == max {
|
||||
break;
|
||||
}
|
||||
trace!("receive_to_list entries num: {}", num);
|
||||
l.push(entry);
|
||||
}
|
||||
Ok(l)
|
||||
}
|
||||
|
@ -154,24 +141,37 @@ impl AccountantSkel {
|
|||
writer: &Arc<Mutex<W>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<()> {
|
||||
// TODO: should it be the serialized Entry size?
|
||||
let max = BLOB_SIZE / size_of::<Entry>();
|
||||
let mut q = VecDeque::new();
|
||||
let mut count = 0;
|
||||
trace!("max: {}", max);
|
||||
while let Ok(list) = Self::receive_to_list(&obj, writer, max) {
|
||||
trace!("New blobs? {} {}", count, list.len());
|
||||
let b = blob_recycler.allocate();
|
||||
let pos = {
|
||||
let mut bd = b.write().unwrap();
|
||||
let mut out = Cursor::new(bd.data_mut());
|
||||
serialize_into(&mut out, &list).expect("failed to serialize output");
|
||||
out.position() as usize
|
||||
};
|
||||
assert!(pos < BLOB_SIZE);
|
||||
b.write().unwrap().set_size(pos);
|
||||
q.push_back(b);
|
||||
count += 1;
|
||||
while let Ok(list) = Self::receive_all(&obj, writer) {
|
||||
trace!("New blobs? {}", list.len());
|
||||
let mut start = 0;
|
||||
let mut end = 0;
|
||||
while start < list.len() {
|
||||
let mut total = 0;
|
||||
for i in &list[start..] {
|
||||
total += size_of::<Event>() * i.events.len();
|
||||
total += size_of::<Entry>();
|
||||
if total >= BLOB_SIZE {
|
||||
break;
|
||||
}
|
||||
end += 1;
|
||||
}
|
||||
// See that we made progress and a single
|
||||
// vec of Events wasn't too big for a single packet
|
||||
assert!(end > start);
|
||||
let b = blob_recycler.allocate();
|
||||
let pos = {
|
||||
let mut bd = b.write().unwrap();
|
||||
let mut out = Cursor::new(bd.data_mut());
|
||||
serialize_into(&mut out, &list[start..end])
|
||||
.expect("failed to serialize output");
|
||||
out.position() as usize
|
||||
};
|
||||
assert!(pos < BLOB_SIZE);
|
||||
b.write().unwrap().set_size(pos);
|
||||
q.push_back(b);
|
||||
start = end;
|
||||
}
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue