groom broadcast (#3170)

This commit is contained in:
Rob Walker 2019-03-07 09:43:42 -08:00 committed by GitHub
parent 94882418ab
commit 9e9c0785e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 27 deletions

View File

@ -1535,7 +1535,7 @@ pub mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = 0;
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

View File

@ -53,13 +53,6 @@ impl Broadcast {
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
// TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of
// `max_tick_height`
let mut blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let now = Instant::now();
let mut num_entries = entries.len();
@ -90,19 +83,20 @@ impl Broadcast {
})
.collect();
index_blobs(&blobs, &self.id, &mut blob_index, bank.slot());
let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0);
for b in blobs.iter() {
b.write().unwrap().set_parent(parent);
}
let blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
index_blobs(
&blobs,
&self.id,
blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);
let broadcast_start = Instant::now();
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
assert!(last_tick <= max_tick_height);
let contains_last_tick = last_tick == max_tick_height;
if contains_last_tick {
@ -111,9 +105,15 @@ impl Broadcast {
blocktree.write_shared_blobs(&blobs)?;
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
let broadcast_start = Instant::now();
// Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{

View File

@ -415,7 +415,7 @@ mod test {
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -505,7 +505,7 @@ mod test {
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, 0, 0);
for blob in shared_blobs.iter().rev() {
process_blob(&blocktree, blob).expect("Expect successful processing of blob");

View File

@ -890,7 +890,7 @@ pub mod test {
}
// Make some dummy slots
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0);
for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@ -903,7 +903,7 @@ pub mod test {
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0, 0);
blobs
}

View File

@ -463,16 +463,17 @@ impl Blob {
}
}
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) {
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) {
// enumerate all the blobs, those are the indices
for blob in blobs.iter() {
let mut blob = blob.write().unwrap();
blob.set_index(*blob_index);
blob.set_index(blob_index);
blob.set_slot(slot);
blob.set_parent(parent);
blob.set_id(id);
blob.forward(true);
*blob_index += 1;
blob_index += 1;
}
}

View File

@ -160,7 +160,8 @@ fn test_replay() {
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0);
index_blobs(&blobs, &leader.info.id, blob_idx, 0, 0);
blob_idx += blobs.len() as u64;
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));