diff --git a/Cargo.lock b/Cargo.lock index 9a5fc705b..26e4d352b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derivative" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb582b60359da160a9477ee80f15c8d784c477e69c217ef2cdd4169c24ea380f" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.6", + "syn 1.0.27", +] + [[package]] name = "dialoguer" version = "0.6.2" @@ -1230,7 +1241,7 @@ dependencies = [ "log 0.4.8", "slab", "tokio 0.2.22", - "tokio-util", + "tokio-util 0.3.1", ] [[package]] @@ -3241,6 +3252,45 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-banks-client" +version = "1.4.0" +dependencies = [ + "async-trait", + "bincode", + "futures 0.3.5", + "solana-banks-interface", + "solana-banks-server", + "solana-runtime", + "solana-sdk 1.4.0", + "tarpc", + "tokio 0.2.22", + "tokio-serde", +] + +[[package]] +name = "solana-banks-interface" +version = "1.4.0" +dependencies = [ + "serde", + "solana-sdk 1.4.0", + "tarpc", +] + +[[package]] +name = "solana-banks-server" +version = "1.4.0" +dependencies = [ + "bincode", + "futures 0.3.5", + "solana-banks-interface", + "solana-runtime", + "solana-sdk 1.4.0", + "tarpc", + "tokio 0.2.22", + "tokio-serde", +] + [[package]] name = "solana-bench-exchange" version = "1.4.0" @@ -3491,6 +3541,7 @@ dependencies = [ "serial_test", "serial_test_derive", "solana-account-decoder", + "solana-banks-server", "solana-bpf-loader-program", "solana-budget-program", "solana-clap-utils", @@ -4383,6 +4434,8 @@ dependencies = [ "indicatif", "pickledb", "serde", + "solana-banks-client", + "solana-banks-server", "solana-clap-utils", "solana-cli-config", "solana-client", @@ -4391,9 +4444,10 @@ dependencies = [ "solana-runtime", "solana-sdk 1.4.0", "solana-stake-program", - "solana-transaction-status", "tempfile", "thiserror", + "tokio 0.2.22", + "url 2.1.1", ] [[package]] @@ -4748,6 +4802,36 @@ dependencies = [ "xattr", ] +[[package]] +name = "tarpc" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7262a81ff505d04617aabee6f3e416eafd4d67f856832196c221ffd434efda47" +dependencies = [ + "fnv", + "futures 0.3.5", + "humantime 1.3.0", + "log 0.4.8", + "pin-project", + "rand 0.7.3", + "serde", + "tarpc-plugins", + "tokio 0.2.22", + "tokio-serde", + "tokio-util 0.2.0", +] + +[[package]] +name = "tarpc-plugins" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edbaf92ceea0a2ab555bea18a47a891e46ba2d6f930ec9506771662f4ab82bb7" +dependencies = [ + "proc-macro2 1.0.19", + "quote 1.0.6", + "syn 1.0.27", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -5074,6 +5158,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-serde" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebdd897b01021779294eb09bb3b52b6e11b0747f9f7e333a84bef532b656de99" +dependencies = [ + "bincode", + "bytes 0.5.4", + "derivative", + "futures 0.3.5", + "pin-project", + "serde", +] + [[package]] name = "tokio-sync" version = "0.1.8" @@ -5181,6 +5279,20 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tokio-util" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" +dependencies = [ + "bytes 0.5.4", + "futures-core", + "futures-sink", + "log 0.4.8", + "pin-project-lite", + "tokio 0.2.22", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -5225,7 +5337,7 @@ dependencies = [ "prost-derive", "tokio 0.2.22", "tokio-rustls 0.14.0", - "tokio-util", + "tokio-util 0.3.1", "tower", "tower-balance", "tower-load", diff --git a/Cargo.toml b/Cargo.toml index 181683eeb..5e4ce8cfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,9 @@ members = [ "bench-tps", "accounts-bench", "banking-bench", + "banks-client", + "banks-interface", + "banks-server", "clap-utils", "cli-config", "client", diff --git a/banks-client/Cargo.toml b/banks-client/Cargo.toml new file mode 100644 index 000000000..a6fed40fe --- /dev/null +++ b/banks-client/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "solana-banks-client" +version = "1.4.0" +description = "Solana banks client" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +async-trait = "0.1.36" +bincode = "1.3.1" +futures = "0.3" +solana-banks-interface = { path = "../banks-interface", version = "1.4.0" } +solana-sdk = { path = "../sdk", version = "1.4.0" } +tarpc = { version = "0.21.0", features = ["full"] } +tokio = "0.2" +tokio-serde = { version = "0.6", features = ["bincode"] } + +[dev-dependencies] +solana-runtime = { path = "../runtime", version = "1.4.0" } +solana-banks-server = { path = "../banks-server", version = "1.4.0" } + +[lib] +crate-type = ["lib"] +name = "solana_banks_client" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-client/src/lib.rs b/banks-client/src/lib.rs new file mode 100644 index 000000000..1ded4dfa4 --- /dev/null +++ b/banks-client/src/lib.rs @@ -0,0 +1,283 @@ +//! A client for the ledger state, from the perspective of an arbitrary validator. +//! +//! Use start_tcp_client() to create a client and then import BanksClientExt to +//! access its methods. Additional "*_with_context" methods are also available, +//! but they are undocumented, may change over time, and are generally more +//! cumbersome to use. + +use async_trait::async_trait; +use futures::future::join_all; +pub use solana_banks_interface::{BanksClient, TransactionStatus}; +use solana_banks_interface::{BanksRequest, BanksResponse}; +use solana_sdk::{ + account::Account, clock::Slot, commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, hash::Hash, pubkey::Pubkey, signature::Signature, + transaction::Transaction, transport, +}; +use std::io::{self, Error, ErrorKind}; +use tarpc::{ + client, context, + rpc::{transport::channel::UnboundedChannel, ClientMessage, Response}, + serde_transport::tcp, +}; +use tokio::{net::ToSocketAddrs, time::Duration}; +use tokio_serde::formats::Bincode; + +#[async_trait] +pub trait BanksClientExt { + /// Send a transaction and return immediately. The server will resend the + /// transaction until either it is accepted by the cluster or the transaction's + /// blockhash expires. + async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()>; + + /// Return a recent, rooted blockhash from the server. The cluster will only accept + /// transactions with a blockhash that has not yet expired. Use the `get_fees` + /// method to get both a blockhash and the blockhash's last valid slot. + async fn get_recent_blockhash(&mut self) -> io::Result; + + /// Return the fee parameters associated with a recent, rooted blockhash. The cluster + /// will use the transaction's blockhash to look up these same fee parameters and + /// use them to calculate the transaction fee. + async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)>; + + /// Send a transaction and return after the transaction has been rejected or + /// reached the given level of commitment. + async fn process_transaction_with_commitment( + &mut self, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> transport::Result<()>; + + /// Send a transaction and return after the transaction has been finalized or rejected. + async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()>; + + /// Return the status of a transaction with a signature matching the transaction's first + /// signature. Return None if the transaction is not found, which may be because the + /// blockhash was expired or the fee-paying account had insufficient funds to pay the + /// transaction fee. Note that servers rarely store the full transaction history. This + /// method may return None if the transaction status has been discarded. + async fn get_transaction_status( + &mut self, + signature: Signature, + ) -> io::Result>; + + /// Same as get_transaction_status, but for multiple transactions. + async fn get_transaction_statuses( + &mut self, + signatures: Vec, + ) -> io::Result>>; + + /// Return the most recent rooted slot height. All transactions at or below this height + /// are said to be finalized. The cluster will not fork to a higher slot height. + async fn get_root_slot(&mut self) -> io::Result; + + /// Return the account at the given address at the time of the most recent root slot. + /// If the account is not found, None is returned. + async fn get_account(&mut self, address: Pubkey) -> io::Result>; + + /// Return the balance in lamports of an account at the given address at the slot + /// corresponding to the given commitment level. + async fn get_balance_with_commitment( + &mut self, + address: Pubkey, + commitment: CommitmentLevel, + ) -> io::Result; + + /// Return the balance in lamports of an account at the given address at the time + /// of the most recent root slot. + async fn get_balance(&mut self, address: Pubkey) -> io::Result; +} + +#[async_trait] +impl BanksClientExt for BanksClient { + async fn send_transaction(&mut self, transaction: Transaction) -> io::Result<()> { + self.send_transaction_with_context(context::current(), transaction) + .await + } + + async fn get_fees(&mut self) -> io::Result<(FeeCalculator, Hash, Slot)> { + self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::Root) + .await + } + + async fn get_recent_blockhash(&mut self) -> io::Result { + Ok(self.get_fees().await?.1) + } + + async fn process_transaction_with_commitment( + &mut self, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> transport::Result<()> { + let mut ctx = context::current(); + ctx.deadline += Duration::from_secs(50); + let result = self + .process_transaction_with_commitment_and_context(ctx, transaction, commitment) + .await?; + match result { + None => Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into()), + Some(transaction_result) => Ok(transaction_result?), + } + } + + async fn process_transaction(&mut self, transaction: Transaction) -> transport::Result<()> { + self.process_transaction_with_commitment(transaction, CommitmentLevel::default()) + .await + } + + async fn get_root_slot(&mut self) -> io::Result { + self.get_slot_with_context(context::current(), CommitmentLevel::Root) + .await + } + + async fn get_account(&mut self, address: Pubkey) -> io::Result> { + self.get_account_with_commitment_and_context( + context::current(), + address, + CommitmentLevel::default(), + ) + .await + } + + async fn get_balance_with_commitment( + &mut self, + address: Pubkey, + commitment: CommitmentLevel, + ) -> io::Result { + let account = self + .get_account_with_commitment_and_context(context::current(), address, commitment) + .await?; + Ok(account.map(|x| x.lamports).unwrap_or(0)) + } + + async fn get_balance(&mut self, address: Pubkey) -> io::Result { + self.get_balance_with_commitment(address, CommitmentLevel::default()) + .await + } + + async fn get_transaction_status( + &mut self, + signature: Signature, + ) -> io::Result> { + self.get_transaction_status_with_context(context::current(), signature) + .await + } + + async fn get_transaction_statuses( + &mut self, + signatures: Vec, + ) -> io::Result>> { + // tarpc futures oddly hold a mutable reference back to the client so clone the client upfront + let mut clients_and_signatures: Vec<_> = signatures + .into_iter() + .map(|signature| (self.clone(), signature)) + .collect(); + + let futs = clients_and_signatures + .iter_mut() + .map(|(client, signature)| client.get_transaction_status(*signature)); + + let statuses = join_all(futs).await; + + // Convert Vec> to Result> + statuses.into_iter().collect() + } +} + +pub async fn start_client( + transport: UnboundedChannel, ClientMessage>, +) -> io::Result { + BanksClient::new(client::Config::default(), transport).spawn() +} + +pub async fn start_tcp_client(addr: T) -> io::Result { + let transport = tcp::connect(addr, Bincode::default()).await?; + BanksClient::new(client::Config::default(), transport).spawn() +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_banks_server::banks_server::start_local_server; + use solana_runtime::{bank::Bank, bank_forks::BankForks, genesis_utils::create_genesis_config}; + use solana_sdk::{message::Message, pubkey::Pubkey, signature::Signer, system_instruction}; + use std::sync::{Arc, RwLock}; + use tarpc::transport; + use tokio::{runtime::Runtime, time::delay_for}; + + #[test] + fn test_banks_client_new() { + let (client_transport, _server_transport) = transport::channel::unbounded(); + BanksClient::new(client::Config::default(), client_transport); + } + + #[test] + fn test_banks_server_transfer_via_server() -> io::Result<()> { + // This test shows the preferred way to interact with BanksServer. + // It creates a runtime explicitly (no globals via tokio macros) and calls + // `runtime.block_on()` just once, to run all the async code. + + let genesis = create_genesis_config(10); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); + + let bob_pubkey = Pubkey::new_rand(); + let mint_pubkey = genesis.mint_keypair.pubkey(); + let instruction = system_instruction::transfer(&mint_pubkey, &bob_pubkey, 1); + let message = Message::new(&[instruction], Some(&mint_pubkey)); + + Runtime::new()?.block_on(async { + let client_transport = start_local_server(&bank_forks).await; + let mut banks_client = + BanksClient::new(client::Config::default(), client_transport).spawn()?; + + let recent_blockhash = banks_client.get_recent_blockhash().await?; + let transaction = Transaction::new(&[&genesis.mint_keypair], message, recent_blockhash); + banks_client.process_transaction(transaction).await.unwrap(); + assert_eq!(banks_client.get_balance(bob_pubkey).await?, 1); + Ok(()) + }) + } + + #[test] + fn test_banks_server_transfer_via_client() -> io::Result<()> { + // The caller may not want to hold the connection open until the transaction + // is processed (or blockhash expires). In this test, we verify the + // server-side functionality is available to the client. + + let genesis = create_genesis_config(10); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new( + &genesis.genesis_config, + )))); + + let mint_pubkey = &genesis.mint_keypair.pubkey(); + let bob_pubkey = Pubkey::new_rand(); + let instruction = system_instruction::transfer(&mint_pubkey, &bob_pubkey, 1); + let message = Message::new(&[instruction], Some(&mint_pubkey)); + + Runtime::new()?.block_on(async { + let client_transport = start_local_server(&bank_forks).await; + let mut banks_client = + BanksClient::new(client::Config::default(), client_transport).spawn()?; + let (_, recent_blockhash, last_valid_slot) = banks_client.get_fees().await?; + let transaction = Transaction::new(&[&genesis.mint_keypair], message, recent_blockhash); + let signature = transaction.signatures[0]; + banks_client.send_transaction(transaction).await?; + + let mut status = banks_client.get_transaction_status(signature).await?; + + while status.is_none() { + let root_slot = banks_client.get_root_slot().await?; + if root_slot > last_valid_slot { + break; + } + delay_for(Duration::from_millis(100)).await; + status = banks_client.get_transaction_status(signature).await?; + } + assert!(status.unwrap().err.is_none()); + assert_eq!(banks_client.get_balance(bob_pubkey).await?, 1); + Ok(()) + }) + } +} diff --git a/banks-interface/Cargo.toml b/banks-interface/Cargo.toml new file mode 100644 index 000000000..a69de8e1e --- /dev/null +++ b/banks-interface/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "solana-banks-interface" +version = "1.4.0" +description = "Solana banks RPC interface" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +serde = { version = "1.0.112", features = ["derive"] } +solana-sdk = { path = "../sdk", version = "1.4.0" } +tarpc = { version = "0.21.0", features = ["full"] } + +[lib] +crate-type = ["lib"] +name = "solana_banks_interface" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-interface/src/lib.rs b/banks-interface/src/lib.rs new file mode 100644 index 000000000..46640e91e --- /dev/null +++ b/banks-interface/src/lib.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; +use solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction, TransactionError}, +}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TransactionStatus { + pub slot: Slot, + pub confirmations: Option, // None = rooted + pub err: Option, +} + +#[tarpc::service] +pub trait Banks { + async fn send_transaction_with_context(transaction: Transaction); + async fn get_fees_with_commitment_and_context( + commitment: CommitmentLevel, + ) -> (FeeCalculator, Hash, Slot); + async fn get_transaction_status_with_context(signature: Signature) + -> Option; + async fn get_slot_with_context(commitment: CommitmentLevel) -> Slot; + async fn process_transaction_with_commitment_and_context( + transaction: Transaction, + commitment: CommitmentLevel, + ) -> Option>; + async fn get_account_with_commitment_and_context( + address: Pubkey, + commitment: CommitmentLevel, + ) -> Option; +} + +#[cfg(test)] +mod tests { + use super::*; + use tarpc::{client, transport}; + + #[test] + fn test_banks_client_new() { + let (client_transport, _server_transport) = transport::channel::unbounded(); + BanksClient::new(client::Config::default(), client_transport); + } +} diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml new file mode 100644 index 000000000..56706c062 --- /dev/null +++ b/banks-server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "solana-banks-server" +version = "1.4.0" +description = "Solana banks server" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +bincode = "1.3.1" +futures = "0.3" +solana-banks-interface = { path = "../banks-interface", version = "1.4.0" } +solana-runtime = { path = "../runtime", version = "1.4.0" } +solana-sdk = { path = "../sdk", version = "1.4.0" } +tarpc = { version = "0.21.0", features = ["full"] } +tokio = "0.2" +tokio-serde = { version = "0.6", features = ["bincode"] } + +[lib] +crate-type = ["lib"] +name = "solana_banks_server" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs new file mode 100644 index 000000000..912e0813d --- /dev/null +++ b/banks-server/src/banks_server.rs @@ -0,0 +1,272 @@ +use bincode::{deserialize, serialize}; +use futures::{ + future, + prelude::stream::{self, StreamExt}, +}; +use solana_banks_interface::{Banks, BanksRequest, BanksResponse, TransactionStatus}; +use solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + commitment::{BlockCommitmentCache, CommitmentSlots}, + send_transaction_service::{SendTransactionService, TransactionInfo}, +}; +use solana_sdk::{ + account::Account, + clock::Slot, + commitment_config::CommitmentLevel, + fee_calculator::FeeCalculator, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + transaction::{self, Transaction}, +}; +use std::{ + collections::HashMap, + io, + net::SocketAddr, + sync::{ + atomic::AtomicBool, + mpsc::{channel, Receiver, Sender}, + Arc, RwLock, + }, + thread::Builder, + time::Duration, +}; +use tarpc::{ + context::Context, + rpc::{transport::channel::UnboundedChannel, ClientMessage, Response}, + serde_transport::tcp, + server::{self, Channel, Handler}, + transport, +}; +use tokio::time::delay_for; +use tokio_serde::formats::Bincode; + +#[derive(Clone)] +struct BanksServer { + bank_forks: Arc>, + block_commitment_cache: Arc>, + transaction_sender: Sender, +} + +impl BanksServer { + /// Return a BanksServer that forwards transactions to the + /// given sender. If unit-testing, those transactions can go to + /// a bank in the given BankForks. Otherwise, the receiver should + /// forward them to a validator in the leader schedule. + fn new( + bank_forks: Arc>, + block_commitment_cache: Arc>, + transaction_sender: Sender, + ) -> Self { + Self { + bank_forks, + block_commitment_cache, + transaction_sender, + } + } + + fn run(bank: &Bank, transaction_receiver: Receiver) { + while let Ok(info) = transaction_receiver.recv() { + let mut transaction_infos = vec![info]; + while let Ok(info) = transaction_receiver.try_recv() { + transaction_infos.push(info); + } + let transactions: Vec<_> = transaction_infos + .into_iter() + .map(|info| deserialize(&info.wire_transaction).unwrap()) + .collect(); + let _ = bank.process_transactions(&transactions); + } + } + + /// Useful for unit-testing + fn new_loopback(bank_forks: Arc>) -> Self { + let (transaction_sender, transaction_receiver) = channel(); + let bank = bank_forks.read().unwrap().working_bank(); + let slot = bank.slot(); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( + HashMap::default(), + 0, + CommitmentSlots { + slot, + root: 0, + highest_confirmed_slot: 0, + highest_confirmed_root: 0, + }, + ))); + Builder::new() + .name("solana-bank-forks-client".to_string()) + .spawn(move || Self::run(&bank, transaction_receiver)) + .unwrap(); + Self::new(bank_forks, block_commitment_cache, transaction_sender) + } + + fn slot(&self, commitment: CommitmentLevel) -> Slot { + self.block_commitment_cache + .read() + .unwrap() + .slot_with_commitment(commitment) + } + + fn bank(&self, commitment: CommitmentLevel) -> Arc { + self.bank_forks.read().unwrap()[self.slot(commitment)].clone() + } + + async fn poll_signature_status( + self, + signature: Signature, + last_valid_slot: Slot, + commitment: CommitmentLevel, + ) -> Option> { + let mut status = self.bank(commitment).get_signature_status(&signature); + while status.is_none() { + delay_for(Duration::from_millis(200)).await; + let bank = self.bank(commitment); + if bank.slot() > last_valid_slot { + break; + } + status = bank.get_signature_status(&signature); + } + status + } +} + +#[tarpc::server] +impl Banks for BanksServer { + async fn send_transaction_with_context(self, _: Context, transaction: Transaction) { + let blockhash = &transaction.message.recent_blockhash; + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); + } + + async fn get_fees_with_commitment_and_context( + self, + _: Context, + commitment: CommitmentLevel, + ) -> (FeeCalculator, Hash, Slot) { + let bank = self.bank(commitment); + let (blockhash, fee_calculator) = bank.last_blockhash_with_fee_calculator(); + let last_valid_slot = bank.get_blockhash_last_valid_slot(&blockhash).unwrap(); + (fee_calculator, blockhash, last_valid_slot) + } + + async fn get_transaction_status_with_context( + self, + _: Context, + signature: Signature, + ) -> Option { + let bank = self.bank(CommitmentLevel::Recent); + let (slot, status) = bank.get_signature_status_slot(&signature)?; + let r_block_commitment_cache = self.block_commitment_cache.read().unwrap(); + + let confirmations = if r_block_commitment_cache.root() >= slot { + None + } else { + r_block_commitment_cache + .get_confirmation_count(slot) + .or(Some(0)) + }; + Some(TransactionStatus { + slot, + confirmations, + err: status.err(), + }) + } + + async fn get_slot_with_context(self, _: Context, commitment: CommitmentLevel) -> Slot { + self.slot(commitment) + } + + async fn process_transaction_with_commitment_and_context( + self, + _: Context, + transaction: Transaction, + commitment: CommitmentLevel, + ) -> Option> { + let blockhash = &transaction.message.recent_blockhash; + let last_valid_slot = self + .bank_forks + .read() + .unwrap() + .root_bank() + .get_blockhash_last_valid_slot(&blockhash) + .unwrap(); + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + let info = + TransactionInfo::new(signature, serialize(&transaction).unwrap(), last_valid_slot); + self.transaction_sender.send(info).unwrap(); + self.poll_signature_status(signature, last_valid_slot, commitment) + .await + } + + async fn get_account_with_commitment_and_context( + self, + _: Context, + address: Pubkey, + commitment: CommitmentLevel, + ) -> Option { + let bank = self.bank(commitment); + bank.get_account(&address) + } +} + +pub async fn start_local_server( + bank_forks: &Arc>, +) -> UnboundedChannel, ClientMessage> { + let banks_server = BanksServer::new_loopback(bank_forks.clone()); + let (client_transport, server_transport) = transport::channel::unbounded(); + let server = server::new(server::Config::default()) + .incoming(stream::once(future::ready(server_transport))) + .respond_with(banks_server.serve()); + tokio::spawn(server); + client_transport +} + +pub async fn start_tcp_server( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, +) -> io::Result<()> { + // Note: These settings are copied straight from the tarpc example. + let server = tcp::listen(listen_addr, Bincode::default) + .await? + // Ignore accept errors. + .filter_map(|r| future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + // Limit channels to 1 per IP. + .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) + // serve is generated by the service attribute. It takes as input any type implementing + // the generated Banks trait. + .map(move |chan| { + let (sender, receiver) = channel(); + let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + + SendTransactionService::new( + tpu_addr, + &bank_forks, + &exit_send_transaction_service, + receiver, + ); + + let server = + BanksServer::new(bank_forks.clone(), block_commitment_cache.clone(), sender); + chan.respond_with(server.serve()).execute() + }) + // Max 10 channels. + .buffer_unordered(10) + .for_each(|_| async {}); + + server.await; + Ok(()) +} diff --git a/banks-server/src/lib.rs b/banks-server/src/lib.rs new file mode 100644 index 000000000..a9acc11e5 --- /dev/null +++ b/banks-server/src/lib.rs @@ -0,0 +1,2 @@ +pub mod banks_server; +pub mod rpc_banks_service; diff --git a/banks-server/src/rpc_banks_service.rs b/banks-server/src/rpc_banks_service.rs new file mode 100644 index 000000000..541133e64 --- /dev/null +++ b/banks-server/src/rpc_banks_service.rs @@ -0,0 +1,116 @@ +//! The `rpc_banks_service` module implements the Solana Banks RPC API. + +use crate::banks_server::start_tcp_server; +use futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select}; +use solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, +}; +use tokio::{ + runtime::Runtime, + time::{self, Duration}, +}; + +pub struct RpcBanksService { + thread_hdl: JoinHandle<()>, +} + +/// Run the TCP service until `exit` is set to true +async fn start_abortable_tcp_server( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, +) { + let server = start_tcp_server( + listen_addr, + tpu_addr, + bank_forks.clone(), + block_commitment_cache.clone(), + ) + .fuse(); + let interval = time::interval(Duration::from_millis(100)).fuse(); + pin_mut!(server, interval); + loop { + select! { + _ = server => {}, + _ = interval.select_next_some() => { + if exit.load(Ordering::Relaxed) { + break; + } + } + } + } +} + +impl RpcBanksService { + fn run( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: Arc>, + block_commitment_cache: Arc>, + exit: Arc, + ) { + let server = start_abortable_tcp_server( + listen_addr, + tpu_addr, + bank_forks, + block_commitment_cache, + exit, + ); + Runtime::new().unwrap().block_on(server); + } + + pub fn new( + listen_addr: SocketAddr, + tpu_addr: SocketAddr, + bank_forks: &Arc>, + block_commitment_cache: &Arc>, + exit: &Arc, + ) -> Self { + let bank_forks = bank_forks.clone(); + let block_commitment_cache = block_commitment_cache.clone(); + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-rpc-banks".to_string()) + .spawn(move || { + Self::run( + listen_addr, + tpu_addr, + bank_forks, + block_commitment_cache, + exit, + ) + }) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_runtime::bank::Bank; + + #[test] + fn test_rpc_banks_server_exit() { + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default()))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let exit = Arc::new(AtomicBool::new(false)); + let addr = "127.0.0.1:0".parse().unwrap(); + let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit); + exit.store(true, Ordering::Relaxed); + service.join().unwrap(); + } +} diff --git a/cli-config/src/config.rs b/cli-config/src/config.rs index f766f1dec..c65d087ed 100644 --- a/cli-config/src/config.rs +++ b/cli-config/src/config.rs @@ -76,6 +76,17 @@ impl Config { ws_url.to_string() } + pub fn compute_rpc_banks_url(json_rpc_url: &str) -> String { + let json_rpc_url: Option = json_rpc_url.parse().ok(); + if json_rpc_url.is_none() { + return "".to_string(); + } + let mut url = json_rpc_url.unwrap(); + let port = url.port_or_known_default().unwrap_or(80); + url.set_port(Some(port + 2)).expect("unable to set port"); + url.to_string() + } + pub fn import_address_labels

(&mut self, filename: P) -> Result<(), io::Error> where P: AsRef, @@ -122,4 +133,28 @@ mod test { assert_eq!(Config::compute_websocket_url(&"garbage"), String::new()); } + + #[test] + fn compute_rpc_banks_url() { + assert_eq!( + Config::compute_rpc_banks_url(&"http://devnet.solana.com"), + "http://devnet.solana.com:82/".to_string() + ); + + assert_eq!( + Config::compute_rpc_banks_url(&"https://devnet.solana.com"), + "https://devnet.solana.com:445/".to_string() + ); + + assert_eq!( + Config::compute_rpc_banks_url(&"http://example.com:8899"), + "http://example.com:8901/".to_string() + ); + assert_eq!( + Config::compute_rpc_banks_url(&"https://example.com:1234"), + "https://example.com:1236/".to_string() + ); + + assert_eq!(Config::compute_rpc_banks_url(&"garbage"), String::new()); + } } diff --git a/core/Cargo.toml b/core/Cargo.toml index de39b6376..dfca764d6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,6 +44,7 @@ serde = "1.0.112" serde_derive = "1.0.103" serde_json = "1.0.56" solana-account-decoder = { path = "../account-decoder", version = "1.4.0" } +solana-banks-server = { path = "../banks-server", version = "1.4.0" } solana-bpf-loader-program = { path = "../programs/bpf_loader", version = "1.4.0" } solana-budget-program = { path = "../programs/budget", version = "1.4.0" } solana-clap-utils = { path = "../clap-utils", version = "1.4.0" } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 93229fea3..4e7d08ccf 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -357,7 +357,7 @@ pub fn make_accounts_hashes_message( } // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "6qRS1ZwydpdSqzeyRdDvn5uwfDdFYkuUz4K4jSkd1oFW")] +#[frozen_abi(digest = "CnN1gW2K2TRydGc84eYnQJwdTADPjQf6LJLZ4RP1QeoH")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] enum Protocol { @@ -542,7 +542,7 @@ impl ClusterInfo { } let ip_addr = node.gossip.ip(); Some(format!( - "{:15} {:2}| {:5} | {:44} |{:^15}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + "{:15} {:2}| {:5} | {:44} |{:^15}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", if ContactInfo::is_valid_address(&node.gossip) { ip_addr.to_string() } else { @@ -565,6 +565,7 @@ impl ClusterInfo { addr_to_string(&ip_addr, &node.serve_repair), addr_to_string(&ip_addr, &node.rpc), addr_to_string(&ip_addr, &node.rpc_pubsub), + addr_to_string(&ip_addr, &node.rpc_banks), node.shred_version, )) } @@ -2419,10 +2420,12 @@ impl Node { let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let rpc_pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port); + let rpc_banks_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap(); + let rpc_banks_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port); let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let unused = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let info = ContactInfo { id: *pubkey, @@ -2432,7 +2435,7 @@ impl Node { repair: repair.local_addr().unwrap(), tpu: tpu.local_addr().unwrap(), tpu_forwards: tpu_forwards.local_addr().unwrap(), - unused: unused.local_addr().unwrap(), + rpc_banks: rpc_banks_addr, rpc: rpc_addr, rpc_pubsub: rpc_pubsub_addr, serve_repair: serve_repair.local_addr().unwrap(), @@ -2513,7 +2516,7 @@ impl Node { repair: SocketAddr::new(gossip_addr.ip(), repair_port), tpu: SocketAddr::new(gossip_addr.ip(), tpu_port), tpu_forwards: SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), - unused: socketaddr_any!(), + rpc_banks: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: SocketAddr::new(gossip_addr.ip(), serve_repair_port), diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 1d75a934b..fce883008 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -25,8 +25,8 @@ pub struct ContactInfo { pub tpu: SocketAddr, /// address to forward unprocessed transactions to pub tpu_forwards: SocketAddr, - /// unused address - pub unused: SocketAddr, + /// address to which to send bank state requests + pub rpc_banks: SocketAddr, /// address to which to send JSON-RPC requests pub rpc: SocketAddr, /// websocket for JSON-RPC push notifications @@ -95,7 +95,7 @@ impl Default for ContactInfo { repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), - unused: socketaddr_any!(), + rpc_banks: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), serve_repair: socketaddr_any!(), @@ -115,7 +115,7 @@ impl ContactInfo { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + rpc_banks: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -137,7 +137,7 @@ impl ContactInfo { repair: addr, tpu: addr, tpu_forwards: addr, - unused: addr, + rpc_banks: addr, rpc: addr, rpc_pubsub: addr, serve_repair: addr, @@ -162,6 +162,7 @@ impl ContactInfo { let repair = next_port(&bind_addr, 5); let rpc = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); + let rpc_banks = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_BANKS_PORT); let serve_repair = next_port(&bind_addr, 6); Self { id: *pubkey, @@ -171,7 +172,7 @@ impl ContactInfo { repair, tpu, tpu_forwards, - unused: "0.0.0.0:0".parse().unwrap(), + rpc_banks, rpc, rpc_pubsub, serve_repair, @@ -248,7 +249,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.rpc_banks.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -260,7 +261,7 @@ mod tests { assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); - assert!(ci.unused.ip().is_multicast()); + assert!(ci.rpc_banks.ip().is_multicast()); assert!(ci.serve_repair.ip().is_multicast()); } #[test] @@ -273,7 +274,7 @@ mod tests { assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); - assert!(ci.unused.ip().is_unspecified()); + assert!(ci.rpc_banks.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } #[test] @@ -286,7 +287,7 @@ mod tests { assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); - assert!(ci.unused.ip().is_unspecified()); + assert_eq!(ci.rpc_banks.port(), rpc_port::DEFAULT_RPC_BANKS_PORT); assert_eq!(ci.serve_repair.port(), 16); } @@ -310,6 +311,10 @@ mod tests { d1.rpc_pubsub, socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT)) ); + assert_eq!( + d1.rpc_banks, + socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_BANKS_PORT)) + ); assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 636c72eb2..57f5f7f09 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -662,7 +662,7 @@ mod tests { repair: socketaddr!("127.0.0.1:1237"), tpu: socketaddr!("127.0.0.1:1238"), tpu_forwards: socketaddr!("127.0.0.1:1239"), - unused: socketaddr!("127.0.0.1:1240"), + rpc_banks: socketaddr!("127.0.0.1:1240"), rpc: socketaddr!("127.0.0.1:1241"), rpc_pubsub: socketaddr!("127.0.0.1:1242"), serve_repair: socketaddr!("127.0.0.1:1243"), @@ -745,7 +745,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + rpc_banks: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr, @@ -773,7 +773,7 @@ mod tests { repair: socketaddr!([127, 0, 0, 1], 1237), tpu: socketaddr!([127, 0, 0, 1], 1238), tpu_forwards: socketaddr!([127, 0, 0, 1], 1239), - unused: socketaddr!([127, 0, 0, 1], 1240), + rpc_banks: socketaddr!([127, 0, 0, 1], 1240), rpc: socketaddr!([127, 0, 0, 1], 1241), rpc_pubsub: socketaddr!([127, 0, 0, 1], 1242), serve_repair: serve_repair_addr2, diff --git a/core/src/validator.rs b/core/src/validator.rs index e59311bb8..41e60fcdd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -23,6 +23,7 @@ use crate::{ }; use crossbeam_channel::unbounded; use rand::{thread_rng, Rng}; +use solana_banks_server::rpc_banks_service::RpcBanksService; use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, @@ -72,7 +73,7 @@ pub struct ValidatorConfig { pub voting_disabled: bool, pub account_paths: Vec, pub rpc_config: JsonRpcConfig, - pub rpc_ports: Option<(u16, u16)>, // (API, PubSub) + pub rpc_ports: Option<(u16, u16, u16)>, // (JsonRpc, JsonRpcPubSub, Banks) pub snapshot_config: Option, pub max_ledger_shreds: Option, pub broadcast_stage_type: BroadcastStageType, @@ -148,7 +149,7 @@ struct TransactionHistoryServices { pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, - rpc_service: Option<(JsonRpcService, PubSubService)>, + rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, transaction_status_service: Option, rewards_recorder_service: Option, gossip_service: GossipService, @@ -282,36 +283,47 @@ impl Validator { )); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { - if ContactInfo::is_valid_address(&node.info.rpc) { - assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - assert_eq!(rpc_port, node.info.rpc.port()); - assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); - } else { - assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - } - ( - JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new( - &subscriptions, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - &exit, - ), - ) - }); + let rpc_service = config + .rpc_ports + .map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + assert_eq!(rpc_banks_port, node.info.rpc_banks.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + let tpu_address = cluster_info.my_contact_info().tpu; + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + ), + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + RpcBanksService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port), + tpu_address, + &bank_forks, + &block_commitment_cache, + &exit, + ), + ) + }); info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", @@ -542,9 +554,10 @@ impl Validator { pub fn join(self) -> Result<()> { self.poh_service.join()?; drop(self.poh_recorder); - if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service { + if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service { rpc_service.join()?; rpc_pubsub_service.join()?; + rpc_banks_service.join()?; } if let Some(transaction_status_service) = self.transaction_status_service { transaction_status_service.join()?; @@ -870,7 +883,11 @@ impl TestValidator { let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config); let config = ValidatorConfig { - rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())), + rpc_ports: Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )), ..ValidatorConfig::default() }; let node = Validator::new( @@ -1038,6 +1055,7 @@ mod tests { rpc_ports: Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )), ..ValidatorConfig::default() }; @@ -1112,6 +1130,7 @@ mod tests { rpc_ports: Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )), ..ValidatorConfig::default() }; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index e299f0864..e2797cb15 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -202,6 +202,7 @@ impl LocalCluster { leader_config.rpc_ports = Some(( leader_node.info.rpc.port(), leader_node.info.rpc_pubsub.port(), + leader_node.info.rpc_banks.port(), )); leader_config.account_paths = vec![leader_ledger_path.join("accounts")]; let leader_server = Validator::new( @@ -343,6 +344,7 @@ impl LocalCluster { config.rpc_ports = Some(( validator_node.info.rpc.port(), validator_node.info.rpc_pubsub.port(), + validator_node.info.rpc_banks.port(), )); config.account_paths = vec![ledger_path.join("accounts")]; let voting_keypair = voting_keypair.unwrap(); @@ -613,8 +615,11 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(&pubkey); cluster_validator_info.info.contact_info = node.info.clone(); - cluster_validator_info.config.rpc_ports = - Some((node.info.rpc.port(), node.info.rpc_pubsub.port())); + cluster_validator_info.config.rpc_ports = Some(( + node.info.rpc.port(), + node.info.rpc_pubsub.port(), + node.info.rpc_banks.port(), + )); let entry_point_info = { if *pubkey == self.entry_point_info.id { diff --git a/runtime/src/send_transaction_service.rs b/runtime/src/send_transaction_service.rs index 0f24fe8a7..bab219b0e 100644 --- a/runtime/src/send_transaction_service.rs +++ b/runtime/src/send_transaction_service.rs @@ -22,9 +22,9 @@ pub struct SendTransactionService { } pub struct TransactionInfo { - signature: Signature, - wire_transaction: Vec, - last_valid_slot: Slot, + pub signature: Signature, + pub wire_transaction: Vec, + pub last_valid_slot: Slot, } impl TransactionInfo { diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 6eb508ee0..2df5ea57e 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -26,7 +26,7 @@ default = [ "serde_json", "ed25519-dalek", "solana-logger", - "solana-crate-features" + "solana-crate-features", ] [dependencies] diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index 4ab7b2ca9..d0f67e9fb 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -46,11 +46,27 @@ impl CommitmentConfig { #[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] +/// An attribute of a slot. It describes how finalized a block is at some point in time. For example, a slot +/// is said to be at the max level immediately after the cluster recognizes the block at that slot as +/// finalized. When querying the ledger state, use lower levels of commitment to report progress and higher +/// levels to ensure state changes will not be rolled back. pub enum CommitmentLevel { + /// The highest slot having reached max vote lockout, as recognized by a supermajority of the cluster. Max, + + /// The highest slot of the heaviest fork. Ledger state at this slot is not derived from a finalized + /// block, but if multiple forks are present, is from the fork the validator believes is most likely + /// to finalize. Recent, + + /// The highest slot having reached max vote lockout. Root, + + /// The highest slot having reached 1 confirmation. Single, + + /// The highest slot having reached 1 confirmation via gossip votes; may occur before or after Single, + /// depending on gossip traffic. SingleGossip, } diff --git a/sdk/src/rpc_port.rs b/sdk/src/rpc_port.rs index 531be001b..cbdbcb5dc 100644 --- a/sdk/src/rpc_port.rs +++ b/sdk/src/rpc_port.rs @@ -3,3 +3,6 @@ pub const DEFAULT_RPC_PORT: u16 = 8899; /// Default port number for JSON RPC pubsub pub const DEFAULT_RPC_PUBSUB_PORT: u16 = 8900; + +/// Default port number for Banks RPC API +pub const DEFAULT_RPC_BANKS_PORT: u16 = 8901; diff --git a/tokens/Cargo.toml b/tokens/Cargo.toml index 4b15b950e..dbba6a0d5 100644 --- a/tokens/Cargo.toml +++ b/tokens/Cargo.toml @@ -18,6 +18,7 @@ indexmap = "1.4.0" indicatif = "0.15.0" pickledb = "0.4.1" serde = { version = "1.0", features = ["derive"] } +solana-banks-client = { path = "../banks-client", version = "1.4.0" } solana-clap-utils = { path = "../clap-utils", version = "1.4.0" } solana-cli-config = { path = "../cli-config", version = "1.4.0" } solana-client = { path = "../client", version = "1.4.0" } @@ -25,9 +26,12 @@ solana-remote-wallet = { path = "../remote-wallet", version = "1.4.0" } solana-runtime = { path = "../runtime", version = "1.4.0" } solana-sdk = { path = "../sdk", version = "1.4.0" } solana-stake-program = { path = "../programs/stake", version = "1.4.0" } -solana-transaction-status = { path = "../transaction-status", version = "1.4.0" } tempfile = "3.1.0" thiserror = "1.0" +tokio = "0.2" +url = "2.1" [dev-dependencies] +solana-banks-server = { path = "../banks-server", version = "1.4.0" } solana-core = { path = "../core", version = "1.4.0" } +solana-runtime = { path = "../runtime", version = "1.4.0" } diff --git a/tokens/src/commands.rs b/tokens/src/commands.rs index 8c07dca1c..b464735df 100644 --- a/tokens/src/commands.rs +++ b/tokens/src/commands.rs @@ -1,18 +1,20 @@ use crate::args::{BalancesArgs, DistributeTokensArgs, StakeArgs, TransactionLogArgs}; use crate::db::{self, TransactionInfo}; -use crate::thin_client::{Client, ThinClient}; use console::style; use csv::{ReaderBuilder, Trim}; use indexmap::IndexMap; use indicatif::{ProgressBar, ProgressStyle}; use pickledb::PickleDb; use serde::{Deserialize, Serialize}; +use solana_banks_client::{BanksClient, BanksClientExt}; use solana_sdk::{ + commitment_config::CommitmentLevel, message::Message, native_token::{lamports_to_sol, sol_to_lamports}, signature::{unique_signers, Signature, Signer}, system_instruction, - transport::TransportError, + transaction::Transaction, + transport::{self, TransportError}, }; use solana_stake_program::{ stake_instruction, @@ -21,9 +23,9 @@ use solana_stake_program::{ use std::{ cmp::{self}, io, - thread::sleep, time::Duration, }; +use tokio::time::delay_for; #[derive(Serialize, Deserialize, Debug, Clone)] struct Bid { @@ -92,8 +94,25 @@ fn create_allocation(bid: &Bid, dollars_per_sol: f64) -> Allocation { } } -fn distribute_tokens( - client: &ThinClient, +async fn transfer( + client: &mut BanksClient, + lamports: u64, + sender_keypair: &S, + to_pubkey: &Pubkey, +) -> io::Result { + let create_instruction = + system_instruction::transfer(&sender_keypair.pubkey(), &to_pubkey, lamports); + let message = Message::new(&[create_instruction], Some(&sender_keypair.pubkey())); + let recent_blockhash = client.get_recent_blockhash().await?; + Ok(Transaction::new( + &[sender_keypair], + message, + recent_blockhash, + )) +} + +async fn distribute_tokens( + client: &mut BanksClient, db: &mut PickleDb, allocations: &[Allocation], args: &DistributeTokensArgs, @@ -159,7 +178,17 @@ fn distribute_tokens( let fee_payer_pubkey = args.fee_payer.pubkey(); let message = Message::new(&instructions, Some(&fee_payer_pubkey)); - match client.send_and_confirm_message(message, &signers) { + let result: transport::Result<(Transaction, u64)> = { + if args.dry_run { + Ok((Transaction::new_unsigned(message), std::u64::MAX)) + } else { + let (_fee_calculator, blockhash, last_valid_slot) = client.get_fees().await?; + let transaction = Transaction::new(&signers, message, blockhash); + client.send_transaction(transaction.clone()).await?; + Ok((transaction, last_valid_slot)) + } + }; + match result { Ok((transaction, last_valid_slot)) => { db::set_transaction_info( db, @@ -206,8 +235,8 @@ fn new_spinner_progress_bar() -> ProgressBar { progress_bar } -pub fn process_distribute_tokens( - client: &ThinClient, +pub async fn process_distribute_tokens( + client: &mut BanksClient, args: &DistributeTokensArgs, ) -> Result, Error> { let mut allocations: Vec = @@ -230,7 +259,7 @@ pub fn process_distribute_tokens( let mut db = db::open_db(&args.transaction_db, args.dry_run)?; // Start by finalizing any transactions from the previous run. - let confirmations = finalize_transactions(client, &mut db, args.dry_run)?; + let confirmations = finalize_transactions(client, &mut db, args.dry_run).await?; let transaction_infos = db::read_transaction_infos(&db); apply_previous_transactions(&mut allocations, &transaction_infos); @@ -284,14 +313,14 @@ pub fn process_distribute_tokens( ); } - distribute_tokens(client, &mut db, &allocations, args)?; + distribute_tokens(client, &mut db, &allocations, args).await?; - let opt_confirmations = finalize_transactions(client, &mut db, args.dry_run)?; + let opt_confirmations = finalize_transactions(client, &mut db, args.dry_run).await?; Ok(opt_confirmations) } -fn finalize_transactions( - client: &ThinClient, +async fn finalize_transactions( + client: &mut BanksClient, db: &mut PickleDb, dry_run: bool, ) -> Result, Error> { @@ -299,7 +328,7 @@ fn finalize_transactions( return Ok(None); } - let mut opt_confirmations = update_finalized_transactions(client, db)?; + let mut opt_confirmations = update_finalized_transactions(client, db).await?; let progress_bar = new_spinner_progress_bar(); @@ -312,8 +341,8 @@ fn finalize_transactions( } // Sleep for about 1 slot - sleep(Duration::from_millis(500)); - let opt_conf = update_finalized_transactions(client, db)?; + delay_for(Duration::from_millis(500)).await; + let opt_conf = update_finalized_transactions(client, db).await?; opt_confirmations = opt_conf; } @@ -322,8 +351,8 @@ fn finalize_transactions( // Update the finalized bit on any transactions that are now rooted // Return the lowest number of confirmations on the unfinalized transactions or None if all are finalized. -fn update_finalized_transactions( - client: &ThinClient, +async fn update_finalized_transactions( + client: &mut BanksClient, db: &mut PickleDb, ) -> Result, Error> { let transaction_infos = db::read_transaction_infos(db); @@ -342,8 +371,10 @@ fn update_finalized_transactions( .map(|(tx, _slot)| tx.signatures[0]) .filter(|sig| *sig != Signature::default()) // Filter out dry-run signatures .collect(); - let transaction_statuses = client.get_signature_statuses(&unconfirmed_signatures)?; - let root_slot = client.get_slot()?; + let transaction_statuses = client + .get_transaction_statuses(unconfirmed_signatures) + .await?; + let root_slot = client.get_root_slot().await?; let mut confirmations = None; for ((transaction, last_valid_slot), opt_transaction_status) in unconfirmed_transactions @@ -368,7 +399,10 @@ fn update_finalized_transactions( Ok(confirmations) } -pub fn process_balances(client: &ThinClient, args: &BalancesArgs) -> Result<(), csv::Error> { +pub async fn process_balances( + client: &mut BanksClient, + args: &BalancesArgs, +) -> Result<(), csv::Error> { let allocations: Vec = read_allocations(&args.input_csv, args.from_bids, args.dollars_per_sol); let allocations = merge_allocations(&allocations); @@ -385,7 +419,7 @@ pub fn process_balances(client: &ThinClient, args: &BalancesArgs) -> Result<(), for allocation in &allocations { let address = allocation.recipient.parse().unwrap(); let expected = lamports_to_sol(sol_to_lamports(allocation.amount)); - let actual = lamports_to_sol(client.get_balance(&address).unwrap()); + let actual = lamports_to_sol(client.get_balance(address).await.unwrap()); println!( "{:<44} {:>24.9} {:>24.9} {:>24.9}", allocation.recipient, @@ -406,15 +440,30 @@ pub fn process_transaction_log(args: &TransactionLogArgs) -> Result<(), Error> { use solana_sdk::{pubkey::Pubkey, signature::Keypair}; use tempfile::{tempdir, NamedTempFile}; -pub fn test_process_distribute_tokens_with_client(client: C, sender_keypair: Keypair) { - let thin_client = ThinClient::new(client, false); +pub async fn test_process_distribute_tokens_with_client( + client: &mut BanksClient, + sender_keypair: Keypair, +) { let fee_payer = Keypair::new(); - let (transaction, _last_valid_slot) = thin_client - .transfer(sol_to_lamports(1.0), &sender_keypair, &fee_payer.pubkey()) - .unwrap(); - thin_client - .poll_for_confirmation(&transaction.signatures[0]) + let transaction = transfer( + client, + sol_to_lamports(1.0), + &sender_keypair, + &fee_payer.pubkey(), + ) + .await + .unwrap(); + client + .process_transaction_with_commitment(transaction, CommitmentLevel::Recent) + .await .unwrap(); + assert_eq!( + client + .get_balance_with_commitment(fee_payer.pubkey(), CommitmentLevel::Recent) + .await + .unwrap(), + sol_to_lamports(1.0), + ); let alice_pubkey = Pubkey::new_rand(); let allocation = Allocation { @@ -445,7 +494,7 @@ pub fn test_process_distribute_tokens_with_client(client: C, sender_k dollars_per_sol: None, stake_args: None, }; - let confirmations = process_distribute_tokens(&thin_client, &args).unwrap(); + let confirmations = process_distribute_tokens(client, &args).await.unwrap(); assert_eq!(confirmations, None); let transaction_infos = @@ -459,12 +508,12 @@ pub fn test_process_distribute_tokens_with_client(client: C, sender_k ); assert_eq!( - thin_client.get_balance(&alice_pubkey).unwrap(), + client.get_balance(alice_pubkey).await.unwrap(), expected_amount, ); // Now, run it again, and check there's no double-spend. - process_distribute_tokens(&thin_client, &args).unwrap(); + process_distribute_tokens(client, &args).await.unwrap(); let transaction_infos = db::read_transaction_infos(&db::open_db(&transaction_db, true).unwrap()); assert_eq!(transaction_infos.len(), 1); @@ -476,19 +525,27 @@ pub fn test_process_distribute_tokens_with_client(client: C, sender_k ); assert_eq!( - thin_client.get_balance(&alice_pubkey).unwrap(), + client.get_balance(alice_pubkey).await.unwrap(), expected_amount, ); } -pub fn test_process_distribute_stake_with_client(client: C, sender_keypair: Keypair) { - let thin_client = ThinClient::new(client, false); +pub async fn test_process_distribute_stake_with_client( + client: &mut BanksClient, + sender_keypair: Keypair, +) { let fee_payer = Keypair::new(); - let (transaction, _last_valid_slot) = thin_client - .transfer(sol_to_lamports(1.0), &sender_keypair, &fee_payer.pubkey()) - .unwrap(); - thin_client - .poll_for_confirmation(&transaction.signatures[0]) + let transaction = transfer( + client, + sol_to_lamports(1.0), + &sender_keypair, + &fee_payer.pubkey(), + ) + .await + .unwrap(); + client + .process_transaction_with_commitment(transaction, CommitmentLevel::Recent) + .await .unwrap(); let stake_account_keypair = Keypair::new(); @@ -510,8 +567,11 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke ); let message = Message::new(&instructions, Some(&sender_keypair.pubkey())); let signers = [&sender_keypair, &stake_account_keypair]; - thin_client - .send_and_confirm_message(message, &signers) + let blockhash = client.get_recent_blockhash().await.unwrap(); + let transaction = Transaction::new(&signers, message, blockhash); + client + .process_transaction_with_commitment(transaction, CommitmentLevel::Recent) + .await .unwrap(); let alice_pubkey = Pubkey::new_rand(); @@ -549,7 +609,7 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke sender_keypair: Box::new(sender_keypair), dollars_per_sol: None, }; - let confirmations = process_distribute_tokens(&thin_client, &args).unwrap(); + let confirmations = process_distribute_tokens(client, &args).await.unwrap(); assert_eq!(confirmations, None); let transaction_infos = @@ -563,17 +623,17 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke ); assert_eq!( - thin_client.get_balance(&alice_pubkey).unwrap(), + client.get_balance(alice_pubkey).await.unwrap(), sol_to_lamports(1.0), ); let new_stake_account_address = transaction_infos[0].new_stake_account_address.unwrap(); assert_eq!( - thin_client.get_balance(&new_stake_account_address).unwrap(), + client.get_balance(new_stake_account_address).await.unwrap(), expected_amount - sol_to_lamports(1.0), ); // Now, run it again, and check there's no double-spend. - process_distribute_tokens(&thin_client, &args).unwrap(); + process_distribute_tokens(client, &args).await.unwrap(); let transaction_infos = db::read_transaction_infos(&db::open_db(&transaction_db, true).unwrap()); assert_eq!(transaction_infos.len(), 1); @@ -585,11 +645,11 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke ); assert_eq!( - thin_client.get_balance(&alice_pubkey).unwrap(), + client.get_balance(alice_pubkey).await.unwrap(), sol_to_lamports(1.0), ); assert_eq!( - thin_client.get_balance(&new_stake_account_address).unwrap(), + client.get_balance(new_stake_account_address).await.unwrap(), expected_amount - sol_to_lamports(1.0), ); } @@ -597,23 +657,33 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke #[cfg(test)] mod tests { use super::*; - use solana_runtime::{bank::Bank, bank_client::BankClient}; + use solana_banks_client::start_client; + use solana_banks_server::banks_server::start_local_server; + use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::genesis_config::create_genesis_config; + use std::sync::{Arc, RwLock}; + use tokio::runtime::Runtime; #[test] fn test_process_distribute_tokens() { let (genesis_config, sender_keypair) = create_genesis_config(sol_to_lamports(9_000_000.0)); - let bank = Bank::new(&genesis_config); - let bank_client = BankClient::new(bank); - test_process_distribute_tokens_with_client(bank_client, sender_keypair); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new(&genesis_config)))); + Runtime::new().unwrap().block_on(async { + let transport = start_local_server(&bank_forks).await; + let mut banks_client = start_client(transport).await.unwrap(); + test_process_distribute_tokens_with_client(&mut banks_client, sender_keypair).await; + }); } #[test] fn test_process_distribute_stake() { let (genesis_config, sender_keypair) = create_genesis_config(sol_to_lamports(9_000_000.0)); - let bank = Bank::new(&genesis_config); - let bank_client = BankClient::new(bank); - test_process_distribute_stake_with_client(bank_client, sender_keypair); + let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::new(&genesis_config)))); + Runtime::new().unwrap().block_on(async { + let transport = start_local_server(&bank_forks).await; + let mut banks_client = start_client(transport).await.unwrap(); + test_process_distribute_stake_with_client(&mut banks_client, sender_keypair).await; + }); } #[test] diff --git a/tokens/src/db.rs b/tokens/src/db.rs index d792bb346..85daac62e 100644 --- a/tokens/src/db.rs +++ b/tokens/src/db.rs @@ -1,8 +1,8 @@ use chrono::prelude::*; use pickledb::{error::Error, PickleDb, PickleDbDumpPolicy}; use serde::{Deserialize, Serialize}; +use solana_banks_client::TransactionStatus; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, transaction::Transaction}; -use solana_transaction_status::TransactionStatus; use std::{cmp::Ordering, fs, io, path::Path}; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -155,7 +155,7 @@ pub fn update_finalized_transaction( return Ok(Some(confirmations)); } - if let Err(e) = &transaction_status.status { + if let Some(e) = &transaction_status.err { // The transaction was finalized, but execution failed. Drop it. eprintln!( "Error in transaction with signature {}: {}", @@ -273,7 +273,6 @@ mod tests { let transaction_status = TransactionStatus { slot: 0, confirmations: Some(1), - status: Ok(()), err: None, }; assert_eq!( @@ -297,12 +296,10 @@ mod tests { let signature = Signature::default(); let transaction_info = TransactionInfo::default(); db.set(&signature.to_string(), &transaction_info).unwrap(); - let status = Err(TransactionError::AccountNotFound); let transaction_status = TransactionStatus { slot: 0, confirmations: None, - status, - err: None, + err: Some(TransactionError::AccountNotFound), }; assert_eq!( update_finalized_transaction(&mut db, &signature, Some(transaction_status), 0, 0) @@ -325,7 +322,6 @@ mod tests { let transaction_status = TransactionStatus { slot: 0, confirmations: None, - status: Ok(()), err: None, }; assert_eq!( diff --git a/tokens/src/lib.rs b/tokens/src/lib.rs index f955230c8..ad25864bb 100644 --- a/tokens/src/lib.rs +++ b/tokens/src/lib.rs @@ -2,4 +2,3 @@ pub mod arg_parser; pub mod args; pub mod commands; mod db; -pub mod thin_client; diff --git a/tokens/src/main.rs b/tokens/src/main.rs index 080793ba0..4080de5b4 100644 --- a/tokens/src/main.rs +++ b/tokens/src/main.rs @@ -1,11 +1,9 @@ -use solana_cli_config::Config; -use solana_cli_config::CONFIG_FILE; -use solana_client::rpc_client::RpcClient; -use solana_tokens::{arg_parser::parse_args, args::Command, commands, thin_client::ThinClient}; -use std::env; -use std::error::Error; -use std::path::Path; -use std::process; +use solana_banks_client::start_tcp_client; +use solana_cli_config::{Config, CONFIG_FILE}; +use solana_tokens::{arg_parser::parse_args, args::Command, commands}; +use std::{env, error::Error, path::Path, process}; +use tokio::runtime::Runtime; +use url::Url; fn main() -> Result<(), Box> { let command_args = parse_args(env::args_os())?; @@ -20,16 +18,22 @@ fn main() -> Result<(), Box> { Config::default() }; let json_rpc_url = command_args.url.unwrap_or(config.json_rpc_url); - let client = RpcClient::new(json_rpc_url); + let rpc_banks_url = Config::compute_rpc_banks_url(&json_rpc_url); + let url = Url::parse(&rpc_banks_url)?; + let host_port = (url.host_str().unwrap(), url.port().unwrap()); + + let mut runtime = Runtime::new().unwrap(); + let mut banks_client = runtime.block_on(start_tcp_client(&host_port))?; match command_args.command { Command::DistributeTokens(args) => { - let thin_client = ThinClient::new(client, args.dry_run); - commands::process_distribute_tokens(&thin_client, &args)?; + runtime.block_on(commands::process_distribute_tokens( + &mut banks_client, + &args, + ))?; } Command::Balances(args) => { - let thin_client = ThinClient::new(client, false); - commands::process_balances(&thin_client, &args)?; + runtime.block_on(commands::process_balances(&mut banks_client, &args))?; } Command::TransactionLog(args) => { commands::process_transaction_log(&args)?; diff --git a/tokens/src/thin_client.rs b/tokens/src/thin_client.rs deleted file mode 100644 index 86c7635ba..000000000 --- a/tokens/src/thin_client.rs +++ /dev/null @@ -1,193 +0,0 @@ -use solana_client::{rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig}; -use solana_runtime::bank_client::BankClient; -use solana_sdk::{ - account::Account, - client::{AsyncClient, SyncClient}, - clock::Slot, - commitment_config::CommitmentConfig, - fee_calculator::FeeCalculator, - hash::Hash, - message::Message, - pubkey::Pubkey, - signature::{Signature, Signer}, - signers::Signers, - system_instruction, - transaction::Transaction, - transport::{Result, TransportError}, -}; -use solana_transaction_status::TransactionStatus; - -pub trait Client { - fn send_transaction1(&self, transaction: Transaction) -> Result; - fn get_signature_statuses1( - &self, - signatures: &[Signature], - ) -> Result>>; - fn get_balance1(&self, pubkey: &Pubkey) -> Result; - fn get_fees1(&self) -> Result<(Hash, FeeCalculator, Slot)>; - fn get_slot1(&self) -> Result; - fn get_account1(&self, pubkey: &Pubkey) -> Result>; -} - -impl Client for RpcClient { - fn send_transaction1(&self, transaction: Transaction) -> Result { - self.send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - }, - ) - .map_err(|e| TransportError::Custom(e.to_string())) - } - - fn get_signature_statuses1( - &self, - signatures: &[Signature], - ) -> Result>> { - self.get_signature_statuses(signatures) - .map(|response| response.value) - .map_err(|e| TransportError::Custom(e.to_string())) - } - - fn get_balance1(&self, pubkey: &Pubkey) -> Result { - self.get_balance(pubkey) - .map_err(|e| TransportError::Custom(e.to_string())) - } - - fn get_fees1(&self) -> Result<(Hash, FeeCalculator, Slot)> { - let result = self - .get_recent_blockhash_with_commitment(CommitmentConfig::default()) - .map_err(|e| TransportError::Custom(e.to_string()))?; - Ok(result.value) - } - - fn get_slot1(&self) -> Result { - self.get_slot() - .map_err(|e| TransportError::Custom(e.to_string())) - } - - fn get_account1(&self, pubkey: &Pubkey) -> Result> { - self.get_account(pubkey) - .map(Some) - .map_err(|e| TransportError::Custom(e.to_string())) - } -} - -impl Client for BankClient { - fn send_transaction1(&self, transaction: Transaction) -> Result { - self.async_send_transaction(transaction) - } - - fn get_signature_statuses1( - &self, - signatures: &[Signature], - ) -> Result>> { - signatures - .iter() - .map(|signature| { - self.get_signature_status(signature).map(|opt| { - opt.map(|status| TransactionStatus { - slot: 0, - confirmations: None, - status, - err: None, - }) - }) - }) - .collect() - } - - fn get_balance1(&self, pubkey: &Pubkey) -> Result { - self.get_balance(pubkey) - } - - fn get_fees1(&self) -> Result<(Hash, FeeCalculator, Slot)> { - self.get_recent_blockhash_with_commitment(CommitmentConfig::default()) - } - - fn get_slot1(&self) -> Result { - self.get_slot() - } - - fn get_account1(&self, pubkey: &Pubkey) -> Result> { - self.get_account(pubkey) - } -} - -pub struct ThinClient<'a> { - client: Box, - dry_run: bool, -} - -impl<'a> ThinClient<'a> { - pub fn new(client: C, dry_run: bool) -> Self { - Self { - client: Box::new(client), - dry_run, - } - } - - pub fn send_transaction(&self, transaction: Transaction) -> Result { - if self.dry_run { - return Ok(Signature::default()); - } - self.client.send_transaction1(transaction) - } - - pub fn poll_for_confirmation(&self, signature: &Signature) -> Result<()> { - while self.get_signature_statuses(&[*signature])?[0].is_none() { - std::thread::sleep(std::time::Duration::from_millis(500)); - } - Ok(()) - } - - pub fn get_signature_statuses( - &self, - signatures: &[Signature], - ) -> Result>> { - self.client.get_signature_statuses1(signatures) - } - - pub fn send_and_confirm_message( - &self, - message: Message, - signers: &S, - ) -> Result<(Transaction, Slot)> { - if self.dry_run { - return Ok((Transaction::new_unsigned(message), std::u64::MAX)); - } - let (blockhash, _fee_caluclator, last_valid_slot) = self.get_fees()?; - - let transaction = Transaction::new(signers, message, blockhash); - self.send_transaction(transaction.clone())?; - Ok((transaction, last_valid_slot)) - } - - pub fn transfer( - &self, - lamports: u64, - sender_keypair: &S, - to_pubkey: &Pubkey, - ) -> Result<(Transaction, u64)> { - let create_instruction = - system_instruction::transfer(&sender_keypair.pubkey(), &to_pubkey, lamports); - let message = Message::new(&[create_instruction], Some(&sender_keypair.pubkey())); - self.send_and_confirm_message(message, &[sender_keypair]) - } - - pub fn get_fees(&self) -> Result<(Hash, FeeCalculator, Slot)> { - self.client.get_fees1() - } - - pub fn get_slot(&self) -> Result { - self.client.get_slot1() - } - - pub fn get_balance(&self, pubkey: &Pubkey) -> Result { - self.client.get_balance1(pubkey) - } - - pub fn get_account(&self, pubkey: &Pubkey) -> Result> { - self.client.get_account1(pubkey) - } -} diff --git a/tokens/tests/commands.rs b/tokens/tests/commands.rs index 3bdab39c4..f6568fdc8 100644 --- a/tokens/tests/commands.rs +++ b/tokens/tests/commands.rs @@ -1,18 +1,29 @@ -use solana_client::rpc_client::RpcClient; +use solana_banks_client::start_tcp_client; use solana_core::validator::{TestValidator, TestValidatorOptions}; use solana_sdk::native_token::sol_to_lamports; use solana_tokens::commands::test_process_distribute_tokens_with_client; use std::fs::remove_dir_all; +use tokio::runtime::Runtime; #[test] fn test_process_distribute_with_rpc_client() { - let validator = TestValidator::run_with_options(TestValidatorOptions { + let TestValidator { + server, + leader_data, + alice, + ledger_path, + .. + } = TestValidator::run_with_options(TestValidatorOptions { mint_lamports: sol_to_lamports(9_000_000.0), ..TestValidatorOptions::default() }); - let rpc_client = RpcClient::new_socket(validator.leader_data.rpc); - test_process_distribute_tokens_with_client(rpc_client, validator.alice); - validator.server.close().unwrap(); - remove_dir_all(validator.ledger_path).unwrap(); + Runtime::new().unwrap().block_on(async { + let mut banks_client = start_tcp_client(leader_data.rpc_banks).await.unwrap(); + test_process_distribute_tokens_with_client(&mut banks_client, alice).await + }); + + // Explicit cleanup, otherwise "pure virtual method called" crash in Docker + server.close().unwrap(); + remove_dir_all(ledger_path).unwrap(); } diff --git a/validator/src/main.rs b/validator/src/main.rs index a401934b7..1301f7fa6 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -599,7 +599,7 @@ pub fn main() { .value_name("PORT") .takes_value(true) .validator(port_validator) - .help("Use this port for JSON RPC, and the next port for the RPC websocket"), + .help("Use this port for JSON RPC, the next port for the RPC websocket, and the following for the RPC banks API"), ) .arg( Arg::with_name("private_rpc") @@ -960,7 +960,7 @@ pub fn main() { }, rpc_ports: value_t!(matches, "rpc_port", u16) .ok() - .map(|rpc_port| (rpc_port, rpc_port + 1)), + .map(|rpc_port| (rpc_port, rpc_port + 1, rpc_port + 2)), voting_disabled: matches.is_present("no_voting"), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators, @@ -1178,9 +1178,10 @@ pub fn main() { ); if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); + node.info.rpc_banks = SocketAddr::new(node.info.gossip.ip(), rpc_banks_port); } } @@ -1199,8 +1200,12 @@ pub fn main() { let mut tcp_listeners = vec![]; if !private_rpc { - if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { - for (purpose, port) in &[("RPC", rpc_port), ("RPC pubsub", rpc_pubsub_port)] { + if let Some((rpc_port, rpc_pubsub_port, rpc_banks_port)) = validator_config.rpc_ports { + for (purpose, port) in &[ + ("RPC", rpc_port), + ("RPC pubsub", rpc_pubsub_port), + ("RPC banks", rpc_banks_port), + ] { tcp_listeners.push(( *port, TcpListener::bind(&SocketAddr::from((rpc_bind_address, *port)))