SendTransactionServices now exit their thread on channel drop instead of by a flag

This commit is contained in:
Michael Vines 2020-09-18 09:17:04 -07:00 committed by mergify[bot]
parent 75c3690ccd
commit c4913e3c9e
5 changed files with 52 additions and 74 deletions

View File

@ -25,7 +25,6 @@ use std::{
io, io,
net::{Ipv4Addr, SocketAddr}, net::{Ipv4Addr, SocketAddr},
sync::{ sync::{
atomic::AtomicBool,
mpsc::{channel, Receiver, Sender}, mpsc::{channel, Receiver, Sender},
Arc, RwLock, Arc, RwLock,
}, },
@ -250,14 +249,8 @@ pub async fn start_tcp_server(
// the generated Banks trait. // the generated Banks trait.
.map(move |chan| { .map(move |chan| {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
SendTransactionService::new( SendTransactionService::new(tpu_addr, &bank_forks, receiver);
tpu_addr,
&bank_forks,
&exit_send_transaction_service,
receiver,
);
let server = let server =
BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender);

View File

@ -1,3 +1,4 @@
// TODO: Merge this implementation with the one at `core/src/send_transaction_service.rs`
use log::*; use log::*;
use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_runtime::{bank::Bank, bank_forks::BankForks};
@ -6,8 +7,7 @@ use std::{
collections::HashMap, collections::HashMap,
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError},
mpsc::Receiver,
Arc, RwLock, Arc, RwLock,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
@ -50,10 +50,9 @@ impl SendTransactionService {
pub fn new( pub fn new(
tpu_address: SocketAddr, tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>,
receiver: Receiver<TransactionInfo>, receiver: Receiver<TransactionInfo>,
) -> Self { ) -> Self {
let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address);
Self { thread } Self { thread }
} }
@ -61,7 +60,6 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>, receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
tpu_address: SocketAddr, tpu_address: SocketAddr,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let mut last_status_check = Instant::now(); let mut last_status_check = Instant::now();
let mut transactions = HashMap::new(); let mut transactions = HashMap::new();
@ -70,20 +68,20 @@ impl SendTransactionService {
Builder::new() Builder::new()
.name("send-tx-svc".to_string()) .name("send-tx-svc".to_string())
.spawn(move || loop { .spawn(move || loop {
if exit.load(Ordering::Relaxed) { match receiver.recv_timeout(Duration::from_secs(1)) {
break; Err(RecvTimeoutError::Disconnected) => break,
} Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { Self::send_transaction(
Self::send_transaction( &send_socket,
&send_socket, &tpu_address,
&tpu_address, &transaction_info.wire_transaction,
&transaction_info.wire_transaction, );
); if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { transactions.insert(transaction_info.signature, transaction_info);
transactions.insert(transaction_info.signature, transaction_info); } else {
} else { datapoint_warn!("send_transaction_service-queue-overflow");
datapoint_warn!("send_transaction_service-queue-overflow"); }
} }
} }
@ -193,13 +191,12 @@ mod test {
let tpu_address = "127.0.0.1:0".parse().unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap();
let bank = Bank::default(); let bank = Bank::default();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = channel();
let (_sender, receiver) = channel();
let send_tranaction_service = let send_tranaction_service =
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, receiver);
exit.store(true, Ordering::Relaxed); drop(sender);
send_tranaction_service.join().unwrap(); send_tranaction_service.join().unwrap();
} }

View File

@ -218,7 +218,7 @@ impl JsonRpcRequestProcessor {
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let (sender, receiver) = channel(); let (sender, receiver) = channel();
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
Self { Self {
config: JsonRpcConfig::default(), config: JsonRpcConfig::default(),
@ -2686,7 +2686,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
&leader_pubkey, &leader_pubkey,
@ -3998,7 +3998,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
let res = io.handle_request_sync(req, meta); let res = io.handle_request_sync(req, meta);
@ -4039,7 +4039,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
let mut bad_transaction = let mut bad_transaction =
system_transaction::transfer(&mint_keypair, &Pubkey::new_rand(), 42, Hash::default()); system_transaction::transfer(&mint_keypair, &Pubkey::new_rand(), 42, Hash::default());
@ -4221,7 +4221,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!(request_processor.validator_exit(), false); assert_eq!(request_processor.validator_exit(), false);
assert_eq!(exit.load(Ordering::Relaxed), false); assert_eq!(exit.load(Ordering::Relaxed), false);
} }
@ -4250,7 +4250,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!(request_processor.validator_exit(), true); assert_eq!(request_processor.validator_exit(), true);
assert_eq!(exit.load(Ordering::Relaxed), true); assert_eq!(exit.load(Ordering::Relaxed), true);
} }
@ -4336,7 +4336,7 @@ pub mod tests {
&runtime::Runtime::new().unwrap(), &runtime::Runtime::new().unwrap(),
None, None,
); );
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
assert_eq!( assert_eq!(
request_processor.get_block_commitment(0), request_processor.get_block_commitment(0),
RpcBlockCommitment { RpcBlockCommitment {

View File

@ -313,14 +313,12 @@ impl JsonRpcService {
bigtable_ledger_storage, bigtable_ledger_storage,
); );
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
let leader_info = let leader_info =
poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new( let _send_transaction_service = Arc::new(SendTransactionService::new(
tpu_address, tpu_address,
&bank_forks, &bank_forks,
leader_info, leader_info,
&exit_send_transaction_service,
receiver, receiver,
)); ));
@ -369,7 +367,6 @@ impl JsonRpcService {
let server = server.unwrap(); let server = server.unwrap();
close_handle_sender.send(server.close_handle()).unwrap(); close_handle_sender.send(server.close_handle()).unwrap();
server.wait(); server.wait();
exit_send_transaction_service.store(true, Ordering::Relaxed);
exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed); exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
}) })
.unwrap(); .unwrap();

View File

@ -1,3 +1,4 @@
// TODO: Merge this implementation with the one at `banks-server/src/send_transaction_service.rs`
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use log::*; use log::*;
@ -9,8 +10,7 @@ use std::{
collections::HashMap, collections::HashMap,
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError},
mpsc::Receiver,
Arc, RwLock, Arc, RwLock,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
@ -87,16 +87,9 @@ impl SendTransactionService {
tpu_address: SocketAddr, tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<LeaderInfo>, leader_info: Option<LeaderInfo>,
exit: &Arc<AtomicBool>,
receiver: Receiver<TransactionInfo>, receiver: Receiver<TransactionInfo>,
) -> Self { ) -> Self {
let thread = Self::retry_thread( let thread = Self::retry_thread(tpu_address, receiver, bank_forks.clone(), leader_info);
tpu_address,
receiver,
bank_forks.clone(),
leader_info,
exit.clone(),
);
Self { thread } Self { thread }
} }
@ -105,7 +98,6 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>, receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<LeaderInfo>, mut leader_info: Option<LeaderInfo>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let mut last_status_check = Instant::now(); let mut last_status_check = Instant::now();
let mut transactions = HashMap::new(); let mut transactions = HashMap::new();
@ -118,24 +110,24 @@ impl SendTransactionService {
Builder::new() Builder::new()
.name("send-tx-sv2".to_string()) .name("send-tx-sv2".to_string())
.spawn(move || loop { .spawn(move || loop {
if exit.load(Ordering::Relaxed) { match receiver.recv_timeout(Duration::from_secs(1)) {
break; Err(RecvTimeoutError::Disconnected) => break,
} Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { let address = leader_info
let address = leader_info .as_ref()
.as_ref() .and_then(|leader_info| leader_info.get_leader_tpu())
.and_then(|leader_info| leader_info.get_leader_tpu()) .unwrap_or(&tpu_address);
.unwrap_or(&tpu_address); Self::send_transaction(
Self::send_transaction( &send_socket,
&send_socket, address,
address, &transaction_info.wire_transaction,
&transaction_info.wire_transaction, );
); if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { transactions.insert(transaction_info.signature, transaction_info);
transactions.insert(transaction_info.signature, transaction_info); } else {
} else { datapoint_warn!("send_transaction_service-queue-overflow");
datapoint_warn!("send_transaction_service-queue-overflow"); }
} }
} }
@ -253,13 +245,12 @@ mod test {
let tpu_address = "127.0.0.1:0".parse().unwrap(); let tpu_address = "127.0.0.1:0".parse().unwrap();
let bank = Bank::default(); let bank = Bank::default();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = channel();
let (_sender, receiver) = channel();
let send_tranaction_service = let send_tranaction_service =
SendTransactionService::new(tpu_address, &bank_forks, None, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
exit.store(true, Ordering::Relaxed); drop(sender);
send_tranaction_service.join().unwrap(); send_tranaction_service.join().unwrap();
} }