ReplayStage asking ledger for updates (#2597)

* Modify replay stage to ask db_ledger for updates instead of reading from upstream channel

* Add signal for db_ledger to update listeners about updates

* fix flaky test
This commit is contained in:
carllin 2019-02-04 15:33:43 -08:00 committed by GitHub
parent 5375c420c1
commit 3feda8a315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 631 additions and 344 deletions

View File

@ -19,6 +19,7 @@ use std::cmp;
use std::fs;
use std::io;
use std::path::Path;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
pub type DbLedgerRawIterator = rocksdb::DBRawIterator;
@ -281,6 +282,7 @@ pub struct DbLedger {
meta_cf: MetaCf,
data_cf: DataCf,
erasure_cf: ErasureCf,
new_blobs_signals: Vec<SyncSender<bool>>,
}
// TODO: Once we support a window that knows about different leader
@ -330,9 +332,32 @@ impl DbLedger {
meta_cf,
data_cf,
erasure_cf,
new_blobs_signals: vec![],
})
}
/// Opens the ledger at `ledger_path` and registers a bounded signal channel
/// that is pinged whenever a newly inserted blob extends the consecutive
/// (consumed) range. Returns the ledger along with both channel endpoints.
pub fn open_with_signal(ledger_path: &str) -> Result<(Self, SyncSender<bool>, Receiver<bool>)> {
    // Capacity-1 channel: a single pending "true" is enough to wake a listener.
    let (signal_sender, signal_receiver) = sync_channel(1);
    let mut db_ledger = Self::open(ledger_path)?;
    db_ledger.new_blobs_signals = vec![signal_sender.clone()];
    Ok((db_ledger, signal_sender, signal_receiver))
}
/// Fetches the entries stored for `slot_index`, beginning at blob index
/// `blob_start_index`, stopping at the first missing index (or after
/// `max_entries` entries when a limit is supplied).
pub fn get_slot_entries(
    &self,
    slot_index: u64,
    blob_start_index: u64,
    max_entries: Option<u64>,
) -> Result<Vec<Entry>> {
    trace!("get_slot_entries {} {}", slot_index, blob_start_index);
    // Gather the run of consecutive blobs, then decode each into an Entry.
    self.get_slot_consecutive_blobs(slot_index, blob_start_index, max_entries)
        .map(|blobs| Self::deserialize_blobs(&blobs))
}
pub fn meta(&self) -> Result<Option<SlotMeta>> {
self.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
}
@ -392,6 +417,33 @@ impl DbLedger {
self.write_blobs(&blobs)
}
/// Returns up to `max_blobs` raw blob payloads for `slot_index`, starting at
/// `current_index` and advancing one index at a time. Collection stops at the
/// first index with no stored blob, so the result is always a gap-free run.
///
/// NOTE: the previous doc comment ("next consumed index and number of ticks")
/// described a different function and has been corrected.
fn get_slot_consecutive_blobs(
    &self,
    slot_index: u64,
    mut current_index: u64,
    max_blobs: Option<u64>,
) -> Result<Vec<Vec<u8>>> {
    let mut blobs: Vec<Vec<u8>> = vec![];
    // Keep fetching until we hit the optional cap...
    while Some(blobs.len() as u64) != max_blobs {
        // ...or until the database has no blob at the next index (a gap).
        match self.data_cf.get_by_slot_index(slot_index, current_index)? {
            Some(blob_data) => blobs.push(blob_data),
            None => break,
        }
        current_index += 1;
    }
    Ok(blobs)
}
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator,
@ -522,6 +574,11 @@ impl DbLedger {
}
self.db.write(batch)?;
if !consumed_queue.is_empty() {
for signal in self.new_blobs_signals.iter() {
let _ = signal.try_send(true);
}
}
Ok(consumed_queue)
}
@ -764,6 +821,21 @@ impl DbLedger {
)
}
/// Decodes each raw blob payload into an `Entry`, skipping the blob header.
///
/// # Panics
/// Panics if any payload fails to deserialize — ledger contents are expected
/// to be well formed.
fn deserialize_blobs<I>(blob_datas: &[I]) -> Vec<Entry>
where
    I: Borrow<[u8]>,
{
    let mut entries = Vec::with_capacity(blob_datas.len());
    for blob_data in blob_datas {
        // The serialized Entry begins immediately after the blob header.
        let serialized_entry_data = &blob_data.borrow()[BLOB_HEADER_SIZE..];
        let entry: Entry = deserialize(serialized_entry_data)
            .expect("Ledger should only contain well formed data");
        entries.push(entry);
    }
    entries
}
fn get_cf_options() -> Options {
let mut options = Options::default();
options.set_max_write_buffer_number(32);
@ -1060,6 +1132,11 @@ mod tests {
let result = ledger.insert_data_blobs(vec![blobs[1]]).unwrap();
assert!(result.len() == 0);
assert!(ledger
.get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None)
.unwrap()
.is_empty());
let meta = ledger
.meta_cf
.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
@ -1069,7 +1146,11 @@ mod tests {
// Insert first blob, check for consecutive returned entries
let result = ledger.insert_data_blobs(vec![blobs[0]]).unwrap();
assert_eq!(result, entries);
let result = ledger
.get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None)
.unwrap();
assert_eq!(result, entries);
let meta = ledger
@ -1101,12 +1182,15 @@ mod tests {
// Insert blobs in reverse, check for consecutive returned blobs
for i in (0..num_blobs).rev() {
let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
let result_fetch = ledger
.get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None)
.unwrap();
let meta = ledger
.meta_cf
.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
.unwrap()
.expect("Expected metadata object to exist");
assert_eq!(result, result_fetch);
if i != 0 {
assert_eq!(result.len(), 0);
assert!(meta.consumed == 0 && meta.received == num_blobs as u64);
@ -1207,6 +1291,70 @@ mod tests {
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_slot_entries1() {
    let db_ledger_path = get_tmp_ledger_path("test_get_slot_entries1");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
        let entries = make_tiny_test_entries(8);
        let mut blobs = entries.clone().to_blobs();
        // Place all 8 blobs in slot 1, but leave a gap in the index space:
        // the first four get indexes 0..3, the last four get 12..15.
        for (i, b) in blobs.iter_mut().enumerate() {
            b.set_slot(1);
            if i < 4 {
                b.set_index(i as u64);
            } else {
                b.set_index(8 + i as u64);
            }
        }
        db_ledger
            .write_blobs(&blobs)
            .expect("Expected successful write of blobs");
        // Starting at index 2, the consecutive run ends at the gap after
        // index 3, so only entries[2..4] come back.
        assert_eq!(
            db_ledger.get_slot_entries(1, 2, None).unwrap()[..],
            entries[2..4],
        );
        // Starting at index 12 (past the gap), the run covers the remaining
        // four blobs, i.e. entries[4..].
        assert_eq!(
            db_ledger.get_slot_entries(1, 12, None).unwrap()[..],
            entries[4..],
        );
    }
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_slot_entries2() {
    let db_ledger_path = get_tmp_ledger_path("test_get_slot_entries2");
    {
        let db_ledger = DbLedger::open(&db_ledger_path).unwrap();

        // Write entries: slot `s` receives `s + 1` entries, with blob indexes
        // increasing contiguously across all slots.
        let num_slots = 5u64;
        let mut index = 0;
        for slot_height in 0..num_slots {
            let entries = make_tiny_test_entries(slot_height as usize + 1);
            let last_entry = entries.last().unwrap().clone();
            let mut blobs = entries.clone().to_blobs();
            for b in blobs.iter_mut() {
                b.set_index(index);
                // `slot_height` is already u64; no cast needed.
                b.set_slot(slot_height);
                index += 1;
            }
            db_ledger
                .write_blobs(&blobs)
                .expect("Expected successful write of blobs");

            // Reading from the slot's final index should yield exactly the
            // last entry written to that slot.
            assert_eq!(
                db_ledger
                    .get_slot_entries(slot_height, index - 1, None)
                    .unwrap(),
                vec![last_entry],
            );
        }
    }
    DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_blobs_bulk() {
let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk");

View File

@ -24,7 +24,7 @@ use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Result;
@ -117,7 +117,8 @@ impl Fullnode {
config: &FullnodeConfig,
) -> Self {
let id = keypair.pubkey();
let (genesis_block, db_ledger) = Self::make_db_ledger(ledger_path);
let (genesis_block, db_ledger, ledger_signal_sender, ledger_signal_receiver) =
Self::make_db_ledger(ledger_path);
let (bank, entry_height, last_entry_id) =
Self::new_bank_from_db_ledger(&genesis_block, &db_ledger, leader_scheduler);
@ -235,6 +236,8 @@ impl Fullnode {
to_leader_sender,
&storage_state,
config.entry_stream.as_ref(),
ledger_signal_sender,
ledger_signal_receiver,
);
let max_tick_height = {
let ls_lock = bank.leader_scheduler.read().unwrap();
@ -396,7 +399,7 @@ impl Fullnode {
self.join()
}
fn new_bank_from_db_ledger(
pub fn new_bank_from_db_ledger(
genesis_block: &GenesisBlock,
db_ledger: &DbLedger,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
@ -424,7 +427,7 @@ impl Fullnode {
ledger_path: &str,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Bank, u64, Hash) {
let (genesis_block, db_ledger) = Self::make_db_ledger(ledger_path);
let (genesis_block, db_ledger, _, _) = Self::make_db_ledger(ledger_path);
Self::new_bank_from_db_ledger(&genesis_block, &db_ledger, leader_scheduler)
}
@ -432,14 +435,19 @@ impl Fullnode {
&self.bank.leader_scheduler
}
fn make_db_ledger(ledger_path: &str) -> (GenesisBlock, Arc<DbLedger>) {
let db_ledger = Arc::new(
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"),
);
/// Opens the database ledger (together with its new-blob signal channel) and
/// loads the genesis block, both from `ledger_path`.
fn make_db_ledger(
    ledger_path: &str,
) -> (
    GenesisBlock,
    Arc<DbLedger>,
    SyncSender<bool>,
    Receiver<bool>,
) {
    let genesis_block =
        GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block");
    let (db_ledger, l_sender, l_receiver) = DbLedger::open_with_signal(ledger_path)
        .expect("Expected to successfully open database ledger");
    (genesis_block, Arc::new(db_ledger), l_sender, l_receiver)
}
}
@ -675,7 +683,6 @@ mod tests {
);
assert!(validator.node_services.tpu.is_leader());
validator.close().expect("Expected leader node to close");
bootstrap_leader
.close()

View File

@ -3,7 +3,8 @@
use crate::bank::Bank;
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::{EntryReceiver, EntrySender, EntrySlice};
use crate::db_ledger::DbLedger;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
#[cfg(not(test))]
use crate::entry_stream::EntryStream;
use crate::entry_stream::EntryStreamHandler;
@ -23,11 +24,9 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use solana_sdk::vote_transaction::VoteTransaction;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
@ -52,38 +51,29 @@ impl Drop for Finalizer {
pub struct ReplayStage {
t_replay: JoinHandle<()>,
exit: Arc<AtomicBool>,
ledger_signal_sender: SyncSender<bool>,
}
impl ReplayStage {
/// Process entry blobs, already in order
#[allow(clippy::too_many_arguments)]
fn process_entries(
mut entries: Vec<Entry>,
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
window_receiver: &EntryReceiver,
my_id: Pubkey,
voting_keypair: Option<&Arc<VotingKeypair>>,
ledger_entry_sender: &EntrySender,
entry_height: &Arc<RwLock<u64>>,
last_entry_id: &Arc<RwLock<Hash>>,
entry_stream: Option<&mut EntryStream>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available entries into a single vote
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
entries.append(&mut more);
if entries.len() >= MAX_ENTRY_RECV_PER_ITER {
break;
}
}
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
//coalesce all the available entries into a single vote
submit(
influxdb::Point::new("replicate-stage")
.add_field("count", influxdb::Value::Integer(entries.len() as i64))
@ -105,8 +95,6 @@ impl ReplayStage {
let (current_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
let already_leader = my_id == current_leader;
let mut did_rotate = false;
// Next vote tick is ceiling of (current tick/ticks per block)
let mut num_ticks_to_next_vote =
@ -160,14 +148,11 @@ impl ReplayStage {
// TODO: Remove this soon once we boot the leader from ClusterInfo
if scheduled_leader != current_leader {
did_rotate = true;
cluster_info.write().unwrap().set_leader(scheduled_leader);
}
if !already_leader && my_id == scheduled_leader && did_rotate {
num_entries_to_write = i + 1;
break;
}
start_entry_index = i + 1;
num_ticks_to_next_vote = DEFAULT_TICKS_PER_SLOT;
}
@ -194,12 +179,12 @@ impl ReplayStage {
}
*entry_height.write().unwrap() += entries_len;
res?;
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);
Ok(())
}
@ -207,63 +192,142 @@ impl ReplayStage {
pub fn new(
my_id: Pubkey,
voting_keypair: Option<Arc<VotingKeypair>>,
db_ledger: Arc<DbLedger>,
bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>,
window_receiver: EntryReceiver,
exit: Arc<AtomicBool>,
entry_height: Arc<RwLock<u64>>,
last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: TvuRotationSender,
entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
) -> (Self, EntryReceiver) {
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let mut entry_stream = entry_stream.cloned().map(EntryStream::new);
let (_, mut current_slot) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
let mut max_tick_height_for_slot = bank
.leader_scheduler
.read()
.unwrap()
.max_tick_height_for_slot(current_slot);
let exit_ = exit.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit);
let entry_height_ = entry_height;
let last_entry_id = last_entry_id;
let _exit = Finalizer::new(exit_.clone());
let (mut last_leader_id, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
// Loop through db_ledger MAX_ENTRY_RECV_PER_ITER entries at a time for each
// relevant slot to see if there are any available updates
loop {
let (leader_id, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
if leader_id != last_leader_id && leader_id == my_id {
to_leader_sender
.send(TvuReturnType::LeaderRotation(
bank.tick_height(),
*entry_height_.read().unwrap(),
*last_entry_id.read().unwrap(),
))
.unwrap();
// Stop getting entries if we get exit signal
if exit_.load(Ordering::Relaxed) {
break;
}
last_leader_id = leader_id;
match Self::process_entries(
&bank,
&cluster_info,
&window_receiver,
my_id,
voting_keypair.as_ref(),
&ledger_entry_sender,
&entry_height_.clone(),
&last_entry_id.clone(),
entry_stream.as_mut(),
) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(e) => error!("{:?}", e),
Ok(()) => (),
let current_entry_height = *entry_height.read().unwrap();
let entries = {
if let Ok(entries) = db_ledger.get_slot_entries(
current_slot,
current_entry_height,
Some(MAX_ENTRY_RECV_PER_ITER as u64),
) {
entries
} else {
vec![]
}
};
let entry_len = entries.len();
// Fetch the next entries from the database
if !entries.is_empty() {
if let Err(e) = Self::process_entries(
entries,
&bank,
&cluster_info,
voting_keypair.as_ref(),
&ledger_entry_sender,
&entry_height,
&last_entry_id,
entry_stream.as_mut(),
) {
error!("{:?}", e);
}
let current_tick_height = bank.tick_height();
// We've reached the end of a slot, reset our state and check
// for leader rotation
if max_tick_height_for_slot == current_tick_height {
// Check for leader rotation
let leader_id = Self::get_leader(&bank, &cluster_info);
if leader_id != last_leader_id && my_id == leader_id {
to_leader_sender
.send(TvuReturnType::LeaderRotation(
bank.tick_height(),
*entry_height.read().unwrap(),
*last_entry_id.read().unwrap(),
))
.unwrap();
}
current_slot += 1;
max_tick_height_for_slot = bank
.leader_scheduler
.read()
.unwrap()
.max_tick_height_for_slot(current_slot);
last_leader_id = leader_id;
}
}
// Block until there are updates again
if entry_len < MAX_ENTRY_RECV_PER_ITER && ledger_signal_receiver.recv().is_err()
{
// Update disconnected, exit
break;
}
}
})
.unwrap();
(Self { t_replay }, ledger_entry_receiver)
(
Self {
t_replay,
exit,
ledger_signal_sender,
},
ledger_entry_receiver,
)
}
/// Signals the replay thread to stop and then waits for it to finish.
pub fn close(self) -> thread::Result<()> {
    self.exit();
    self.join()
}
/// Sets the exit flag, then nudges the ledger signal channel so a replay
/// loop blocked on the receiver wakes up and can observe the flag.
pub fn exit(&self) {
    self.exit.store(true, Ordering::Relaxed);
    // Send failure is ignored: the replay loop may have already shut down.
    let _ = self.ledger_signal_sender.send(true);
}
/// Reads the currently scheduled leader from the bank and publishes it to
/// `cluster_info`, returning the leader's pubkey.
fn get_leader(bank: &Bank, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Pubkey {
    let (scheduled_leader, _) = bank
        .get_current_leader()
        .expect("Scheduled leader should be calculated by this point");
    // TODO: Remove this soon once we boot the leader from ClusterInfo
    cluster_info.write().unwrap().set_leader(scheduled_leader);
    scheduled_leader
}
}
@ -285,13 +349,11 @@ mod test {
use crate::entry::create_ticks;
use crate::entry::Entry;
use crate::fullnode::Fullnode;
use crate::genesis_block::GenesisBlock;
use crate::leader_scheduler::{
make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig,
};
use crate::packet::BlobError;
use crate::replay_stage::ReplayStage;
use crate::result::Error;
use crate::service::Service;
use crate::tvu::TvuReturnType;
use crate::voting_keypair::VotingKeypair;
use chrono::{DateTime, FixedOffset};
@ -304,7 +366,6 @@ mod test {
use std::sync::{Arc, RwLock};
#[test]
#[ignore]
pub fn test_replay_stage_leader_rotation_exit() {
solana_logger::setup();
@ -330,7 +391,7 @@ mod test {
let my_keypair = Arc::new(my_keypair);
// Write two entries to the ledger so that the validator is in the active set:
// 1) Give the validator a nonzero number of tokens 2) A vote from the validator .
// 1) Give the validator a nonzero number of tokens 2) A vote from the validator.
// This will cause leader rotation after the bootstrap height
let (active_set_entries, voting_keypair) =
make_active_set_entries(&my_keypair, &mint_keypair, &last_id, &last_id, 0);
@ -340,7 +401,24 @@ mod test {
let initial_non_tick_height = genesis_entry_height - initial_tick_height;
{
let db_ledger = DbLedger::open(&my_ledger_path).unwrap();
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval
let leader_rotation_interval = 16;
let bootstrap_height = 2 * leader_rotation_interval;
assert!((num_ending_ticks as u64) < bootstrap_height);
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_height,
leader_rotation_interval,
leader_rotation_interval * 2,
bootstrap_height,
);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
let (db_ledger, l_sender, l_receiver) =
DbLedger::open_with_signal(&my_ledger_path).unwrap();
let db_ledger = Arc::new(db_ledger);
db_ledger
.write_entries(
DEFAULT_SLOT_HEIGHT,
@ -348,90 +426,97 @@ mod test {
&active_set_entries,
)
.unwrap();
}
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height
let leader_rotation_interval = 16;
let bootstrap_height = 2 * leader_rotation_interval;
assert!((num_ending_ticks as u64) < bootstrap_height);
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_height,
leader_rotation_interval,
leader_rotation_interval * 2,
bootstrap_height,
);
let genesis_block = GenesisBlock::load(&my_ledger_path)
.expect("Expected to successfully open genesis block");
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Set up the bank
let (bank, _, last_entry_id) =
Fullnode::new_bank_from_db_ledger(&genesis_block, &db_ledger, leader_scheduler);
// Set up the bank
let (bank, entry_height, last_entry_id) =
Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
// Set up the replay stage
let (rotation_sender, rotation_receiver) = channel();
let meta = db_ledger.meta().unwrap().unwrap();
let exit = Arc::new(AtomicBool::new(false));
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_id,
Some(Arc::new(voting_keypair)),
db_ledger.clone(),
Arc::new(bank),
Arc::new(RwLock::new(cluster_info_me)),
exit.clone(),
Arc::new(RwLock::new(meta.consumed)),
Arc::new(RwLock::new(last_entry_id)),
rotation_sender,
None,
l_sender,
l_receiver,
);
// Set up the replay stage
let (entry_sender, entry_receiver) = channel();
let (rotation_sender, rotation_receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
let (_replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(Arc::new(voting_keypair)),
Arc::new(bank),
Arc::new(RwLock::new(cluster_info_me)),
entry_receiver,
exit.clone(),
Arc::new(RwLock::new(entry_height)),
Arc::new(RwLock::new(last_entry_id)),
rotation_sender,
None,
);
// Send enough ticks to trigger leader rotation
let extra_entries = leader_rotation_interval;
let total_entries_to_send = (bootstrap_height + extra_entries) as usize;
let num_hashes = 1;
let mut entries_to_send = vec![];
// Send enough ticks to trigger leader rotation
let total_entries_to_send = (bootstrap_height + leader_rotation_interval) as usize;
let mut entries_to_send = vec![];
while entries_to_send.len() < total_entries_to_send {
let entry = Entry::new(&mut last_id, 0, 1, vec![]);
last_id = entry.id;
entries_to_send.push(entry);
}
// Add on the only entries that weren't ticks to the bootstrap height to get the
// total expected entry length
let leader_rotation_index = (bootstrap_height - initial_tick_height) as usize;
let expected_entry_height =
bootstrap_height + initial_non_tick_height + active_set_entries_len;
let expected_last_id = entries_to_send[leader_rotation_index - 1].id;
entry_sender.send(entries_to_send.clone()).unwrap();
// Wait for replay_stage to exit and check return value is correct
assert_eq!(
Some(TvuReturnType::LeaderRotation(
bootstrap_height,
expected_entry_height,
expected_last_id,
)),
{
Some(
rotation_receiver
.recv()
.expect("should have signaled leader rotation"),
)
while entries_to_send.len() < total_entries_to_send {
let entry = Entry::new(&mut last_id, 0, num_hashes, vec![]);
last_id = entry.id;
entries_to_send.push(entry);
}
);
// Check that the entries on the ledger writer channel are correct
let received_ticks = ledger_writer_recv
.recv()
.expect("Expected to receive an entry on the ledger writer receiver");
assert!((num_ending_ticks as u64) < bootstrap_height);
assert_eq!(
&received_ticks[..],
&entries_to_send[..leader_rotation_index]
);
//replay stage should continue running even after rotation has happened (tvu never goes down)
assert_eq!(exit.load(Ordering::Relaxed), false);
//force exit
exit.store(true, Ordering::Relaxed);
// Add on the only entries that weren't ticks to the bootstrap height to get the
// total expected entry length
let leader_rotation_index = (bootstrap_height - initial_tick_height) as usize;
let expected_entry_height =
bootstrap_height + initial_non_tick_height + active_set_entries_len;
let expected_last_id = entries_to_send[leader_rotation_index - 1].id;
// Write the entries to the ledger, replay_stage should get notified of changes
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, meta.consumed, &entries_to_send)
.unwrap();
// Wait for replay_stage to exit and check return value is correct
assert_eq!(
Some(TvuReturnType::LeaderRotation(
bootstrap_height,
expected_entry_height,
expected_last_id,
)),
{
Some(
rotation_receiver
.recv()
.expect("should have signaled leader rotation"),
)
}
);
// Check that the entries on the ledger writer channel are correct
let mut received_ticks = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
while let Ok(entries) = ledger_writer_recv.try_recv() {
received_ticks.extend(entries);
}
assert_eq!(
&received_ticks[..],
&entries_to_send[..leader_rotation_index]
);
//replay stage should continue running even after rotation has happened (tvu never goes down)
assert_eq!(exit.load(Ordering::Relaxed), false);
//force exit
replay_stage
.close()
.expect("Expect successful ReplayStage exit");
}
let _ignored = remove_dir_all(&my_ledger_path);
}
@ -455,51 +540,60 @@ mod test {
500,
);
// Set up the bank
let (bank, entry_height, last_entry_id) =
Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
// Set up the replay stage
let bank = Arc::new(bank);
let (entry_sender, entry_receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let (to_leader_sender, _) = channel();
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
bank.clone(),
cluster_info_me.clone(),
entry_receiver,
exit.clone(),
Arc::new(RwLock::new(entry_height)),
Arc::new(RwLock::new(last_entry_id)),
to_leader_sender,
None,
);
{
let (db_ledger, l_sender, l_receiver) =
DbLedger::open_with_signal(&my_ledger_path).unwrap();
let db_ledger = Arc::new(db_ledger);
// Set up the bank
let genesis_block = GenesisBlock::load(&my_ledger_path)
.expect("Expected to successfully open genesis block");
let (bank, entry_height, last_entry_id) =
Fullnode::new_bank_from_db_ledger(&genesis_block, &db_ledger, leader_scheduler);
let bank = Arc::new(bank);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
db_ledger.clone(),
bank.clone(),
cluster_info_me.clone(),
exit.clone(),
Arc::new(RwLock::new(entry_height)),
Arc::new(RwLock::new(last_entry_id)),
to_leader_sender,
None,
l_sender,
l_receiver,
);
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(keypair, bank.tick_height(), bank.last_id(), 0);
cluster_info_me.write().unwrap().push_vote(vote);
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(keypair, bank.tick_height(), bank.last_id(), 0);
cluster_info_me.write().unwrap().push_vote(vote);
// Send ReplayStage an entry, should see it on the ledger writer receiver
let next_tick = create_ticks(1, last_entry_id);
entry_sender
.send(next_tick.clone())
.expect("Error sending entry to ReplayStage");
let received_tick = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
// Send ReplayStage an entry, should see it on the ledger writer receiver
let next_tick = create_ticks(1, last_entry_id);
assert_eq!(next_tick, received_tick);
drop(entry_sender);
replay_stage
.join()
.expect("Expect successful ReplayStage exit");
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, entry_height, next_tick.clone())
.unwrap();
let received_tick = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
assert_eq!(next_tick, received_tick);
replay_stage
.close()
.expect("Expect successful ReplayStage exit");
}
let _ignored = remove_dir_all(&my_ledger_path);
}
@ -537,17 +631,8 @@ mod test {
let active_set_entries_len = active_set_entries.len() as u64;
let initial_non_tick_height = genesis_entry_height - initial_tick_height;
{
let db_ledger = DbLedger::open(&my_ledger_path).unwrap();
db_ledger
.write_entries(
DEFAULT_SLOT_HEIGHT,
genesis_entry_height,
&active_set_entries,
)
.unwrap();
}
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval
let leader_rotation_interval = 10;
@ -563,85 +648,107 @@ mod test {
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Set up the bank
let (bank, entry_height, last_entry_id) =
Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
// Set up the replay stage
let voting_keypair = Arc::new(voting_keypair);
let bank = Arc::new(bank);
let (entry_sender, entry_receiver) = channel();
let (rotation_tx, rotation_rx) = channel();
let exit = Arc::new(AtomicBool::new(false));
let (_replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
bank.clone(),
cluster_info_me.clone(),
entry_receiver,
exit.clone(),
Arc::new(RwLock::new(entry_height)),
Arc::new(RwLock::new(last_entry_id)),
rotation_tx,
None,
);
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(keypair, bank.tick_height(), bank.last_id(), 0);
cluster_info_me.write().unwrap().push_vote(vote);
// Send enough ticks to trigger leader rotation
let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
// Add on the only entries that weren't ticks to the bootstrap height to get the
// total expected entry length
let expected_entry_height =
bootstrap_height + initial_non_tick_height + active_set_entries_len;
let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
let mut expected_last_id = Hash::default();
for i in 0..total_entries_to_send {
let entry = Entry::new(&mut last_id, 0, 1, vec![]);
last_id = entry.id;
entry_sender
.send(vec![entry.clone()])
.expect("Expected to be able to send entry to ReplayStage");
// Check that the entries on the ledger writer channel are correct
let received_entry = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
assert_eq!(received_entry[0], entry);
if i == leader_rotation_index {
expected_last_id = entry.id;
}
debug!(
"loop: i={}, leader_rotation_index={}, entry={:?}",
i, leader_rotation_index, entry,
);
}
info!("Wait for replay_stage to exit and check return value is correct");
assert_eq!(
Some(TvuReturnType::LeaderRotation(
bootstrap_height,
expected_entry_height,
expected_last_id,
)),
{
Some(
rotation_rx
.recv()
.expect("should have signaled leader rotation"),
{
let (db_ledger, l_sender, l_receiver) =
DbLedger::open_with_signal(&my_ledger_path).unwrap();
let db_ledger = Arc::new(db_ledger);
db_ledger
.write_entries(
DEFAULT_SLOT_HEIGHT,
genesis_entry_height,
&active_set_entries,
)
}
);
assert_ne!(expected_last_id, Hash::default());
.unwrap();
let meta = db_ledger
.meta()
.unwrap()
.expect("First slot metadata must exist");
info!("Replay stage should continue running even after rotation has happened (TVU never goes down)");
assert_eq!(exit.load(Ordering::Relaxed), false);
// Set up the bank
let genesis_block = GenesisBlock::load(&my_ledger_path)
.expect("Expected to successfully open genesis block");
let (bank, _, last_entry_id) =
Fullnode::new_bank_from_db_ledger(&genesis_block, &db_ledger, leader_scheduler);
let voting_keypair = Arc::new(voting_keypair);
let bank = Arc::new(bank);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
db_ledger.clone(),
bank.clone(),
cluster_info_me.clone(),
exit.clone(),
Arc::new(RwLock::new(meta.consumed)),
Arc::new(RwLock::new(last_entry_id)),
rotation_tx,
None,
l_sender,
l_receiver,
);
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(keypair, bank.tick_height(), bank.last_id(), 0);
cluster_info_me.write().unwrap().push_vote(vote);
// Send enough ticks to trigger leader rotation
let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
let num_hashes = 1;
// Add on the only entries that weren't ticks to the bootstrap height to get the
// total expected entry length
let expected_entry_height =
bootstrap_height + initial_non_tick_height + active_set_entries_len;
let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
let mut expected_last_id = Hash::default();
for i in 0..total_entries_to_send {
let entry = Entry::new(&mut last_id, 0, num_hashes, vec![]);
last_id = entry.id;
db_ledger
.write_entries(
DEFAULT_SLOT_HEIGHT,
meta.consumed + i as u64,
vec![entry.clone()],
)
.expect("Expected successful database write");
// Check that the entries on the ledger writer channel are correct
let received_entry = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
assert_eq!(received_entry[0], entry);
if i == leader_rotation_index {
expected_last_id = entry.id;
}
}
// Wait for replay_stage to exit and check return value is correct
assert_eq!(
Some(TvuReturnType::LeaderRotation(
bootstrap_height,
expected_entry_height,
expected_last_id,
)),
{
Some(
rotation_rx
.recv()
.expect("should have signaled leader rotation"),
)
}
);
assert_ne!(expected_last_id, Hash::default());
//replay stage should continue running even after rotation has happened (tvu never goes down)
replay_stage
.close()
.expect("Expect successful ReplayStage exit");
}
let _ignored = remove_dir_all(&my_ledger_path);
}
@ -653,7 +760,6 @@ mod test {
let my_node = Node::new_localhost_with_pubkey(my_id);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (entry_sender, entry_receiver) = channel();
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
@ -665,17 +771,13 @@ mod test {
last_id = entry.id;
entries.push(entry);
}
entry_sender
.send(entries.clone())
.expect("Expected to err out");
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let res = ReplayStage::process_entries(
entries.clone(),
&Arc::new(Bank::default()),
&cluster_info_me,
&entry_receiver,
my_id,
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
@ -693,15 +795,11 @@ mod test {
let entry = Entry::new(&mut Hash::default(), 0, 1, vec![]); //just broken entries
entries.push(entry);
}
entry_sender
.send(entries.clone())
.expect("Expected to err out");
let res = ReplayStage::process_entries(
entries.clone(),
&Arc::new(Bank::default()),
&cluster_info_me,
&entry_receiver,
Keypair::new().pubkey(),
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
@ -731,7 +829,6 @@ mod test {
let my_node = Node::new_localhost_with_pubkey(my_id);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (entry_sender, entry_receiver) = channel();
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
@ -745,17 +842,13 @@ mod test {
expected_entries.push(entry.clone());
entries.push(entry);
}
entry_sender
.send(entries.clone())
.expect("Expected to err out");
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
ReplayStage::process_entries(
entries.clone(),
&Arc::new(Bank::default()),
&cluster_info_me,
&entry_receiver,
my_id,
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),

View File

@ -136,9 +136,10 @@ impl Replicator {
// DbLedger. Note for now, this ledger will not contain any of the existing entries
// in the ledger located at ledger_path, and will only append on newly received
// entries after being passed to window_service
let db_ledger = Arc::new(
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"),
);
let db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
let db_ledger = Arc::new(db_ledger);
let gossip_service = GossipService::new(
&cluster_info,
@ -172,8 +173,6 @@ impl Replicator {
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let t_window = window_service(
db_ledger.clone(),
cluster_info.clone(),
@ -181,7 +180,6 @@ impl Replicator {
entry_height,
max_entry_height,
blob_fetch_receiver,
Some(entry_sender),
retransmit_sender,
repair_socket,
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
@ -192,10 +190,10 @@ impl Replicator {
);
info!("window created, waiting for ledger download done");
let start = Instant::now();
let mut received_so_far = 0;
let _start = Instant::now();
let mut _received_so_far = 0;
while !done.load(Ordering::Relaxed) {
/*while !done.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
let elapsed = start.elapsed();
@ -207,7 +205,7 @@ impl Replicator {
"Timed out waiting to receive any blocks",
)));
}
}
}*/
info!("Done receiving entries from window_service");

View File

@ -4,7 +4,6 @@ use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE};
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
@ -14,8 +13,8 @@ use log::Level;
use solana_metrics::{influxdb, submit};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@ -135,7 +134,7 @@ impl RetransmitStage {
fetch_stage_receiver: BlobReceiver,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, Receiver<Vec<Entry>>) {
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
@ -144,7 +143,6 @@ impl RetransmitStage {
cluster_info.clone(),
retransmit_receiver,
);
let (entry_sender, entry_receiver) = channel();
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
db_ledger,
@ -153,7 +151,6 @@ impl RetransmitStage {
entry_height,
0,
fetch_stage_receiver,
Some(entry_sender),
retransmit_sender,
repair_socket,
leader_scheduler,
@ -162,7 +159,7 @@ impl RetransmitStage {
);
let thread_hdls = vec![t_retransmit, t_window];
(Self { thread_hdls }, entry_receiver)
Self { thread_hdls }
}
}

View File

@ -27,7 +27,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread;
@ -75,6 +75,8 @@ impl Tvu {
to_leader_sender: TvuRotationSender,
storage_state: &StorageState,
entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
) -> (Self, BlobSender) {
let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info
@ -101,7 +103,7 @@ impl Tvu {
//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
let retransmit_stage = RetransmitStage::new(
bank,
db_ledger.clone(),
&cluster_info,
@ -120,14 +122,16 @@ impl Tvu {
let (replay_stage, ledger_entry_receiver) = ReplayStage::new(
keypair.pubkey(),
voting_keypair,
db_ledger.clone(),
bank.clone(),
cluster_info.clone(),
blob_window_receiver,
exit.clone(),
l_entry_height.clone(),
l_last_entry_id.clone(),
to_leader_sender,
entry_stream,
ledger_signal_sender,
ledger_signal_receiver,
);
let storage_stage = StorageStage::new(
@ -167,11 +171,14 @@ impl Tvu {
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
// Call exit to make sure replay stage is unblocked from a channel it may be blocked on.
// Then replay stage will set the self.exit variable and cause the rest of the
// pipeline to exit
self.replay_stage.exit();
}
pub fn close(self) -> thread::Result<Option<TvuReturnType>> {
self.fetch_stage.close();
self.exit();
self.join()
}
}
@ -225,6 +232,60 @@ pub mod tests {
GossipService::new(&cluster_info, None, gossip, exit)
}
#[test]
fn test_tvu_exit() {
solana_logger::setup();
let leader = Node::new_localhost();
let target1_keypair = Keypair::new();
let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey());
let starting_balance = 10_000;
let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance);
let leader_id = leader.info.id;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_id,
)));
let mut bank = Bank::new(&genesis_block);
bank.leader_scheduler = leader_scheduler;
let bank = Arc::new(bank);
//start cluster_info1
let mut cluster_info1 = ClusterInfo::new(target1.info.clone());
cluster_info1.insert_info(leader.info.clone());
cluster_info1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(cluster_info1));
let cur_hash = Hash::default();
let db_ledger_path = get_tmp_ledger_path("test_replay");
let (db_ledger, l_sender, l_receiver) = DbLedger::open_with_signal(&db_ledger_path)
.expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (sender, _receiver) = channel();
let (tvu, _blob_sender) = Tvu::new(
Some(Arc::new(voting_keypair)),
&bank,
0,
cur_hash,
&cref1,
{
Sockets {
repair: target1.sockets.repair,
retransmit: target1.sockets.retransmit,
fetch: target1.sockets.tvu,
}
},
Arc::new(db_ledger),
STORAGE_ROTATE_TEST_COUNT,
sender,
&StorageState::default(),
None,
l_sender,
l_receiver,
);
tvu.close().expect("close");
}
/// Test that message sent from leader to target1 and replayed to target2
#[test]
#[ignore]
@ -287,8 +348,8 @@ pub mod tests {
let mut cur_hash = Hash::default();
let db_ledger_path = get_tmp_ledger_path("test_replay");
let db_ledger =
DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger");
let (db_ledger, l_sender, l_receiver) = DbLedger::open_with_signal(&db_ledger_path)
.expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (sender, _) = channel();
@ -310,6 +371,8 @@ pub mod tests {
sender,
&StorageState::default(),
None,
l_sender,
l_receiver,
);
let mut alice_ref_balance = starting_balance;

View File

@ -4,7 +4,6 @@ use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::db_window::*;
use crate::entry::EntrySender;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
@ -57,7 +56,6 @@ fn recv_window(
tick_height: &mut u64,
max_ix: u64,
r: &BlobReceiver,
entry_sender: &Option<EntrySender>,
retransmit: &BlobSender,
done: &Arc<AtomicBool>,
) -> Result<()> {
@ -107,12 +105,6 @@ fn recv_window(
duration_as_ms(&now.elapsed())
);
if !consume_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
if let Some(entry_sender) = entry_sender {
entry_sender.send(consume_queue)?;
}
}
Ok(())
}
@ -124,7 +116,6 @@ pub fn window_service(
entry_height: u64,
max_entry_height: u64,
r: BlobReceiver,
entry_sender: Option<EntrySender>,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
@ -150,7 +141,6 @@ pub fn window_service(
&mut tick_height_,
max_entry_height,
&r,
&entry_sender,
&retransmit,
&done,
) {
@ -218,7 +208,7 @@ mod test {
use crate::cluster_info::{ClusterInfo, Node};
use crate::db_ledger::get_tmp_ledger_path;
use crate::db_ledger::DbLedger;
use crate::entry::{make_consecutive_blobs, Entry};
use crate::entry::make_consecutive_blobs;
use crate::leader_scheduler::LeaderScheduler;
use crate::streamer::{blob_receiver, responder};
@ -227,25 +217,11 @@ mod test {
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
*num += m.len();
}
e => info!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
#[test]
pub fn window_send_test() {
solana_logger::setup();
@ -262,7 +238,6 @@ mod test {
let (s_reader, r_reader) = channel();
let t_receiver =
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_test");
@ -276,7 +251,6 @@ mod test {
0,
0,
r_reader,
Some(s_window),
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
@ -305,14 +279,22 @@ mod test {
t_responder
};
let mut num = 0;
get_entries(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
q.append(&mut nq);
let max_attempts = 10;
let mut num_attempts = 0;
loop {
assert!(num_attempts != max_attempts);
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
q.append(&mut nq);
}
if q.len() != 10 {
sleep(Duration::from_millis(100));
} else {
break;
}
num_attempts += 1;
}
assert_eq!(q.len(), 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
@ -336,7 +318,6 @@ mod test {
let (s_reader, r_reader) = channel();
let t_receiver =
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test");
@ -350,7 +331,6 @@ mod test {
0,
0,
r_reader,
Some(s_window),
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),

View File

@ -28,6 +28,7 @@ use std::thread::sleep;
use std::time::Duration;
#[test]
#[ignore]
fn test_replicator_startup() {
solana_logger::setup();
info!("starting replicator test");