Merge pull request #188 from garious/add-tpu

AccountantSkel -> Tpu
This commit is contained in:
Greg Fitzgerald 2018-05-08 19:50:58 -06:00 committed by GitHub
commit 3236be7877
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 95 additions and 97 deletions

View File

@ -10,9 +10,9 @@ use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::accountant_stub::AccountantStub;
use solana::mint::MintDemo;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::thin_client::ThinClient;
use solana::transaction::Transaction;
use std::env;
use std::io::{stdin, Read};
@ -87,7 +87,7 @@ fn main() {
println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut acc = AccountantStub::new(addr.parse().unwrap(), socket);
let mut acc = ThinClient::new(addr.parse().unwrap(), socket);
println!("Get last ID...");
let last_id = acc.get_last_id().wait().unwrap();
@ -129,7 +129,7 @@ fn main() {
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap();
let acc = AccountantStub::new(addr.parse().unwrap(), socket);
let acc = ThinClient::new(addr.parse().unwrap(), socket);
for tr in trs {
acc.transfer_signed(tr.clone()).unwrap();
}

View File

@ -7,12 +7,12 @@ extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accountant_skel::AccountantSkel;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::historian::Historian;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::tpu::Tpu;
use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
@ -119,7 +119,7 @@ fn main() {
let (input, event_receiver) = sync_channel(10_000);
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(AccountantSkel::new(acc, input, historian));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
@ -132,8 +132,8 @@ fn main() {
serve_sock.local_addr().unwrap(),
);
eprintln!("starting server...");
let threads = AccountantSkel::serve(
&skel,
let threads = Tpu::serve(
&tpu,
d,
serve_sock,
skinny_sock,

View File

@ -130,11 +130,11 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
#[cfg(test)]
mod tests {
use accountant_skel::Request;
use bincode::serialize;
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock;
use tpu::Request;
use transaction::test_tx;
use transaction::Transaction;

View File

@ -1,7 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant;
pub mod accountant_skel;
pub mod accountant_stub;
pub mod crdt;
pub mod ecdsa;
pub mod entry;
@ -19,8 +17,10 @@ pub mod recorder;
pub mod result;
pub mod signature;
pub mod streamer;
pub mod transaction;
pub mod thin_client;
pub mod timing;
pub mod tpu;
pub mod transaction;
extern crate bincode;
extern crate byteorder;
extern crate chrono;

View File

@ -1,9 +1,8 @@
//! The `accountant_stub` module is a client-side object that interfaces with a server-side Accountant
//! object via the network interface exposed by AccountantSkel. Client code should use
//! this object instead of writing messages to the network directly. The binary
//! encoding of its messages are unstable and may change in future releases.
//! The `thin_client` module is a client-side object that interfaces with
//! a server-side TPU. Client code should use this object instead of writing
//! messages to the network directly. The binary encoding of its messages are
//! unstable and may change in future releases.
use accountant_skel::{Request, Response, Subscription};
use bincode::{deserialize, serialize};
use futures::future::{ok, FutureResult};
use hash::Hash;
@ -11,9 +10,10 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use tpu::{Request, Response, Subscription};
use transaction::Transaction;
pub struct AccountantStub {
pub struct ThinClient {
pub addr: SocketAddr,
pub socket: UdpSocket,
last_id: Option<Hash>,
@ -21,20 +21,20 @@ pub struct AccountantStub {
balances: HashMap<PublicKey, Option<i64>>,
}
impl AccountantStub {
/// Create a new AccountantStub that will interface with AccountantSkel
impl ThinClient {
/// Create a new ThinClient that will interface with Tpu
/// over `socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking AccountantStub methods.
/// to a public address before invoking ThinClient methods.
pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self {
let stub = AccountantStub {
let client = ThinClient {
addr: addr,
socket,
last_id: None,
num_events: 0,
balances: HashMap::new(),
};
stub.init();
stub
client.init();
client
}
pub fn init(&self) {
@ -119,7 +119,7 @@ impl AccountantStub {
}
/// Return the number of transactions the server processed since creating
/// this stub instance.
/// this client instance.
pub fn transaction_count(&mut self) -> u64 {
// Wait for at least one EntryInfo.
let mut done = false;
@ -148,7 +148,6 @@ impl AccountantStub {
mod tests {
use super::*;
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use crdt::{Crdt, ReplicatedData};
use futures::Future;
use historian::Historian;
@ -162,10 +161,11 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
use tpu::Tpu;
// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test]
fn test_accountant_stub() {
fn test_thin_client() {
logger::setup();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -185,14 +185,13 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
let threads =
AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
let acc = Arc::new(Tpu::new(acc, input, historian));
let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut acc = AccountantStub::new(addr, socket);
let mut acc = ThinClient::new(addr, socket);
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
@ -230,9 +229,9 @@ mod tests {
}
#[test]
fn test_multi_accountant_stub() {
fn test_multi_node() {
logger::setup();
info!("test_multi_accountant_stub");
info!("test_multi_node");
let leader = test_node();
let replicant = test_node();
let alice = Mint::new(10_000);
@ -243,17 +242,17 @@ mod tests {
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice);
Arc::new(AccountantSkel::new(acc, input, historian))
Arc::new(Tpu::new(acc, input, historian))
};
let replicant_acc = {
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice);
Arc::new(AccountantSkel::new(acc, input, historian))
Arc::new(Tpu::new(acc, input, historian))
};
let leader_threads = AccountantSkel::serve(
let leader_threads = Tpu::serve(
&leader_acc,
leader.0.clone(),
leader.2,
@ -262,7 +261,7 @@ mod tests {
exit.clone(),
sink(),
).unwrap();
let replicant_threads = AccountantSkel::replicate(
let replicant_threads = Tpu::replicate(
&replicant_acc,
replicant.0.clone(),
replicant.1,
@ -314,7 +313,7 @@ mod tests {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let mut acc = AccountantStub::new(leader.0.serve_addr, socket);
let mut acc = ThinClient::new(leader.0.serve_addr, socket);
info!("getting leader last_id");
let last_id = acc.get_last_id().wait().unwrap();
info!("executing leader transer");
@ -330,7 +329,7 @@ mod tests {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let mut acc = AccountantStub::new(replicant.0.serve_addr, socket);
let mut acc = ThinClient::new(replicant.0.serve_addr, socket);
info!("getting replicant balance");
if let Ok(bal) = acc.get_balance(&bob_pubkey) {
replicant_balance = bal;

View File

@ -1,5 +1,5 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
pub fn duration_as_ms(d: &Duration) -> u64 {
return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);

View File

@ -1,6 +1,5 @@
//! The `accountant_skel` module is a microservice that exposes the high-level
//! Accountant API to the network. Its message encoding is currently
//! in flux. Clients should use AccountantStub to interact with it.
//! The `tpu` module implements the Transaction Processing Unit, a
//! 5-stage transaction processing pipeline in software.
use accountant::Accountant;
use bincode::{deserialize, serialize, serialize_into};
@ -12,6 +11,7 @@ use hash::Hash;
use historian::Historian;
use packet;
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use recorder::Signal;
use result::Result;
@ -27,13 +27,12 @@ use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use timing;
use std::time::Instant;
use rand::{thread_rng, Rng};
use streamer;
use timing;
use transaction::Transaction;
pub struct AccountantSkel {
pub struct Tpu {
acc: Mutex<Accountant>,
historian_input: Mutex<SyncSender<Signal>>,
historian: Historian,
@ -70,7 +69,7 @@ impl Request {
}
}
type SharedSkel = Arc<AccountantSkel>;
type SharedTpu = Arc<Tpu>;
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
@ -78,10 +77,10 @@ pub enum Response {
EntryInfo(EntryInfo),
}
impl AccountantSkel {
/// Create a new AccountantSkel that wraps the given Accountant.
impl Tpu {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
AccountantSkel {
Tpu {
acc: Mutex::new(acc),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
@ -89,7 +88,7 @@ impl AccountantSkel {
}
}
fn notify_entry_info_subscribers(obj: &SharedSkel, entry: &Entry) {
fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -112,7 +111,7 @@ impl AccountantSkel {
}
}
fn update_entry<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>, entry: &Entry) {
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
trace!("update_entry entry");
obj.acc.lock().unwrap().register_entry_id(&entry.id);
writeln!(
@ -123,7 +122,7 @@ impl AccountantSkel {
Self::notify_entry_info_subscribers(obj, &entry);
}
fn receive_all<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = obj.historian
@ -182,7 +181,7 @@ impl AccountantSkel {
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync<W: Write>(
obj: SharedSkel,
obj: SharedTpu,
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>,
@ -198,7 +197,7 @@ impl AccountantSkel {
}
pub fn sync_service<W: Write + Send + 'static>(
obj: SharedSkel,
obj: SharedTpu,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
@ -213,12 +212,12 @@ impl AccountantSkel {
})
}
fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> {
fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> {
Ok(())
}
fn thin_client_service(
obj: SharedSkel,
obj: SharedTpu,
exit: Arc<AtomicBool>,
socket: UdpSocket,
) -> JoinHandle<()> {
@ -233,12 +232,12 @@ impl AccountantSkel {
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> {
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
Ok(())
}
pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc<AtomicBool>) -> JoinHandle<()> {
pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::run_sync_no_broadcast(obj.clone());
if exit.load(Ordering::Relaxed) {
@ -420,7 +419,7 @@ impl AccountantSkel {
}
fn process(
obj: &SharedSkel,
obj: &SharedTpu,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
@ -485,7 +484,7 @@ impl AccountantSkel {
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &SharedSkel,
obj: &SharedTpu,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
@ -510,11 +509,11 @@ impl AccountantSkel {
Ok(())
}
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve<W: Write + Send + 'static>(
obj: &SharedSkel,
obj: &SharedTpu,
me: ReplicatedData,
serve: UdpSocket,
skinny: UdpSocket,
@ -582,10 +581,10 @@ impl AccountantSkel {
let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny);
let skel = obj.clone();
let tpu = obj.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&mut skel.clone(),
&mut tpu.clone(),
&verified_receiver,
&responder_sender,
&packet_recycler,
@ -631,7 +630,7 @@ impl AccountantSkel {
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &SharedSkel,
obj: &SharedTpu,
me: ReplicatedData,
gossip: UdpSocket,
serve: UdpSocket,
@ -682,10 +681,10 @@ impl AccountantSkel {
retransmit_sender,
);
let skel = obj.clone();
let tpu = obj.clone();
let s_exit = exit.clone();
let t_replicator = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
let e = Self::replicate_state(&tpu, &window_receiver, &blob_recycler);
if e.is_err() && s_exit.load(Ordering::Relaxed) {
break;
}
@ -728,11 +727,11 @@ impl AccountantSkel {
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
let skel = obj.clone();
let tpu = obj.clone();
let s_exit = exit.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&mut skel.clone(),
&mut tpu.clone(),
&verified_receiver,
&responder_sender,
&packet_recycler,
@ -786,15 +785,13 @@ pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedP
#[cfg(test)]
mod tests {
use accountant_skel::{to_packets, Request};
use bincode::serialize;
use ecdsa;
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use tpu::{to_packets, Request};
use transaction::{memfind, test_tx};
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use accountant_stub::AccountantStub;
use chrono::prelude::*;
use crdt::Crdt;
use crdt::ReplicatedData;
@ -819,6 +816,8 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
use streamer;
use thin_client::ThinClient;
use tpu::Tpu;
use transaction::Transaction;
#[test]
@ -856,27 +855,27 @@ mod tests {
let acc = Accountant::new(&mint);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, input, historian);
let tpu = Tpu::new(acc, input, historian);
// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(skel.process_events(events).is_ok());
assert!(tpu.process_events(events).is_ok());
// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(skel.process_events(events).is_ok());
assert!(tpu.process_events(events).is_ok());
// Collect the ledger and feed it to a new accountant.
skel.historian_input
tpu.historian_input
.lock()
.unwrap()
.send(Signal::Tick)
.unwrap();
drop(skel.historian_input);
let entries: Vec<Entry> = skel.historian.output.lock().unwrap().iter().collect();
drop(tpu.historian_input);
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
@ -901,10 +900,10 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = AccountantSkel::serve(
&acc_skel,
let threads = Tpu::serve(
&tpu,
leader_data,
leader_serve,
leader_skinny,
@ -916,23 +915,23 @@ mod tests {
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut acc_stub = AccountantStub::new(serve_addr, socket);
let last_id = acc_stub.get_last_id().wait().unwrap();
let mut client = ThinClient::new(serve_addr, socket);
let last_id = client.get_last_id().wait().unwrap();
trace!("doing stuff");
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
let _sig = acc_stub.transfer_signed(tr).unwrap();
let _sig = client.transfer_signed(tr).unwrap();
let last_id = acc_stub.get_last_id().wait().unwrap();
let last_id = client.get_last_id().wait().unwrap();
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
tr2.data.tokens = 502;
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = acc_stub.transfer_signed(tr2).unwrap();
let _sig = client.transfer_signed(tr2).unwrap();
assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500);
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500);
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
@ -1009,9 +1008,9 @@ mod tests {
let acc = Accountant::new(&alice);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
let acc = Arc::new(Tpu::new(acc, input, historian));
let replicate_addr = target1_data.replicate_addr;
let threads = AccountantSkel::replicate(
let threads = Tpu::replicate(
&acc,
target1_data,
target1_gossip,
@ -1111,7 +1110,7 @@ mod tests {
let entry_list = vec![e0; 1000];
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
AccountantSkel::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
let serialized_entry_list = serialize(&entry_list).unwrap();
let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
if serialized_entry_list.len() % BLOB_SIZE != 0 {
@ -1127,7 +1126,6 @@ mod bench {
extern crate test;
use self::test::Bencher;
use accountant::{Accountant, MAX_ENTRY_IDS};
use accountant_skel::*;
use bincode::serialize;
use hash::hash;
use mint::Mint;
@ -1135,6 +1133,7 @@ mod bench {
use std::collections::HashSet;
use std::sync::mpsc::sync_channel;
use std::time::Instant;
use tpu::*;
use transaction::Transaction;
#[bench]
@ -1180,17 +1179,17 @@ mod bench {
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, input, historian);
let tpu = Tpu::new(acc, input, historian);
let now = Instant::now();
assert!(skel.process_events(req_vers).is_ok());
assert!(tpu.process_events(req_vers).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;
// Ensure that all transactions were successfully logged.
drop(skel.historian_input);
let entries: Vec<Entry> = skel.historian.output.lock().unwrap().iter().collect();
drop(tpu.historian_input);
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);