Synchronize PoH, bank last_id queue and ledger entry channel.

PoH, bank's last_id queue and the Entry channel need to have a synchronized order of ids.
This commit is contained in:
Anatoly Yakovenko 2018-09-26 05:52:13 -07:00 committed by Grimes
parent 718031ec35
commit 93c4f6c9b8
8 changed files with 307 additions and 513 deletions

View File

@ -42,7 +42,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.
bank.clear_signatures();
let results = bank.process_transactions(transactions.clone());
let results = bank.process_transactions(&transactions);
assert!(results.iter().all(Result::is_ok));
})
}

View File

@ -1,17 +1,18 @@
#![feature(test)]
extern crate bincode;
extern crate rand;
extern crate rayon;
extern crate solana;
extern crate test;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana::bank::Bank;
use solana::banking_stage::BankingStage;
use solana::banking_stage::{BankingStage, NUM_THREADS};
use solana::entry::Entry;
use solana::mint::Mint;
use solana::packet::{to_packets_chunked, PacketRecycler};
use solana::poh_service::PohService;
use solana::signature::{Keypair, KeypairUtil};
use solana::signature::{KeypairUtil, Pubkey, Signature};
use solana::transaction::Transaction;
use std::iter;
use std::sync::mpsc::{channel, Receiver};
@ -19,68 +20,6 @@ use std::sync::Arc;
use std::time::Duration;
use test::Bencher;
// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{Keypair, KeypairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// fn bench_process_transactions(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = Keypair::new();
// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_transaction(&tx).unwrap();
//
// let rando1 = Keypair::new();
// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_transaction(&tx).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(banking_stage.process_transactions(transactions).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(banking_stage.historian_input);
// let entries: Vec<Entry> = banking_stage.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].transactions.len(), txs as usize);
//
// println!("{} tps", tps);
// }
fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
let mut total = 0;
loop {
@ -101,132 +40,63 @@ fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
#[bench]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let tx = 10_000_usize;
let txes = 1000 * NUM_THREADS;
let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total);
let num_dst_accounts = 8 * 1024;
let num_src_accounts = 8 * 1024;
let srckeys: Vec<_> = (0..num_src_accounts).map(|_| Keypair::new()).collect();
let dstkeys: Vec<_> = (0..num_dst_accounts)
.map(|_| Keypair::new().pubkey())
.collect();
let transactions: Vec<_> = (0..tx)
.map(|i| {
Transaction::new(
&srckeys[i % num_src_accounts],
dstkeys[i % num_dst_accounts],
i as i64,
mint.last_id(),
)
}).collect();
let (verified_sender, verified_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let packet_recycler = PacketRecycler::default();
let setup_transactions: Vec<_> = (0..num_src_accounts)
.map(|i| {
Transaction::new(
&mint.keypair(),
srckeys[i].pubkey(),
mint_total / num_src_accounts as i64,
mint.last_id(),
)
}).collect();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None);
let verified_setup: Vec<_> =
to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();
check_txs(&entry_receiver, num_src_accounts);
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();
check_txs(&entry_receiver, tx);
});
}
#[bench]
fn bench_banking_stage_single_from(bencher: &mut Bencher) {
let tx = 10_000_usize;
let mint = Mint::new(1_000_000_000_000);
let mut pubkeys = Vec::new();
let num_keys = 8;
for _ in 0..num_keys {
pubkeys.push(Keypair::new().pubkey());
}
let transactions: Vec<_> = (0..tx)
let bank = Arc::new(Bank::new(&mint));
let dummy = Transaction::new(&mint.keypair(), mint.keypair().pubkey(), 1, mint.last_id());
let transactions: Vec<_> = (0..txes)
.into_par_iter()
.map(|i| {
Transaction::new(
&mint.keypair(),
pubkeys[i % num_keys],
i as i64,
mint.last_id(),
)
.map(|_| {
let mut new = dummy.clone();
let from: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let to: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
new.keys[0] = Pubkey::new(&from[0..32]);
new.keys[1] = Pubkey::new(&to[0..32]);
new.signature = Signature::new(&sig[0..64]);
new
}).collect();
let (verified_sender, verified_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let packet_recycler = PacketRecycler::default();
// fund all the accounts
transactions.iter().for_each(|tx| {
let fund = Transaction::new(
&mint.keypair(),
tx.keys[0],
mint_total / txes as i64,
mint.last_id(),
);
assert!(bank.process_transaction(&fund).is_ok());
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = x.read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
let (_stage, signal_receiver) =
BankingStage::new(bank.clone(), verified_receiver, Default::default());
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None);
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();
check_txs(&entry_receiver, tx);
for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes);
bank.clear_signatures();
// make sure the tx last id is still registered
bank.register_entry_id(&mint.last_id());
});
}

View File

@ -10,26 +10,44 @@ use entry::Entry;
use hash::Hasher;
use log::Level;
use packet::{Packets, SharedPackets};
use poh_service::PohService;
use poh_recorder::PohRecorder;
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use timing;
use transaction::Transaction;
// number of threads is 1 until mt bank is ready
pub const NUM_THREADS: usize = 1;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
/// Handle to the stage's thread.
thread_hdl: JoinHandle<()>,
thread_hdls: Vec<JoinHandle<()>>,
}
pub enum Config {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry.
Tick(usize),
/// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 poh once and producing 1
/// tick.
Sleep(Duration),
}
impl Default for Config {
fn default() -> Config {
// TODO: Change this to Tick to enable PoH
Config::Sleep(Duration::from_millis(500))
}
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
/// Discard input packets using `packet_recycler` to minimize memory
@ -37,48 +55,74 @@ impl BankingStage {
pub fn new(
bank: Arc<Bank>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
tick_duration: Option<Duration>,
config: Config,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string())
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let poh = PohRecorder::new(bank.clone(), entry_sender);
let tick_poh = poh.clone();
// Tick producer is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a
// signal.
let poh_exit = Arc::new(AtomicBool::new(false));
let banking_exit = poh_exit.clone();
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank.
let tick_producer = Builder::new()
.name("solana-banking-stage-tick_producer".to_string())
.spawn(move || {
let poh_service = PohService::new(bank.last_id(), tick_duration.is_some());
if let Err(e) = Self::tick_producer(tick_poh, config, &poh_exit) {
match e {
Error::SendError => (),
_ => error!(
"solana-banking-stage-tick_producer unexpected error {:?}",
e
),
}
}
debug!("tick producer exiting");
poh_exit.store(true, Ordering::Relaxed);
}).unwrap();
let mut last_tick = Instant::now();
loop {
let timeout =
tick_duration.map(|duration| duration - (Instant::now() - last_tick));
if let Err(e) = Self::process_packets(
timeout,
&bank,
&poh_service,
&verified_receiver,
&entry_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::RecvError(_) => break,
Error::SendError => break,
_ => {
error!("process_packets() {:?}", e);
// Many banks that process transactions in parallel.
let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
.into_iter()
.map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh = poh.clone();
let thread_banking_exit = banking_exit.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
loop {
if let Err(e) = Self::process_packets(
&thread_bank,
&thread_verified_receiver,
&thread_poh,
) {
debug!("got error {:?}", e);
match e {
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break
}
Error::RecvError(_) => break,
Error::SendError => break,
_ => error!("solana-banking-stage-tx {:?}", e),
}
}
if thread_banking_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
break;
}
}
}
if tick_duration.is_some() && last_tick.elapsed() > tick_duration.unwrap() {
if let Err(e) = Self::tick(&poh_service, &bank, &entry_sender) {
error!("tick() {:?}", e);
}
last_tick = Instant::now();
}
}
poh_service.join().unwrap();
}).unwrap();
(BankingStage { thread_hdl }, entry_receiver)
thread_banking_exit.store(true, Ordering::Relaxed);
}).unwrap()
}).collect();
thread_hdls.push(tick_producer);
(BankingStage { thread_hdls }, entry_receiver)
}
/// Convert the transactions from a blob of binary data to a vector of transactions and
@ -93,32 +137,32 @@ impl BankingStage {
}).collect()
}
fn tick(
poh_service: &PohService,
bank: &Arc<Bank>,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let poh = poh_service.tick();
bank.register_entry_id(&poh.id);
let entry = Entry {
num_hashes: poh.num_hashes,
id: poh.id,
transactions: vec![],
};
entry_sender.send(vec![entry])?;
Ok(())
fn tick_producer(poh: PohRecorder, config: Config, poh_exit: &AtomicBool) -> Result<()> {
loop {
match config {
Config::Tick(num) => {
for _ in 0..num {
poh.hash();
}
}
Config::Sleep(duration) => {
sleep(duration);
}
}
poh.tick()?;
if poh_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
return Ok(());
}
}
}
fn process_transactions(
bank: &Arc<Bank>,
transactions: &[Transaction],
poh_service: &PohService,
) -> Result<Vec<Entry>> {
let mut entries = Vec::new();
debug!("processing: {}", transactions.len());
transactions: Vec<Transaction>,
poh: &PohRecorder,
) -> Result<()> {
debug!("transactions: {}", transactions.len());
let mut chunk_start = 0;
while chunk_start != transactions.len() {
let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]);
@ -140,46 +184,29 @@ impl BankingStage {
}
}).collect();
debug!("processed: {}", processed_transactions.len());
chunk_start = chunk_end;
let hash = hasher.result();
if !processed_transactions.is_empty() {
let poh = poh_service.record(hash);
bank.register_entry_id(&poh.id);
entries.push(Entry {
num_hashes: poh.num_hashes,
id: poh.id,
transactions: processed_transactions,
});
let hash = hasher.result();
debug!("processed ok: {} {}", processed_transactions.len(), hash);
poh.record(hash, processed_transactions)?;
}
chunk_start = chunk_end;
}
debug!("done process_transactions, {} entries", entries.len());
Ok(entries)
debug!("done process_transactions");
Ok(())
}
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
pub fn process_packets(
timeout: Option<Duration>,
bank: &Arc<Bank>,
poh_service: &PohService,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Vec<Entry>>,
verified_receiver: &Arc<Mutex<Receiver<Vec<(SharedPackets, Vec<u8>)>>>>,
poh: &PohRecorder,
) -> Result<()> {
let recv_start = Instant::now();
// TODO pass deadline to recv_deadline() when/if it becomes available?
let mms = if let Some(timeout) = timeout {
verified_receiver.recv_timeout(timeout)?
} else {
verified_receiver.recv()?
};
let now = Instant::now();
let mms = verified_receiver
.lock()
.unwrap()
.recv_timeout(Duration::from_millis(100))?;
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
@ -210,14 +237,12 @@ impl BankingStage {
},
}).collect();
debug!("verified transactions {}", transactions.len());
let entries = Self::process_transactions(bank, &transactions, poh_service)?;
entry_sender.send(entries)?;
Self::process_transactions(bank, transactions, poh)?;
}
inc_new_counter_info!(
"banking_stage-time_ms",
timing::duration_as_ms(&now.elapsed()) as usize
timing::duration_as_ms(&proc_start.elapsed()) as usize
);
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
@ -242,7 +267,10 @@ impl Service for BankingStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}
@ -259,15 +287,25 @@ mod tests {
use transaction::Transaction;
#[test]
fn test_banking_stage_shutdown() {
fn test_banking_stage_shutdown1() {
let bank = Bank::new(&Mint::new(2));
let (verified_sender, verified_receiver) = channel();
let (banking_stage, _entry_receiver) =
BankingStage::new(Arc::new(bank), verified_receiver, None);
BankingStage::new(Arc::new(bank), verified_receiver, Default::default());
drop(verified_sender);
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage_shutdown2() {
let bank = Bank::new(&Mint::new(2));
let (_verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) =
BankingStage::new(Arc::new(bank), verified_receiver, Default::default());
drop(entry_receiver);
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage_tick() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
@ -276,9 +314,9 @@ mod tests {
let (banking_stage, entry_receiver) = BankingStage::new(
bank.clone(),
verified_receiver,
Some(Duration::from_millis(1)),
Config::Sleep(Duration::from_millis(1)),
);
sleep(Duration::from_millis(50));
sleep(Duration::from_millis(500));
drop(verified_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
@ -288,19 +326,6 @@ mod tests {
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage_no_tick() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(bank, verified_receiver, None);
sleep(Duration::from_millis(1000));
drop(verified_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() == 0);
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage_entries_only() {
let mint = Mint::new(2);
@ -308,7 +333,7 @@ mod tests {
let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, None);
BankingStage::new(bank, verified_receiver, Default::default());
// good tx
let keypair = mint.keypair();
@ -333,59 +358,19 @@ mod tests {
drop(verified_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert_eq!(entries.len(), 1);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id());
//receive entries + ticks
let entries: Vec<_> = entry_receiver.iter().map(|x| x).collect();
assert!(entries.len() >= 1);
let mut last_id = start_hash;
entries.iter().for_each(|entries| {
assert_eq!(entries.len(), 1);
assert!(entries.verify(&last_id));
last_id = entries.last().unwrap().id;
});
drop(entry_receiver);
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage() {
let mint = Mint::new(2);
let bank = Arc::new(Bank::new(&mint));
let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
bank.clone(),
verified_receiver,
Some(Duration::from_millis(1)),
);
// wait for some ticks
sleep(Duration::from_millis(50));
// good tx
let keypair = mint.keypair();
let tx = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash);
// good tx, but no verify
let tx_no_ver = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash);
// bad tx, AccountNotFound
let keypair = Keypair::new();
let tx_anf = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash);
// send 'em over
let recycler = PacketRecycler::default();
let packets = to_packets(&recycler, &[tx, tx_no_ver, tx_anf]);
// glad they all fit
assert_eq!(packets.len(), 1);
verified_sender // tx, no_ver, anf
.send(vec![(packets[0].clone(), vec![1u8, 0u8, 1u8])])
.unwrap();
drop(verified_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() > 1);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id());
assert_eq!(banking_stage.join().unwrap(), ());
}
#[test]
fn test_banking_stage_entryfication() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
@ -396,7 +381,7 @@ mod tests {
let recycler = PacketRecycler::default();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, None);
BankingStage::new(bank.clone(), verified_receiver, Default::default());
// Process a batch that includes a transaction that receives two tokens.
let alice = Keypair::new();
@ -419,7 +404,7 @@ mod tests {
// Collect the ledger and feed it to a new bank.
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
// same assertion as running through the bank, really...
assert_eq!(entries.len(), 2);
assert!(entries.len() >= 2);
// Assert the user holds one token, not two. If the stage only outputs one
// entry, then the second transaction will be rejected, because it drives
@ -434,5 +419,4 @@ mod tests {
}
assert_eq!(bank.get_balance(&alice.pubkey()), 1);
}
}

View File

@ -316,14 +316,11 @@ impl Fullnode {
}
None => {
// Start in leader mode.
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(),
&bank,
&crdt,
tick_duration,
Default::default(),
node.sockets
.transaction
.iter()
@ -430,14 +427,11 @@ impl Fullnode {
fn validator_to_leader(&mut self, entry_height: u64) {
self.crdt.write().unwrap().set_leader(self.keypair.pubkey());
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, blob_receiver, tpu_exit) = Tpu::new(
self.keypair.clone(),
&self.bank,
&self.crdt,
tick_duration,
Default::default(),
self.transaction_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))

View File

@ -39,7 +39,7 @@ pub mod netutil;
pub mod packet;
pub mod payment_plan;
pub mod poh;
pub mod poh_service;
pub mod poh_recorder;
pub mod recvmmsg;
pub mod recycler;
pub mod replicate_stage;

95
src/poh_recorder.rs Normal file
View File

@ -0,0 +1,95 @@
//! The `poh_recorder` module provides an object for synchronizing with Proof of History.
//! It synchronizes PoH, bank's register_entry_id and the ledger
//!
use bank::Bank;
use entry::Entry;
use hash::Hash;
use poh::Poh;
use result::Result;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use transaction::Transaction;
#[derive(Clone)]
pub struct PohRecorder {
poh: Arc<Mutex<Poh>>,
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
}
impl PohRecorder {
/// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger
pub fn new(bank: Arc<Bank>, sender: Sender<Vec<Entry>>) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(bank.last_id())));
PohRecorder { poh, bank, sender }
}
pub fn hash(&self) {
// TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes
let mut poh = self.poh.lock().unwrap();
poh.hash()
}
pub fn tick(&self) -> Result<()> {
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same
let mut poh = self.poh.lock().unwrap();
let tick = poh.tick();
self.bank.register_entry_id(&tick.id);
let entry = Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
};
self.sender.send(vec![entry])?;
Ok(())
}
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same.
let mut poh = self.poh.lock().unwrap();
let tick = poh.record(mixin);
self.bank.register_entry_id(&tick.id);
let entry = Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: txs,
};
self.sender.send(vec![entry])?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use hash::hash;
use mint::Mint;
use std::sync::mpsc::channel;
use std::sync::Arc;
#[test]
fn test_poh() {
let mint = Mint::new(1);
let bank = Arc::new(Bank::new(&mint));
let (entry_sender, entry_receiver) = channel();
let poh_recorder = PohRecorder::new(bank, entry_sender);
//send some data
let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![]).is_ok());
assert!(poh_recorder.tick().is_ok());
//get some events
let _ = entry_receiver.recv().unwrap();
let _ = entry_receiver.recv().unwrap();
//make sure it handles channel close correctly
drop(entry_receiver);
assert!(poh_recorder.tick().is_err());
}
}

