use { bincode::{deserialize, serialize}, futures::{future, prelude::stream::StreamExt}, solana_banks_interface::{ Banks, BanksRequest, BanksResponse, TransactionConfirmationStatus, TransactionStatus, }, solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache}, solana_sdk::{ account::Account, clock::Slot, commitment_config::CommitmentLevel, fee_calculator::FeeCalculator, hash::Hash, pubkey::Pubkey, signature::Signature, transaction::{self, Transaction}, }, solana_send_transaction_service::{ send_transaction_service::{SendTransactionService, TransactionInfo}, tpu_info::NullTpuInfo, }, std::{ io, net::{Ipv4Addr, SocketAddr}, sync::{ mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::Builder, time::Duration, }, tarpc::{ context::Context, serde_transport::tcp, server::{self, Channel, Incoming}, transport::{self, channel::UnboundedChannel}, ClientMessage, Response, }, tokio::time::sleep, tokio_serde::formats::Bincode, }; #[derive(Clone)] struct BanksServer { bank_forks: Arc>, block_commitment_cache: Arc>, transaction_sender: Sender, } impl BanksServer { /// Return a BanksServer that forwards transactions to the /// given sender. If unit-testing, those transactions can go to /// a bank in the given BankForks. Otherwise, the receiver should /// forward them to a validator in the leader schedule. fn new( bank_forks: Arc>, block_commitment_cache: Arc>, transaction_sender: Sender, ) -> Self { Self { bank_forks, block_commitment_cache, transaction_sender, } } fn run(bank_forks: Arc>, transaction_receiver: Receiver) { while let Ok(info) = transaction_receiver.recv() { let mut transaction_infos = vec![info]; while let Ok(info) = transaction_receiver.try_recv() { transaction_infos.push(info); } let transactions: Vec<_> = transaction_infos .into_iter() .map(|info| deserialize(&info.wire_transaction).unwrap()) .collect(); let bank = bank_forks.read().unwrap().working_bank(); let _ = bank.try_process_transactions(transactions.iter()); } } /// Useful for unit-testing fn new_loopback( bank_forks: Arc>, block_commitment_cache: Arc>, ) -> Self { let (transaction_sender, transaction_receiver) = channel(); let bank = bank_forks.read().unwrap().working_bank(); let slot = bank.slot(); { // ensure that the commitment cache and bank are synced let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); w_block_commitment_cache.set_all_slots(slot, slot); } let server_bank_forks = bank_forks.clone(); Builder::new() .name("solana-bank-forks-client".to_string()) .spawn(move || Self::run(server_bank_forks, transaction_receiver)) .unwrap(); Self::new(bank_forks, block_commitment_cache, transaction_sender) } fn slot(&self, commitment: CommitmentLevel) -> Slot { self.block_commitment_cache .read() .unwrap() .slot_with_commitment(commitment) } fn bank(&self, commitment: CommitmentLevel) -> Arc { self.bank_forks.read().unwrap()[self.slot(commitment)].clone() } async fn poll_signature_status( self, signature: &Signature, blockhash: &Hash, last_valid_block_height: u64, commitment: CommitmentLevel, ) -> Option> { let mut status = self .bank(commitment) .get_signature_status_with_blockhash(signature, blockhash); while status.is_none() { sleep(Duration::from_millis(200)).await; let bank = self.bank(commitment); if bank.block_height() > last_valid_block_height { break; } status = bank.get_signature_status_with_blockhash(signature, blockhash); } status } } fn verify_transaction( transaction: &Transaction, libsecp256k1_0_5_upgrade_enabled: bool, libsecp256k1_fail_on_bad_count: bool, ) -> transaction::Result<()> { if let Err(err) = transaction.verify() { Err(err) } else if let Err(err) = transaction.verify_precompiles( libsecp256k1_0_5_upgrade_enabled, libsecp256k1_fail_on_bad_count, ) { Err(err) } else { Ok(()) } } #[tarpc::server] impl Banks for BanksServer { async fn send_transaction_with_context(self, _: Context, transaction: Transaction) { let blockhash = &transaction.message.recent_blockhash; let last_valid_block_height = self .bank_forks .read() .unwrap() .root_bank() .get_blockhash_last_valid_block_height(blockhash) .unwrap(); let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); let info = TransactionInfo::new( signature, serialize(&transaction).unwrap(), last_valid_block_height, None, ); self.transaction_sender.send(info).unwrap(); } async fn get_fees_with_commitment_and_context( self, _: Context, commitment: CommitmentLevel, ) -> (FeeCalculator, Hash, u64) { let bank = self.bank(commitment); #[allow(deprecated)] let (blockhash, fee_calculator) = bank.last_blockhash_with_fee_calculator(); let last_valid_block_height = bank .get_blockhash_last_valid_block_height(&blockhash) .unwrap(); (fee_calculator, blockhash, last_valid_block_height) } async fn get_transaction_status_with_context( self, _: Context, signature: Signature, ) -> Option { let bank = self.bank(CommitmentLevel::Processed); let (slot, status) = bank.get_signature_status_slot(&signature)?; let r_block_commitment_cache = self.block_commitment_cache.read().unwrap(); let optimistically_confirmed_bank = self.bank(CommitmentLevel::Confirmed); let optimistically_confirmed = optimistically_confirmed_bank.get_signature_status_slot(&signature); let confirmations = if r_block_commitment_cache.root() >= slot && r_block_commitment_cache.highest_confirmed_root() >= slot { None } else { r_block_commitment_cache .get_confirmation_count(slot) .or(Some(0)) }; Some(TransactionStatus { slot, confirmations, err: status.err(), confirmation_status: if confirmations.is_none() { Some(TransactionConfirmationStatus::Finalized) } else if optimistically_confirmed.is_some() { Some(TransactionConfirmationStatus::Confirmed) } else { Some(TransactionConfirmationStatus::Processed) }, }) } async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot { self.slot(commitment) } async fn get_block_height_with_context(self, _: Context, commitment: CommitmentLevel) -> u64 { self.bank(commitment).block_height() } async fn process_transaction_with_commitment_and_context( self, _: Context, transaction: Transaction, commitment: CommitmentLevel, ) -> Option> { if let Err(err) = verify_transaction( &transaction, self.bank(commitment).libsecp256k1_0_5_upgrade_enabled(), self.bank(commitment).libsecp256k1_fail_on_bad_count(), ) { return Some(Err(err)); } let blockhash = &transaction.message.recent_blockhash; let last_valid_block_height = self .bank_forks .read() .unwrap() .root_bank() .get_blockhash_last_valid_block_height(blockhash) .unwrap(); let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); let info = TransactionInfo::new( signature, serialize(&transaction).unwrap(), last_valid_block_height, None, ); self.transaction_sender.send(info).unwrap(); self.poll_signature_status(&signature, blockhash, last_valid_block_height, commitment) .await } async fn get_account_with_commitment_and_context( self, _: Context, address: Pubkey, commitment: CommitmentLevel, ) -> Option { let bank = self.bank(commitment); bank.get_account(&address).map(Account::from) } } pub async fn start_local_server( bank_forks: Arc>, block_commitment_cache: Arc>, ) -> UnboundedChannel, ClientMessage> { let banks_server = BanksServer::new_loopback(bank_forks, block_commitment_cache); let (client_transport, server_transport) = transport::channel::unbounded(); let server = server::BaseChannel::with_defaults(server_transport).execute(banks_server.serve()); tokio::spawn(server); client_transport } pub async fn start_tcp_server( listen_addr: SocketAddr, tpu_addr: SocketAddr, bank_forks: Arc>, block_commitment_cache: Arc>, ) -> io::Result<()> { // Note: These settings are copied straight from the tarpc example. let server = tcp::listen(listen_addr, Bincode::default) .await? // Ignore accept errors. .filter_map(|r| future::ready(r.ok())) .map(server::BaseChannel::with_defaults) // Limit channels to 1 per IP. .max_channels_per_key(1, |t| { t.as_ref() .peer_addr() .map(|x| x.ip()) .unwrap_or_else(|_| Ipv4Addr::new(0, 0, 0, 0).into()) }) // serve is generated by the service attribute. It takes as input any type implementing // the generated Banks trait. .map(move |chan| { let (sender, receiver) = channel(); SendTransactionService::new::( tpu_addr, &bank_forks, None, receiver, 5_000, 0, ); let server = BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); chan.execute(server.serve()) }) // Max 10 channels. .buffer_unordered(10) .for_each(|_| async {}); server.await; Ok(()) }