Merge pull request #211 from garious/add-tx-count
Drop EntryInfo subscriptions
This commit is contained in:
commit
500aaed48e
|
@ -17,7 +17,7 @@ use std::collections::hash_map::Entry::Occupied;
|
||||||
use std::collections::{HashMap, HashSet, VecDeque};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::result;
|
use std::result;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
||||||
|
@ -59,6 +59,7 @@ pub struct Accountant {
|
||||||
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
|
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
|
||||||
time_sources: RwLock<HashSet<PublicKey>>,
|
time_sources: RwLock<HashSet<PublicKey>>,
|
||||||
last_time: RwLock<DateTime<Utc>>,
|
last_time: RwLock<DateTime<Utc>>,
|
||||||
|
transaction_count: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Accountant {
|
impl Accountant {
|
||||||
|
@ -72,6 +73,7 @@ impl Accountant {
|
||||||
last_ids: RwLock::new(VecDeque::new()),
|
last_ids: RwLock::new(VecDeque::new()),
|
||||||
time_sources: RwLock::new(HashSet::new()),
|
time_sources: RwLock::new(HashSet::new()),
|
||||||
last_time: RwLock::new(Utc.timestamp(0, 0)),
|
last_time: RwLock::new(Utc.timestamp(0, 0)),
|
||||||
|
transaction_count: AtomicUsize::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,7 +190,10 @@ impl Accountant {
|
||||||
);
|
);
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => return Ok(()),
|
Ok(_) => {
|
||||||
|
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -387,6 +392,10 @@ impl Accountant {
|
||||||
.expect("'balances' read lock in get_balance");
|
.expect("'balances' read lock in get_balance");
|
||||||
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
|
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn transaction_count(&self) -> usize {
|
||||||
|
self.transaction_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -412,6 +421,7 @@ mod tests {
|
||||||
.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
|
.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500);
|
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500);
|
||||||
|
assert_eq!(accountant.transaction_count(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -422,6 +432,7 @@ mod tests {
|
||||||
accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
|
accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
|
||||||
Err(AccountingError::AccountNotFound)
|
Err(AccountingError::AccountNotFound)
|
||||||
);
|
);
|
||||||
|
assert_eq!(accountant.transaction_count(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -432,10 +443,12 @@ mod tests {
|
||||||
accountant
|
accountant
|
||||||
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()),
|
accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()),
|
||||||
Err(AccountingError::InsufficientFunds)
|
Err(AccountingError::InsufficientFunds)
|
||||||
);
|
);
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
|
|
||||||
let alice_pubkey = alice.keypair().pubkey();
|
let alice_pubkey = alice.keypair().pubkey();
|
||||||
assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000);
|
assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000);
|
||||||
|
@ -468,6 +481,9 @@ mod tests {
|
||||||
// Alice's balance will be zero because all funds are locked up.
|
// Alice's balance will be zero because all funds are locked up.
|
||||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
||||||
|
|
||||||
|
// tx count is 1, because debits were applied.
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
|
|
||||||
// Bob's balance will be None because the funds have not been
|
// Bob's balance will be None because the funds have not been
|
||||||
// sent.
|
// sent.
|
||||||
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
||||||
|
@ -479,6 +495,10 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
|
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
|
||||||
|
|
||||||
|
// tx count is still 1, because we chose not to count timestamp events
|
||||||
|
// tx count.
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
|
|
||||||
accountant
|
accountant
|
||||||
.process_verified_timestamp(alice.pubkey(), dt)
|
.process_verified_timestamp(alice.pubkey(), dt)
|
||||||
.unwrap(); // <-- Attack! Attempt to process completed transaction.
|
.unwrap(); // <-- Attack! Attempt to process completed transaction.
|
||||||
|
@ -516,6 +536,9 @@ mod tests {
|
||||||
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Assert the debit counts as a transaction.
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
|
|
||||||
// Alice's balance will be zero because all funds are locked up.
|
// Alice's balance will be zero because all funds are locked up.
|
||||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
||||||
|
|
||||||
|
@ -530,6 +553,9 @@ mod tests {
|
||||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
|
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
|
||||||
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
||||||
|
|
||||||
|
// Assert cancel doesn't cause count to go backward.
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
|
|
||||||
accountant
|
accountant
|
||||||
.process_verified_sig(alice.pubkey(), sig)
|
.process_verified_sig(alice.pubkey(), sig)
|
||||||
.unwrap(); // <-- Attack! Attempt to cancel completed transaction.
|
.unwrap(); // <-- Attack! Attempt to cancel completed transaction.
|
||||||
|
@ -576,7 +602,11 @@ mod tests {
|
||||||
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||||
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||||
let trs = vec![tr0, tr1];
|
let trs = vec![tr0, tr1];
|
||||||
assert!(accountant.process_verified_transactions(trs)[1].is_err());
|
let results = accountant.process_verified_transactions(trs);
|
||||||
|
assert!(results[1].is_err());
|
||||||
|
|
||||||
|
// Assert bad transactions aren't counted.
|
||||||
|
assert_eq!(accountant.transaction_count(), 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ use accountant::Accountant;
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use ledger;
|
use ledger;
|
||||||
use packet;
|
use packet;
|
||||||
use request_processor::RequestProcessor;
|
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
@ -17,16 +16,12 @@ use streamer;
|
||||||
|
|
||||||
pub struct EntryWriter<'a> {
|
pub struct EntryWriter<'a> {
|
||||||
accountant: &'a Accountant,
|
accountant: &'a Accountant,
|
||||||
request_processor: &'a RequestProcessor,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> EntryWriter<'a> {
|
impl<'a> EntryWriter<'a> {
|
||||||
/// Create a new Tpu that wraps the given Accountant.
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self {
|
pub fn new(accountant: &'a Accountant) -> Self {
|
||||||
EntryWriter {
|
EntryWriter { accountant }
|
||||||
accountant,
|
|
||||||
request_processor,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
||||||
|
@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> {
|
||||||
"{}",
|
"{}",
|
||||||
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
||||||
).expect("writeln! in fn write_entry");
|
).expect("writeln! in fn write_entry");
|
||||||
self.request_processor.notify_entry_info_subscribers(&entry);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_entries<W: Write>(
|
fn write_entries<W: Write>(
|
||||||
|
|
|
@ -12,19 +12,8 @@ use transaction::Transaction;
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
Transaction(Transaction),
|
Transaction(Transaction),
|
||||||
GetBalance { key: PublicKey },
|
GetBalance { key: PublicKey },
|
||||||
Subscribe { subscriptions: Vec<Subscription> },
|
GetLastId,
|
||||||
}
|
GetTransactionCount,
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub enum Subscription {
|
|
||||||
EntryInfo,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub struct EntryInfo {
|
|
||||||
pub id: Hash,
|
|
||||||
pub num_hashes: u64,
|
|
||||||
pub num_events: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Request {
|
impl Request {
|
||||||
|
@ -40,7 +29,8 @@ impl Request {
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub enum Response {
|
pub enum Response {
|
||||||
Balance { key: PublicKey, val: Option<i64> },
|
Balance { key: PublicKey, val: Option<i64> },
|
||||||
EntryInfo(EntryInfo),
|
LastId { id: Hash },
|
||||||
|
TransactionCount { transaction_count: u64 },
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
||||||
|
|
|
@ -8,12 +8,12 @@ use event_processor::EventProcessor;
|
||||||
use packet;
|
use packet;
|
||||||
use packet::SharedPackets;
|
use packet::SharedPackets;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use request::{EntryInfo, Request, Response, Subscription};
|
use request::{Request, Response};
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::sync::mpsc::{Receiver, Sender};
|
use std::sync::mpsc::{Receiver, Sender};
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use streamer;
|
use streamer;
|
||||||
|
@ -21,16 +21,12 @@ use timing;
|
||||||
|
|
||||||
pub struct RequestProcessor {
|
pub struct RequestProcessor {
|
||||||
accountant: Arc<Accountant>,
|
accountant: Arc<Accountant>,
|
||||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestProcessor {
|
impl RequestProcessor {
|
||||||
/// Create a new Tpu that wraps the given Accountant.
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
pub fn new(accountant: Arc<Accountant>) -> Self {
|
pub fn new(accountant: Arc<Accountant>) -> Self {
|
||||||
RequestProcessor {
|
RequestProcessor { accountant }
|
||||||
accountant,
|
|
||||||
entry_info_subscribers: Mutex::new(vec![]),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process Request items sent by clients.
|
/// Process Request items sent by clients.
|
||||||
|
@ -46,17 +42,19 @@ impl RequestProcessor {
|
||||||
info!("Response::Balance {:?}", rsp);
|
info!("Response::Balance {:?}", rsp);
|
||||||
Some(rsp)
|
Some(rsp)
|
||||||
}
|
}
|
||||||
|
Request::GetLastId => {
|
||||||
|
let id = self.accountant.last_id();
|
||||||
|
let rsp = (Response::LastId { id }, rsp_addr);
|
||||||
|
info!("Response::LastId {:?}", rsp);
|
||||||
|
Some(rsp)
|
||||||
|
}
|
||||||
|
Request::GetTransactionCount => {
|
||||||
|
let transaction_count = self.accountant.transaction_count() as u64;
|
||||||
|
let rsp = (Response::TransactionCount { transaction_count }, rsp_addr);
|
||||||
|
info!("Response::TransactionCount {:?}", rsp);
|
||||||
|
Some(rsp)
|
||||||
|
}
|
||||||
Request::Transaction(_) => unreachable!(),
|
Request::Transaction(_) => unreachable!(),
|
||||||
Request::Subscribe { subscriptions } => {
|
|
||||||
for subscription in subscriptions {
|
|
||||||
match subscription {
|
|
||||||
Subscription::EntryInfo => {
|
|
||||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,29 +67,6 @@ impl RequestProcessor {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
|
|
||||||
// TODO: No need to bind().
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
|
||||||
|
|
||||||
// copy subscribers to avoid taking lock while doing io
|
|
||||||
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
|
|
||||||
trace!("Sending to {} addrs", addrs.len());
|
|
||||||
for addr in addrs {
|
|
||||||
let entry_info = EntryInfo {
|
|
||||||
id: entry.id,
|
|
||||||
num_hashes: entry.num_hashes,
|
|
||||||
num_events: entry.events.len() as u64,
|
|
||||||
};
|
|
||||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
|
||||||
trace!("sending {} to {}", data.len(), addr);
|
|
||||||
//TODO dont do IO here, this needs to be on a separate channel
|
|
||||||
let res = socket.send_to(&data, addr);
|
|
||||||
if res.is_err() {
|
|
||||||
eprintln!("couldn't send response: {:?}", res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
||||||
p.packets
|
p.packets
|
||||||
.par_iter()
|
.par_iter()
|
||||||
|
|
|
@ -33,7 +33,6 @@ impl Rpu {
|
||||||
|
|
||||||
fn write_service<W: Write + Send + 'static>(
|
fn write_service<W: Write + Send + 'static>(
|
||||||
accountant: Arc<Accountant>,
|
accountant: Arc<Accountant>,
|
||||||
request_processor: Arc<RequestProcessor>,
|
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
broadcast: streamer::BlobSender,
|
broadcast: streamer::BlobSender,
|
||||||
blob_recycler: packet::BlobRecycler,
|
blob_recycler: packet::BlobRecycler,
|
||||||
|
@ -41,7 +40,7 @@ impl Rpu {
|
||||||
entry_receiver: Receiver<Entry>,
|
entry_receiver: Receiver<Entry>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let entry_writer = EntryWriter::new(&accountant, &request_processor);
|
let entry_writer = EntryWriter::new(&accountant);
|
||||||
let _ = entry_writer.write_and_send_entries(
|
let _ = entry_writer.write_and_send_entries(
|
||||||
&broadcast,
|
&broadcast,
|
||||||
&blob_recycler,
|
&blob_recycler,
|
||||||
|
@ -99,7 +98,6 @@ impl Rpu {
|
||||||
let (broadcast_sender, broadcast_receiver) = channel();
|
let (broadcast_sender, broadcast_receiver) = channel();
|
||||||
let t_write = Self::write_service(
|
let t_write = Self::write_service(
|
||||||
self.event_processor.accountant.clone(),
|
self.event_processor.accountant.clone(),
|
||||||
request_stage.request_processor.clone(),
|
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
broadcast_sender,
|
broadcast_sender,
|
||||||
blob_recycler.clone(),
|
blob_recycler.clone(),
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
use bincode::{deserialize, serialize};
|
use bincode::{deserialize, serialize};
|
||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use request::{Request, Response, Subscription};
|
use request::{Request, Response};
|
||||||
use signature::{KeyPair, PublicKey, Signature};
|
use signature::{KeyPair, PublicKey, Signature};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -18,7 +18,7 @@ pub struct ThinClient {
|
||||||
pub requests_socket: UdpSocket,
|
pub requests_socket: UdpSocket,
|
||||||
pub events_socket: UdpSocket,
|
pub events_socket: UdpSocket,
|
||||||
last_id: Option<Hash>,
|
last_id: Option<Hash>,
|
||||||
num_events: u64,
|
transaction_count: u64,
|
||||||
balances: HashMap<PublicKey, Option<i64>>,
|
balances: HashMap<PublicKey, Option<i64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,21 +32,12 @@ impl ThinClient {
|
||||||
requests_socket,
|
requests_socket,
|
||||||
events_socket,
|
events_socket,
|
||||||
last_id: None,
|
last_id: None,
|
||||||
num_events: 0,
|
transaction_count: 0,
|
||||||
balances: HashMap::new(),
|
balances: HashMap::new(),
|
||||||
};
|
};
|
||||||
client.init();
|
|
||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init(&self) {
|
|
||||||
let subscriptions = vec![Subscription::EntryInfo];
|
|
||||||
let req = Request::Subscribe { subscriptions };
|
|
||||||
let data = serialize(&req).expect("serialize Subscribe in thin_client");
|
|
||||||
trace!("subscribing to {}", self.addr);
|
|
||||||
let _res = self.requests_socket.send_to(&data, &self.addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn recv_response(&self) -> io::Result<Response> {
|
pub fn recv_response(&self) -> io::Result<Response> {
|
||||||
let mut buf = vec![0u8; 1024];
|
let mut buf = vec![0u8; 1024];
|
||||||
info!("start recv_from");
|
info!("start recv_from");
|
||||||
|
@ -62,10 +53,13 @@ impl ThinClient {
|
||||||
info!("Response balance {:?} {:?}", key, val);
|
info!("Response balance {:?} {:?}", key, val);
|
||||||
self.balances.insert(key, val);
|
self.balances.insert(key, val);
|
||||||
}
|
}
|
||||||
Response::EntryInfo(entry_info) => {
|
Response::LastId { id } => {
|
||||||
trace!("Response entry_info {:?}", entry_info.id);
|
info!("Response last_id {:?}", id);
|
||||||
self.last_id = Some(entry_info.id);
|
self.last_id = Some(id);
|
||||||
self.num_events += entry_info.num_events;
|
}
|
||||||
|
Response::TransactionCount { transaction_count } => {
|
||||||
|
info!("Response transaction count {:?}", transaction_count);
|
||||||
|
self.transaction_count = transaction_count;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,41 +107,47 @@ impl ThinClient {
|
||||||
self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey"))
|
self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request the last Entry ID from the server. This method blocks
|
/// Request the transaction count. If the response packet is dropped by the network,
|
||||||
/// until the server sends a response.
|
/// this method will hang.
|
||||||
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
|
|
||||||
self.transaction_count();
|
|
||||||
ok(self.last_id.unwrap_or(Hash::default()))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return the number of transactions the server processed since creating
|
|
||||||
/// this client instance.
|
|
||||||
pub fn transaction_count(&mut self) -> u64 {
|
pub fn transaction_count(&mut self) -> u64 {
|
||||||
// Wait for at least one EntryInfo.
|
info!("transaction_count");
|
||||||
|
let req = Request::GetTransactionCount;
|
||||||
|
let data =
|
||||||
|
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
|
||||||
|
self.requests_socket
|
||||||
|
.send_to(&data, &self.addr)
|
||||||
|
.expect("buffer error in pub fn transaction_count");
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
let resp = self.recv_response()
|
let resp = self.recv_response().expect("transaction count dropped");
|
||||||
.expect("recv_response in pub fn transaction_count");
|
info!("recv_response {:?}", resp);
|
||||||
if let &Response::EntryInfo(_) = &resp {
|
if let &Response::TransactionCount { .. } = &resp {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
self.process_response(resp);
|
self.process_response(resp);
|
||||||
}
|
}
|
||||||
|
self.transaction_count
|
||||||
|
}
|
||||||
|
|
||||||
// Then take the rest.
|
/// Request the last Entry ID from the server. This method blocks
|
||||||
|
/// until the server sends a response.
|
||||||
|
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
|
||||||
|
info!("get_last_id");
|
||||||
|
let req = Request::GetLastId;
|
||||||
|
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
|
||||||
self.requests_socket
|
self.requests_socket
|
||||||
.set_nonblocking(true)
|
.send_to(&data, &self.addr)
|
||||||
.expect("set_nonblocking in pub fn transaction_count");
|
.expect("buffer error in pub fn get_last_id");
|
||||||
loop {
|
let mut done = false;
|
||||||
match self.recv_response() {
|
while !done {
|
||||||
Err(_) => break,
|
let resp = self.recv_response().expect("get_last_id response");
|
||||||
Ok(resp) => self.process_response(resp),
|
info!("recv_response {:?}", resp);
|
||||||
|
if let &Response::LastId { .. } = &resp {
|
||||||
|
done = true;
|
||||||
}
|
}
|
||||||
|
self.process_response(resp);
|
||||||
}
|
}
|
||||||
self.requests_socket
|
ok(self.last_id.expect("some last_id"))
|
||||||
.set_nonblocking(false)
|
|
||||||
.expect("set_nonblocking in pub fn transaction_count");
|
|
||||||
self.num_events
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,12 +34,11 @@ impl Tvu {
|
||||||
|
|
||||||
fn drain_service(
|
fn drain_service(
|
||||||
accountant: Arc<Accountant>,
|
accountant: Arc<Accountant>,
|
||||||
request_processor: Arc<RequestProcessor>,
|
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
entry_receiver: Receiver<Entry>,
|
entry_receiver: Receiver<Entry>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
let entry_writer = EntryWriter::new(&accountant, &request_processor);
|
let entry_writer = EntryWriter::new(&accountant);
|
||||||
loop {
|
loop {
|
||||||
let _ = entry_writer.drain_entries(&entry_receiver);
|
let _ = entry_writer.drain_entries(&entry_receiver);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
@ -183,7 +182,6 @@ impl Tvu {
|
||||||
|
|
||||||
let t_write = Self::drain_service(
|
let t_write = Self::drain_service(
|
||||||
obj.event_processor.accountant.clone(),
|
obj.event_processor.accountant.clone(),
|
||||||
request_stage.request_processor.clone(),
|
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
request_stage.entry_receiver,
|
request_stage.entry_receiver,
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue