lastidnotfound step 2: (#1300)

lastidnotfound step 2:
  * move "record stage", aka poh_service into banking stage
  * remove Entry.has_more, is incompatible with leader rotation
  * rewrite entry_next_hash in terms of Poh
  * simplify and unify transaction hashing (no embedded nulls)
  * register_last_entry from banking stage, fixes #1171 (w00t!)
  * new PoH doesn't generate empty ledger entries, so some fixes necessary in 
         multinode tests that rely on that (e.g. giving validators airdrops)
  * make window repair less patient, if we've been waiting for an answer, 
          don't be shy about most recent blobs
   * delete recorder and record stage
   * make more verbost  thin_client error reporting
   * more tracing in window (sigh)
This commit is contained in:
Rob Walker 2018-09-21 21:01:13 -07:00 committed by GitHub
parent 54b407b4ca
commit be31da3dce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 346 additions and 562 deletions

View File

@ -478,9 +478,7 @@ impl Bank {
result?;
}
}
if !entry.has_more {
self.register_entry_id(&entry.id);
}
self.register_entry_id(&entry.id);
Ok(())
}
@ -680,11 +678,9 @@ mod tests {
use hash::hash;
use ledger;
use logger;
use packet::BLOB_DATA_SIZE;
use signature::{GenKeys, KeypairUtil};
use std;
use std::io::{BufReader, Cursor, Seek, SeekFrom};
use std::mem::size_of;
#[test]
fn test_bank_new() {
@ -901,25 +897,6 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}
fn create_sample_block_with_next_entries(
mint: &Mint,
length: usize,
) -> impl Iterator<Item = Entry> {
let keypair = Keypair::new();
let hash = mint.last_id();
let mut txs = Vec::with_capacity(length);
for i in 0..length {
txs.push(Transaction::system_new(
&mint.keypair(),
keypair.pubkey(),
i as i64,
hash,
));
}
let entries = ledger::next_entries(&hash, 0, txs);
entries.into_iter()
}
fn create_sample_block_with_next_entries_using_keypairs(
mint: &Mint,
keypairs: &[Keypair],
@ -940,21 +917,12 @@ mod tests {
for _ in 0..length {
let keypair = Keypair::new();
let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx], false);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]);
entries.push(entry);
}
entries.into_iter()
}
fn create_sample_ledger_with_next_entries(
length: usize,
) -> (impl Iterator<Item = Entry>, Pubkey) {
let mint = Mint::new((length * length) as i64);
let genesis = mint.create_entries();
let block = create_sample_block_with_next_entries(&mint, length);
(genesis.into_iter().chain(block), mint.pubkey())
}
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, Pubkey) {
let mint = Mint::new(1 + length as i64);
let genesis = mint.create_entries();
@ -1038,16 +1006,6 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}
#[test]
fn test_process_ledger_has_more_cross_block() {
// size_of<Transaction> is quite large for serialized size, so
// use 2 * verify_block_size to ensure we get enough txes to cross that
// block boundary with has_more set
let num_txs = (2 * VERIFY_BLOCK_SIZE) * BLOB_DATA_SIZE / size_of::<Transaction>();
let (ledger, _pubkey) = create_sample_ledger_with_next_entries(num_txs);
let bank = Bank::default();
assert!(bank.process_ledger(ledger).is_ok());
}
#[test]
fn test_new_default() {
let def_bank = Bank::default();

View File

@ -5,10 +5,13 @@
use bank::Bank;
use bincode::deserialize;
use counter::Counter;
use entry::Entry;
use hash::{Hash, Hasher};
use log::Level;
use packet::{Packets, SharedPackets};
use poh::PohEntry;
use poh_service::PohService;
use rayon::prelude::*;
use record_stage::Signal;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
@ -34,21 +37,35 @@ impl BankingStage {
pub fn new(
bank: Arc<Bank>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
) -> (Self, Receiver<Signal>) {
let (signal_sender, signal_receiver) = channel();
tick_duration: Option<Duration>,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::process_packets(&bank, &verified_receiver, &signal_sender) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => break,
_ => error!("{:?}", e),
.spawn(move || {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) =
PohService::new(bank.last_id(), hash_receiver, tick_duration);
loop {
if let Err(e) = Self::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => break,
_ => error!("{:?}", e),
}
}
}
drop(hash_sender);
poh_service.join().unwrap();
}).unwrap();
(BankingStage { thread_hdl }, signal_receiver)
(BankingStage { thread_hdl }, entry_receiver)
}
/// Convert the transactions from a blob of binary data to a vector of transactions and
@ -63,16 +80,97 @@ impl BankingStage {
}).collect()
}
fn process_transactions(
bank: &Arc<Bank>,
transactions: Vec<Transaction>,
hash_sender: &Sender<Hash>,
poh_receiver: &Receiver<PohEntry>,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let mut entries = Vec::new();
debug!("transactions: {}", transactions.len());
let mut chunk_start = 0;
while chunk_start != transactions.len() {
let chunk_end = chunk_start + Entry::num_will_fit(transactions[chunk_start..].to_vec());
let results = bank.process_transactions(transactions[chunk_start..chunk_end].to_vec());
debug!("results: {}", results.len());
chunk_start = chunk_end;
let mut hasher = Hasher::default();
let processed_transactions: Vec<_> = results
.into_iter()
.filter_map(|x| match x {
Ok(x) => {
hasher.hash(&x.signature.as_ref());
Some(x)
}
Err(e) => {
debug!("process transaction failed {:?}", e);
None
}
}).collect();
debug!("processed ok: {}", processed_transactions.len());
let hash = hasher.result();
if processed_transactions.len() != 0 {
hash_sender.send(hash)?;
let mut answered = false;
while !answered {
entries.extend(poh_receiver.try_iter().map(|poh| {
if let Some(mixin) = poh.mixin {
answered = true;
assert_eq!(mixin, hash);
bank.register_entry_id(&poh.id);
Entry {
num_hashes: poh.num_hashes,
id: poh.id,
transactions: processed_transactions.clone(),
}
} else {
Entry {
num_hashes: poh.num_hashes,
id: poh.id,
transactions: vec![],
}
}
}));
}
} else {
entries.extend(poh_receiver.try_iter().map(|poh| Entry {
num_hashes: poh.num_hashes,
id: poh.id,
transactions: vec![],
}));
}
}
debug!("done process_transactions, {} entries", entries.len());
Ok(entry_sender.send(entries)?)
}
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
pub fn process_packets(
bank: &Arc<Bank>,
hash_sender: &Sender<Hash>,
poh_receiver: &Receiver<PohEntry>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
signal_sender: &Sender<Signal>,
entry_sender: &Sender<Vec<Entry>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("verified_recevier {:?}", verified_receiver);
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
@ -87,7 +185,8 @@ impl BankingStage {
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read());
reqs_len += transactions.len();
let transactions = transactions
let transactions: Vec<_> = transactions
.into_iter()
.zip(vers)
.filter_map(|(tx, ver)| match tx {
@ -99,14 +198,15 @@ impl BankingStage {
},
}).collect();
debug!("process_transactions");
let results = bank.process_transactions(transactions);
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
if let Err(_) = signal_sender.send(Signal::Transactions(transactions)) {
return Err(Error::SendError);
}
debug!("done process_transactions");
Self::process_transactions(
bank,
transactions,
hash_sender,
poh_receiver,
entry_sender,
)?;
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(

View File

@ -269,8 +269,8 @@ mod tests {
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node};
use entry::Entry;
use ledger::next_entries_mut;
use mint::Mint;
use recorder::Recorder;
use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey};
use std::cmp;
@ -361,19 +361,19 @@ mod tests {
}
let genesis_len = broadcast_info.entries.len() as u64;
let last_entry_hash = broadcast_info
let mut last_id = broadcast_info
.entries
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_len..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
broadcast_info.entry_sender.send(new_entry).unwrap();
}
@ -388,7 +388,8 @@ mod tests {
// past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the crdt, and exit
for _ in 0..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results
Err(_) => break,

View File

@ -3,8 +3,9 @@
//! transactions within it. Entries cannot be reordered, and its field `num_hashes`
//! represents an approximate amount of time since the last Entry was created.
use bincode::{serialize_into, serialized_size};
use hash::{extend_and_hash, hash, Hash};
use hash::Hash;
use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE};
use poh::Poh;
use rayon::prelude::*;
use signature::Pubkey;
use std::io::Cursor;
@ -42,30 +43,17 @@ pub struct Entry {
/// generated. They may have been observed before a previous Entry ID but were
/// pushed back into this list to ensure deterministic interpretation of the ledger.
pub transactions: Vec<Transaction>,
/// Indication that:
/// 1. the next Entry in the ledger has transactions that can potentially
/// be verified in parallel with these transactions
/// 2. this Entry can be left out of the bank's entry_id cache for
/// purposes of duplicate rejection
pub has_more: bool,
}
impl Entry {
/// Creates the next Entry `num_hashes` after `start_hash`.
pub fn new(
start_hash: &Hash,
num_hashes: u64,
transactions: Vec<Transaction>,
has_more: bool,
) -> Self {
pub fn new(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Self {
let num_hashes = num_hashes + if transactions.is_empty() { 0 } else { 1 };
let id = next_hash(start_hash, 0, &transactions);
let entry = Entry {
num_hashes,
id,
transactions,
has_more,
};
let size = serialized_size(&entry).unwrap();
@ -115,19 +103,53 @@ impl Entry {
num_hashes: 0,
id: Hash::default(),
transactions,
has_more: false,
}).unwrap()
<= BLOB_DATA_SIZE as u64
}
pub fn num_will_fit(transactions: Vec<Transaction>) -> usize {
if transactions.len() == 0 {
return 0;
}
let mut num = transactions.len();
let mut upper = transactions.len();
let mut lower = 1; // if one won't fit, we have a lot of TODOs
let mut next = transactions.len(); // optimistic
loop {
debug!(
"num {}, upper {} lower {} next {} transactions.len() {}",
num,
upper,
lower,
next,
transactions.len()
);
if Entry::will_fit(transactions[..num].to_vec()) {
next = (upper + num) / 2;
lower = num;
debug!("num {} fits, maybe too well? trying {}", num, next);
} else {
next = (lower + num) / 2;
upper = num;
debug!("num {} doesn't fit! trying {}", num, next);
}
// same as last time
if next == num {
debug!("converged on num {}", num);
break;
}
num = next;
}
num
}
/// Creates the next Tick Entry `num_hashes` after `start_hash`.
pub fn new_mut(
start_hash: &mut Hash,
num_hashes: &mut u64,
transactions: Vec<Transaction>,
has_more: bool,
) -> Self {
let entry = Self::new(start_hash, *num_hashes, transactions, has_more);
let entry = Self::new(start_hash, *num_hashes, transactions);
*start_hash = entry.id;
*num_hashes = 0;
assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64);
@ -141,7 +163,6 @@ impl Entry {
num_hashes,
id: *id,
transactions: vec![],
has_more: false,
}
}
@ -170,34 +191,22 @@ impl Entry {
}
}
fn add_transaction_data(hash_data: &mut Vec<u8>, tx: &Transaction) {
hash_data.push(0u8);
hash_data.extend_from_slice(&tx.signature.as_ref());
}
/// Creates the hash `num_hashes` after `start_hash`. If the transaction contains
/// a signature, the final hash will be a hash of both the previous ID and
/// the signature. If num_hashes is zero and there's no transaction data,
/// start_hash is returned.
fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash {
let mut id = *start_hash;
if num_hashes == 0 && transactions.len() == 0 {
return *start_hash;
}
let mut poh = Poh::new(*start_hash, None);
for _ in 1..num_hashes {
id = hash(&id.as_ref());
poh.hash();
}
// Hash all the transaction data
let mut hash_data = vec![];
for tx in transactions {
add_transaction_data(&mut hash_data, tx);
}
if !hash_data.is_empty() {
extend_and_hash(&id, &hash_data)
} else if num_hashes != 0 {
hash(&id.as_ref())
} else {
id
}
poh.record(Transaction::hash(transactions)).id
}
/// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`.
@ -207,7 +216,6 @@ pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transact
num_hashes,
id: next_hash(start_hash, num_hashes, &transactions),
transactions,
has_more: false,
}
}
@ -238,7 +246,7 @@ mod tests {
let keypair = Keypair::new();
let tx0 = Transaction::new(&keypair, keypair.pubkey(), 0, zero);
let tx1 = Transaction::new(&keypair, keypair.pubkey(), 1, zero);
let mut e0 = Entry::new(&zero, 0, vec![tx0.clone(), tx1.clone()], false);
let mut e0 = Entry::new(&zero, 0, vec![tx0.clone(), tx1.clone()]);
assert!(e0.verify(&zero));
// Next, swap two transactions and ensure verification fails.
@ -262,7 +270,7 @@ mod tests {
);
let tx1 =
Transaction::budget_new_signature(&keypair, keypair.pubkey(), keypair.pubkey(), zero);
let mut e0 = Entry::new(&zero, 0, vec![tx0.clone(), tx1.clone()], false);
let mut e0 = Entry::new(&zero, 0, vec![tx0.clone(), tx1.clone()]);
assert!(e0.verify(&zero));
// Next, swap two witness transactions and ensure verification fails.

View File

@ -44,9 +44,8 @@ impl<'a, W: Write> EntryWriter<'a, W> {
fn write_and_register_entry(&mut self, entry: &Entry) -> io::Result<()> {
trace!("write_and_register_entry entry");
if !entry.has_more {
self.bank.register_entry_id(&entry.id);
}
self.bank.register_entry_id(&entry.id);
Self::write_entry(&mut self.writer, entry)
}
@ -101,46 +100,8 @@ pub fn read_entries<R: BufRead>(reader: R) -> impl Iterator<Item = io::Result<En
#[cfg(test)]
mod tests {
use super::*;
use bincode::serialize;
use ledger;
use mint::Mint;
use packet::BLOB_DATA_SIZE;
use packet::PACKET_DATA_SIZE;
use signature::{Keypair, KeypairUtil};
use std::io::Cursor;
use transaction::Transaction;
#[test]
fn test_dont_register_partial_entries() {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let writer = io::sink();
let mut entry_writer = EntryWriter::new(&bank, writer);
let keypair = Keypair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
let tx_size = serialize(&tx).unwrap().len();
assert!(tx_size <= PACKET_DATA_SIZE);
assert!(BLOB_DATA_SIZE >= PACKET_DATA_SIZE);
let threshold = (BLOB_DATA_SIZE / tx_size) - 1; // PACKET_DATA_SIZE is transaction size
// Verify large entries are split up and the first sets has_more.
let txs = vec![tx.clone(); threshold * 2];
let entries = ledger::next_entries(&mint.last_id(), 0, txs);
assert_eq!(entries.len(), 2);
assert!(entries[0].has_more);
assert!(!entries[1].has_more);
// Verify that write_and_register_entry doesn't register the first entries after a split.
assert_eq!(bank.last_id(), mint.last_id());
entry_writer.write_and_register_entry(&entries[0]).unwrap();
assert_eq!(bank.last_id(), mint.last_id());
// Verify that write_and_register_entry registers the final entry after a split.
entry_writer.write_and_register_entry(&entries[1]).unwrap();
assert_eq!(bank.last_id(), entries[1].id);
}
/// Same as read_entries() but parsing a buffer and returning a vector.
fn read_entries_from_buf(s: &[u8]) -> io::Result<Vec<Entry>> {

View File

@ -9,6 +9,29 @@ use std::fmt;
#[derive(Serialize, Deserialize, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Hash(GenericArray<u8, U32>);
#[derive(Clone, Default)]
pub struct Hasher {
hasher: Sha256,
}
impl Hasher {
pub fn hash(&mut self, val: &[u8]) -> () {
self.hasher.input(val);
}
pub fn hashv(&mut self, vals: &[&[u8]]) -> () {
for val in vals {
self.hash(val);
}
}
pub fn result(self) -> Hash {
// At the time of this writing, the sha2 library is stuck on an old version
// of generic_array (0.9.0). Decouple ourselves with a clone to our version.
Hash(GenericArray::clone_from_slice(
self.hasher.result().as_slice(),
))
}
}
impl AsRef<[u8]> for Hash {
fn as_ref(&self) -> &[u8] {
&self.0[..]
@ -34,13 +57,9 @@ impl Hash {
}
/// Return a Sha256 hash for the given data.
pub fn hashv(vals: &[&[u8]]) -> Hash {
let mut hasher = Sha256::default();
for val in vals {
hasher.input(val);
}
// At the time of this writing, the sha2 library is stuck on an old version
// of generic_array (0.9.0). Decouple ourselves with a clone to our version.
Hash(GenericArray::clone_from_slice(hasher.result().as_slice()))
let mut hasher = Hasher::default();
hasher.hashv(vals);
hasher.result()
}
/// Return a Sha256 hash for the given data.

View File

@ -479,10 +479,10 @@ pub fn next_entries_mut(
num_hashes: &mut u64,
transactions: Vec<Transaction>,
) -> Vec<Entry> {
// TODO: find a magic number that works better than | ?
// V
// TODO: ?? find a number that works better than |?
// V
if transactions.is_empty() || transactions.len() == 1 {
vec![Entry::new_mut(start_hash, num_hashes, transactions, false)]
vec![Entry::new_mut(start_hash, num_hashes, transactions)]
} else {
let mut chunk_start = 0;
let mut entries = Vec::new();
@ -526,7 +526,6 @@ pub fn next_entries_mut(
start_hash,
num_hashes,
transactions[chunk_start..chunk_end].to_vec(),
transactions.len() - chunk_end > 0,
));
chunk_start = chunk_end;
}
@ -612,7 +611,6 @@ mod tests {
Utc::now(),
one,
)],
false,
)
}).collect()
}
@ -698,7 +696,6 @@ mod tests {
num_hashes: 0,
id: Hash::default(),
transactions: vec![],
has_more: false,
}).unwrap() as usize;
assert!(tx_small_size < tx_large_size);
assert!(tx_large_size < PACKET_DATA_SIZE);
@ -715,8 +712,6 @@ mod tests {
let transactions = vec![tx_small.clone(); threshold * 2];
let entries0 = next_entries(&id, 0, transactions.clone());
assert_eq!(entries0.len(), 2);
assert!(entries0[0].has_more);
assert!(!entries0[entries0.len() - 1].has_more);
assert!(entries0.verify(&id));
// verify the split with small transactions followed by large
@ -728,8 +723,6 @@ mod tests {
let entries0 = next_entries(&id, 0, transactions.clone());
assert!(entries0.len() >= 2);
assert!(entries0[0].has_more);
assert!(!entries0[entries0.len() - 1].has_more);
assert!(entries0.verify(&id));
}

View File

@ -38,8 +38,6 @@ pub mod packet;
pub mod payment_plan;
pub mod poh;
pub mod poh_service;
pub mod record_stage;
pub mod recorder;
pub mod recvmmsg;
pub mod recycler;
pub mod replicate_stage;

View File

@ -57,8 +57,8 @@ impl Mint {
}
pub fn create_entries(&self) -> Vec<Entry> {
let e0 = Entry::new(&self.seed(), 0, vec![], false);
let e1 = Entry::new(&e0.id, 0, self.create_transactions(), false);
let e0 = Entry::new(&self.seed(), 0, vec![]);
let e1 = Entry::new(&e0.id, 0, self.create_transactions());
vec![e0, e1]
}
}

