remove ledger tail code, WINDOW_SIZE begone (#1617)

* remove WINDOW_SIZE, use window.window_size()
* move ledger tail, redundant with ledger-based repair
Rob Walker 2018-10-30 10:05:18 -07:00 committed by GitHub
parent 3cc78d3a41
commit 13bfdde228
18 changed files with 256 additions and 323 deletions
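
The gist of the first change: the global WINDOW_SIZE constant is gone, and the window reports its own length through a new window_size() method on the WindowUtil trait, so callers derive chunking and wrap-around indexing from the actual allocation. A condensed sketch of the new shape, with WindowSlot reduced to a stub:

    #[derive(Default, Clone)]
    struct WindowSlot; // stub; the real struct carries data/coding blobs

    type Window = Vec<WindowSlot>;

    trait WindowUtil {
        fn window_size(&self) -> u64;
    }

    impl WindowUtil for Window {
        // the window's size is whatever was allocated, not a global const
        fn window_size(&self) -> u64 {
            self.len() as u64
        }
    }

    fn new_window(window_size: usize) -> Window {
        (0..window_size).map(|_| WindowSlot::default()).collect()
    }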


@ -103,7 +103,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
verified_receiver,
Default::default(),
&mint.last_id(),
0,
None,
);
@ -204,7 +203,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
verified_receiver,
Default::default(),
&mint.last_id(),
0,
None,
);


@ -40,7 +40,6 @@ use token_program::TokenProgram;
use tokio::prelude::Future;
use transaction::Transaction;
use vote_program::VoteProgram;
use window::WINDOW_SIZE;
/// The number of most recent `last_id` values that the bank will track the signatures
/// of. Once the bank discards a `last_id`, it will reject any transactions that use
@ -906,36 +905,6 @@ impl Bank {
Ok(())
}
/// Process an ordered list of entries, populating a circular buffer "tail"
/// as we go.
fn process_entries_tail(
&self,
entries: &[Entry],
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
) -> Result<u64> {
let mut entry_count = 0;
for entry in entries {
if tail.len() > *tail_idx {
tail[*tail_idx] = entry.clone();
} else {
tail.push(entry.clone());
}
*tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize;
entry_count += 1;
// TODO: We prepare for implementing voting contract by making the associated
// process_entries functions aware of the vote-tracking structure inside
// the leader scheduler. Next we will extract the vote tracking structure
// out of the leader scheduler, and into the bank, and remove the leader
// scheduler from these banking functions.
self.process_entry(entry)?;
}
Ok(entry_count)
}
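
For reference, the deleted tail logic boils down to a ring-buffer write. A hypothetical distillation (push_ring is not a function in this codebase):

    // Overwrite in place once the buffer has filled `cap` slots, otherwise
    // append; advance the write index with wrap-around. This is the mechanic
    // process_entries_tail performed, with WINDOW_SIZE as `cap`.
    fn push_ring<T: Clone>(buf: &mut Vec<T>, idx: &mut usize, cap: usize, item: &T) {
        if buf.len() > *idx {
            buf[*idx] = item.clone();
        } else {
            buf.push(item.clone());
        }
        *idx = (*idx + 1) % cap;
    }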
/// Process an ordered list of entries.
pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
self.par_process_entries(entries)
@ -998,44 +967,56 @@ impl Bank {
Ok(())
}
/// Process an ordered list of entries, populating a circular buffer "tail"
/// as we go.
fn process_block(&self, entries: &[Entry]) -> Result<()> {
for entry in entries {
// TODO: We prepare for implementing voting contract by making the associated
// process_entries functions aware of the vote-tracking structure inside
// the leader scheduler. Next we will extract the vote tracking structure
// out of the leader scheduler, and into the bank, and remove the leader
// scheduler from these banking functions.
self.process_entry(entry)?;
}
Ok(())
}
/// Append entry blocks to the ledger, verifying them along the way.
fn process_blocks<I>(
fn process_ledger_blocks<I>(
&self,
start_hash: Hash,
entry_height: u64,
entries: I,
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
) -> Result<u64>
) -> Result<(u64, Hash)>
where
I: IntoIterator<Item = Entry>,
{
// these magic numbers are from genesis of the mint, could pull them
// back out of this loop.
let mut entry_height = entry_height;
let mut last_id = start_hash;
// Ledger verification needs to be parallelized, but we can't pull the whole
// thing into memory. We therefore chunk it.
let mut entry_height = *tail_idx as u64;
for entry in &tail[0..*tail_idx] {
if entry.is_tick() {
*self.tick_height.lock().unwrap() += 1;
}
}
let mut id = start_hash;
for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) {
let block: Vec<_> = block.collect();
if !block.verify(&id) {
if !block.verify(&last_id) {
warn!("Ledger proof of history failed at entry: {}", entry_height);
return Err(BankError::LedgerVerificationFailed);
}
id = block.last().unwrap().id;
let entry_count = self.process_entries_tail(&block, tail, tail_idx)?;
entry_height += entry_count;
self.process_block(&block)?;
last_id = block.last().unwrap().id;
entry_height += block.len() as u64;
}
Ok(entry_height)
Ok((entry_height, last_id))
}
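
Note the streaming shape that survives the rewrite: entries are pulled through itertools' chunks so the full ledger never has to fit in memory, and each chunk is verified against the last id of the previous one. A self-contained sketch under simplified assumptions (u64 ids instead of hashes, a toy chain check in place of PoH verification):

    use itertools::Itertools;

    struct Entry { id: u64 }

    // toy stand-in for proof-of-history verification: ids strictly increase
    fn chain_verify(block: &[Entry], mut last_id: u64) -> bool {
        block.iter().all(|e| {
            let ok = e.id > last_id;
            last_id = e.id;
            ok
        })
    }

    fn process_blocks<I>(start_id: u64, start_height: u64, entries: I) -> Result<(u64, u64), ()>
    where
        I: IntoIterator<Item = Entry>,
    {
        const VERIFY_BLOCK_SIZE: usize = 16; // assumed chunk size
        let (mut entry_height, mut last_id) = (start_height, start_id);
        // chunks() verifies in batches without collecting the whole iterator
        for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) {
            let block: Vec<Entry> = block.collect();
            if !chain_verify(&block, last_id) {
                return Err(());
            }
            last_id = block.last().unwrap().id;
            entry_height += block.len() as u64;
        }
        Ok((entry_height, last_id))
    }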
/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, u64, Vec<Entry>)>
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Hash)>
where
I: IntoIterator<Item = Entry>,
{
@ -1071,20 +1052,8 @@ impl Bank {
}
self.register_entry_id(&entry0.id);
self.register_entry_id(&entry1.id);
let entry1_id = entry1.id;
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
tail.push(entry0);
tail.push(entry1);
let mut tail_idx = 2;
let entry_height = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?;
// check if we need to rotate tail
if tail.len() == WINDOW_SIZE as usize {
tail.rotate_left(tail_idx)
}
Ok((*self.tick_height.lock().unwrap(), entry_height, tail))
Ok(self.process_ledger_blocks(entry1.id, 2, entries)?)
}
/// Create, sign, and process a Transaction from `keypair` to `to` of
@ -1682,42 +1651,12 @@ mod tests {
#[test]
fn test_process_ledger_simple() {
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap();
let (ledger_height, last_id) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 4);
assert_eq!(tick_height, 2);
assert_eq!(tail.len(), 4);
assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1];
// last entry is a tick
assert_eq!(0, last_entry.transactions.len());
// tick is registered
assert_eq!(bank.last_id(), last_entry.id);
}
#[test]
fn test_process_ledger_around_window_size() {
// TODO: put me back in when Criterion is up
// for _ in 0..10 {
// let (ledger, _) = create_sample_ledger(WINDOW_SIZE as usize);
// let bank = Bank::default();
// let (_, _) = bank.process_ledger(ledger).unwrap();
// }
let window_size = 128;
for entry_count in window_size - 3..window_size + 2 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 3);
assert_eq!(tick_height, 2);
assert!(tail.len() <= WINDOW_SIZE as usize);
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}
assert_eq!(bank.get_tick_height(), 1);
assert_eq!(bank.last_id(), last_id);
}
// Write the given entries to a file and then return a file iterator to them.


@ -48,7 +48,6 @@ impl BankingStage {
verified_receiver: Receiver<VerifiedPackets>,
config: Config,
last_entry_id: &Hash,
tick_height: u64,
max_tick_height: Option<u64>,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
@ -57,7 +56,6 @@ impl BankingStage {
bank.clone(),
entry_sender,
*last_entry_id,
tick_height,
max_tick_height,
false,
vec![],
@ -267,7 +265,6 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
drop(verified_sender);
@ -286,7 +283,6 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
drop(entry_receiver);
@ -306,7 +302,6 @@ mod tests {
verified_receiver,
Config::Sleep(Duration::from_millis(1)),
&bank.last_id(),
0,
None,
);
sleep(Duration::from_millis(500));
@ -333,7 +328,6 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
@ -388,7 +382,6 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
@ -442,7 +435,6 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
Some(max_tick_height),
);
assert_eq!(


@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
use window::{self, SharedWindow, WindowIndex, WindowUtil};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStageReturnType {
@ -71,16 +71,16 @@ fn broadcast(
let blobs_chunking = Instant::now();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
let window_size = window.read().unwrap().window_size();
let blobs_chunked = blobs_vec.chunks(window_size as usize).map(|x| x.to_vec());
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
trace!("{}", window.read().unwrap().print(&id, *receive_index));
let broadcast_start = Instant::now();
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
// TODO: move all this into window.rs
// Index the blobs
window::index_blobs(node_info, &blobs, receive_index)
.expect("index blobs for initial window");
@ -92,7 +92,7 @@ fn broadcast(
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
trace!(
"{} popped {} at {}",
@ -114,7 +114,7 @@ fn broadcast(
}
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
let pos = (ix % window_size) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone());
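
Both the cache eviction and the caching above use the same wrap-around placement rule, now computed from the live window rather than a constant. A toy illustration (hypothetical helper):

    // A blob with ledger index `ix` lives at slot ix % window_size.
    fn slot_for(ix: u64, window_size: u64) -> usize {
        (ix % window_size) as usize
    }
    // e.g. slot_for(5, 4) == 1: index 5 wraps to slot 1 of a 4-slot window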


@ -329,13 +329,13 @@ mod tests {
let ledger_path = get_tmp_ledger_path("send_airdrop");
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,


@ -3,7 +3,6 @@
use bank::Bank;
use broadcast_stage::BroadcastStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use entry::Entry;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
use ledger::read_ledger;
@ -21,7 +20,7 @@ use std::thread::Result;
use tpu::{Tpu, TpuReturnType};
use tvu::{Tvu, TvuReturnType};
use untrusted::Input;
use window;
use window::{new_window, SharedWindow};
pub enum NodeRole {
Leader(LeaderServices),
@ -96,7 +95,7 @@ pub struct Fullnode {
cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_path: String,
sigverify_disabled: bool,
shared_window: window::SharedWindow,
shared_window: SharedWindow,
replicate_socket: Vec<UdpSocket>,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
@ -142,7 +141,8 @@ impl Fullnode {
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
info!("creating bank...");
let (bank, tick_height, entry_height, ledger_tail) =
let (bank, entry_height, last_id) =
Self::new_bank_from_ledger(ledger_path, leader_scheduler);
info!("creating networking stack...");
@ -160,9 +160,8 @@ impl Fullnode {
keypair,
vote_account_keypair,
bank,
tick_height,
entry_height,
&ledger_tail,
&last_id,
node,
leader_info.as_ref(),
ledger_path,
@ -243,9 +242,8 @@ impl Fullnode {
keypair: Arc<Keypair>,
vote_account_keypair: Arc<Keypair>,
bank: Bank,
tick_height: u64,
entry_height: u64,
ledger_tail: &[Entry],
last_id: &Hash,
node: Node,
bootstrap_leader_info_option: Option<&NodeInfo>,
ledger_path: &str,
@ -267,12 +265,7 @@ impl Fullnode {
.expect("Failed to clone respond socket"),
));
let last_entry_id = &ledger_tail
.last()
.expect("Expected at least one entry in the ledger")
.id;
let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info);
let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window));
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(node.info).expect("ClusterInfo::new"),
@ -330,7 +323,7 @@ impl Fullnode {
} else {
let max_tick_height = {
let ls_lock = bank.leader_scheduler.read().unwrap();
ls_lock.max_height_for_leader(tick_height)
ls_lock.max_height_for_leader(bank.get_tick_height())
};
// Start in leader mode.
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
@ -343,9 +336,8 @@ impl Fullnode {
.collect(),
ledger_path,
sigverify_disabled,
tick_height,
max_tick_height,
last_entry_id,
last_id,
);
let broadcast_stage = BroadcastStage::new(
@ -358,7 +350,7 @@ impl Fullnode {
entry_height,
entry_receiver,
bank.leader_scheduler.clone(),
tick_height,
bank.get_tick_height(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
@ -414,10 +406,10 @@ impl Fullnode {
// Clear the leader scheduler
new_leader_scheduler.reset();
let (new_bank, scheduled_leader, tick_height, entry_height, last_entry_id) = {
let (new_bank, scheduled_leader, entry_height, last_entry_id) = {
// TODO: We can avoid building the bank again once RecordStage is
// integrated with BankingStage
let (new_bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger(
let (new_bank, entry_height, last_id) = Self::new_bank_from_ledger(
&self.ledger_path,
Arc::new(RwLock::new(new_leader_scheduler)),
);
@ -427,16 +419,7 @@ impl Fullnode {
.get_current_leader()
.expect("Scheduled leader should exist after rebuilding bank");
(
new_bank,
scheduled_leader,
tick_height,
entry_height,
ledger_tail
.last()
.expect("Expected at least one entry in the ledger")
.id,
)
(new_bank, scheduled_leader, entry_height, last_id)
};
self.cluster_info
@ -467,6 +450,7 @@ impl Fullnode {
// in the active set, then the leader scheduler will pick the same leader again, so
// check for that
if scheduled_leader == self.keypair.pubkey() {
let tick_height = self.bank.get_tick_height();
self.validator_to_leader(tick_height, entry_height, last_entry_id);
Ok(())
} else {
@ -495,7 +479,7 @@ impl Fullnode {
}
}
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) {
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_id: Hash) {
self.cluster_info
.write()
.unwrap()
@ -515,12 +499,11 @@ impl Fullnode {
.collect(),
&self.ledger_path,
self.sigverify_disabled,
tick_height,
max_tick_height,
// We pass the last_entry_id from the replicate stage because we can't trust that
// the window didn't overwrite the slot for the last entry that the replicate stage
// processed. We also want to avoid re-reading and processing the ledger for the last id.
&last_entry_id,
&last_id,
);
let broadcast_stage = BroadcastStage::new(
@ -596,19 +579,19 @@ impl Fullnode {
pub fn new_bank_from_ledger(
ledger_path: &str,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Bank, u64, u64, Vec<Entry>) {
) -> (Bank, u64, Hash) {
let mut bank = Bank::new_with_builtin_programs();
bank.leader_scheduler = leader_scheduler;
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
let (tick_height, entry_height, ledger_tail) =
bank.process_ledger(entries).expect("process_ledger");
let (entry_height, last_id) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
(bank, tick_height, entry_height, ledger_tail)
(bank, entry_height, last_id)
}
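
Call sites adapt by destructuring the new triple and asking the bank itself for the tick height, as the later hunks in this file do. A hypothetical minimal call site (ledger_path and leader_scheduler assumed bound):

    // tick_height is no longer returned; it lives on the bank now
    let (bank, entry_height, last_id) =
        Fullnode::new_bank_from_ledger(&ledger_path, leader_scheduler);
    let tick_height = bank.get_tick_height();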
pub fn get_leader_scheduler(&self) -> &Arc<RwLock<LeaderScheduler>> {
@ -707,13 +690,13 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let last_id = bank.last_id();
let v = Fullnode::new_with_bank(
Arc::new(keypair),
Arc::new(Keypair::new()),
bank,
0,
entry_height,
&genesis_entries,
&last_id,
tn,
Some(&entry),
&validator_ledger_path,
@ -737,21 +720,19 @@ mod tests {
let mut bank = Bank::new(&mint);
let entry = tn.info.clone();
let genesis_entries = &mint.create_entries();
let entry_height = genesis_entries.len() as u64;
let leader_scheduler = Arc::new(RwLock::new(
LeaderScheduler::from_bootstrap_leader(entry.id),
));
bank.leader_scheduler = leader_scheduler;
let entry_height = mint.create_entries().len() as u64;
let last_id = bank.last_id();
Fullnode::new_with_bank(
Arc::new(keypair),
Arc::new(Keypair::new()),
bank,
0,
entry_height,
&genesis_entries,
&last_id,
tn,
Some(&entry),
&validator_ledger_path,
@ -788,6 +769,7 @@ mod tests {
let initial_tick_height = genesis_entries
.iter()
.skip(2)
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
// Create the common leader scheduling configuration
@ -873,6 +855,7 @@ mod tests {
let genesis_tick_height = genesis_entries
.iter()
.skip(2)
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64)
+ num_ending_ticks as u64;
ledger_writer.write_entries(&active_set_entries).unwrap();
@ -970,6 +953,7 @@ mod tests {
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
let initial_tick_height = genesis_entries
.iter()
.skip(2)
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
let active_set_entries_len = active_set_entries.len() as u64;
@ -1053,12 +1037,12 @@ mod tests {
// Check the validator ledger for the correct entry + tick heights, we should've
// transitioned after tick_height = bootstrap_height.
let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger(
let (bank, entry_height, _) = Fullnode::new_bank_from_ledger(
&validator_ledger_path,
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))),
);
assert_eq!(tick_height, bootstrap_height);
assert_eq!(bank.get_tick_height(), bootstrap_height);
assert_eq!(
entry_height,
// Only the first genesis entry has num_hashes = 0, every other entry


@ -27,7 +27,6 @@ use std::path::Path;
use transaction::Transaction;
use vote_program::Vote;
use vote_transaction::VoteTransaction;
use window::WINDOW_SIZE;
//
// A persistent ledger is 2 files:
@ -78,6 +77,10 @@ pub struct LedgerWindow {
pub const LEDGER_DATA_FILE: &str = "data";
const LEDGER_INDEX_FILE: &str = "index";
const LEDGER_BUF_COUNT: usize = 32 * 1024;
const LEDGER_DATA_BUF_SIZE: usize = LEDGER_BUF_COUNT * BLOB_DATA_SIZE;
const LEDGER_INDEX_BUF_SIZE: usize = LEDGER_BUF_COUNT * SIZEOF_U64 as usize;
// use a CONST because there's a cast, and we don't want "sizeof::<u64> as u64"...
const SIZEOF_U64: u64 = size_of::<u64>() as u64;
@ -113,9 +116,9 @@ impl LedgerWindow {
let ledger_path = Path::new(&ledger_path);
let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?;
let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index);
let index = BufReader::with_capacity(LEDGER_INDEX_BUF_SIZE, index);
let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?;
let data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data);
let data = BufReader::with_capacity(LEDGER_DATA_BUF_SIZE, data);
Ok(LedgerWindow { index, data })
}
@ -185,10 +188,10 @@ pub fn verify_ledger(ledger_path: &str) -> io::Result<()> {
format!("index is not a multiple of {} bytes long", SIZEOF_U64),
))?;
}
let mut index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index);
let mut index = BufReader::with_capacity(LEDGER_INDEX_BUF_SIZE, index);
let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?;
let mut data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data);
let mut data = BufReader::with_capacity(LEDGER_DATA_BUF_SIZE, data);
let mut last_data_offset = 0;
let mut index_offset = 0;
@ -503,6 +506,7 @@ impl Block for [Entry] {
}
}
// TODO: move this to the right file, entry.rs?
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());


@ -82,12 +82,11 @@ impl PohRecorder {
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
last_entry_id: Hash,
tick_height: u64,
max_tick_height: Option<u64>,
is_virtual: bool,
virtual_tick_entries: Vec<Entry>,
) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.get_tick_height())));
let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries));
PohRecorder {
poh,
@ -156,8 +155,7 @@ mod tests {
let bank = Arc::new(Bank::new(&mint));
let last_id = bank.last_id();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder =
PohRecorder::new(bank, entry_sender, last_id, 0, None, false, vec![]);
let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, None, false, vec![]);
//send some data
let h1 = hash(b"hello world!");


@ -276,6 +276,7 @@ mod test {
last_id = active_set_entries.last().unwrap().id;
let initial_tick_height = genesis_entries
.iter()
.skip(2)
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let active_set_entries_len = active_set_entries.len() as u64;
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
@ -300,7 +301,7 @@ mod test {
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Set up the bank
let (bank, _, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
// Set up the replicate stage
let (entry_sender, entry_receiver) = channel();


@ -79,7 +79,8 @@ impl Replicator {
network_addr: Option<SocketAddr>,
done: Arc<AtomicBool>,
) -> (Replicator, NodeInfo) {
let window = window::new_window_from_entries(&[], entry_height, &node.info);
const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024;
let window = window::new_window(REPLICATOR_WINDOW_SIZE);
let shared_window = Arc::new(RwLock::new(window));
let cluster_info = Arc::new(RwLock::new(


@ -610,22 +610,19 @@ mod tests {
let serial_tx = serialize(&tx).unwrap();
let rpc_port = 22222; // Needs to be distinct known number to not conflict with other tests
let genesis_entries = &alice.create_entries();
let entry_height = genesis_entries.len() as u64;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let entry_height = alice.create_entries().len() as u64;
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
entry_height,
&genesis_entries,
&last_id,
leader,
None,
&ledger_path,

src/settings.rs.foo (new file, 30 lines added)

@ -0,0 +1,30 @@
//! The `config` module pulls together global configuration
//!
use config::{Config, Value};
use serde::de::Deserialize;
use std::sync::RwLock;
lazy_static! {
static ref SETTINGS: RwLock<Config> = {
let settings = RwLock::new(Config::default());
{
let mut settings = settings.write().unwrap();
// defaults go here
settings.set_default("window_size", 32*1024).unwrap();
}
settings
};
}
pub fn get<'de, T: Deserialize<'de>>(key: &'de str) -> T {
SETTINGS.read().unwrap().get(key).unwrap()
}
pub fn set<T>(key: &str, value: T) -> ()
where
T: Into<Value>,
{
SETTINGS.write().unwrap().set(key, value).unwrap();
}
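
A hypothetical call site for the new module (assuming it is mounted as `settings`, and relying on the config crate's integer conversions):

    fn example() {
        // read: falls back to the default registered above (32 * 1024)
        let window_size: u64 = settings::get("window_size");
        println!("window_size = {}", window_size);
        // write: i64 converts into config::Value
        settings::set("window_size", 16 * 1024_i64);
    }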


@ -502,13 +502,13 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,
@ -555,13 +555,13 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,
@ -616,21 +616,19 @@ mod tests {
let leader_data = leader.info.clone();
let ledger_path = create_tmp_ledger_with_mint("client_check_signature", &alice);
let genesis_entries = &alice.create_entries();
let entry_height = genesis_entries.len() as u64;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let entry_height = alice.create_entries().len() as u64;
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
entry_height,
&genesis_entries,
&last_id,
leader,
None,
&ledger_path,
@ -686,21 +684,19 @@ mod tests {
let leader_data = leader.info.clone();
let ledger_path = create_tmp_ledger_with_mint("zero_balance_check", &alice);
let genesis_entries = &alice.create_entries();
let entry_height = genesis_entries.len() as u64;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let entry_height = alice.create_entries().len() as u64;
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
entry_height,
&genesis_entries,
&last_id,
leader,
None,
&ledger_path,


@ -61,7 +61,6 @@ impl Tpu {
transactions_sockets: Vec<UdpSocket>,
ledger_path: &str,
sigverify_disabled: bool,
tick_height: u64,
max_tick_height: Option<u64>,
last_entry_id: &Hash,
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
@ -77,7 +76,6 @@ impl Tpu {
verified_receiver,
tick_duration,
last_entry_id,
tick_height,
max_tick_height,
);


@ -1092,13 +1092,14 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,
@ -1167,21 +1168,19 @@ mod tests {
let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests
let genesis_entries = &alice.create_entries();
let entry_height = genesis_entries.len() as u64;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let entry_height = alice.create_entries().len() as u64;
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
entry_height,
&genesis_entries,
&last_id,
leader,
None,
&ledger_path,
@ -1258,13 +1257,13 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,
@ -1377,19 +1376,18 @@ mod tests {
let mut config_payer = WalletConfig::default();
let mut config_witness = WalletConfig::default();
let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_data.id,
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,
@ -1506,13 +1504,13 @@ mod tests {
)));
bank.leader_scheduler = leader_scheduler;
let vote_account_keypair = Arc::new(Keypair::new());
let last_id = bank.last_id();
let server = Fullnode::new_with_bank(
leader_keypair,
vote_account_keypair,
bank,
0,
0,
&[],
&last_id,
leader,
None,
&ledger_path,


@ -6,7 +6,7 @@ use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use leader_scheduler::LeaderScheduler;
use ledger::{reconstruct_entries_from_blobs, Block};
use ledger::reconstruct_entries_from_blobs;
use log::Level;
use packet::SharedBlob;
use result::Result;
@ -17,8 +17,6 @@ use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
pub const WINDOW_SIZE: u64 = 32 * 1024;
#[derive(Default, Clone)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
@ -52,6 +50,8 @@ pub trait WindowUtil {
/// Finds available slots, clears them, and returns their indices.
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;
fn window_size(&self) -> u64;
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn repair(
&mut self,
@ -79,13 +79,15 @@ pub trait WindowUtil {
leader_unknown: bool,
pending_retransmits: &mut bool,
);
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
}
impl WindowUtil for Window {
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64> {
(consumed..received)
.filter_map(|pix| {
let i = (pix % WINDOW_SIZE) as usize;
let i = (pix % self.window_size()) as usize;
if let Some(blob_idx) = self[i].blob_index() {
if blob_idx == pix {
return None;
@ -96,6 +98,43 @@ impl WindowUtil for Window {
}).collect()
}
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
// Prevent receive window from running over
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
trace!(
"{}: received: {} but older than consumed: {} skipping..",
id,
pix,
consumed
);
false
} else {
// received always has to be updated even if we don't accept the packet into
// the window. The worst case here is the server *starts* outside
// the window, none of the packets it receives fits in the window
// and repair requests (which are based on received) are never generated
*received = cmp::max(pix, *received);
if pix >= consumed + self.window_size() {
trace!(
"{}: received: {} will overrun window: {} skipping..",
id,
pix,
consumed + self.window_size()
);
false
} else {
true
}
}
}
fn window_size(&self) -> u64 {
self.len() as u64
}
fn repair(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -144,7 +183,14 @@ impl WindowUtil for Window {
let num_peers = rcluster_info.table.len() as u64;
let max_repair = if max_entry_height == 0 {
calculate_max_repair(num_peers, consumed, received, times, is_next_leader)
calculate_max_repair(
num_peers,
consumed,
received,
times,
is_next_leader,
self.window_size(),
)
} else {
max_entry_height + 1
};
@ -181,7 +227,7 @@ impl WindowUtil for Window {
.iter()
.enumerate()
.map(|(i, _v)| {
if i == (consumed % WINDOW_SIZE) as usize {
if i == (consumed % self.window_size()) as usize {
"V"
} else {
" "
@ -237,7 +283,7 @@ impl WindowUtil for Window {
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
let w = (pix % WINDOW_SIZE) as usize;
let w = (pix % self.window_size()) as usize;
let is_coding = blob.read().unwrap().is_coding();
@ -283,14 +329,15 @@ impl WindowUtil for Window {
#[cfg(feature = "erasure")]
{
if erasure::recover(id, self, *consumed, (*consumed % WINDOW_SIZE) as usize).is_err() {
let window_size = self.window_size();
if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() {
trace!("{}: erasure::recover failed", id);
}
}
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % WINDOW_SIZE) as usize;
let k = (*consumed % self.window_size()) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);
let k_data_blob;
@ -334,6 +381,7 @@ fn calculate_max_repair(
received: u64,
times: usize,
is_next_leader: bool,
window_size: u64,
) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
@ -350,44 +398,15 @@ fn calculate_max_repair(
// This check prevents repairing a blob that will cause window to roll over. Even if
// the highest_lost blob is actually missing, asking to repair it might cause our
// current window to move past other missing blobs
cmp::min(consumed + WINDOW_SIZE - 1, max_repair)
cmp::min(consumed + window_size - 1, max_repair)
}
pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
// Prevent receive window from running over
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
trace!(
"{}: received: {} but older than consumed: {} skipping..",
id,
pix,
consumed
);
false
} else {
// received always has to be updated even if we don't accept the packet into
// the window. The worst case here is the server *starts* outside
// the window, none of the packets it receives fits in the window
// and repair requests (which are based on received) are never generated
*received = cmp::max(pix, *received);
if pix >= consumed + WINDOW_SIZE {
trace!(
"{}: received: {} will overrun window: {} skipping..",
id,
pix,
consumed + WINDOW_SIZE
);
false
} else {
true
}
}
pub fn new_window(window_size: usize) -> Window {
(0..window_size).map(|_| WindowSlot::default()).collect()
}
pub fn default_window() -> Window {
(0..WINDOW_SIZE).map(|_| WindowSlot::default()).collect()
(0..2048).map(|_| WindowSlot::default()).collect()
}
pub fn index_blobs(
@ -410,52 +429,6 @@ pub fn index_blobs(
Ok(())
}
/// Initialize a rebroadcast window with most recent Entry blobs
/// * `cluster_info` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
node_info: &NodeInfo,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> Window {
let mut window = default_window();
let id = node_info.id;
trace!(
"{} initialized window entry_height:{} blobs_len:{}",
id,
entry_height,
blobs.len()
);
// Index the blobs
let mut received = entry_height - blobs.len() as u64;
index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");
// populate the window, offset by implied index
let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize;
for b in blobs.into_iter().skip(diff) {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(window[pos].data.is_none());
window[pos].data = Some(b);
}
window
}
pub fn new_window_from_entries(
ledger_tail: &[Entry],
entry_height: u64,
node_info: &NodeInfo,
) -> Window {
// convert to blobs
let blobs = ledger_tail.to_blobs();
initialized_window(&node_info, blobs, entry_height)
}
#[cfg(test)]
mod test {
use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
@ -468,7 +441,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use streamer::{receiver, responder, PacketReceiver};
use window::{blob_idx_in_window, calculate_max_repair, WINDOW_SIZE};
use window::{calculate_max_repair, new_window, Window, WindowUtil};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
@ -530,59 +503,79 @@ mod test {
}
#[test]
pub fn calculate_max_repair_test() {
assert_eq!(calculate_max_repair(0, 10, 90, 0, false), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32, false), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0, false), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0, false), 11);
pub fn test_calculate_max_repair() {
const WINDOW_SIZE: u64 = 200;
assert_eq!(calculate_max_repair(0, 10, 90, 0, false, WINDOW_SIZE), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32, false, WINDOW_SIZE), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0, false, WINDOW_SIZE), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0, false, WINDOW_SIZE), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0, false, WINDOW_SIZE), 11);
assert_eq!(
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false),
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 5
);
assert_eq!(
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false),
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false),
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false),
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false),
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false, WINDOW_SIZE),
WINDOW_SIZE
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true),
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true, WINDOW_SIZE),
50 + WINDOW_SIZE
);
}
fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) {
fn wrap_blob_idx_in_window(
window: &Window,
id: &Pubkey,
pix: u64,
consumed: u64,
received: u64,
) -> (bool, u64) {
let mut received = received;
let is_in_window = blob_idx_in_window(&id, pix, consumed, &mut received);
let is_in_window = window.blob_idx_in_window(&id, pix, consumed, &mut received);
(is_in_window, received)
}
#[test]
pub fn blob_idx_in_window_test() {
pub fn test_blob_idx_in_window() {
let id = Pubkey::default();
const WINDOW_SIZE: u64 = 200;
let window = new_window(WINDOW_SIZE as usize);
assert_eq!(
wrap_blob_idx_in_window(&id, 90 + WINDOW_SIZE, 90, 100),
wrap_blob_idx_in_window(&window, &id, 90 + WINDOW_SIZE, 90, 100),
(false, 90 + WINDOW_SIZE)
);
assert_eq!(
wrap_blob_idx_in_window(&id, 91 + WINDOW_SIZE, 90, 100),
wrap_blob_idx_in_window(&window, &id, 91 + WINDOW_SIZE, 90, 100),
(false, 91 + WINDOW_SIZE)
);
assert_eq!(wrap_blob_idx_in_window(&id, 89, 90, 100), (false, 100));
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 89, 90, 100),
(false, 100)
);
assert_eq!(wrap_blob_idx_in_window(&id, 91, 90, 100), (true, 100));
assert_eq!(wrap_blob_idx_in_window(&id, 101, 90, 100), (true, 101));
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 91, 90, 100),
(true, 100)
);
assert_eq!(
wrap_blob_idx_in_window(&window, &id, 101, 90, 100),
(true, 101)
);
}
}


@ -19,7 +19,7 @@ use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender};
use timing::duration_as_ms;
use window::{blob_idx_in_window, SharedWindow, WindowUtil};
use window::{SharedWindow, WindowUtil};
pub const MAX_REPAIR_BACKOFF: usize = 128;
@ -206,7 +206,11 @@ fn recv_window(
};
pixs.push(pix);
if !blob_idx_in_window(&id, pix, *consumed, received) {
if !window
.read()
.unwrap()
.blob_idx_in_window(&id, pix, *consumed, received)
{
continue;
}
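
One side effect of moving blob_idx_in_window onto the trait: recv_window now takes the window read lock once per blob. If that ever shows up in profiles, the guard could be hoisted outside the blob loop; a hypothetical variant, equivalent as long as nothing in the loop needs a writer (names sketched to mirror the loop above):

    let w = window.read().unwrap();
    for blob in blobs {
        let pix = blob.read().unwrap().get_index().expect("blob index");
        if !w.blob_idx_in_window(&id, pix, *consumed, received) {
            continue;
        }
        // ... rest of recv_window body
    }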


@ -24,7 +24,7 @@ use solana::system_transaction::SystemTransaction;
use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use solana::window::{default_window, WINDOW_SIZE};
use solana::window::default_window;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashSet, VecDeque};
use std::env;
@ -126,7 +126,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// write a bunch more ledger into leader's ledger, this should populate his window
// and force him to respond to repair from the ledger window
{
let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize);
let entries = make_tiny_test_entries(alice.last_id(), 100);
let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
writer.write_entries(&entries).unwrap();
@ -897,12 +897,12 @@ fn test_leader_to_validator_transition() {
// Check the ledger to make sure it's the right height, we should've
// transitioned after tick_height == bootstrap_height
let (_, tick_height, _, _) = Fullnode::new_bank_from_ledger(
let (bank, _, _) = Fullnode::new_bank_from_ledger(
&leader_ledger_path,
Arc::new(RwLock::new(LeaderScheduler::default())),
);
assert_eq!(tick_height, bootstrap_height);
assert_eq!(bank.get_tick_height(), bootstrap_height);
// Shut down
ncp.close().unwrap();