removes erroneous uses of &Arc<...> from window-service
parent d57398a959
commit b64eeb7729
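The theme of the diff below: a parameter of type `&Arc<T>` is almost always a mistake. `Arc<T>` derefs to `T`, so a callee that only reads should take `&T` (every `&Arc<T>` call site coerces to it for free), while a callee that keeps shared ownership should take `Arc<T>` by value and let the caller clone explicitly. A minimal sketch of the three shapes with placeholder types — `LeaderScheduleCache` here is a stand-in, not the real struct:

```rust
use std::{sync::Arc, thread, thread::JoinHandle};

// Placeholder stand-in for a shared, read-mostly cache.
struct LeaderScheduleCache;

impl LeaderScheduleCache {
    fn lookup(&self) -> u64 {
        42 // illustrative value
    }
}

// Anti-pattern: double indirection, and every caller is forced to own an
// Arc even though the function never touches the reference count.
fn reads_only_bad(cache: &Arc<LeaderScheduleCache>) -> u64 {
    cache.lookup()
}

// Read-only access needs just `&T`; `&arc` coerces to `&T` via Deref.
fn reads_only(cache: &LeaderScheduleCache) -> u64 {
    cache.lookup()
}

// Retaining shared ownership (e.g. moving into a thread) takes `Arc<T>`
// by value, so the clone is explicit and visible at the call site.
fn keeps_ownership(cache: Arc<LeaderScheduleCache>) -> JoinHandle<u64> {
    thread::spawn(move || cache.lookup())
}

fn main() {
    let cache = Arc::new(LeaderScheduleCache);
    assert_eq!(reads_only(&cache), 42); // &Arc<T> -> &T coercion
    assert_eq!(reads_only_bad(&cache), 42);
    let handle = keeps_ownership(cache.clone()); // explicit clone, as in the diff
    assert_eq!(handle.join().unwrap(), 42);
}
```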
@@ -601,9 +601,9 @@ impl RetransmitStage {
            verified_receiver,
            retransmit_sender,
            repair_socket,
-            exit,
+            exit.clone(),
            repair_info,
-            leader_schedule_cache,
+            leader_schedule_cache.clone(),
            move |id, shred, working_bank, last_root| {
                let is_connected = cfg
                    .as_ref()
@@ -1,43 +1,47 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //! blockstore and retransmitting where required
 //!
-use crate::{
-    ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    completed_data_sets_service::CompletedDataSetsSender,
-    repair_response,
-    repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
-    result::{Error, Result},
-};
-use crossbeam_channel::{
-    unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
-};
-use rayon::{prelude::*, ThreadPool};
-use solana_gossip::cluster_info::ClusterInfo;
-use solana_ledger::{
-    blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
-    leader_schedule_cache::LeaderScheduleCache,
-    shred::{Nonce, Shred},
-};
-use solana_measure::measure::Measure;
-use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_perf::packet::{Packet, Packets};
-use solana_rayon_threadlimit::get_thread_count;
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
-use solana_streamer::streamer::PacketSender;
-use std::collections::HashSet;
-use std::{
-    net::{SocketAddr, UdpSocket},
-    ops::Deref,
-    sync::atomic::{AtomicBool, Ordering},
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::{Duration, Instant},
-};
+use {
+    crate::{
+        ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        completed_data_sets_service::CompletedDataSetsSender,
+        repair_response,
+        repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
+        result::{Error, Result},
+    },
+    crossbeam_channel::{
+        unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
+    },
+    rayon::{prelude::*, ThreadPool},
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_ledger::{
+        blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::{Nonce, Shred},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
+    solana_perf::packet::{Packet, Packets},
+    solana_rayon_threadlimit::get_thread_count,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
+    solana_streamer::streamer::PacketSender,
+    std::collections::HashSet,
+    std::{
+        net::{SocketAddr, UdpSocket},
+        ops::Deref,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
+};
 
-pub type DuplicateSlotSender = CrossbeamSender<Slot>;
-pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
+type DuplicateSlotSender = CrossbeamSender<Slot>;
+pub(crate) type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
 
 #[derive(Default)]
 struct WindowServiceMetrics {
@@ -48,7 +52,7 @@ struct WindowServiceMetrics {
 }
 
 impl WindowServiceMetrics {
-    pub fn report_metrics(&self, metric_name: &'static str) {
+    fn report_metrics(&self, metric_name: &'static str) {
         datapoint_info!(
             metric_name,
             ("run_insert_count", self.run_insert_count as i64, i64),
@@ -79,10 +83,10 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
 
 /// drop shreds that are from myself or not from the correct leader for the
 /// shred's slot
-pub fn should_retransmit_and_persist(
+pub(crate) fn should_retransmit_and_persist(
     shred: &Shred,
     bank: Option<Arc<Bank>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    leader_schedule_cache: &LeaderScheduleCache,
     my_pubkey: &Pubkey,
     root: u64,
     shred_version: u16,
@@ -171,7 +175,7 @@ fn verify_repair(
 fn prune_shreds_invalid_repair(
     shreds: &mut Vec<Shred>,
     repair_infos: &mut Vec<Option<RepairMeta>>,
-    outstanding_requests: &Arc<RwLock<OutstandingShredRepairs>>,
+    outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) {
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
@@ -197,13 +201,13 @@ fn prune_shreds_invalid_repair(
 
 fn run_insert<F>(
     shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
-    outstanding_requests: &Arc<RwLock<OutstandingShredRepairs>>,
+    outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) -> Result<()>
 where
     F: Fn(Shred),
@@ -250,9 +254,9 @@ where
 }
 
 fn recv_window<F>(
-    blockstore: &Arc<Blockstore>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    bank_forks: &Arc<RwLock<BankForks>>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank_forks: &RwLock<BankForks>,
     insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -361,7 +365,7 @@ impl Drop for Finalizer {
     }
 }
 
-pub struct WindowService {
+pub(crate) struct WindowService {
     t_window: JoinHandle<()>,
     t_insert: JoinHandle<()>,
     t_check_duplicate: JoinHandle<()>,
@@ -370,14 +374,14 @@ pub struct WindowService {
 
 impl WindowService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
+    pub(crate) fn new<F>(
         blockstore: Arc<Blockstore>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         retransmit: PacketSender,
         repair_socket: Arc<UdpSocket>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
         repair_info: RepairInfo,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         shred_filter: F,
         verified_vote_receiver: VerifiedVoteReceiver,
         completed_data_sets_sender: CompletedDataSetsSender,
@@ -418,9 +422,9 @@ impl WindowService {
         );
 
         let t_insert = Self::start_window_insert_thread(
-            exit,
-            &blockstore,
-            leader_schedule_cache,
+            exit.clone(),
+            blockstore.clone(),
+            leader_schedule_cache.clone(),
             insert_receiver,
             duplicate_sender,
             completed_data_sets_sender,
@@ -430,12 +434,12 @@ impl WindowService {
         let t_window = Self::start_recv_window_thread(
             id,
             exit,
-            &blockstore,
+            blockstore,
             insert_sender,
             verified_receiver,
             shred_filter,
             leader_schedule_cache,
-            &bank_forks,
+            bank_forks,
             retransmit,
         );
 
@@ -480,17 +484,14 @@ impl WindowService {
     }
 
     fn start_window_insert_thread(
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         let mut handle_timeout = || {};
         let handle_error = || {
             inc_new_counter_error!("solana-window-insert-error", 1, 1);
@@ -540,13 +541,13 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     fn start_recv_window_thread<F>(
         id: Pubkey,
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
         insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        bank_forks: &Arc<RwLock<BankForks>>,
+        leader_schedule_cache: Arc<LeaderScheduleCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
         retransmit: PacketSender,
     ) -> JoinHandle<()>
     where
@@ -555,10 +556,6 @@ impl WindowService {
         + std::marker::Send
         + std::marker::Sync,
     {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
-        let bank_forks = bank_forks.clone();
-        let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
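The deleted `let x = x.clone();` lines above are the payoff of the signature change: once a spawning helper takes `Arc` by value, the `move` closure can capture its parameters directly, and the one remaining clone happens visibly at the call site (the `exit.clone()` / `blockstore.clone()` arguments earlier in this diff). A minimal before/after sketch under hypothetical names:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, JoinHandle},
};

// Before: borrowing &Arc forces a re-binding clone inside, purely so the
// `move` closure has something it can own.
fn spawn_worker_before(exit: &Arc<AtomicBool>) -> JoinHandle<()> {
    let exit = exit.clone();
    thread::spawn(move || {
        while !exit.load(Ordering::Relaxed) {
            thread::yield_now();
        }
    })
}

// After: take Arc by value; the closure captures the parameter directly.
fn spawn_worker(exit: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !exit.load(Ordering::Relaxed) {
            thread::yield_now();
        }
    })
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let old_style = spawn_worker_before(&exit);
    let new_style = spawn_worker(exit.clone()); // clone is at the call site now
    exit.store(true, Ordering::Relaxed);
    old_style.join().unwrap();
    new_style.join().unwrap();
}
```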
@@ -633,7 +630,7 @@ impl WindowService {
         }
     }
 
-    pub fn join(self) -> thread::Result<()> {
+    pub(crate) fn join(self) -> thread::Result<()> {
         self.t_window.join()?;
         self.t_insert.join()?;
         self.t_check_duplicate.join()?;
@@ -643,23 +640,24 @@ impl WindowService {
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use solana_entry::entry::{create_ticks, Entry};
-    use solana_gossip::contact_info::ContactInfo;
-    use solana_ledger::{
-        blockstore::{make_many_slot_entries, Blockstore},
-        genesis_utils::create_genesis_config_with_leader,
-        get_tmp_ledger_path,
-        shred::{DataShredHeader, Shredder},
-    };
-    use solana_sdk::{
-        epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
-        hash::Hash,
-        signature::{Keypair, Signer},
-        timing::timestamp,
-    };
-    use solana_streamer::socket::SocketAddrSpace;
-    use std::sync::Arc;
+    use {
+        super::*,
+        solana_entry::entry::{create_ticks, Entry},
+        solana_gossip::contact_info::ContactInfo,
+        solana_ledger::{
+            blockstore::{make_many_slot_entries, Blockstore},
+            genesis_utils::create_genesis_config_with_leader,
+            get_tmp_ledger_path,
+            shred::{DataShredHeader, Shredder},
+        },
+        solana_sdk::{
+            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+            hash::Hash,
+            signature::{Keypair, Signer},
+            timing::timestamp,
+        },
+        solana_streamer::socket::SocketAddrSpace,
+    };
 
     fn local_entries_to_shred(
         entries: &[Entry],
@@ -797,7 +797,7 @@ impl Blockstore {
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1040,7 +1040,7 @@ impl Blockstore {
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
         let shreds_len = shreds.len();
@@ -1223,7 +1223,7 @@ impl Blockstore {
         index_meta_time: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
     where
@@ -1358,7 +1358,7 @@ impl Blockstore {
         slot_meta: &SlotMeta,
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
-        leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;
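A note on the `Option<&Arc<LeaderScheduleCache>>` to `Option<&LeaderScheduleCache>` changes in the Blockstore hunks: callers that hold an `Option<Arc<T>>` can adapt with `Option::as_deref()`. A small sketch with a placeholder type — `insert_shreds` below is a stub mirroring the new parameter shape, not the real method:

```rust
use std::sync::Arc;

// Placeholder stand-in for the real cache type.
struct LeaderScheduleCache;

// Stub mirroring the new-style parameter from the diff.
fn insert_shreds(leader_schedule: Option<&LeaderScheduleCache>) {
    let _ = leader_schedule;
}

fn main() {
    let cache: Option<Arc<LeaderScheduleCache>> = Some(Arc::new(LeaderScheduleCache));
    // Option<Arc<T>>::as_deref() yields Option<&T>, matching the new signature.
    insert_shreds(cache.as_deref());
    // A bare Arc works too: Arc::as_ref (or &*arc) borrows the inner value.
    let arc = Arc::new(LeaderScheduleCache);
    insert_shreds(Some(arc.as_ref()));
}
```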