Merge pull request #171 from garious/cleanup-lastid
Cleanup last_id access in stub and skel
This commit is contained in:
commit
6f9285322d
|
@ -74,6 +74,13 @@ impl Accountant {
|
||||||
acc
|
acc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the last entry ID registered
|
||||||
|
pub fn last_id(&self) -> Hash {
|
||||||
|
let last_ids = self.last_ids.read().unwrap();
|
||||||
|
let last_item = last_ids.iter().last().expect("empty last_ids list");
|
||||||
|
last_item.0
|
||||||
|
}
|
||||||
|
|
||||||
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
||||||
if signatures.read().unwrap().contains(sig) {
|
if signatures.read().unwrap().contains(sig) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -327,6 +334,8 @@ mod tests {
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
|
assert_eq!(acc.last_id(), alice.last_id());
|
||||||
|
|
||||||
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||||
|
|
|
@ -33,7 +33,6 @@ use transaction::Transaction;
|
||||||
|
|
||||||
pub struct AccountantSkel {
|
pub struct AccountantSkel {
|
||||||
acc: Mutex<Accountant>,
|
acc: Mutex<Accountant>,
|
||||||
last_id: Mutex<Hash>,
|
|
||||||
historian_input: Mutex<SyncSender<Signal>>,
|
historian_input: Mutex<SyncSender<Signal>>,
|
||||||
historian: Historian,
|
historian: Historian,
|
||||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||||
|
@ -44,7 +43,6 @@ pub struct AccountantSkel {
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
Transaction(Transaction),
|
Transaction(Transaction),
|
||||||
GetBalance { key: PublicKey },
|
GetBalance { key: PublicKey },
|
||||||
GetLastId,
|
|
||||||
Subscribe { subscriptions: Vec<Subscription> },
|
Subscribe { subscriptions: Vec<Subscription> },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,20 +74,13 @@ type SharedSkel = Arc<AccountantSkel>;
|
||||||
pub enum Response {
|
pub enum Response {
|
||||||
Balance { key: PublicKey, val: Option<i64> },
|
Balance { key: PublicKey, val: Option<i64> },
|
||||||
EntryInfo(EntryInfo),
|
EntryInfo(EntryInfo),
|
||||||
LastId { id: Hash },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AccountantSkel {
|
impl AccountantSkel {
|
||||||
/// Create a new AccountantSkel that wraps the given Accountant.
|
/// Create a new AccountantSkel that wraps the given Accountant.
|
||||||
pub fn new(
|
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
||||||
acc: Accountant,
|
|
||||||
last_id: Hash,
|
|
||||||
historian_input: SyncSender<Signal>,
|
|
||||||
historian: Historian,
|
|
||||||
) -> Self {
|
|
||||||
AccountantSkel {
|
AccountantSkel {
|
||||||
acc: Mutex::new(acc),
|
acc: Mutex::new(acc),
|
||||||
last_id: Mutex::new(last_id),
|
|
||||||
entry_info_subscribers: Mutex::new(vec![]),
|
entry_info_subscribers: Mutex::new(vec![]),
|
||||||
historian_input: Mutex::new(historian_input),
|
historian_input: Mutex::new(historian_input),
|
||||||
historian,
|
historian,
|
||||||
|
@ -116,10 +107,7 @@ impl AccountantSkel {
|
||||||
|
|
||||||
fn update_entry<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>, entry: &Entry) {
|
fn update_entry<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>, entry: &Entry) {
|
||||||
trace!("update_entry entry");
|
trace!("update_entry entry");
|
||||||
let mut last_id_l = obj.last_id.lock().unwrap();
|
obj.acc.lock().unwrap().register_entry_id(&entry.id);
|
||||||
*last_id_l = entry.id;
|
|
||||||
obj.acc.lock().unwrap().register_entry_id(&last_id_l);
|
|
||||||
drop(last_id_l);
|
|
||||||
writeln!(
|
writeln!(
|
||||||
writer.lock().unwrap(),
|
writer.lock().unwrap(),
|
||||||
"{}",
|
"{}",
|
||||||
|
@ -226,12 +214,6 @@ impl AccountantSkel {
|
||||||
let val = self.acc.lock().unwrap().get_balance(&key);
|
let val = self.acc.lock().unwrap().get_balance(&key);
|
||||||
Some((Response::Balance { key, val }, rsp_addr))
|
Some((Response::Balance { key, val }, rsp_addr))
|
||||||
}
|
}
|
||||||
Request::GetLastId => Some((
|
|
||||||
Response::LastId {
|
|
||||||
id: *self.last_id.lock().unwrap(),
|
|
||||||
},
|
|
||||||
rsp_addr,
|
|
||||||
)),
|
|
||||||
Request::Transaction(_) => unreachable!(),
|
Request::Transaction(_) => unreachable!(),
|
||||||
Request::Subscribe { subscriptions } => {
|
Request::Subscribe { subscriptions } => {
|
||||||
for subscription in subscriptions {
|
for subscription in subscriptions {
|
||||||
|
@ -699,7 +681,7 @@ mod tests {
|
||||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
let skel = AccountantSkel::new(acc, mint.last_id(), input, historian);
|
let skel = AccountantSkel::new(acc, input, historian);
|
||||||
|
|
||||||
// Process a batch that includes a transaction that receives two tokens.
|
// Process a batch that includes a transaction that receives two tokens.
|
||||||
let alice = KeyPair::new();
|
let alice = KeyPair::new();
|
||||||
|
@ -740,7 +722,7 @@ mod tests {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc_skel = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
|
let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian));
|
||||||
let serve_addr = leader_serve.local_addr().unwrap();
|
let serve_addr = leader_serve.local_addr().unwrap();
|
||||||
let threads = AccountantSkel::serve(
|
let threads = AccountantSkel::serve(
|
||||||
&acc_skel,
|
&acc_skel,
|
||||||
|
@ -858,7 +840,7 @@ mod tests {
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
|
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
|
||||||
let replicate_addr = target1_data.replicate_addr;
|
let replicate_addr = target1_data.replicate_addr;
|
||||||
let threads = AccountantSkel::replicate(
|
let threads = AccountantSkel::replicate(
|
||||||
&acc,
|
&acc,
|
||||||
|
@ -1007,7 +989,7 @@ mod bench {
|
||||||
|
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
let skel = AccountantSkel::new(acc, mint.last_id(), input, historian);
|
let skel = AccountantSkel::new(acc, input, historian);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_packets(req_vers).is_ok());
|
||||||
|
|
|
@ -57,9 +57,6 @@ impl AccountantStub {
|
||||||
Response::Balance { key, val } => {
|
Response::Balance { key, val } => {
|
||||||
self.balances.insert(key, val);
|
self.balances.insert(key, val);
|
||||||
}
|
}
|
||||||
Response::LastId { id } => {
|
|
||||||
self.last_id = Some(id);
|
|
||||||
}
|
|
||||||
Response::EntryInfo(entry_info) => {
|
Response::EntryInfo(entry_info) => {
|
||||||
self.last_id = Some(entry_info.id);
|
self.last_id = Some(entry_info.id);
|
||||||
self.num_events += entry_info.num_events;
|
self.num_events += entry_info.num_events;
|
||||||
|
@ -109,23 +106,9 @@ impl AccountantStub {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request the last Entry ID from the server. This method blocks
|
/// Request the last Entry ID from the server. This method blocks
|
||||||
/// until the server sends a response. At the time of this writing,
|
/// until the server sends a response.
|
||||||
/// it also has the side-effect of causing the server to log any
|
|
||||||
/// entries that have been published by the Historian.
|
|
||||||
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
|
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
|
||||||
let req = Request::GetLastId;
|
self.transaction_count();
|
||||||
let data = serialize(&req).expect("serialize GetId");
|
|
||||||
self.socket
|
|
||||||
.send_to(&data, &self.addr)
|
|
||||||
.expect("buffer error");
|
|
||||||
let mut done = false;
|
|
||||||
while !done {
|
|
||||||
let resp = self.recv_response().expect("recv response");
|
|
||||||
if let &Response::LastId { .. } = &resp {
|
|
||||||
done = true;
|
|
||||||
}
|
|
||||||
self.process_response(resp);
|
|
||||||
}
|
|
||||||
ok(self.last_id.unwrap_or(Hash::default()))
|
ok(self.last_id.unwrap_or(Hash::default()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,7 +175,7 @@ mod tests {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
|
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
|
||||||
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
|
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,7 @@ fn main() {
|
||||||
let (input, event_receiver) = sync_channel(10_000);
|
let (input, event_receiver) = sync_channel(10_000);
|
||||||
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian));
|
let skel = Arc::new(AccountantSkel::new(acc, input, historian));
|
||||||
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
||||||
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
||||||
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
||||||
|
|
|
@ -438,8 +438,8 @@ mod test {
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
|
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||||
PacketReceiver};
|
use streamer::{BlobReceiver, PacketReceiver};
|
||||||
|
|
||||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||||
for _t in 0..5 {
|
for _t in 0..5 {
|
||||||
|
|
Loading…
Reference in New Issue