Add thin_client_service

Move the client-facing request/response handling (GetBalance, Subscribe, and the EntryInfo notifications) out of AccountingStage into a new ThinClientService that shares the TPU's Accountant.

Greg Fitzgerald 2018-05-09 14:56:34 -06:00
parent bc824c1a6c
commit 43cd631579
6 changed files with 144 additions and 112 deletions

@ -1,18 +1,14 @@
//! The `accounting_stage` module implements the accounting stage of the TPU.
use accountant::Accountant;
use bincode::serialize;
use entry::Entry;
use event::Event;
use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use signature::PublicKey;
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use transaction::Transaction;
pub struct AccountingStage {
pub output: Mutex<Receiver<Entry>>,
@ -20,7 +16,6 @@ pub struct AccountingStage {
pub acc: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}
impl AccountingStage {
@ -33,7 +28,6 @@ impl AccountingStage {
output: Mutex::new(output),
entry_sender: Mutex::new(entry_sender),
acc: Arc::new(acc),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
}
@ -51,105 +45,8 @@ impl AccountingStage {
let entry = historian.output.lock().unwrap().recv()?;
self.acc.register_entry_id(&entry.id);
self.entry_sender.lock().unwrap().send(entry)?;
debug!("after historian_input");
Ok(())
}
/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.acc.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}
pub fn process_requests(
&self,
reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
// copy subscribers to avoid taking lock while doing io
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}
impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify_plan(),
_ => true,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
}
#[cfg(test)]

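With the request and subscription plumbing moved out, AccountingStage keeps only the Historian-backed entry pipeline. A minimal sketch of consuming its public output channel, assuming an `accounting: AccountingStage` already wired up (the binding and error handling here are illustrative, not part of this diff):

    let entry = accounting.output.lock().unwrap().recv()?;
    // By the time an entry arrives here it has already been registered with
    // the Accountant (see register_entry_id above), so entry.id is usable
    // as a last_id for subsequent transactions.
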
@ -130,11 +130,11 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
#[cfg(test)]
mod tests {
use accounting_stage::Request;
use bincode::serialize;
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock;
use thin_client_service::Request;
use transaction::test_tx;
use transaction::Transaction;

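These tests exercise signature verification over packets built from serialized requests; the only change here is that Request now lives in thin_client_service. A hedged sketch of the pattern, using test_tx from the existing transaction module:

    let tr = test_tx();
    let req = Request::Transaction(tr);
    let data = serialize(&req).expect("serialize Transaction request");
    // The bytes are then copied into a Packet and batched for ed25519_verify.
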
@ -19,6 +19,7 @@ pub mod result;
pub mod signature;
pub mod streamer;
pub mod thin_client;
pub mod thin_client_service;
pub mod timing;
pub mod tpu;
pub mod transaction;

@ -3,7 +3,6 @@
//! messages to the network directly. The binary encoding of its messages are
//! unstable and may change in future releases.
use accounting_stage::{Request, Response, Subscription};
use bincode::{deserialize, serialize};
use futures::future::{ok, FutureResult};
use hash::Hash;
@ -11,6 +10,7 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use thin_client_service::{Request, Response, Subscription};
use transaction::Transaction;
pub struct ThinClient {

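The wire types ThinClient speaks are now defined in thin_client_service rather than accounting_stage. For illustration, a hedged sketch of encoding a Subscribe request with bincode the way the service expects (the socket and destination address are placeholders, not from this diff):

    let req = Request::Subscribe {
        subscriptions: vec![Subscription::EntryInfo],
    };
    let data = serialize(&req).expect("serialize Subscribe");
    let _ = socket.send_to(&data, requests_addr); // requests_addr: placeholder SocketAddr
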
src/thin_client_service.rs (new file)

@ -0,0 +1,127 @@
//! The `thin_client_service` sits alongside the TPU and queries it for information
//! on behalf of thin clients.
use accountant::Accountant;
use bincode::serialize;
use entry::Entry;
use hash::Hash;
use signature::PublicKey;
use std::net::{SocketAddr, UdpSocket};
//use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use transaction::Transaction;
pub struct ThinClientService {
//pub output: Mutex<Receiver<Response>>,
//response_sender: Mutex<Sender<Response>>,
pub acc: Arc<Accountant>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}
impl ThinClientService {
/// Create a new ThinClientService that wraps the given Accountant.
pub fn new(acc: Arc<Accountant>) -> Self {
//let (response_sender, output) = channel();
ThinClientService {
//output: Mutex::new(output),
//response_sender: Mutex::new(response_sender),
acc,
entry_info_subscribers: Mutex::new(vec![]),
}
}
/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.acc.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}
pub fn process_requests(
&self,
reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
// Copy subscribers to avoid holding the lock while doing IO.
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
// TODO: Don't do IO here; this needs to be on a separate channel.
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}
impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify_plan(),
_ => true,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
}

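For context, a minimal sketch of driving the new service directly, assuming a Mint-funded Accountant as in the crate's existing tests (the Mint wiring is an assumption, not part of this diff):

    let mint = Mint::new(1_000);
    let acc = Arc::new(Accountant::new(&mint));
    let service = ThinClientService::new(acc);
    let rsp_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let reqs = vec![(Request::GetBalance { key: mint.pubkey() }, rsp_addr)];
    // process_requests pairs each response with the address to send it to.
    let rsps = service.process_requests(reqs);
    assert_eq!(rsps.len(), 1);
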
@ -2,7 +2,7 @@
//! 5-stage transaction processing pipeline in software.
use accountant::Accountant;
use accounting_stage::{AccountingStage, Request, Response};
use accounting_stage::AccountingStage;
use bincode::{deserialize, serialize, serialize_into};
use crdt::{Crdt, ReplicatedData};
use ecdsa;
@ -26,10 +26,12 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use streamer;
use thin_client_service::{Request, Response, ThinClientService};
use timing;
pub struct Tpu {
accounting: AccountingStage,
thin_client_service: ThinClientService,
}
type SharedTpu = Arc<Tpu>;
@ -37,7 +39,11 @@ type SharedTpu = Arc<Tpu>;
impl Tpu {
/// Create a new Tpu that wraps the given AccountingStage.
pub fn new(accounting: AccountingStage) -> Self {
Tpu { accounting }
let thin_client_service = ThinClientService::new(accounting.acc.clone());
Tpu {
accounting,
thin_client_service,
}
}
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
@ -48,7 +54,8 @@ impl Tpu {
"{}",
serde_json::to_string(&entry).unwrap()
).unwrap();
obj.accounting.notify_entry_info_subscribers(&entry);
obj.thin_client_service
.notify_entry_info_subscribers(&entry);
}
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
@ -335,7 +342,7 @@ impl Tpu {
debug!("done process_events");
debug!("process_requests");
let rsps = obj.accounting.process_requests(reqs);
let rsps = obj.thin_client_service.process_requests(reqs);
debug!("done process_requests");
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
@ -680,18 +687,18 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
#[cfg(test)]
mod tests {
use bincode::serialize;
use ecdsa;
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::serialize;
use chrono::prelude::*;
use crdt::Crdt;
use ecdsa;
use entry;
use event::Event;
use hash::{hash, Hash};
use logger;
use mint::Mint;
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};