2019-03-01 19:43:30 -08:00
|
|
|
//! A stage to broadcast data from a leader node to validators
|
2019-07-01 17:54:03 -07:00
|
|
|
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
|
2019-06-19 00:13:19 -07:00
|
|
|
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
|
|
|
|
use self::standard_broadcast_run::StandardBroadcastRun;
|
2019-02-11 17:56:52 -08:00
|
|
|
use crate::blocktree::Blocktree;
|
2019-06-03 20:38:05 -07:00
|
|
|
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
|
2019-09-18 12:16:22 -07:00
|
|
|
use crate::poh_recorder::WorkingBankEntry;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::result::{Error, Result};
|
|
|
|
use crate::service::Service;
|
2019-02-28 13:15:25 -08:00
|
|
|
use crate::staking_utils;
|
2019-10-04 16:25:22 -07:00
|
|
|
use solana_metrics::{datapoint_debug, inc_new_counter_error, inc_new_counter_info};
|
2018-08-09 11:54:23 -07:00
|
|
|
use std::net::UdpSocket;
|
2019-02-13 20:04:20 -08:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2018-09-18 13:49:10 -07:00
|
|
|
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
2018-08-09 11:54:23 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-08-09 14:17:50 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2019-06-19 00:13:19 -07:00
|
|
|
use std::time::Instant;
|
|
|
|
|
2019-07-01 17:54:03 -07:00
|
|
|
mod broadcast_fake_blobs_run;
|
2019-08-20 17:16:06 -07:00
|
|
|
pub(crate) mod broadcast_utils;
|
2019-06-19 00:13:19 -07:00
|
|
|
mod fail_entry_verification_broadcast_run;
|
|
|
|
mod standard_broadcast_run;
|
2018-08-09 11:54:23 -07:00
|
|
|
|
2019-05-29 17:16:36 -07:00
|
|
|
/// Thread count exported by the broadcast stage.
///
/// NOTE(review): not referenced by the code in this module itself; presumably consumed by
/// the broadcast run implementations or other crates — confirm before changing.
pub const NUM_THREADS: u32 = 10;
|
|
|
|
|
2018-09-14 00:17:40 -07:00
|
|
|
/// Why the broadcast thread stopped; this is the value `Service::join` yields for a
/// `BroadcastStage`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BroadcastStageReturnType {
    /// The receive channel disconnected, or the run reported `Error::SendError`.
    ChannelDisconnected,
}
|
|
|
|
|
2019-06-19 00:13:19 -07:00
|
|
|
/// Chooses which `BroadcastRun` strategy `new_broadcast_stage` installs.
#[derive(Clone, Debug, PartialEq)]
pub enum BroadcastStageType {
    /// Installs `StandardBroadcastRun`.
    Standard,
    /// Installs `FailEntryVerificationBroadcastRun`.
    FailEntryVerification,
    /// Installs `BroadcastFakeBlobsRun`.
    BroadcastFakeBlobs,
}
|
|
|
|
|
2019-06-19 00:13:19 -07:00
|
|
|
impl BroadcastStageType {
|
|
|
|
pub fn new_broadcast_stage(
|
|
|
|
&self,
|
|
|
|
sock: UdpSocket,
|
|
|
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
2019-09-18 12:16:22 -07:00
|
|
|
receiver: Receiver<WorkingBankEntry>,
|
2019-06-19 00:13:19 -07:00
|
|
|
exit_sender: &Arc<AtomicBool>,
|
|
|
|
blocktree: &Arc<Blocktree>,
|
|
|
|
) -> BroadcastStage {
|
|
|
|
match self {
|
|
|
|
BroadcastStageType::Standard => BroadcastStage::new(
|
|
|
|
sock,
|
|
|
|
cluster_info,
|
|
|
|
receiver,
|
|
|
|
exit_sender,
|
|
|
|
blocktree,
|
|
|
|
StandardBroadcastRun::new(),
|
|
|
|
),
|
|
|
|
|
|
|
|
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
|
|
|
|
sock,
|
|
|
|
cluster_info,
|
|
|
|
receiver,
|
|
|
|
exit_sender,
|
|
|
|
blocktree,
|
|
|
|
FailEntryVerificationBroadcastRun::new(),
|
|
|
|
),
|
2019-07-01 17:54:03 -07:00
|
|
|
|
|
|
|
BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new(
|
|
|
|
sock,
|
|
|
|
cluster_info,
|
|
|
|
receiver,
|
|
|
|
exit_sender,
|
|
|
|
blocktree,
|
|
|
|
BroadcastFakeBlobsRun::new(0),
|
|
|
|
),
|
2019-06-19 00:13:19 -07:00
|
|
|
}
|
|
|
|
}
|
2019-01-15 10:51:53 -08:00
|
|
|
}
|
|
|
|
|
2019-06-19 00:13:19 -07:00
|
|
|
trait BroadcastRun {
|
2019-01-15 10:51:53 -08:00
|
|
|
fn run(
|
|
|
|
&mut self,
|
2019-03-03 16:44:06 -08:00
|
|
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
2019-09-18 12:16:22 -07:00
|
|
|
receiver: &Receiver<WorkingBankEntry>,
|
2019-01-15 10:51:53 -08:00
|
|
|
sock: &UdpSocket,
|
2019-02-11 17:56:52 -08:00
|
|
|
blocktree: &Arc<Blocktree>,
|
2019-06-19 00:13:19 -07:00
|
|
|
) -> Result<()>;
|
|
|
|
}
|
2019-05-21 17:02:19 -07:00
|
|
|
|
2019-03-01 19:43:30 -08:00
|
|
|
/// Guard owned by the broadcast thread that raises `exit_sender` when it is dropped.
///
/// `Drop` also runs while an (unwinding) panic tears down the thread, so the flag is set
/// even when the broadcast thread panics. This lets the rest of the pipeline notice that
/// broadcast has stopped.
struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    /// Wraps the shared exit flag; the flag is only written when the guard is dropped.
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}

