From c67f8bd821541b07bf374bacb88aeb641b13415c Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Tue, 8 Sep 2020 02:00:49 -0700 Subject: [PATCH] Forward transactions to the expected leader instead of your own TPU port (#12004) * Use PoHRecorder to send to the right leader * cleanup * fmt * clippy * Cleanup, fix bug Co-authored-by: Carl --- Cargo.lock | 2 + banks-server/Cargo.toml | 2 + banks-server/src/banks_server.rs | 2 +- banks-server/src/lib.rs | 4 + .../src/send_transaction_service.rs | 4 +- core/src/lib.rs | 1 + core/src/rpc.rs | 25 +- core/src/rpc_service.rs | 19 +- core/src/send_transaction_service.rs | 413 ++++++++++++++++++ core/src/validator.rs | 89 ++-- runtime/src/lib.rs | 1 - 11 files changed, 497 insertions(+), 65 deletions(-) rename {runtime => banks-server}/src/send_transaction_service.rs (99%) create mode 100644 core/src/send_transaction_service.rs diff --git a/Cargo.lock b/Cargo.lock index 9664197966..1ff2317994 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3326,7 +3326,9 @@ version = "1.4.0" dependencies = [ "bincode", "futures 0.3.5", + "log 0.4.8", "solana-banks-interface", + "solana-metrics", "solana-runtime", "solana-sdk 1.4.0", "tarpc", diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml index 56706c0622..20951cbec9 100644 --- a/banks-server/Cargo.toml +++ b/banks-server/Cargo.toml @@ -11,9 +11,11 @@ edition = "2018" [dependencies] bincode = "1.3.1" futures = "0.3" +log = "0.4.8" solana-banks-interface = { path = "../banks-interface", version = "1.4.0" } solana-runtime = { path = "../runtime", version = "1.4.0" } solana-sdk = { path = "../sdk", version = "1.4.0" } +solana-metrics = { path = "../metrics", version = "1.4.0" } tarpc = { version = "0.21.0", features = ["full"] } tokio = "0.2" tokio-serde = { version = "0.6", features = ["bincode"] } diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index e5192dde12..1c66c8f595 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -1,3 +1,4 @@ +use crate::send_transaction_service::{SendTransactionService, TransactionInfo}; use bincode::{deserialize, serialize}; use futures::{ future, @@ -8,7 +9,6 @@ use solana_runtime::{ bank::Bank, bank_forks::BankForks, commitment::{BlockCommitmentCache, CommitmentSlots}, - send_transaction_service::{SendTransactionService, TransactionInfo}, }; use solana_sdk::{ account::Account, diff --git a/banks-server/src/lib.rs b/banks-server/src/lib.rs index a9acc11e5b..09b7067a50 100644 --- a/banks-server/src/lib.rs +++ b/banks-server/src/lib.rs @@ -1,2 +1,6 @@ pub mod banks_server; pub mod rpc_banks_service; +pub mod send_transaction_service; + +#[macro_use] +extern crate solana_metrics; diff --git a/runtime/src/send_transaction_service.rs b/banks-server/src/send_transaction_service.rs similarity index 99% rename from runtime/src/send_transaction_service.rs rename to banks-server/src/send_transaction_service.rs index bab219b0e4..89f9f80e4f 100644 --- a/runtime/src/send_transaction_service.rs +++ b/banks-server/src/send_transaction_service.rs @@ -1,6 +1,6 @@ -use crate::{bank::Bank, bank_forks::BankForks}; use log::*; use solana_metrics::{datapoint_warn, inc_new_counter_info}; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{clock::Slot, signature::Signature}; use std::{ collections::HashMap, @@ -205,8 +205,6 @@ mod test { #[test] fn process_transactions() { - solana_logger::setup(); - let (genesis_config, mint_keypair) = create_genesis_config(4); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); diff --git a/core/src/lib.rs b/core/src/lib.rs index 5323da0dcd..03d621674c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -58,6 +58,7 @@ pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; +pub mod send_transaction_service; pub mod serve_repair; pub mod serve_repair_service; pub mod sigverify; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 23e072556f..c62cd533d9 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -1,9 +1,13 @@ //! The `rpc` module implements the Solana RPC interface. use crate::{ - cluster_info::ClusterInfo, contact_info::ContactInfo, - non_circulating_supply::calculate_non_circulating_supply, rpc_error::RpcCustomError, - rpc_health::*, validator::ValidatorExit, + cluster_info::ClusterInfo, + contact_info::ContactInfo, + non_circulating_supply::calculate_non_circulating_supply, + rpc_error::RpcCustomError, + rpc_health::*, + send_transaction_service::{SendTransactionService, TransactionInfo}, + validator::ValidatorExit, }; use bincode::{config::Options, serialize}; use jsonrpc_core::{types::error, Error, Metadata, Result}; @@ -36,7 +40,6 @@ use solana_runtime::{ bank::Bank, bank_forks::BankForks, commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots}, - send_transaction_service::{SendTransactionService, TransactionInfo}, }; use solana_sdk::{ account::Account, @@ -213,7 +216,7 @@ impl JsonRpcRequestProcessor { let cluster_info = Arc::new(ClusterInfo::default()); let tpu_address = cluster_info.my_contact_info().tpu; let (sender, receiver) = channel(); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); Self { config: JsonRpcConfig::default(), @@ -2692,7 +2695,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( &leader_pubkey, @@ -4004,7 +4007,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let res = io.handle_request_sync(req, meta); @@ -4045,7 +4048,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); let mut bad_transaction = system_transaction::transfer(&mint_keypair, &Pubkey::new_rand(), 42, Hash::default()); @@ -4227,7 +4230,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); assert_eq!(request_processor.validator_exit(), false); assert_eq!(exit.load(Ordering::Relaxed), false); } @@ -4256,7 +4259,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); assert_eq!(request_processor.validator_exit(), true); assert_eq!(exit.load(Ordering::Relaxed), true); } @@ -4342,7 +4345,7 @@ pub mod tests { &runtime::Runtime::new().unwrap(), None, ); - SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); assert_eq!( request_processor.get_block_commitment(0), RpcBlockCommitment { diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index f8ef0c6067..2152c4409d 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,8 +1,13 @@ //! The `rpc_service` module implements the Solana JSON RPC service. use crate::{ - bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*, - rpc_health::*, validator::ValidatorExit, + bigtable_upload_service::BigTableUploadService, + cluster_info::ClusterInfo, + poh_recorder::PohRecorder, + rpc::*, + rpc_health::*, + send_transaction_service::{LeaderInfo, SendTransactionService}, + validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{ @@ -14,7 +19,6 @@ use solana_ledger::blockstore::Blockstore; use solana_runtime::{ bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, - send_transaction_service::SendTransactionService, snapshot_utils, }; use solana_sdk::{hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey}; @@ -23,7 +27,7 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, sync::atomic::{AtomicBool, Ordering}, - sync::{mpsc::channel, Arc, RwLock}, + sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; use tokio::runtime; @@ -239,6 +243,7 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, + poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>>, @@ -302,16 +307,19 @@ impl JsonRpcService { blockstore, validator_exit.clone(), health.clone(), - cluster_info, + cluster_info.clone(), genesis_hash, &runtime, bigtable_ledger_storage, ); let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + let leader_info = + poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); let _send_transaction_service = Arc::new(SendTransactionService::new( tpu_address, &bank_forks, + leader_info, &exit_send_transaction_service, receiver, )); @@ -438,6 +446,7 @@ mod tests { block_commitment_cache, blockstore, cluster_info, + None, Hash::default(), &PathBuf::from("farf"), validator_exit, diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs new file mode 100644 index 0000000000..48e3ae947a --- /dev/null +++ b/core/src/send_transaction_service.rs @@ -0,0 +1,413 @@ +use crate::cluster_info::ClusterInfo; +use crate::poh_recorder::PohRecorder; +use log::*; +use solana_metrics::{datapoint_warn, inc_new_counter_info}; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; +use std::sync::Mutex; +use std::{ + collections::HashMap, + net::{SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::Receiver, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, +}; + +/// Maximum size of the transaction queue +const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day + +pub struct SendTransactionService { + thread: JoinHandle<()>, +} + +pub struct TransactionInfo { + pub signature: Signature, + pub wire_transaction: Vec, + pub last_valid_slot: Slot, +} + +impl TransactionInfo { + pub fn new(signature: Signature, wire_transaction: Vec, last_valid_slot: Slot) -> Self { + Self { + signature, + wire_transaction, + last_valid_slot, + } + } +} + +pub struct LeaderInfo { + cluster_info: Arc, + poh_recorder: Arc>, + recent_peers: HashMap, +} + +impl LeaderInfo { + pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { + Self { + cluster_info, + poh_recorder, + recent_peers: HashMap::new(), + } + } + + pub fn refresh_recent_peers(&mut self) { + self.recent_peers = self + .cluster_info + .tpu_peers() + .into_iter() + .map(|ci| (ci.id, ci.tpu)) + .collect(); + } + + pub fn get_leader_tpu(&self) -> Option<&SocketAddr> { + self.poh_recorder + .lock() + .unwrap() + .leader_after_n_slots(0) + .and_then(|leader| self.recent_peers.get(&leader)) + } +} + +#[derive(Default, Debug, PartialEq)] +struct ProcessTransactionsResult { + rooted: u64, + expired: u64, + retried: u64, + failed: u64, + retained: u64, +} + +impl SendTransactionService { + pub fn new( + tpu_address: SocketAddr, + bank_forks: &Arc>, + leader_info: Option, + exit: &Arc, + receiver: Receiver, + ) -> Self { + let thread = Self::retry_thread( + tpu_address, + receiver, + bank_forks.clone(), + leader_info, + exit.clone(), + ); + Self { thread } + } + + fn retry_thread( + tpu_address: SocketAddr, + receiver: Receiver, + bank_forks: Arc>, + mut leader_info: Option, + exit: Arc, + ) -> JoinHandle<()> { + let mut last_status_check = Instant::now(); + let mut transactions = HashMap::new(); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + + if let Some(leader_info) = leader_info.as_mut() { + leader_info.refresh_recent_peers(); + } + + Builder::new() + .name("send-tx-svc".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { + let address = leader_info + .as_ref() + .and_then(|leader_info| leader_info.get_leader_tpu()) + .unwrap_or(&tpu_address); + Self::send_transaction( + &send_socket, + address, + &transaction_info.wire_transaction, + ); + if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { + transactions.insert(transaction_info.signature, transaction_info); + } else { + datapoint_warn!("send_transaction_service-queue-overflow"); + } + } + + if Instant::now().duration_since(last_status_check).as_secs() >= 5 { + if !transactions.is_empty() { + datapoint_info!( + "send_transaction_service-queue-size", + ("len", transactions.len(), i64) + ); + let bank_forks = bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + + let _result = Self::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &leader_info, + ); + } + last_status_check = Instant::now(); + if let Some(leader_info) = leader_info.as_mut() { + leader_info.refresh_recent_peers(); + } + } + }) + .unwrap() + } + + fn process_transactions( + working_bank: &Arc, + root_bank: &Arc, + send_socket: &UdpSocket, + tpu_address: &SocketAddr, + transactions: &mut HashMap, + leader_info: &Option, + ) -> ProcessTransactionsResult { + let mut result = ProcessTransactionsResult::default(); + + transactions.retain(|signature, transaction_info| { + if root_bank.has_signature(signature) { + info!("Transaction is rooted: {}", signature); + result.rooted += 1; + inc_new_counter_info!("send_transaction_service-rooted", 1); + false + } else if transaction_info.last_valid_slot < root_bank.slot() { + info!("Dropping expired transaction: {}", signature); + result.expired += 1; + inc_new_counter_info!("send_transaction_service-expired", 1); + false + } else { + match working_bank.get_signature_status_slot(signature) { + None => { + // Transaction is unknown to the working bank, it might have been + // dropped or landed in another fork. Re-send it + info!("Retrying transaction: {}", signature); + result.retried += 1; + inc_new_counter_info!("send_transaction_service-retry", 1); + Self::send_transaction( + &send_socket, + leader_info + .as_ref() + .and_then(|leader_info| leader_info.get_leader_tpu()) + .unwrap_or(&tpu_address), + &transaction_info.wire_transaction, + ); + true + } + Some((_slot, status)) => { + if status.is_err() { + info!("Dropping failed transaction: {}", signature); + result.failed += 1; + inc_new_counter_info!("send_transaction_service-failed", 1); + false + } else { + result.retained += 1; + true + } + } + } + } + }); + + result + } + + fn send_transaction( + send_socket: &UdpSocket, + tpu_address: &SocketAddr, + wire_transaction: &[u8], + ) { + if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { + warn!("Failed to send transaction to {}: {:?}", tpu_address, err); + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread.join() + } +} + +#[cfg(test)] +mod test { + use super::*; + use solana_sdk::{ + genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer, + system_transaction, + }; + use std::sync::mpsc::channel; + + #[test] + fn service_exit() { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let bank = Bank::default(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let exit = Arc::new(AtomicBool::new(false)); + let (_sender, receiver) = channel(); + + let send_tranaction_service = + SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); + + exit.store(true, Ordering::Relaxed); + send_tranaction_service.join().unwrap(); + } + + #[test] + fn process_transactions() { + solana_logger::setup(); + + let (genesis_config, mint_keypair) = create_genesis_config(4); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let tpu_address = "127.0.0.1:0".parse().unwrap(); + + let root_bank = Arc::new(Bank::new_from_parent( + &bank_forks.read().unwrap().working_bank(), + &Pubkey::default(), + 1, + )); + let rooted_signature = root_bank + .transfer(1, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2)); + + let non_rooted_signature = working_bank + .transfer(2, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let failed_signature = { + let blockhash = working_bank.last_blockhash(); + let transaction = + system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash); + let signature = transaction.signatures[0]; + working_bank.process_transaction(&transaction).unwrap_err(); + signature + }; + + let mut transactions = HashMap::new(); + + info!("Expired transactions are dropped.."); + transactions.insert( + Signature::default(), + TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + expired: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Rooted transactions are dropped..."); + transactions.insert( + rooted_signature, + TransactionInfo::new(rooted_signature, vec![], working_bank.slot()), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + rooted: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Failed transactions are dropped..."); + transactions.insert( + failed_signature, + TransactionInfo::new(failed_signature, vec![], working_bank.slot()), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + failed: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Non-rooted transactions are kept..."); + transactions.insert( + non_rooted_signature, + TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retained: 1, + ..ProcessTransactionsResult::default() + } + ); + transactions.clear(); + + info!("Unknown transactions are retried..."); + transactions.insert( + Signature::default(), + TransactionInfo::new(Signature::default(), vec![], working_bank.slot()), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retried: 1, + ..ProcessTransactionsResult::default() + } + ); + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index fdf35de172..37d22ccfcb 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -297,49 +297,6 @@ impl Validator { &exit, ); - let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config - .rpc_ports - .map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { - if ContactInfo::is_valid_address(&node.info.rpc) { - assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - assert_eq!(rpc_port, node.info.rpc.port()); - assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); - assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); - } else { - assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - } - let tpu_address = cluster_info.my_contact_info().tpu; - ( - JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new( - &subscriptions, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - &exit, - ), - RpcBanksService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port), - tpu_address, - &bank_forks, - &block_commitment_cache, - &exit, - ), - ) - }); - info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", bank.epoch(), @@ -362,7 +319,7 @@ impl Validator { std::thread::park(); } - let poh_config = Arc::new(genesis_config.poh_config); + let poh_config = Arc::new(genesis_config.poh_config.clone()); let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( bank.tick_height(), bank.last_blockhash(), @@ -386,6 +343,50 @@ impl Validator { } let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); + let rpc_service = config + .rpc_ports + .map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + let tpu_address = cluster_info.my_contact_info().tpu; + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + Some(poh_recorder.clone()), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + ), + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + RpcBanksService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port), + tpu_address, + &bank_forks, + &block_commitment_cache, + &exit, + ), + ) + }); + let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap()); let gossip_service = GossipService::new( diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index d78d793edd..ef33a47e92 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -20,7 +20,6 @@ pub mod message_processor; mod native_loader; pub mod nonce_utils; pub mod rent_collector; -pub mod send_transaction_service; pub mod serde_snapshot; pub mod snapshot_package; pub mod snapshot_utils;