Support sending versioned txs in AsyncClient (#23982)

This commit is contained in:
Justin Starry 2022-04-02 11:12:02 +08:00 committed by GitHub
parent 694292f7fa
commit 792bbf75ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 85 deletions

View File

@ -595,47 +595,21 @@ impl SyncClient for ThinClient {
} }
impl AsyncClient for ThinClient { impl AsyncClient for ThinClient {
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> { fn async_send_versioned_transaction(
let transaction = VersionedTransaction::from(transaction); &self,
transaction: VersionedTransaction,
) -> TransportResult<Signature> {
serialize_and_send_transaction(&transaction, self.tpu_addr())?; serialize_and_send_transaction(&transaction, self.tpu_addr())?;
Ok(transaction.signatures[0]) Ok(transaction.signatures[0])
} }
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> { fn async_send_versioned_transaction_batch(
let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect(); &self,
batch: Vec<VersionedTransaction>,
) -> TransportResult<()> {
par_serialize_and_send_transaction_batch(&batch[..], self.tpu_addr())?; par_serialize_and_send_transaction_batch(&batch[..], self.tpu_addr())?;
Ok(()) Ok(())
} }
fn async_send_message<T: Signers>(
&self,
keypairs: &T,
message: Message,
recent_blockhash: Hash,
) -> TransportResult<Signature> {
let transaction = Transaction::new(keypairs, message, recent_blockhash);
self.async_send_transaction(transaction)
}
fn async_send_instruction(
&self,
keypair: &Keypair,
instruction: Instruction,
recent_blockhash: Hash,
) -> TransportResult<Signature> {
let message = Message::new(&[instruction], Some(&keypair.pubkey()));
self.async_send_message(&[keypair], message, recent_blockhash)
}
fn async_transfer(
&self,
lamports: u64,
keypair: &Keypair,
pubkey: &Pubkey,
recent_blockhash: Hash,
) -> TransportResult<Signature> {
let transfer_instruction =
system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
self.async_send_instruction(keypair, transfer_instruction, recent_blockhash)
}
} }
pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr)) -> ThinClient { pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr)) -> ThinClient {

View File

@ -14,7 +14,7 @@ use {
signature::{Keypair, Signature, Signer}, signature::{Keypair, Signature, Signer},
signers::Signers, signers::Signers,
system_instruction, system_instruction,
transaction::{self, Transaction}, transaction::{self, Transaction, VersionedTransaction},
transport::{Result, TransportError}, transport::{Result, TransportError},
}, },
std::{ std::{
@ -28,7 +28,7 @@ use {
pub struct BankClient { pub struct BankClient {
bank: Arc<Bank>, bank: Arc<Bank>,
transaction_sender: Mutex<Sender<Transaction>>, transaction_sender: Mutex<Sender<VersionedTransaction>>,
} }
impl Client for BankClient { impl Client for BankClient {
@ -38,52 +38,15 @@ impl Client for BankClient {
} }
impl AsyncClient for BankClient { impl AsyncClient for BankClient {
fn async_send_transaction(&self, transaction: Transaction) -> Result<Signature> { fn async_send_versioned_transaction(
&self,
transaction: VersionedTransaction,
) -> Result<Signature> {
let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); let signature = transaction.signatures.get(0).cloned().unwrap_or_default();
let transaction_sender = self.transaction_sender.lock().unwrap(); let transaction_sender = self.transaction_sender.lock().unwrap();
transaction_sender.send(transaction).unwrap(); transaction_sender.send(transaction).unwrap();
Ok(signature) Ok(signature)
} }
fn async_send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
for t in transactions {
self.async_send_transaction(t)?;
}
Ok(())
}
fn async_send_message<T: Signers>(
&self,
keypairs: &T,
message: Message,
recent_blockhash: Hash,
) -> Result<Signature> {
let transaction = Transaction::new(keypairs, message, recent_blockhash);
self.async_send_transaction(transaction)
}
fn async_send_instruction(
&self,
keypair: &Keypair,
instruction: Instruction,
recent_blockhash: Hash,
) -> Result<Signature> {
let message = Message::new(&[instruction], Some(&keypair.pubkey()));
self.async_send_message(&[keypair], message, recent_blockhash)
}
/// Transfer `lamports` from `keypair` to `pubkey`
fn async_transfer(
&self,
lamports: u64,
keypair: &Keypair,
pubkey: &Pubkey,
recent_blockhash: Hash,
) -> Result<Signature> {
let transfer_instruction =
system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
self.async_send_instruction(keypair, transfer_instruction, recent_blockhash)
}
} }
impl SyncClient for BankClient { impl SyncClient for BankClient {
@ -333,13 +296,13 @@ impl SyncClient for BankClient {
} }
impl BankClient { impl BankClient {
fn run(bank: &Bank, transaction_receiver: Receiver<Transaction>) { fn run(bank: &Bank, transaction_receiver: Receiver<VersionedTransaction>) {
while let Ok(tx) = transaction_receiver.recv() { while let Ok(tx) = transaction_receiver.recv() {
let mut transactions = vec![tx]; let mut transactions = vec![tx];
while let Ok(tx) = transaction_receiver.try_recv() { while let Ok(tx) = transaction_receiver.try_recv() {
transactions.push(tx); transactions.push(tx);
} }
let _ = bank.try_process_transactions(transactions.iter()); let _ = bank.try_process_entry_transactions(transactions);
} }
} }

View File

@ -20,8 +20,10 @@ use crate::{
message::Message, message::Message,
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signature}, signature::{Keypair, Signature},
signer::Signer,
signers::Signers, signers::Signers,
transaction, system_instruction,
transaction::{self, Transaction, VersionedTransaction},
transport::Result, transport::Result,
}; };
@ -173,9 +175,32 @@ pub trait SyncClient {
pub trait AsyncClient { pub trait AsyncClient {
/// Send a signed transaction, but don't wait to see if the server accepted it. /// Send a signed transaction, but don't wait to see if the server accepted it.
fn async_send_transaction(&self, transaction: transaction::Transaction) -> Result<Signature>; fn async_send_transaction(&self, transaction: Transaction) -> Result<Signature> {
self.async_send_versioned_transaction(transaction.into())
}
fn async_send_batch(&self, transactions: Vec<transaction::Transaction>) -> Result<()>; /// Send a batch of signed transactions without confirmation.
fn async_send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
let transactions = transactions.into_iter().map(Into::into).collect();
self.async_send_versioned_transaction_batch(transactions)
}
/// Send a signed versioned transaction, but don't wait to see if the server accepted it.
fn async_send_versioned_transaction(
&self,
transaction: VersionedTransaction,
) -> Result<Signature>;
/// Send a batch of signed versioned transactions without confirmation.
fn async_send_versioned_transaction_batch(
&self,
transactions: Vec<VersionedTransaction>,
) -> Result<()> {
for t in transactions {
self.async_send_versioned_transaction(t)?;
}
Ok(())
}
/// Create a transaction from the given message, and send it to the /// Create a transaction from the given message, and send it to the
/// server, but don't wait for to see if the server accepted it. /// server, but don't wait for to see if the server accepted it.
@ -184,7 +209,10 @@ pub trait AsyncClient {
keypairs: &T, keypairs: &T,
message: Message, message: Message,
recent_blockhash: Hash, recent_blockhash: Hash,
) -> Result<Signature>; ) -> Result<Signature> {
let transaction = Transaction::new(keypairs, message, recent_blockhash);
self.async_send_transaction(transaction)
}
/// Create a transaction from a single instruction that only requires /// Create a transaction from a single instruction that only requires
/// a single signer. Then send it to the server, but don't wait for a reply. /// a single signer. Then send it to the server, but don't wait for a reply.
@ -193,7 +221,10 @@ pub trait AsyncClient {
keypair: &Keypair, keypair: &Keypair,
instruction: Instruction, instruction: Instruction,
recent_blockhash: Hash, recent_blockhash: Hash,
) -> Result<Signature>; ) -> Result<Signature> {
let message = Message::new(&[instruction], Some(&keypair.pubkey()));
self.async_send_message(&[keypair], message, recent_blockhash)
}
/// Attempt to transfer lamports from `keypair` to `pubkey`, but don't wait to confirm. /// Attempt to transfer lamports from `keypair` to `pubkey`, but don't wait to confirm.
fn async_transfer( fn async_transfer(
@ -202,5 +233,9 @@ pub trait AsyncClient {
keypair: &Keypair, keypair: &Keypair,
pubkey: &Pubkey, pubkey: &Pubkey,
recent_blockhash: Hash, recent_blockhash: Hash,
) -> Result<Signature>; ) -> Result<Signature> {
let transfer_instruction =
system_instruction::transfer(&keypair.pubkey(), pubkey, lamports);
self.async_send_instruction(keypair, transfer_instruction, recent_blockhash)
}
} }