View File

@ -22,34 +22,29 @@ pub struct PohService {
impl PohService {
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(start_hash: Hash, hash_receiver: Receiver<Hash>) -> (Self, Receiver<PohEntry>) {
let (poh_sender, poh_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-record-service".to_string())
.spawn(move || {
let mut poh = Poh::new(start_hash, None);
let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender);
}).unwrap();
(PohService { thread_hdl }, poh_receiver)
}
/// Same as `PohService::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
/// if tick_duration is some, service will automatically produce entries every
/// `tick_duration`.
pub fn new(
start_hash: Hash,
hash_receiver: Receiver<Hash>,
tick_duration: Duration,
tick_duration: Option<Duration>,
) -> (Self, Receiver<PohEntry>) {
let (poh_sender, poh_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-record-service".to_string())
.spawn(move || {
let mut poh = Poh::new(start_hash, Some(tick_duration));
loop {
if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() {
return;
let mut poh = Poh::new(start_hash, tick_duration);
if tick_duration.is_some() {
loop {
if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err()
{
return;
}
poh.hash();
}
poh.hash();
} else {
let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender);
}
}).unwrap();
@ -111,7 +106,7 @@ mod tests {
#[test]
fn test_poh() {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver);
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None);
hash_sender.send(Hash::default()).unwrap();
sleep(Duration::from_millis(1));
@ -136,7 +131,7 @@ mod tests {
#[test]
fn test_poh_closed_sender() {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver);
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None);
drop(poh_receiver);
hash_sender.send(Hash::default()).unwrap();
assert_eq!(poh_service.thread_hdl.join().unwrap(), ());
@ -145,8 +140,11 @@ mod tests {
#[test]
fn test_poh_clock() {
let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) =
PohService::new_with_clock(Hash::default(), hash_receiver, Duration::from_millis(1));
let (_poh_service, poh_receiver) = PohService::new(
Hash::default(),
hash_receiver,
Some(Duration::from_millis(1)),
);
sleep(Duration::from_millis(50));
drop(hash_sender);

View File

@ -1,240 +0,0 @@
//! The `record_stage` module provides an object for generating a Proof of History.
//! It records Transaction items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Transaction item. It
//! tags each Transaction with an Entry, and sends it back. The Entry includes the
//! Transaction, the latest hash, and the number of hashes since the last transaction.
//! The resulting stream of entries represents ordered transactions in time.
use bank::Bank;
use counter::Counter;
use entry::Entry;
use log::Level;
use recorder::Recorder;
use service::Service;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use transaction::Transaction;
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal {
Tick,
Transactions(Vec<Transaction>),
}
pub struct RecordStage {
thread_hdl: JoinHandle<()>,
}
impl RecordStage {
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(signal_receiver: Receiver<Signal>, bank: Arc<Bank>) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let start_hash = bank.last_id();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ =
Self::process_signals(&mut recorder, &signal_receiver, &bank, &entry_sender);
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
bank: Arc<Bank>,
tick_duration: Duration,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let start_hash = bank.last_id();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let start_time = Instant::now();
loop {
if Self::try_process_signals(
&mut recorder,
start_time,
tick_duration,
&signal_receiver,
&bank,
&entry_sender,
).is_err()
{
return;
}
recorder.hash();
}
}).unwrap();
(RecordStage { thread_hdl }, entry_receiver)
}
fn process_signal(
signal: Signal,
bank: &Arc<Bank>,
recorder: &mut Recorder,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
let txs = if let Signal::Transactions(txs) = signal {
txs
} else {
vec![]
};
let txs_len = txs.len();
let entries = recorder.record(txs);
for entry in &entries {
if !entry.has_more {
bank.register_entry_id(&entry.id);
}
}
let entries_len = entries.len();
sender.send(entries).or(Err(()))?;
inc_new_counter_info!("record_stage-txs", txs_len);
inc_new_counter_info!("record_stage-entries", entries_len);
Ok(())
}
fn process_signals(
recorder: &mut Recorder,
receiver: &Receiver<Signal>,
bank: &Arc<Bank>,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
loop {
match receiver.recv() {
Ok(signal) => Self::process_signal(signal, bank, recorder, sender)?,
Err(RecvError) => return Err(()),
}
}
}
fn try_process_signals(
recorder: &mut Recorder,
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
bank: &Arc<Bank>,
sender: &Sender<Vec<Entry>>,
) -> Result<(), ()> {
loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(vec![entry]).or(Err(()))?;
}
match receiver.try_recv() {
Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
}
}
}
impl Service for RecordStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use bank::Bank;
use ledger::Block;
use mint::Mint;
use signature::{Keypair, KeypairUtil};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::sleep;
#[test]
fn test_historian() {
let (tx_sender, tx_receiver) = channel();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
tx_sender.send(Signal::Tick).unwrap();
let entry0 = entry_receiver.recv().unwrap()[0].clone();
let entry1 = entry_receiver.recv().unwrap()[0].clone();
let entry2 = entry_receiver.recv().unwrap()[0].clone();
assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0);
assert_eq!(entry2.num_hashes, 0);
drop(tx_sender);
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
assert!([entry0, entry1, entry2].verify(&zero));
}
#[test]
fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank);
drop(entry_receiver);
tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
}
#[test]
fn test_transactions() {
let (tx_sender, signal_receiver) = channel();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank);
let alice_keypair = Keypair::new();
let bob_pubkey = Keypair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
let tx1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero);
tx_sender
.send(Signal::Transactions(vec![tx0, tx1]))
.unwrap();
drop(tx_sender);
let entries: Vec<_> = entry_receiver.iter().collect();
assert_eq!(entries.len(), 1);
}
#[test]
fn test_clock() {
let (tx_sender, tx_receiver) = channel();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let zero = bank.last_id();
let (_record_stage, entry_receiver) =
RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20));
sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() > 1);
// Ensure the ID is not the seed.
assert_ne!(entries[0].id, zero);
}
}