impl Drop for Finalizer {
    /// Signals exit by setting the shared flag to `true`.
    fn drop(&mut self) {
        // Storing through the held `Arc` is enough; cloning it first would only bump
        // and immediately drop a reference count.
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}
|
|
|
|
|
2019-03-01 19:43:30 -08:00
|
|
|
pub struct BroadcastStage {
|
|
|
|
thread_hdl: JoinHandle<BroadcastStageReturnType>,
|
2018-08-09 14:17:50 -07:00
|
|
|
}
|
|
|
|
|
2019-03-01 19:43:30 -08:00
|
|
|
impl BroadcastStage {
|
2019-02-07 15:10:54 -08:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2018-08-09 14:17:50 -07:00
|
|
|
fn run(
|
2018-08-09 15:20:13 -07:00
|
|
|
sock: &UdpSocket,
|
2018-10-08 19:55:54 -07:00
|
|
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
2019-09-18 12:16:22 -07:00
|
|
|
receiver: &Receiver<WorkingBankEntry>,
|
2019-02-11 17:56:52 -08:00
|
|
|
blocktree: &Arc<Blocktree>,
|
2019-06-19 00:13:19 -07:00
|
|
|
mut broadcast_stage_run: impl BroadcastRun,
|
2019-03-01 19:43:30 -08:00
|
|
|
) -> BroadcastStageReturnType {
|
2018-08-09 14:17:50 -07:00
|
|
|
loop {
|
2019-09-03 21:32:51 -07:00
|
|
|
if let Err(e) = broadcast_stage_run.run(&cluster_info, receiver, sock, blocktree) {
|
2018-08-09 14:17:50 -07:00
|
|
|
match e {
|
2019-02-01 12:03:14 -08:00
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
|
2019-03-01 19:43:30 -08:00
|
|
|
return BroadcastStageReturnType::ChannelDisconnected;
|
2018-09-14 00:17:40 -07:00
|
|
|
}
|
2018-08-09 14:17:50 -07:00
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
2018-10-08 19:55:54 -07:00
|
|
|
Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
2018-08-09 14:17:50 -07:00
|
|
|
_ => {
|
2019-05-17 07:00:06 -07:00
|
|
|
inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
|
2018-08-09 14:17:50 -07:00
|
|
|
error!("broadcaster error: {:?}", e);
|
2018-08-09 11:54:23 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-08-09 14:17:50 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Service to broadcast messages from the leader to layer 1 nodes.
|
2018-10-08 19:55:54 -07:00
|
|
|
/// See `cluster_info` for network layer definitions.
|
2018-08-09 14:17:50 -07:00
|
|
|
/// # Arguments
|
|
|
|
/// * `sock` - Socket to send from.
|
|
|
|
/// * `exit` - Boolean to signal system exit.
|
2018-10-08 19:55:54 -07:00
|
|
|
/// * `cluster_info` - ClusterInfo structure
|
2018-08-09 14:17:50 -07:00
|
|
|
/// * `window` - Cache of blobs that we have broadcast
|
|
|
|
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
2018-12-06 12:52:47 -08:00
|
|
|
/// * `exit_sender` - Set to true when this service exits, allows rest of Tpu to exit cleanly.
|
|
|
|
/// Otherwise, when a Tpu closes, it only closes the stages that come after it. The stages
|
2018-09-14 14:34:32 -07:00
|
|
|
/// that come before could be blocked on a receive, and never notice that they need to
|
|
|
|
/// exit. Now, if any stage of the Tpu closes, it will lead to closing the WriteStage (b/c
|
2018-12-06 12:52:47 -08:00
|
|
|
/// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
|
2018-09-14 14:34:32 -07:00
|
|
|
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
|
|
|
|
/// completing the cycle.
|
2019-02-07 15:10:54 -08:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2019-06-19 00:13:19 -07:00
|
|
|
fn new(
|
2018-08-09 14:17:50 -07:00
|
|
|
sock: UdpSocket,
|
2018-10-08 19:55:54 -07:00
|
|
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
2019-09-18 12:16:22 -07:00
|
|
|
receiver: Receiver<WorkingBankEntry>,
|
2019-03-04 19:53:50 -08:00
|
|
|
exit_sender: &Arc<AtomicBool>,
|
2019-02-11 17:56:52 -08:00
|
|
|
blocktree: &Arc<Blocktree>,
|
2019-06-19 00:13:19 -07:00
|
|
|
broadcast_stage_run: impl BroadcastRun + Send + 'static,
|
2019-01-08 20:11:04 -08:00
|
|
|
) -> Self {
|
2019-02-11 17:56:52 -08:00
|
|
|
let blocktree = blocktree.clone();
|
2019-03-04 19:53:50 -08:00
|
|
|
let exit_sender = exit_sender.clone();
|
2018-08-09 14:17:50 -07:00
|
|
|
let thread_hdl = Builder::new()
|
|
|
|
.name("solana-broadcaster".to_string())
|
2018-09-14 14:34:32 -07:00
|
|
|
.spawn(move || {
|
2019-03-04 19:53:50 -08:00
|
|
|
let _finalizer = Finalizer::new(exit_sender);
|
2019-06-19 00:13:19 -07:00
|
|
|
Self::run(
|
|
|
|
&sock,
|
|
|
|
&cluster_info,
|
|
|
|
&receiver,
|
|
|
|
&blocktree,
|
|
|
|
broadcast_stage_run,
|
|
|
|
)
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.unwrap();
|
2018-08-09 14:17:50 -07:00
|
|
|
|
2019-01-08 20:11:04 -08:00
|
|
|
Self { thread_hdl }
|
2018-08-09 14:17:50 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-01 19:43:30 -08:00
|
|
|
impl Service for BroadcastStage {
|
|
|
|
type JoinReturnType = BroadcastStageReturnType;
|
2018-08-09 14:17:50 -07:00
|
|
|
|
2019-03-01 19:43:30 -08:00
|
|
|
fn join(self) -> thread::Result<BroadcastStageReturnType> {
|
2018-09-14 00:17:40 -07:00
|
|
|
self.thread_hdl.join()
|
2018-08-09 14:17:50 -07:00
|
|
|
}
|
2018-08-09 11:54:23 -07:00
|
|
|
}
|
2018-12-12 15:58:29 -08:00
|
|
|
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::blocktree::{get_tmp_ledger_path, Blocktree};
    use crate::cluster_info::{ClusterInfo, Node};
    use crate::entry::create_ticks;
    use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
    use crate::service::Service;
    use solana_runtime::bank::Bank;
    use solana_sdk::hash::Hash;
    use solana_sdk::pubkey::Pubkey;
    use solana_sdk::signature::{Keypair, KeypairUtil};
    use std::path::Path;
    use std::sync::atomic::AtomicBool;
    use std::sync::mpsc::channel;
    use std::sync::{Arc, RwLock};
    use std::thread::sleep;
    use std::time::{Duration, Instant};

    /// Upper bound on how long the test waits for broadcast output to reach the ledger.
    const BROADCAST_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
    /// Delay between ledger polls while waiting.
    const BROADCAST_POLL_INTERVAL: Duration = Duration::from_millis(100);

    struct MockBroadcastStage {
        blocktree: Arc<Blocktree>,
        broadcast_service: BroadcastStage,
        bank: Arc<Bank>,
    }

    /// Opens a blocktree at `ledger_path` and builds a cluster containing the leader plus
    /// one localhost peer. Then starts a `BroadcastStage` with `StandardBroadcastRun` that
    /// reads from `entry_receiver`.
    fn setup_dummy_broadcast_service(
        leader_pubkey: &Pubkey,
        ledger_path: &Path,
        entry_receiver: Receiver<WorkingBankEntry>,
    ) -> MockBroadcastStage {
        // Make the database ledger
        let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap());

        // Make the leader node and scheduler
        let leader_info = Node::new_localhost_with_pubkey(leader_pubkey);

        // Make a node to broadcast to
        let buddy_keypair = Keypair::new();
        let broadcast_buddy = Node::new_localhost_with_pubkey(&buddy_keypair.pubkey());

        // Fill the cluster_info with the buddy's info
        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
        cluster_info.insert_info(broadcast_buddy.info);
        let cluster_info = Arc::new(RwLock::new(cluster_info));

        let exit_sender = Arc::new(AtomicBool::new(false));

        let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
        let bank = Arc::new(Bank::new(&genesis_block));

        // Start up the broadcast stage
        let broadcast_service = BroadcastStage::new(
            leader_info.sockets.broadcast,
            cluster_info,
            entry_receiver,
            &exit_sender,
            &blocktree,
            StandardBroadcastRun::new(),
        );

        MockBroadcastStage {
            blocktree,
            broadcast_service,
            bank,
        }
    }

    #[test]
    fn test_broadcast_ledger() {
        solana_logger::setup();
        let ledger_path = get_tmp_ledger_path("test_broadcast_ledger");

        {
            // Identity of the leader node doing the broadcast
            let leader_keypair = Keypair::new();

            let (entry_sender, entry_receiver) = channel();
            let broadcast_service = setup_dummy_broadcast_service(
                &leader_keypair.pubkey(),
                &ledger_path,
                entry_receiver,
            );
            let start_tick_height;
            let max_tick_height;
            let ticks_per_slot;
            let slot;
            {
                let bank = broadcast_service.bank.clone();
                start_tick_height = bank.tick_height();
                max_tick_height = bank.max_tick_height();
                ticks_per_slot = bank.ticks_per_slot();
                slot = bank.slot();
                let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
                for (i, tick) in ticks.into_iter().enumerate() {
                    entry_sender
                        .send((bank.clone(), (tick, i as u64 + 1)))
                        .expect("Expect successful send to broadcast service");
                }
            }

            trace!(
                "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
                max_tick_height,
                start_tick_height,
                ticks_per_slot,
            );

            // Exactly this many ticks were sent above, so this is what the slot should hold.
            let expected_entries = (max_tick_height - start_tick_height) as usize;
            let blocktree = broadcast_service.blocktree;

            // Poll instead of sleeping a fixed amount. The test finishes as soon as the
            // broadcast thread has written everything, and slow machines still get a
            // generous bound.
            let wait_start = Instant::now();
            while wait_start.elapsed() < BROADCAST_WAIT_TIMEOUT {
                let stored = blocktree
                    .get_slot_entries_with_shred_count(slot, 0)
                    .map(|(entries, _)| entries.len())
                    .unwrap_or(0);
                if stored >= expected_entries {
                    break;
                }
                sleep(BROADCAST_POLL_INTERVAL);
            }

            let (entries, _) = blocktree
                .get_slot_entries_with_shred_count(slot, 0)
                .expect("Expect entries to be present");
            assert_eq!(entries.len(), expected_entries);

            drop(entry_sender);
            broadcast_service
                .broadcast_service
                .join()
                .expect("Expect successful join of broadcast service");
        }

        Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
    }
}
|