From fb67ff14debe053a97671468e1070f39555156d4 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 6 Apr 2022 18:52:19 -0400 Subject: [PATCH] Remove replica-node crates (#24152) --- Cargo.lock | 48 --- Cargo.toml | 2 - core/Cargo.toml | 1 - core/src/validator.rs | 25 +- local-cluster/src/validator_configs.rs | 1 - replica-lib/Cargo.toml | 27 -- replica-lib/build.rs | 5 - replica-lib/proto/accountsdb_repl.proto | 44 -- replica-lib/src/accountsdb_repl_client.rs | 119 ----- replica-lib/src/accountsdb_repl_server.rs | 176 -------- .../src/accountsdb_repl_server_factory.rs | 29 -- replica-lib/src/lib.rs | 7 - replica-lib/src/replica_accounts_server.rs | 95 ---- .../src/replica_confirmed_slots_server.rs | 122 ------ replica-node/.gitignore | 2 - replica-node/Cargo.toml | 43 -- replica-node/src/accountsdb_repl_service.rs | 81 ---- replica-node/src/lib.rs | 5 - replica-node/src/main.rs | 408 ------------------ replica-node/src/replica_node.rs | 346 --------------- replica-node/src/replica_util.rs | 276 ------------ replica-node/tests/local_replica.rs | 299 ------------- rpc/src/rpc.rs | 2 +- validator/Cargo.toml | 1 - validator/src/main.rs | 22 - 25 files changed, 2 insertions(+), 2184 deletions(-) delete mode 100644 replica-lib/Cargo.toml delete mode 100644 replica-lib/build.rs delete mode 100644 replica-lib/proto/accountsdb_repl.proto delete mode 100644 replica-lib/src/accountsdb_repl_client.rs delete mode 100644 replica-lib/src/accountsdb_repl_server.rs delete mode 100644 replica-lib/src/accountsdb_repl_server_factory.rs delete mode 100644 replica-lib/src/lib.rs delete mode 100644 replica-lib/src/replica_accounts_server.rs delete mode 100644 replica-lib/src/replica_confirmed_slots_server.rs delete mode 100644 replica-node/.gitignore delete mode 100644 replica-node/Cargo.toml delete mode 100644 replica-node/src/accountsdb_repl_service.rs delete mode 100644 replica-node/src/lib.rs delete mode 100644 replica-node/src/main.rs delete mode 100644 replica-node/src/replica_node.rs delete mode 100644 replica-node/src/replica_util.rs delete mode 100644 replica-node/tests/local_replica.rs diff --git a/Cargo.lock b/Cargo.lock index 84aeeb24c..7c5989abb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4745,7 +4745,6 @@ dependencies = [ "solana-poh", "solana-program-runtime", "solana-rayon-threadlimit", - "solana-replica-lib", "solana-rpc", "solana-runtime", "solana-sdk", @@ -5530,52 +5529,6 @@ dependencies = [ "uriparse", ] -[[package]] -name = "solana-replica-lib" -version = "1.11.0" -dependencies = [ - "crossbeam-channel", - "futures-util", - "log", - "prost", - "solana-rpc", - "solana-runtime", - "solana-sdk", - "tokio", - "tonic", - "tonic-build", -] - -[[package]] -name = "solana-replica-node" -version = "1.11.0" -dependencies = [ - "clap 2.33.3", - "crossbeam-channel", - "log", - "rand 0.7.3", - "serial_test", - "solana-clap-utils", - "solana-core", - "solana-download-utils", - "solana-genesis-utils", - "solana-gossip", - "solana-ledger", - "solana-local-cluster", - "solana-logger 1.11.0", - "solana-net-utils", - "solana-replica-lib", - "solana-rpc", - "solana-runtime", - "solana-sdk", - "solana-send-transaction-service", - "solana-streamer", - "solana-validator", - "solana-version", - "tempfile", - "tonic-build", -] - [[package]] name = "solana-rpc" version = "1.11.0" @@ -6082,7 +6035,6 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", - "solana-replica-lib", "solana-rpc", "solana-runtime", "solana-sdk", diff --git a/Cargo.toml b/Cargo.toml index 3a7db846e..fc237f7da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,8 +59,6 @@ members = [ "rayon-threadlimit", "rbpf-cli", "remote-wallet", - "replica-lib", - "replica-node", "rpc", "rpc-test", "runtime", diff --git a/core/Cargo.toml b/core/Cargo.toml index 4e2f0ef85..70f919e73 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -49,7 +49,6 @@ solana-perf = { path = "../perf", version = "=1.11.0" } solana-poh = { path = "../poh", version = "=1.11.0" } solana-program-runtime = { path = "../program-runtime", version = "=1.11.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.11.0" } -solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" } solana-rpc = { path = "../rpc", version = "=1.11.0" } solana-runtime = { path = "../runtime", version = "=1.11.0" } solana-sdk = { path = "../sdk", version = "=1.11.0" } diff --git a/core/src/validator.rs b/core/src/validator.rs index e5c101e21..eacb54f6f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -50,10 +50,6 @@ use { poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::{self, PohService}, }, - solana_replica_lib::{ - accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig}, - accountsdb_repl_server_factory, - }, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -122,7 +118,6 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, - pub accountsdb_repl_service_config: Option, pub geyser_plugin_config_files: Option>, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, @@ -184,7 +179,6 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), - accountsdb_repl_service_config: None, geyser_plugin_config_files: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), @@ -339,7 +333,6 @@ pub struct Validator { pub cluster_info: Arc, pub bank_forks: Arc>, pub blockstore: Arc, - accountsdb_repl_service: Option, geyser_plugin_service: Option, } @@ -673,7 +666,6 @@ impl Validator { pubsub_service, optimistically_confirmed_bank_tracker, bank_notification_sender, - accountsdb_repl_service, ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs { if ContactInfo::is_valid_address(&node.info.rpc, &socket_addr_space) { assert!(ContactInfo::is_valid_address( @@ -687,13 +679,6 @@ impl Validator { )); } - let accountsdb_repl_service = config.accountsdb_repl_service_config.as_ref().map(|accountsdb_repl_service_config| { - let (bank_notification_sender, bank_notification_receiver) = unbounded(); - bank_notification_senders.push(bank_notification_sender); - accountsdb_repl_server_factory::AccountsDbReplServerFactory::build_accountsdb_repl_server( - accountsdb_repl_service_config.clone(), bank_notification_receiver, bank_forks.clone()) - }); - let (bank_notification_sender, bank_notification_receiver) = unbounded(); let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() { Some(Arc::new(RwLock::new(bank_notification_senders))) @@ -746,10 +731,9 @@ impl Validator { confirmed_bank_subscribers, )), Some(bank_notification_sender), - accountsdb_repl_service, ) } else { - (None, None, None, None, None) + (None, None, None, None) }; if config.dev_halt_at_slot.is_some() { @@ -999,7 +983,6 @@ impl Validator { cluster_info, bank_forks, blockstore: blockstore.clone(), - accountsdb_repl_service, geyser_plugin_service, } } @@ -1119,12 +1102,6 @@ impl Validator { ip_echo_server.shutdown_background(); } - if let Some(accountsdb_repl_service) = self.accountsdb_repl_service { - accountsdb_repl_service - .join() - .expect("accountsdb_repl_service"); - } - if let Some(geyser_plugin_service) = self.geyser_plugin_service { geyser_plugin_service.join().expect("geyser_plugin_service"); } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 6ae588f02..d249cd5f2 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -14,7 +14,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_paths: config.account_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), - accountsdb_repl_service_config: config.accountsdb_repl_service_config.clone(), geyser_plugin_config_files: config.geyser_plugin_config_files.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), diff --git a/replica-lib/Cargo.toml b/replica-lib/Cargo.toml deleted file mode 100644 index c288bd7ad..000000000 --- a/replica-lib/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -authors = ["Solana Maintainers "] -edition = "2021" -name = "solana-replica-lib" -description = "The library used for replication by both the client and server" -version = "1.11.0" -repository = "https://github.com/solana-labs/solana" -license = "Apache-2.0" -homepage = "https://solana.com/" -documentation = "https://docs.rs/solana-validator" - -[dependencies] -crossbeam-channel = "0.5" -futures-util = "0.3" -log = "0.4.11" -prost = "0.9.0" -solana-rpc = { path = "../rpc", version = "=1.11.0" } -solana-runtime = { path = "../runtime", version = "=1.11.0" } -solana-sdk = { path = "../sdk", version = "=1.11.0" } -tokio = { version = "1", features = ["full"] } -tonic = { version = "0.6.2", features = ["tls", "transport"] } - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] - -[build-dependencies] -tonic-build = "0.6.2" diff --git a/replica-lib/build.rs b/replica-lib/build.rs deleted file mode 100644 index a90b01298..000000000 --- a/replica-lib/build.rs +++ /dev/null @@ -1,5 +0,0 @@ -fn main() -> Result<(), Box> { - // compiling protos using path on build time - tonic_build::configure().compile(&["proto/accountsdb_repl.proto"], &["proto"])?; - Ok(()) -} diff --git a/replica-lib/proto/accountsdb_repl.proto b/replica-lib/proto/accountsdb_repl.proto deleted file mode 100644 index 0b8b679ab..000000000 --- a/replica-lib/proto/accountsdb_repl.proto +++ /dev/null @@ -1,44 +0,0 @@ - -// version of prorocol buffer used -syntax = "proto3"; - -package accountsdb_repl; - -message ReplicaSlotConfirmationRequest { - uint64 last_replicated_slot = 1; -} - -message ReplicaSlotConfirmationResponse { - repeated uint64 updated_slots = 1; -} - -message ReplicaAccountsRequest { - uint64 slot = 1; -} - -message ReplicaAccountMeta { - bytes Pubkey = 1; - uint64 lamports = 2; - bytes owner = 3; - bool executable = 4; - uint64 rent_epoch = 5; -} - -message ReplicaAccountData { - bytes data = 1; -} - -message ReplicaAccountInfo { - ReplicaAccountMeta account_meta = 1; - bytes hash = 2; - ReplicaAccountData data = 3; -} - -message ReplicaAccountsResponse { - repeated ReplicaAccountInfo accounts = 1; -} - -service AccountsDbRepl { - rpc get_confirmed_slots(ReplicaSlotConfirmationRequest) returns (ReplicaSlotConfirmationResponse); - rpc get_slot_accounts(ReplicaAccountsRequest) returns (ReplicaAccountsResponse); -} diff --git a/replica-lib/src/accountsdb_repl_client.rs b/replica-lib/src/accountsdb_repl_client.rs deleted file mode 100644 index a76514529..000000000 --- a/replica-lib/src/accountsdb_repl_client.rs +++ /dev/null @@ -1,119 +0,0 @@ -use { - log::*, - solana_sdk::clock::Slot, - std::{net::SocketAddr, sync::Arc}, - tokio::runtime::Runtime, - tonic::{self, transport::Endpoint, Request}, -}; - -tonic::include_proto!("accountsdb_repl"); - -pub struct AccountsDbReplClient { - client: accounts_db_repl_client::AccountsDbReplClient, -} - -#[derive(Debug)] -pub enum ReplicaRpcError { - InvalidUrl(String), - ConnectionError(String), - GetSlotsError(String), - GetAccountsError(String), -} - -impl From for ReplicaRpcError { - fn from(err: tonic::transport::Error) -> Self { - ReplicaRpcError::ConnectionError(err.to_string()) - } -} - -impl AccountsDbReplClient { - pub async fn connect(rpc_peer: &SocketAddr) -> Result { - let url = format!("http://{}", rpc_peer); - let endpoint = match Endpoint::from_shared(url.to_string()) { - Ok(endpoint) => endpoint, - Err(e) => { - return Err(ReplicaRpcError::InvalidUrl(e.to_string())); - } - }; - let client = accounts_db_repl_client::AccountsDbReplClient::connect(endpoint).await?; - info!( - "Successfully connected to the AccountsDb Replication server: {:?}", - url - ); - Ok(AccountsDbReplClient { client }) - } - - pub async fn get_confirmed_slots( - &mut self, - last_slot: Slot, - ) -> Result, ReplicaRpcError> { - let request = ReplicaSlotConfirmationRequest { - last_replicated_slot: last_slot, - }; - let response = self.client.get_confirmed_slots(Request::new(request)).await; - - match response { - Ok(response) => Ok(response.into_inner().updated_slots), - Err(status) => Err(ReplicaRpcError::GetSlotsError(status.to_string())), - } - } - - pub async fn get_slot_accounts( - &mut self, - slot: Slot, - ) -> Result, ReplicaRpcError> { - let request = ReplicaAccountsRequest { slot }; - let response = self.client.get_slot_accounts(Request::new(request)).await; - - match response { - Ok(response) => Ok(response.into_inner().accounts), - Err(status) => Err(ReplicaRpcError::GetAccountsError(status.to_string())), - } - } -} - -#[derive(Clone)] -pub struct AccountsDbReplClientServiceConfig { - pub worker_threads: usize, - pub replica_server_addr: SocketAddr, -} - -/// The service wrapper over AccountsDbReplClient to make it run in the tokio runtime -pub struct AccountsDbReplClientService { - runtime: Arc, - accountsdb_repl_client: AccountsDbReplClient, -} - -impl AccountsDbReplClientService { - pub fn new(config: AccountsDbReplClientServiceConfig) -> Result { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.worker_threads) - .thread_name("sol-accountsdb-repl-wrk") - .enable_all() - .build() - .expect("Runtime"), - ); - - let accountsdb_repl_client = - runtime.block_on(AccountsDbReplClient::connect(&config.replica_server_addr))?; - - Ok(Self { - runtime, - accountsdb_repl_client, - }) - } - - pub fn get_confirmed_slots(&mut self, last_slot: Slot) -> Result, ReplicaRpcError> { - self.runtime - .block_on(self.accountsdb_repl_client.get_confirmed_slots(last_slot)) - } - - pub fn get_slot_accounts( - &mut self, - slot: Slot, - ) -> Result, ReplicaRpcError> { - self.runtime - .block_on(self.accountsdb_repl_client.get_slot_accounts(slot)) - } -} diff --git a/replica-lib/src/accountsdb_repl_server.rs b/replica-lib/src/accountsdb_repl_server.rs deleted file mode 100644 index 72eff6b0c..000000000 --- a/replica-lib/src/accountsdb_repl_server.rs +++ /dev/null @@ -1,176 +0,0 @@ -use { - futures_util::FutureExt, - log::*, - std::{ - net::SocketAddr, - sync::{Arc, RwLock}, - thread::{self, Builder, JoinHandle}, - }, - tokio::{ - runtime::Runtime, - sync::oneshot::{self, Receiver, Sender}, - }, - tonic::{self, transport}, -}; - -tonic::include_proto!("accountsdb_repl"); - -pub trait ReplicaSlotConfirmationServer { - fn get_confirmed_slots( - &self, - request: &ReplicaSlotConfirmationRequest, - ) -> Result; - - fn join(&mut self) -> thread::Result<()>; -} - -pub trait ReplicaAccountsServer { - fn get_slot_accounts( - &self, - request: &ReplicaAccountsRequest, - ) -> Result; - fn join(&mut self) -> thread::Result<()>; -} - -#[derive(Clone)] -struct AccountsDbReplServer { - confirmed_slots_server: Arc>, - accounts_server: Arc>, -} - -/// Implementing the AccountsDbRepl interface declared by the protocol -#[tonic::async_trait] -impl accounts_db_repl_server::AccountsDbRepl for AccountsDbReplServer { - async fn get_confirmed_slots( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let server = self.confirmed_slots_server.read().unwrap(); - let result = server.get_confirmed_slots(&request.into_inner()); - result.map(tonic::Response::new) - } - - async fn get_slot_accounts( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let server = self.accounts_server.read().unwrap(); - let result = server.get_slot_accounts(&request.into_inner()); - result.map(tonic::Response::new) - } -} - -impl AccountsDbReplServer { - pub fn new( - confirmed_slots_server: Arc>, - accounts_server: Arc>, - ) -> Self { - Self { - confirmed_slots_server, - accounts_server, - } - } - - pub fn join(self) -> thread::Result<()> { - self.confirmed_slots_server.write().unwrap().join()?; - self.accounts_server.write().unwrap().join() - } -} - -#[derive(Clone)] -pub struct AccountsDbReplServiceConfig { - pub worker_threads: usize, - pub replica_server_addr: SocketAddr, -} - -/// The service wraps the AccountsDbReplServer to make runnable in the tokio runtime -/// and handles start and stop of the service. -pub struct AccountsDbReplService { - accountsdb_repl_server: AccountsDbReplServer, - thread: JoinHandle<()>, - exit_signal_sender: Sender<()>, -} - -impl AccountsDbReplService { - pub fn new( - config: AccountsDbReplServiceConfig, - confirmed_slots_server: Arc>, - accounts_server: Arc>, - ) -> Self { - let accountsdb_repl_server = - AccountsDbReplServer::new(confirmed_slots_server, accounts_server); - - let worker_threads = config.worker_threads; - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(worker_threads) - .thread_name("sol-accountsdb-repl-wrk") - .enable_all() - .build() - .expect("Runtime"), - ); - - let server_cloned = accountsdb_repl_server.clone(); - let (exit_signal_sender, exit_signal_receiver) = oneshot::channel::<()>(); - - let thread = Builder::new() - .name("sol-accountsdb-repl-rt".to_string()) - .spawn(move || { - Self::run_accountsdb_repl_server_in_runtime( - config, - runtime, - server_cloned, - exit_signal_receiver, - ); - }) - .unwrap(); - - Self { - accountsdb_repl_server, - thread, - exit_signal_sender, - } - } - - async fn run_accountsdb_repl_server( - config: AccountsDbReplServiceConfig, - server: AccountsDbReplServer, - exit_signal: Receiver<()>, - ) -> Result<(), tonic::transport::Error> { - info!( - "Running AccountsDbReplServer at the endpoint: {:?}", - config.replica_server_addr - ); - transport::Server::builder() - .add_service(accounts_db_repl_server::AccountsDbReplServer::new(server)) - .serve_with_shutdown(config.replica_server_addr, exit_signal.map(drop)) - .await - } - - fn run_accountsdb_repl_server_in_runtime( - config: AccountsDbReplServiceConfig, - runtime: Arc, - server: AccountsDbReplServer, - exit_signal: Receiver<()>, - ) { - let result = runtime.block_on(Self::run_accountsdb_repl_server( - config, - server, - exit_signal, - )); - match result { - Ok(_) => { - info!("AccountsDbReplServer finished"); - } - Err(err) => { - error!("AccountsDbReplServer finished in error: {:}?", err); - } - } - } - - pub fn join(self) -> thread::Result<()> { - let _ = self.exit_signal_sender.send(()); - self.accountsdb_repl_server.join()?; - self.thread.join() - } -} diff --git a/replica-lib/src/accountsdb_repl_server_factory.rs b/replica-lib/src/accountsdb_repl_server_factory.rs deleted file mode 100644 index 5cc1a9008..000000000 --- a/replica-lib/src/accountsdb_repl_server_factory.rs +++ /dev/null @@ -1,29 +0,0 @@ -use { - crate::{ - accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig}, - replica_accounts_server::ReplicaAccountsServerImpl, - replica_confirmed_slots_server::ReplicaSlotConfirmationServerImpl, - }, - crossbeam_channel::Receiver, - solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, - solana_runtime::bank_forks::BankForks, - std::sync::{Arc, RwLock}, -}; - -pub struct AccountsDbReplServerFactory {} - -impl AccountsDbReplServerFactory { - pub fn build_accountsdb_repl_server( - config: AccountsDbReplServiceConfig, - confirmed_bank_receiver: Receiver, - bank_forks: Arc>, - ) -> AccountsDbReplService { - AccountsDbReplService::new( - config, - Arc::new(RwLock::new(ReplicaSlotConfirmationServerImpl::new( - confirmed_bank_receiver, - ))), - Arc::new(RwLock::new(ReplicaAccountsServerImpl::new(bank_forks))), - ) - } -} diff --git a/replica-lib/src/lib.rs b/replica-lib/src/lib.rs deleted file mode 100644 index 13715c691..000000000 --- a/replica-lib/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -#![allow(clippy::integer_arithmetic)] - -pub mod accountsdb_repl_client; -pub mod accountsdb_repl_server; -pub mod accountsdb_repl_server_factory; -pub mod replica_accounts_server; -pub mod replica_confirmed_slots_server; diff --git a/replica-lib/src/replica_accounts_server.rs b/replica-lib/src/replica_accounts_server.rs deleted file mode 100644 index 7e1af431f..000000000 --- a/replica-lib/src/replica_accounts_server.rs +++ /dev/null @@ -1,95 +0,0 @@ -use { - crate::accountsdb_repl_server::{ - self, ReplicaAccountData, ReplicaAccountInfo, ReplicaAccountMeta, ReplicaAccountsServer, - }, - solana_runtime::{ - accounts_cache::CachedAccount, accounts_db::LoadedAccount, append_vec::StoredAccountMeta, - bank_forks::BankForks, - }, - solana_sdk::account::Account, - std::{ - cmp::Eq, - sync::{Arc, RwLock}, - thread, - }, -}; - -pub(crate) struct ReplicaAccountsServerImpl { - bank_forks: Arc>, -} - -impl Eq for ReplicaAccountInfo {} - -impl ReplicaAccountInfo { - fn from_stored_account_meta(stored_account_meta: &StoredAccountMeta) -> Self { - let account_meta = Some(ReplicaAccountMeta { - pubkey: stored_account_meta.meta.pubkey.to_bytes().to_vec(), - lamports: stored_account_meta.account_meta.lamports, - owner: stored_account_meta.account_meta.owner.to_bytes().to_vec(), - executable: stored_account_meta.account_meta.executable, - rent_epoch: stored_account_meta.account_meta.rent_epoch, - }); - let data = Some(ReplicaAccountData { - data: stored_account_meta.data.to_vec(), - }); - ReplicaAccountInfo { - account_meta, - hash: stored_account_meta.hash.as_ref().to_vec(), - data, - } - } - - fn from_cached_account(cached_account: &CachedAccount) -> Self { - let account = Account::from(cached_account.account.clone()); - let account_meta = Some(ReplicaAccountMeta { - pubkey: cached_account.pubkey().to_bytes().to_vec(), - lamports: account.lamports, - owner: account.owner.to_bytes().to_vec(), - executable: account.executable, - rent_epoch: account.rent_epoch, - }); - let data = Some(ReplicaAccountData { - data: account.data.to_vec(), - }); - ReplicaAccountInfo { - account_meta, - hash: cached_account.hash().as_ref().to_vec(), - data, - } - } -} - -impl ReplicaAccountsServer for ReplicaAccountsServerImpl { - fn get_slot_accounts( - &self, - request: &accountsdb_repl_server::ReplicaAccountsRequest, - ) -> Result { - let slot = request.slot; - - match self.bank_forks.read().unwrap().get(slot) { - None => Err(tonic::Status::not_found("The slot is not found")), - Some(bank) => { - let accounts = bank.rc.accounts.scan_slot(slot, |account| match account { - LoadedAccount::Stored(stored_account_meta) => Some( - ReplicaAccountInfo::from_stored_account_meta(&stored_account_meta), - ), - LoadedAccount::Cached(cached_account) => { - Some(ReplicaAccountInfo::from_cached_account(&cached_account)) - } - }); - - Ok(accountsdb_repl_server::ReplicaAccountsResponse { accounts }) - } - } - } - - fn join(&mut self) -> thread::Result<()> { - Ok(()) - } -} - -impl ReplicaAccountsServerImpl { - pub fn new(bank_forks: Arc>) -> Self { - Self { bank_forks } - } -} diff --git a/replica-lib/src/replica_confirmed_slots_server.rs b/replica-lib/src/replica_confirmed_slots_server.rs deleted file mode 100644 index 017970bd4..000000000 --- a/replica-lib/src/replica_confirmed_slots_server.rs +++ /dev/null @@ -1,122 +0,0 @@ -use { - crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer}, - crossbeam_channel::Receiver, - solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, - solana_sdk::{clock::Slot, commitment_config::CommitmentLevel}, - std::{ - collections::VecDeque, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, - thread::{self, sleep, Builder, JoinHandle}, - time::Duration, - }, - tonic, -}; - -/// The structure modelling the slots eligible for replication and -/// their states. -#[derive(Default, Clone)] -struct ReplicaEligibleSlotSet { - slot_set: Arc>>, -} - -pub(crate) struct ReplicaSlotConfirmationServerImpl { - eligible_slot_set: ReplicaEligibleSlotSet, - confirmed_bank_receiver_service: Option>, - cleanup_service: Option>, - exit_updated_slot_server: Arc, -} - -impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl { - fn get_confirmed_slots( - &self, - request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest, - ) -> Result { - let slot_set = self.eligible_slot_set.slot_set.read().unwrap(); - let updated_slots: Vec = slot_set - .iter() - .filter(|(slot, _)| *slot > request.last_replicated_slot) - .map(|(slot, _)| *slot) - .collect(); - - Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots }) - } - - fn join(&mut self) -> thread::Result<()> { - self.exit_updated_slot_server.store(true, Ordering::Relaxed); - self.confirmed_bank_receiver_service - .take() - .map(JoinHandle::join) - .unwrap() - .expect("confirmed_bank_receiver_service"); - - self.cleanup_service.take().map(JoinHandle::join).unwrap() - } -} - -const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144; - -impl ReplicaSlotConfirmationServerImpl { - pub fn new(confirmed_bank_receiver: Receiver) -> Self { - let eligible_slot_set = ReplicaEligibleSlotSet::default(); - let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); - - Self { - eligible_slot_set: eligible_slot_set.clone(), - confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver( - confirmed_bank_receiver, - eligible_slot_set.clone(), - exit_updated_slot_server.clone(), - )), - cleanup_service: Some(Self::run_cleanup_service( - eligible_slot_set, - MAX_ELIGIBLE_SLOT_SET_SIZE, - exit_updated_slot_server.clone(), - )), - exit_updated_slot_server, - } - } - - fn run_confirmed_bank_receiver( - confirmed_bank_receiver: Receiver, - eligible_slot_set: ReplicaEligibleSlotSet, - exit: Arc, - ) -> JoinHandle<()> { - Builder::new() - .name("confirmed_bank_receiver".to_string()) - .spawn(move || { - while !exit.load(Ordering::Relaxed) { - if let Ok(BankNotification::OptimisticallyConfirmed(slot)) = - confirmed_bank_receiver.recv() - { - let mut slot_set = eligible_slot_set.slot_set.write().unwrap(); - slot_set.push_back((slot, CommitmentLevel::Confirmed)); - } - } - }) - .unwrap() - } - - fn run_cleanup_service( - eligible_slot_set: ReplicaEligibleSlotSet, - max_set_size: usize, - exit: Arc, - ) -> JoinHandle<()> { - Builder::new() - .name("cleanup_service".to_string()) - .spawn(move || { - while !exit.load(Ordering::Relaxed) { - let mut slot_set = eligible_slot_set.slot_set.write().unwrap(); - let count_to_drain = slot_set.len().saturating_sub(max_set_size); - if count_to_drain > 0 { - drop(slot_set.drain(..count_to_drain)); - } - drop(slot_set); - sleep(Duration::from_millis(200)); - } - }) - .unwrap() - } -} diff --git a/replica-node/.gitignore b/replica-node/.gitignore deleted file mode 100644 index 5404b132d..000000000 --- a/replica-node/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target/ -/farf/ diff --git a/replica-node/Cargo.toml b/replica-node/Cargo.toml deleted file mode 100644 index 99a74db8f..000000000 --- a/replica-node/Cargo.toml +++ /dev/null @@ -1,43 +0,0 @@ -[package] -authors = ["Solana Maintainers "] -edition = "2021" -name = "solana-replica-node" -description = "Solana replication node" -version = "1.11.0" -repository = "https://github.com/solana-labs/solana" -license = "Apache-2.0" -homepage = "https://solana.com/" -documentation = "https://docs.rs/solana-validator" - -[dependencies] -clap = "2.33.1" -crossbeam-channel = "0.5" -log = "0.4.14" -rand = "0.7.0" -solana-clap-utils = { path = "../clap-utils", version = "=1.11.0" } -solana-download-utils = { path = "../download-utils", version = "=1.11.0" } -solana-genesis-utils = { path = "../genesis-utils", version = "=1.11.0" } -solana-gossip = { path = "../gossip", version = "=1.11.0" } -solana-ledger = { path = "../ledger", version = "=1.11.0" } -solana-logger = { path = "../logger", version = "=1.11.0" } -solana-net-utils = { path = "../net-utils", version = "=1.11.0" } -solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" } -solana-rpc = { path = "../rpc", version = "=1.11.0" } -solana-runtime = { path = "../runtime", version = "=1.11.0" } -solana-sdk = { path = "../sdk", version = "=1.11.0" } -solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.11.0" } -solana-streamer = { path = "../streamer", version = "=1.11.0" } -solana-validator = { path = "../validator", version = "=1.11.0" } -solana-version = { path = "../version", version = "=1.11.0" } - -[dev-dependencies] -serial_test = "0.6.0" -solana-core = { path = "../core", version = "=1.11.0" } -solana-local-cluster = { path = "../local-cluster", version = "=1.11.0" } -tempfile = "3.3.0" - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] - -[build-dependencies] -tonic-build = "0.6.2" diff --git a/replica-node/src/accountsdb_repl_service.rs b/replica-node/src/accountsdb_repl_service.rs deleted file mode 100644 index bed1694fe..000000000 --- a/replica-node/src/accountsdb_repl_service.rs +++ /dev/null @@ -1,81 +0,0 @@ -/// Module responsible for replicating AccountsDb data from its peer to its local AccountsDb in the replica-node -use { - log::*, - solana_replica_lib::accountsdb_repl_client::{ - AccountsDbReplClientService, AccountsDbReplClientServiceConfig, ReplicaRpcError, - }, - solana_sdk::{clock::Slot, pubkey::Pubkey}, - std::{ - thread::{self, sleep, Builder, JoinHandle}, - time::Duration, - }, -}; - -pub struct AccountsDbReplService { - thread: JoinHandle<()>, -} - -impl AccountsDbReplService { - pub fn new( - last_replicated_slot: Slot, - config: AccountsDbReplClientServiceConfig, - ) -> Result { - let accountsdb_repl_client = AccountsDbReplClientService::new(config)?; - let thread = Builder::new() - .name("sol-accountsdb-repl-svc".to_string()) - .spawn(move || { - Self::run_service(last_replicated_slot, accountsdb_repl_client); - }) - .unwrap(); - Ok(Self { thread }) - } - - fn replicate_accounts_for_slot( - accountsdb_repl_client: &mut AccountsDbReplClientService, - slot: Slot, - ) { - match accountsdb_repl_client.get_slot_accounts(slot) { - Err(err) => { - error!( - "Ran into error getting accounts for slot {:?}, error: {:?}", - slot, err - ); - } - Ok(accounts) => { - for account in accounts.iter() { - debug!( - "Received account: {:?}", - Pubkey::new(&account.account_meta.as_ref().unwrap().pubkey) - ); - } - } - } - } - - fn run_service( - mut last_replicated_slot: Slot, - mut accountsdb_repl_client: AccountsDbReplClientService, - ) { - loop { - match accountsdb_repl_client.get_confirmed_slots(last_replicated_slot) { - Ok(slots) => { - info!("Received updated slots: {:?}", slots); - if !slots.is_empty() { - for slot in slots.iter() { - Self::replicate_accounts_for_slot(&mut accountsdb_repl_client, *slot); - } - last_replicated_slot = slots[slots.len() - 1]; - } - } - Err(err) => { - error!("Ran into error getting updated slots: {:?}", err); - } - } - sleep(Duration::from_millis(200)); - } - } - - pub fn join(self) -> thread::Result<()> { - self.thread.join() - } -} diff --git a/replica-node/src/lib.rs b/replica-node/src/lib.rs deleted file mode 100644 index f3ff1bd95..000000000 --- a/replica-node/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -#![allow(clippy::integer_arithmetic)] - -pub mod accountsdb_repl_service; -pub mod replica_node; -pub mod replica_util; diff --git a/replica-node/src/main.rs b/replica-node/src/main.rs deleted file mode 100644 index 07369ebd6..000000000 --- a/replica-node/src/main.rs +++ /dev/null @@ -1,408 +0,0 @@ -//! The main AccountsDb replication node responsible for replicating -//! AccountsDb information from peer a validator or another replica-node. - -#![allow(clippy::integer_arithmetic)] - -use { - clap::{crate_description, crate_name, value_t, values_t, App, AppSettings, Arg}, - log::*, - rand::{seq::SliceRandom, thread_rng}, - solana_clap_utils::{ - input_parsers::keypair_of, - input_validators::{is_keypair_or_ask_keyword, is_parsable, is_pubkey}, - keypair::SKIP_SEED_PHRASE_VALIDATION_ARG, - }, - solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, - solana_net_utils::VALIDATOR_PORT_RANGE, - solana_replica_node::{ - replica_node::{ReplicaNode, ReplicaNodeConfig}, - replica_util, - }, - solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, - solana_runtime::accounts_index::AccountSecondaryIndexes, - solana_sdk::{exit::Exit, pubkey::Pubkey, signature::Signer}, - solana_streamer::socket::SocketAddrSpace, - solana_validator::port_range_validator, - std::{ - collections::HashSet, - env, - net::{IpAddr, SocketAddr}, - path::PathBuf, - process::exit, - sync::{Arc, RwLock}, - }, -}; - -pub fn main() { - let default_dynamic_port_range = - &format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1); - - let matches = App::new(crate_name!()) - .about(crate_description!()) - .version(solana_version::version!()) - .setting(AppSettings::VersionlessSubcommands) - .setting(AppSettings::InferSubcommands) - .arg( - Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name) - .long(SKIP_SEED_PHRASE_VALIDATION_ARG.long) - .help(SKIP_SEED_PHRASE_VALIDATION_ARG.help), - ) - .arg( - Arg::with_name("ledger_path") - .short("l") - .long("ledger") - .value_name("DIR") - .takes_value(true) - .required(true) - .default_value("ledger") - .help("Use DIR as ledger location"), - ) - .arg( - Arg::with_name("snapshots") - .long("snapshots") - .value_name("DIR") - .takes_value(true) - .help("Use DIR as snapshot location [default: --ledger value]"), - ) - .arg( - Arg::with_name("peer_address") - .long("peer-address") - .value_name("IP") - .takes_value(true) - .required(true) - .help("The the address for the peer validator/replica to download from"), - ) - .arg( - Arg::with_name("peer_rpc_port") - .long("peer-rpc-port") - .value_name("PORT") - .takes_value(true) - .required(true) - .help("The the PORT for the peer validator/replica from which to download the snapshots"), - ) - .arg( - Arg::with_name("peer_accountsdb_repl_port") - .long("peer-accountsdb-repl-port") - .value_name("PORT") - .takes_value(true) - .required(true) - .help("The the PORT for the peer validator/replica serving the AccountsDb replication"), - ) - .arg( - Arg::with_name("peer_pubkey") - .long("peer-pubkey") - .validator(is_pubkey) - .value_name("The peer validator/replica IDENTITY") - .required(true) - .takes_value(true) - .help("The pubkey for the target validator."), - ) - .arg( - Arg::with_name("account_paths") - .long("accounts") - .value_name("PATHS") - .takes_value(true) - .multiple(true) - .help("Comma separated persistent accounts location"), - ) - .arg( - Arg::with_name("identity") - .short("i") - .long("identity") - .value_name("KEYPAIR") - .takes_value(true) - .validator(is_keypair_or_ask_keyword) - .help("Replica identity keypair"), - ) - .arg( - Arg::with_name("entrypoint") - .short("n") - .long("entrypoint") - .value_name("HOST:PORT") - .takes_value(true) - .multiple(true) - .validator(solana_net_utils::is_host_port) - .help("Rendezvous with the cluster at this gossip entrypoint"), - ) - .arg( - Arg::with_name("bind_address") - .long("bind-address") - .value_name("HOST") - .takes_value(true) - .validator(solana_net_utils::is_host) - .default_value("0.0.0.0") - .help("IP address to bind the replica ports"), - ) - .arg( - Arg::with_name("rpc_bind_address") - .long("rpc-bind-address") - .value_name("HOST") - .takes_value(true) - .validator(solana_net_utils::is_host) - .help("IP address to bind the Json RPC port [default: use --bind-address]"), - ) - .arg( - Arg::with_name("rpc_port") - .long("rpc-port") - .value_name("PORT") - .takes_value(true) - .validator(solana_validator::port_validator) - .help("Enable JSON RPC on this port, and the next port for the RPC websocket"), - ) - .arg( - Arg::with_name("dynamic_port_range") - .long("dynamic-port-range") - .value_name("MIN_PORT-MAX_PORT") - .takes_value(true) - .default_value(default_dynamic_port_range) - .validator(port_range_validator) - .help("Range to use for dynamically assigned ports"), - ) - .arg( - Arg::with_name("expected_shred_version") - .long("expected-shred-version") - .value_name("VERSION") - .takes_value(true) - .validator(is_parsable::) - .help("Require the shred version be this value"), - ) - .arg( - Arg::with_name("logfile") - .short("o") - .long("log") - .value_name("FILE") - .takes_value(true) - .help( - "Redirect logging to the specified file, '-' for standard error. \ - Sending the SIGUSR1 signal to the validator process will cause it \ - to re-open the log file", - ), - ) - .arg( - Arg::with_name("allow_private_addr") - .long("allow-private-addr") - .takes_value(false) - .help("Allow contacting private ip addresses") - .hidden(true), - ) - .get_matches(); - - let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) - .expect("invalid bind_address"); - - let rpc_bind_address = if let Some(rpc_bind_address) = matches.value_of("rpc_bind_address") { - solana_net_utils::parse_host(rpc_bind_address).expect("invalid rpc_bind_address") - } else { - bind_address - }; - - let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| { - clap::Error::with_description( - "The --identity argument is required", - clap::ErrorKind::ArgumentNotFound, - ) - .exit(); - }); - - let peer_pubkey = value_t!(matches, "peer_pubkey", Pubkey).unwrap(); - - let entrypoint_addrs = values_t!(matches, "entrypoint", String) - .unwrap_or_default() - .into_iter() - .map(|entrypoint| { - solana_net_utils::parse_host_port(&entrypoint).unwrap_or_else(|e| { - eprintln!("failed to parse entrypoint address: {}", e); - exit(1); - }) - }) - .collect::>() - .into_iter() - .collect::>(); - - let expected_shred_version = value_t!(matches, "expected_shred_version", u16) - .ok() - .or_else(|| replica_util::get_cluster_shred_version(&entrypoint_addrs)); - - let gossip_host: IpAddr = matches - .value_of("gossip_host") - .map(|gossip_host| { - solana_net_utils::parse_host(gossip_host).unwrap_or_else(|err| { - eprintln!("Failed to parse --gossip-host: {}", err); - exit(1); - }) - }) - .unwrap_or_else(|| { - if !entrypoint_addrs.is_empty() { - let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect(); - order.shuffle(&mut thread_rng()); - - let gossip_host = order.into_iter().find_map(|i| { - let entrypoint_addr = &entrypoint_addrs[i]; - info!( - "Contacting {} to determine the validator's public IP address", - entrypoint_addr - ); - solana_net_utils::get_public_ip_addr(entrypoint_addr).map_or_else( - |err| { - eprintln!( - "Failed to contact cluster entrypoint {}: {}", - entrypoint_addr, err - ); - None - }, - Some, - ) - }); - - gossip_host.unwrap_or_else(|| { - eprintln!("Unable to determine the validator's public IP address"); - exit(1); - }) - } else { - std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)) - } - }); - - let gossip_addr = SocketAddr::new( - gossip_host, - value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range(bind_address, (0, 1)).unwrap_or_else( - |err| { - eprintln!("Unable to find an available gossip port: {}", err); - exit(1); - }, - ) - }), - ); - - let dynamic_port_range = - solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) - .expect("invalid dynamic_port_range"); - - let cluster_entrypoints = entrypoint_addrs - .iter() - .map(ContactInfo::new_gossip_entry_point) - .collect::>(); - - let node = Node::new_with_external_ip( - &identity_keypair.pubkey(), - &gossip_addr, - dynamic_port_range, - bind_address, - None, - ); - - let ledger_path = PathBuf::from(matches.value_of("ledger_path").unwrap()); - let snapshot_archives_dir = if let Some(snapshots) = matches.value_of("snapshots") { - PathBuf::from(snapshots) - } else { - ledger_path.clone() - }; - let bank_snapshots_dir = snapshot_archives_dir.join("snapshot"); - - let account_paths: Vec = - if let Ok(account_paths) = values_t!(matches, "account_paths", String) { - account_paths - .join(",") - .split(',') - .map(PathBuf::from) - .collect() - } else { - vec![ledger_path.join("accounts")] - }; - - let peer_address = solana_net_utils::parse_host(matches.value_of("peer_address").unwrap()) - .expect("invalid peer_address"); - - let peer_rpc_port = value_t!(matches, "peer_rpc_port", u16).unwrap_or_else(|_| { - clap::Error::with_description( - "The --peer-rpc-port argument is required", - clap::ErrorKind::ArgumentNotFound, - ) - .exit(); - }); - - let rpc_peer_addr = SocketAddr::new(peer_address, peer_rpc_port); - - let peer_accountsdb_repl_port = value_t!(matches, "peer_accountsdb_repl_port", u16) - .unwrap_or_else(|_| { - clap::Error::with_description( - "The --peer-accountsdb-repl-port argument is required", - clap::ErrorKind::ArgumentNotFound, - ) - .exit(); - }); - - let accountsdb_repl_peer_addr = SocketAddr::new(peer_address, peer_accountsdb_repl_port); - - let rpc_port = value_t!(matches, "rpc_port", u16).unwrap_or_else(|_| { - clap::Error::with_description( - "The --rpc-port argument is required", - clap::ErrorKind::ArgumentNotFound, - ) - .exit(); - }); - let rpc_addrs = ( - SocketAddr::new(rpc_bind_address, rpc_port), - SocketAddr::new(rpc_bind_address, rpc_port + 1), - // If additional ports are added, +2 needs to be skipped to avoid a conflict with - // the websocket port (which is +2) in web3.js This odd port shifting is tracked at - // https://github.com/solana-labs/solana/issues/12250 - ); - - let logfile = { - let logfile = matches - .value_of("logfile") - .map(|s| s.into()) - .unwrap_or_else(|| format!("solana-replica-node-{}.log", identity_keypair.pubkey())); - - if logfile == "-" { - None - } else { - println!("log file: {}", logfile); - Some(logfile) - } - }; - let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr")); - - let _logger_thread = solana_validator::redirect_stderr_to_file(logfile); - - let (cluster_info, rpc_contact_info, snapshot_info) = replica_util::get_rpc_peer_info( - identity_keypair, - &cluster_entrypoints, - &ledger_path, - &node, - expected_shred_version, - &peer_pubkey, - &snapshot_archives_dir, - socket_addr_space, - ); - - info!( - "Using RPC service from node {}: {:?}, snapshot_info: {:?}", - rpc_contact_info.id, rpc_contact_info.rpc, snapshot_info - ); - - let config = ReplicaNodeConfig { - rpc_peer_addr, - accountsdb_repl_peer_addr: Some(accountsdb_repl_peer_addr), - rpc_addr: rpc_addrs.0, - rpc_pubsub_addr: rpc_addrs.1, - ledger_path, - snapshot_archives_dir, - bank_snapshots_dir, - account_paths, - snapshot_info: snapshot_info.unwrap(), - cluster_info, - rpc_config: JsonRpcConfig::default(), - snapshot_config: None, - pubsub_config: PubSubConfig::default(), - socket_addr_space, - account_indexes: AccountSecondaryIndexes::default(), - accounts_db_caching_enabled: false, - replica_exit: Arc::new(RwLock::new(Exit::default())), - }; - - let replica = ReplicaNode::new(config); - replica.join(); -} diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs deleted file mode 100644 index 7f2a55f04..000000000 --- a/replica-node/src/replica_node.rs +++ /dev/null @@ -1,346 +0,0 @@ -use { - crate::accountsdb_repl_service::AccountsDbReplService, - crossbeam_channel::unbounded, - log::*, - solana_download_utils::download_snapshot_archive, - solana_genesis_utils::download_then_check_genesis_hash, - solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, - solana_ledger::{ - blockstore::Blockstore, blockstore_db::BlockstoreOptions, blockstore_processor, - leader_schedule_cache::LeaderScheduleCache, - }, - solana_replica_lib::accountsdb_repl_client::AccountsDbReplClientServiceConfig, - solana_rpc::{ - max_slots::MaxSlots, - optimistically_confirmed_bank_tracker::{ - OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, - }, - rpc::JsonRpcConfig, - rpc_pubsub_service::{PubSubConfig, PubSubService}, - rpc_service::JsonRpcService, - rpc_subscriptions::RpcSubscriptions, - }, - solana_runtime::{ - accounts_index::AccountSecondaryIndexes, bank_forks::BankForks, - commitment::BlockCommitmentCache, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, - snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils, - }, - solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash}, - solana_send_transaction_service::send_transaction_service, - solana_streamer::socket::SocketAddrSpace, - std::{ - fs, - net::SocketAddr, - path::PathBuf, - sync::{ - atomic::{AtomicBool, AtomicU64}, - Arc, RwLock, - }, - }, -}; - -pub struct ReplicaNodeConfig { - pub rpc_peer_addr: SocketAddr, - pub accountsdb_repl_peer_addr: Option, - pub rpc_addr: SocketAddr, - pub rpc_pubsub_addr: SocketAddr, - pub ledger_path: PathBuf, - pub snapshot_archives_dir: PathBuf, - pub bank_snapshots_dir: PathBuf, - pub account_paths: Vec, - pub snapshot_info: (Slot, Hash), - pub cluster_info: Arc, - pub rpc_config: JsonRpcConfig, - pub snapshot_config: Option, - pub pubsub_config: PubSubConfig, - pub account_indexes: AccountSecondaryIndexes, - pub accounts_db_caching_enabled: bool, - pub replica_exit: Arc>, - pub socket_addr_space: SocketAddrSpace, -} - -pub struct ReplicaNode { - json_rpc_service: Option, - pubsub_service: Option, - optimistically_confirmed_bank_tracker: Option, - accountsdb_repl_service: Option, -} - -// Struct maintaining information about banks -struct ReplicaBankInfo { - bank_forks: Arc>, - optimistically_confirmed_bank: Arc>, - leader_schedule_cache: Arc, - block_commitment_cache: Arc>, -} - -// Initialize the replica by downloading snapshot from the peer, initialize -// the BankForks, OptimisticallyConfirmedBank, LeaderScheduleCache and -// BlockCommitmentCache and return the info wrapped as ReplicaBankInfo. -fn initialize_from_snapshot( - replica_config: &ReplicaNodeConfig, - snapshot_config: &SnapshotConfig, - genesis_config: &GenesisConfig, -) -> ReplicaBankInfo { - info!( - "Downloading snapshot from the peer into {:?}", - replica_config.snapshot_archives_dir - ); - - download_snapshot_archive( - &replica_config.rpc_peer_addr, - &replica_config.snapshot_archives_dir, - replica_config.snapshot_info, - SnapshotType::FullSnapshot, - snapshot_config.maximum_full_snapshot_archives_to_retain, - snapshot_config.maximum_incremental_snapshot_archives_to_retain, - false, - &mut None, - ) - .unwrap(); - - fs::create_dir_all(&snapshot_config.bank_snapshots_dir) - .expect("Couldn't create bank snapshot directory"); - - let archive_info = snapshot_utils::get_highest_full_snapshot_archive_info( - &replica_config.snapshot_archives_dir, - ) - .unwrap(); - - let process_options = blockstore_processor::ProcessOptions { - account_indexes: replica_config.account_indexes.clone(), - accounts_db_caching_enabled: replica_config.accounts_db_caching_enabled, - ..blockstore_processor::ProcessOptions::default() - }; - - info!( - "Build bank from snapshot archive: {:?}", - &snapshot_config.bank_snapshots_dir - ); - let (bank0, _) = snapshot_utils::bank_from_snapshot_archives( - &replica_config.account_paths, - &snapshot_config.bank_snapshots_dir, - &archive_info, - None, - genesis_config, - process_options.debug_keys.clone(), - None, - process_options.account_indexes.clone(), - process_options.accounts_db_caching_enabled, - process_options.limit_load_slot_count_from_snapshot, - process_options.shrink_ratio, - process_options.accounts_db_test_hash_calculation, - false, - process_options.verify_index, - process_options.accounts_db_config, - None, - ) - .unwrap(); - - let bank0_slot = bank0.slot(); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); - - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); - - let optimistically_confirmed_bank = - OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - - let mut block_commitment_cache = BlockCommitmentCache::default(); - block_commitment_cache.initialize_slots(bank0_slot, bank0_slot); - let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache)); - - ReplicaBankInfo { - bank_forks, - optimistically_confirmed_bank, - leader_schedule_cache, - block_commitment_cache, - } -} - -fn start_client_rpc_services( - replica_config: &ReplicaNodeConfig, - genesis_config: &GenesisConfig, - cluster_info: Arc, - bank_info: &ReplicaBankInfo, - socket_addr_space: &SocketAddrSpace, -) -> ( - Option, - Option, - Option, -) { - let ReplicaBankInfo { - bank_forks, - optimistically_confirmed_bank, - leader_schedule_cache, - block_commitment_cache, - } = bank_info; - let blockstore = Arc::new( - Blockstore::open_with_options( - &replica_config.ledger_path, - BlockstoreOptions { - enforce_ulimit_nofile: false, - ..BlockstoreOptions::default() - }, - ) - .unwrap(), - ); - - let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0)); - - let max_slots = Arc::new(MaxSlots::default()); - let exit = Arc::new(AtomicBool::new(false)); - - let subscriptions = Arc::new(RpcSubscriptions::new( - &exit, - max_complete_transaction_status_slot.clone(), - blockstore.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - optimistically_confirmed_bank.clone(), - )); - - let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - if ContactInfo::is_valid_address(&replica_config.rpc_addr, socket_addr_space) { - assert!(ContactInfo::is_valid_address( - &replica_config.rpc_pubsub_addr, - socket_addr_space - )); - } else { - assert!(!ContactInfo::is_valid_address( - &replica_config.rpc_pubsub_addr, - socket_addr_space - )); - } - - let (trigger, pubsub_service) = PubSubService::new( - replica_config.pubsub_config.clone(), - &subscriptions, - replica_config.rpc_pubsub_addr, - ); - replica_config - .replica_exit - .write() - .unwrap() - .register_exit(Box::new(move || trigger.cancel())); - - let (_bank_notification_sender, bank_notification_receiver) = unbounded(); - ( - Some(JsonRpcService::new( - replica_config.rpc_addr, - replica_config.rpc_config.clone(), - replica_config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore, - cluster_info, - None, - genesis_config.hash(), - &replica_config.ledger_path, - replica_config.replica_exit.clone(), - None, - rpc_override_health_check, - optimistically_confirmed_bank.clone(), - send_transaction_service::Config { - retry_rate_ms: 0, - leader_forward_count: 0, - ..send_transaction_service::Config::default() - }, - max_slots, - leader_schedule_cache.clone(), - max_complete_transaction_status_slot, - )), - Some(pubsub_service), - Some(OptimisticallyConfirmedBankTracker::new( - bank_notification_receiver, - &exit, - bank_forks.clone(), - optimistically_confirmed_bank.clone(), - subscriptions, - None, - )), - ) -} - -impl ReplicaNode { - pub fn new(replica_config: ReplicaNodeConfig) -> Self { - let genesis_config = download_then_check_genesis_hash( - &replica_config.rpc_peer_addr, - &replica_config.ledger_path, - None, - MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, - false, - true, - ) - .unwrap(); - - let snapshot_config = SnapshotConfig { - full_snapshot_archive_interval_slots: Slot::MAX, - incremental_snapshot_archive_interval_slots: Slot::MAX, - snapshot_archives_dir: replica_config.snapshot_archives_dir.clone(), - bank_snapshots_dir: replica_config.bank_snapshots_dir.clone(), - ..SnapshotConfig::default() - }; - - let bank_info = - initialize_from_snapshot(&replica_config, &snapshot_config, &genesis_config); - - let (json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker) = - start_client_rpc_services( - &replica_config, - &genesis_config, - replica_config.cluster_info.clone(), - &bank_info, - &replica_config.socket_addr_space, - ); - - let accountsdb_repl_client_config = AccountsDbReplClientServiceConfig { - worker_threads: 1, - replica_server_addr: replica_config.accountsdb_repl_peer_addr.unwrap(), - }; - - let last_replicated_slot = bank_info.bank_forks.read().unwrap().root_bank().slot(); - info!( - "Starting AccountsDbReplService from slot {:?}", - last_replicated_slot - ); - let accountsdb_repl_service = Some( - AccountsDbReplService::new(last_replicated_slot, accountsdb_repl_client_config) - .expect("Failed to start AccountsDb replication service"), - ); - - info!( - "Started AccountsDbReplService from slot {:?}", - last_replicated_slot - ); - - ReplicaNode { - json_rpc_service, - pubsub_service, - optimistically_confirmed_bank_tracker, - accountsdb_repl_service, - } - } - - pub fn join(self) { - if let Some(json_rpc_service) = self.json_rpc_service { - json_rpc_service.join().expect("rpc_service"); - } - - if let Some(pubsub_service) = self.pubsub_service { - pubsub_service.join().expect("pubsub_service"); - } - - if let Some(optimistically_confirmed_bank_tracker) = - self.optimistically_confirmed_bank_tracker - { - optimistically_confirmed_bank_tracker - .join() - .expect("optimistically_confirmed_bank_tracker"); - } - if let Some(accountsdb_repl_service) = self.accountsdb_repl_service { - accountsdb_repl_service - .join() - .expect("accountsdb_repl_service"); - } - } -} diff --git a/replica-node/src/replica_util.rs b/replica-node/src/replica_util.rs deleted file mode 100644 index 5c5dc461c..000000000 --- a/replica-node/src/replica_util.rs +++ /dev/null @@ -1,276 +0,0 @@ -use { - log::*, - rand::{seq::SliceRandom, thread_rng, Rng}, - solana_gossip::{ - cluster_info::{ClusterInfo, Node}, - contact_info::ContactInfo, - gossip_service::GossipService, - }, - solana_runtime::{snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_utils}, - solana_sdk::{ - clock::Slot, - hash::Hash, - pubkey::Pubkey, - signature::{Keypair, Signer}, - }, - solana_streamer::socket::SocketAddrSpace, - std::{ - collections::HashSet, - net::{SocketAddr, UdpSocket}, - path::Path, - process::exit, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread::sleep, - time::{Duration, Instant}, - }, -}; - -pub fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option { - let entrypoints = { - let mut index: Vec<_> = (0..entrypoints.len()).collect(); - index.shuffle(&mut rand::thread_rng()); - index.into_iter().map(|i| &entrypoints[i]) - }; - for entrypoint in entrypoints { - match solana_net_utils::get_cluster_shred_version(entrypoint) { - Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err), - Ok(0) => eprintln!("zero sherd-version from entrypoint: {}", entrypoint), - Ok(shred_version) => { - info!( - "obtained shred-version {} from {}", - shred_version, entrypoint - ); - return Some(shred_version); - } - } - } - None -} - -// Discover the RPC peer node via Gossip and return's ContactInfo -// And the initial snapshot info: (Slot, Hash) -// Alternatively, this can be solved via a RPC call instead of using gossip. -fn get_rpc_peer_node( - cluster_info: &ClusterInfo, - cluster_entrypoints: &[ContactInfo], - expected_shred_version: Option, - peer_pubkey: &Pubkey, - snapshot_archives_dir: &Path, -) -> Option<(ContactInfo, Option<(Slot, Hash)>)> { - let mut newer_cluster_snapshot_timeout = None; - let mut retry_reason = None; - loop { - sleep(Duration::from_secs(1)); - info!("Searching for the rpc peer node and latest snapshot information with shred_version {:?}.", expected_shred_version); - info!("\n{}", cluster_info.rpc_info_trace()); - - let shred_version = - expected_shred_version.unwrap_or_else(|| cluster_info.my_shred_version()); - if shred_version == 0 { - let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| { - cluster_info - .lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip) - .map_or(false, |entrypoint| entrypoint.shred_version == 0) - }); - - if all_zero_shred_versions { - eprintln!( - "Entrypoint shred version is zero. Restart with --expected-shred-version" - ); - exit(1); - } - info!("Waiting to adopt entrypoint shred version..."); - continue; - } - - info!( - "Searching for an RPC service with shred version {}{}...", - shred_version, - retry_reason - .as_ref() - .map(|s| format!(" (Retrying: {})", s)) - .unwrap_or_default() - ); - - let rpc_peers = cluster_info - .all_rpc_peers() - .into_iter() - .filter(|contact_info| contact_info.shred_version == shred_version) - .collect::>(); - let rpc_peers_total = rpc_peers.len(); - - let rpc_known_peers = rpc_peers - .iter() - .filter(|rpc_peer| &rpc_peer.id == peer_pubkey) - .count(); - - info!( - "Total {} RPC nodes found. {} known", - rpc_peers_total, rpc_known_peers - ); - - let mut highest_snapshot_info: Option<(Slot, Hash)> = - snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_archives_dir).map( - |snapshot_archive_info| { - (snapshot_archive_info.slot(), *snapshot_archive_info.hash()) - }, - ); - let eligible_rpc_peers = { - let mut eligible_rpc_peers = vec![]; - - for rpc_peer in rpc_peers.iter() { - if &rpc_peer.id != peer_pubkey { - continue; - } - cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { - for snapshot_hash in snapshot_hashes { - if highest_snapshot_info.is_none() - || snapshot_hash.0 > highest_snapshot_info.unwrap().0 - { - // Found a higher snapshot, remove all nodes with a lower snapshot - eligible_rpc_peers.clear(); - highest_snapshot_info = Some(*snapshot_hash) - } - - if Some(*snapshot_hash) == highest_snapshot_info { - eligible_rpc_peers.push(rpc_peer.clone()); - } - } - }); - } - - match highest_snapshot_info { - None => { - assert!(eligible_rpc_peers.is_empty()); - } - Some(highest_snapshot_info) => { - if eligible_rpc_peers.is_empty() { - match newer_cluster_snapshot_timeout { - None => newer_cluster_snapshot_timeout = Some(Instant::now()), - Some(newer_cluster_snapshot_timeout) => { - if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 { - warn!("giving up newer snapshot from the cluster"); - return None; - } - } - } - retry_reason = Some(format!( - "Wait for newer snapshot than local: {:?}", - highest_snapshot_info - )); - continue; - } - - info!( - "Highest available snapshot slot is {}, available from {} node{}: {:?}", - highest_snapshot_info.0, - eligible_rpc_peers.len(), - if eligible_rpc_peers.len() > 1 { - "s" - } else { - "" - }, - eligible_rpc_peers - .iter() - .map(|contact_info| contact_info.id) - .collect::>() - ); - } - } - eligible_rpc_peers - }; - - if !eligible_rpc_peers.is_empty() { - let contact_info = - &eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]; - return Some((contact_info.clone(), highest_snapshot_info)); - } else { - retry_reason = Some("No snapshots available".to_owned()); - } - } -} - -fn start_gossip_node( - identity_keypair: Arc, - cluster_entrypoints: &[ContactInfo], - ledger_path: &Path, - gossip_addr: &SocketAddr, - gossip_socket: UdpSocket, - expected_shred_version: Option, - gossip_validators: Option>, - should_check_duplicate_instance: bool, - socket_addr_space: SocketAddrSpace, -) -> (Arc, Arc, GossipService) { - let contact_info = ClusterInfo::gossip_contact_info( - identity_keypair.pubkey(), - *gossip_addr, - expected_shred_version.unwrap_or(0), - ); - let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space); - cluster_info.set_entrypoints(cluster_entrypoints.to_vec()); - cluster_info.restore_contact_info(ledger_path, 0); - let cluster_info = Arc::new(cluster_info); - - let gossip_exit_flag = Arc::new(AtomicBool::new(false)); - let gossip_service = GossipService::new( - &cluster_info, - None, - gossip_socket, - gossip_validators, - should_check_duplicate_instance, - None, - &gossip_exit_flag, - ); - info!("Started gossip node"); - info!( - "The cluster contact info:\n{}", - cluster_info.contact_info_trace() - ); - - (cluster_info, gossip_exit_flag, gossip_service) -} - -// Get the RPC peer info given the peer's Pubkey -// Returns the ClusterInfo, the peer's ContactInfo and the initial snapshot info -pub fn get_rpc_peer_info( - identity_keypair: Keypair, - cluster_entrypoints: &[ContactInfo], - ledger_path: &Path, - node: &Node, - expected_shred_version: Option, - peer_pubkey: &Pubkey, - snapshot_archives_dir: &Path, - socket_addr_space: SocketAddrSpace, -) -> (Arc, ContactInfo, Option<(Slot, Hash)>) { - let identity_keypair = Arc::new(identity_keypair); - - let gossip = start_gossip_node( - identity_keypair, - cluster_entrypoints, - ledger_path, - &node.info.gossip, - node.sockets.gossip.try_clone().unwrap(), - expected_shred_version, - None, - true, - socket_addr_space, - ); - - let rpc_node_details = get_rpc_peer_node( - &gossip.0, - cluster_entrypoints, - expected_shred_version, - peer_pubkey, - snapshot_archives_dir, - ); - let rpc_node_details = rpc_node_details.unwrap(); - - // We no longer need the gossip node, stop it: - let gossip_exit_flag = gossip.1; - gossip_exit_flag.store(true, Ordering::Relaxed); - - (gossip.0, rpc_node_details.0, rpc_node_details.1) -} diff --git a/replica-node/tests/local_replica.rs b/replica-node/tests/local_replica.rs deleted file mode 100644 index 372de06c9..000000000 --- a/replica-node/tests/local_replica.rs +++ /dev/null @@ -1,299 +0,0 @@ -#![allow(clippy::integer_arithmetic)] -use { - log::*, - serial_test::serial, - solana_core::validator::ValidatorConfig, - solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, - solana_local_cluster::{ - cluster::Cluster, - local_cluster::{ClusterConfig, LocalCluster}, - validator_configs::*, - }, - solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig, - solana_replica_node::{ - replica_node::{ReplicaNode, ReplicaNodeConfig}, - replica_util, - }, - solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, - solana_runtime::{ - accounts_index::AccountSecondaryIndexes, snapshot_archive_info::SnapshotArchiveInfoGetter, - snapshot_config::SnapshotConfig, snapshot_utils, - }, - solana_sdk::{ - client::SyncClient, - clock::Slot, - commitment_config::CommitmentConfig, - epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, - exit::Exit, - hash::Hash, - signature::{Keypair, Signer}, - }, - solana_streamer::socket::SocketAddrSpace, - std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::{Path, PathBuf}, - sync::{Arc, RwLock}, - thread::sleep, - time::Duration, - }, - tempfile::TempDir, -}; - -const RUST_LOG_FILTER: &str = - "error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info"; - -fn wait_for_next_snapshot( - cluster: &LocalCluster, - snapshot_archives_dir: &Path, -) -> (PathBuf, (Slot, Hash)) { - // Get slot after which this was generated - let client = cluster - .get_validator_client(&cluster.entry_point_info.id) - .unwrap(); - let last_slot = client - .get_slot_with_commitment(CommitmentConfig::processed()) - .expect("Couldn't get slot"); - - // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot - // must include the transactions just pushed - trace!( - "Waiting for snapshot archive to be generated with slot > {}", - last_slot - ); - loop { - if let Some(full_snapshot_archive_info) = - snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_archives_dir) - { - trace!( - "full snapshot for slot {} exists", - full_snapshot_archive_info.slot() - ); - if full_snapshot_archive_info.slot() >= last_slot { - return ( - full_snapshot_archive_info.path().clone(), - ( - full_snapshot_archive_info.slot(), - *full_snapshot_archive_info.hash(), - ), - ); - } - trace!( - "full snapshot slot {} < last_slot {}", - full_snapshot_archive_info.slot(), - last_slot - ); - } - sleep(Duration::from_millis(1000)); - } -} - -fn farf_dir() -> PathBuf { - std::env::var("FARF_DIR") - .unwrap_or_else(|_| "farf".to_string()) - .into() -} - -fn generate_account_paths(num_account_paths: usize) -> (Vec, Vec) { - let account_storage_dirs: Vec = (0..num_account_paths) - .map(|_| tempfile::tempdir_in(farf_dir()).unwrap()) - .collect(); - let account_storage_paths: Vec<_> = account_storage_dirs - .iter() - .map(|a| a.path().to_path_buf()) - .collect(); - (account_storage_dirs, account_storage_paths) -} - -struct SnapshotValidatorConfig { - _snapshot_dir: TempDir, - snapshot_archives_dir: TempDir, - account_storage_dirs: Vec, - validator_config: ValidatorConfig, -} - -fn setup_snapshot_validator_config( - snapshot_interval_slots: u64, - num_account_paths: usize, -) -> SnapshotValidatorConfig { - // Create the snapshot config - let bank_snapshots_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let snapshot_config = SnapshotConfig { - full_snapshot_archive_interval_slots: snapshot_interval_slots, - incremental_snapshot_archive_interval_slots: Slot::MAX, - snapshot_archives_dir: snapshot_archives_dir.path().to_path_buf(), - bank_snapshots_dir: bank_snapshots_dir.path().to_path_buf(), - ..SnapshotConfig::default() - }; - - // Create the account paths - let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); - - let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - let accountsdb_repl_port = - solana_net_utils::find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); - let replica_server_addr = SocketAddr::new(bind_ip_addr, accountsdb_repl_port); - - let accountsdb_repl_service_config = Some(AccountsDbReplServiceConfig { - worker_threads: 1, - replica_server_addr, - }); - - // Create the validator config - let validator_config = ValidatorConfig { - snapshot_config: Some(snapshot_config), - account_paths: account_storage_paths, - accounts_hash_interval_slots: snapshot_interval_slots, - accountsdb_repl_service_config, - ..ValidatorConfig::default_for_test() - }; - - SnapshotValidatorConfig { - _snapshot_dir: bank_snapshots_dir, - snapshot_archives_dir, - account_storage_dirs, - validator_config, - } -} - -fn test_local_cluster_start_and_exit_with_config(socket_addr_space: SocketAddrSpace) { - solana_logger::setup(); - const NUM_NODES: usize = 1; - let mut config = ClusterConfig { - validator_configs: make_identical_validator_configs( - &ValidatorConfig::default_for_test(), - NUM_NODES, - ), - node_stakes: vec![3; NUM_NODES], - cluster_lamports: 100, - ticks_per_slot: 8, - slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64, - stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64, - ..ClusterConfig::default() - }; - let cluster = LocalCluster::new(&mut config, socket_addr_space); - assert_eq!(cluster.validators.len(), NUM_NODES); -} - -#[test] -#[serial] -fn test_replica_bootstrap() { - let socket_addr_space = SocketAddrSpace::new(true); - - test_local_cluster_start_and_exit_with_config(socket_addr_space); - - solana_logger::setup_with_default(RUST_LOG_FILTER); - // First set up the cluster with 1 node - let snapshot_interval_slots = 50; - let num_account_paths = 3; - - let leader_snapshot_test_config = - setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); - - info!( - "Snapshot config for the leader: accounts: {:?}, snapshot: {:?}", - leader_snapshot_test_config.account_storage_dirs, - leader_snapshot_test_config.snapshot_archives_dir - ); - let stake = 10_000; - let mut config = ClusterConfig { - node_stakes: vec![stake], - cluster_lamports: 1_000_000, - validator_configs: make_identical_validator_configs( - &leader_snapshot_test_config.validator_config, - 1, - ), - ..ClusterConfig::default() - }; - - let cluster = LocalCluster::new(&mut config, socket_addr_space); - - assert_eq!(cluster.validators.len(), 1); - let contact_info = &cluster.entry_point_info; - - info!("Contact info: {:?}", contact_info); - - // Get slot after which this was generated - let snapshot_archives_dir = &leader_snapshot_test_config - .validator_config - .snapshot_config - .as_ref() - .unwrap() - .snapshot_archives_dir; - info!("Waiting for snapshot"); - let (archive_filename, archive_snapshot_hash) = - wait_for_next_snapshot(&cluster, snapshot_archives_dir); - info!("found: {:?}", archive_filename); - - let identity_keypair = Keypair::new(); - - // now bring up a replica to talk to it. - let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - let port = solana_net_utils::find_available_port_in_range(ip_addr, (8101, 8200)).unwrap(); - let rpc_addr = SocketAddr::new(ip_addr, port); - - let port = solana_net_utils::find_available_port_in_range(ip_addr, (8201, 8300)).unwrap(); - - let rpc_pubsub_addr = SocketAddr::new(ip_addr, port); - let ledger_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let ledger_path = ledger_dir.path(); - let snapshot_output_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let snapshot_archives_dir = snapshot_output_dir.path(); - let bank_snapshots_dir = snapshot_archives_dir.join("snapshot"); - let account_paths: Vec = vec![ledger_path.join("accounts")]; - - let port = solana_net_utils::find_available_port_in_range(ip_addr, (8301, 8400)).unwrap(); - let gossip_addr = SocketAddr::new(ip_addr, port); - - let dynamic_port_range = solana_net_utils::parse_port_range("8401-8500").unwrap(); - let bind_address = solana_net_utils::parse_host("127.0.0.1").unwrap(); - let node = Node::new_with_external_ip( - &identity_keypair.pubkey(), - &gossip_addr, - dynamic_port_range, - bind_address, - None, - ); - - info!("The peer id: {:?}", &contact_info.id); - let entry_points = vec![ContactInfo::new_gossip_entry_point(&contact_info.gossip)]; - let (cluster_info, _rpc_contact_info, _snapshot_info) = replica_util::get_rpc_peer_info( - identity_keypair, - &entry_points, - ledger_path, - &node, - None, - &contact_info.id, - snapshot_archives_dir, - socket_addr_space, - ); - - info!("The cluster info:\n{:?}", cluster_info.contact_info_trace()); - - let config = ReplicaNodeConfig { - rpc_peer_addr: contact_info.rpc, - accountsdb_repl_peer_addr: Some( - leader_snapshot_test_config - .validator_config - .accountsdb_repl_service_config - .unwrap() - .replica_server_addr, - ), - rpc_addr, - rpc_pubsub_addr, - ledger_path: ledger_path.to_path_buf(), - snapshot_archives_dir: snapshot_archives_dir.to_path_buf(), - bank_snapshots_dir, - account_paths, - snapshot_info: archive_snapshot_hash, - cluster_info, - rpc_config: JsonRpcConfig::default(), - snapshot_config: None, - pubsub_config: PubSubConfig::default(), - socket_addr_space, - account_indexes: AccountSecondaryIndexes::default(), - accounts_db_caching_enabled: false, - replica_exit: Arc::new(RwLock::new(Exit::default())), - }; - let _replica_node = ReplicaNode::new(config); -} diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 1575ae243..5239eaa5a 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2644,7 +2644,7 @@ pub mod rpc_minimal { } // RPC interface that only depends on immediate Bank data -// Expected to be provided by both API nodes and (future) accounts replica nodes +// Expected to be provided by API nodes pub mod rpc_bank { use super::*; #[rpc] diff --git a/validator/Cargo.toml b/validator/Cargo.toml index c3e989138..8762ff04c 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -43,7 +43,6 @@ solana-metrics = { path = "../metrics", version = "=1.11.0" } solana-net-utils = { path = "../net-utils", version = "=1.11.0" } solana-perf = { path = "../perf", version = "=1.11.0" } solana-poh = { path = "../poh", version = "=1.11.0" } -solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" } solana-rpc = { path = "../rpc", version = "=1.11.0" } solana-runtime = { path = "../runtime", version = "=1.11.0" } solana-sdk = { path = "../sdk", version = "=1.11.0" } diff --git a/validator/src/main.rs b/validator/src/main.rs index f32b75385..4a2bedc1e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -36,7 +36,6 @@ use { solana_net_utils::VALIDATOR_PORT_RANGE, solana_perf::recycler::enable_recycler_warming, solana_poh::poh_service, - solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig, solana_rpc::{ rpc::{JsonRpcConfig, RpcBigtableConfig}, rpc_pubsub_service::PubSubConfig, @@ -2268,26 +2267,6 @@ pub fn main() { } let accounts_db_config = Some(accounts_db_config); - let accountsdb_repl_service_config = if matches.is_present("enable_accountsdb_repl") { - let accountsdb_repl_bind_address = if matches.is_present("accountsdb_repl_bind_address") { - solana_net_utils::parse_host(matches.value_of("accountsdb_repl_bind_address").unwrap()) - .expect("invalid accountsdb_repl_bind_address") - } else { - bind_address - }; - let accountsdb_repl_port = value_t_or_exit!(matches, "accountsdb_repl_port", u16); - - Some(AccountsDbReplServiceConfig { - worker_threads: value_t_or_exit!(matches, "accountsdb_repl_threads", usize), - replica_server_addr: SocketAddr::new( - accountsdb_repl_bind_address, - accountsdb_repl_port, - ), - }) - } else { - None - }; - let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") { Some( values_t_or_exit!(matches, "geyser_plugin_config", String) @@ -2368,7 +2347,6 @@ pub fn main() { account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, - accountsdb_repl_service_config, geyser_plugin_config_files, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { (