Delete Service trait (#6921)

This commit is contained in:
Greg Fitzgerald 2019-11-13 11:12:09 -07:00 committed by GitHub
parent 4b1e9ada18
commit a3a830e1ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 57 additions and 222 deletions

View File

@ -9,7 +9,6 @@ use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_core::packet::to_packets_chunked; use solana_core::packet::to_packets_chunked;
use solana_core::poh_recorder::PohRecorder; use solana_core::poh_recorder::PohRecorder;
use solana_core::poh_recorder::WorkingBankEntry; use solana_core::poh_recorder::WorkingBankEntry;
use solana_core::service::Service;
use solana_ledger::bank_forks::BankForks; use solana_ledger::bank_forks::BankForks;
use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path};
use solana_measure::measure::Measure; use solana_measure::measure::Measure;

View File

@ -1,8 +1,6 @@
#![feature(test)] #![feature(test)]
extern crate test; extern crate test;
#[macro_use]
extern crate solana_ledger;
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use log::*; use log::*;
@ -14,7 +12,6 @@ use solana_core::cluster_info::Node;
use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_core::packet::to_packets_chunked; use solana_core::packet::to_packets_chunked;
use solana_core::poh_recorder::WorkingBankEntry; use solana_core::poh_recorder::WorkingBankEntry;
use solana_core::service::Service;
use solana_ledger::blocktree_processor::process_entries; use solana_ledger::blocktree_processor::process_entries;
use solana_ledger::entry::{next_hash, Entry}; use solana_ledger::entry::{next_hash, Entry};
use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path};

View File

@ -7,7 +7,6 @@ use crossbeam_channel::unbounded;
use log::*; use log::*;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_core::packet::to_packets_chunked; use solana_core::packet::to_packets_chunked;
use solana_core::service::Service;
use solana_core::sigverify::TransactionSigVerifier; use solana_core::sigverify::TransactionSigVerifier;
use solana_core::sigverify_stage::SigVerifyStage; use solana_core::sigverify_stage::SigVerifyStage;
use solana_perf::test_tx::test_tx; use solana_perf::test_tx::test_tx;

View File

@ -8,7 +8,6 @@ use crate::{
repair_service, repair_service,
repair_service::{RepairService, RepairSlotRange, RepairStrategy}, repair_service::{RepairService, RepairSlotRange, RepairStrategy},
result::{Error, Result}, result::{Error, Result},
service::Service,
shred_fetch_stage::ShredFetchStage, shred_fetch_stage::ShredFetchStage,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
storage_stage::NUM_STORAGE_SAMPLES, storage_stage::NUM_STORAGE_SAMPLES,

View File

@ -7,7 +7,6 @@ use crate::{
poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry},
poh_service::PohService, poh_service::PohService,
result::{Error, Result}, result::{Error, Result},
service::Service,
}; };
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools; use itertools::Itertools;
@ -925,12 +924,8 @@ impl BankingStage {
unprocessed_packets.push((packets, packet_indexes)); unprocessed_packets.push((packets, packet_indexes));
} }
} }
}
impl Service for BankingStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for bank_thread_hdl in self.bank_thread_hdls { for bank_thread_hdl in self.bank_thread_hdls {
bank_thread_hdl.join()?; bank_thread_hdl.join()?;
} }

View File

@ -8,7 +8,6 @@ use crate::blockstream::MockBlockstream as Blockstream;
#[cfg(not(test))] #[cfg(not(test))]
use crate::blockstream::SocketBlockstream as Blockstream; use crate::blockstream::SocketBlockstream as Blockstream;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use solana_ledger::blocktree::Blocktree; use solana_ledger::blocktree::Blocktree;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::path::Path; use std::path::Path;
@ -88,12 +87,8 @@ impl BlockstreamService {
} }
Ok(()) Ok(())
} }
}
impl Service for BlockstreamService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_blockstream.join() self.t_blockstream.join()
} }
} }

View File

