From 8378e8790f8fea9227668b6bbc5791c83bae8977 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 1 Sep 2021 14:10:16 -0700 Subject: [PATCH] Accountsdb replication installment 2 (#19325) This is the 2nd installment for the AccountsDb replication. Summary of Changes The basic google protocol buffer protocol for replicating updated slots and accounts. tonic/tokio is used for transporting the messages. The basic framework of the client and server for replicating slots and accounts -- the persisting of accounts in the replica-side will be done at the next PR -- right now -- the accounts are streamed to the replica-node and dumped. Replication for information about Bank is also not done in this PR -- to be addressed in the next PR to limit the change size. Functionality used by both the client and server side are encapsulated in the replica-lib crate. There is no impact to the existing validator by default. Tests: Observe the confirmed slots replicated to the replica-node. Observe the accounts for the confirmed slot are received at the replica-node side. --- Cargo.lock | 30 +++ Cargo.toml | 1 + core/Cargo.toml | 1 + core/src/validator.rs | 27 ++- docs/src/proposals/accounts-db-replication.md | 14 +- local-cluster/src/validator_configs.rs | 1 + replica-lib/Cargo.toml | 34 ++++ replica-lib/build.rs | 5 + replica-lib/proto/accountsdb_repl.proto | 44 +++++ replica-lib/src/accountsdb_repl_client.rs | 119 ++++++++++++ replica-lib/src/accountsdb_repl_server.rs | 176 ++++++++++++++++++ .../src/accountsdb_repl_server_factory.rs | 29 +++ replica-lib/src/lib.rs | 7 + replica-lib/src/replica_accounts_server.rs | 95 ++++++++++ .../src/replica_confirmed_slots_server.rs | 119 ++++++++++++ replica-node/Cargo.toml | 7 + replica-node/src/accountsdb_repl_service.rs | 81 ++++++++ replica-node/src/lib.rs | 3 + replica-node/src/main.rs | 56 ++++-- replica-node/src/replica_node.rs | 37 +++- replica-node/src/replica_util.rs | 9 +- replica-node/tests/local_replica.rs | 21 ++- .../optimistically_confirmed_bank_tracker.rs | 31 +++ rpc/src/rpc.rs | 4 + rpc/src/rpc_subscriptions.rs | 8 + runtime/src/accounts_cache.rs | 3 + validator/Cargo.toml | 1 + validator/src/main.rs | 58 ++++++ 28 files changed, 994 insertions(+), 27 deletions(-) create mode 100644 replica-lib/Cargo.toml create mode 100644 replica-lib/build.rs create mode 100644 replica-lib/proto/accountsdb_repl.proto create mode 100644 replica-lib/src/accountsdb_repl_client.rs create mode 100644 replica-lib/src/accountsdb_repl_server.rs create mode 100644 replica-lib/src/accountsdb_repl_server_factory.rs create mode 100644 replica-lib/src/lib.rs create mode 100644 replica-lib/src/replica_accounts_server.rs create mode 100644 replica-lib/src/replica_confirmed_slots_server.rs create mode 100644 replica-node/src/accountsdb_repl_service.rs diff --git a/Cargo.lock b/Cargo.lock index 9972030a0..37e0d40ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4748,6 +4748,7 @@ dependencies = [ "solana-program-runtime", "solana-program-test", "solana-rayon-threadlimit", + "solana-replica-lib", "solana-rpc", "solana-runtime", "solana-sdk", @@ -5522,6 +5523,29 @@ dependencies = [ "uriparse", ] +[[package]] +name = "solana-replica-lib" +version = "1.8.0" +dependencies = [ + "bincode", + "chrono", + "crossbeam-channel", + "futures 0.3.16", + "futures-util", + "log 0.4.14", + "prost", + "prost-types", + "serde", + "solana-logger 1.8.0", + "solana-metrics", + "solana-rpc", + "solana-runtime", + "solana-sdk", + "tokio 1.10.1", + "tonic", + "tonic-build", +] + [[package]] name = "solana-replica-node" version = "1.8.0" @@ -5538,6 +5562,8 @@ dependencies = [ "jsonrpc-ipc-server 17.1.0", "jsonrpc-server-utils 17.1.0", "log 0.4.14", + "prost", + "prost-types", "rand 0.7.3", "serde", "serial_test 0.5.1", @@ -5553,6 +5579,7 @@ dependencies = [ "solana-logger 1.8.0", "solana-metrics", "solana-net-utils", + "solana-replica-lib", "solana-rpc", "solana-runtime", "solana-sdk", @@ -5561,6 +5588,8 @@ dependencies = [ "solana-version", "solana-vote-program", "tempfile", + "tonic", + "tonic-build", ] [[package]] @@ -5993,6 +6022,7 @@ dependencies = [ "solana-net-utils", "solana-perf", "solana-poh", + "solana-replica-lib", "solana-rpc", "solana-runtime", "solana-sdk", diff --git a/Cargo.toml b/Cargo.toml index 8c0fe87cf..b6df6f2cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ members = [ "rayon-threadlimit", "watchtower", "replica-node", + "replica-lib", ] exclude = [ diff --git a/core/Cargo.toml b/core/Cargo.toml index 6122302cb..4fa3b546a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -62,6 +62,7 @@ solana-perf = { path = "../perf", version = "=1.8.0" } solana-poh = { path = "../poh", version = "=1.8.0" } solana-program-test = { path = "../program-test", version = "=1.8.0" } solana-rpc = { path = "../rpc", version = "=1.8.0" } +solana-replica-lib = { path = "../replica-lib", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } solana-frozen-abi = { path = "../frozen-abi", version = "=1.8.0" } diff --git a/core/src/validator.rs b/core/src/validator.rs index 6b282782b..8aa97df2c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -44,6 +44,10 @@ use { poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::{self, PohService}, }, + solana_replica_lib::{ + accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig}, + accountsdb_repl_server_factory, + }, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -108,6 +112,7 @@ pub struct ValidatorConfig { pub account_paths: Vec, pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, + pub accountsdb_repl_service_config: Option, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: Option, @@ -167,6 +172,7 @@ impl Default for ValidatorConfig { account_paths: Vec::new(), account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), + accountsdb_repl_service_config: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: None, @@ -272,6 +278,7 @@ pub struct Validator { tvu: Tvu, ip_echo_server: Option, pub cluster_info: Arc, + accountsdb_repl_service: Option, } // in the distant future, get rid of ::new()/exit() and use Result properly... @@ -522,6 +529,7 @@ impl Validator { pubsub_service, optimistically_confirmed_bank_tracker, bank_notification_sender, + accountsdb_repl_service, ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs { if ContactInfo::is_valid_address(&node.info.rpc, &socket_addr_space) { assert!(ContactInfo::is_valid_address( @@ -534,6 +542,13 @@ impl Validator { &socket_addr_space )); } + + let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); + + let accountsdb_repl_service = config.accountsdb_repl_service_config.as_ref().map(|accountsdb_repl_service_config| { + accountsdb_repl_server_factory::AccountsDbReplServerFactory::build_accountsdb_repl_server( + accountsdb_repl_service_config.clone(), confirmed_bank_receiver, bank_forks.clone())}); + let (bank_notification_sender, bank_notification_receiver) = unbounded(); ( Some(JsonRpcService::new( @@ -573,11 +588,13 @@ impl Validator { bank_forks.clone(), optimistically_confirmed_bank, rpc_subscriptions.clone(), + Some(Arc::new(RwLock::new(vec![confirmed_bank_sender]))), )), Some(bank_notification_sender), + accountsdb_repl_service, ) } else { - (None, None, None, None) + (None, None, None, None, None) }; if config.dev_halt_at_slot.is_some() { @@ -789,6 +806,7 @@ impl Validator { ); datapoint_info!("validator-new", ("id", id.to_string(), String)); + *start_progress.write().unwrap() = ValidatorStartProgress::Running; Self { gossip_service, @@ -810,6 +828,7 @@ impl Validator { ip_echo_server, validator_exit: config.validator_exit.clone(), cluster_info, + accountsdb_repl_service, } } @@ -914,6 +933,12 @@ impl Validator { if let Some(ip_echo_server) = self.ip_echo_server { ip_echo_server.shutdown_background(); } + + if let Some(accountsdb_repl_service) = self.accountsdb_repl_service { + accountsdb_repl_service + .join() + .expect("accountsdb_repl_service"); + } } } diff --git a/docs/src/proposals/accounts-db-replication.md b/docs/src/proposals/accounts-db-replication.md index c629e1a6d..42fb6c211 100644 --- a/docs/src/proposals/accounts-db-replication.md +++ b/docs/src/proposals/accounts-db-replication.md @@ -53,15 +53,15 @@ It will be a separate executable from the validator. The replica consists of the following major components: -The `ReplicaUpdatedSlotsRequestor`: this service is responsible for periodically sending the -request `ReplicaUpdatedSlotsRequest` to its peer validator or replica for the latest slots. +The `ReplicaSlotConfirmationRequestor`: this service is responsible for periodically sending the +request `ReplicaSlotConfirmationRequest` to its peer validator or replica for the latest slots. It specifies the latest slot (last_replicated_slot) for which the replica has already fetched the accounts information for. This maintains the ReplWorkingSlotSet and manages the lifecycle of BankForks, BlockCommitmentCache (for the highest confirmed slot) and the optimistically confirmed bank. -The `ReplicaUpdatedSlotsServer`: this service is responsible for serving the -`ReplicaUpdatedSlotsRequest` and sends the `ReplicaUpdatedSlotsResponse` back to the requestor. +The `ReplicaSlotConfirmationServer`: this service is responsible for serving the +`ReplicaSlotConfirmationRequest` and sends the `ReplicaSlotConfirmationResponse` back to the requestor. The response consists of a vector of new slots the validator knows of which is later than the specified last_replicated_slot. This service also runs in the main validator. This service gets the slots for replication from the BankForks, BlockCommitmentCache and OptimiscallyConfirmBank. @@ -99,7 +99,7 @@ The replica node only serves the AccountsDb calls. The existing JsonRpcService requires `BankForks`, `OptimisticallyConfirmedBank` and `BlockCommitmentCache` to load the Bank. The JsonRpcAccountsService will need to use -information obtained from ReplicaUpdatedSlotsResponse to construct the AccountsDb. +information obtained from ReplicaSlotConfirmationResponse to construct the AccountsDb. The `AccountsBackgroundService`: this service also runs in the replica which is responsible for taking snapshots periodically and shrinking the AccountsDb and doing accounts cleaning. @@ -175,8 +175,8 @@ Action Items 1. Build the replica framework and executable 2. Integrate snapshot restore code for bootstrap the AccountsDb. -3. Develop the ReplicaUpdatedSlotsRequestor and ReplicaUpdatedSlotsServer interface code -4. Develop the ReplicaUpdatedSlotsRequestor and ReplicaUpdatedSlotsServer detailed implementations: managing the ReplEligibleSlotSet lifecycle: adding new roots and deleting root to it. And interfaces managing ReplWorkingSlotSet interface: adding and removing. Develop component synthesising information from BankForks, BlockCommitmentCache and OptimistcallyConfirmedBank on the server side and maintaining information on the client side. +3. Develop the ReplicaSlotConfirmationRequestor and ReplicaSlotConfirmationServer interface code +4. Develop the ReplicaSlotConfirmationRequestor and ReplicaSlotConfirmationServer detailed implementations: managing the ReplEligibleSlotSet lifecycle: adding new roots and deleting root to it. And interfaces managing ReplWorkingSlotSet interface: adding and removing. Develop component synthesising information from BankForks, BlockCommitmentCache and OptimistcallyConfirmedBank on the server side and maintaining information on the client side. 5. Develop the interface code for ReplicaAccountsRequestor and ReplicaAccountsServer 6. Develop detailed implementation for ReplicaAccountsRequestor and ReplicaAccountsServer and develop the replication account storage serializer and deserializer. 7. Develop the interface code JsonRpcAccountsService diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 47bfd1f13..55d289754 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -12,6 +12,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_paths: config.account_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), + accountsdb_repl_service_config: config.accountsdb_repl_service_config.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/replica-lib/Cargo.toml b/replica-lib/Cargo.toml new file mode 100644 index 000000000..a78ba1f85 --- /dev/null +++ b/replica-lib/Cargo.toml @@ -0,0 +1,34 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-replica-lib" +description = "The library used for replication by both the client and server" +version = "1.8.0" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +bincode = "1.3.1" +chrono = { version = "0.4.11", features = ["serde"] } +crossbeam-channel = "0.5" +futures = "0.3" +futures-util = "0.3" +log = "0.4.11" +prost = "0.8.0" +prost-types = "0.8.0" +serde = "1.0.112" +solana-logger = { path = "../logger", version = "=1.8.0" } +solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-rpc = { path = "../rpc", version = "=1.8.0" } +solana-runtime = { path = "../runtime", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } +tokio = { version = "1", features = ["full"] } +tonic = { version = "0.5.0", features = ["tls", "transport"] } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +tonic-build = "0.5.1" diff --git a/replica-lib/build.rs b/replica-lib/build.rs new file mode 100644 index 000000000..a90b01298 --- /dev/null +++ b/replica-lib/build.rs @@ -0,0 +1,5 @@ +fn main() -> Result<(), Box> { + // compiling protos using path on build time + tonic_build::configure().compile(&["proto/accountsdb_repl.proto"], &["proto"])?; + Ok(()) +} diff --git a/replica-lib/proto/accountsdb_repl.proto b/replica-lib/proto/accountsdb_repl.proto new file mode 100644 index 000000000..0b8b679ab --- /dev/null +++ b/replica-lib/proto/accountsdb_repl.proto @@ -0,0 +1,44 @@ + +// version of prorocol buffer used +syntax = "proto3"; + +package accountsdb_repl; + +message ReplicaSlotConfirmationRequest { + uint64 last_replicated_slot = 1; +} + +message ReplicaSlotConfirmationResponse { + repeated uint64 updated_slots = 1; +} + +message ReplicaAccountsRequest { + uint64 slot = 1; +} + +message ReplicaAccountMeta { + bytes Pubkey = 1; + uint64 lamports = 2; + bytes owner = 3; + bool executable = 4; + uint64 rent_epoch = 5; +} + +message ReplicaAccountData { + bytes data = 1; +} + +message ReplicaAccountInfo { + ReplicaAccountMeta account_meta = 1; + bytes hash = 2; + ReplicaAccountData data = 3; +} + +message ReplicaAccountsResponse { + repeated ReplicaAccountInfo accounts = 1; +} + +service AccountsDbRepl { + rpc get_confirmed_slots(ReplicaSlotConfirmationRequest) returns (ReplicaSlotConfirmationResponse); + rpc get_slot_accounts(ReplicaAccountsRequest) returns (ReplicaAccountsResponse); +} diff --git a/replica-lib/src/accountsdb_repl_client.rs b/replica-lib/src/accountsdb_repl_client.rs new file mode 100644 index 000000000..a76514529 --- /dev/null +++ b/replica-lib/src/accountsdb_repl_client.rs @@ -0,0 +1,119 @@ +use { + log::*, + solana_sdk::clock::Slot, + std::{net::SocketAddr, sync::Arc}, + tokio::runtime::Runtime, + tonic::{self, transport::Endpoint, Request}, +}; + +tonic::include_proto!("accountsdb_repl"); + +pub struct AccountsDbReplClient { + client: accounts_db_repl_client::AccountsDbReplClient, +} + +#[derive(Debug)] +pub enum ReplicaRpcError { + InvalidUrl(String), + ConnectionError(String), + GetSlotsError(String), + GetAccountsError(String), +} + +impl From for ReplicaRpcError { + fn from(err: tonic::transport::Error) -> Self { + ReplicaRpcError::ConnectionError(err.to_string()) + } +} + +impl AccountsDbReplClient { + pub async fn connect(rpc_peer: &SocketAddr) -> Result { + let url = format!("http://{}", rpc_peer); + let endpoint = match Endpoint::from_shared(url.to_string()) { + Ok(endpoint) => endpoint, + Err(e) => { + return Err(ReplicaRpcError::InvalidUrl(e.to_string())); + } + }; + let client = accounts_db_repl_client::AccountsDbReplClient::connect(endpoint).await?; + info!( + "Successfully connected to the AccountsDb Replication server: {:?}", + url + ); + Ok(AccountsDbReplClient { client }) + } + + pub async fn get_confirmed_slots( + &mut self, + last_slot: Slot, + ) -> Result, ReplicaRpcError> { + let request = ReplicaSlotConfirmationRequest { + last_replicated_slot: last_slot, + }; + let response = self.client.get_confirmed_slots(Request::new(request)).await; + + match response { + Ok(response) => Ok(response.into_inner().updated_slots), + Err(status) => Err(ReplicaRpcError::GetSlotsError(status.to_string())), + } + } + + pub async fn get_slot_accounts( + &mut self, + slot: Slot, + ) -> Result, ReplicaRpcError> { + let request = ReplicaAccountsRequest { slot }; + let response = self.client.get_slot_accounts(Request::new(request)).await; + + match response { + Ok(response) => Ok(response.into_inner().accounts), + Err(status) => Err(ReplicaRpcError::GetAccountsError(status.to_string())), + } + } +} + +#[derive(Clone)] +pub struct AccountsDbReplClientServiceConfig { + pub worker_threads: usize, + pub replica_server_addr: SocketAddr, +} + +/// The service wrapper over AccountsDbReplClient to make it run in the tokio runtime +pub struct AccountsDbReplClientService { + runtime: Arc, + accountsdb_repl_client: AccountsDbReplClient, +} + +impl AccountsDbReplClientService { + pub fn new(config: AccountsDbReplClientServiceConfig) -> Result { + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(config.worker_threads) + .thread_name("sol-accountsdb-repl-wrk") + .enable_all() + .build() + .expect("Runtime"), + ); + + let accountsdb_repl_client = + runtime.block_on(AccountsDbReplClient::connect(&config.replica_server_addr))?; + + Ok(Self { + runtime, + accountsdb_repl_client, + }) + } + + pub fn get_confirmed_slots(&mut self, last_slot: Slot) -> Result, ReplicaRpcError> { + self.runtime + .block_on(self.accountsdb_repl_client.get_confirmed_slots(last_slot)) + } + + pub fn get_slot_accounts( + &mut self, + slot: Slot, + ) -> Result, ReplicaRpcError> { + self.runtime + .block_on(self.accountsdb_repl_client.get_slot_accounts(slot)) + } +} diff --git a/replica-lib/src/accountsdb_repl_server.rs b/replica-lib/src/accountsdb_repl_server.rs new file mode 100644 index 000000000..72eff6b0c --- /dev/null +++ b/replica-lib/src/accountsdb_repl_server.rs @@ -0,0 +1,176 @@ +use { + futures_util::FutureExt, + log::*, + std::{ + net::SocketAddr, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + }, + tokio::{ + runtime::Runtime, + sync::oneshot::{self, Receiver, Sender}, + }, + tonic::{self, transport}, +}; + +tonic::include_proto!("accountsdb_repl"); + +pub trait ReplicaSlotConfirmationServer { + fn get_confirmed_slots( + &self, + request: &ReplicaSlotConfirmationRequest, + ) -> Result; + + fn join(&mut self) -> thread::Result<()>; +} + +pub trait ReplicaAccountsServer { + fn get_slot_accounts( + &self, + request: &ReplicaAccountsRequest, + ) -> Result; + fn join(&mut self) -> thread::Result<()>; +} + +#[derive(Clone)] +struct AccountsDbReplServer { + confirmed_slots_server: Arc>, + accounts_server: Arc>, +} + +/// Implementing the AccountsDbRepl interface declared by the protocol +#[tonic::async_trait] +impl accounts_db_repl_server::AccountsDbRepl for AccountsDbReplServer { + async fn get_confirmed_slots( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let server = self.confirmed_slots_server.read().unwrap(); + let result = server.get_confirmed_slots(&request.into_inner()); + result.map(tonic::Response::new) + } + + async fn get_slot_accounts( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let server = self.accounts_server.read().unwrap(); + let result = server.get_slot_accounts(&request.into_inner()); + result.map(tonic::Response::new) + } +} + +impl AccountsDbReplServer { + pub fn new( + confirmed_slots_server: Arc>, + accounts_server: Arc>, + ) -> Self { + Self { + confirmed_slots_server, + accounts_server, + } + } + + pub fn join(self) -> thread::Result<()> { + self.confirmed_slots_server.write().unwrap().join()?; + self.accounts_server.write().unwrap().join() + } +} + +#[derive(Clone)] +pub struct AccountsDbReplServiceConfig { + pub worker_threads: usize, + pub replica_server_addr: SocketAddr, +} + +/// The service wraps the AccountsDbReplServer to make runnable in the tokio runtime +/// and handles start and stop of the service. +pub struct AccountsDbReplService { + accountsdb_repl_server: AccountsDbReplServer, + thread: JoinHandle<()>, + exit_signal_sender: Sender<()>, +} + +impl AccountsDbReplService { + pub fn new( + config: AccountsDbReplServiceConfig, + confirmed_slots_server: Arc>, + accounts_server: Arc>, + ) -> Self { + let accountsdb_repl_server = + AccountsDbReplServer::new(confirmed_slots_server, accounts_server); + + let worker_threads = config.worker_threads; + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(worker_threads) + .thread_name("sol-accountsdb-repl-wrk") + .enable_all() + .build() + .expect("Runtime"), + ); + + let server_cloned = accountsdb_repl_server.clone(); + let (exit_signal_sender, exit_signal_receiver) = oneshot::channel::<()>(); + + let thread = Builder::new() + .name("sol-accountsdb-repl-rt".to_string()) + .spawn(move || { + Self::run_accountsdb_repl_server_in_runtime( + config, + runtime, + server_cloned, + exit_signal_receiver, + ); + }) + .unwrap(); + + Self { + accountsdb_repl_server, + thread, + exit_signal_sender, + } + } + + async fn run_accountsdb_repl_server( + config: AccountsDbReplServiceConfig, + server: AccountsDbReplServer, + exit_signal: Receiver<()>, + ) -> Result<(), tonic::transport::Error> { + info!( + "Running AccountsDbReplServer at the endpoint: {:?}", + config.replica_server_addr + ); + transport::Server::builder() + .add_service(accounts_db_repl_server::AccountsDbReplServer::new(server)) + .serve_with_shutdown(config.replica_server_addr, exit_signal.map(drop)) + .await + } + + fn run_accountsdb_repl_server_in_runtime( + config: AccountsDbReplServiceConfig, + runtime: Arc, + server: AccountsDbReplServer, + exit_signal: Receiver<()>, + ) { + let result = runtime.block_on(Self::run_accountsdb_repl_server( + config, + server, + exit_signal, + )); + match result { + Ok(_) => { + info!("AccountsDbReplServer finished"); + } + Err(err) => { + error!("AccountsDbReplServer finished in error: {:}?", err); + } + } + } + + pub fn join(self) -> thread::Result<()> { + let _ = self.exit_signal_sender.send(()); + self.accountsdb_repl_server.join()?; + self.thread.join() + } +} diff --git a/replica-lib/src/accountsdb_repl_server_factory.rs b/replica-lib/src/accountsdb_repl_server_factory.rs new file mode 100644 index 000000000..e35d26bbf --- /dev/null +++ b/replica-lib/src/accountsdb_repl_server_factory.rs @@ -0,0 +1,29 @@ +use { + crate::{ + accountsdb_repl_server::{AccountsDbReplService, AccountsDbReplServiceConfig}, + replica_accounts_server::ReplicaAccountsServerImpl, + replica_confirmed_slots_server::ReplicaSlotConfirmationServerImpl, + }, + crossbeam_channel::Receiver, + solana_runtime::bank_forks::BankForks, + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub struct AccountsDbReplServerFactory {} + +impl AccountsDbReplServerFactory { + pub fn build_accountsdb_repl_server( + config: AccountsDbReplServiceConfig, + confirmed_bank_receiver: Receiver, + bank_forks: Arc>, + ) -> AccountsDbReplService { + AccountsDbReplService::new( + config, + Arc::new(RwLock::new(ReplicaSlotConfirmationServerImpl::new( + confirmed_bank_receiver, + ))), + Arc::new(RwLock::new(ReplicaAccountsServerImpl::new(bank_forks))), + ) + } +} diff --git a/replica-lib/src/lib.rs b/replica-lib/src/lib.rs new file mode 100644 index 000000000..13715c691 --- /dev/null +++ b/replica-lib/src/lib.rs @@ -0,0 +1,7 @@ +#![allow(clippy::integer_arithmetic)] + +pub mod accountsdb_repl_client; +pub mod accountsdb_repl_server; +pub mod accountsdb_repl_server_factory; +pub mod replica_accounts_server; +pub mod replica_confirmed_slots_server; diff --git a/replica-lib/src/replica_accounts_server.rs b/replica-lib/src/replica_accounts_server.rs new file mode 100644 index 000000000..516b8ed7b --- /dev/null +++ b/replica-lib/src/replica_accounts_server.rs @@ -0,0 +1,95 @@ +use { + crate::accountsdb_repl_server::{ + self, ReplicaAccountData, ReplicaAccountInfo, ReplicaAccountMeta, ReplicaAccountsServer, + }, + solana_runtime::{ + accounts_cache::CachedAccount, accounts_db::LoadedAccount, append_vec::StoredAccountMeta, + bank_forks::BankForks, + }, + solana_sdk::account::Account, + std::{ + cmp::Eq, + sync::{Arc, RwLock}, + thread, + }, +}; + +pub(crate) struct ReplicaAccountsServerImpl { + bank_forks: Arc>, +} + +impl Eq for ReplicaAccountInfo {} + +impl ReplicaAccountInfo { + fn from_stored_account_meta(stored_account_meta: &StoredAccountMeta) -> Self { + let account_meta = Some(ReplicaAccountMeta { + pubkey: stored_account_meta.meta.pubkey.to_bytes().to_vec(), + lamports: stored_account_meta.account_meta.lamports, + owner: stored_account_meta.account_meta.owner.to_bytes().to_vec(), + executable: stored_account_meta.account_meta.executable, + rent_epoch: stored_account_meta.account_meta.rent_epoch, + }); + let data = Some(ReplicaAccountData { + data: stored_account_meta.data.to_vec(), + }); + ReplicaAccountInfo { + account_meta, + hash: stored_account_meta.hash.0.to_vec(), + data, + } + } + + fn from_cached_account(cached_account: &CachedAccount) -> Self { + let account = Account::from(cached_account.account.clone()); + let account_meta = Some(ReplicaAccountMeta { + pubkey: cached_account.pubkey().to_bytes().to_vec(), + lamports: account.lamports, + owner: account.owner.to_bytes().to_vec(), + executable: account.executable, + rent_epoch: account.rent_epoch, + }); + let data = Some(ReplicaAccountData { + data: account.data.to_vec(), + }); + ReplicaAccountInfo { + account_meta, + hash: cached_account.hash().0.to_vec(), + data, + } + } +} + +impl ReplicaAccountsServer for ReplicaAccountsServerImpl { + fn get_slot_accounts( + &self, + request: &accountsdb_repl_server::ReplicaAccountsRequest, + ) -> Result { + let slot = request.slot; + + match self.bank_forks.read().unwrap().get(slot) { + None => Err(tonic::Status::not_found("The slot is not found")), + Some(bank) => { + let accounts = bank.rc.accounts.scan_slot(slot, |account| match account { + LoadedAccount::Stored(stored_account_meta) => Some( + ReplicaAccountInfo::from_stored_account_meta(&stored_account_meta), + ), + LoadedAccount::Cached((_pubkey, cached_account)) => { + Some(ReplicaAccountInfo::from_cached_account(&cached_account)) + } + }); + + Ok(accountsdb_repl_server::ReplicaAccountsResponse { accounts }) + } + } + } + + fn join(&mut self) -> thread::Result<()> { + Ok(()) + } +} + +impl ReplicaAccountsServerImpl { + pub fn new(bank_forks: Arc>) -> Self { + Self { bank_forks } + } +} diff --git a/replica-lib/src/replica_confirmed_slots_server.rs b/replica-lib/src/replica_confirmed_slots_server.rs new file mode 100644 index 000000000..37963632b --- /dev/null +++ b/replica-lib/src/replica_confirmed_slots_server.rs @@ -0,0 +1,119 @@ +use { + crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer}, + crossbeam_channel::Receiver, + solana_sdk::{clock::Slot, commitment_config::CommitmentLevel}, + std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, + }, + tonic, +}; + +/// The structure modelling the slots eligible for replication and +/// their states. +#[derive(Default, Clone)] +struct ReplicaEligibleSlotSet { + slot_set: Arc>>, +} + +pub(crate) struct ReplicaSlotConfirmationServerImpl { + eligible_slot_set: ReplicaEligibleSlotSet, + confirmed_bank_receiver_service: Option>, + cleanup_service: Option>, + exit_updated_slot_server: Arc, +} + +impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl { + fn get_confirmed_slots( + &self, + request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest, + ) -> Result { + let slot_set = self.eligible_slot_set.slot_set.read().unwrap(); + let updated_slots: Vec = slot_set + .iter() + .filter(|(slot, _)| *slot > request.last_replicated_slot) + .map(|(slot, _)| *slot) + .collect(); + + Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots }) + } + + fn join(&mut self) -> thread::Result<()> { + self.exit_updated_slot_server.store(true, Ordering::Relaxed); + self.confirmed_bank_receiver_service + .take() + .map(JoinHandle::join) + .unwrap() + .expect("confirmed_bank_receiver_service"); + + self.cleanup_service.take().map(JoinHandle::join).unwrap() + } +} + +const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144; + +impl ReplicaSlotConfirmationServerImpl { + pub fn new(confirmed_bank_receiver: Receiver) -> Self { + let eligible_slot_set = ReplicaEligibleSlotSet::default(); + let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); + + Self { + eligible_slot_set: eligible_slot_set.clone(), + confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver( + confirmed_bank_receiver, + eligible_slot_set.clone(), + exit_updated_slot_server.clone(), + )), + cleanup_service: Some(Self::run_cleanup_service( + eligible_slot_set, + MAX_ELIGIBLE_SLOT_SET_SIZE, + exit_updated_slot_server.clone(), + )), + exit_updated_slot_server, + } + } + + fn run_confirmed_bank_receiver( + confirmed_bank_receiver: Receiver, + eligible_slot_set: ReplicaEligibleSlotSet, + exit: Arc, + ) -> JoinHandle<()> { + Builder::new() + .name("confirmed_bank_receiver".to_string()) + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + if let Ok(slot) = confirmed_bank_receiver.recv() { + let mut slot_set = eligible_slot_set.slot_set.write().unwrap(); + slot_set.push_back((slot, CommitmentLevel::Confirmed)); + } + } + }) + .unwrap() + } + + fn run_cleanup_service( + eligible_slot_set: ReplicaEligibleSlotSet, + max_set_size: usize, + exit: Arc, + ) -> JoinHandle<()> { + Builder::new() + .name("cleanup_service".to_string()) + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + let mut slot_set = eligible_slot_set.slot_set.write().unwrap(); + let count_to_drain = slot_set.len().saturating_sub(max_set_size); + if count_to_drain > 0 { + drop(slot_set.drain(..count_to_drain)); + } + drop(slot_set); + sleep(Duration::from_millis(200)); + } + }) + .unwrap() + } +} diff --git a/replica-node/Cargo.toml b/replica-node/Cargo.toml index 9647d5393..d8beb2bf2 100644 --- a/replica-node/Cargo.toml +++ b/replica-node/Cargo.toml @@ -21,6 +21,8 @@ jsonrpc-derive = "17.0.0" jsonrpc-ipc-server = "17.0.0" jsonrpc-server-utils= "17.0.0" log = "0.4.11" +prost = "0.8.0" +prost-types = "0.8.0" rand = "0.7.0" serde = "1.0.130" solana-clap-utils = { path = "../clap-utils", version = "=1.8.0" } @@ -34,11 +36,13 @@ solana-logger = { path = "../logger", version = "=1.8.0" } solana-metrics = { path = "../metrics", version = "=1.8.0" } solana-net-utils = { path = "../net-utils", version = "=1.8.0" } solana-rpc = { path = "../rpc", version = "=1.8.0" } +solana-replica-lib = { path = "../replica-lib", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } solana-streamer = { path = "../streamer", version = "=1.8.0" } solana-version = { path = "../version", version = "=1.8.0" } solana-validator = { path = "../validator", version = "=1.8.0" } +tonic = { version = "0.5.0", features = ["tls", "transport"] } [dev-dependencies] solana-core = { path = "../core", version = "=1.8.0" } @@ -51,3 +55,6 @@ tempfile = "3.2.0" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +tonic-build = "0.5.1" diff --git a/replica-node/src/accountsdb_repl_service.rs b/replica-node/src/accountsdb_repl_service.rs new file mode 100644 index 000000000..bed1694fe --- /dev/null +++ b/replica-node/src/accountsdb_repl_service.rs @@ -0,0 +1,81 @@ +/// Module responsible for replicating AccountsDb data from its peer to its local AccountsDb in the replica-node +use { + log::*, + solana_replica_lib::accountsdb_repl_client::{ + AccountsDbReplClientService, AccountsDbReplClientServiceConfig, ReplicaRpcError, + }, + solana_sdk::{clock::Slot, pubkey::Pubkey}, + std::{ + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub struct AccountsDbReplService { + thread: JoinHandle<()>, +} + +impl AccountsDbReplService { + pub fn new( + last_replicated_slot: Slot, + config: AccountsDbReplClientServiceConfig, + ) -> Result { + let accountsdb_repl_client = AccountsDbReplClientService::new(config)?; + let thread = Builder::new() + .name("sol-accountsdb-repl-svc".to_string()) + .spawn(move || { + Self::run_service(last_replicated_slot, accountsdb_repl_client); + }) + .unwrap(); + Ok(Self { thread }) + } + + fn replicate_accounts_for_slot( + accountsdb_repl_client: &mut AccountsDbReplClientService, + slot: Slot, + ) { + match accountsdb_repl_client.get_slot_accounts(slot) { + Err(err) => { + error!( + "Ran into error getting accounts for slot {:?}, error: {:?}", + slot, err + ); + } + Ok(accounts) => { + for account in accounts.iter() { + debug!( + "Received account: {:?}", + Pubkey::new(&account.account_meta.as_ref().unwrap().pubkey) + ); + } + } + } + } + + fn run_service( + mut last_replicated_slot: Slot, + mut accountsdb_repl_client: AccountsDbReplClientService, + ) { + loop { + match accountsdb_repl_client.get_confirmed_slots(last_replicated_slot) { + Ok(slots) => { + info!("Received updated slots: {:?}", slots); + if !slots.is_empty() { + for slot in slots.iter() { + Self::replicate_accounts_for_slot(&mut accountsdb_repl_client, *slot); + } + last_replicated_slot = slots[slots.len() - 1]; + } + } + Err(err) => { + error!("Ran into error getting updated slots: {:?}", err); + } + } + sleep(Duration::from_millis(200)); + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread.join() + } +} diff --git a/replica-node/src/lib.rs b/replica-node/src/lib.rs index 6fc5d3bf9..f3ff1bd95 100644 --- a/replica-node/src/lib.rs +++ b/replica-node/src/lib.rs @@ -1,2 +1,5 @@ +#![allow(clippy::integer_arithmetic)] + +pub mod accountsdb_repl_service; pub mod replica_node; pub mod replica_util; diff --git a/replica-node/src/main.rs b/replica-node/src/main.rs index e5564a82c..a2d84f9dc 100644 --- a/replica-node/src/main.rs +++ b/replica-node/src/main.rs @@ -67,12 +67,28 @@ pub fn main() { .help("Use DIR as snapshot location [default: --ledger value]"), ) .arg( - Arg::with_name("peer") - .long("peer") - .value_name("IP:PORT") + Arg::with_name("peer_address") + .long("peer-address") + .value_name("IP") .takes_value(true) .required(true) - .help("The the IP:PORT for the peer validator/replica to download from"), + .help("The the address for the peer validator/replica to download from"), + ) + .arg( + Arg::with_name("peer_rpc_port") + .long("peer-rpc-port") + .value_name("PORT") + .takes_value(true) + .required(true) + .help("The the PORT for the peer validator/replica from which to download the snapshots"), + ) + .arg( + Arg::with_name("peer_accountsdb_repl_port") + .long("peer-accountsdb-repl-port") + .value_name("PORT") + .takes_value(true) + .required(true) + .help("The the PORT for the peer validator/replica serving the AccountsDb replication"), ) .arg( Arg::with_name("peer_pubkey") @@ -296,19 +312,30 @@ pub fn main() { vec![ledger_path.join("accounts")] }; - let rpc_source_addr = - solana_net_utils::parse_host_port(matches.value_of("peer").unwrap_or_else(|| { + let peer_address = solana_net_utils::parse_host(matches.value_of("peer_address").unwrap()) + .expect("invalid peer_address"); + + let peer_rpc_port = value_t!(matches, "peer_rpc_port", u16).unwrap_or_else(|_| { + clap::Error::with_description( + "The --peer-rpc-port argument is required", + clap::ErrorKind::ArgumentNotFound, + ) + .exit(); + }); + + let rpc_peer_addr = SocketAddr::new(peer_address, peer_rpc_port); + + let peer_accountsdb_repl_port = value_t!(matches, "peer_accountsdb_repl_port", u16) + .unwrap_or_else(|_| { clap::Error::with_description( - "The --peer argument is required", + "The --peer-accountsdb-repl-port argument is required", clap::ErrorKind::ArgumentNotFound, ) .exit(); - })) - .unwrap_or_else(|e| { - eprintln!("failed to parse entrypoint address: {}", e); - exit(1); }); + let accountsdb_repl_peer_addr = SocketAddr::new(peer_address, peer_accountsdb_repl_port); + let rpc_port = value_t!(matches, "rpc_port", u16).unwrap_or_else(|_| { clap::Error::with_description( "The --rpc-port argument is required", @@ -358,7 +385,8 @@ pub fn main() { ); let config = ReplicaNodeConfig { - rpc_source_addr, + rpc_peer_addr, + accountsdb_repl_peer_addr: Some(accountsdb_repl_peer_addr), rpc_addr: rpc_addrs.0, rpc_pubsub_addr: rpc_addrs.1, ledger_path, @@ -376,6 +404,6 @@ pub fn main() { replica_exit: Arc::new(RwLock::new(Exit::default())), }; - let validator = ReplicaNode::new(config); - validator.join(); + let replica = ReplicaNode::new(config); + replica.join(); } diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index 94838c62d..35226ccf2 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -1,4 +1,5 @@ use { + crate::accountsdb_repl_service::AccountsDbReplService, crossbeam_channel::unbounded, log::*, solana_download_utils::download_snapshot, @@ -8,6 +9,7 @@ use { blockstore::Blockstore, blockstore_db::AccessType, blockstore_processor, leader_schedule_cache::LeaderScheduleCache, }, + solana_replica_lib::accountsdb_repl_client::AccountsDbReplClientServiceConfig, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -40,7 +42,8 @@ use { }; pub struct ReplicaNodeConfig { - pub rpc_source_addr: SocketAddr, + pub rpc_peer_addr: SocketAddr, + pub accountsdb_repl_peer_addr: Option, pub rpc_addr: SocketAddr, pub rpc_pubsub_addr: SocketAddr, pub ledger_path: PathBuf, @@ -62,6 +65,7 @@ pub struct ReplicaNode { json_rpc_service: Option, pubsub_service: Option, optimistically_confirmed_bank_tracker: Option, + accountsdb_repl_service: Option, } // Struct maintaining information about banks @@ -86,7 +90,7 @@ fn initialize_from_snapshot( ); download_snapshot( - &replica_config.rpc_source_addr, + &replica_config.rpc_peer_addr, &replica_config.snapshot_archives_dir, replica_config.snapshot_info, false, @@ -240,6 +244,7 @@ fn start_client_rpc_services( bank_forks.clone(), optimistically_confirmed_bank.clone(), subscriptions.clone(), + None, )), ) } @@ -247,7 +252,7 @@ fn start_client_rpc_services( impl ReplicaNode { pub fn new(replica_config: ReplicaNodeConfig) -> Self { let genesis_config = download_then_check_genesis_hash( - &replica_config.rpc_source_addr, + &replica_config.rpc_peer_addr, &replica_config.ledger_path, None, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, @@ -279,10 +284,31 @@ impl ReplicaNode { &replica_config.socket_addr_space, ); + let accountsdb_repl_client_config = AccountsDbReplClientServiceConfig { + worker_threads: 1, + replica_server_addr: replica_config.accountsdb_repl_peer_addr.unwrap(), + }; + + let last_replicated_slot = bank_info.bank_forks.read().unwrap().root_bank().slot(); + info!( + "Starting AccountsDbReplService from slot {:?}", + last_replicated_slot + ); + let accountsdb_repl_service = Some( + AccountsDbReplService::new(last_replicated_slot, accountsdb_repl_client_config) + .expect("Failed to start AccountsDb replication service"), + ); + + info!( + "Started AccountsDbReplService from slot {:?}", + last_replicated_slot + ); + ReplicaNode { json_rpc_service, pubsub_service, optimistically_confirmed_bank_tracker, + accountsdb_repl_service, } } @@ -302,5 +328,10 @@ impl ReplicaNode { .join() .expect("optimistically_confirmed_bank_tracker"); } + if let Some(accountsdb_repl_service) = self.accountsdb_repl_service { + accountsdb_repl_service + .join() + .expect("accountsdb_repl_service"); + } } } diff --git a/replica-node/src/replica_util.rs b/replica-node/src/replica_util.rs index 269bfa012..8ba956b4d 100644 --- a/replica-node/src/replica_util.rs +++ b/replica-node/src/replica_util.rs @@ -19,7 +19,10 @@ use { net::{SocketAddr, UdpSocket}, path::Path, process::exit, - sync::{atomic::AtomicBool, Arc}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, thread::sleep, time::{Duration, Instant}, }, @@ -264,5 +267,9 @@ pub fn get_rpc_peer_info( ); let rpc_node_details = rpc_node_details.unwrap(); + // We no longer need the gossip node, stop it: + let gossip_exit_flag = gossip.1; + gossip_exit_flag.store(true, Ordering::Relaxed); + (gossip.0, rpc_node_details.0, rpc_node_details.1) } diff --git a/replica-node/tests/local_replica.rs b/replica-node/tests/local_replica.rs index e2fed68fc..f89cce1cd 100644 --- a/replica-node/tests/local_replica.rs +++ b/replica-node/tests/local_replica.rs @@ -9,6 +9,7 @@ use { local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, }, + solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig, solana_replica_node::{ replica_node::{ReplicaNode, ReplicaNodeConfig}, replica_util, @@ -132,11 +133,22 @@ fn setup_snapshot_validator_config( // Create the account paths let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); + let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + let accountsdb_repl_port = + solana_net_utils::find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); + let replica_server_addr = SocketAddr::new(bind_ip_addr, accountsdb_repl_port); + + let accountsdb_repl_service_config = Some(AccountsDbReplServiceConfig { + worker_threads: 1, + replica_server_addr, + }); + // Create the validator config let validator_config = ValidatorConfig { snapshot_config: Some(snapshot_config), account_paths: account_storage_paths, accounts_hash_interval_slots: snapshot_interval_slots, + accountsdb_repl_service_config, ..ValidatorConfig::default() }; @@ -259,7 +271,14 @@ fn test_replica_bootstrap() { info!("The cluster info:\n{:?}", cluster_info.contact_info_trace()); let config = ReplicaNodeConfig { - rpc_source_addr: contact_info.rpc, + rpc_peer_addr: contact_info.rpc, + accountsdb_repl_peer_addr: Some( + leader_snapshot_test_config + .validator_config + .accountsdb_repl_service_config + .unwrap() + .replica_server_addr, + ), rpc_addr, rpc_pubsub_addr, ledger_path: ledger_path.to_path_buf(), diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 12de21495..9e49d1d76 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -63,6 +63,7 @@ impl OptimisticallyConfirmedBankTracker { bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, + confirmed_bank_subscribers: Option>>>>, ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); @@ -83,6 +84,7 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &confirmed_bank_subscribers, ) { break; } @@ -99,6 +101,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, mut highest_confirmed_slot: &mut Slot, + confirmed_bank_subscribers: &Option>>>>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -109,6 +112,7 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + confirmed_bank_subscribers, ); Ok(()) } @@ -119,6 +123,7 @@ impl OptimisticallyConfirmedBankTracker { bank: &Arc, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, + confirmed_bank_subscribers: &Option>>>>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -128,6 +133,20 @@ impl OptimisticallyConfirmedBankTracker { ); subscriptions.notify_gossip_subscribers(bank.slot()); *last_notified_confirmed_slot = bank.slot(); + if let Some(confirmed_bank_subscribers) = confirmed_bank_subscribers { + for sender in confirmed_bank_subscribers.read().unwrap().iter() { + match sender.send(bank.slot()) { + Ok(_) => {} + Err(err) => { + info!( + "Failed to send slot {:} update, error: {:?}", + bank.slot(), + err + ); + } + } + } + } } } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { pending_optimistically_confirmed_banks.insert(bank.slot()); @@ -142,6 +161,7 @@ impl OptimisticallyConfirmedBankTracker { slot_threshold: Slot, mut last_notified_confirmed_slot: &mut Slot, mut pending_optimistically_confirmed_banks: &mut HashSet, + confirmed_bank_subscribers: &Option>>>>, ) { for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -155,6 +175,7 @@ impl OptimisticallyConfirmedBankTracker { confirmed_bank, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + confirmed_bank_subscribers, ); } } @@ -168,6 +189,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, + confirmed_bank_subscribers: &Option>>>>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -189,6 +211,7 @@ impl OptimisticallyConfirmedBankTracker { *highest_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + confirmed_bank_subscribers, ); *highest_confirmed_slot = slot; @@ -237,6 +260,7 @@ impl OptimisticallyConfirmedBankTracker { *last_notified_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, + confirmed_bank_subscribers, ); let mut w_optimistically_confirmed_bank = @@ -320,6 +344,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -333,6 +358,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(highest_confirmed_slot, 2); @@ -346,6 +372,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -364,6 +391,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(highest_confirmed_slot, 3); @@ -381,6 +409,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); @@ -399,6 +428,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); @@ -424,6 +454,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 1b6f90dcd..8335a278b 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7744,6 +7744,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7761,6 +7762,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7778,6 +7780,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7796,6 +7799,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 055cfc6ad..018012b50 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1722,6 +1722,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -1765,6 +1766,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let (response, _) = robust_poll_or_panic(transport_receiver); @@ -1876,6 +1878,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // The following should panic @@ -1985,6 +1988,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // a closure to reduce code duplications in building expected responses: @@ -2036,6 +2040,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let (response, transport_receiver) = robust_poll_or_panic(transport_receiver); @@ -2505,6 +2510,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); // Now, notify the frozen bank and ensure its notifications are processed @@ -2517,6 +2523,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let (response, _) = robust_poll_or_panic(transport_receiver0); @@ -2563,6 +2570,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &None, ); let (response, _) = robust_poll_or_panic(transport_receiver1); let expected = json!({ diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs index 8896a1eea..b9df80344 100644 --- a/runtime/src/accounts_cache.rs +++ b/runtime/src/accounts_cache.rs @@ -134,6 +134,9 @@ impl CachedAccountInner { } } } + pub fn pubkey(&self) -> Pubkey { + self.pubkey + } } #[derive(Debug, Default)] diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 7352ff86f..d131fa23f 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -43,6 +43,7 @@ solana-metrics = { path = "../metrics", version = "=1.8.0" } solana-net-utils = { path = "../net-utils", version = "=1.8.0" } solana-perf = { path = "../perf", version = "=1.8.0" } solana-poh = { path = "../poh", version = "=1.8.0" } +solana-replica-lib = { path = "../replica-lib", version = "=1.8.0" } solana-rpc = { path = "../rpc", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } diff --git a/validator/src/main.rs b/validator/src/main.rs index d22af8843..027301668 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -37,6 +37,7 @@ use { solana_ledger::blockstore_db::BlockstoreRecoveryMode, solana_perf::recycler::enable_recycler_warming, solana_poh::poh_service, + solana_replica_lib::accountsdb_repl_server::AccountsDbReplServiceConfig, solana_rpc::{rpc::JsonRpcConfig, rpc_pubsub_service::PubSubConfig}, solana_runtime::{ accounts_db::{ @@ -1058,6 +1059,7 @@ pub fn main() { .send_transaction_leader_forward_count .to_string(); let default_rpc_threads = num_cpus::get().to_string(); + let default_accountsdb_repl_threads = num_cpus::get().to_string(); let default_max_snapshot_to_retain = &DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN.to_string(); let default_min_snapshot_download_speed = &DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED.to_string(); let default_max_snapshot_download_abort = &MAX_SNAPSHOT_DOWNLOAD_ABORT.to_string(); @@ -1745,6 +1747,41 @@ pub fn main() { .requires("enable_rpc_transaction_history") .help("Verifies blockstore roots on boot and fixes any gaps"), ) + .arg( + Arg::with_name("enable_accountsdb_repl") + .long("enable-accountsdb-repl") + .takes_value(false) + .hidden(true) + .help("Enable AccountsDb Replication"), + ) + .arg( + Arg::with_name("accountsdb_repl_bind_address") + .long("accountsdb-repl-bind-address") + .value_name("HOST") + .takes_value(true) + .validator(solana_net_utils::is_host) + .hidden(true) + .help("IP address to bind the AccountsDb Replication port [default: use --bind-address]"), + ) + .arg( + Arg::with_name("accountsdb_repl_port") + .long("accountsdb-repl-port") + .value_name("PORT") + .takes_value(true) + .validator(solana_validator::port_validator) + .hidden(true) + .help("Enable AccountsDb Replication Service on this port"), + ) + .arg( + Arg::with_name("accountsdb_repl_threads") + .long("accountsdb-repl-threads") + .value_name("NUMBER") + .validator(is_parsable::) + .takes_value(true) + .default_value(&default_accountsdb_repl_threads) + .hidden(true) + .help("Number of threads to use for servicing AccountsDb Replication requests"), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .alias("halt-on-trusted-validators-accounts-hash-mismatch") @@ -2396,6 +2433,26 @@ pub fn main() { .ok() .map(|bins| AccountsIndexConfig { bins: Some(bins) }); + let accountsdb_repl_service_config = if matches.is_present("enable_accountsdb_repl") { + let accountsdb_repl_bind_address = if matches.is_present("accountsdb_repl_bind_address") { + solana_net_utils::parse_host(matches.value_of("accountsdb_repl_bind_address").unwrap()) + .expect("invalid accountsdb_repl_bind_address") + } else { + bind_address + }; + let accountsdb_repl_port = value_t_or_exit!(matches, "accountsdb_repl_port", u16); + + Some(AccountsDbReplServiceConfig { + worker_threads: value_t_or_exit!(matches, "accountsdb_repl_threads", usize), + replica_server_addr: SocketAddr::new( + accountsdb_repl_bind_address, + accountsdb_repl_port, + ), + }) + } else { + None + }; + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -2436,6 +2493,7 @@ pub fn main() { account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, + accountsdb_repl_service_config, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),