Remove max_tick_height, leader_scheduler from broadcast_service

Carl authored 2019-02-25 16:15:53 -08:00, committed by Grimes
parent 58eebd7f6c
commit 3e893ffddc
8 changed files with 29 additions and 66 deletions
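The change replaces LeaderScheduler lookups in the broadcast path with plain slot arithmetic: callers now pass the slot they are broadcasting for, and the service derives that slot's last tick from the bank's tick count. A minimal standalone sketch of the arithmetic (the ticks_per_slot value here is hypothetical; the real code reads it from Bank::ticks_per_slot()):

fn main() {
    // Hypothetical inputs; in the real code slot_height is a parameter and
    // ticks_per_slot comes from Bank::ticks_per_slot().
    let ticks_per_slot: u64 = 8;
    let slot_height: u64 = 3;

    // Same derivation the new BroadcastService::run performs: the last tick
    // that still belongs to `slot_height`.
    let max_tick_height = (slot_height + 1) * ticks_per_slot - 1;
    assert_eq!(max_tick_height, 31);

    // The reverse mapping, which the removed generate_slots() obtained from the
    // LeaderScheduler, reduces to integer division under this slot layout
    // (an assumption about the scheduler's default configuration).
    assert_eq!(max_tick_height / ticks_per_slot, slot_height);
}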

View File

@@ -1483,7 +1483,7 @@ pub mod tests {
 fn test_read_blobs_bytes() {
 let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
 let slot = 0;
-index_blobs(&shared_blobs, &mut 0, &[slot; 10]);
+index_blobs(&shared_blobs, &mut 0, slot);
 let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
 let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

View File

@@ -6,7 +6,6 @@ use crate::entry::Entry;
 use crate::entry::EntrySlice;
 #[cfg(feature = "erasure")]
 use crate::erasure::CodingGenerator;
-use crate::leader_scheduler::LeaderScheduler;
 use crate::packet::index_blobs;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -33,7 +32,6 @@ pub enum BroadcastServiceReturnType {
 struct Broadcast {
 id: Pubkey,
-max_tick_height: u64,
 blob_index: u64,
 #[cfg(feature = "erasure")]
@@ -43,10 +41,11 @@ struct Broadcast {
 impl Broadcast {
 fn run(
 &mut self,
+slot_height: u64,
+max_tick_height: u64,
 broadcast_table: &[NodeInfo],
 receiver: &Receiver<Vec<(Entry, u64)>>,
 sock: &UdpSocket,
-leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 blocktree: &Arc<Blocktree>,
 ) -> Result<()> {
 let timer = Duration::new(1, 0);
@@ -67,10 +66,6 @@
 let to_blobs_start = Instant::now();
-// Generate the slot heights for all the entries inside ventries
-// this may span slots if this leader broadcasts for consecutive slots...
-let slots = generate_slots(&ventries, leader_scheduler);
 let blobs: Vec<_> = ventries
 .into_par_iter()
 .flat_map(|p| {
@@ -80,12 +75,12 @@
 .collect();
 // TODO: blob_index should be slot-relative...
-index_blobs(&blobs, &mut self.blob_index, &slots);
+index_blobs(&blobs, &mut self.blob_index, slot_height);
 let parent = {
-if slots[0] == 0 {
+if slot_height == 0 {
 0
 } else {
-slots[0] - 1
+slot_height - 1
 }
 };
 for b in blobs.iter() {
@@ -98,8 +93,8 @@
 inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
-assert!(last_tick <= self.max_tick_height);
-let contains_last_tick = last_tick == self.max_tick_height;
+assert!(last_tick <= max_tick_height);
+let contains_last_tick = last_tick == max_tick_height;
 if contains_last_tick {
 blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
@@ -143,25 +138,6 @@
 }
 }
-fn generate_slots(
-ventries: &[Vec<(Entry, u64)>],
-leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-) -> Vec<u64> {
-// Generate the slot heights for all the entries inside ventries
-let r_leader_scheduler = leader_scheduler.read().unwrap();
-ventries
-.iter()
-.flat_map(|p| {
-let slot_heights: Vec<u64> = p
-.iter()
-.map(|(_, tick_height)| r_leader_scheduler.tick_height_to_slot(*tick_height))
-.collect();
-slot_heights
-})
-.collect()
-}
 // Implement a destructor for the BroadcastService3 thread to signal it exited
 // even on panics
 struct Finalizer {
@@ -187,13 +163,12 @@ pub struct BroadcastService {
 impl BroadcastService {
 #[allow(clippy::too_many_arguments)]
 fn run(
+slot_height: u64,
 bank: &Arc<Bank>,
 sock: &UdpSocket,
 cluster_info: &Arc<RwLock<ClusterInfo>>,
 blob_index: u64,
-leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 receiver: &Receiver<Vec<(Entry, u64)>>,
-max_tick_height: u64,
 exit_signal: &Arc<AtomicBool>,
 blocktree: &Arc<Blocktree>,
 ) -> BroadcastServiceReturnType {
@@ -201,12 +176,13 @@
 let mut broadcast = Broadcast {
 id: me.id,
-max_tick_height,
 blob_index,
 #[cfg(feature = "erasure")]
 coding_generator: CodingGenerator::new(),
 };
+let max_tick_height = (slot_height + 1) * bank.ticks_per_slot() - 1;
 loop {
 if exit_signal.load(Ordering::Relaxed) {
 return BroadcastServiceReturnType::ExitSignal;
@@ -219,10 +195,11 @@
 broadcast_table.truncate(DATA_PLANE_FANOUT);
 inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
 if let Err(e) = broadcast.run(
+slot_height,
+max_tick_height,
 &broadcast_table,
 receiver,
 sock,
-leader_scheduler,
 blocktree,
 ) {
 match e {
@@ -257,13 +234,12 @@
 /// completing the cycle.
 #[allow(clippy::too_many_arguments)]
 pub fn new(
+slot_height: u64,
 bank: Arc<Bank>,
 sock: UdpSocket,
 cluster_info: Arc<RwLock<ClusterInfo>>,
 blob_index: u64,
-leader_scheduler: Arc<RwLock<LeaderScheduler>>,
 receiver: Receiver<Vec<(Entry, u64)>>,
-max_tick_height: u64,
 exit_sender: Arc<AtomicBool>,
 blocktree: &Arc<Blocktree>,
 ) -> Self {
@@ -274,13 +250,12 @@
 .spawn(move || {
 let _exit = Finalizer::new(exit_sender);
 Self::run(
+slot_height,
 &bank,
 &sock,
 &cluster_info,
 blob_index,
-&leader_scheduler,
 &receiver,
-max_tick_height,
 &exit_signal,
 &blocktree,
 )
@@ -306,6 +281,7 @@ mod test {
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{ClusterInfo, Node};
 use crate::entry::create_ticks;
+use crate::leader_scheduler::LeaderScheduler;
 use crate::service::Service;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -322,12 +298,11 @@
 }
 fn setup_dummy_broadcast_service(
+slot_height: u64,
 leader_pubkey: Pubkey,
 ledger_path: &str,
-leader_scheduler: Arc<RwLock<LeaderScheduler>>,
 entry_receiver: Receiver<Vec<(Entry, u64)>>,
 blob_index: u64,
-max_tick_height: u64,
 ) -> MockBroadcastService {
 // Make the database ledger
 let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap());
@@ -349,13 +324,12 @@
 // Start up the broadcast stage
 let broadcast_service = BroadcastService::new(
+slot_height,
 bank.clone(),
 leader_info.sockets.broadcast,
 cluster_info,
 blob_index,
-leader_scheduler,
 entry_receiver,
-max_tick_height,
 exit_sender,
 &blocktree,
 );
@@ -381,15 +355,13 @@
 let start_tick_height = 0;
 let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT;
-let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
 let (entry_sender, entry_receiver) = channel();
 let broadcast_service = setup_dummy_broadcast_service(
+0,
 leader_keypair.pubkey(),
 &ledger_path,
-leader_scheduler.clone(),
 entry_receiver,
 0,
-max_tick_height,
 );
 let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
@@ -403,10 +375,7 @@
 let blocktree = broadcast_service.blocktree;
 let mut blob_index = 0;
 for i in 0..max_tick_height - start_tick_height {
-let slot = leader_scheduler
-.read()
-.unwrap()
-.tick_height_to_slot(start_tick_height + i + 1);
+let slot = leader_scheduler.tick_height_to_slot(start_tick_height + i + 1);
 let result = blocktree.get_data_blob(slot, blob_index).unwrap();
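With the scheduler removed from Broadcast::run, the remaining per-batch bookkeeping is plain arithmetic on the slot the caller passed in: the parent slot and whether the batch carries the slot's final tick. A standalone sketch with hypothetical numbers (in the real code the final blob is then marked with set_is_last_in_slot()):

fn main() {
    // Hypothetical values; in Broadcast::run both arrive as arguments.
    let slot_height: u64 = 5;
    let max_tick_height: u64 = 47;

    // Parent slot, as in the new Broadcast::run: slot 0 falls back to 0.
    let parent = if slot_height == 0 { 0 } else { slot_height - 1 };
    assert_eq!(parent, 4);

    // End-of-slot detection is unchanged, except that max_tick_height is now a
    // plain argument instead of a field on the Broadcast struct.
    let last_tick: u64 = 47;
    assert!(last_tick <= max_tick_height);
    let contains_last_tick = last_tick == max_tick_height;
    assert!(contains_last_tick);
}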

View File

@@ -450,7 +450,7 @@ mod test {
 let num_entries = 10;
 let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
-index_blobs(&shared_blobs, &mut 0, &vec![slot; num_entries]);
+index_blobs(&shared_blobs, &mut 0, slot);
 let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
 let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -545,7 +545,7 @@
 let original_entries = make_tiny_test_entries(num_entries);
 let shared_blobs = original_entries.clone().to_shared_blobs();
-index_blobs(&shared_blobs, &mut 0, &vec![0; num_entries]);
+index_blobs(&shared_blobs, &mut 0, 0);
 for blob in shared_blobs.iter().rev() {
 process_blob(&leader_scheduler, &blocktree, blob)

View File

@@ -889,7 +889,7 @@ pub mod test {
 }
 // Make some dummy slots
-index_blobs(&blobs, &mut (offset as u64), &vec![slot; blobs.len()]);
+index_blobs(&blobs, &mut (offset as u64), slot);
 for b in blobs {
 let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@@ -902,7 +902,7 @@
 fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
 let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
-index_blobs(&blobs, &mut (offset as u64), &vec![0; blobs.len()]);
+index_blobs(&blobs, &mut (offset as u64), 0);
 blobs
 }

View File

@@ -101,7 +101,6 @@ pub struct Fullnode {
 node_services: NodeServices,
 rotation_receiver: TvuRotationReceiver,
 blocktree: Arc<Blocktree>,
-leader_scheduler: Arc<RwLock<LeaderScheduler>>,
 bank_forks: Arc<RwLock<BankForks>>,
 }
@@ -252,7 +251,6 @@ impl Fullnode {
 broadcast_socket: node.sockets.broadcast,
 rotation_receiver,
 blocktree,
-leader_scheduler,
 bank_forks,
 }
 }
@@ -306,7 +304,6 @@
 rotation_info.slot,
 rotation_info.last_entry_id,
 &self.blocktree,
-&self.leader_scheduler,
 );
 transition
 } else {

View File

@@ -452,13 +452,13 @@ impl Blob {
 }
 }
-pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slots: &[u64]) {
+pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slot: u64) {
 // enumerate all the blobs, those are the indices
-for (blob, slot) in blobs.iter().zip(slots) {
+for blob in blobs.iter() {
 let mut blob = blob.write().unwrap();
 blob.set_index(*blob_index);
-blob.set_slot(*slot);
+blob.set_slot(slot);
 blob.forward(true);
 *blob_index += 1;
 }
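index_blobs now stamps every blob in a batch with the same slot and a running index. A standalone sketch of that behavior with a toy Blob type (the real SharedBlob/Blob live in packet.rs and use setter methods; the struct here is only illustrative):

use std::sync::{Arc, RwLock};

// Toy stand-in for packet::Blob, just enough to show the new indexing behavior.
#[derive(Default)]
struct Blob {
    index: u64,
    slot: u64,
    forward: bool,
}

type SharedBlob = Arc<RwLock<Blob>>;

// Mirrors the reshaped signature: one slot for the whole batch, plus a running index.
fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slot: u64) {
    // Enumerate all the blobs; those are the indices.
    for blob in blobs.iter() {
        let mut blob = blob.write().unwrap();
        blob.index = *blob_index;
        blob.slot = slot;
        blob.forward = true;
        *blob_index += 1;
    }
}

fn main() {
    let blobs: Vec<SharedBlob> = (0..3).map(|_| SharedBlob::default()).collect();
    let mut next_index = 10;
    index_blobs(&blobs, &mut next_index, 7);
    assert_eq!(next_index, 13);
    let last = blobs[2].read().unwrap();
    assert_eq!((last.index, last.slot, last.forward), (12, 7, true));
}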

View File

@@ -7,7 +7,6 @@ use crate::broadcast_service::BroadcastService;
 use crate::cluster_info::ClusterInfo;
 use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
 use crate::fetch_stage::FetchStage;
-use crate::leader_scheduler::LeaderScheduler;
 use crate::poh_service::PohServiceConfig;
 use crate::service::Service;
 use crate::sigverify_stage::SigVerifyStage;
@@ -199,7 +198,6 @@ impl Tpu {
 slot: u64,
 last_entry_id: Hash,
 blocktree: &Arc<Blocktree>,
-leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 ) {
 self.close_and_forward_unprocessed_packets();
@@ -240,13 +238,12 @@
 );
 let broadcast_service = BroadcastService::new(
+slot,
 bank,
 broadcast_socket,
 self.cluster_info.clone(),
 blob_index,
-leader_scheduler.clone(),
 entry_receiver,
-max_tick_height,
 self.exit.clone(),
 blocktree,
 );

View File

@@ -159,7 +159,7 @@ fn test_replay() {
 let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
 let blobs = entries.to_shared_blobs();
-index_blobs(&blobs, &mut blob_idx, &vec![0; blobs.len()]);
+index_blobs(&blobs, &mut blob_idx, 0);
 blobs
 .iter()
 .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));