2018-12-06 12:52:47 -08:00
|
|
|
//! The `broadcast_service` broadcasts data from a leader node to validators
|
2018-08-09 11:54:23 -07:00
|
|
|
//!
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
|
|
|
|
use crate::counter::Counter;
|
2018-12-12 15:58:29 -08:00
|
|
|
use crate::db_ledger::DbLedger;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::entry::Entry;
|
2018-08-09 11:54:23 -07:00
|
|
|
#[cfg(feature = "erasure")]
|
2018-12-08 21:40:42 -08:00
|
|
|
use crate::erasure;
|
2018-12-12 15:58:29 -08:00
|
|
|
use crate::leader_scheduler::LeaderScheduler;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::ledger::Block;
|
2018-12-12 15:58:29 -08:00
|
|
|
use crate::packet::{index_blobs, SharedBlob};
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::result::{Error, Result};
|
|
|
|
use crate::service::Service;
|
|
|
|
use crate::window::{SharedWindow, WindowIndex, WindowUtil};
|
2018-08-09 11:54:23 -07:00
|
|
|
use log::Level;
|
2018-09-18 21:45:49 -07:00
|
|
|
use rayon::prelude::*;
|
2018-11-16 08:45:59 -08:00
|
|
|
use solana_metrics::{influxdb, submit};
|
2018-11-15 13:23:26 -08:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
2018-11-16 08:45:59 -08:00
|
|
|
use solana_sdk::timing::duration_as_ms;
|
2018-08-09 11:54:23 -07:00
|
|
|
use std::net::UdpSocket;
|
2018-09-14 14:34:32 -07:00
|
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
2018-09-18 13:49:10 -07:00
|
|
|
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
2018-08-09 11:54:23 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-08-09 14:17:50 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2018-09-18 21:45:49 -07:00
|
|
|
use std::time::{Duration, Instant};
|
2018-08-09 11:54:23 -07:00
|
|
|
|
2018-09-14 00:17:40 -07:00
|
|
|
/// Reason the broadcast thread's main loop terminated; surfaced to callers
/// through `BroadcastService::join`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastServiceReturnType {
    // NOTE(review): not constructed anywhere in this file — presumably
    // produced by callers or kept for compatibility; confirm before removing.
    LeaderRotation,
    // The entry receiver channel disconnected (the upstream stage shut down).
    ChannelDisconnected,
    // The shared exit flag was observed set at the top of the `run` loop.
    ExitSignal,
}
|
|
|
|
|
2018-12-07 19:01:28 -08:00
|
|
|
/// Pulls one batch of entries off `receiver` (blocking up to one second for
/// the first batch, then draining whatever else is queued), converts them to
/// blobs, caches the blobs in the shared `window`, persists them to
/// `db_ledger`, and transmits them to the peers in `broadcast_table` over
/// `sock`.
///
/// `transmit_index` and `receive_index` are advanced as blobs are indexed and
/// sent, so the caller can invoke this repeatedly with the same state.
///
/// # Errors
/// Returns `Err` on receive timeout/disconnect, ledger-write failure, or
/// broadcast failure; the caller (`run`) treats timeouts and no-peer errors
/// as benign.
#[allow(clippy::too_many_arguments)]
fn broadcast(
    db_ledger: &Arc<RwLock<DbLedger>>,
    // Tick height at which this node's leader slot ends, if bounded.
    max_tick_height: Option<u64>,
    leader_id: Pubkey,
    node_info: &NodeInfo,
    broadcast_table: &[NodeInfo],
    window: &SharedWindow,
    receiver: &Receiver<Vec<Entry>>,
    sock: &UdpSocket,
    transmit_index: &mut WindowIndex,
    receive_index: &mut u64,
    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
    let id = node_info.id;
    // Block for at most one second waiting for the first batch of entries.
    let timer = Duration::new(1, 0);
    let entries = receiver.recv_timeout(timer)?;
    let now = Instant::now();
    let mut num_entries = entries.len();
    let mut ventries = Vec::new();
    ventries.push(entries);

    let mut contains_last_tick = false;
    // Drain any additional batches already queued, without blocking.
    while let Ok(entries) = receiver.try_recv() {
        num_entries += entries.len();
        ventries.push(entries);
    }

    // Detect whether this batch ends exactly on the final tick of the leader
    // slot; that flag is forwarded to `ClusterInfo::broadcast` below.
    if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
        contains_last_tick |= Some(last.tick_height + 1) == max_tick_height && last.is_tick();
    }

    inc_new_counter_info!("broadcast_service-entries_received", num_entries);

    let to_blobs_start = Instant::now();

    // Generate the slot heights for all the entries inside ventries
    let slot_heights = generate_slots(&ventries, leader_scheduler);

    // Serialize entries into blobs in parallel; rayon preserves order, so the
    // zip with `slot_heights` below stays aligned entry-for-entry.
    let blobs: Vec<_> = ventries
        .into_par_iter()
        .flat_map(|p| p.to_blobs())
        .collect();

    let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();

    let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

    let blobs_chunking = Instant::now();
    // We could receive more blobs than window slots so
    // break them up into window-sized chunks to process
    let window_size = window.read().unwrap().window_size();
    let blobs_chunked = blobs_slot_heights
        .chunks(window_size as usize)
        .map(|x| x.to_vec());
    let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());

    let broadcast_start = Instant::now();
    for blobs in blobs_chunked {
        let blobs_len = blobs.len();
        trace!("{}: broadcast blobs.len: {}", id, blobs_len);

        // Stamp each blob with this node's id and a running index starting at
        // *receive_index.
        index_blobs(blobs.iter(), &node_info.id, *receive_index);

        // keep the cache of blobs that are broadcast
        inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
        {
            let mut win = window.write().unwrap();
            assert!(blobs.len() <= win.len());
            // First pass: evict any stale data/coding blobs occupying the
            // window slots the new blobs will land in.
            for (b, _) in &blobs {
                let ix = b.read().unwrap().index().expect("blob index");
                let pos = (ix % window_size) as usize;
                if let Some(x) = win[pos].data.take() {
                    trace!(
                        "{} popped {} at {}",
                        id,
                        x.read().unwrap().index().unwrap(),
                        pos
                    );
                }
                if let Some(x) = win[pos].coding.take() {
                    trace!(
                        "{} popped {} at {}",
                        id,
                        x.read().unwrap().index().unwrap(),
                        pos
                    );
                }

                trace!("{} null {}", id, pos);
            }
            // Second pass: cache each blob in its (now empty) window slot and
            // persist it to the ledger under its slot height. Note the ledger
            // write happens while the window write lock is still held.
            for (b, slot) in &blobs {
                {
                    let ix = b.read().unwrap().index().expect("blob index");
                    let pos = (ix % window_size) as usize;
                    trace!("{} caching {} at {}", id, ix, pos);
                    assert!(win[pos].data.is_none());
                    win[pos].data = Some(b.clone());
                }
                db_ledger
                    .write()
                    .unwrap()
                    .write_shared_blobs(*slot, vec![b])?;
            }
        }

        // Fill in the coding blob data from the window data blobs
        #[cfg(feature = "erasure")]
        {
            erasure::generate_coding(
                &id,
                &mut window.write().unwrap(),
                *receive_index,
                blobs_len,
                &mut transmit_index.coding,
            )?;
        }

        *receive_index += blobs_len as u64;

        // Send blobs out from the window
        ClusterInfo::broadcast(
            contains_last_tick,
            leader_id,
            &node_info,
            &broadcast_table,
            &window,
            &sock,
            transmit_index,
            *receive_index,
        )?;
    }
    let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());

    inc_new_counter_info!(
        "broadcast_service-time_ms",
        duration_as_ms(&now.elapsed()) as usize
    );
    info!(
        "broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
        num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
    );

    submit(
        influxdb::Point::new("broadcast-service")
            .add_field(
                "transmit-index",
                influxdb::Value::Integer(transmit_index.data as i64),
            )
            .to_owned(),
    );

    Ok(())
}
|
|
|
|
|
2018-12-12 15:58:29 -08:00
|
|
|
fn generate_slots(
|
|
|
|
ventries: &[Vec<Entry>],
|
|
|
|
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
|
|
|
) -> Vec<u64> {
|
|
|
|
// Generate the slot heights for all the entries inside ventries
|
|
|
|
let r_leader_scheduler = leader_scheduler.read().unwrap();
|
|
|
|
ventries
|
|
|
|
.iter()
|
|
|
|
.flat_map(|p| {
|
|
|
|
let slot_heights: Vec<u64> = p
|
|
|
|
.iter()
|
|
|
|
.map(|e| {
|
|
|
|
let (_, slot) = r_leader_scheduler
|
|
|
|
.get_scheduled_leader(e.tick_height + 1)
|
|
|
|
.expect("Leader schedule should never be unknown while indexing blobs");
|
|
|
|
slot
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
slot_heights
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2018-12-06 12:52:47 -08:00
|
|
|
// Implement a destructor for the BroadcastService thread to signal it exited
// even on panics.
struct Finalizer {
    // Shared flag flipped to `true` when this guard is dropped.
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    /// Wraps `exit_sender` so it is set when this value goes out of scope.
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}

// Implement a destructor for Finalizer.
impl Drop for Finalizer {
    fn drop(&mut self) {
        // Store directly on the shared flag; the previous code cloned the
        // Arc first, which was a redundant refcount bump (clippy:
        // redundant_clone).
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}
|
|
|
|
|
2018-12-06 12:52:47 -08:00
|
|
|
/// Handle to the spawned broadcast thread; join it through the `Service`
/// impl to retrieve the reason the broadcast loop terminated.
pub struct BroadcastService {
    // JoinHandle for the "solana-broadcaster" thread spawned in `new`.
    thread_hdl: JoinHandle<BroadcastServiceReturnType>,
}
|
|
|
|
|
2018-12-06 12:52:47 -08:00
|
|
|
impl BroadcastService {
    /// Main loop of the broadcast thread: repeatedly calls `broadcast` until
    /// `exit_signal` is set or the entry channel disconnects. Receive
    /// timeouts and no-peer errors are tolerated; any other error is counted
    /// and logged but does not stop the loop.
    fn run(
        db_ledger: &Arc<RwLock<DbLedger>>,
        sock: &UdpSocket,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        window: &SharedWindow,
        entry_height: u64,
        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
        receiver: &Receiver<Vec<Entry>>,
        max_tick_height: Option<u64>,
        exit_signal: &Arc<AtomicBool>,
    ) -> BroadcastServiceReturnType {
        // Both transmit indices and the receive index start at the current
        // ledger entry height.
        let mut transmit_index = WindowIndex {
            data: entry_height,
            coding: entry_height,
        };
        let mut receive_index = entry_height;
        let me = cluster_info.read().unwrap().my_data().clone();
        loop {
            if exit_signal.load(Ordering::Relaxed) {
                return BroadcastServiceReturnType::ExitSignal;
            }
            // Refresh the peer list and leader id every iteration, since
            // cluster membership can change between broadcasts.
            let broadcast_table = cluster_info.read().unwrap().tvu_peers();
            inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
            let leader_id = cluster_info.read().unwrap().leader_id();
            if let Err(e) = broadcast(
                db_ledger,
                max_tick_height,
                leader_id,
                &me,
                &broadcast_table,
                &window,
                &receiver,
                &sock,
                &mut transmit_index,
                &mut receive_index,
                leader_scheduler,
            ) {
                match e {
                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
                        return BroadcastServiceReturnType::ChannelDisconnected
                    }
                    Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                    Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
                    _ => {
                        inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
                        error!("broadcaster error: {:?}", e);
                    }
                }
            }
        }
    }

    /// Service to broadcast messages from the leader to layer 1 nodes.
    /// See `cluster_info` for network layer definitions.
    /// # Arguments
    /// * `db_ledger` - Database ledger the broadcast blobs are persisted to.
    /// * `sock` - Socket to send from.
    /// * `cluster_info` - ClusterInfo structure
    /// * `window` - Cache of blobs that we have broadcast
    /// * `entry_height` - Initial ledger height used to seed the window indices.
    /// * `leader_scheduler` - Used to map each entry to its slot height.
    /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
    /// * `max_tick_height` - Tick height at which the leader slot ends, if bounded.
    /// * `exit_sender` - Set to true when this service exits, allows rest of Tpu to exit cleanly.
    /// Otherwise, when a Tpu closes, it only closes the stages that come after it. The stages
    /// that come before could be blocked on a receive, and never notice that they need to
    /// exit. Now, if any stage of the Tpu closes, it will lead to closing the WriteStage (b/c
    /// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
    /// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
    /// completing the cycle.
    ///
    /// Returns the service handle together with an exit flag the caller can
    /// set to stop the broadcast loop.
    #[allow(clippy::too_many_arguments, clippy::new_ret_no_self)]
    pub fn new(
        db_ledger: Arc<RwLock<DbLedger>>,
        sock: UdpSocket,
        cluster_info: Arc<RwLock<ClusterInfo>>,
        window: SharedWindow,
        entry_height: u64,
        leader_scheduler: Arc<RwLock<LeaderScheduler>>,
        receiver: Receiver<Vec<Entry>>,
        max_tick_height: Option<u64>,
        exit_sender: Arc<AtomicBool>,
    ) -> (Self, Arc<AtomicBool>) {
        let exit_signal = Arc::new(AtomicBool::new(false));
        let exit_signal_ = exit_signal.clone();
        let thread_hdl = Builder::new()
            .name("solana-broadcaster".to_string())
            .spawn(move || {
                // Finalizer flips `exit_sender` when this closure returns or
                // panics, so downstream stages always see the exit.
                let _exit = Finalizer::new(exit_sender);
                Self::run(
                    &db_ledger,
                    &sock,
                    &cluster_info,
                    &window,
                    entry_height,
                    &leader_scheduler,
                    &receiver,
                    max_tick_height,
                    &exit_signal_,
                )
            })
            .unwrap();

        (Self { thread_hdl }, exit_signal)
    }
}
|
|
|
|
|
2018-12-06 12:52:47 -08:00
|
|
|
impl Service for BroadcastService {
    type JoinReturnType = BroadcastServiceReturnType;

    /// Blocks until the broadcast thread exits and returns its exit reason.
    fn join(self) -> thread::Result<BroadcastServiceReturnType> {
        self.thread_hdl.join()
    }
}
|
2018-12-12 15:58:29 -08:00
|
|
|
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::cluster_info::{ClusterInfo, Node};
    use crate::db_ledger::DbLedger;
    use crate::ledger::create_ticks;
    use crate::ledger::get_tmp_ledger_path;
    use crate::service::Service;
    use crate::window::new_window;
    use solana_sdk::hash::Hash;
    use solana_sdk::signature::{Keypair, KeypairUtil};
    use std::sync::atomic::AtomicBool;
    use std::sync::mpsc::channel;
    use std::sync::mpsc::Sender;
    use std::sync::{Arc, RwLock};
    use std::thread::sleep;
    use std::time::Duration;

    // Bundle of everything a test needs to drive and observe a running
    // BroadcastService instance.
    struct DummyBroadcastService {
        // Ledger the service writes broadcast blobs into.
        db_ledger: Arc<RwLock<DbLedger>>,
        // The service under test (join to shut down cleanly).
        broadcast_service: BroadcastService,
        // Sender side of the entry channel feeding the service.
        entry_sender: Sender<Vec<Entry>>,
        // Flag returned by `BroadcastService::new`; set to stop the loop.
        exit_signal: Arc<AtomicBool>,
    }

    // Spins up a BroadcastService with a fresh ledger, a leader node, and one
    // peer to broadcast to; returns handles for driving the test.
    fn setup_dummy_broadcast_service(
        leader_pubkey: Pubkey,
        ledger_path: &str,
        leader_scheduler: Arc<RwLock<LeaderScheduler>>,
        entry_height: u64,
        max_tick_height: u64,
    ) -> DummyBroadcastService {
        // Make the database ledger
        let db_ledger = Arc::new(RwLock::new(DbLedger::open(ledger_path).unwrap()));

        // Make the leader node and scheduler
        let leader_info = Node::new_localhost_with_pubkey(leader_pubkey);

        // Make a node to broadcast to
        let buddy_keypair = Keypair::new();
        let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey());

        // Fill the cluster_info with the buddy's info
        let mut cluster_info = ClusterInfo::new(leader_info.info.clone());
        cluster_info.insert_info(broadcast_buddy.info);
        let cluster_info = Arc::new(RwLock::new(cluster_info));

        let window = new_window(32 * 1024);
        let shared_window = Arc::new(RwLock::new(window));
        let (entry_sender, entry_receiver) = channel();
        let exit_sender = Arc::new(AtomicBool::new(false));

        // Start up the broadcast stage
        let (broadcast_service, exit_signal) = BroadcastService::new(
            db_ledger.clone(),
            leader_info.sockets.broadcast,
            cluster_info,
            shared_window,
            entry_height,
            leader_scheduler,
            entry_receiver,
            Some(max_tick_height),
            exit_sender,
        );

        DummyBroadcastService {
            db_ledger,
            broadcast_service,
            entry_sender,
            exit_signal,
        }
    }

    // End-to-end check: every tick sent through the service ends up persisted
    // in the database ledger at the expected (slot, index) position.
    #[test]
    fn test_broadcast_ledger() {
        let ledger_path = get_tmp_ledger_path("test_broadcast");
        {
            // Create the leader scheduler
            let leader_keypair = Keypair::new();
            let mut leader_scheduler =
                LeaderScheduler::from_bootstrap_leader(leader_keypair.pubkey());

            // Mock the tick height to look like the tick height right after a leader transition
            leader_scheduler.last_seed_height = Some(leader_scheduler.bootstrap_height);
            leader_scheduler.set_leader_schedule(vec![leader_keypair.pubkey()]);
            leader_scheduler.use_only_bootstrap_leader = false;
            let start_tick_height = leader_scheduler.bootstrap_height;
            let max_tick_height = start_tick_height + leader_scheduler.last_seed_height.unwrap();
            let entry_height = 2 * start_tick_height;

            let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
            let broadcast_service = setup_dummy_broadcast_service(
                leader_keypair.pubkey(),
                &ledger_path,
                leader_scheduler.clone(),
                entry_height,
                max_tick_height,
            );

            let ticks = create_ticks(
                (max_tick_height - start_tick_height) as usize,
                Hash::default(),
            );
            for (i, mut tick) in ticks.into_iter().enumerate() {
                // Simulate the tick heights generated in poh.rs
                tick.tick_height = start_tick_height + i as u64;
                broadcast_service
                    .entry_sender
                    .send(vec![tick])
                    .expect("Expect successful send to broadcast service");
            }

            // Give the broadcast thread time to drain the channel and write
            // everything to the ledger before inspecting it.
            sleep(Duration::from_millis(2000));
            let r_db = broadcast_service.db_ledger.read().unwrap();
            for i in 0..max_tick_height - start_tick_height {
                let (_, slot) = leader_scheduler
                    .read()
                    .unwrap()
                    .get_scheduled_leader(start_tick_height + i + 1)
                    .expect("Leader should exist");
                let result = r_db
                    .data_cf
                    .get_by_slot_index(&r_db.db, slot, entry_height + i)
                    .unwrap();

                assert!(result.is_some());
            }

            // Shut the service down and wait for the thread to exit.
            broadcast_service.exit_signal.store(true, Ordering::Relaxed);
            broadcast_service
                .broadcast_service
                .join()
                .expect("Expect successful join of broadcast service");
        }

        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
    }
}
|