fix writer

This commit is contained in:
Anatoly Yakovenko 2018-03-26 11:17:19 -07:00 committed by Greg Fitzgerald
parent 112aecf6eb
commit 14239e584f
3 changed files with 13 additions and 9 deletions

View File

@ -13,12 +13,14 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::default::Default; use std::default::Default;
use std::io::Write;
use serde_json; use serde_json;
pub struct AccountantSkel { pub struct AccountantSkel<W: Write + Send + 'static> {
pub acc: Accountant, pub acc: Accountant,
pub last_id: Hash, pub last_id: Hash,
pub ledger: Vec<Entry>, pub ledger: Vec<Entry>,
writer: W,
} }
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
@ -37,20 +39,21 @@ pub enum Response {
Id { id: Hash, is_last: bool }, Id { id: Hash, is_last: bool },
} }
impl AccountantSkel { impl<W: Write + Send + 'static> AccountantSkel<W> {
pub fn new(acc: Accountant) -> Self { pub fn new(acc: Accountant, w: W) -> Self {
let last_id = acc.first_id; let last_id = acc.first_id;
AccountantSkel { AccountantSkel {
acc, acc,
last_id, last_id,
ledger: vec![], ledger: vec![],
writer: w,
} }
} }
pub fn sync(self: &mut Self) -> Hash { pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() { while let Ok(entry) = self.acc.historian.receiver.try_recv() {
self.last_id = entry.id; self.last_id = entry.id;
println!("{}", serde_json::to_string(&entry).unwrap()); write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.ledger.push(entry); self.ledger.push(entry);
} }
self.last_id self.last_id
@ -131,7 +134,7 @@ impl AccountantSkel {
/// UDP Server that forwards messages to Accountant methods. /// UDP Server that forwards messages to Accountant methods.
pub fn serve( pub fn serve(
obj: Arc<Mutex<AccountantSkel>>, obj: Arc<Mutex<AccountantSkel<W>>>,
addr: &str, addr: &str,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> { ) -> Result<Vec<JoinHandle<()>>> {

View File

@ -128,6 +128,7 @@ mod tests {
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::io::sink;
#[test] #[test]
fn test_accountant_stub() { fn test_accountant_stub() {
@ -137,7 +138,7 @@ mod tests {
let acc = Accountant::new(&alice, Some(30)); let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink())));
let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
sleep(Duration::from_millis(30)); sleep(Duration::from_millis(30));

View File

@ -3,7 +3,7 @@ extern crate silk;
use silk::accountant_skel::AccountantSkel; use silk::accountant_skel::AccountantSkel;
use silk::accountant::Accountant; use silk::accountant::Accountant;
use std::io::{self, BufRead}; use std::io::{self, stdout, BufRead};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -16,7 +16,7 @@ fn main() {
.map(|line| serde_json::from_str(&line.unwrap()).unwrap()); .map(|line| serde_json::from_str(&line.unwrap()).unwrap());
let acc = Accountant::new_from_entries(entries, Some(1000)); let acc = Accountant::new_from_entries(entries, Some(1000));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(acc))); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, stdout())));
eprintln!("Listening on {}", addr); eprintln!("Listening on {}", addr);
let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();
for t in threads { for t in threads {