Simplify use of SendTransactionService (#10999)

* Send transaction upon recv

This will allow us to move the channel to the public interface

* Use a channel, not a method, to communicate

* Pipeline the services

* Ignore unused return values

* Fix clippy warning
This commit is contained in:
Greg Fitzgerald 2020-07-10 19:14:41 -06:00 committed by GitHub
parent 19813b0ab2
commit 145906123a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 102 deletions

View File

@ -29,7 +29,7 @@ use solana_runtime::{
bank_forks::BankForks, bank_forks::BankForks,
commitment::{BlockCommitmentArray, BlockCommitmentCache}, commitment::{BlockCommitmentArray, BlockCommitmentCache},
log_collector::LogCollector, log_collector::LogCollector,
send_transaction_service::SendTransactionService, send_transaction_service::{SendTransactionService, TransactionInfo},
}; };
use solana_sdk::{ use solana_sdk::{
account_utils::StateMut, account_utils::StateMut,
@ -58,7 +58,8 @@ use std::{
str::FromStr, str::FromStr,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, RwLock, mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLock,
}, },
}; };
@ -97,7 +98,7 @@ pub struct JsonRpcRequestProcessor {
health: Arc<RpcHealth>, health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash, genesis_hash: Hash,
send_transaction_service: Arc<SendTransactionService>, transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
} }
impl Metadata for JsonRpcRequestProcessor {} impl Metadata for JsonRpcRequestProcessor {}
@ -153,8 +154,9 @@ impl JsonRpcRequestProcessor {
health: Arc<RpcHealth>, health: Arc<RpcHealth>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash, genesis_hash: Hash,
send_transaction_service: Arc<SendTransactionService>, ) -> (Self, Receiver<TransactionInfo>) {
) -> Self { let (sender, receiver) = channel();
(
Self { Self {
config, config,
bank_forks, bank_forks,
@ -164,8 +166,10 @@ impl JsonRpcRequestProcessor {
health, health,
cluster_info, cluster_info,
genesis_hash, genesis_hash,
send_transaction_service, transaction_sender: Arc::new(Mutex::new(sender)),
} },
receiver,
)
} }
// Useful for unit testing // Useful for unit testing
@ -179,9 +183,12 @@ impl JsonRpcRequestProcessor {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let (sender, receiver) = channel();
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
Self { Self {
config: JsonRpcConfig::default(), config: JsonRpcConfig::default(),
bank_forks: bank_forks.clone(), bank_forks,
block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new(
HashMap::new(), HashMap::new(),
0, 0,
@ -195,11 +202,7 @@ impl JsonRpcRequestProcessor {
health: Arc::new(RpcHealth::new(cluster_info.clone(), None, 0, exit.clone())), health: Arc::new(RpcHealth::new(cluster_info.clone(), None, 0, exit.clone())),
cluster_info, cluster_info,
genesis_hash, genesis_hash,
send_transaction_service: Arc::new(SendTransactionService::new( transaction_sender: Arc::new(Mutex::new(sender)),
tpu_address,
&bank_forks,
&exit,
)),
} }
} }
@ -1479,8 +1482,12 @@ impl RpcSol for RpcSolImpl {
Error::internal_error() Error::internal_error()
})?; })?;
meta.send_transaction_service let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot);
.send(signature, wire_transaction, last_valid_slot); meta.transaction_sender
.lock()
.unwrap()
.send(transaction_info)
.unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err));
Ok(signature.to_string()) Ok(signature.to_string())
} }
@ -1527,8 +1534,12 @@ impl RpcSol for RpcSolImpl {
} }
} }
meta.send_transaction_service let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot);
.send(signature, wire_transaction, last_valid_slot); meta.transaction_sender
.lock()
.unwrap()
.send(transaction_info)
.unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err));
Ok(signature.to_string()) Ok(signature.to_string())
} }
@ -1883,7 +1894,7 @@ pub mod tests {
&socketaddr!("127.0.0.1:1234"), &socketaddr!("127.0.0.1:1234"),
)); ));
let meta = JsonRpcRequestProcessor::new( let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig { JsonRpcConfig {
enable_rpc_transaction_history: true, enable_rpc_transaction_history: true,
identity_pubkey: *pubkey, identity_pubkey: *pubkey,
@ -1896,8 +1907,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info.clone(), cluster_info.clone(),
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
&leader_pubkey, &leader_pubkey,
@ -3029,7 +3040,7 @@ pub mod tests {
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let bank_forks = new_bank_forks().0; let bank_forks = new_bank_forks().0;
let meta = JsonRpcRequestProcessor::new( let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(), JsonRpcConfig::default(),
new_bank_forks().0, new_bank_forks().0,
block_commitment_cache, block_commitment_cache,
@ -3038,8 +3049,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
let res = io.handle_request_sync(req, meta); let res = io.handle_request_sync(req, meta);
@ -3068,7 +3079,7 @@ pub mod tests {
ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")),
)); ));
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let meta = JsonRpcRequestProcessor::new( let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(), JsonRpcConfig::default(),
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache, block_commitment_cache,
@ -3077,8 +3088,8 @@ pub mod tests {
health.clone(), health.clone(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
let mut bad_transaction = let mut bad_transaction =
system_transaction::transfer(&Keypair::new(), &Pubkey::default(), 42, Hash::default()); system_transaction::transfer(&Keypair::new(), &Pubkey::default(), 42, Hash::default());
@ -3215,7 +3226,7 @@ pub mod tests {
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let bank_forks = new_bank_forks().0; let bank_forks = new_bank_forks().0;
let request_processor = JsonRpcRequestProcessor::new( let (request_processor, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(), JsonRpcConfig::default(),
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache, block_commitment_cache,
@ -3224,8 +3235,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!(request_processor.validator_exit(), false); assert_eq!(request_processor.validator_exit(), false);
assert_eq!(exit.load(Ordering::Relaxed), false); assert_eq!(exit.load(Ordering::Relaxed), false);
} }
@ -3242,7 +3253,7 @@ pub mod tests {
let bank_forks = new_bank_forks().0; let bank_forks = new_bank_forks().0;
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let request_processor = JsonRpcRequestProcessor::new( let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache, block_commitment_cache,
@ -3251,8 +3262,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!(request_processor.validator_exit(), true); assert_eq!(request_processor.validator_exit(), true);
assert_eq!(exit.load(Ordering::Relaxed), true); assert_eq!(exit.load(Ordering::Relaxed), true);
} }
@ -3329,7 +3340,7 @@ pub mod tests {
config.enable_validator_exit = true; config.enable_validator_exit = true;
let cluster_info = Arc::new(ClusterInfo::default()); let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let request_processor = JsonRpcRequestProcessor::new( let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache, block_commitment_cache,
@ -3338,8 +3349,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
Arc::new(SendTransactionService::new(tpu_address, &bank_forks, &exit)),
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!( assert_eq!(
request_processor.get_block_commitment(0), request_processor.get_block_commitment(0),
RpcBlockCommitment { RpcBlockCommitment {

View File

@ -249,14 +249,7 @@ impl JsonRpcService {
)); ));
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); let (request_processor, receiver) = JsonRpcRequestProcessor::new(
let send_transaction_service = Arc::new(SendTransactionService::new(
tpu_address,
&bank_forks,
&exit_send_transaction_service,
));
let request_processor = JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache, block_commitment_cache,
@ -265,9 +258,16 @@ impl JsonRpcService {
health.clone(), health.clone(),
cluster_info, cluster_info,
genesis_hash, genesis_hash,
send_transaction_service,
); );
let exit_send_transaction_service = Arc::new(AtomicBool::new(false));
let _send_transaction_service = Arc::new(SendTransactionService::new(
tpu_address,
&bank_forks,
&exit_send_transaction_service,
receiver,
));
#[cfg(test)] #[cfg(test)]
let test_request_processor = request_processor.clone(); let test_request_processor = request_processor.clone();

View File

@ -7,8 +7,8 @@ use std::{
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, Sender}, mpsc::Receiver,
Arc, Mutex, RwLock, Arc, RwLock,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::{Duration, Instant}, time::{Duration, Instant},
@ -19,17 +19,24 @@ const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but m
pub struct SendTransactionService { pub struct SendTransactionService {
thread: JoinHandle<()>, thread: JoinHandle<()>,
sender: Mutex<Sender<TransactionInfo>>,
send_socket: UdpSocket,
tpu_address: SocketAddr,
} }
struct TransactionInfo { pub struct TransactionInfo {
signature: Signature, signature: Signature,
wire_transaction: Vec<u8>, wire_transaction: Vec<u8>,
last_valid_slot: Slot, last_valid_slot: Slot,
} }
impl TransactionInfo {
pub fn new(signature: Signature, wire_transaction: Vec<u8>, last_valid_slot: Slot) -> Self {
Self {
signature,
wire_transaction,
last_valid_slot,
}
}
}
#[derive(Default, Debug, PartialEq)] #[derive(Default, Debug, PartialEq)]
struct ProcessTransactionsResult { struct ProcessTransactionsResult {
rooted: u64, rooted: u64,
@ -44,16 +51,10 @@ impl SendTransactionService {
tpu_address: SocketAddr, tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
receiver: Receiver<TransactionInfo>,
) -> Self { ) -> Self {
let (sender, receiver) = channel::<TransactionInfo>();
let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone());
Self { Self { thread }
thread,
sender: Mutex::new(sender),
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
tpu_address,
}
} }
fn retry_thread( fn retry_thread(
@ -74,6 +75,11 @@ impl SendTransactionService {
} }
if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) {
Self::send_transaction(
&send_socket,
&tpu_address,
&transaction_info.wire_transaction,
);
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
transactions.insert(transaction_info.signature, transaction_info); transactions.insert(transaction_info.signature, transaction_info);
} else { } else {
@ -168,21 +174,6 @@ impl SendTransactionService {
} }
} }
pub fn send(&self, signature: Signature, wire_transaction: Vec<u8>, last_valid_slot: Slot) {
inc_new_counter_info!("send_transaction_service-enqueue", 1, 1);
Self::send_transaction(&self.send_socket, &self.tpu_address, &wire_transaction);
self.sender
.lock()
.unwrap()
.send(TransactionInfo {
signature,
wire_transaction,
last_valid_slot,
})
.unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err));
}
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.thread.join() self.thread.join()
} }
@ -195,6 +186,7 @@ mod test {
genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer, genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer,
system_transaction, system_transaction,
}; };
use std::sync::mpsc::channel;
#[test] #[test]
fn service_exit() { fn service_exit() {
@ -202,8 +194,10 @@ mod test {
let bank = Bank::default(); let bank = Bank::default();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (_sender, receiver) = channel();
let send_tranaction_service = SendTransactionService::new(tpu_address, &bank_forks, &exit); let send_tranaction_service =
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
send_tranaction_service.join().unwrap(); send_tranaction_service.join().unwrap();
@ -248,11 +242,7 @@ mod test {
info!("Expired transactions are dropped.."); info!("Expired transactions are dropped..");
transactions.insert( transactions.insert(
Signature::default(), Signature::default(),
TransactionInfo { TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1),
signature: Signature::default(),
wire_transaction: vec![],
last_valid_slot: root_bank.slot() - 1,
},
); );
let result = SendTransactionService::process_transactions( let result = SendTransactionService::process_transactions(
&working_bank, &working_bank,
@ -273,11 +263,7 @@ mod test {
info!("Rooted transactions are dropped..."); info!("Rooted transactions are dropped...");
transactions.insert( transactions.insert(
rooted_signature, rooted_signature,
TransactionInfo { TransactionInfo::new(rooted_signature, vec![], working_bank.slot()),
signature: rooted_signature,
wire_transaction: vec![],
last_valid_slot: working_bank.slot(),
},
); );
let result = SendTransactionService::process_transactions( let result = SendTransactionService::process_transactions(
&working_bank, &working_bank,
@ -298,11 +284,7 @@ mod test {
info!("Failed transactions are dropped..."); info!("Failed transactions are dropped...");
transactions.insert( transactions.insert(
failed_signature, failed_signature,
TransactionInfo { TransactionInfo::new(failed_signature, vec![], working_bank.slot()),
signature: failed_signature,
wire_transaction: vec![],
last_valid_slot: working_bank.slot(),
},
); );
let result = SendTransactionService::process_transactions( let result = SendTransactionService::process_transactions(
&working_bank, &working_bank,
@ -323,11 +305,7 @@ mod test {
info!("Non-rooted transactions are kept..."); info!("Non-rooted transactions are kept...");
transactions.insert( transactions.insert(
non_rooted_signature, non_rooted_signature,
TransactionInfo { TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()),
signature: non_rooted_signature,
wire_transaction: vec![],
last_valid_slot: working_bank.slot(),
},
); );
let result = SendTransactionService::process_transactions( let result = SendTransactionService::process_transactions(
&working_bank, &working_bank,
@ -349,11 +327,7 @@ mod test {
info!("Unknown transactions are retried..."); info!("Unknown transactions are retried...");
transactions.insert( transactions.insert(
Signature::default(), Signature::default(),
TransactionInfo { TransactionInfo::new(Signature::default(), vec![], working_bank.slot()),
signature: Signature::default(),
wire_transaction: vec![],
last_valid_slot: working_bank.slot(),
},
); );
let result = SendTransactionService::process_transactions( let result = SendTransactionService::process_transactions(
&working_bank, &working_bank,