diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 6e0c722e7..7c7676e15 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -9,7 +9,6 @@ use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::PohRecorder; use solana_core::poh_recorder::WorkingBankEntry; -use solana_core::service::Service; use solana_ledger::bank_forks::BankForks; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_measure::measure::Measure; diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 36f8e6a0a..ee8954aef 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -1,8 +1,6 @@ #![feature(test)] extern crate test; -#[macro_use] -extern crate solana_ledger; use crossbeam_channel::unbounded; use log::*; @@ -14,7 +12,6 @@ use solana_core::cluster_info::Node; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::WorkingBankEntry; -use solana_core::service::Service; use solana_ledger::blocktree_processor::process_entries; use solana_ledger::entry::{next_hash, Entry}; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index db1c1fe8d..fe4392a38 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -7,7 +7,6 @@ use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; use solana_core::packet::to_packets_chunked; -use solana_core::service::Service; use solana_core::sigverify::TransactionSigVerifier; use solana_core::sigverify_stage::SigVerifyStage; use solana_perf::test_tx::test_tx; diff --git a/core/src/archiver.rs b/core/src/archiver.rs index 8641b66ad..fe2028cdd 100644 --- a/core/src/archiver.rs +++ b/core/src/archiver.rs @@ -8,7 +8,6 @@ use crate::{ repair_service, repair_service::{RepairService, RepairSlotRange, RepairStrategy}, result::{Error, Result}, - service::Service, shred_fetch_stage::ShredFetchStage, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, storage_stage::NUM_STORAGE_SAMPLES, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d99e85053..7af7887be 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -7,7 +7,6 @@ use crate::{ poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_service::PohService, result::{Error, Result}, - service::Service, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; @@ -925,12 +924,8 @@ impl BankingStage { unprocessed_packets.push((packets, packet_indexes)); } } -} -impl Service for BankingStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for bank_thread_hdl in self.bank_thread_hdls { bank_thread_hdl.join()?; } diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs index 3e7603ab3..0e8b3bece 100644 --- a/core/src/blockstream_service.rs +++ b/core/src/blockstream_service.rs @@ -8,7 +8,6 @@ use crate::blockstream::MockBlockstream as Blockstream; #[cfg(not(test))] use crate::blockstream::SocketBlockstream as Blockstream; use crate::result::{Error, Result}; -use crate::service::Service; use solana_ledger::blocktree::Blocktree; use solana_sdk::pubkey::Pubkey; use std::path::Path; @@ -88,12 +87,8 @@ impl BlockstreamService { } Ok(()) } -} -impl Service for BlockstreamService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_blockstream.join() } } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index cef2c560e..edb3688b4 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -5,7 +5,6 @@ use self::standard_broadcast_run::StandardBroadcastRun; use crate::cluster_info::{ClusterInfo, ClusterInfoError}; use crate::poh_recorder::WorkingBankEntry; use crate::result::{Error, Result}; -use crate::service::Service; use solana_ledger::blocktree::Blocktree; use solana_ledger::staking_utils; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; @@ -178,12 +177,8 @@ impl BroadcastStage { Self { thread_hdl } } -} -impl Service for BroadcastStage { - type JoinReturnType = BroadcastStageReturnType; - - fn join(self) -> thread::Result { + pub fn join(self) -> thread::Result { self.thread_hdl.join() } } @@ -193,7 +188,6 @@ mod test { use super::*; use crate::cluster_info::{ClusterInfo, Node}; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use crate::service::Service; use solana_ledger::entry::create_ticks; use solana_ledger::{blocktree::Blocktree, get_tmp_ledger_path}; use solana_runtime::bank::Bank; diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index 2587aa8cc..e34431bf6 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -1,7 +1,6 @@ use crate::cluster_info::ClusterInfo; use crate::crds_value::EpochSlots; use crate::result::Result; -use crate::service::Service; use byteorder::{ByteOrder, LittleEndian}; use rand::seq::SliceRandom; use rand::SeedableRng; @@ -515,12 +514,8 @@ impl ClusterInfoRepairListener { fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap) -> Option { peer_infos.get(pubkey).map(|p| p.last_ts) } -} -impl Service for ClusterInfoRepairListener { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index ebbee4d24..1e313a4dd 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -2,7 +2,6 @@ use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; use crate::packet::Packets; use crate::poh_recorder::PohRecorder; use crate::result::Result; -use crate::service::Service; use crate::{packet, sigverify}; use crossbeam_channel::Sender as CrossbeamSender; use solana_metrics::inc_new_counter_debug; @@ -72,12 +71,8 @@ impl ClusterInfoVoteListener { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } -} -impl Service for ClusterInfoVoteListener { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/commitment.rs b/core/src/commitment.rs index e7be84c0a..e3e0b4568 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,7 +1,4 @@ -use crate::{ - result::{Error, Result}, - service::Service, -}; +use crate::result::{Error, Result}; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_api::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; @@ -230,12 +227,8 @@ impl AggregateCommitmentService { } } } -} -impl Service for AggregateCommitmentService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_commitment.join() } } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 45a7dfabd..e1e88abcd 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -4,7 +4,6 @@ use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; use crate::packet::PacketsRecycler; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; -use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; use solana_perf::recycler::Recycler; @@ -140,12 +139,8 @@ impl FetchStage { thread_hdls.push(fwd_thread_hdl); Self { thread_hdls } } -} -impl Service for FetchStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index b8db306f2..a87a71404 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -2,7 +2,6 @@ use crate::cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE}; use crate::contact_info::ContactInfo; -use crate::service::Service; use crate::streamer; use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; @@ -51,6 +50,13 @@ impl GossipService { let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Self { thread_hdls } } + + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } } /// Discover Nodes and Archivers in a cluster @@ -264,17 +270,6 @@ fn make_gossip_node( (gossip_service, ip_echo, cluster_info) } -impl Service for GossipService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index efb885e9a..8b88ebde5 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -1,7 +1,6 @@ //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage use crate::result::{Error, Result}; -use crate::service::Service; use solana_ledger::blocktree::Blocktree; use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::pubkey::Pubkey; @@ -63,12 +62,8 @@ impl LedgerCleanupService { } Ok(()) } -} -impl Service for LedgerCleanupService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_cleanup.join() } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 31d7d5141..84290e731 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -65,7 +65,6 @@ pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; pub mod sendmmsg; -pub mod service; pub mod sigverify; pub mod sigverify_shreds; pub mod sigverify_stage; diff --git a/core/src/local_vote_signer_service.rs b/core/src/local_vote_signer_service.rs index 1277bd7c3..e9d9f3343 100644 --- a/core/src/local_vote_signer_service.rs +++ b/core/src/local_vote_signer_service.rs @@ -1,6 +1,5 @@ //! The `local_vote_signer_service` can be started locally to sign validator votes -use crate::service::Service; use solana_net_utils::PortRange; use solana_vote_signer::rpc::VoteSignerRpcService; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -13,15 +12,6 @@ pub struct LocalVoteSignerService { exit: Arc, } -impl Service for LocalVoteSignerService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.exit.store(true, Ordering::Relaxed); - self.thread.join() - } -} - impl LocalVoteSignerService { #[allow(clippy::new_ret_no_self)] pub fn new(port_range: PortRange) -> (Self, SocketAddr) { @@ -41,4 +31,9 @@ impl LocalVoteSignerService { (Self { thread, exit }, addr) } + + pub fn join(self) -> thread::Result<()> { + self.exit.store(true, Ordering::Relaxed); + self.thread.join() + } } diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 815551157..50292f639 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -1,7 +1,6 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream use crate::poh_recorder::PohRecorder; -use crate::service::Service; use core_affinity; use solana_sdk::poh_config::PohConfig; use std::sync::atomic::{AtomicBool, Ordering}; @@ -96,12 +95,8 @@ impl PohService { } } } -} -impl Service for PohService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.tick_producer.join() } } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index ae0051c85..cc7f34784 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -2,7 +2,7 @@ //! regularly finds missing blobs in the ledger and sends repair requests for those blobs use crate::{ cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener, - result::Result, service::Service, + result::Result, }; use solana_ledger::{ bank_forks::BankForks, @@ -373,12 +373,8 @@ impl RepairService { .cloned() .collect(); } -} -impl Service for RepairService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { let mut results = vec![self.t_repair.join()]; if let Some(cluster_info_repair_listener) = self.cluster_info_repair_listener { results.push(cluster_info_repair_listener.join()); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index fee4dbb48..06b325d8a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -8,7 +8,6 @@ use crate::consensus::{StakeLockout, Tower}; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; -use crate::service::Service; use solana_ledger::{ bank_forks::BankForks, block_error::BlockError, @@ -925,12 +924,8 @@ impl ReplayStage { } } } -} -impl Service for ReplayStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.commitment_service.join()?; self.t_replay.join().map(|_| ()) } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 4134528ee..b65fd5663 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -5,7 +5,6 @@ use crate::{ packet::Packets, repair_service::RepairStrategy, result::{Error, Result}, - service::Service, streamer::PacketReceiver, window_service::{should_retransmit_and_persist, WindowService}, }; @@ -256,12 +255,8 @@ impl RetransmitStage { window_service, } } -} -impl Service for RetransmitStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 25481dfa4..6ec5c7ae3 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -2,7 +2,6 @@ use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; use crate::rpc_subscriptions::RpcSubscriptions; -use crate::service::Service; use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use std::net::SocketAddr; @@ -15,14 +14,6 @@ pub struct PubSubService { thread_hdl: JoinHandle<()>, } -impl Service for PubSubService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} - impl PubSubService { pub fn new( subscriptions: &Arc, @@ -64,6 +55,10 @@ impl PubSubService { pub fn close(self) -> thread::Result<()> { self.join() } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } } #[cfg(test)] diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index c236ef430..5d0a44566 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,7 +1,7 @@ //! The `rpc_service` module implements the Solana JSON RPC service. use crate::{ - cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, service::Service, + cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, storage_stage::StorageState, validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; @@ -163,12 +163,8 @@ impl JsonRpcService { c.close() } } -} -impl Service for JsonRpcService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.thread_hdl.join() } } diff --git a/core/src/service.rs b/core/src/service.rs deleted file mode 100644 index a882181bc..000000000 --- a/core/src/service.rs +++ /dev/null @@ -1,30 +0,0 @@ -//! The `service` module implements a trait used by services and stages. -//! -//! A Service is any object that implements its functionality on a separate thread. It implements a -//! `join()` method, which can be used to wait for that thread to close. -//! -//! The Service trait may also be used to implement a pipeline stage. Like a service, its -//! functionality is also implemented by a thread, but unlike a service, a stage isn't a server -//! that replies to client requests. Instead, a stage acts more like a pure function. It's a oneway -//! street. It processes messages from its input channel and then sends the processed data to an -//! output channel. Stages can be composed to form a linear chain called a pipeline. -//! -//! The approach to creating a pipeline stage in Rust may be unique to Solana. We haven't seen the -//! same technique used in other Rust projects and there may be better ways to do it. The Solana -//! approach defines a stage as an object that communicates to its previous stage and the next -//! stage using channels. By convention, each stage accepts a *receiver* for input and creates a -//! second output channel. The second channel is used to pass data to the next stage, and so its -//! sender is moved into the stage's thread and the receiver is returned from its constructor. -//! -//! A well-written stage should create a thread and call a short `run()` method. The method should -//! read input from its input channel, call a function from another module that processes it, and -//! then send the output to the output channel. The functionality in the second module will likely -//! not use threads or channels. - -use std::thread::Result; - -pub trait Service { - type JoinReturnType; - - fn join(self) -> Result; -} diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 9e79e2bb7..36603bcaa 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,7 +1,6 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. use crate::packet::{Packet, PacketsRecycler}; -use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_perf::cuda_runtime::PinnedVec; use solana_perf::recycler::Recycler; @@ -104,12 +103,8 @@ impl ShredFetchStage { Self { thread_hdls } } -} -impl Service for ShredFetchStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index aff8049ca..b9076907c 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,7 +7,6 @@ use crate::packet::Packets; use crate::result::{Error, Result}; -use crate::service::Service; use crate::sigverify; use crate::streamer::{self, PacketReceiver}; use crossbeam_channel::Sender as CrossbeamSender; @@ -148,12 +147,8 @@ impl SigVerifyStage { }) .collect() } -} -impl Service for SigVerifyStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index d52eba187..6518c3352 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,5 +1,4 @@ use crate::result::{Error, Result}; -use crate::service::Service; use bincode::serialize_into; use solana_ledger::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver}; use solana_ledger::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; @@ -173,12 +172,8 @@ impl SnapshotPackagerService { ); Ok(()) } -} -impl Service for SnapshotPackagerService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_snapshot_packager.join() } } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index db67b643b..90def769b 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -7,7 +7,6 @@ use crate::{ cluster_info::ClusterInfo, contact_info::ContactInfo, result::{Error, Result}, - service::Service, }; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; @@ -630,12 +629,8 @@ impl StorageStage { res?; Ok(()) } -} -impl Service for StorageStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_storage_create_accounts.join().unwrap(); self.t_storage_mining_verifier.join() } @@ -651,7 +646,6 @@ pub fn test_cluster_info(id: &Pubkey) -> Arc> { mod tests { use super::*; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use crate::service::Service; use rayon::prelude::*; use solana_runtime::bank::Bank; use solana_sdk::hash::Hasher; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index bd05fdabf..88af96f2c 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -7,7 +7,6 @@ use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntry}; -use crate::service::Service; use crate::sigverify::TransactionSigVerifier; use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; use crossbeam_channel::unbounded; @@ -90,12 +89,8 @@ impl Tpu { broadcast_stage, } } -} -impl Service for Tpu { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { let mut results = vec![]; results.push(self.fetch_stage.join()); results.push(self.sigverify_stage.join()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0306588c1..d8e2c0ce1 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -9,7 +9,6 @@ use crate::poh_recorder::PohRecorder; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; -use crate::service::Service; use crate::shred_fetch_stage::ShredFetchStage; use crate::sigverify_shreds::ShredSigVerifier; use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; @@ -206,12 +205,8 @@ impl Tvu { snapshot_packager_service, } } -} -impl Service for Tvu { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.retransmit_stage.join()?; self.fetch_stage.join()?; self.sigverify_stage.join()?; diff --git a/core/src/validator.rs b/core/src/validator.rs index 32a2f8ab0..d2dacada2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -12,7 +12,6 @@ use crate::{ rpc_pubsub_service::PubSubService, rpc_service::JsonRpcService, rpc_subscriptions::RpcSubscriptions, - service::Service, sigverify, storage_stage::StorageState, tpu::Tpu, @@ -408,6 +407,24 @@ impl Validator { node.sockets.retransmit_sockets[0].local_addr().unwrap() ); } + + pub fn join(self) -> Result<()> { + self.poh_service.join()?; + drop(self.poh_recorder); + if let Some(rpc_service) = self.rpc_service { + rpc_service.join()?; + } + if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { + rpc_pubsub_service.join()?; + } + + self.gossip_service.join()?; + self.tpu.join()?; + self.tvu.join()?; + self.ip_echo_server.shutdown_now(); + + Ok(()) + } } pub fn new_banks_from_blocktree( @@ -480,28 +497,6 @@ pub fn new_banks_from_blocktree( ) } -impl Service for Validator { - type JoinReturnType = (); - - fn join(self) -> Result<()> { - self.poh_service.join()?; - drop(self.poh_recorder); - if let Some(rpc_service) = self.rpc_service { - rpc_service.join()?; - } - if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { - rpc_pubsub_service.join()?; - } - - self.gossip_service.join()?; - self.tpu.join()?; - self.tvu.join()?; - self.ip_echo_server.shutdown_now(); - - Ok(()) - } -} - pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 5b5d03d39..2a85968fb 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -5,7 +5,6 @@ use crate::cluster_info::ClusterInfo; use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; -use crate::service::Service; use crate::streamer::PacketSender; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use rayon::iter::IntoParallelRefMutIterator; @@ -262,12 +261,8 @@ impl WindowService { repair_service, } } -} -impl Service for WindowService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { + pub fn join(self) -> thread::Result<()> { self.t_window.join()?; self.repair_service.join() } @@ -282,7 +277,6 @@ mod test { genesis_utils::create_genesis_config_with_leader, packet::{Packet, Packets}, repair_service::RepairSlotRange, - service::Service, }; use crossbeam_channel::unbounded; use rand::thread_rng; diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 81b5dd750..634517ff9 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -7,7 +7,6 @@ mod tests { use itertools::Itertools; use solana_core::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, - service::Service, snapshot_packager_service::SnapshotPackagerService, }; use solana_ledger::{ diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index c4a53b577..e2193f939 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -7,7 +7,6 @@ use solana_core::gossip_service::GossipService; use solana_core::packet::Packet; use solana_core::result; -use solana_core::service::Service; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index dac7708b4..1d31a3519 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -4,7 +4,6 @@ mod tests { use log::*; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_core::service::Service; use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; use solana_core::storage_stage::{StorageStage, StorageState}; use solana_ledger::bank_forks::BankForks; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 83876a6a7..15a8f26b6 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -7,7 +7,6 @@ use solana_core::{ contact_info::ContactInfo, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, gossip_service::discover_cluster, - service::Service, validator::{Validator, ValidatorConfig}, }; use solana_ledger::create_new_tmp_ledger; diff --git a/validator/src/main.rs b/validator/src/main.rs index 20f6b75fb..053f9ca6f 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -12,7 +12,6 @@ use solana_core::cluster_info::{Node, VALIDATOR_PORT_RANGE}; use solana_core::contact_info::ContactInfo; use solana_core::gossip_service::discover; use solana_core::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; -use solana_core::service::Service; use solana_core::socketaddr; use solana_core::validator::{Validator, ValidatorConfig}; use solana_ledger::bank_forks::SnapshotConfig;