View File

@ -1,148 +0,0 @@
//! The `poh_service` module provides an object for generating a Proof of History.
//! It records Hashes items on behalf of its users. It continuously generates
//! new Hashes, only stopping to check if it has been sent a Hash to mix in
//! to the Poh.
//!
//! The returned Entry includes the mix-in request, the latest Poh Hash, and the
//! number of Hashes generated in the service since the last mix-in request.
//!
//! The resulting stream of Hashes represents ordered events in time.
//!
use hash::Hash;
use poh::{Poh, PohEntry};
use service::Service;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle};
pub struct PohService {
poh: Arc<Mutex<Poh>>,
thread_hdl: JoinHandle<()>,
run_poh: Arc<AtomicBool>,
}
impl PohService {
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
/// if tick_duration is some, service will automatically produce entries every
/// `tick_duration`.
pub fn new(start_hash: Hash, run_poh: bool) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(start_hash)));
let run_poh = Arc::new(AtomicBool::new(run_poh));
let thread_poh = poh.clone();
let thread_run_poh = run_poh.clone();
let thread_hdl = Builder::new()
.name("solana-record-service".to_string())
.spawn(move || {
while thread_run_poh.load(Ordering::Relaxed) {
thread_poh.lock().unwrap().hash();
}
}).unwrap();
PohService {
poh,
run_poh,
thread_hdl,
}
}
pub fn tick(&self) -> PohEntry {
let mut poh = self.poh.lock().unwrap();
poh.tick()
}
pub fn record(&self, mixin: Hash) -> PohEntry {
let mut poh = self.poh.lock().unwrap();
poh.record(mixin)
}
// fn process_hash(
// mixin: Option<Hash>,
// poh: &mut Poh,
// sender: &Sender<PohEntry>,
// ) -> Result<(), ()> {
// let resp = match mixin {
// Some(mixin) => poh.record(mixin),
// None => poh.tick(),
// };
// sender.send(resp).or(Err(()))?;
// Ok(())
// }
//
// fn process_hashes(
// poh: &mut Poh,
// receiver: &Receiver<Option<Hash>>,
// sender: &Sender<PohEntry>,
// ) -> Result<(), ()> {
// loop {
// match receiver.recv() {
// Ok(hash) => Self::process_hash(hash, poh, sender)?,
// Err(RecvError) => return Err(()),
// }
// }
// }
//
// fn try_process_hashes(
// poh: &mut Poh,
// receiver: &Receiver<Option<Hash>>,
// sender: &Sender<PohEntry>,
// ) -> Result<(), ()> {
// loop {
// match receiver.try_recv() {
// Ok(hash) => Self::process_hash(hash, poh, sender)?,
// Err(TryRecvError::Empty) => return Ok(()),
// Err(TryRecvError::Disconnected) => return Err(()),
// };
// }
// }
}
impl Service for PohService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.run_poh.store(false, Ordering::Relaxed);
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use poh::verify;
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test_poh() {
let poh_service = PohService::new(Hash::default(), false);
let entry0 = poh_service.record(Hash::default());
sleep(Duration::from_millis(1));
let entry1 = poh_service.record(Hash::default());
sleep(Duration::from_millis(1));
let entry2 = poh_service.record(Hash::default());
assert_eq!(entry0.num_hashes, 1);
assert_eq!(entry1.num_hashes, 1);
assert_eq!(entry2.num_hashes, 1);
assert_eq!(poh_service.join().unwrap(), ());
assert!(verify(Hash::default(), &[entry0, entry1, entry2]));
}
#[test]
fn test_do_poh() {
let poh_service = PohService::new(Hash::default(), true);
sleep(Duration::from_millis(50));
let entry = poh_service.tick();
assert!(entry.num_hashes > 1);
assert_eq!(poh_service.join().unwrap(), ());
assert!(verify(Hash::default(), &vec![entry]));
}
}

View File

@ -26,7 +26,7 @@
//! ```
use bank::Bank;
use banking_stage::BankingStage;
use banking_stage::{BankingStage, Config};
use crdt::Crdt;
use entry::Entry;
use fetch_stage::FetchStage;
@ -38,7 +38,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use write_stage::{WriteStage, WriteStageReturnType};
pub enum TpuReturnType {
@ -58,7 +57,7 @@ impl Tpu {
keypair: Arc<Keypair>,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
tick_duration: Option<Duration>,
tick_duration: Config,
transactions_sockets: Vec<UdpSocket>,
ledger_path: &str,
sigverify_disabled: bool,