Connect TPU's broadcast service with TVU's blob fetch stage (#2587)

* Connect TPU's broadcast service with TVU's blob fetch stage

- This is needed since the ledger is now written only in the TVU

* fix clippy warnings

* fix failing test

* fix broken tests

* fixed failing tests
Pankaj Garg, 2019-01-31 13:43:22 -08:00, committed by GitHub
commit 32162ef0f1, parent 2dd20c38b2
9 changed files with 80 additions and 31 deletions
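
In outline: `Tvu::new` now creates the blob channel itself and returns the sender alongside the `Tvu`, `Fullnode` hands that sender to the `Tpu`, and the broadcast service clones every batch of blobs it sends into the channel. A leader's own blobs therefore flow into its TVU's blob fetch path, where the ledger is written. Below is a minimal sketch of that data flow; it uses plain `std::sync::mpsc` and a placeholder `Blob` type rather than the crate's real types:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Placeholder for the crate's shared blob type; only the wiring matters here.
type Blob = Vec<u8>;
type BlobSender = Sender<Vec<Blob>>;
type BlobReceiver = Receiver<Vec<Blob>>;

fn main() {
    // TVU side: the TVU now owns the channel and returns the sender to its caller.
    let (blob_sender, blob_fetch_receiver): (BlobSender, BlobReceiver) = channel();

    // TPU side: after broadcasting a batch over UDP, the broadcast service also
    // clones the batch into the TVU's channel.
    let blobs: Vec<Blob> = vec![vec![0u8; 64]];
    blob_sender.send(blobs.clone()).unwrap();

    // TVU side: the window/ledger path receives the leader's own blobs exactly
    // as it receives blobs fetched from peers.
    let received = blob_fetch_receiver.recv().unwrap();
    assert_eq!(received.len(), blobs.len());
}
```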

src/blob_fetch_stage.rs

@@ -1,10 +1,9 @@
 //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.

 use crate::service::Service;
-use crate::streamer::{self, BlobReceiver};
+use crate::streamer::{self, BlobSender};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::channel;
 use std::sync::Arc;
 use std::thread::{self, JoinHandle};
@@ -14,21 +13,20 @@ pub struct BlobFetchStage {
 }

 impl BlobFetchStage {
-    #[allow(clippy::new_ret_no_self)]
-    pub fn new(socket: Arc<UdpSocket>, exit: Arc<AtomicBool>) -> (Self, BlobReceiver) {
-        Self::new_multi_socket(vec![socket], exit)
+    pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: Arc<AtomicBool>) -> Self {
+        Self::new_multi_socket(vec![socket], sender, exit)
     }
     pub fn new_multi_socket(
         sockets: Vec<Arc<UdpSocket>>,
+        sender: &BlobSender,
         exit: Arc<AtomicBool>,
-    ) -> (Self, BlobReceiver) {
-        let (sender, receiver) = channel();
+    ) -> Self {
        let thread_hdls: Vec<_> = sockets
            .into_iter()
            .map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone()))
            .collect();

-        (Self { exit, thread_hdls }, receiver)
+        Self { exit, thread_hdls }
     }

     pub fn close(&self) {
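
With this change the caller owns the channel instead of receiving one back from the stage, so the same sender can be shared with the broadcast service. A hedged sketch of the new call pattern follows; the `start_fetch_stage` helper is illustrative, not part of the patch:

```rust
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;

use crate::blob_fetch_stage::BlobFetchStage;
use crate::streamer::{BlobReceiver, BlobSender};

// Illustrative helper: start a fetch stage while keeping both channel ends.
fn start_fetch_stage(socket: UdpSocket) -> (BlobFetchStage, BlobSender, BlobReceiver) {
    let (blob_fetch_sender, blob_fetch_receiver) = channel();
    let exit = Arc::new(AtomicBool::new(false));
    let fetch_stage = BlobFetchStage::new(Arc::new(socket), &blob_fetch_sender, exit);
    // blob_fetch_sender can now also be cloned into the broadcast service, so a
    // leader's own blobs land in the same channel as blobs from peers.
    (fetch_stage, blob_fetch_sender, blob_fetch_receiver)
}
```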

src/broadcast_service.rs

@@ -11,6 +11,7 @@ use crate::leader_scheduler::LeaderScheduler;
 use crate::packet::index_blobs;
 use crate::result::{Error, Result};
 use crate::service::Service;
+use crate::streamer::BlobSender;
 use log::Level;
 use rayon::prelude::*;
 use solana_metrics::{influxdb, submit};
@@ -46,6 +47,7 @@ impl Broadcast {
         receiver: &Receiver<Vec<Entry>>,
         sock: &UdpSocket,
         leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+        blob_sender: &BlobSender,
     ) -> Result<()> {
         let timer = Duration::new(1, 0);
         let entries = receiver.recv_timeout(timer)?;
@@ -91,6 +93,8 @@
         inc_new_counter_info!("streamer-broadcast-sent", blobs.len());

+        blob_sender.send(blobs.clone())?;
+
         // don't count coding blobs in the blob indexes
         self.blob_index += blobs.len() as u64;
@@ -191,6 +195,7 @@
         receiver: &Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         exit_signal: &Arc<AtomicBool>,
+        blob_sender: &BlobSender,
     ) -> BroadcastServiceReturnType {
         let me = cluster_info.read().unwrap().my_data().clone();
@@ -210,7 +215,13 @@
             // Layer 1, leader nodes are limited to the fanout size.
             broadcast_table.truncate(DATA_PLANE_FANOUT);
             inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
-            if let Err(e) = broadcast.run(&broadcast_table, receiver, sock, leader_scheduler) {
+            if let Err(e) = broadcast.run(
+                &broadcast_table,
+                receiver,
+                sock,
+                leader_scheduler,
+                blob_sender,
+            ) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
                         return BroadcastServiceReturnType::ChannelDisconnected;
@@ -250,8 +261,10 @@
         receiver: Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         exit_sender: Arc<AtomicBool>,
+        blob_sender: &BlobSender,
     ) -> Self {
         let exit_signal = Arc::new(AtomicBool::new(false));
+        let blob_sender = blob_sender.clone();
         let thread_hdl = Builder::new()
             .name("solana-broadcaster".to_string())
             .spawn(move || {
@@ -265,6 +278,7 @@
                     &receiver,
                     max_tick_height,
                     &exit_signal,
+                    &blob_sender,
                 )
             })
             .unwrap();
@@ -328,6 +342,8 @@ mod test {
         let exit_sender = Arc::new(AtomicBool::new(false));
         let bank = Arc::new(Bank::default());

+        let (blob_fetch_sender, _) = channel();
+
         // Start up the broadcast stage
         let broadcast_service = BroadcastService::new(
             bank.clone(),
@@ -338,6 +354,7 @@
             entry_receiver,
             Some(max_tick_height),
             exit_sender,
+            &blob_fetch_sender,
         );

         MockBroadcastService {
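
Note the `blob_sender.clone()` ahead of the spawn: `BroadcastService::new` takes `&BlobSender`, so an owned clone is moved into the `move` closure rather than tying the borrow's lifetime to the broadcaster thread. A self-contained sketch of that pattern, with `u32` standing in for the blob batch type:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{Builder, JoinHandle};

fn spawn_broadcaster(sender: &Sender<u32>) -> JoinHandle<()> {
    // Take an owned clone so the `move` closure doesn't capture the borrow.
    let sender = sender.clone();
    Builder::new()
        .name("solana-broadcaster".to_string())
        .spawn(move || sender.send(42).unwrap())
        .unwrap()
}

fn main() {
    let (sender, receiver) = channel::<u32>();
    let thread_hdl = spawn_broadcaster(&sender);
    thread_hdl.join().unwrap();
    assert_eq!(receiver.recv().unwrap(), 42);
}
```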

src/fullnode.rs

@@ -11,6 +11,7 @@ use crate::rpc::JsonRpcService;
 use crate::rpc_pubsub::PubSubService;
 use crate::service::Service;
 use crate::storage_stage::StorageState;
+use crate::streamer::BlobSender;
 use crate::tpu::{Tpu, TpuReturnType};
 use crate::tvu::{Sockets, Tvu, TvuReturnType};
 use crate::vote_signer_proxy::VoteSignerProxy;
@@ -100,6 +101,7 @@ pub struct Fullnode {
     broadcast_socket: UdpSocket,
     pub node_services: NodeServices,
     pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver),
+    blob_sender: BlobSender,
 }

 impl Fullnode {
@@ -219,7 +221,7 @@
         let (to_leader_sender, to_leader_receiver) = channel();
         let (to_validator_sender, to_validator_receiver) = channel();

-        let tvu = Tvu::new(
+        let (tvu, blob_sender) = Tvu::new(
             vote_signer_option,
             &bank,
             entry_height,
@@ -257,6 +259,7 @@
             id,
             scheduled_leader == id,
             &to_validator_sender,
+            &blob_sender,
         );

         inc_new_counter_info!("fullnode-new", 1);
@@ -274,6 +277,7 @@
             tpu_sockets: node.sockets.tpu,
             broadcast_socket: node.sockets.broadcast,
             role_notifiers: (to_leader_receiver, to_validator_receiver),
+            blob_sender,
         }
     }
@@ -333,6 +337,7 @@
             &last_id,
             self.id,
             &to_validator_sender,
+            &self.blob_sender,
         )
     }

src/replicator.rs

@@ -167,8 +167,9 @@
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
             node.sockets.tvu.into_iter().map(Arc::new).collect();
         blob_sockets.push(repair_socket.clone());
-        let (fetch_stage, blob_fetch_receiver) =
-            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
+        let (blob_fetch_sender, blob_fetch_receiver) = channel();
+        let fetch_stage =
+            BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());

         // todo: pull blobs off the retransmit_receiver and recycle them?
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -189,6 +190,7 @@
                 leader_pubkey,
             ))),
             done.clone(),
+            exit.clone(),
         );

         info!("window created, waiting for ledger download done");

src/retransmit_stage.rs

@@ -123,7 +123,7 @@ pub struct RetransmitStage {
 }

 impl RetransmitStage {
-    #[allow(clippy::new_ret_no_self)]
+    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
     pub fn new(
         bank: &Arc<Bank>,
         db_ledger: Arc<DbLedger>,
@@ -134,6 +134,7 @@
         repair_socket: Arc<UdpSocket>,
         fetch_stage_receiver: BlobReceiver,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+        exit: Arc<AtomicBool>,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -157,6 +158,7 @@
             repair_socket,
             leader_scheduler,
             done,
+            exit,
         );

         let thread_hdls = vec![t_retransmit, t_window];

src/tpu.rs

@@ -10,6 +10,7 @@ use crate::fullnode::TpuRotationSender;
 use crate::poh_service::Config;
 use crate::service::Service;
 use crate::sigverify_stage::SigVerifyStage;
+use crate::streamer::BlobSender;
 use crate::tpu_forwarder::TpuForwarder;
 use solana_sdk::hash::Hash;
 use solana_sdk::pubkey::Pubkey;
@@ -81,6 +82,7 @@
         leader_id: Pubkey,
         is_leader: bool,
         to_validator_sender: &TpuRotationSender,
+        blob_sender: &BlobSender,
     ) -> Self {
         let exit = Arc::new(AtomicBool::new(false));
@@ -110,6 +112,7 @@
                 entry_receiver,
                 max_tick_height,
                 exit.clone(),
+                blob_sender,
             );

             let svcs = LeaderServices::new(
@@ -162,6 +165,7 @@
         last_entry_id: &Hash,
         leader_id: Pubkey,
         to_validator_sender: &TpuRotationSender,
+        blob_sender: &BlobSender,
     ) {
         match &self.tpu_mode {
             TpuMode::Leader(svcs) => {
@@ -197,6 +201,7 @@
                     entry_receiver,
                     max_tick_height,
                     self.exit.clone(),
+                    blob_sender,
                 );
                 let svcs = LeaderServices::new(

src/tvu.rs

@@ -21,11 +21,13 @@ use crate::replay_stage::ReplayStage;
 use crate::retransmit_stage::RetransmitStage;
 use crate::service::Service;
 use crate::storage_stage::{StorageStage, StorageState};
+use crate::streamer::BlobSender;
 use crate::vote_signer_proxy::VoteSignerProxy;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread;
@@ -60,7 +62,7 @@ impl Tvu {
     /// * `cluster_info` - The cluster_info state.
     /// * `sockets` - My fetch, repair, and restransmit sockets
     /// * `db_ledger` - the ledger itself
-    #[allow(clippy::too_many_arguments)]
+    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
     pub fn new(
         vote_signer: Option<Arc<VoteSignerProxy>>,
         bank: &Arc<Bank>,
@@ -73,7 +75,7 @@
         to_leader_sender: TvuRotationSender,
         storage_state: &StorageState,
         entry_stream: Option<String>,
-    ) -> Self {
+    ) -> (Self, BlobSender) {
         let exit = Arc::new(AtomicBool::new(false));
         let keypair: Arc<Keypair> = cluster_info
             .read()
@@ -87,12 +89,14 @@
             retransmit: retransmit_socket,
         } = sockets;

+        let (blob_fetch_sender, blob_fetch_receiver) = channel();
+
         let repair_socket = Arc::new(repair_socket);
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
             fetch_sockets.into_iter().map(Arc::new).collect();
         blob_sockets.push(repair_socket.clone());
-        let (fetch_stage, blob_fetch_receiver) =
-            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
+        let fetch_stage =
+            BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());

         //TODO
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
@@ -107,6 +111,7 @@
             repair_socket,
             blob_fetch_receiver,
             bank.leader_scheduler.clone(),
+            exit.clone(),
         );

         let l_entry_height = Arc::new(RwLock::new(entry_height));
@@ -136,15 +141,18 @@
             &cluster_info,
         );

-        Tvu {
-            fetch_stage,
-            retransmit_stage,
-            replay_stage,
-            storage_stage,
-            exit,
-            last_entry_id: l_last_entry_id,
-            entry_height: l_entry_height,
-        }
+        (
+            Tvu {
+                fetch_stage,
+                retransmit_stage,
+                replay_stage,
+                storage_stage,
+                exit,
+                last_entry_id: l_last_entry_id,
+                entry_height: l_entry_height,
+            },
+            blob_fetch_sender,
+        )
     }

     pub fn get_state(&self) -> (Hash, u64) {
@@ -285,7 +293,7 @@ pub mod tests {
         let vote_account_keypair = Arc::new(Keypair::new());
         let vote_signer = VoteSignerProxy::new_local(&vote_account_keypair);
         let (sender, _) = channel();
-        let tvu = Tvu::new(
+        let (tvu, _) = Tvu::new(
             Some(Arc::new(vote_signer)),
             &bank,
             0,

src/window_service.rs

@@ -15,7 +15,7 @@ use solana_metrics::{influxdb, submit};
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::duration_as_ms;
 use std::net::UdpSocket;
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::{Arc, RwLock};
 use std::thread::{Builder, JoinHandle};
@@ -129,6 +129,7 @@ pub fn window_service(
     repair_socket: Arc<UdpSocket>,
     leader_scheduler: Arc<RwLock<LeaderScheduler>>,
     done: Arc<AtomicBool>,
+    exit: Arc<AtomicBool>,
 ) -> JoinHandle<()> {
     Builder::new()
         .name("solana-window".to_string())
@@ -139,6 +140,9 @@
             let id = cluster_info.read().unwrap().id();
             trace!("{}: RECV_WINDOW started", id);
             loop {
+                if exit.load(Ordering::Relaxed) {
+                    break;
+                }
                 if let Err(e) = recv_window(
                     &db_ledger,
                     &id,
@@ -273,6 +277,7 @@ mod test {
             Arc::new(tn.sockets.repair),
             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
             done,
+            exit.clone(),
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
@@ -342,6 +347,7 @@
             Arc::new(tn.sockets.repair),
             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
             done,
+            exit.clone(),
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
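
The new `exit` flag gives the window thread a cooperative shutdown path alongside the existing `done` signal. A minimal sketch of the pattern; the real loop calls `recv_window` bounded by a receive timeout instead of sleeping:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let exit_clone = exit.clone();
    let handle = thread::spawn(move || loop {
        // Check the shared flag on every iteration and wind down when it flips.
        if exit_clone.load(Ordering::Relaxed) {
            break;
        }
        // ... receive and process blobs here, normally with a recv timeout ...
        thread::sleep(Duration::from_millis(10));
    });
    exit.store(true, Ordering::Relaxed);
    handle.join().unwrap();
}
```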

tests/multinode.rs

@@ -29,6 +29,7 @@ use std::env;
 use std::fs::remove_dir_all;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread::{sleep, Builder, JoinHandle};
 use std::time::{Duration, Instant};
@@ -1708,9 +1709,14 @@ fn test_broadcast_last_tick() {
     let blob_fetch_stages: Vec<_> = listening_nodes
         .iter_mut()
         .map(|(_, _, node, _)| {
-            BlobFetchStage::new(
-                Arc::new(node.sockets.tvu.pop().unwrap()),
-                blob_receiver_exit.clone(),
+            let (blob_fetch_sender, blob_fetch_receiver) = channel();
+            (
+                BlobFetchStage::new(
+                    Arc::new(node.sockets.tvu.pop().unwrap()),
+                    &blob_fetch_sender,
+                    blob_receiver_exit.clone(),
+                ),
+                blob_fetch_receiver,
             )
         })
         .collect();