diff --git a/Cargo.lock b/Cargo.lock index 3a1bc9f756..5ad1903fa3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4429,9 +4429,9 @@ dependencies = [ "log 0.4.14", "mio 0.7.13", "solana-banks-interface", - "solana-metrics", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "tarpc", "tokio 1.10.0", "tokio-serde", @@ -5571,6 +5571,7 @@ dependencies = [ "solana-poh", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "solana-stake-program", "solana-storage-bigtable", "solana-streamer", @@ -5729,6 +5730,17 @@ dependencies = [ "solana-sdk", ] +[[package]] +name = "solana-send-transaction-service" +version = "1.8.0" +dependencies = [ + "log 0.4.14", + "solana-logger 1.8.0", + "solana-metrics", + "solana-runtime", + "solana-sdk", +] + [[package]] name = "solana-stake-accounts" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 8abf20d575..8c0fe87cfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,10 +56,11 @@ members = [ "rpc", "runtime", "runtime/store-tool", + "scripts", "sdk", "sdk/cargo-build-bpf", "sdk/cargo-test-bpf", - "scripts", + "send-transaction-service", "stake-accounts", "sys-tuner", "tokens", diff --git a/banks-client/src/lib.rs b/banks-client/src/lib.rs index 46ac029817..83c2c9e06e 100644 --- a/banks-client/src/lib.rs +++ b/banks-client/src/lib.rs @@ -5,30 +5,32 @@ //! but they are undocumented, may change over time, and are generally more //! cumbersome to use. -use borsh::BorshDeserialize; -use futures::{future::join_all, Future, FutureExt}; pub use solana_banks_interface::{BanksClient as TarpcClient, TransactionStatus}; -use solana_banks_interface::{BanksRequest, BanksResponse}; -use solana_program::{ - clock::Slot, fee_calculator::FeeCalculator, hash::Hash, program_pack::Pack, pubkey::Pubkey, - rent::Rent, sysvar::Sysvar, +use { + borsh::BorshDeserialize, + futures::{future::join_all, Future, FutureExt}, + solana_banks_interface::{BanksRequest, BanksResponse}, + solana_program::{ + clock::Slot, fee_calculator::FeeCalculator, hash::Hash, program_pack::Pack, pubkey::Pubkey, + rent::Rent, sysvar::Sysvar, + }, + solana_sdk::{ + account::{from_account, Account}, + commitment_config::CommitmentLevel, + signature::Signature, + transaction::{self, Transaction}, + transport, + }, + std::io::{self, Error, ErrorKind}, + tarpc::{ + client::{self, NewClient, RequestDispatch}, + context::{self, Context}, + serde_transport::tcp, + ClientMessage, Response, Transport, + }, + tokio::{net::ToSocketAddrs, time::Duration}, + tokio_serde::formats::Bincode, }; -use solana_sdk::{ - account::{from_account, Account}, - commitment_config::CommitmentLevel, - signature::Signature, - transaction::{self, Transaction}, - transport, -}; -use std::io::{self, Error, ErrorKind}; -use tarpc::{ - client::{self, NewClient, RequestDispatch}, - context::{self, Context}, - serde_transport::tcp, - ClientMessage, Response, Transport, -}; -use tokio::{net::ToSocketAddrs, time::Duration}; -use tokio_serde::formats::Bincode; // This exists only for backward compatibility pub trait BanksClientExt {} diff --git a/banks-interface/src/lib.rs b/banks-interface/src/lib.rs index 0ee4fddabb..4e07048da0 100644 --- a/banks-interface/src/lib.rs +++ b/banks-interface/src/lib.rs @@ -1,13 +1,15 @@ -use serde::{Deserialize, Serialize}; -use solana_sdk::{ - account::Account, - clock::Slot, - commitment_config::CommitmentLevel, - fee_calculator::FeeCalculator, - hash::Hash, - pubkey::Pubkey, - signature::Signature, - transaction::{self, Transaction, TransactionError}, +use { + serde::{Deserialize, Serialize}, + solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction, TransactionError}, + }, }; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml index a2b44f3734..08ad4585e9 100644 --- a/banks-server/Cargo.toml +++ b/banks-server/Cargo.toml @@ -17,7 +17,7 @@ mio = "0.7.13" solana-banks-interface = { path = "../banks-interface", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } -solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.8.0" } tarpc = { version = "0.26.2", features = ["full"] } tokio = { version = "1", features = ["full"] } tokio-serde = { version = "0.8", features = ["bincode"] } diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index b528d5963c..0d6184f900 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -1,39 +1,44 @@ -use crate::send_transaction_service::{SendTransactionService, TransactionInfo}; -use bincode::{deserialize, serialize}; -use futures::{future, prelude::stream::StreamExt}; -use solana_banks_interface::{ - Banks, BanksRequest, BanksResponse, TransactionConfirmationStatus, TransactionStatus, -}; -use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache}; -use solana_sdk::{ - account::Account, - clock::Slot, - commitment_config::CommitmentLevel, - fee_calculator::FeeCalculator, - hash::Hash, - pubkey::Pubkey, - signature::Signature, - transaction::{self, Transaction}, -}; -use std::{ - io, - net::{Ipv4Addr, SocketAddr}, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, RwLock, +use { + bincode::{deserialize, serialize}, + futures::{future, prelude::stream::StreamExt}, + solana_banks_interface::{ + Banks, BanksRequest, BanksResponse, TransactionConfirmationStatus, TransactionStatus, }, - thread::Builder, - time::Duration, + solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache}, + solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction}, + }, + solana_send_transaction_service::{ + send_transaction_service::{SendTransactionService, TransactionInfo}, + tpu_info::NullTpuInfo, + }, + std::{ + io, + net::{Ipv4Addr, SocketAddr}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, RwLock, + }, + thread::Builder, + time::Duration, + }, + tarpc::{ + context::Context, + serde_transport::tcp, + server::{self, Channel, Incoming}, + transport::{self, channel::UnboundedChannel}, + ClientMessage, Response, + }, + tokio::time::sleep, + tokio_serde::formats::Bincode, }; -use tarpc::{ - context::Context, - serde_transport::tcp, - server::{self, Channel, Incoming}, - transport::{self, channel::UnboundedChannel}, - ClientMessage, Response, -}; -use tokio::time::sleep; -use tokio_serde::formats::Bincode; #[derive(Clone)] struct BanksServer { @@ -161,6 +166,7 @@ impl Banks for BanksServer { signature, serialize(&transaction).unwrap(), last_valid_block_height, + None, ); self.transaction_sender.send(info).unwrap(); } @@ -250,6 +256,7 @@ impl Banks for BanksServer { signature, serialize(&transaction).unwrap(), last_valid_block_height, + None, ); self.transaction_sender.send(info).unwrap(); self.poll_signature_status(&signature, blockhash, last_valid_block_height, commitment) @@ -302,7 +309,14 @@ pub async fn start_tcp_server( .map(move |chan| { let (sender, receiver) = channel(); - SendTransactionService::new(tpu_addr, &bank_forks, receiver); + SendTransactionService::new::( + tpu_addr, + &bank_forks, + None, + receiver, + 5_000, + 0, + ); let server = BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); diff --git a/banks-server/src/lib.rs b/banks-server/src/lib.rs index f1c74a08ea..33c1ae5c07 100644 --- a/banks-server/src/lib.rs +++ b/banks-server/src/lib.rs @@ -1,7 +1,3 @@ #![allow(clippy::integer_arithmetic)] pub mod banks_server; pub mod rpc_banks_service; -pub mod send_transaction_service; - -#[macro_use] -extern crate solana_metrics; diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs index 96a20bf931..11fdb7ea73 100644 --- a/banks-server/src/rpc_banks_service.rs +++ b/banks-server/src/rpc_banks_service.rs @@ -1,21 +1,23 @@ //! The `rpc_banks_service` module implements the Solana Banks RPC API. -use crate::banks_server::start_tcp_server; -use futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}; -use solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}; -use std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, +use { + crate::banks_server::start_tcp_server, + futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}, + solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}, + std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, }, - thread::{self, Builder, JoinHandle}, + tokio::{ + runtime::Runtime, + time::{self, Duration}, + }, + tokio_stream::wrappers::IntervalStream, }; -use tokio::{ - runtime::Runtime, - time::{self, Duration}, -}; -use tokio_stream::wrappers::IntervalStream; pub struct RpcBanksService { thread_hdl: JoinHandle<()>, diff --git a/banks-server/src/send_transaction_service.rs b/banks-server/src/send_transaction_service.rs deleted file mode 100644 index d65e2e6752..0000000000 --- a/banks-server/src/send_transaction_service.rs +++ /dev/null @@ -1,347 +0,0 @@ -// TODO: Merge this implementation with the one at `core/src/send_transaction_service.rs` -use log::*; -use solana_metrics::{datapoint_warn, inc_new_counter_info}; -use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::signature::Signature; -use std::{ - collections::HashMap, - net::{SocketAddr, UdpSocket}, - sync::{ - mpsc::{Receiver, RecvTimeoutError}, - Arc, RwLock, - }, - thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, -}; - -/// Maximum size of the transaction queue -const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day - -pub struct SendTransactionService { - thread: JoinHandle<()>, -} - -pub struct TransactionInfo { - pub signature: Signature, - pub wire_transaction: Vec, - pub last_valid_block_height: u64, -} - -impl TransactionInfo { - pub fn new( - signature: Signature, - wire_transaction: Vec, - last_valid_block_height: u64, - ) -> Self { - Self { - signature, - wire_transaction, - last_valid_block_height, - } - } -} - -#[derive(Default, Debug, PartialEq)] -struct ProcessTransactionsResult { - rooted: u64, - expired: u64, - retried: u64, - failed: u64, - retained: u64, -} - -impl SendTransactionService { - pub fn new( - tpu_address: SocketAddr, - bank_forks: &Arc>, - receiver: Receiver, - ) -> Self { - let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address); - Self { thread } - } - - fn retry_thread( - receiver: Receiver, - bank_forks: Arc>, - tpu_address: SocketAddr, - ) -> JoinHandle<()> { - let mut last_status_check = Instant::now(); - let mut transactions = HashMap::new(); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - - Builder::new() - .name("send-tx-svc".to_string()) - .spawn(move || loop { - match receiver.recv_timeout(Duration::from_secs(1)) { - Err(RecvTimeoutError::Disconnected) => break, - Err(RecvTimeoutError::Timeout) => {} - Ok(transaction_info) => { - Self::send_transaction( - &send_socket, - &tpu_address, - &transaction_info.wire_transaction, - ); - if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { - transactions.insert(transaction_info.signature, transaction_info); - } else { - datapoint_warn!("send_transaction_service-queue-overflow"); - } - } - } - - if Instant::now().duration_since(last_status_check).as_secs() >= 5 { - if !transactions.is_empty() { - datapoint_info!( - "send_transaction_service-queue-size", - ("len", transactions.len(), i64) - ); - let bank_forks = bank_forks.read().unwrap(); - let root_bank = bank_forks.root_bank(); - let working_bank = bank_forks.working_bank(); - - let _result = Self::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - } - last_status_check = Instant::now(); - } - }) - .unwrap() - } - - fn process_transactions( - working_bank: &Arc, - root_bank: &Arc, - send_socket: &UdpSocket, - tpu_address: &SocketAddr, - transactions: &mut HashMap, - ) -> ProcessTransactionsResult { - let mut result = ProcessTransactionsResult::default(); - - transactions.retain(|signature, transaction_info| { - if root_bank.has_signature(signature) { - info!("Transaction is rooted: {}", signature); - result.rooted += 1; - inc_new_counter_info!("send_transaction_service-rooted", 1); - false - } else if transaction_info.last_valid_block_height < root_bank.block_height() { - info!("Dropping expired transaction: {}", signature); - result.expired += 1; - inc_new_counter_info!("send_transaction_service-expired", 1); - false - } else { - match working_bank.get_signature_status_slot(signature) { - None => { - // Transaction is unknown to the working bank, it might have been - // dropped or landed in another fork. Re-send it - info!("Retrying transaction: {}", signature); - result.retried += 1; - inc_new_counter_info!("send_transaction_service-retry", 1); - Self::send_transaction( - send_socket, - tpu_address, - &transaction_info.wire_transaction, - ); - true - } - Some((_slot, status)) => { - if status.is_err() { - info!("Dropping failed transaction: {}", signature); - result.failed += 1; - inc_new_counter_info!("send_transaction_service-failed", 1); - false - } else { - result.retained += 1; - true - } - } - } - } - }); - - result - } - - fn send_transaction( - send_socket: &UdpSocket, - tpu_address: &SocketAddr, - wire_transaction: &[u8], - ) { - if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { - warn!("Failed to send transaction to {}: {:?}", tpu_address, err); - } - } - - pub fn join(self) -> thread::Result<()> { - self.thread.join() - } -} - -#[cfg(test)] -mod test { - use super::*; - use solana_sdk::{ - genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer, - system_transaction, - }; - use std::sync::mpsc::channel; - - #[test] - fn service_exit() { - let tpu_address = "127.0.0.1:0".parse().unwrap(); - let bank = Bank::default_for_tests(); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let (sender, receiver) = channel(); - - let send_tranaction_service = - SendTransactionService::new(tpu_address, &bank_forks, receiver); - - drop(sender); - send_tranaction_service.join().unwrap(); - } - - #[test] - fn process_transactions() { - let (genesis_config, mint_keypair) = create_genesis_config(4); - let bank = Bank::new_for_tests(&genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let tpu_address = "127.0.0.1:0".parse().unwrap(); - - let root_bank = Arc::new(Bank::new_from_parent( - &bank_forks.read().unwrap().working_bank(), - &Pubkey::default(), - 1, - )); - let rooted_signature = root_bank - .transfer(1, &mint_keypair, &mint_keypair.pubkey()) - .unwrap(); - - let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2)); - - let non_rooted_signature = working_bank - .transfer(2, &mint_keypair, &mint_keypair.pubkey()) - .unwrap(); - - let failed_signature = { - let blockhash = working_bank.last_blockhash(); - let transaction = - system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash); - let signature = transaction.signatures[0]; - working_bank.process_transaction(&transaction).unwrap_err(); - signature - }; - - let mut transactions = HashMap::new(); - - info!("Expired transactions are dropped.."); - transactions.insert( - Signature::default(), - TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1), - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - assert!(transactions.is_empty()); - assert_eq!( - result, - ProcessTransactionsResult { - expired: 1, - ..ProcessTransactionsResult::default() - } - ); - - info!("Rooted transactions are dropped..."); - transactions.insert( - rooted_signature, - TransactionInfo::new(rooted_signature, vec![], working_bank.slot()), - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - assert!(transactions.is_empty()); - assert_eq!( - result, - ProcessTransactionsResult { - rooted: 1, - ..ProcessTransactionsResult::default() - } - ); - - info!("Failed transactions are dropped..."); - transactions.insert( - failed_signature, - TransactionInfo::new(failed_signature, vec![], working_bank.slot()), - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - assert!(transactions.is_empty()); - assert_eq!( - result, - ProcessTransactionsResult { - failed: 1, - ..ProcessTransactionsResult::default() - } - ); - - info!("Non-rooted transactions are kept..."); - transactions.insert( - non_rooted_signature, - TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()), - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - assert_eq!(transactions.len(), 1); - assert_eq!( - result, - ProcessTransactionsResult { - retained: 1, - ..ProcessTransactionsResult::default() - } - ); - transactions.clear(); - - info!("Unknown transactions are retried..."); - transactions.insert( - Signature::default(), - TransactionInfo::new(Signature::default(), vec![], working_bank.slot()), - ); - let result = SendTransactionService::process_transactions( - &working_bank, - &root_bank, - &send_socket, - &tpu_address, - &mut transactions, - ); - assert_eq!(transactions.len(), 1); - assert_eq!( - result, - ProcessTransactionsResult { - retried: 1, - ..ProcessTransactionsResult::default() - } - ); - } -} diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 01d1399985..f059eb4051 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2437,9 +2437,9 @@ dependencies = [ "log", "mio", "solana-banks-interface", - "solana-metrics", "solana-runtime", "solana-sdk", + "solana-send-transaction-service", "tarpc", "tokio", "tokio-serde", @@ -3288,6 +3288,17 @@ dependencies = [ "solana-sdk", ] +[[package]] +name = "solana-send-transaction-service" +version = "1.8.0" +dependencies = [ + "log", + "solana-logger 1.8.0", + "solana-metrics", + "solana-runtime", + "solana-sdk", +] + [[package]] name = "solana-stake-program" version = "1.8.0" diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 85ca17132b..1302ee02e4 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,6 +39,7 @@ solana-perf = { path = "../perf", version = "=1.8.0" } solana-poh = { path = "../poh", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } +solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.8.0" } solana-streamer = { path = "../streamer", version = "=1.8.0" } solana-storage-bigtable = { path = "../storage-bigtable", version = "=1.8.0" } solana-transaction-status = { path = "../transaction-status", version = "=1.8.0" } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs new file mode 100644 index 0000000000..9f0cb34043 --- /dev/null +++ b/rpc/src/cluster_tpu_info.rs @@ -0,0 +1,190 @@ +use { + solana_gossip::cluster_info::ClusterInfo, + solana_poh::poh_recorder::PohRecorder, + solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey}, + solana_send_transaction_service::tpu_info::TpuInfo, + std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, + }, +}; + +pub struct ClusterTpuInfo { + cluster_info: Arc, + poh_recorder: Arc>, + recent_peers: HashMap, +} + +impl ClusterTpuInfo { + pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { + Self { + cluster_info, + poh_recorder, + recent_peers: HashMap::new(), + } + } +} + +impl TpuInfo for ClusterTpuInfo { + fn refresh_recent_peers(&mut self) { + self.recent_peers = self + .cluster_info + .tpu_peers() + .into_iter() + .map(|ci| (ci.id, ci.tpu)) + .collect(); + } + + fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { + let recorder = self.poh_recorder.lock().unwrap(); + let leaders: Vec<_> = (0..max_count) + .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) + .collect(); + drop(recorder); + let mut unique_leaders = vec![]; + for leader in leaders.iter() { + if let Some(addr) = self.recent_peers.get(leader) { + if !unique_leaders.contains(&addr) { + unique_leaders.push(addr); + } + } + } + unique_leaders + } +} + +#[cfg(test)] +mod test { + use { + super::*, + solana_gossip::contact_info::ContactInfo, + solana_ledger::{ + blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, + }, + solana_runtime::{ + bank::Bank, + genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + }, + solana_sdk::{ + poh_config::PohConfig, + signature::{Keypair, Signer}, + timing::timestamp, + }, + solana_streamer::socket::SocketAddrSpace, + std::sync::atomic::AtomicBool, + }; + + #[test] + fn test_get_leader_tpus() { + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&ledger_path).unwrap(); + + let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); + let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); + let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand(); + let validator_keypairs = vec![ + &validator_vote_keypairs0, + &validator_vote_keypairs1, + &validator_vote_keypairs2, + ]; + let GenesisConfigInfo { + genesis_config, + mint_keypair: _, + voting_keypair: _, + } = create_genesis_config_with_vote_accounts( + 1_000_000_000, + &validator_keypairs, + vec![10_000; 3], + ); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( + 0, + bank.last_blockhash(), + 0, + Some((2, 2)), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::default()), + ); + + let node_keypair = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, + SocketAddrSpace::Unspecified, + )); + + let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111)); + let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222)); + let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333)); + let recent_peers: HashMap<_, _> = vec![ + ( + validator_vote_keypairs0.node_keypair.pubkey(), + validator0_socket, + ), + ( + validator_vote_keypairs1.node_keypair.pubkey(), + validator1_socket, + ), + ( + validator_vote_keypairs2.node_keypair.pubkey(), + validator2_socket, + ), + ] + .iter() + .cloned() + .collect(); + let leader_info = ClusterTpuInfo { + cluster_info, + poh_recorder: Arc::new(Mutex::new(poh_recorder)), + recent_peers: recent_peers.clone(), + }; + + let slot = bank.slot(); + let first_leader = + solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); + assert_eq!( + leader_info.get_leader_tpus(1), + vec![recent_peers.get(&first_leader).unwrap()] + ); + + let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( + slot + NUM_CONSECUTIVE_LEADER_SLOTS, + &bank, + ) + .unwrap(); + let mut expected_leader_sockets = vec![ + recent_peers.get(&first_leader).unwrap(), + recent_peers.get(&second_leader).unwrap(), + ]; + expected_leader_sockets.dedup(); + assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets); + + let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( + slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS), + &bank, + ) + .unwrap(); + let mut expected_leader_sockets = vec![ + recent_peers.get(&first_leader).unwrap(), + recent_peers.get(&second_leader).unwrap(), + recent_peers.get(&third_leader).unwrap(), + ]; + expected_leader_sockets.dedup(); + assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets); + + for x in 4..8 { + assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len()); + } + } + Blockstore::destroy(&ledger_path).unwrap(); + } +} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index f3a9b4b6e9..ca7e872387 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::integer_arithmetic)] +mod cluster_tpu_info; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; pub mod parsed_token_accounts; @@ -9,7 +10,6 @@ pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; -pub mod send_transaction_service; pub mod transaction_status_service; #[macro_use] diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2079058797..5a08973d46 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2,11 +2,8 @@ use { crate::{ - max_slots::MaxSlots, - optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, - parsed_token_accounts::*, - rpc_health::*, - send_transaction_service::{SendTransactionService, TransactionInfo}, + max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, + parsed_token_accounts::*, rpc_health::*, }, bincode::{config::Options, serialize}, jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, @@ -69,6 +66,10 @@ use { sysvar::stake_history, transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, }, + solana_send_transaction_service::{ + send_transaction_service::{SendTransactionService, TransactionInfo}, + tpu_info::NullTpuInfo, + }, solana_streamer::socket::SocketAddrSpace, solana_transaction_status::{ ConfirmedBlock, EncodedConfirmedTransaction, Reward, RewardType, @@ -294,7 +295,14 @@ impl JsonRpcRequestProcessor { )); let tpu_address = cluster_info.my_contact_info().tpu; let (sender, receiver) = channel(); - SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); + SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + 1000, + 1, + ); Self { config: JsonRpcConfig::default(), @@ -4400,7 +4408,14 @@ pub mod tests { Arc::new(LeaderScheduleCache::new_from_bank(&bank)), max_complete_transaction_status_slot, ); - SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); + SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + 1000, + 1, + ); cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( &leader_pubkey, @@ -5983,7 +5998,14 @@ pub mod tests { Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), ); - SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); + SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + 1000, + 1, + ); let mut bad_transaction = system_transaction::transfer( &mint_keypair, @@ -6265,7 +6287,14 @@ pub mod tests { Arc::new(LeaderScheduleCache::default()), Arc::new(AtomicU64::default()), ); - SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); + SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + 1000, + 1, + ); assert_eq!( request_processor.get_block_commitment(0), RpcBlockCommitment { diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 02d689af3a..fa60e9a610 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -2,6 +2,7 @@ use { crate::{ + cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{ @@ -9,7 +10,6 @@ use { rpc_full::*, rpc_minimal::*, rpc_obsolete_v1_7::*, *, }, rpc_health::*, - send_transaction_service::{LeaderInfo, SendTransactionService}, }, jsonrpc_core::{futures::prelude::*, MetaIoHandler}, jsonrpc_http_server::{ @@ -34,6 +34,7 @@ use { exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey, }, + solana_send_transaction_service::send_transaction_service::SendTransactionService, std::{ collections::HashSet, net::SocketAddr, @@ -381,7 +382,7 @@ impl JsonRpcService { ); let leader_info = - poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); + poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); let _send_transaction_service = Arc::new(SendTransactionService::new( tpu_address, &bank_forks, diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml new file mode 100644 index 0000000000..555ac96483 --- /dev/null +++ b/send-transaction-service/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "solana-send-transaction-service" +version = "1.8.0" +description = "Solana send transaction service" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-send-transaction-service" +license = "Apache-2.0" +edition = "2018" + +[dependencies] +log = "0.4.14" +solana-logger = { path = "../logger", version = "=1.8.0" } +solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-runtime = { path = "../runtime", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs new file mode 100644 index 0000000000..48820025bb --- /dev/null +++ b/send-transaction-service/src/lib.rs @@ -0,0 +1,6 @@ +#![allow(clippy::integer_arithmetic)] +pub mod send_transaction_service; +pub mod tpu_info; + +#[macro_use] +extern crate solana_metrics; diff --git a/rpc/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs similarity index 77% rename from rpc/src/send_transaction_service.rs rename to send-transaction-service/src/send_transaction_service.rs index 9b06c644e0..3cc826a754 100644 --- a/rpc/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -1,20 +1,15 @@ -// TODO: Merge this implementation with the one at `banks-server/src/send_transaction_service.rs` use { + crate::tpu_info::TpuInfo, log::*, - solana_gossip::cluster_info::ClusterInfo, solana_metrics::{datapoint_warn, inc_new_counter_info}, - solana_poh::poh_recorder::PohRecorder, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{ - clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, nonce_account, pubkey::Pubkey, - signature::Signature, - }, + solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, std::{ collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ mpsc::{Receiver, RecvTimeoutError}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -51,48 +46,6 @@ impl TransactionInfo { } } -pub struct LeaderInfo { - cluster_info: Arc, - poh_recorder: Arc>, - recent_peers: HashMap, -} - -impl LeaderInfo { - pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { - Self { - cluster_info, - poh_recorder, - recent_peers: HashMap::new(), - } - } - - pub fn refresh_recent_peers(&mut self) { - self.recent_peers = self - .cluster_info - .tpu_peers() - .into_iter() - .map(|ci| (ci.id, ci.tpu)) - .collect(); - } - - pub fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { - let recorder = self.poh_recorder.lock().unwrap(); - let leaders: Vec<_> = (0..max_count) - .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) - .collect(); - drop(recorder); - let mut unique_leaders = vec![]; - for leader in leaders.iter() { - if let Some(addr) = self.recent_peers.get(leader) { - if !unique_leaders.contains(&addr) { - unique_leaders.push(addr); - } - } - } - unique_leaders - } -} - #[derive(Default, Debug, PartialEq)] struct ProcessTransactionsResult { rooted: u64, @@ -103,10 +56,10 @@ struct ProcessTransactionsResult { } impl SendTransactionService { - pub fn new( + pub fn new( tpu_address: SocketAddr, bank_forks: &Arc>, - leader_info: Option, + leader_info: Option, receiver: Receiver, retry_rate_ms: u64, leader_forward_count: u64, @@ -122,11 +75,11 @@ impl SendTransactionService { Self { thread } } - fn retry_thread( + fn retry_thread( tpu_address: SocketAddr, receiver: Receiver, bank_forks: Arc>, - mut leader_info: Option, + mut leader_info: Option, retry_rate_ms: u64, leader_forward_count: u64, ) -> JoinHandle<()> { @@ -209,13 +162,13 @@ impl SendTransactionService { .unwrap() } - fn process_transactions( + fn process_transactions( working_bank: &Arc, root_bank: &Arc, send_socket: &UdpSocket, tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info: &Option, + leader_info: &Option, leader_forward_count: u64, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -312,26 +265,13 @@ impl SendTransactionService { mod test { use { super::*, - solana_gossip::contact_info::ContactInfo, - solana_ledger::{ - blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, - }, - solana_runtime::genesis_utils::{ - create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, - }, + crate::tpu_info::NullTpuInfo, solana_sdk::{ - account::AccountSharedData, - fee_calculator::FeeCalculator, - genesis_config::create_genesis_config, - nonce, - poh_config::PohConfig, - pubkey::Pubkey, - signature::{Keypair, Signer}, + account::AccountSharedData, fee_calculator::FeeCalculator, + genesis_config::create_genesis_config, nonce, pubkey::Pubkey, signature::Signer, system_program, system_transaction, - timing::timestamp, }, - solana_streamer::socket::SocketAddrSpace, - std::sync::{atomic::AtomicBool, mpsc::channel}, + std::sync::mpsc::channel, }; #[test] @@ -341,8 +281,14 @@ mod test { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let (sender, receiver) = channel(); - let send_tranaction_service = - SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); + let send_tranaction_service = SendTransactionService::new::( + tpu_address, + &bank_forks, + None, + receiver, + 1000, + 1, + ); drop(sender); send_tranaction_service.join().unwrap(); @@ -395,7 +341,7 @@ mod test { None, ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -418,7 +364,7 @@ mod test { rooted_signature, TransactionInfo::new(rooted_signature, vec![], working_bank.block_height(), None), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -441,7 +387,7 @@ mod test { failed_signature, TransactionInfo::new(failed_signature, vec![], working_bank.block_height(), None), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -469,7 +415,7 @@ mod test { None, ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -498,7 +444,7 @@ mod test { None, ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -577,7 +523,7 @@ mod test { Some((nonce_address, durable_nonce)), ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -604,7 +550,7 @@ mod test { Some((nonce_address, Hash::new_unique())), ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -633,7 +579,7 @@ mod test { Some((nonce_address, Hash::new_unique())), ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -660,7 +606,7 @@ mod test { Some((nonce_address, durable_nonce)), ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -688,7 +634,7 @@ mod test { Some((nonce_address, Hash::new_unique())), // runtime should advance nonce on failed transactions ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -716,7 +662,7 @@ mod test { Some((nonce_address, Hash::new_unique())), // runtime advances nonce when transaction lands ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -745,7 +691,7 @@ mod test { Some((nonce_address, durable_nonce)), ), ); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -773,7 +719,7 @@ mod test { let nonce_account = AccountSharedData::new_data(43, &new_nonce_state, &system_program::id()).unwrap(); working_bank.store_account(&nonce_address, &nonce_account); - let result = SendTransactionService::process_transactions( + let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &send_socket, @@ -791,115 +737,4 @@ mod test { } ); } - - #[test] - fn test_get_leader_tpus() { - let ledger_path = get_tmp_ledger_path!(); - { - let blockstore = Blockstore::open(&ledger_path).unwrap(); - - let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); - let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); - let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand(); - let validator_keypairs = vec![ - &validator_vote_keypairs0, - &validator_vote_keypairs1, - &validator_vote_keypairs2, - ]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( - 1_000_000_000, - &validator_keypairs, - vec![10_000; 3], - ); - let bank = Arc::new(Bank::new_for_tests(&genesis_config)); - - let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( - 0, - bank.last_blockhash(), - 0, - Some((2, 2)), - bank.ticks_per_slot(), - &Pubkey::default(), - &Arc::new(blockstore), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - &Arc::new(PohConfig::default()), - Arc::new(AtomicBool::default()), - ); - - let node_keypair = Arc::new(Keypair::new()); - let cluster_info = Arc::new(ClusterInfo::new( - ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), - node_keypair, - SocketAddrSpace::Unspecified, - )); - - let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111)); - let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222)); - let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333)); - let recent_peers: HashMap<_, _> = vec![ - ( - validator_vote_keypairs0.node_keypair.pubkey(), - validator0_socket, - ), - ( - validator_vote_keypairs1.node_keypair.pubkey(), - validator1_socket, - ), - ( - validator_vote_keypairs2.node_keypair.pubkey(), - validator2_socket, - ), - ] - .iter() - .cloned() - .collect(); - let leader_info = LeaderInfo { - cluster_info, - poh_recorder: Arc::new(Mutex::new(poh_recorder)), - recent_peers: recent_peers.clone(), - }; - - let slot = bank.slot(); - let first_leader = - solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); - assert_eq!( - leader_info.get_leader_tpus(1), - vec![recent_peers.get(&first_leader).unwrap()] - ); - - let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( - slot + NUM_CONSECUTIVE_LEADER_SLOTS, - &bank, - ) - .unwrap(); - let mut expected_leader_sockets = vec![ - recent_peers.get(&first_leader).unwrap(), - recent_peers.get(&second_leader).unwrap(), - ]; - expected_leader_sockets.dedup(); - assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets); - - let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( - slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS), - &bank, - ) - .unwrap(); - let mut expected_leader_sockets = vec![ - recent_peers.get(&first_leader).unwrap(), - recent_peers.get(&second_leader).unwrap(), - recent_peers.get(&third_leader).unwrap(), - ]; - expected_leader_sockets.dedup(); - assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets); - - for x in 4..8 { - assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len()); - } - } - Blockstore::destroy(&ledger_path).unwrap(); - } } diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs new file mode 100644 index 0000000000..02bccb9641 --- /dev/null +++ b/send-transaction-service/src/tpu_info.rs @@ -0,0 +1,15 @@ +use std::net::SocketAddr; + +pub trait TpuInfo { + fn refresh_recent_peers(&mut self); + fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr>; +} + +pub struct NullTpuInfo; + +impl TpuInfo for NullTpuInfo { + fn refresh_recent_peers(&mut self) {} + fn get_leader_tpus(&self, _max_count: u64) -> Vec<&SocketAddr> { + vec![] + } +} diff --git a/transaction-status/Cargo.toml b/transaction-status/Cargo.toml index 7440c66d1c..45fbebd90f 100644 --- a/transaction-status/Cargo.toml +++ b/transaction-status/Cargo.toml @@ -19,8 +19,8 @@ serde = "1.0.128" serde_derive = "1.0.103" serde_json = "1.0.66" solana-account-decoder = { path = "../account-decoder", version = "=1.8.0" } -solana-sdk = { path = "../sdk", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } solana-vote-program = { path = "../programs/vote", version = "=1.8.0" } spl-associated-token-account-v1-0 = { package = "spl-associated-token-account", version = "=1.0.3", features = ["no-entrypoint"] } spl-memo = { version = "=3.0.1", features = ["no-entrypoint"] }