Register SendTransactionService exit (#31261)

* Pass exit into SendTransactionService

* Abort SendTransactionService with BanksService

* Register SendTransactionService exit as part of RpcService validator Exit

* Improve test, ensure receiver has been dropped
This commit is contained in:
Tyera 2023-04-20 19:23:14 -05:00 committed by GitHub
parent 4e3300e7d6
commit 03c1744e1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 4 deletions

View File

@ -33,7 +33,7 @@ use {
convert::TryFrom,
io,
net::{Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
sync::{atomic::AtomicBool, Arc, RwLock},
thread::Builder,
time::Duration,
},
@ -433,6 +433,7 @@ pub async fn start_tcp_server(
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) -> io::Result<()> {
// Note: These settings are copied straight from the tarpc example.
let server = tcp::listen(listen_addr, Bincode::default)
@ -460,6 +461,7 @@ pub async fn start_tcp_server(
&connection_cache,
5_000,
0,
exit.clone(),
);
let server = BanksServer::new(

View File

@ -39,6 +39,7 @@ async fn start_abortable_tcp_server(
bank_forks.clone(),
block_commitment_cache.clone(),
connection_cache,
exit.clone(),
)
.fuse();
let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse();

View File

@ -365,6 +365,7 @@ impl JsonRpcRequestProcessor {
&connection_cache,
1000,
1,
exit.clone(),
);
Self {
@ -6429,6 +6430,7 @@ pub mod tests {
&connection_cache,
1000,
1,
exit,
);
let mut bad_transaction = system_transaction::transfer(
@ -6697,6 +6699,7 @@ pub mod tests {
&connection_cache,
1000,
1,
exit,
);
assert_eq!(
request_processor.get_block_commitment(0),

View File

@ -476,6 +476,7 @@ impl JsonRpcService {
prioritization_fee_cache,
);
let exit = Arc::new(AtomicBool::new(false));
let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
@ -485,6 +486,7 @@ impl JsonRpcService {
receiver,
&connection_cache,
send_transaction_service_config,
exit.clone(),
));
#[cfg(test)]
@ -556,7 +558,10 @@ impl JsonRpcService {
validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || close_handle_.close()));
.register_exit(Box::new(move || {
close_handle_.close();
exit.store(true, Ordering::Relaxed);
}));
Ok(Self {
thread_hdl,
#[cfg(test)]

View File

@ -332,6 +332,7 @@ impl SendTransactionService {
connection_cache: &Arc<ConnectionCache>,
retry_rate_ms: u64,
leader_forward_count: u64,
exit: Arc<AtomicBool>,
) -> Self {
let config = Config {
retry_rate_ms,
@ -345,6 +346,7 @@ impl SendTransactionService {
receiver,
connection_cache,
config,
exit,
)
}
@ -355,6 +357,7 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
connection_cache: &Arc<ConnectionCache>,
config: Config,
exit: Arc<AtomicBool>,
) -> Self {
let stats_report = Arc::new(SendTransactionServiceStatsReport::default());
@ -362,7 +365,6 @@ impl SendTransactionService {
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));
let exit = Arc::new(AtomicBool::new(false));
let receive_txn_thread = Self::receive_txn_thread(
tpu_address,
receiver,
@ -776,7 +778,7 @@ mod test {
use {
super::*,
crate::tpu_info::NullTpuInfo,
crossbeam_channel::unbounded,
crossbeam_channel::{bounded, unbounded},
solana_sdk::{
account::AccountSharedData,
genesis_config::create_genesis_config,
@ -804,12 +806,55 @@ mod test {
&connection_cache,
1000,
1,
Arc::new(AtomicBool::new(false)),
);
drop(sender);
send_transaction_service.join().unwrap();
}
#[test]
fn validator_exit() {
let tpu_address = "127.0.0.1:0".parse().unwrap();
let bank = Bank::default_for_tests();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = bounded(0);
let dummy_tx_info = || TransactionInfo {
signature: Signature::default(),
wire_transaction: vec![0; 128],
last_valid_block_height: 0,
durable_nonce_info: None,
max_retries: None,
retries: 0,
last_sent_time: None,
};
let exit = Arc::new(AtomicBool::new(false));
let connection_cache = Arc::new(ConnectionCache::default());
let _send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
exit.clone(),
);
sender.send(dummy_tx_info()).unwrap();
thread::spawn(move || {
exit.store(true, Ordering::Relaxed);
});
let mut option = Ok(());
while option.is_ok() {
option = sender.send(dummy_tx_info());
}
}
#[test]
fn process_transactions() {
solana_logger::setup();