@ -5,7 +5,6 @@ use self::standard_broadcast_run::StandardBroadcastRun;
use crate::cluster_info::{ClusterInfo, ClusterInfoError}; use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::poh_recorder::WorkingBankEntry; use crate::poh_recorder::WorkingBankEntry;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use solana_ledger::blocktree::Blocktree; use solana_ledger::blocktree::Blocktree;
use solana_ledger::staking_utils; use solana_ledger::staking_utils;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
@ -178,12 +177,8 @@ impl BroadcastStage {
Self { thread_hdl } Self { thread_hdl }
} }
}
impl Service for BroadcastStage { pub fn join(self) -> thread::Result<BroadcastStageReturnType> {
type JoinReturnType = BroadcastStageReturnType;
fn join(self) -> thread::Result<BroadcastStageReturnType> {
self.thread_hdl.join() self.thread_hdl.join()
} }
} }
@ -193,7 +188,6 @@ mod test {
use super::*; use super::*;
use crate::cluster_info::{ClusterInfo, Node}; use crate::cluster_info::{ClusterInfo, Node};
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::service::Service;
use solana_ledger::entry::create_ticks; use solana_ledger::entry::create_ticks;
use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;

View File

@ -1,7 +1,6 @@
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::crds_value::EpochSlots; use crate::crds_value::EpochSlots;
use crate::result::Result; use crate::result::Result;
use crate::service::Service;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::SeedableRng; use rand::SeedableRng;
@ -515,12 +514,8 @@ impl ClusterInfoRepairListener {
fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> { fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> {
peer_infos.get(pubkey).map(|p| p.last_ts) peer_infos.get(pubkey).map(|p| p.last_ts)
} }
}
impl Service for ClusterInfoRepairListener { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -2,7 +2,6 @@ use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
use crate::packet::Packets; use crate::packet::Packets;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::Result; use crate::result::Result;
use crate::service::Service;
use crate::{packet, sigverify}; use crate::{packet, sigverify};
use crossbeam_channel::Sender as CrossbeamSender; use crossbeam_channel::Sender as CrossbeamSender;
use solana_metrics::inc_new_counter_debug; use solana_metrics::inc_new_counter_debug;
@ -72,12 +71,8 @@ impl ClusterInfoVoteListener {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
} }
} }
}
impl Service for ClusterInfoVoteListener { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -1,7 +1,4 @@
use crate::{ use crate::result::{Error, Result};
result::{Error, Result},
service::Service,
};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use solana_vote_api::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; use solana_vote_api::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY};
@ -230,12 +227,8 @@ impl AggregateCommitmentService {
} }
} }
} }
}
impl Service for AggregateCommitmentService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_commitment.join() self.t_commitment.join()
} }
} }

View File

@ -4,7 +4,6 @@ use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
use crate::packet::PacketsRecycler; use crate::packet::PacketsRecycler;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender}; use crate::streamer::{self, PacketReceiver, PacketSender};
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
use solana_perf::recycler::Recycler; use solana_perf::recycler::Recycler;
@ -140,12 +139,8 @@ impl FetchStage {
thread_hdls.push(fwd_thread_hdl); thread_hdls.push(fwd_thread_hdl);
Self { thread_hdls } Self { thread_hdls }
} }
}
impl Service for FetchStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -2,7 +2,6 @@
use crate::cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE}; use crate::cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE};
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::service::Service;
use crate::streamer; use crate::streamer;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_client::thin_client::{create_client, ThinClient}; use solana_client::thin_client::{create_client, ThinClient};
@ -51,6 +50,13 @@ impl GossipService {
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Self { thread_hdls } Self { thread_hdls }
} }
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
} }
/// Discover Nodes and Archivers in a cluster /// Discover Nodes and Archivers in a cluster
@ -264,17 +270,6 @@ fn make_gossip_node(
(gossip_service, ip_echo, cluster_info) (gossip_service, ip_echo, cluster_info)
} }
impl Service for GossipService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -1,7 +1,6 @@
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use solana_ledger::blocktree::Blocktree; use solana_ledger::blocktree::Blocktree;
use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -63,12 +62,8 @@ impl LedgerCleanupService {
} }
Ok(()) Ok(())
} }
}
impl Service for LedgerCleanupService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_cleanup.join() self.t_cleanup.join()
} }
} }

View File

@ -65,7 +65,6 @@ pub mod rpc_pubsub_service;
pub mod rpc_service; pub mod rpc_service;
pub mod rpc_subscriptions; pub mod rpc_subscriptions;
pub mod sendmmsg; pub mod sendmmsg;
pub mod service;
pub mod sigverify; pub mod sigverify;
pub mod sigverify_shreds; pub mod sigverify_shreds;
pub mod sigverify_stage; pub mod sigverify_stage;

View File

@ -1,6 +1,5 @@
//! The `local_vote_signer_service` can be started locally to sign validator votes //! The `local_vote_signer_service` can be started locally to sign validator votes
use crate::service::Service;
use solana_net_utils::PortRange; use solana_net_utils::PortRange;
use solana_vote_signer::rpc::VoteSignerRpcService; use solana_vote_signer::rpc::VoteSignerRpcService;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -13,15 +12,6 @@ pub struct LocalVoteSignerService {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
} }
impl Service for LocalVoteSignerService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.exit.store(true, Ordering::Relaxed);
self.thread.join()
}
}
impl LocalVoteSignerService { impl LocalVoteSignerService {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new(port_range: PortRange) -> (Self, SocketAddr) { pub fn new(port_range: PortRange) -> (Self, SocketAddr) {
@ -41,4 +31,9 @@ impl LocalVoteSignerService {
(Self { thread, exit }, addr) (Self { thread, exit }, addr)
} }
pub fn join(self) -> thread::Result<()> {
self.exit.store(true, Ordering::Relaxed);
self.thread.join()
}
} }

View File

@ -1,7 +1,6 @@
//! The `poh_service` module implements a service that records the passing of //! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream //! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::service::Service;
use core_affinity; use core_affinity;
use solana_sdk::poh_config::PohConfig; use solana_sdk::poh_config::PohConfig;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -96,12 +95,8 @@ impl PohService {
} }
} }
} }
}
impl Service for PohService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.tick_producer.join() self.tick_producer.join()
} }
} }

View File

@ -2,7 +2,7 @@
//! regularly finds missing blobs in the ledger and sends repair requests for those blobs //! regularly finds missing blobs in the ledger and sends repair requests for those blobs
use crate::{ use crate::{
cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener, cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener,
result::Result, service::Service, result::Result,
}; };
use solana_ledger::{ use solana_ledger::{
bank_forks::BankForks, bank_forks::BankForks,
@ -373,12 +373,8 @@ impl RepairService {
.cloned() .cloned()
.collect(); .collect();
} }
}
impl Service for RepairService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
let mut results = vec![self.t_repair.join()]; let mut results = vec![self.t_repair.join()];
if let Some(cluster_info_repair_listener) = self.cluster_info_repair_listener { if let Some(cluster_info_repair_listener) = self.cluster_info_repair_listener {
results.push(cluster_info_repair_listener.join()); results.push(cluster_info_repair_listener.join());

View File

@ -8,7 +8,6 @@ use crate::consensus::{StakeLockout, Tower};
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use solana_ledger::{ use solana_ledger::{
bank_forks::BankForks, bank_forks::BankForks,
block_error::BlockError, block_error::BlockError,
@ -925,12 +924,8 @@ impl ReplayStage {
} }
} }
} }
}
impl Service for ReplayStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.commitment_service.join()?; self.commitment_service.join()?;
self.t_replay.join().map(|_| ()) self.t_replay.join().map(|_| ())
} }

View File

@ -5,7 +5,6 @@ use crate::{
packet::Packets, packet::Packets,
repair_service::RepairStrategy, repair_service::RepairStrategy,
result::{Error, Result}, result::{Error, Result},
service::Service,
streamer::PacketReceiver, streamer::PacketReceiver,
window_service::{should_retransmit_and_persist, WindowService}, window_service::{should_retransmit_and_persist, WindowService},
}; };
@ -256,12 +255,8 @@ impl RetransmitStage {
window_service, window_service,
} }
} }
}
impl Service for RetransmitStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -2,7 +2,6 @@
use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl};
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::net::SocketAddr; use std::net::SocketAddr;
@ -15,14 +14,6 @@ pub struct PubSubService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
impl Service for PubSubService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
impl PubSubService { impl PubSubService {
pub fn new( pub fn new(
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
@ -64,6 +55,10 @@ impl PubSubService {
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<()> {
self.join() self.join()
} }
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,7 +1,7 @@
//! The `rpc_service` module implements the Solana JSON RPC service. //! The `rpc_service` module implements the Solana JSON RPC service.
use crate::{ use crate::{
cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, service::Service, cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*,
storage_stage::StorageState, validator::ValidatorExit, storage_stage::StorageState, validator::ValidatorExit,
}; };
use jsonrpc_core::MetaIoHandler; use jsonrpc_core::MetaIoHandler;
@ -163,12 +163,8 @@ impl JsonRpcService {
c.close() c.close()
} }
} }
}
impl Service for JsonRpcService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join() self.thread_hdl.join()
} }
} }

View File

