Hoist Historian input
This commit is contained in:
parent
c5cc91443e
commit
4b9f115586
|
@ -21,7 +21,7 @@ use std::collections::VecDeque;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -34,6 +34,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||||
acc: Accountant,
|
acc: Accountant,
|
||||||
last_id: Hash,
|
last_id: Hash,
|
||||||
writer: W,
|
writer: W,
|
||||||
|
historian_input: SyncSender<Signal>,
|
||||||
historian: Historian,
|
historian: Historian,
|
||||||
entry_info_subscribers: Vec<SocketAddr>,
|
entry_info_subscribers: Vec<SocketAddr>,
|
||||||
}
|
}
|
||||||
|
@ -78,11 +79,18 @@ pub enum Response {
|
||||||
|
|
||||||
impl<W: Write + Send + 'static> AccountantSkel<W> {
|
impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
/// Create a new AccountantSkel that wraps the given Accountant.
|
/// Create a new AccountantSkel that wraps the given Accountant.
|
||||||
pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self {
|
pub fn new(
|
||||||
|
acc: Accountant,
|
||||||
|
last_id: Hash,
|
||||||
|
writer: W,
|
||||||
|
historian_input: SyncSender<Signal>,
|
||||||
|
historian: Historian,
|
||||||
|
) -> Self {
|
||||||
AccountantSkel {
|
AccountantSkel {
|
||||||
acc,
|
acc,
|
||||||
last_id,
|
last_id,
|
||||||
writer,
|
writer,
|
||||||
|
historian_input,
|
||||||
historian,
|
historian,
|
||||||
entry_info_subscribers: vec![],
|
entry_info_subscribers: vec![],
|
||||||
}
|
}
|
||||||
|
@ -214,15 +222,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
// Process the transactions in parallel and then log the successful ones.
|
// Process the transactions in parallel and then log the successful ones.
|
||||||
for result in self.acc.process_verified_transactions(trs) {
|
for result in self.acc.process_verified_transactions(trs) {
|
||||||
if let Ok(tr) = result {
|
if let Ok(tr) = result {
|
||||||
self.historian
|
self.historian_input
|
||||||
.input
|
|
||||||
.send(Signal::Event(Event::Transaction(tr)))?;
|
.send(Signal::Event(Event::Transaction(tr)))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let validators know they should not attempt to process additional
|
// Let validators know they should not attempt to process additional
|
||||||
// transactions in parallel.
|
// transactions in parallel.
|
||||||
self.historian.input.send(Signal::Tick)?;
|
self.historian_input.send(Signal::Tick)?;
|
||||||
|
|
||||||
// Process the remaining requests serially.
|
// Process the remaining requests serially.
|
||||||
let rsps = reqs.into_iter()
|
let rsps = reqs.into_iter()
|
||||||
|
@ -482,6 +489,7 @@ mod tests {
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -530,8 +538,9 @@ mod tests {
|
||||||
let mint = Mint::new(2);
|
let mint = Mint::new(2);
|
||||||
let acc = Accountant::new(&mint);
|
let acc = Accountant::new(&mint);
|
||||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
||||||
let historian = Historian::new(&mint.last_id(), None);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
|
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);
|
||||||
|
|
||||||
// Process a batch that includes a transaction that receives two tokens.
|
// Process a batch that includes a transaction that receives two tokens.
|
||||||
let alice = KeyPair::new();
|
let alice = KeyPair::new();
|
||||||
|
@ -545,8 +554,8 @@ mod tests {
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_packets(req_vers).is_ok());
|
||||||
|
|
||||||
// Collect the ledger and feed it to a new accountant.
|
// Collect the ledger and feed it to a new accountant.
|
||||||
skel.historian.input.send(Signal::Tick).unwrap();
|
skel.historian_input.send(Signal::Tick).unwrap();
|
||||||
drop(skel.historian.input);
|
drop(skel.historian_input);
|
||||||
let entries: Vec<Entry> = skel.historian.output.iter().collect();
|
let entries: Vec<Entry> = skel.historian.output.iter().collect();
|
||||||
|
|
||||||
// Assert the user holds one token, not two. If the server only output one
|
// Assert the user holds one token, not two. If the server only output one
|
||||||
|
@ -569,11 +578,13 @@ mod tests {
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let historian = Historian::new(&alice.last_id(), Some(30));
|
let (input, event_receiver) = sync_channel(10);
|
||||||
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
acc,
|
acc,
|
||||||
alice.last_id(),
|
alice.last_id(),
|
||||||
sink(),
|
sink(),
|
||||||
|
input,
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
let _threads = AccountantSkel::serve(&acc, &addr, exit.clone()).unwrap();
|
let _threads = AccountantSkel::serve(&acc, &addr, exit.clone()).unwrap();
|
||||||
|
@ -651,11 +662,13 @@ mod tests {
|
||||||
let starting_balance = 10_000;
|
let starting_balance = 10_000;
|
||||||
let alice = Mint::new(starting_balance);
|
let alice = Mint::new(starting_balance);
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let historian = Historian::new(&alice.last_id(), Some(30));
|
let (input, event_receiver) = sync_channel(10);
|
||||||
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
acc,
|
acc,
|
||||||
alice.last_id(),
|
alice.last_id(),
|
||||||
sink(),
|
sink(),
|
||||||
|
input,
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
|
|
||||||
|
@ -790,8 +803,9 @@ mod bench {
|
||||||
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
|
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let historian = Historian::new(&mint.last_id(), None);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
|
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_packets(req_vers).is_ok());
|
||||||
|
@ -800,7 +814,7 @@ mod bench {
|
||||||
let tps = txs as f64 / sec;
|
let tps = txs as f64 / sec;
|
||||||
|
|
||||||
// Ensure that all transactions were successfully logged.
|
// Ensure that all transactions were successfully logged.
|
||||||
drop(skel.historian.input);
|
drop(input);
|
||||||
let entries: Vec<Entry> = skel.historian.output.iter().collect();
|
let entries: Vec<Entry> = skel.historian.output.iter().collect();
|
||||||
assert_eq!(entries.len(), 1);
|
assert_eq!(entries.len(), 1);
|
||||||
assert_eq!(entries[0].events.len(), txs as usize);
|
assert_eq!(entries[0].events.len(), txs as usize);
|
||||||
|
|
|
@ -165,6 +165,7 @@ mod tests {
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -178,11 +179,13 @@ mod tests {
|
||||||
let acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let historian = Historian::new(&alice.last_id(), Some(30));
|
let (input, event_receiver) = sync_channel(10);
|
||||||
|
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||||
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
acc,
|
acc,
|
||||||
alice.last_id(),
|
alice.last_id(),
|
||||||
sink(),
|
sink(),
|
||||||
|
input,
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
|
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
|
||||||
|
|
|
@ -8,25 +8,26 @@ use solana::ledger::Block;
|
||||||
use solana::recorder::Signal;
|
use solana::recorder::Signal;
|
||||||
use solana::signature::{KeyPair, KeyPairUtil};
|
use solana::signature::{KeyPair, KeyPairUtil};
|
||||||
use solana::transaction::Transaction;
|
use solana::transaction::Transaction;
|
||||||
use std::sync::mpsc::SendError;
|
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError<Signal>> {
|
fn create_ledger(input: &SyncSender<Signal>, seed: &Hash) -> Result<(), SendError<Signal>> {
|
||||||
sleep(Duration::from_millis(15));
|
sleep(Duration::from_millis(15));
|
||||||
let keypair = KeyPair::new();
|
let keypair = KeyPair::new();
|
||||||
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
|
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
|
||||||
let signal0 = Signal::Event(Event::Transaction(tr));
|
let signal0 = Signal::Event(Event::Transaction(tr));
|
||||||
hist.input.send(signal0)?;
|
input.send(signal0)?;
|
||||||
sleep(Duration::from_millis(10));
|
sleep(Duration::from_millis(10));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let seed = Hash::default();
|
let seed = Hash::default();
|
||||||
let hist = Historian::new(&seed, Some(10));
|
let hist = Historian::new(event_receiver, &seed, Some(10));
|
||||||
create_ledger(&hist, &seed).expect("send error");
|
create_ledger(&input, &seed).expect("send error");
|
||||||
drop(hist.input);
|
drop(input);
|
||||||
let entries: Vec<Entry> = hist.output.iter().collect();
|
let entries: Vec<Entry> = hist.output.iter().collect();
|
||||||
for entry in &entries {
|
for entry in &entries {
|
||||||
println!("{:?}", entry);
|
println!("{:?}", entry);
|
||||||
|
|
|
@ -15,6 +15,7 @@ use std::env;
|
||||||
use std::io::{stdin, stdout, Read};
|
use std::io::{stdin, stdout, Read};
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
fn print_usage(program: &str, opts: Options) {
|
fn print_usage(program: &str, opts: Options) {
|
||||||
|
@ -95,12 +96,14 @@ fn main() {
|
||||||
acc.register_entry_id(&last_id);
|
acc.register_entry_id(&last_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
let historian = Historian::new(&last_id, Some(1000));
|
let (input, event_receiver) = sync_channel(10_000);
|
||||||
|
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let skel = Arc::new(Mutex::new(AccountantSkel::new(
|
let skel = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
acc,
|
acc,
|
||||||
last_id,
|
last_id,
|
||||||
stdout(),
|
stdout(),
|
||||||
|
input,
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
|
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
|
||||||
|
|
|
@ -9,22 +9,20 @@ use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
pub struct Historian {
|
pub struct Historian {
|
||||||
pub input: SyncSender<Signal>,
|
|
||||||
pub output: Receiver<Entry>,
|
pub output: Receiver<Entry>,
|
||||||
pub thread_hdl: JoinHandle<ExitReason>,
|
pub thread_hdl: JoinHandle<ExitReason>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Historian {
|
impl Historian {
|
||||||
pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
|
pub fn new(
|
||||||
let (input, event_receiver) = sync_channel(10_000);
|
event_receiver: Receiver<Signal>,
|
||||||
|
start_hash: &Hash,
|
||||||
|
ms_per_tick: Option<u64>,
|
||||||
|
) -> Self {
|
||||||
let (entry_sender, output) = sync_channel(10_000);
|
let (entry_sender, output) = sync_channel(10_000);
|
||||||
let thread_hdl =
|
let thread_hdl =
|
||||||
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
|
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
|
||||||
Historian {
|
Historian { output, thread_hdl }
|
||||||
input,
|
|
||||||
output,
|
|
||||||
thread_hdl,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A background thread that will continue tagging received Event messages and
|
/// A background thread that will continue tagging received Event messages and
|
||||||
|
@ -59,14 +57,15 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_historian() {
|
fn test_historian() {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, None);
|
let hist = Historian::new(event_receiver, &zero, None);
|
||||||
|
|
||||||
hist.input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
sleep(Duration::new(0, 1_000_000));
|
sleep(Duration::new(0, 1_000_000));
|
||||||
hist.input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
sleep(Duration::new(0, 1_000_000));
|
sleep(Duration::new(0, 1_000_000));
|
||||||
hist.input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
|
|
||||||
let entry0 = hist.output.recv().unwrap();
|
let entry0 = hist.output.recv().unwrap();
|
||||||
let entry1 = hist.output.recv().unwrap();
|
let entry1 = hist.output.recv().unwrap();
|
||||||
|
@ -76,7 +75,7 @@ mod tests {
|
||||||
assert_eq!(entry1.num_hashes, 0);
|
assert_eq!(entry1.num_hashes, 0);
|
||||||
assert_eq!(entry2.num_hashes, 0);
|
assert_eq!(entry2.num_hashes, 0);
|
||||||
|
|
||||||
drop(hist.input);
|
drop(input);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist.thread_hdl.join().unwrap(),
|
hist.thread_hdl.join().unwrap(),
|
||||||
ExitReason::RecvDisconnected
|
ExitReason::RecvDisconnected
|
||||||
|
@ -87,10 +86,11 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_historian_closed_sender() {
|
fn test_historian_closed_sender() {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, None);
|
let hist = Historian::new(event_receiver, &zero, None);
|
||||||
drop(hist.output);
|
drop(hist.output);
|
||||||
hist.input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist.thread_hdl.join().unwrap(),
|
hist.thread_hdl.join().unwrap(),
|
||||||
ExitReason::SendDisconnected
|
ExitReason::SendDisconnected
|
||||||
|
@ -99,11 +99,12 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_ticking_historian() {
|
fn test_ticking_historian() {
|
||||||
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
let hist = Historian::new(&zero, Some(20));
|
let hist = Historian::new(event_receiver, &zero, Some(20));
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
hist.input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
drop(hist.input);
|
drop(input);
|
||||||
let entries: Vec<Entry> = hist.output.iter().collect();
|
let entries: Vec<Entry> = hist.output.iter().collect();
|
||||||
assert!(entries.len() > 1);
|
assert!(entries.len() > 1);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue