Tuck away the Historian

The Historian is now just a utility of the accounting stage.
This commit is contained in:
Greg Fitzgerald 2018-05-09 12:25:13 -06:00
parent 778bec0777
commit ded28c705f
4 changed files with 21 additions and 36 deletions

View File

@ -25,7 +25,9 @@ pub struct AccountingStage {
impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: Sender<Signal>, historian: Historian) -> Self {
pub fn new(acc: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
let (entry_sender, output) = channel();
AccountingStage {
output: Arc::new(Mutex::new(output)),
@ -157,10 +159,8 @@ mod tests {
use accounting_stage::AccountingStage;
use entry::Entry;
use event::Event;
use historian::Historian;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use std::sync::mpsc::channel;
use transaction::Transaction;
#[test]
@ -170,9 +170,7 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input, historian);
let stage = AccountingStage::new(acc, &mint.last_id(), None);
// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
@ -262,8 +260,7 @@ mod bench {
.collect();
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input, historian);
let stage = AccountingStage::new(acc, &mint.last_id(), None);
let now = Instant::now();
assert!(stage.process_events(events).is_ok());

View File

@ -7,10 +7,10 @@ extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accounting_stage::AccountingStage;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::historian::Historian;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::tpu::Tpu;
use std::env;
@ -18,7 +18,6 @@ use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
fn print_usage(program: &str, opts: Options) {
@ -116,10 +115,9 @@ fn main() {
eprintln!("creating networking stack...");
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let accounting = AccountingStage::new(acc, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let tpu = Arc::new(Tpu::new(accounting));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();

View File

@ -148,15 +148,14 @@ impl ThinClient {
mod tests {
use super::*;
use accountant::Accountant;
use accounting_stage::AccountingStage;
use crdt::{Crdt, ReplicatedData};
use futures::Future;
use historian::Historian;
use logger;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
@ -183,9 +182,8 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Tpu::new(acc, input, historian));
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let acc = Arc::new(Tpu::new(accounting));
let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));
@ -240,17 +238,15 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let leader_acc = {
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice);
Arc::new(Tpu::new(acc, input, historian))
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting))
};
let replicant_acc = {
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice);
Arc::new(Tpu::new(acc, input, historian))
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting))
};
let leader_threads = Tpu::serve(

View File

@ -1,19 +1,16 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! 5-stage transaction processing pipeline in software.
use accountant::Accountant;
use accounting_stage::{AccountingStage, Request, Response};
use bincode::{deserialize, serialize, serialize_into};
use crdt::{Crdt, ReplicatedData};
use ecdsa;
use entry::Entry;
use event::Event;
use historian::Historian;
use packet;
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use recorder::Signal;
use result::Result;
use serde_json;
use std::collections::VecDeque;
@ -38,8 +35,7 @@ type SharedTpu = Arc<Tpu>;
impl Tpu {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: Sender<Signal>, historian: Historian) -> Self {
let accounting = AccountingStage::new(acc, historian_input, historian);
pub fn new(accounting: AccountingStage) -> Self {
Tpu { accounting }
}
@ -676,6 +672,7 @@ mod tests {
use transaction::{memfind, test_tx};
use accountant::Accountant;
use accounting_stage::AccountingStage;
use chrono::prelude::*;
use crdt::Crdt;
use crdt::ReplicatedData;
@ -683,7 +680,6 @@ mod tests {
use event::Event;
use futures::Future;
use hash::{hash, Hash};
use historian::Historian;
use logger;
use mint::Mint;
use plan::Plan;
@ -734,9 +730,8 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = Tpu::serve(
&tpu,
@ -843,9 +838,8 @@ mod tests {
let starting_balance = 10_000;
let alice = Mint::new(starting_balance);
let acc = Accountant::new(&alice);
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting));
let replicate_addr = target1_data.replicate_addr;
let threads = Tpu::replicate(
&tpu,