Remove tick counting from broadast service

This commit is contained in:
Anatoly Yakovenko 2019-02-25 13:50:31 -08:00 committed by Grimes
parent ba5077701d
commit 58eebd7f6c
4 changed files with 51 additions and 54 deletions

View File

@ -50,7 +50,7 @@ impl BankingStage {
last_entry_id: &Hash,
max_tick_height: u64,
leader_id: Pubkey,
) -> (Self, Receiver<Vec<Entry>>) {
) -> (Self, Receiver<Vec<(Entry, u64)>>) {
let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let working_bank = WorkingBank {
@ -394,7 +394,10 @@ mod tests {
sleep(Duration::from_millis(500));
drop(verified_sender);
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.into_iter().map(|e| e.0))
.collect();
assert!(entries.len() != 0);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id());
@ -440,7 +443,11 @@ mod tests {
drop(verified_sender);
//receive entries + ticks
let entries: Vec<_> = entry_receiver.iter().map(|x| x).collect();
let entries: Vec<Vec<Entry>> = entry_receiver
.iter()
.map(|x| x.into_iter().map(|e| e.0).collect())
.collect();
assert!(entries.len() >= 1);
let mut last_id = start_hash;
@ -500,7 +507,10 @@ mod tests {
banking_stage.join().unwrap();
// Collect the ledger and feed it to a new bank.
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.into_iter().map(|e| e.0))
.collect();
// same assertion as running through the bank, really...
assert!(entries.len() >= 2);
@ -619,7 +629,7 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// ProgramErrors should still be recorded
results[0] = Err(BankError::ProgramError(
@ -628,13 +638,13 @@ mod tests {
));
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// Other BankErrors should not be recorded
results[0] = Err(BankError::AccountNotFound);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
}
#[test]
@ -671,7 +681,7 @@ mod tests {
// read entries until I find mine, might be ticks...
while need_tick {
let entries = entry_receiver.recv().unwrap();
for entry in entries {
for (entry, _) in entries {
if !entry.is_tick() {
assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1);

View File

@ -33,7 +33,6 @@ pub enum BroadcastServiceReturnType {
struct Broadcast {
id: Pubkey,
tick_height: u64,
max_tick_height: u64,
blob_index: u64,
@ -42,19 +41,10 @@ struct Broadcast {
}
impl Broadcast {
fn count_ticks(entries: &[Entry]) -> u64 {
entries.iter().fold(0, |mut sum, e| {
if e.is_tick() {
sum += 1
}
sum
})
}
fn run(
&mut self,
broadcast_table: &[NodeInfo],
receiver: &Receiver<Vec<Entry>>,
receiver: &Receiver<Vec<(Entry, u64)>>,
sock: &UdpSocket,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
blocktree: &Arc<Blocktree>,
@ -64,12 +54,12 @@ impl Broadcast {
let now = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
let mut ticks = Self::count_ticks(&entries);
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
ticks += Self::count_ticks(&entries);
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
}
@ -78,12 +68,15 @@ impl Broadcast {
let to_blobs_start = Instant::now();
// Generate the slot heights for all the entries inside ventries
// this may span slots if this leader broadcasts for consecutive slots...
let slots = generate_slots(&ventries, self.tick_height + 1, leader_scheduler);
// this may span slots if this leader broadcasts for consecutive slots...
let slots = generate_slots(&ventries, leader_scheduler);
let blobs: Vec<_> = ventries
.into_par_iter()
.flat_map(|p| p.to_shared_blobs())
.flat_map(|p| {
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
entries.to_shared_blobs()
})
.collect();
// TODO: blob_index should be slot-relative...
@ -105,9 +98,8 @@ impl Broadcast {
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
assert!(self.tick_height + ticks <= self.max_tick_height);
self.tick_height += ticks;
let contains_last_tick = self.tick_height == self.max_tick_height;
assert!(last_tick <= self.max_tick_height);
let contains_last_tick = last_tick == self.max_tick_height;
if contains_last_tick {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
@ -152,8 +144,7 @@ impl Broadcast {
}
fn generate_slots(
ventries: &[Vec<Entry>],
mut tick_height: u64,
ventries: &[Vec<(Entry, u64)>],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<u64> {
// Generate the slot heights for all the entries inside ventries
@ -163,13 +154,7 @@ fn generate_slots(
.flat_map(|p| {
let slot_heights: Vec<u64> = p
.iter()
.map(|e| {
let slot = r_leader_scheduler.tick_height_to_slot(tick_height);
if e.is_tick() {
tick_height += 1;
}
slot
})
.map(|(_, tick_height)| r_leader_scheduler.tick_height_to_slot(*tick_height))
.collect();
slot_heights
@ -207,7 +192,7 @@ impl BroadcastService {
cluster_info: &Arc<RwLock<ClusterInfo>>,
blob_index: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
receiver: &Receiver<Vec<Entry>>,
receiver: &Receiver<Vec<(Entry, u64)>>,
max_tick_height: u64,
exit_signal: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
@ -216,7 +201,6 @@ impl BroadcastService {
let mut broadcast = Broadcast {
id: me.id,
tick_height: bank.tick_height(),
max_tick_height,
blob_index,
#[cfg(feature = "erasure")]
@ -278,7 +262,7 @@ impl BroadcastService {
cluster_info: Arc<RwLock<ClusterInfo>>,
blob_index: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
receiver: Receiver<Vec<Entry>>,
receiver: Receiver<Vec<(Entry, u64)>>,
max_tick_height: u64,
exit_sender: Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
@ -341,7 +325,7 @@ mod test {
leader_pubkey: Pubkey,
ledger_path: &str,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
entry_receiver: Receiver<Vec<Entry>>,
entry_receiver: Receiver<Vec<(Entry, u64)>>,
blob_index: u64,
max_tick_height: u64,
) -> MockBroadcastService {
@ -409,9 +393,9 @@ mod test {
);
let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
for (_, tick) in ticks.into_iter().enumerate() {
for (i, tick) in ticks.into_iter().enumerate() {
entry_sender
.send(vec![tick])
.send(vec![(tick, i as u64 + 1)])
.expect("Expect successful send to broadcast service");
}

View File

@ -29,7 +29,7 @@ pub enum PohRecorderError {
#[derive(Clone)]
pub struct WorkingBank {
pub bank: Arc<Bank>,
pub sender: Sender<Vec<Entry>>,
pub sender: Sender<Vec<(Entry, u64)>>,
pub min_tick_height: u64,
pub max_tick_height: u64,
}
@ -100,11 +100,11 @@ impl PohRecorder {
working_bank.max_tick_height,
cnt,
);
let cache: Vec<Entry> = self.tick_cache[..cnt].iter().map(|x| x.0.clone()).collect();
for t in &cache {
working_bank.bank.register_tick(&t.id);
let cache = &self.tick_cache[..cnt];
for t in cache {
working_bank.bank.register_tick(&t.0.id);
}
working_bank.sender.send(cache)
working_bank.sender.send(cache.to_vec())
} else {
Ok(())
};
@ -160,13 +160,15 @@ impl PohRecorder {
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
let entry = self.poh.record(mixin);
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
let entry = Entry {
let recorded_entry = Entry {
num_hashes: entry.num_hashes,
id: entry.id,
transactions: txs,
};
trace!("sending entry {}", entry.is_tick());
working_bank.sender.send(vec![entry])?;
trace!("sending entry {}", recorded_entry.is_tick());
working_bank
.sender
.send(vec![(recorded_entry, entry.tick_height)])?;
Ok(())
}
@ -352,9 +354,9 @@ mod tests {
//tick in the cache + entry
let e = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 1);
assert!(e[0].is_tick());
assert!(e[0].0.is_tick());
let e = entry_receiver.recv().expect("recv 2");
assert!(!e[0].is_tick());
assert!(!e[0].0.is_tick());
}
#[test]
@ -381,8 +383,8 @@ mod tests {
let e = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 2);
assert!(e[0].is_tick());
assert!(e[1].is_tick());
assert!(e[0].0.is_tick());
assert!(e[1].0.is_tick());
}
#[test]

View File

@ -162,6 +162,7 @@ mod tests {
while need_tick || need_entry || need_partial {
for entry in entry_receiver.recv().unwrap() {
let entry = &entry.0;
if entry.is_tick() {
assert!(entry.num_hashes <= HASHES_PER_TICK);