@ -1,30 +0,0 @@
//! The `service` module implements a trait used by services and stages.
//!
//! A Service is any object that implements its functionality on a separate thread. It implements a
//! `join()` method, which can be used to wait for that thread to close.
//!
//! The Service trait may also be used to implement a pipeline stage. Like a service, its
//! functionality is also implemented by a thread, but unlike a service, a stage isn't a server
//! that replies to client requests. Instead, a stage acts more like a pure function. It's a oneway
//! street. It processes messages from its input channel and then sends the processed data to an
//! output channel. Stages can be composed to form a linear chain called a pipeline.
//!
//! The approach to creating a pipeline stage in Rust may be unique to Solana. We haven't seen the
//! same technique used in other Rust projects and there may be better ways to do it. The Solana
//! approach defines a stage as an object that communicates to its previous stage and the next
//! stage using channels. By convention, each stage accepts a *receiver* for input and creates a
//! second output channel. The second channel is used to pass data to the next stage, and so its
//! sender is moved into the stage's thread and the receiver is returned from its constructor.
//!
//! A well-written stage should create a thread and call a short `run()` method. The method should
//! read input from its input channel, call a function from another module that processes it, and
//! then send the output to the output channel. The functionality in the second module will likely
//! not use threads or channels.
use std::thread::Result;
pub trait Service {
type JoinReturnType;
fn join(self) -> Result<Self::JoinReturnType>;
}

View File

@ -1,7 +1,6 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use crate::packet::{Packet, PacketsRecycler}; use crate::packet::{Packet, PacketsRecycler};
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender}; use crate::streamer::{self, PacketReceiver, PacketSender};
use solana_perf::cuda_runtime::PinnedVec; use solana_perf::cuda_runtime::PinnedVec;
use solana_perf::recycler::Recycler; use solana_perf::recycler::Recycler;
@ -104,12 +103,8 @@ impl ShredFetchStage {
Self { thread_hdls } Self { thread_hdls }
} }
}
impl Service for ShredFetchStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -7,7 +7,6 @@
use crate::packet::Packets; use crate::packet::Packets;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify; use crate::sigverify;
use crate::streamer::{self, PacketReceiver}; use crate::streamer::{self, PacketReceiver};
use crossbeam_channel::Sender as CrossbeamSender; use crossbeam_channel::Sender as CrossbeamSender;
@ -148,12 +147,8 @@ impl SigVerifyStage {
}) })
.collect() .collect()
} }
}
impl Service for SigVerifyStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }

View File

@ -1,5 +1,4 @@
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use bincode::serialize_into; use bincode::serialize_into;
use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver}; use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver};
use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
@ -173,12 +172,8 @@ impl SnapshotPackagerService {
); );
Ok(()) Ok(())
} }
}
impl Service for SnapshotPackagerService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_snapshot_packager.join() self.t_snapshot_packager.join()
} }
} }

View File

@ -7,7 +7,6 @@ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
contact_info::ContactInfo, contact_info::ContactInfo,
result::{Error, Result}, result::{Error, Result},
service::Service,
}; };
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng; use rand_chacha::ChaChaRng;
@ -630,12 +629,8 @@ impl StorageStage {
res?; res?;
Ok(()) Ok(())
} }
}
impl Service for StorageStage { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_storage_create_accounts.join().unwrap(); self.t_storage_create_accounts.join().unwrap();
self.t_storage_mining_verifier.join() self.t_storage_mining_verifier.join()
} }
@ -651,7 +646,6 @@ pub fn test_cluster_info(id: &Pubkey) -> Arc<RwLock<ClusterInfo>> {
mod tests { mod tests {
use super::*; use super::*;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::service::Service;
use rayon::prelude::*; use rayon::prelude::*;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::hash::Hasher; use solana_sdk::hash::Hasher;

View File

@ -7,7 +7,6 @@ use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage; use crate::fetch_stage::FetchStage;
use crate::poh_recorder::{PohRecorder, WorkingBankEntry}; use crate::poh_recorder::{PohRecorder, WorkingBankEntry};
use crate::service::Service;
use crate::sigverify::TransactionSigVerifier; use crate::sigverify::TransactionSigVerifier;
use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage};
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
@ -90,12 +89,8 @@ impl Tpu {
broadcast_stage, broadcast_stage,
} }
} }
}
impl Service for Tpu { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
let mut results = vec![]; let mut results = vec![];
results.push(self.fetch_stage.join()); results.push(self.fetch_stage.join());
results.push(self.sigverify_stage.join()); results.push(self.sigverify_stage.join());

View File

@ -9,7 +9,6 @@ use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage; use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage; use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::shred_fetch_stage::ShredFetchStage; use crate::shred_fetch_stage::ShredFetchStage;
use crate::sigverify_shreds::ShredSigVerifier; use crate::sigverify_shreds::ShredSigVerifier;
use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage};
@ -206,12 +205,8 @@ impl Tvu {
snapshot_packager_service, snapshot_packager_service,
} }
} }
}
impl Service for Tvu { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.retransmit_stage.join()?; self.retransmit_stage.join()?;
self.fetch_stage.join()?; self.fetch_stage.join()?;
self.sigverify_stage.join()?; self.sigverify_stage.join()?;

View File

@ -12,7 +12,6 @@ use crate::{
rpc_pubsub_service::PubSubService, rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService, rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
service::Service,
sigverify, sigverify,
storage_stage::StorageState, storage_stage::StorageState,
tpu::Tpu, tpu::Tpu,
@ -408,6 +407,24 @@ impl Validator {
node.sockets.retransmit_sockets[0].local_addr().unwrap() node.sockets.retransmit_sockets[0].local_addr().unwrap()
); );
} }
pub fn join(self) -> Result<()> {
self.poh_service.join()?;
drop(self.poh_recorder);
if let Some(rpc_service) = self.rpc_service {
rpc_service.join()?;
}
if let Some(rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.join()?;
}
self.gossip_service.join()?;
self.tpu.join()?;
self.tvu.join()?;
self.ip_echo_server.shutdown_now();
Ok(())
}
} }
pub fn new_banks_from_blocktree( pub fn new_banks_from_blocktree(
@ -480,28 +497,6 @@ pub fn new_banks_from_blocktree(
) )
} }
impl Service for Validator {
type JoinReturnType = ();
fn join(self) -> Result<()> {
self.poh_service.join()?;
drop(self.poh_recorder);
if let Some(rpc_service) = self.rpc_service {
rpc_service.join()?;
}
if let Some(rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.join()?;
}
self.gossip_service.join()?;
self.tpu.join()?;
self.tvu.join()?;
self.ip_echo_server.shutdown_now();
Ok(())
}
}
pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo};

