Remove replica-node crates (#24152)

Tyera Eulberg 2022-04-06 18:52:19 -04:00 committed by GitHub
parent 7ee1edddd1
commit fb67ff14de
25 changed files with 2 additions and 2184 deletions

Cargo.lock (generated)

@@ -4745,7 +4745,6 @@ dependencies = [
"solana-poh",
"solana-program-runtime",
"solana-rayon-threadlimit",
"solana-replica-lib",
"solana-rpc",
"solana-runtime",
"solana-sdk",
@@ -5530,52 +5529,6 @@ dependencies = [
"uriparse",
]
[[package]]
name = "solana-replica-lib"
version = "1.11.0"
dependencies = [
"crossbeam-channel",
"futures-util",
"log",
"prost",
"solana-rpc",
"solana-runtime",
"solana-sdk",
"tokio",
"tonic",
"tonic-build",
]
[[package]]
name = "solana-replica-node"
version = "1.11.0"
dependencies = [
"clap 2.33.3",
"crossbeam-channel",
"log",
"rand 0.7.3",
"serial_test",
"solana-clap-utils",
"solana-core",
"solana-download-utils",
"solana-genesis-utils",
"solana-gossip",
"solana-ledger",
"solana-local-cluster",
"solana-logger 1.11.0",
"solana-net-utils",
"solana-replica-lib",
"solana-rpc",
"solana-runtime",
"solana-sdk",
"solana-send-transaction-service",
"solana-streamer",
"solana-validator",
"solana-version",
"tempfile",
"tonic-build",
]
[[package]]
name = "solana-rpc"
version = "1.11.0"
@@ -6082,7 +6035,6 @@ dependencies = [
"solana-net-utils",
"solana-perf",
"solana-poh",
"solana-replica-lib",
"solana-rpc",
"solana-runtime",
"solana-sdk",

Cargo.toml

@@ -59,8 +59,6 @@ members = [
"rayon-threadlimit",
"rbpf-cli",
"remote-wallet",
"replica-lib",
"replica-node",
"rpc",
"rpc-test",
"runtime",

core/Cargo.toml

@@ -49,7 +49,6 @@ solana-perf = { path = "../perf", version = "=1.11.0" }
solana-poh = { path = "../poh", version = "=1.11.0" }
solana-program-runtime = { path = "../program-runtime", version = "=1.11.0" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.11.0" }
solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" }
solana-rpc = { path = "../rpc", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }

core/src/validator.rs

@@ -50,10 +50,6 @@ use {
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{self, PohService},
},
solana_replica_lib::{
accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig},
accountsdb_repl_server_factory,
},
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
@@ -122,7 +118,6 @@ pub struct ValidatorConfig {
pub account_paths: Vec<PathBuf>,
pub account_shrink_paths: Option<Vec<PathBuf>>,
pub rpc_config: JsonRpcConfig,
pub accountsdb_repl_service_config: Option<AccountsDbReplServiceConfig>,
pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig,
@@ -184,7 +179,6 @@ impl Default for ValidatorConfig {
account_paths: Vec::new(),
account_shrink_paths: None,
rpc_config: JsonRpcConfig::default(),
accountsdb_repl_service_config: None,
geyser_plugin_config_files: None,
rpc_addrs: None,
pubsub_config: PubSubConfig::default(),
@@ -339,7 +333,6 @@ pub struct Validator {
pub cluster_info: Arc<ClusterInfo>,
pub bank_forks: Arc<RwLock<BankForks>>,
pub blockstore: Arc<Blockstore>,
accountsdb_repl_service: Option<AccountsDbReplService>,
geyser_plugin_service: Option<GeyserPluginService>,
}
@@ -673,7 +666,6 @@ impl Validator {
pubsub_service,
optimistically_confirmed_bank_tracker,
bank_notification_sender,
accountsdb_repl_service,
) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
if ContactInfo::is_valid_address(&node.info.rpc, &socket_addr_space) {
assert!(ContactInfo::is_valid_address(
@@ -687,13 +679,6 @@
));
}
let accountsdb_repl_service = config.accountsdb_repl_service_config.as_ref().map(|accountsdb_repl_service_config| {
let (bank_notification_sender, bank_notification_receiver) = unbounded();
bank_notification_senders.push(bank_notification_sender);
accountsdb_repl_server_factory::AccountsDbReplServerFactory::build_accountsdb_repl_server(
accountsdb_repl_service_config.clone(), bank_notification_receiver, bank_forks.clone())
});
let (bank_notification_sender, bank_notification_receiver) = unbounded();
let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() {
Some(Arc::new(RwLock::new(bank_notification_senders)))
@@ -746,10 +731,9 @@
confirmed_bank_subscribers,
)),
Some(bank_notification_sender),
accountsdb_repl_service,
)
} else {
(None, None, None, None, None)
(None, None, None, None)
};
if config.dev_halt_at_slot.is_some() {
@@ -999,7 +983,6 @@ impl Validator {
cluster_info,
bank_forks,
blockstore: blockstore.clone(),
accountsdb_repl_service,
geyser_plugin_service,
}
}
@@ -1119,12 +1102,6 @@ impl Validator {
ip_echo_server.shutdown_background();
}
if let Some(accountsdb_repl_service) = self.accountsdb_repl_service {
accountsdb_repl_service
.join()
.expect("accountsdb_repl_service");
}
if let Some(geyser_plugin_service) = self.geyser_plugin_service {
geyser_plugin_service.join().expect("geyser_plugin_service");
}

local-cluster/src/validator_configs.rs

@@ -14,7 +14,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
account_paths: config.account_paths.clone(),
account_shrink_paths: config.account_shrink_paths.clone(),
rpc_config: config.rpc_config.clone(),
accountsdb_repl_service_config: config.accountsdb_repl_service_config.clone(),
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
rpc_addrs: config.rpc_addrs,
pubsub_config: config.pubsub_config.clone(),

replica-lib/Cargo.toml

@@ -1,27 +0,0 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
edition = "2021"
name = "solana-replica-lib"
description = "The library used for replication by both the client and server"
version = "1.11.0"
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-validator"
[dependencies]
crossbeam-channel = "0.5"
futures-util = "0.3"
log = "0.4.11"
prost = "0.9.0"
solana-rpc = { path = "../rpc", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.6.2", features = ["tls", "transport"] }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[build-dependencies]
tonic-build = "0.6.2"

replica-lib/build.rs

@@ -1,5 +0,0 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Compile the protobuf definitions under proto/ at build time
tonic_build::configure().compile(&["proto/accountsdb_repl.proto"], &["proto"])?;
Ok(())
}

replica-lib/proto/accountsdb_repl.proto

@@ -1,44 +0,0 @@
// Version of the protocol buffer syntax used
syntax = "proto3";
package accountsdb_repl;
message ReplicaSlotConfirmationRequest {
uint64 last_replicated_slot = 1;
}
message ReplicaSlotConfirmationResponse {
repeated uint64 updated_slots = 1;
}
message ReplicaAccountsRequest {
uint64 slot = 1;
}
message ReplicaAccountMeta {
bytes Pubkey = 1;
uint64 lamports = 2;
bytes owner = 3;
bool executable = 4;
uint64 rent_epoch = 5;
}
message ReplicaAccountData {
bytes data = 1;
}
message ReplicaAccountInfo {
ReplicaAccountMeta account_meta = 1;
bytes hash = 2;
ReplicaAccountData data = 3;
}
message ReplicaAccountsResponse {
repeated ReplicaAccountInfo accounts = 1;
}
service AccountsDbRepl {
rpc get_confirmed_slots(ReplicaSlotConfirmationRequest) returns (ReplicaSlotConfirmationResponse);
rpc get_slot_accounts(ReplicaAccountsRequest) returns (ReplicaAccountsResponse);
}
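
The service above defines a pull-based replication protocol: a replica first asks which slots were confirmed after the last slot it replicated, then fetches the accounts for each returned slot. Below is a minimal sketch of that round trip using the tonic-generated bindings for this proto; the endpoint address and the wrapper function are illustrative assumptions, not part of this commit.

use tonic::Request;

tonic::include_proto!("accountsdb_repl");

async fn replicate_once() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical address; in practice this is the peer's AccountsDb replication port.
    let mut client =
        accounts_db_repl_client::AccountsDbReplClient::connect("http://127.0.0.1:10800").await?;
    // Ask for slots confirmed after our last replicated slot (0 = from the start).
    let slots = client
        .get_confirmed_slots(Request::new(ReplicaSlotConfirmationRequest {
            last_replicated_slot: 0,
        }))
        .await?
        .into_inner()
        .updated_slots;
    // Pull the account set for each newly confirmed slot.
    for slot in slots {
        let accounts = client
            .get_slot_accounts(Request::new(ReplicaAccountsRequest { slot }))
            .await?
            .into_inner()
            .accounts;
        println!("slot {}: {} accounts", slot, accounts.len());
    }
    Ok(())
}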

replica-lib/src/accountsdb_repl_client.rs

@@ -1,119 +0,0 @@
use {
log::*,
solana_sdk::clock::Slot,
std::{net::SocketAddr, sync::Arc},
tokio::runtime::Runtime,
tonic::{self, transport::Endpoint, Request},
};
tonic::include_proto!("accountsdb_repl");
pub struct AccountsDbReplClient {
client: accounts_db_repl_client::AccountsDbReplClient<tonic::transport::Channel>,
}
#[derive(Debug)]
pub enum ReplicaRpcError {
InvalidUrl(String),
ConnectionError(String),
GetSlotsError(String),
GetAccountsError(String),
}
impl From<tonic::transport::Error> for ReplicaRpcError {
fn from(err: tonic::transport::Error) -> Self {
ReplicaRpcError::ConnectionError(err.to_string())
}
}
impl AccountsDbReplClient {
pub async fn connect(rpc_peer: &SocketAddr) -> Result<Self, ReplicaRpcError> {
let url = format!("http://{}", rpc_peer);
let endpoint = match Endpoint::from_shared(url.to_string()) {
Ok(endpoint) => endpoint,
Err(e) => {
return Err(ReplicaRpcError::InvalidUrl(e.to_string()));
}
};
let client = accounts_db_repl_client::AccountsDbReplClient::connect(endpoint).await?;
info!(
"Successfully connected to the AccountsDb Replication server: {:?}",
url
);
Ok(AccountsDbReplClient { client })
}
pub async fn get_confirmed_slots(
&mut self,
last_slot: Slot,
) -> Result<Vec<Slot>, ReplicaRpcError> {
let request = ReplicaSlotConfirmationRequest {
last_replicated_slot: last_slot,
};
let response = self.client.get_confirmed_slots(Request::new(request)).await;
match response {
Ok(response) => Ok(response.into_inner().updated_slots),
Err(status) => Err(ReplicaRpcError::GetSlotsError(status.to_string())),
}
}
pub async fn get_slot_accounts(
&mut self,
slot: Slot,
) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
let request = ReplicaAccountsRequest { slot };
let response = self.client.get_slot_accounts(Request::new(request)).await;
match response {
Ok(response) => Ok(response.into_inner().accounts),
Err(status) => Err(ReplicaRpcError::GetAccountsError(status.to_string())),
}
}
}
#[derive(Clone)]
pub struct AccountsDbReplClientServiceConfig {
pub worker_threads: usize,
pub replica_server_addr: SocketAddr,
}
/// The service wrapper over AccountsDbReplClient to make it run in the tokio runtime
pub struct AccountsDbReplClientService {
runtime: Arc<Runtime>,
accountsdb_repl_client: AccountsDbReplClient,
}
impl AccountsDbReplClientService {
pub fn new(config: AccountsDbReplClientServiceConfig) -> Result<Self, ReplicaRpcError> {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.worker_threads)
.thread_name("sol-accountsdb-repl-wrk")
.enable_all()
.build()
.expect("Runtime"),
);
let accountsdb_repl_client =
runtime.block_on(AccountsDbReplClient::connect(&config.replica_server_addr))?;
Ok(Self {
runtime,
accountsdb_repl_client,
})
}
pub fn get_confirmed_slots(&mut self, last_slot: Slot) -> Result<Vec<Slot>, ReplicaRpcError> {
self.runtime
.block_on(self.accountsdb_repl_client.get_confirmed_slots(last_slot))
}
pub fn get_slot_accounts(
&mut self,
slot: Slot,
) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
self.runtime
.block_on(self.accountsdb_repl_client.get_slot_accounts(slot))
}
}
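
Because AccountsDbReplClientService owns its own tokio runtime and exposes blocking wrappers, a plain thread can drive replication without any async plumbing. A condensed usage sketch, assuming a reachable server address (the address and the poll_once helper are illustrative; replica-node's accountsdb_repl_service below drives the client the same way in a loop):

use std::net::SocketAddr;

fn poll_once(replica_server_addr: SocketAddr) -> Result<(), ReplicaRpcError> {
    let config = AccountsDbReplClientServiceConfig {
        worker_threads: 1,
        replica_server_addr,
    };
    // Connects eagerly; fails fast if the server is unreachable.
    let mut service = AccountsDbReplClientService::new(config)?;
    // Blocking wrappers over the async RPCs above.
    for slot in service.get_confirmed_slots(0)? {
        let accounts = service.get_slot_accounts(slot)?;
        println!("slot {}: {} accounts", slot, accounts.len());
    }
    Ok(())
}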

replica-lib/src/accountsdb_repl_server.rs

@@ -1,176 +0,0 @@
use {
futures_util::FutureExt,
log::*,
std::{
net::SocketAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
tokio::{
runtime::Runtime,
sync::oneshot::{self, Receiver, Sender},
},
tonic::{self, transport},
};
tonic::include_proto!("accountsdb_repl");
pub trait ReplicaSlotConfirmationServer {
fn get_confirmed_slots(
&self,
request: &ReplicaSlotConfirmationRequest,
) -> Result<ReplicaSlotConfirmationResponse, tonic::Status>;
fn join(&mut self) -> thread::Result<()>;
}
pub trait ReplicaAccountsServer {
fn get_slot_accounts(
&self,
request: &ReplicaAccountsRequest,
) -> Result<ReplicaAccountsResponse, tonic::Status>;
fn join(&mut self) -> thread::Result<()>;
}
#[derive(Clone)]
struct AccountsDbReplServer {
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
}
/// Implementing the AccountsDbRepl interface declared by the protocol
#[tonic::async_trait]
impl accounts_db_repl_server::AccountsDbRepl for AccountsDbReplServer {
async fn get_confirmed_slots(
&self,
request: tonic::Request<ReplicaSlotConfirmationRequest>,
) -> Result<tonic::Response<ReplicaSlotConfirmationResponse>, tonic::Status> {
let server = self.confirmed_slots_server.read().unwrap();
let result = server.get_confirmed_slots(&request.into_inner());
result.map(tonic::Response::new)
}
async fn get_slot_accounts(
&self,
request: tonic::Request<ReplicaAccountsRequest>,
) -> Result<tonic::Response<ReplicaAccountsResponse>, tonic::Status> {
let server = self.accounts_server.read().unwrap();
let result = server.get_slot_accounts(&request.into_inner());
result.map(tonic::Response::new)
}
}
impl AccountsDbReplServer {
pub fn new(
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
) -> Self {
Self {
confirmed_slots_server,
accounts_server,
}
}
pub fn join(self) -> thread::Result<()> {
self.confirmed_slots_server.write().unwrap().join()?;
self.accounts_server.write().unwrap().join()
}
}
#[derive(Clone)]
pub struct AccountsDbReplServiceConfig {
pub worker_threads: usize,
pub replica_server_addr: SocketAddr,
}
/// The service wraps the AccountsDbReplServer to make it runnable in the tokio runtime
/// and handles start and stop of the service.
pub struct AccountsDbReplService {
accountsdb_repl_server: AccountsDbReplServer,
thread: JoinHandle<()>,
exit_signal_sender: Sender<()>,
}
impl AccountsDbReplService {
pub fn new(
config: AccountsDbReplServiceConfig,
confirmed_slots_server: Arc<RwLock<dyn ReplicaSlotConfirmationServer + Sync + Send>>,
accounts_server: Arc<RwLock<dyn ReplicaAccountsServer + Sync + Send>>,
) -> Self {
let accountsdb_repl_server =
AccountsDbReplServer::new(confirmed_slots_server, accounts_server);
let worker_threads = config.worker_threads;
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.thread_name("sol-accountsdb-repl-wrk")
.enable_all()
.build()
.expect("Runtime"),
);
let server_cloned = accountsdb_repl_server.clone();
let (exit_signal_sender, exit_signal_receiver) = oneshot::channel::<()>();
let thread = Builder::new()
.name("sol-accountsdb-repl-rt".to_string())
.spawn(move || {
Self::run_accountsdb_repl_server_in_runtime(
config,
runtime,
server_cloned,
exit_signal_receiver,
);
})
.unwrap();
Self {
accountsdb_repl_server,
thread,
exit_signal_sender,
}
}
async fn run_accountsdb_repl_server(
config: AccountsDbReplServiceConfig,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) -> Result<(), tonic::transport::Error> {
info!(
"Running AccountsDbReplServer at the endpoint: {:?}",
config.replica_server_addr
);
transport::Server::builder()
.add_service(accounts_db_repl_server::AccountsDbReplServer::new(server))
.serve_with_shutdown(config.replica_server_addr, exit_signal.map(drop))
.await
}
fn run_accountsdb_repl_server_in_runtime(
config: AccountsDbReplServiceConfig,
runtime: Arc<Runtime>,
server: AccountsDbReplServer,
exit_signal: Receiver<()>,
) {
let result = runtime.block_on(Self::run_accountsdb_repl_server(
config,
server,
exit_signal,
));
match result {
Ok(_) => {
info!("AccountsDbReplServer finished");
}
Err(err) => {
error!("AccountsDbReplServer finished in error: {:}?", err);
}
}
}
pub fn join(self) -> thread::Result<()> {
let _ = self.exit_signal_sender.send(());
self.accountsdb_repl_server.join()?;
self.thread.join()
}
}
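
The gRPC server is assembled from two trait objects, so any backend that can answer the two RPCs can be plugged in. A minimal sketch of a no-op backend wired through AccountsDbReplService::new; the Noop types are hypothetical stand-ins (the real implementations follow in replica_accounts_server.rs and replica_confirmed_slots_server.rs):

use std::{
    sync::{Arc, RwLock},
    thread,
};

struct NoopSlotServer;

impl ReplicaSlotConfirmationServer for NoopSlotServer {
    fn get_confirmed_slots(
        &self,
        _request: &ReplicaSlotConfirmationRequest,
    ) -> Result<ReplicaSlotConfirmationResponse, tonic::Status> {
        // Always report no newly confirmed slots.
        Ok(ReplicaSlotConfirmationResponse {
            updated_slots: vec![],
        })
    }
    fn join(&mut self) -> thread::Result<()> {
        Ok(())
    }
}

struct NoopAccountsServer;

impl ReplicaAccountsServer for NoopAccountsServer {
    fn get_slot_accounts(
        &self,
        _request: &ReplicaAccountsRequest,
    ) -> Result<ReplicaAccountsResponse, tonic::Status> {
        // Always report an empty account set.
        Ok(ReplicaAccountsResponse { accounts: vec![] })
    }
    fn join(&mut self) -> thread::Result<()> {
        Ok(())
    }
}

fn start_noop_repl_server(config: AccountsDbReplServiceConfig) -> AccountsDbReplService {
    // Unsized coercion turns each concrete Arc<RwLock<...>> into the trait object
    // the service expects.
    AccountsDbReplService::new(
        config,
        Arc::new(RwLock::new(NoopSlotServer)),
        Arc::new(RwLock::new(NoopAccountsServer)),
    )
}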

replica-lib/src/accountsdb_repl_server_factory.rs

@@ -1,29 +0,0 @@
use {
crate::{
accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig},
replica_accounts_server::ReplicaAccountsServerImpl,
replica_confirmed_slots_server::ReplicaSlotConfirmationServerImpl,
},
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_runtime::bank_forks::BankForks,
std::sync::{Arc, RwLock},
};
pub struct AccountsDbReplServerFactory {}
impl AccountsDbReplServerFactory {
pub fn build_accountsdb_repl_server(
config: AccountsDbReplServiceConfig,
confirmed_bank_receiver: Receiver<BankNotification>,
bank_forks: Arc<RwLock<BankForks>>,
) -> AccountsDbReplService {
AccountsDbReplService::new(
config,
Arc::new(RwLock::new(ReplicaSlotConfirmationServerImpl::new(
confirmed_bank_receiver,
))),
Arc::new(RwLock::new(ReplicaAccountsServerImpl::new(bank_forks))),
)
}
}

replica-lib/src/lib.rs

@@ -1,7 +0,0 @@
#![allow(clippy::integer_arithmetic)]
pub mod accountsdb_repl_client;
pub mod accountsdb_repl_server;
pub mod accountsdb_repl_server_factory;
pub mod replica_accounts_server;
pub mod replica_confirmed_slots_server;

replica-lib/src/replica_accounts_server.rs

@@ -1,95 +0,0 @@
use {
crate::accountsdb_repl_server::{
self, ReplicaAccountData, ReplicaAccountInfo, ReplicaAccountMeta, ReplicaAccountsServer,
},
solana_runtime::{
accounts_cache::CachedAccount, accounts_db::LoadedAccount, append_vec::StoredAccountMeta,
bank_forks::BankForks,
},
solana_sdk::account::Account,
std::{
cmp::Eq,
sync::{Arc, RwLock},
thread,
},
};
pub(crate) struct ReplicaAccountsServerImpl {
bank_forks: Arc<RwLock<BankForks>>,
}
impl Eq for ReplicaAccountInfo {}
impl ReplicaAccountInfo {
fn from_stored_account_meta(stored_account_meta: &StoredAccountMeta) -> Self {
let account_meta = Some(ReplicaAccountMeta {
pubkey: stored_account_meta.meta.pubkey.to_bytes().to_vec(),
lamports: stored_account_meta.account_meta.lamports,
owner: stored_account_meta.account_meta.owner.to_bytes().to_vec(),
executable: stored_account_meta.account_meta.executable,
rent_epoch: stored_account_meta.account_meta.rent_epoch,
});
let data = Some(ReplicaAccountData {
data: stored_account_meta.data.to_vec(),
});
ReplicaAccountInfo {
account_meta,
hash: stored_account_meta.hash.as_ref().to_vec(),
data,
}
}
fn from_cached_account(cached_account: &CachedAccount) -> Self {
let account = Account::from(cached_account.account.clone());
let account_meta = Some(ReplicaAccountMeta {
pubkey: cached_account.pubkey().to_bytes().to_vec(),
lamports: account.lamports,
owner: account.owner.to_bytes().to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
});
let data = Some(ReplicaAccountData {
data: account.data.to_vec(),
});
ReplicaAccountInfo {
account_meta,
hash: cached_account.hash().as_ref().to_vec(),
data,
}
}
}
impl ReplicaAccountsServer for ReplicaAccountsServerImpl {
fn get_slot_accounts(
&self,
request: &accountsdb_repl_server::ReplicaAccountsRequest,
) -> Result<accountsdb_repl_server::ReplicaAccountsResponse, tonic::Status> {
let slot = request.slot;
match self.bank_forks.read().unwrap().get(slot) {
None => Err(tonic::Status::not_found("The slot is not found")),
Some(bank) => {
let accounts = bank.rc.accounts.scan_slot(slot, |account| match account {
LoadedAccount::Stored(stored_account_meta) => Some(
ReplicaAccountInfo::from_stored_account_meta(&stored_account_meta),
),
LoadedAccount::Cached(cached_account) => {
Some(ReplicaAccountInfo::from_cached_account(&cached_account))
}
});
Ok(accountsdb_repl_server::ReplicaAccountsResponse { accounts })
}
}
}
fn join(&mut self) -> thread::Result<()> {
Ok(())
}
}
impl ReplicaAccountsServerImpl {
pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
Self { bank_forks }
}
}

replica-lib/src/replica_confirmed_slots_server.rs

@@ -1,122 +0,0 @@
use {
crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer},
crossbeam_channel::Receiver,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification,
solana_sdk::{clock::Slot, commitment_config::CommitmentLevel},
std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
tonic,
};
/// The structure modelling the slots eligible for replication and
/// their states.
#[derive(Default, Clone)]
struct ReplicaEligibleSlotSet {
slot_set: Arc<RwLock<VecDeque<(Slot, CommitmentLevel)>>>,
}
pub(crate) struct ReplicaSlotConfirmationServerImpl {
eligible_slot_set: ReplicaEligibleSlotSet,
confirmed_bank_receiver_service: Option<JoinHandle<()>>,
cleanup_service: Option<JoinHandle<()>>,
exit_updated_slot_server: Arc<AtomicBool>,
}
impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl {
fn get_confirmed_slots(
&self,
request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest,
) -> Result<accountsdb_repl_server::ReplicaSlotConfirmationResponse, tonic::Status> {
let slot_set = self.eligible_slot_set.slot_set.read().unwrap();
let updated_slots: Vec<u64> = slot_set
.iter()
.filter(|(slot, _)| *slot > request.last_replicated_slot)
.map(|(slot, _)| *slot)
.collect();
Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots })
}
fn join(&mut self) -> thread::Result<()> {
self.exit_updated_slot_server.store(true, Ordering::Relaxed);
self.confirmed_bank_receiver_service
.take()
.map(JoinHandle::join)
.unwrap()
.expect("confirmed_bank_receiver_service");
self.cleanup_service.take().map(JoinHandle::join).unwrap()
}
}
const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144;
impl ReplicaSlotConfirmationServerImpl {
pub fn new(confirmed_bank_receiver: Receiver<BankNotification>) -> Self {
let eligible_slot_set = ReplicaEligibleSlotSet::default();
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Self {
eligible_slot_set: eligible_slot_set.clone(),
confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver(
confirmed_bank_receiver,
eligible_slot_set.clone(),
exit_updated_slot_server.clone(),
)),
cleanup_service: Some(Self::run_cleanup_service(
eligible_slot_set,
MAX_ELIGIBLE_SLOT_SET_SIZE,
exit_updated_slot_server.clone(),
)),
exit_updated_slot_server,
}
}
fn run_confirmed_bank_receiver(
confirmed_bank_receiver: Receiver<BankNotification>,
eligible_slot_set: ReplicaEligibleSlotSet,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("confirmed_bank_receiver".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
if let Ok(BankNotification::OptimisticallyConfirmed(slot)) =
confirmed_bank_receiver.recv()
{
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
slot_set.push_back((slot, CommitmentLevel::Confirmed));
}
}
})
.unwrap()
}
fn run_cleanup_service(
eligible_slot_set: ReplicaEligibleSlotSet,
max_set_size: usize,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("cleanup_service".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
let count_to_drain = slot_set.len().saturating_sub(max_set_size);
if count_to_drain > 0 {
drop(slot_set.drain(..count_to_drain));
}
drop(slot_set);
sleep(Duration::from_millis(200));
}
})
.unwrap()
}
}

replica-node/.gitignore

@@ -1,2 +0,0 @@
/target/
/farf/

replica-node/Cargo.toml

@@ -1,43 +0,0 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
edition = "2021"
name = "solana-replica-node"
description = "Solana replication node"
version = "1.11.0"
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-validator"
[dependencies]
clap = "2.33.1"
crossbeam-channel = "0.5"
log = "0.4.14"
rand = "0.7.0"
solana-clap-utils = { path = "../clap-utils", version = "=1.11.0" }
solana-download-utils = { path = "../download-utils", version = "=1.11.0" }
solana-genesis-utils = { path = "../genesis-utils", version = "=1.11.0" }
solana-gossip = { path = "../gossip", version = "=1.11.0" }
solana-ledger = { path = "../ledger", version = "=1.11.0" }
solana-logger = { path = "../logger", version = "=1.11.0" }
solana-net-utils = { path = "../net-utils", version = "=1.11.0" }
solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" }
solana-rpc = { path = "../rpc", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }
solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.11.0" }
solana-streamer = { path = "../streamer", version = "=1.11.0" }
solana-validator = { path = "../validator", version = "=1.11.0" }
solana-version = { path = "../version", version = "=1.11.0" }
[dev-dependencies]
serial_test = "0.6.0"
solana-core = { path = "../core", version = "=1.11.0" }
solana-local-cluster = { path = "../local-cluster", version = "=1.11.0" }
tempfile = "3.3.0"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[build-dependencies]
tonic-build = "0.6.2"

replica-node/src/accountsdb_repl_service.rs

@@ -1,81 +0,0 @@
/// Module responsible for replicating AccountsDb data from its peer to its local AccountsDb in the replica-node
use {
log::*,
solana_replica_lib::accountsdb_repl_client::{
AccountsDbReplClientService, AccountsDbReplClientServiceConfig, ReplicaRpcError,
},
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
pub struct AccountsDbReplService {
thread: JoinHandle<()>,
}
impl AccountsDbReplService {
pub fn new(
last_replicated_slot: Slot,
config: AccountsDbReplClientServiceConfig,
) -> Result<Self, ReplicaRpcError> {
let accountsdb_repl_client = AccountsDbReplClientService::new(config)?;
let thread = Builder::new()
.name("sol-accountsdb-repl-svc".to_string())
.spawn(move || {
Self::run_service(last_replicated_slot, accountsdb_repl_client);
})
.unwrap();
Ok(Self { thread })
}
fn replicate_accounts_for_slot(
accountsdb_repl_client: &mut AccountsDbReplClientService,
slot: Slot,
) {
match accountsdb_repl_client.get_slot_accounts(slot) {
Err(err) => {
error!(
"Ran into error getting accounts for slot {:?}, error: {:?}",
slot, err
);
}
Ok(accounts) => {
for account in accounts.iter() {
debug!(
"Received account: {:?}",
Pubkey::new(&account.account_meta.as_ref().unwrap().pubkey)
);
}
}
}
}
fn run_service(
mut last_replicated_slot: Slot,
mut accountsdb_repl_client: AccountsDbReplClientService,
) {
loop {
match accountsdb_repl_client.get_confirmed_slots(last_replicated_slot) {
Ok(slots) => {
info!("Received updated slots: {:?}", slots);
if !slots.is_empty() {
for slot in slots.iter() {
Self::replicate_accounts_for_slot(&mut accountsdb_repl_client, *slot);
}
last_replicated_slot = slots[slots.len() - 1];
}
}
Err(err) => {
error!("Ran into error getting updated slots: {:?}", err);
}
}
sleep(Duration::from_millis(200));
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}

replica-node/src/lib.rs

@@ -1,5 +0,0 @@
#![allow(clippy::integer_arithmetic)]
pub mod accountsdb_repl_service;
pub mod replica_node;
pub mod replica_util;

replica-node/src/main.rs

@@ -1,408 +0,0 @@
//! The main AccountsDb replication node, responsible for replicating
//! AccountsDb information from a peer validator or another replica-node.
#![allow(clippy::integer_arithmetic)]
use {
clap::{crate_description, crate_name, value_t, values_t, App, AppSettings, Arg},
log::*,
rand::{seq::SliceRandom, thread_rng},
solana_clap_utils::{
input_parsers::keypair_of,
input_validators::{is_keypair_or_ask_keyword, is_parsable, is_pubkey},
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_replica_node::{
replica_node::{ReplicaNode, ReplicaNodeConfig},
replica_util,
},
solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig},
solana_runtime::accounts_index::AccountSecondaryIndexes,
solana_sdk::{exit::Exit, pubkey::Pubkey, signature::Signer},
solana_streamer::socket::SocketAddrSpace,
solana_validator::port_range_validator,
std::{
collections::HashSet,
env,
net::{IpAddr, SocketAddr},
path::PathBuf,
process::exit,
sync::{Arc, RwLock},
},
};
pub fn main() {
let default_dynamic_port_range =
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
let matches = App::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.setting(AppSettings::VersionlessSubcommands)
.setting(AppSettings::InferSubcommands)
.arg(
Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name)
.long(SKIP_SEED_PHRASE_VALIDATION_ARG.long)
.help(SKIP_SEED_PHRASE_VALIDATION_ARG.help),
)
.arg(
Arg::with_name("ledger_path")
.short("l")
.long("ledger")
.value_name("DIR")
.takes_value(true)
.required(true)
.default_value("ledger")
.help("Use DIR as ledger location"),
)
.arg(
Arg::with_name("snapshots")
.long("snapshots")
.value_name("DIR")
.takes_value(true)
.help("Use DIR as snapshot location [default: --ledger value]"),
)
.arg(
Arg::with_name("peer_address")
.long("peer-address")
.value_name("IP")
.takes_value(true)
.required(true)
.help("The the address for the peer validator/replica to download from"),
)
.arg(
Arg::with_name("peer_rpc_port")
.long("peer-rpc-port")
.value_name("PORT")
.takes_value(true)
.required(true)
.help("The the PORT for the peer validator/replica from which to download the snapshots"),
)
.arg(
Arg::with_name("peer_accountsdb_repl_port")
.long("peer-accountsdb-repl-port")
.value_name("PORT")
.takes_value(true)
.required(true)
.help("The the PORT for the peer validator/replica serving the AccountsDb replication"),
)
.arg(
Arg::with_name("peer_pubkey")
.long("peer-pubkey")
.validator(is_pubkey)
.value_name("The peer validator/replica IDENTITY")
.required(true)
.takes_value(true)
.help("The pubkey for the target validator."),
)
.arg(
Arg::with_name("account_paths")
.long("accounts")
.value_name("PATHS")
.takes_value(true)
.multiple(true)
.help("Comma separated persistent accounts location"),
)
.arg(
Arg::with_name("identity")
.short("i")
.long("identity")
.value_name("KEYPAIR")
.takes_value(true)
.validator(is_keypair_or_ask_keyword)
.help("Replica identity keypair"),
)
.arg(
Arg::with_name("entrypoint")
.short("n")
.long("entrypoint")
.value_name("HOST:PORT")
.takes_value(true)
.multiple(true)
.validator(solana_net_utils::is_host_port)
.help("Rendezvous with the cluster at this gossip entrypoint"),
)
.arg(
Arg::with_name("bind_address")
.long("bind-address")
.value_name("HOST")
.takes_value(true)
.validator(solana_net_utils::is_host)
.default_value("0.0.0.0")
.help("IP address to bind the replica ports"),
)
.arg(
Arg::with_name("rpc_bind_address")
.long("rpc-bind-address")
.value_name("HOST")
.takes_value(true)
.validator(solana_net_utils::is_host)
.help("IP address to bind the Json RPC port [default: use --bind-address]"),
)
.arg(
Arg::with_name("rpc_port")
.long("rpc-port")
.value_name("PORT")
.takes_value(true)
.validator(solana_validator::port_validator)
.help("Enable JSON RPC on this port, and the next port for the RPC websocket"),
)
.arg(
Arg::with_name("dynamic_port_range")
.long("dynamic-port-range")
.value_name("MIN_PORT-MAX_PORT")
.takes_value(true)
.default_value(default_dynamic_port_range)
.validator(port_range_validator)
.help("Range to use for dynamically assigned ports"),
)
.arg(
Arg::with_name("expected_shred_version")
.long("expected-shred-version")
.value_name("VERSION")
.takes_value(true)
.validator(is_parsable::<u16>)
.help("Require the shred version be this value"),
)
.arg(
Arg::with_name("logfile")
.short("o")
.long("log")
.value_name("FILE")
.takes_value(true)
.help(
"Redirect logging to the specified file, '-' for standard error. \
Sending the SIGUSR1 signal to the validator process will cause it \
to re-open the log file",
),
)
.arg(
Arg::with_name("allow_private_addr")
.long("allow-private-addr")
.takes_value(false)
.help("Allow contacting private ip addresses")
.hidden(true),
)
.get_matches();
let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap())
.expect("invalid bind_address");
let rpc_bind_address = if let Some(rpc_bind_address) = matches.value_of("rpc_bind_address") {
solana_net_utils::parse_host(rpc_bind_address).expect("invalid rpc_bind_address")
} else {
bind_address
};
let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| {
clap::Error::with_description(
"The --identity <KEYPAIR> argument is required",
clap::ErrorKind::ArgumentNotFound,
)
.exit();
});
let peer_pubkey = value_t!(matches, "peer_pubkey", Pubkey).unwrap();
let entrypoint_addrs = values_t!(matches, "entrypoint", String)
.unwrap_or_default()
.into_iter()
.map(|entrypoint| {
solana_net_utils::parse_host_port(&entrypoint).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {}", e);
exit(1);
})
})
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let expected_shred_version = value_t!(matches, "expected_shred_version", u16)
.ok()
.or_else(|| replica_util::get_cluster_shred_version(&entrypoint_addrs));
let gossip_host: IpAddr = matches
.value_of("gossip_host")
.map(|gossip_host| {
solana_net_utils::parse_host(gossip_host).unwrap_or_else(|err| {
eprintln!("Failed to parse --gossip-host: {}", err);
exit(1);
})
})
.unwrap_or_else(|| {
if !entrypoint_addrs.is_empty() {
let mut order: Vec<_> = (0..entrypoint_addrs.len()).collect();
order.shuffle(&mut thread_rng());
let gossip_host = order.into_iter().find_map(|i| {
let entrypoint_addr = &entrypoint_addrs[i];
info!(
"Contacting {} to determine the validator's public IP address",
entrypoint_addr
);
solana_net_utils::get_public_ip_addr(entrypoint_addr).map_or_else(
|err| {
eprintln!(
"Failed to contact cluster entrypoint {}: {}",
entrypoint_addr, err
);
None
},
Some,
)
});
gossip_host.unwrap_or_else(|| {
eprintln!("Unable to determine the validator's public IP address");
exit(1);
})
} else {
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
}
});
let gossip_addr = SocketAddr::new(
gossip_host,
value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| {
solana_net_utils::find_available_port_in_range(bind_address, (0, 1)).unwrap_or_else(
|err| {
eprintln!("Unable to find an available gossip port: {}", err);
exit(1);
},
)
}),
);
let dynamic_port_range =
solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap())
.expect("invalid dynamic_port_range");
let cluster_entrypoints = entrypoint_addrs
.iter()
.map(ContactInfo::new_gossip_entry_point)
.collect::<Vec<_>>();
let node = Node::new_with_external_ip(
&identity_keypair.pubkey(),
&gossip_addr,
dynamic_port_range,
bind_address,
None,
);
let ledger_path = PathBuf::from(matches.value_of("ledger_path").unwrap());
let snapshot_archives_dir = if let Some(snapshots) = matches.value_of("snapshots") {
PathBuf::from(snapshots)
} else {
ledger_path.clone()
};
let bank_snapshots_dir = snapshot_archives_dir.join("snapshot");
let account_paths: Vec<PathBuf> =
if let Ok(account_paths) = values_t!(matches, "account_paths", String) {
account_paths
.join(",")
.split(',')
.map(PathBuf::from)
.collect()
} else {
vec![ledger_path.join("accounts")]
};
let peer_address = solana_net_utils::parse_host(matches.value_of("peer_address").unwrap())
.expect("invalid peer_address");
let peer_rpc_port = value_t!(matches, "peer_rpc_port", u16).unwrap_or_else(|_| {
clap::Error::with_description(
"The --peer-rpc-port <PORT> argument is required",
clap::ErrorKind::ArgumentNotFound,
)
.exit();
});
let rpc_peer_addr = SocketAddr::new(peer_address, peer_rpc_port);
let peer_accountsdb_repl_port = value_t!(matches, "peer_accountsdb_repl_port", u16)
.unwrap_or_else(|_| {
clap::Error::with_description(
"The --peer-accountsdb-repl-port <PORT> argument is required",
clap::ErrorKind::ArgumentNotFound,
)
.exit();
});
let accountsdb_repl_peer_addr = SocketAddr::new(peer_address, peer_accountsdb_repl_port);
let rpc_port = value_t!(matches, "rpc_port", u16).unwrap_or_else(|_| {
clap::Error::with_description(
"The --rpc-port <PORT> argument is required",
clap::ErrorKind::ArgumentNotFound,
)
.exit();
});
let rpc_addrs = (
SocketAddr::new(rpc_bind_address, rpc_port),
SocketAddr::new(rpc_bind_address, rpc_port + 1),
// If additional ports are added, +2 needs to be skipped to avoid a conflict with
// the websocket port (which is +2) in web3.js. This odd port shifting is tracked at
// https://github.com/solana-labs/solana/issues/12250
);
let logfile = {
let logfile = matches
.value_of("logfile")
.map(|s| s.into())
.unwrap_or_else(|| format!("solana-replica-node-{}.log", identity_keypair.pubkey()));
if logfile == "-" {
None
} else {
println!("log file: {}", logfile);
Some(logfile)
}
};
let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
let _logger_thread = solana_validator::redirect_stderr_to_file(logfile);
let (cluster_info, rpc_contact_info, snapshot_info) = replica_util::get_rpc_peer_info(
identity_keypair,
&cluster_entrypoints,
&ledger_path,
&node,
expected_shred_version,
&peer_pubkey,
&snapshot_archives_dir,
socket_addr_space,
);
info!(
"Using RPC service from node {}: {:?}, snapshot_info: {:?}",
rpc_contact_info.id, rpc_contact_info.rpc, snapshot_info
);
let config = ReplicaNodeConfig {
rpc_peer_addr,
accountsdb_repl_peer_addr: Some(accountsdb_repl_peer_addr),
rpc_addr: rpc_addrs.0,
rpc_pubsub_addr: rpc_addrs.1,
ledger_path,
snapshot_archives_dir,
bank_snapshots_dir,
account_paths,
snapshot_info: snapshot_info.unwrap(),
cluster_info,
rpc_config: JsonRpcConfig::default(),
snapshot_config: None,
pubsub_config: PubSubConfig::default(),
socket_addr_space,
account_indexes: AccountSecondaryIndexes::default(),
accounts_db_caching_enabled: false,
replica_exit: Arc::new(RwLock::new(Exit::default())),
};
let replica = ReplicaNode::new(config);
replica.join();
}
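
Putting the flags together: a replica needed its own identity and ledger plus the peer's address, RPC port, replication port, and pubkey. A hypothetical invocation (every value below is a placeholder, and the binary name is assumed from the crate name solana-replica-node):

solana-replica-node \
    --ledger /path/to/replica-ledger \
    --identity replica-keypair.json \
    --entrypoint entrypoint.devnet.solana.com:8001 \
    --peer-address 203.0.113.5 \
    --peer-rpc-port 8899 \
    --peer-accountsdb-repl-port 10800 \
    --peer-pubkey <VALIDATOR_PUBKEY> \
    --rpc-port 8899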

replica-node/src/replica_node.rs

@@ -1,346 +0,0 @@
use {
crate::accountsdb_repl_service::AccountsDbReplService,
crossbeam_channel::unbounded,
log::*,
solana_download_utils::download_snapshot_archive,
solana_genesis_utils::download_then_check_genesis_hash,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreOptions, blockstore_processor,
leader_schedule_cache::LeaderScheduleCache,
},
solana_replica_lib::accountsdb_repl_client::AccountsDbReplClientServiceConfig,
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
accounts_index::AccountSecondaryIndexes, bank_forks::BankForks,
commitment::BlockCommitmentCache, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils,
},
solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash},
solana_send_transaction_service::send_transaction_service,
solana_streamer::socket::SocketAddrSpace,
std::{
fs,
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicBool, AtomicU64},
Arc, RwLock,
},
},
};
pub struct ReplicaNodeConfig {
pub rpc_peer_addr: SocketAddr,
pub accountsdb_repl_peer_addr: Option<SocketAddr>,
pub rpc_addr: SocketAddr,
pub rpc_pubsub_addr: SocketAddr,
pub ledger_path: PathBuf,
pub snapshot_archives_dir: PathBuf,
pub bank_snapshots_dir: PathBuf,
pub account_paths: Vec<PathBuf>,
pub snapshot_info: (Slot, Hash),
pub cluster_info: Arc<ClusterInfo>,
pub rpc_config: JsonRpcConfig,
pub snapshot_config: Option<SnapshotConfig>,
pub pubsub_config: PubSubConfig,
pub account_indexes: AccountSecondaryIndexes,
pub accounts_db_caching_enabled: bool,
pub replica_exit: Arc<RwLock<Exit>>,
pub socket_addr_space: SocketAddrSpace,
}
pub struct ReplicaNode {
json_rpc_service: Option<JsonRpcService>,
pubsub_service: Option<PubSubService>,
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
accountsdb_repl_service: Option<AccountsDbReplService>,
}
// Struct maintaining information about banks
struct ReplicaBankInfo {
bank_forks: Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
}
// Initialize the replica by downloading a snapshot from the peer, initializing
// the BankForks, OptimisticallyConfirmedBank, LeaderScheduleCache and
// BlockCommitmentCache, and returning the info wrapped as a ReplicaBankInfo.
fn initialize_from_snapshot(
replica_config: &ReplicaNodeConfig,
snapshot_config: &SnapshotConfig,
genesis_config: &GenesisConfig,
) -> ReplicaBankInfo {
info!(
"Downloading snapshot from the peer into {:?}",
replica_config.snapshot_archives_dir
);
download_snapshot_archive(
&replica_config.rpc_peer_addr,
&replica_config.snapshot_archives_dir,
replica_config.snapshot_info,
SnapshotType::FullSnapshot,
snapshot_config.maximum_full_snapshot_archives_to_retain,
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();
fs::create_dir_all(&snapshot_config.bank_snapshots_dir)
.expect("Couldn't create bank snapshot directory");
let archive_info = snapshot_utils::get_highest_full_snapshot_archive_info(
&replica_config.snapshot_archives_dir,
)
.unwrap();
let process_options = blockstore_processor::ProcessOptions {
account_indexes: replica_config.account_indexes.clone(),
accounts_db_caching_enabled: replica_config.accounts_db_caching_enabled,
..blockstore_processor::ProcessOptions::default()
};
info!(
"Build bank from snapshot archive: {:?}",
&snapshot_config.bank_snapshots_dir
);
let (bank0, _) = snapshot_utils::bank_from_snapshot_archives(
&replica_config.account_paths,
&snapshot_config.bank_snapshots_dir,
&archive_info,
None,
genesis_config,
process_options.debug_keys.clone(),
None,
process_options.account_indexes.clone(),
process_options.accounts_db_caching_enabled,
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
false,
process_options.verify_index,
process_options.accounts_db_config,
None,
)
.unwrap();
let bank0_slot = bank0.slot();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut block_commitment_cache = BlockCommitmentCache::default();
block_commitment_cache.initialize_slots(bank0_slot, bank0_slot);
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
ReplicaBankInfo {
bank_forks,
optimistically_confirmed_bank,
leader_schedule_cache,
block_commitment_cache,
}
}
fn start_client_rpc_services(
replica_config: &ReplicaNodeConfig,
genesis_config: &GenesisConfig,
cluster_info: Arc<ClusterInfo>,
bank_info: &ReplicaBankInfo,
socket_addr_space: &SocketAddrSpace,
) -> (
Option<JsonRpcService>,
Option<PubSubService>,
Option<OptimisticallyConfirmedBankTracker>,
) {
let ReplicaBankInfo {
bank_forks,
optimistically_confirmed_bank,
leader_schedule_cache,
block_commitment_cache,
} = bank_info;
let blockstore = Arc::new(
Blockstore::open_with_options(
&replica_config.ledger_path,
BlockstoreOptions {
enforce_ulimit_nofile: false,
..BlockstoreOptions::default()
},
)
.unwrap(),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(0));
let max_slots = Arc::new(MaxSlots::default());
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
max_complete_transaction_status_slot.clone(),
blockstore.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
));
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
if ContactInfo::is_valid_address(&replica_config.rpc_addr, socket_addr_space) {
assert!(ContactInfo::is_valid_address(
&replica_config.rpc_pubsub_addr,
socket_addr_space
));
} else {
assert!(!ContactInfo::is_valid_address(
&replica_config.rpc_pubsub_addr,
socket_addr_space
));
}
let (trigger, pubsub_service) = PubSubService::new(
replica_config.pubsub_config.clone(),
&subscriptions,
replica_config.rpc_pubsub_addr,
);
replica_config
.replica_exit
.write()
.unwrap()
.register_exit(Box::new(move || trigger.cancel()));
let (_bank_notification_sender, bank_notification_receiver) = unbounded();
(
Some(JsonRpcService::new(
replica_config.rpc_addr,
replica_config.rpc_config.clone(),
replica_config.snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore,
cluster_info,
None,
genesis_config.hash(),
&replica_config.ledger_path,
replica_config.replica_exit.clone(),
None,
rpc_override_health_check,
optimistically_confirmed_bank.clone(),
send_transaction_service::Config {
retry_rate_ms: 0,
leader_forward_count: 0,
..send_transaction_service::Config::default()
},
max_slots,
leader_schedule_cache.clone(),
max_complete_transaction_status_slot,
)),
Some(pubsub_service),
Some(OptimisticallyConfirmedBankTracker::new(
bank_notification_receiver,
&exit,
bank_forks.clone(),
optimistically_confirmed_bank.clone(),
subscriptions,
None,
)),
)
}
impl ReplicaNode {
pub fn new(replica_config: ReplicaNodeConfig) -> Self {
let genesis_config = download_then_check_genesis_hash(
&replica_config.rpc_peer_addr,
&replica_config.ledger_path,
None,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
false,
true,
)
.unwrap();
let snapshot_config = SnapshotConfig {
full_snapshot_archive_interval_slots: Slot::MAX,
incremental_snapshot_archive_interval_slots: Slot::MAX,
snapshot_archives_dir: replica_config.snapshot_archives_dir.clone(),
bank_snapshots_dir: replica_config.bank_snapshots_dir.clone(),
..SnapshotConfig::default()
};
let bank_info =
initialize_from_snapshot(&replica_config, &snapshot_config, &genesis_config);
let (json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker) =
start_client_rpc_services(
&replica_config,
&genesis_config,
replica_config.cluster_info.clone(),
&bank_info,
&replica_config.socket_addr_space,
);
let accountsdb_repl_client_config = AccountsDbReplClientServiceConfig {
worker_threads: 1,
replica_server_addr: replica_config.accountsdb_repl_peer_addr.unwrap(),
};
let last_replicated_slot = bank_info.bank_forks.read().unwrap().root_bank().slot();
info!(
"Starting AccountsDbReplService from slot {:?}",
last_replicated_slot
);
let accountsdb_repl_service = Some(
AccountsDbReplService::new(last_replicated_slot, accountsdb_repl_client_config)
.expect("Failed to start AccountsDb replication service"),
);
info!(
"Started AccountsDbReplService from slot {:?}",
last_replicated_slot
);
ReplicaNode {
json_rpc_service,
pubsub_service,
optimistically_confirmed_bank_tracker,
accountsdb_repl_service,
}
}
pub fn join(self) {
if let Some(json_rpc_service) = self.json_rpc_service {
json_rpc_service.join().expect("rpc_service");
}
if let Some(pubsub_service) = self.pubsub_service {
pubsub_service.join().expect("pubsub_service");
}
if let Some(optimistically_confirmed_bank_tracker) =
self.optimistically_confirmed_bank_tracker
{
optimistically_confirmed_bank_tracker
.join()
.expect("optimistically_confirmed_bank_tracker");
}
if let Some(accountsdb_repl_service) = self.accountsdb_repl_service {
accountsdb_repl_service
.join()
.expect("accountsdb_repl_service");
}
}
}

replica-node/src/replica_util.rs

@@ -1,276 +0,0 @@
use {
log::*,
rand::{seq::SliceRandom, thread_rng, Rng},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
gossip_service::GossipService,
},
solana_runtime::{snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_utils},
solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::HashSet,
net::{SocketAddr, UdpSocket},
path::Path,
process::exit,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::{Duration, Instant},
},
};
pub fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option<u16> {
let entrypoints = {
let mut index: Vec<_> = (0..entrypoints.len()).collect();
index.shuffle(&mut rand::thread_rng());
index.into_iter().map(|i| &entrypoints[i])
};
for entrypoint in entrypoints {
match solana_net_utils::get_cluster_shred_version(entrypoint) {
Err(err) => eprintln!("get_cluster_shred_version failed: {}, {}", entrypoint, err),
Ok(0) => eprintln!("zero shred-version from entrypoint: {}", entrypoint),
Ok(shred_version) => {
info!(
"obtained shred-version {} from {}",
shred_version, entrypoint
);
return Some(shred_version);
}
}
}
None
}
// Discover the RPC peer node via gossip and return its ContactInfo
// and the initial snapshot info: (Slot, Hash).
// Alternatively, this could be done via an RPC call instead of using gossip.
fn get_rpc_peer_node(
cluster_info: &ClusterInfo,
cluster_entrypoints: &[ContactInfo],
expected_shred_version: Option<u16>,
peer_pubkey: &Pubkey,
snapshot_archives_dir: &Path,
) -> Option<(ContactInfo, Option<(Slot, Hash)>)> {
let mut newer_cluster_snapshot_timeout = None;
let mut retry_reason = None;
loop {
sleep(Duration::from_secs(1));
info!("Searching for the rpc peer node and latest snapshot information with shred_version {:?}.", expected_shred_version);
info!("\n{}", cluster_info.rpc_info_trace());
let shred_version =
expected_shred_version.unwrap_or_else(|| cluster_info.my_shred_version());
if shred_version == 0 {
let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| {
cluster_info
.lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip)
.map_or(false, |entrypoint| entrypoint.shred_version == 0)
});
if all_zero_shred_versions {
eprintln!(
"Entrypoint shred version is zero. Restart with --expected-shred-version"
);
exit(1);
}
info!("Waiting to adopt entrypoint shred version...");
continue;
}
info!(
"Searching for an RPC service with shred version {}{}...",
shred_version,
retry_reason
.as_ref()
.map(|s| format!(" (Retrying: {})", s))
.unwrap_or_default()
);
let rpc_peers = cluster_info
.all_rpc_peers()
.into_iter()
.filter(|contact_info| contact_info.shred_version == shred_version)
.collect::<Vec<_>>();
let rpc_peers_total = rpc_peers.len();
let rpc_known_peers = rpc_peers
.iter()
.filter(|rpc_peer| &rpc_peer.id == peer_pubkey)
.count();
info!(
"Total {} RPC nodes found. {} known",
rpc_peers_total, rpc_known_peers
);
let mut highest_snapshot_info: Option<(Slot, Hash)> =
snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_archives_dir).map(
|snapshot_archive_info| {
(snapshot_archive_info.slot(), *snapshot_archive_info.hash())
},
);
let eligible_rpc_peers = {
let mut eligible_rpc_peers = vec![];
for rpc_peer in rpc_peers.iter() {
if &rpc_peer.id != peer_pubkey {
continue;
}
cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| {
for snapshot_hash in snapshot_hashes {
if highest_snapshot_info.is_none()
|| snapshot_hash.0 > highest_snapshot_info.unwrap().0
{
// Found a higher snapshot, remove all nodes with a lower snapshot
eligible_rpc_peers.clear();
highest_snapshot_info = Some(*snapshot_hash)
}
if Some(*snapshot_hash) == highest_snapshot_info {
eligible_rpc_peers.push(rpc_peer.clone());
}
}
});
}
match highest_snapshot_info {
None => {
assert!(eligible_rpc_peers.is_empty());
}
Some(highest_snapshot_info) => {
if eligible_rpc_peers.is_empty() {
match newer_cluster_snapshot_timeout {
None => newer_cluster_snapshot_timeout = Some(Instant::now()),
Some(newer_cluster_snapshot_timeout) => {
if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 {
warn!("giving up newer snapshot from the cluster");
return None;
}
}
}
retry_reason = Some(format!(
"Wait for newer snapshot than local: {:?}",
highest_snapshot_info
));
continue;
}
info!(
"Highest available snapshot slot is {}, available from {} node{}: {:?}",
highest_snapshot_info.0,
eligible_rpc_peers.len(),
if eligible_rpc_peers.len() > 1 {
"s"
} else {
""
},
eligible_rpc_peers
.iter()
.map(|contact_info| contact_info.id)
.collect::<Vec<_>>()
);
}
}
eligible_rpc_peers
};
if !eligible_rpc_peers.is_empty() {
let contact_info =
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())];
return Some((contact_info.clone(), highest_snapshot_info));
} else {
retry_reason = Some("No snapshots available".to_owned());
}
}
}
fn start_gossip_node(
identity_keypair: Arc<Keypair>,
cluster_entrypoints: &[ContactInfo],
ledger_path: &Path,
gossip_addr: &SocketAddr,
gossip_socket: UdpSocket,
expected_shred_version: Option<u16>,
gossip_validators: Option<HashSet<Pubkey>>,
should_check_duplicate_instance: bool,
socket_addr_space: SocketAddrSpace,
) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
let contact_info = ClusterInfo::gossip_contact_info(
identity_keypair.pubkey(),
*gossip_addr,
expected_shred_version.unwrap_or(0),
);
let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
cluster_info.restore_contact_info(ledger_path, 0);
let cluster_info = Arc::new(cluster_info);
let gossip_exit_flag = Arc::new(AtomicBool::new(false));
let gossip_service = GossipService::new(
&cluster_info,
None,
gossip_socket,
gossip_validators,
should_check_duplicate_instance,
None,
&gossip_exit_flag,
);
info!("Started gossip node");
info!(
"The cluster contact info:\n{}",
cluster_info.contact_info_trace()
);
(cluster_info, gossip_exit_flag, gossip_service)
}
// Get the RPC peer info given the peer's Pubkey
// Returns the ClusterInfo, the peer's ContactInfo and the initial snapshot info
pub fn get_rpc_peer_info(
identity_keypair: Keypair,
cluster_entrypoints: &[ContactInfo],
ledger_path: &Path,
node: &Node,
expected_shred_version: Option<u16>,
peer_pubkey: &Pubkey,
snapshot_archives_dir: &Path,
socket_addr_space: SocketAddrSpace,
) -> (Arc<ClusterInfo>, ContactInfo, Option<(Slot, Hash)>) {
let identity_keypair = Arc::new(identity_keypair);
let gossip = start_gossip_node(
identity_keypair,
cluster_entrypoints,
ledger_path,
&node.info.gossip,
node.sockets.gossip.try_clone().unwrap(),
expected_shred_version,
None,
true,
socket_addr_space,
);
let rpc_node_details = get_rpc_peer_node(
&gossip.0,
cluster_entrypoints,
expected_shred_version,
peer_pubkey,
snapshot_archives_dir,
);
let rpc_node_details = rpc_node_details.unwrap();
// We no longer need the gossip node, stop it:
let gossip_exit_flag = gossip.1;
gossip_exit_flag.store(true, Ordering::Relaxed);
(gossip.0, rpc_node_details.0, rpc_node_details.1)
}

replica-node/tests/test_replica_node.rs

@@ -1,299 +0,0 @@
#![allow(clippy::integer_arithmetic)]
use {
log::*,
serial_test::serial,
solana_core::validator::ValidatorConfig,
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_local_cluster::{
cluster::Cluster,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
},
solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig,
solana_replica_node::{
replica_node::{ReplicaNode, ReplicaNodeConfig},
replica_util,
},
solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig},
solana_runtime::{
accounts_index::AccountSecondaryIndexes, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_utils,
},
solana_sdk::{
client::SyncClient,
clock::Slot,
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
exit::Exit,
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
},
tempfile::TempDir,
};
const RUST_LOG_FILTER: &str =
"error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info";
fn wait_for_next_snapshot(
cluster: &LocalCluster,
snapshot_archives_dir: &Path,
) -> (PathBuf, (Slot, Hash)) {
// Get slot after which this was generated
let client = cluster
.get_validator_client(&cluster.entry_point_info.id)
.unwrap();
let last_slot = client
.get_slot_with_commitment(CommitmentConfig::processed())
.expect("Couldn't get slot");
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed
trace!(
"Waiting for snapshot archive to be generated with slot > {}",
last_slot
);
loop {
if let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_archives_dir)
{
trace!(
"full snapshot for slot {} exists",
full_snapshot_archive_info.slot()
);
if full_snapshot_archive_info.slot() >= last_slot {
return (
full_snapshot_archive_info.path().clone(),
(
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
);
}
trace!(
"full snapshot slot {} < last_slot {}",
full_snapshot_archive_info.slot(),
last_slot
);
}
sleep(Duration::from_millis(1000));
}
}
fn farf_dir() -> PathBuf {
std::env::var("FARF_DIR")
.unwrap_or_else(|_| "farf".to_string())
.into()
}
fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, Vec<PathBuf>) {
let account_storage_dirs: Vec<TempDir> = (0..num_account_paths)
.map(|_| tempfile::tempdir_in(farf_dir()).unwrap())
.collect();
let account_storage_paths: Vec<_> = account_storage_dirs
.iter()
.map(|a| a.path().to_path_buf())
.collect();
(account_storage_dirs, account_storage_paths)
}
struct SnapshotValidatorConfig {
_snapshot_dir: TempDir,
snapshot_archives_dir: TempDir,
account_storage_dirs: Vec<TempDir>,
validator_config: ValidatorConfig,
}
fn setup_snapshot_validator_config(
snapshot_interval_slots: u64,
num_account_paths: usize,
) -> SnapshotValidatorConfig {
// Create the snapshot config
let bank_snapshots_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_config = SnapshotConfig {
full_snapshot_archive_interval_slots: snapshot_interval_slots,
incremental_snapshot_archive_interval_slots: Slot::MAX,
snapshot_archives_dir: snapshot_archives_dir.path().to_path_buf(),
bank_snapshots_dir: bank_snapshots_dir.path().to_path_buf(),
..SnapshotConfig::default()
};
// Create the account paths
let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let accountsdb_repl_port =
solana_net_utils::find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
let replica_server_addr = SocketAddr::new(bind_ip_addr, accountsdb_repl_port);
let accountsdb_repl_service_config = Some(AccountsDbReplServiceConfig {
worker_threads: 1,
replica_server_addr,
});
// Create the validator config
let validator_config = ValidatorConfig {
snapshot_config: Some(snapshot_config),
account_paths: account_storage_paths,
accounts_hash_interval_slots: snapshot_interval_slots,
accountsdb_repl_service_config,
..ValidatorConfig::default_for_test()
};
SnapshotValidatorConfig {
_snapshot_dir: bank_snapshots_dir,
snapshot_archives_dir,
account_storage_dirs,
validator_config,
}
}
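// The port selection above delegates to
// solana_net_utils::find_available_port_in_range. Conceptually that is just
// bind-probing; a rough standard-library stand-in (an approximation, not the
// crate's actual implementation, and inherently racy -- the port can be
// claimed again between the probe and its eventual use):
use std::net::TcpListener;

fn find_free_port(ip: IpAddr, range: (u16, u16)) -> Option<u16> {
    (range.0..range.1).find(|&port| TcpListener::bind(SocketAddr::new(ip, port)).is_ok())
}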
fn test_local_cluster_start_and_exit_with_config(socket_addr_space: SocketAddrSpace) {
solana_logger::setup();
const NUM_NODES: usize = 1;
let mut config = ClusterConfig {
validator_configs: make_identical_validator_configs(
&ValidatorConfig::default_for_test(),
NUM_NODES,
),
node_stakes: vec![3; NUM_NODES],
cluster_lamports: 100,
ticks_per_slot: 8,
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, socket_addr_space);
assert_eq!(cluster.validators.len(), NUM_NODES);
}
#[test]
#[serial]
fn test_replica_bootstrap() {
let socket_addr_space = SocketAddrSpace::new(true);
test_local_cluster_start_and_exit_with_config(socket_addr_space);
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 1 node
let snapshot_interval_slots = 50;
let num_account_paths = 3;
let leader_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
info!(
"Snapshot config for the leader: accounts: {:?}, snapshot: {:?}",
leader_snapshot_test_config.account_storage_dirs,
leader_snapshot_test_config.snapshot_archives_dir
);
let stake = 10_000;
let mut config = ClusterConfig {
node_stakes: vec![stake],
cluster_lamports: 1_000_000,
validator_configs: make_identical_validator_configs(
&leader_snapshot_test_config.validator_config,
1,
),
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, socket_addr_space);
assert_eq!(cluster.validators.len(), 1);
let contact_info = &cluster.entry_point_info;
info!("Contact info: {:?}", contact_info);
// Locate the leader's snapshot archives directory
let snapshot_archives_dir = &leader_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.snapshot_archives_dir;
info!("Waiting for snapshot");
let (archive_filename, archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, snapshot_archives_dir);
info!("found: {:?}", archive_filename);
let identity_keypair = Keypair::new();
// Now bring up a replica node and point it at the leader.
let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8101, 8200)).unwrap();
let rpc_addr = SocketAddr::new(ip_addr, port);
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8201, 8300)).unwrap();
let rpc_pubsub_addr = SocketAddr::new(ip_addr, port);
let ledger_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let ledger_path = ledger_dir.path();
let snapshot_output_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_archives_dir = snapshot_output_dir.path();
let bank_snapshots_dir = snapshot_archives_dir.join("snapshot");
let account_paths: Vec<PathBuf> = vec![ledger_path.join("accounts")];
let port = solana_net_utils::find_available_port_in_range(ip_addr, (8301, 8400)).unwrap();
let gossip_addr = SocketAddr::new(ip_addr, port);
let dynamic_port_range = solana_net_utils::parse_port_range("8401-8500").unwrap();
let bind_address = solana_net_utils::parse_host("127.0.0.1").unwrap();
let node = Node::new_with_external_ip(
&identity_keypair.pubkey(),
&gossip_addr,
dynamic_port_range,
bind_address,
None,
);
info!("The peer id: {:?}", &contact_info.id);
let entry_points = vec![ContactInfo::new_gossip_entry_point(&contact_info.gossip)];
let (cluster_info, _rpc_contact_info, _snapshot_info) = replica_util::get_rpc_peer_info(
identity_keypair,
&entry_points,
ledger_path,
&node,
None,
&contact_info.id,
snapshot_archives_dir,
socket_addr_space,
);
info!("The cluster info:\n{:?}", cluster_info.contact_info_trace());
let config = ReplicaNodeConfig {
rpc_peer_addr: contact_info.rpc,
accountsdb_repl_peer_addr: Some(
leader_snapshot_test_config
.validator_config
.accountsdb_repl_service_config
.unwrap()
.replica_server_addr,
),
rpc_addr,
rpc_pubsub_addr,
ledger_path: ledger_path.to_path_buf(),
snapshot_archives_dir: snapshot_archives_dir.to_path_buf(),
bank_snapshots_dir,
account_paths,
snapshot_info: archive_snapshot_hash,
cluster_info,
rpc_config: JsonRpcConfig::default(),
snapshot_config: None,
pubsub_config: PubSubConfig::default(),
socket_addr_space,
account_indexes: AccountSecondaryIndexes::default(),
accounts_db_caching_enabled: false,
replica_exit: Arc::new(RwLock::new(Exit::default())),
};
let _replica_node = ReplicaNode::new(config);
}
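
For reference now that the crate is gone: the ReplicaNodeConfig consumed above carried roughly the fields below. This is reconstructed from the initializer in the deleted test, so the field types are inferred rather than authoritative.

pub struct ReplicaNodeConfig {
    pub rpc_peer_addr: SocketAddr,
    pub accountsdb_repl_peer_addr: Option<SocketAddr>,
    pub rpc_addr: SocketAddr,
    pub rpc_pubsub_addr: SocketAddr,
    pub ledger_path: PathBuf,
    pub snapshot_archives_dir: PathBuf,
    pub bank_snapshots_dir: PathBuf,
    pub account_paths: Vec<PathBuf>,
    pub snapshot_info: (Slot, Hash),
    pub cluster_info: Arc<ClusterInfo>,
    pub rpc_config: JsonRpcConfig,
    pub snapshot_config: Option<SnapshotConfig>,
    pub pubsub_config: PubSubConfig,
    pub socket_addr_space: SocketAddrSpace,
    pub account_indexes: AccountSecondaryIndexes,
    pub accounts_db_caching_enabled: bool,
    pub replica_exit: Arc<RwLock<Exit>>,
}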

View File

@ -2644,7 +2644,7 @@ pub mod rpc_minimal {
}
// RPC interface that only depends on immediate Bank data
// Expected to be provided by both API nodes and (future) accounts replica nodes
// Expected to be provided by API nodes
pub mod rpc_bank {
use super::*;
#[rpc]

View File

@ -43,7 +43,6 @@ solana-metrics = { path = "../metrics", version = "=1.11.0" }
solana-net-utils = { path = "../net-utils", version = "=1.11.0" }
solana-perf = { path = "../perf", version = "=1.11.0" }
solana-poh = { path = "../poh", version = "=1.11.0" }
solana-replica-lib = { path = "../replica-lib", version = "=1.11.0" }
solana-rpc = { path = "../rpc", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }

View File

@ -36,7 +36,6 @@ use {
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_perf::recycler::enable_recycler_warming,
solana_poh::poh_service,
solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig,
solana_rpc::{
rpc::{JsonRpcConfig, RpcBigtableConfig},
rpc_pubsub_service::PubSubConfig,
@ -2268,26 +2267,6 @@ pub fn main() {
}
let accounts_db_config = Some(accounts_db_config);
let accountsdb_repl_service_config = if matches.is_present("enable_accountsdb_repl") {
let accountsdb_repl_bind_address = if matches.is_present("accountsdb_repl_bind_address") {
solana_net_utils::parse_host(matches.value_of("accountsdb_repl_bind_address").unwrap())
.expect("invalid accountsdb_repl_bind_address")
} else {
bind_address
};
let accountsdb_repl_port = value_t_or_exit!(matches, "accountsdb_repl_port", u16);
Some(AccountsDbReplServiceConfig {
worker_threads: value_t_or_exit!(matches, "accountsdb_repl_threads", usize),
replica_server_addr: SocketAddr::new(
accountsdb_repl_bind_address,
accountsdb_repl_port,
),
})
} else {
None
};
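// The removed block consumed four flags: enable_accountsdb_repl,
// accountsdb_repl_bind_address, accountsdb_repl_port, and
// accountsdb_repl_threads. A sketch of how they would have been declared on
// the clap 2.x App builder; the long-flag spellings, help text, and the
// absence of defaults here are assumptions:
fn replica_args<'a, 'b>(app: clap::App<'a, 'b>) -> clap::App<'a, 'b> {
    use clap::Arg;
    app.arg(
        Arg::with_name("enable_accountsdb_repl")
            .long("enable-accountsdb-repl")
            .takes_value(false)
            .help("Enable the AccountsDb replication server"),
    )
    .arg(
        Arg::with_name("accountsdb_repl_bind_address")
            .long("accountsdb-repl-bind-address")
            .value_name("HOST")
            .takes_value(true)
            .help("Bind address for the AccountsDb replication server"),
    )
    .arg(
        Arg::with_name("accountsdb_repl_port")
            .long("accountsdb-repl-port")
            .value_name("PORT")
            .takes_value(true)
            .help("Port for the AccountsDb replication server"),
    )
    .arg(
        Arg::with_name("accountsdb_repl_threads")
            .long("accountsdb-repl-threads")
            .value_name("NUMBER")
            .takes_value(true)
            .help("Worker thread count for the AccountsDb replication server"),
    )
}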
let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
Some(
values_t_or_exit!(matches, "geyser_plugin_config", String)
@ -2368,7 +2347,6 @@ pub fn main() {
account_indexes: account_indexes.clone(),
rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
},
accountsdb_repl_service_config,
geyser_plugin_config_files,
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(