View File

@ -1,48 +0,0 @@
//! The `recorder` module provides an object for generating a Proof of History.
//! It records Transaction items on behalf of its users.
use entry::Entry;
use hash::{hash, Hash};
use ledger;
use std::time::{Duration, Instant};
use transaction::Transaction;
pub struct Recorder {
last_hash: Hash,
num_hashes: u64,
num_ticks: u32,
}
impl Recorder {
pub fn new(last_hash: Hash) -> Self {
Recorder {
last_hash,
num_hashes: 0,
num_ticks: 0,
}
}
pub fn hash(&mut self) {
self.last_hash = hash(&self.last_hash.as_ref());
self.num_hashes += 1;
}
pub fn record(&mut self, transactions: Vec<Transaction>) -> Vec<Entry> {
ledger::next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions)
}
pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> {
if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
// TODO: don't let this overflow u32
self.num_ticks += 1;
Some(Entry::new_mut(
&mut self.last_hash,
&mut self.num_hashes,
vec![],
false,
))
} else {
None
}
}
}

View File

@ -192,7 +192,7 @@ impl ThinClient {
self.balances
.get(pubkey)
.map(Bank::read_balance)
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey"))
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "AccountNotFound"))
}
/// Request the finality from the leader node
@ -670,7 +670,9 @@ mod tests {
let balance = client.poll_get_balance(&bob_keypair.pubkey());
assert!(balance.is_err());
server.close().unwrap();
server
.close()
.unwrap_or_else(|e| panic!("close() failed! {:?}", e));
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -2,27 +2,27 @@
//! 5-stage transaction processing pipeline in software.
//!
//! ```text
//! .---------------------------------------------------------------.
//! | TPU .-----. |
//! | | PoH | |
//! | `--+--` |
//! | | |
//! | v |
//! | .-------. .-----------. .---------. .--------. .-------. |
//! .---------. | | Fetch | | SigVerify | | Banking | | Record | | Write | | .------------.
//! | Clients |--->| Stage |->| Stage |->| Stage |->| Stage |->| Stage +--->| Validators |
//! `---------` | | | | | | | | | | | | `------------`
//! | `-------` `-----------` `----+----` `--------` `---+---` |
//! | | | |
//! | | | |
//! | | | |
//! | | | |
//! `---------------------------------|-----------------------|-----`
//! | |
//! v v
//! .------. .--------.
//! | Bank | | Ledger |
//! `------` `--------`
//! .----------------------------------------------------.
//! | TPU .-------------. |
//! | | PoH Service | |
//! | `-------+-----` |
//! | ^ | |
//! | | v |
//! | .-------. .-----------. .-+-------. .-------. |
//! .---------. | | Fetch | | SigVerify | | Banking | | Write | | .------------.
//! | Clients |--->| Stage |->| Stage |->| Stage |-->| Stage +--->| Validators |
//! `---------` | | | | | | | | | | `------------`
//! | `-------` `-----------` `----+----` `---+---` |
//! | | | |
//! | | | |
//! | | | |
//! | | | |
//! `---------------------------------|------------|-----`
//! | |
//! v v
//! .------. .--------.
//! | Bank | | Ledger |
//! `------` `--------`
//! ```
use bank::Bank;
@ -30,7 +30,6 @@ use banking_stage::BankingStage;
use crdt::Crdt;
use entry::Entry;
use fetch_stage::FetchStage;
use record_stage::RecordStage;
use service::Service;
use signature::Keypair;
use sigverify_stage::SigVerifyStage;
@ -50,7 +49,6 @@ pub struct Tpu {
fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
record_stage: RecordStage,
write_stage: WriteStage,
exit: Arc<AtomicBool>,
}
@ -73,14 +71,8 @@ impl Tpu {
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver);
let (record_stage, entry_receiver) = match tick_duration {
Some(tick_duration) => {
RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration)
}
None => RecordStage::new(signal_receiver, bank.clone()),
};
let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, tick_duration);
let (write_stage, entry_forwarder) = WriteStage::new(
keypair,
@ -95,7 +87,6 @@ impl Tpu {
fetch_stage,
sigverify_stage,
banking_stage,
record_stage,
write_stage,
exit: exit.clone(),
};
@ -119,7 +110,6 @@ impl Service for Tpu {
self.fetch_stage.join()?;
self.sigverify_stage.join()?;
self.banking_stage.join()?;
self.record_stage.join()?;
match self.write_stage.join()? {
WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)),
_ => Ok(None),

View File

@ -4,7 +4,7 @@ use bincode::{deserialize, serialize};
use budget::{Budget, Condition};
use budget_program::BudgetState;
use chrono::prelude::*;
use hash::Hash;
use hash::{Hash, Hasher};
use instruction::{Contract, Instruction, Vote};
use payment_plan::Payment;
use signature::{Keypair, KeypairUtil, Pubkey, Signature};
@ -292,6 +292,14 @@ impl Transaction {
true
}
}
// a hash of a slice of transactions only needs to hash the signatures
pub fn hash(transactions: &[Transaction]) -> Hash {
let mut hasher = Hasher::default();
transactions
.iter()
.for_each(|tx| hasher.hash(&tx.signature.as_ref()));
hasher.result()
}
}
pub fn test_tx() -> Transaction {

View File

@ -245,7 +245,7 @@ pub mod tests {
let transfer_amount = 501;
let bob_keypair = Keypair::new();
for i in 0..num_transfers {
let entry0 = Entry::new(&cur_hash, i, vec![], false);
let entry0 = Entry::new(&cur_hash, i, vec![]);
bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash.as_ref());
@ -257,7 +257,7 @@ pub mod tests {
);
bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash.as_ref());
let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0], false);
let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]);
bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash.as_ref());

View File

@ -99,9 +99,9 @@ impl WindowUtil for Window {
received: u64,
) -> Vec<(SocketAddr, Vec<u8>)> {
let num_peers = crdt.read().unwrap().table.len() as u64;
let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received);
let max_repair = calculate_max_repair(num_peers, consumed, received, times);
let idxs = self.clear_slots(consumed, highest_lost);
let idxs = self.clear_slots(consumed, max_repair);
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
@ -110,14 +110,14 @@ impl WindowUtil for Window {
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
if log_enabled!(Level::Trace) {
trace!(
"{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
"{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}",
id,
times,
consumed,
highest_lost,
received,
max_repair,
reqs.len()
);
for (to, _) in &reqs {
trace!("{}: repair_window request to {}", id, to);
}
@ -286,17 +286,22 @@ impl WindowUtil for Window {
}
}
fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 {
fn calculate_max_repair(num_peers: u64, consumed: u64, received: u64, times: usize) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
// the data to their peer nodes. So there's a possibility that a blob (with index lower
// than current received index) is being retransmitted by a peer node.
let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers));
let max_repair = if times >= 8 {
// if repair backoff is getting high, don't wait for avalanche
cmp::max(consumed, received)
} else {
cmp::max(consumed, received.saturating_sub(num_peers))
};
// This check prevents repairing a blob that will cause window to roll over. Even if
// the highes_lost blob is actually missing, asking to repair it might cause our
// current window to move past other missing blobs
cmp::min(consumed + WINDOW_SIZE - 1, highest_lost)
cmp::min(consumed + WINDOW_SIZE - 1, max_repair)
}
pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
@ -415,7 +420,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use streamer::{receiver, responder, PacketReceiver};
use window::{blob_idx_in_window, calculate_highest_lost_blob_index, WINDOW_SIZE};
use window::{blob_idx_in_window, calculate_max_repair, WINDOW_SIZE};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
@ -473,27 +478,28 @@ mod test {
}
#[test]
pub fn calculate_highest_lost_blob_index_test() {
assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10);
assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11);
pub fn calculate_max_repair_test() {
assert_eq!(calculate_max_repair(0, 10, 90, 0), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0), 11);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE),
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0),
WINDOW_SIZE + 5
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE),
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE),
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE),
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0),
WINDOW_SIZE + 9
);
}