View File

@ -5,7 +5,6 @@ use crate::cluster_info::ClusterInfo;
use crate::packet::Packets; use crate::packet::Packets;
use crate::repair_service::{RepairService, RepairStrategy}; use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::PacketSender; use crate::streamer::PacketSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::IntoParallelRefMutIterator;
@ -262,12 +261,8 @@ impl WindowService {
repair_service, repair_service,
} }
} }
}
impl Service for WindowService { pub fn join(self) -> thread::Result<()> {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_window.join()?; self.t_window.join()?;
self.repair_service.join() self.repair_service.join()
} }
@ -282,7 +277,6 @@ mod test {
genesis_utils::create_genesis_config_with_leader, genesis_utils::create_genesis_config_with_leader,
packet::{Packet, Packets}, packet::{Packet, Packets},
repair_service::RepairSlotRange, repair_service::RepairSlotRange,
service::Service,
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use rand::thread_rng; use rand::thread_rng;

View File

@ -7,7 +7,6 @@ mod tests {
use itertools::Itertools; use itertools::Itertools;
use solana_core::{ use solana_core::{
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
service::Service,
snapshot_packager_service::SnapshotPackagerService, snapshot_packager_service::SnapshotPackagerService,
}; };
use solana_ledger::{ use solana_ledger::{

View File

@ -7,7 +7,6 @@ use solana_core::gossip_service::GossipService;
use solana_core::packet::Packet; use solana_core::packet::Packet;
use solana_core::result; use solana_core::result;
use solana_core::service::Service;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
use std::net::UdpSocket; use std::net::UdpSocket;

View File

@ -4,7 +4,6 @@
mod tests { mod tests {
use log::*; use log::*;
use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_core::service::Service;
use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST};
use solana_core::storage_stage::{StorageStage, StorageState}; use solana_core::storage_stage::{StorageStage, StorageState};
use solana_ledger::bank_forks::BankForks; use solana_ledger::bank_forks::BankForks;

View File

@ -7,7 +7,6 @@ use solana_core::{
contact_info::ContactInfo, contact_info::ContactInfo,
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
gossip_service::discover_cluster, gossip_service::discover_cluster,
service::Service,
validator::{Validator, ValidatorConfig}, validator::{Validator, ValidatorConfig},
}; };
use solana_ledger::create_new_tmp_ledger; use solana_ledger::create_new_tmp_ledger;

View File

@ -12,7 +12,6 @@ use solana_core::cluster_info::{Node, VALIDATOR_PORT_RANGE};
use solana_core::contact_info::ContactInfo; use solana_core::contact_info::ContactInfo;
use solana_core::gossip_service::discover; use solana_core::gossip_service::discover;
use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
use solana_core::service::Service;
use solana_core::socketaddr; use solana_core::socketaddr;
use solana_core::validator::{Validator, ValidatorConfig}; use solana_core::validator::{Validator, ValidatorConfig};
use solana_ledger::bank_forks::SnapshotConfig; use solana_ledger::bank_forks::SnapshotConfig;