View File

@ -272,6 +272,12 @@ pub fn window_service(
}
if received <= consumed {
trace!(
"{} we have everything received:{} consumed:{}",
id,
received,
consumed
);
continue;
}
@ -280,6 +286,7 @@ pub fn window_service(
trace!("{} !repair_backoff() times = {}", id, times);
continue;
}
trace!("{} let's repair! times = {}", id, times);
let mut window = window.write().unwrap();
let reqs = window.repair(&crdt, &id, times, consumed, received);

View File

@ -300,8 +300,7 @@ mod tests {
use crdt::{Crdt, Node};
use entry::Entry;
use hash::Hash;
use ledger::{genesis, read_ledger};
use recorder::Recorder;
use ledger::{genesis, next_entries_mut, read_ledger};
use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey};
use std::fs::remove_dir_all;
@ -384,20 +383,20 @@ mod tests {
wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id);
}
let last_entry_hash = write_stage_info
let mut last_id = write_stage_info
.ledger_tail
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;
let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_entry_height..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
@ -416,7 +415,7 @@ mod tests {
// The write_stage will see that it's no longer the leader after
// checking the schedule, and exit
for _ in 0..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
@ -452,7 +451,7 @@ mod tests {
}
let crdt = Arc::new(RwLock::new(crdt));
let entry = Entry::new(&Hash::default(), 0, vec![], false);
let entry = Entry::new(&Hash::default(), 0, vec![]);
// A vector that is completely within a certain epoch should return that
// entire vector

View File

@ -111,7 +111,7 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec<Entry> {
let mut id = start_hash;
let mut num_hashes = 0;
(0..num)
.map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![], false))
.map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![]))
.collect()
}
@ -136,7 +136,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// write a bunch more ledger into leader's ledger, this should populate his window
// and force him to respond to repair from the ledger window
{
let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2);
let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize);
let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
writer.write_entries(entries).unwrap();
@ -159,6 +159,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// start up another validator from zero, converge and then check
// balances
let keypair = Keypair::new();
let validator_pubkey = keypair.pubkey().clone();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.info.clone();
let validator = Fullnode::new(
@ -170,24 +171,32 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
None,
);
// Send validator some tokens to vote
let validator_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None).unwrap();
info!("leader balance {}", validator_balance);
// contains the leader and new node
info!("converging....");
let _servers = converge(&leader_data, 2);
info!("converged.");
// another transaction with leader
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
info!("bob balance on leader {}", leader_balance);
assert_eq!(leader_balance, 500);
let bob_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 1, None).unwrap();
info!("bob balance on leader {}", bob_balance);
let mut checks = 1;
loop {
let mut client = mk_client(&validator_data);
let bal = client.poll_get_balance(&bob_pubkey);
info!("bob balance on validator {:?}...", bal);
if bal.unwrap_or(0) == leader_balance {
info!(
"bob balance on validator {:?} after {} checks...",
bal, checks
);
if bal.unwrap_or(0) == bob_balance {
break;
}
sleep(Duration::from_millis(300));
checks += 1;
}
info!("done!");
@ -240,6 +249,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = Keypair::new();
let validator_pubkey = keypair.pubkey().clone();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
@ -247,6 +257,12 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
);
ledger_paths.push(ledger_path.clone());
// Send each validator some tokens to vote
let validator_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None)
.unwrap();
info!("validator balance {}", validator_balance);
let mut val = Fullnode::new(
validator,
&ledger_path,
@ -366,9 +382,17 @@ fn test_multi_node_basic() {
let mut nodes = vec![server];
for _ in 0..N {
let keypair = Keypair::new();
let validator_pubkey = keypair.pubkey().clone();
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
ledger_paths.push(ledger_path.clone());
// Send each validator some tokens to vote
let validator_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None)
.unwrap();
info!("validator balance {}", validator_balance);
let val = Fullnode::new(
validator,
&ledger_path,
@ -620,7 +644,7 @@ fn test_multi_node_dynamic_network() {
.spawn(move || {
info!("Spawned thread {}", n);
let keypair = Keypair::new();
//send some tokens to the new validator
//send some tokens to the new validators
let bal = retry_send_tx_and_retry_get_balance(
&leader_data,
&alice_clone.read().unwrap(),