From ca616bcf2b493868038dcc7e4905410d1e2a1801 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Sun, 11 Dec 2022 18:06:39 +0100 Subject: [PATCH] making the batching of transactions work --- src/cli.rs | 37 ++++--------------- src/context.rs | 2 +- src/main.rs | 61 +++++++++---------------------- src/rpc.rs | 99 ++++++++++++++++++++++++++++---------------------- 4 files changed, 82 insertions(+), 117 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 72a6b92a..16fcfe46 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,5 @@ -use clap::Subcommand; - use clap::Parser; +use solana_cli_config::ConfigInput; /// Holds the configuration for a single run of the benchmark #[derive(Parser, Debug)] @@ -13,24 +12,18 @@ use clap::Parser; " )] pub struct Args { - #[clap(subcommand)] - pub command: Command, - /* - #[arg(short, long, default_value_t = String::from("8899"))] - pub port: String, - #[arg(short, long, default_value_t = String::from("8900"))] - pub subscription_port: String, + #[arg(short, long, default_value_t = 8899)] + pub port: u16, + #[arg(short, long, default_value_t = 8900)] + pub subscription_port: u16, #[arg(short, long, default_value_t = String::from("http://localhost:8899"))] pub rpc_url: String, - #[arg(short, long, default_value_t = String::new())] + #[arg(short, long, default_value_t = String::from("http://localhost:8900"))] pub websocket_url: String, - */ } -/* + impl Args { - pub fn resolve_address(&mut self) { - if self.rpc_url.is_empty() { let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting( self.rpc_url.as_str(), @@ -48,20 +41,4 @@ impl Args { self.websocket_url = ws_url; } } - -} -*/ -#[derive(Subcommand, Debug)] -pub enum Command { - Run { - #[arg(short, long, default_value_t = String::from("8899"))] - port: String, - #[arg(short, long, default_value_t = String::from("8900"))] - subscription_port: String, - #[arg(short, long, default_value_t = String::from("http://localhost:8899"))] - rpc_url: String, - #[arg(short, long, default_value_t = String::new())] - websocket_url: String, - }, - Test, } diff --git a/src/context.rs b/src/context.rs index 9fd06326..33c8dd08 100644 --- a/src/context.rs +++ b/src/context.rs @@ -329,7 +329,7 @@ impl PerformanceCounter { self.last_count_for_transactions_sent .store(total_transactions, Ordering::Relaxed); } - + pub fn update_confirm_transaction_counter(&self) { self.total_confirmations.fetch_add(1, Ordering::Relaxed); } diff --git a/src/main.rs b/src/main.rs index 7d02dfa2..22568812 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,8 @@ -use std::{net::SocketAddr, sync::Arc, thread::sleep}; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + thread::sleep, +}; use clap::Parser; use context::LiteRpcSubsrciptionControl; @@ -24,7 +28,7 @@ mod rpc; use cli::Args; -fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) { +fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String) { let rpc_url = if rpc_url.is_empty() { let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting( rpc_url.as_str(), @@ -60,10 +64,8 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: notification_reciever, )); - let subscription_port = format!("127.0.0.1:{}",subscription_port) - - .parse::() - .expect("Invalid subscription port"); + let subscription_port = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), subscription_port); // start websocket server let (_trigger, websocket_service) = LitePubSubService::new( @@ -114,9 +116,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: ); let max_request_body_size: usize = 50 * (1 << 10); - let socket_addr = format!("127.0.0.1:{}",rpc_addr).parse::().unwrap(); - - + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); { let request_processor = request_processor.clone(); let server = @@ -140,40 +140,15 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: cleaning_thread.join().unwrap(); } -fn ts_test() { - let res = std::process::Command::new("yarn") - .args(["run", "test:test-validator"]) - .output() - .unwrap(); - println!("{}", String::from_utf8_lossy(&res.stdout)); - println!("{}", String::from_utf8_lossy(&res.stderr)); -} - -#[tokio::main] pub async fn main() { - let cli_command = Args::parse(); + let mut cli = Args::parse(); + cli.resolve_address(); + let Args { + port, + subscription_port, + rpc_url, + websocket_url, + } = cli; - match cli_command.command { - cli::Command::Run { - port, - subscription_port, - rpc_url, - websocket_url, - } => run(port, subscription_port, rpc_url, websocket_url), - cli::Command::Test => ts_test(), - } - //cli_config.resolve_address(); - //println!( - // "Using rpc server {} and ws server {}", - // cli_config.rpc_url, cli_config.websocket_url - //); - //let Args { - // rpc_url: json_rpc_url, - // websocket_url, - // port: rpc_addr, - // subscription_port, - // .. - //} = &cli_config; - - // start recieving notifications and broadcast them + run(port, subscription_port, rpc_url, websocket_url) } diff --git a/src/rpc.rs b/src/rpc.rs index e88304d3..24cb3370 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,11 +1,11 @@ use dashmap::DashMap; -use futures::executor::block_on; use serde::{Deserialize, Serialize}; use solana_client::{ pubsub_client::{BlockSubscription, PubsubClientError}, tpu_client::TpuClientConfig, }; use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient}; +use solana_sdk::transaction::Transaction; use std::{ str::FromStr, sync::Mutex, @@ -25,7 +25,7 @@ use { jsonrpc_derive::rpc, solana_client::connection_cache::ConnectionCache, solana_client::rpc_client::RpcClient, - solana_client::nonblocking::rpc_client::RpcClient as NonblockingRpcClient, + solana_client::tpu_client::TpuClient, solana_perf::packet::PACKET_DATA_SIZE, solana_rpc_client_api::{ config::*, @@ -34,9 +34,7 @@ use { solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, signature::Signature, - transaction::VersionedTransaction, }, - solana_tpu_client::nonblocking::tpu_client::TpuClient, solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding}, std::{ any::type_name, @@ -56,7 +54,7 @@ pub struct LightRpcRequestProcessor { joinables: Arc>>>, subscribed_clients: Arc>>, performance_counter: PerformanceCounter, - tpu_producer_channel: Sender>, + tpu_producer_channel: Sender, } impl LightRpcRequestProcessor { @@ -66,19 +64,8 @@ impl LightRpcRequestProcessor { notification_sender: Sender, performance_counter: PerformanceCounter, ) -> LightRpcRequestProcessor { - - let nonblocking_rpc_client = Arc::new(NonblockingRpcClient::new(json_rpc_url.to_string())); let rpc_client = Arc::new(RpcClient::new(json_rpc_url)); let connection_cache = Arc::new(ConnectionCache::default()); - let tpu_client = Arc::new( - block_on(TpuClient::new_with_connection_cache( - nonblocking_rpc_client, - websocket_url, - TpuClientConfig {fanout_slots : 100}, // value for max fanout slots - connection_cache.clone(), - )) - .unwrap(), - ); let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender)); @@ -104,9 +91,16 @@ impl LightRpcRequestProcessor { &context, CommitmentLevel::Finalized, ), - Self::build_thread_to_process_transactions(tpu_client.clone(), tpu_consumer, performance_counter.clone()), ]; + Self::build_thread_to_process_transactions( + json_rpc_url.to_string(), + websocket_url.to_string(), + connection_cache.clone(), + tpu_consumer, + performance_counter.clone(), + ); + LightRpcRequestProcessor { rpc_client, last_valid_block_height: 0, @@ -165,14 +159,28 @@ impl LightRpcRequestProcessor { } fn build_thread_to_process_transactions( - tpu_client: Arc, - receiver: Receiver>, - performance_counters : PerformanceCounter, - ) -> JoinHandle<()> { + json_rpc_url: String, + websocket_url: String, + connection_cache: Arc, + receiver: Receiver, + performance_counters: PerformanceCounter, + ) { Builder::new() - .name("tpu sender".to_string()) - .spawn(move || loop { + .name("thread working on confirmation block".to_string()) + .spawn(move || { + let nonblocking_rpc_client = + Arc::new(RpcClient::new(json_rpc_url.to_string())); + let tpu_client = TpuClient::new_with_connection_cache( + nonblocking_rpc_client, + websocket_url.as_str(), + TpuClientConfig::default(), // value for max fanout slots + connection_cache.clone(), + ); + let tpu_client = Arc::new(tpu_client.unwrap()); + + loop { let recv_res = receiver.recv(); + println!("recieved a transaction"); match recv_res { Ok(transaction) => { let mut transactions_vec = vec![transaction]; @@ -190,14 +198,18 @@ impl LightRpcRequestProcessor { None => break, } } - let count:u64 = transactions_vec.len() as u64; - let fut_res = block_on(tpu_client.try_send_wire_transaction_batch(transactions_vec)); - match fut_res - { - Ok(_) => performance_counters.total_confirmations.fetch_add( count, Ordering::Relaxed), + let count: u64 = transactions_vec.len() as u64; + let slice = transactions_vec.as_slice(); + let fut_res = tpu_client.try_send_transaction_batch(slice); + match fut_res { + Ok(_) => performance_counters + .total_transactions_sent + .fetch_add(count, Ordering::Relaxed), Err(e) => { println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string()); - performance_counters.transaction_sent_error.fetch_add(count, Ordering::Relaxed) + performance_counters + .transaction_sent_error + .fetch_add(count, Ordering::Relaxed) } }; } @@ -206,8 +218,8 @@ impl LightRpcRequestProcessor { break; } }; - }) - .unwrap() + } + }).unwrap(); } fn process_block( @@ -325,7 +337,7 @@ pub mod lite_rpc { use std::str::FromStr; use itertools::Itertools; - use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey}; + use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey, transaction::Transaction}; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; use super::*; @@ -409,19 +421,20 @@ pub mod lite_rpc { tx_encoding )) })?; - let (wire_transaction, transaction) = - decode_and_deserialize::(data, binary_encoding)?; + let (_wire_transaction, transaction) = + decode_and_deserialize::(data, binary_encoding)?; + let signature = transaction.signatures[0].to_string(); + meta.context + .signature_status + .insert(signature.clone(), SignatureStatus::new()); - meta.context.signature_status.insert( - transaction.signatures[0].to_string(), - SignatureStatus::new(), - ); - - match meta.tpu_producer_channel.send(wire_transaction) { - Ok(_) => Ok(transaction.signatures[0].to_string()), - Err(e) => { + match meta.tpu_producer_channel.send(transaction) { + Ok(_) => Ok(signature), + Err(e) => { println!("got error while sending on channel {}", e.to_string()); - Err(jsonrpc_core::Error::new(jsonrpc_core::ErrorCode::InternalError)) + Err(jsonrpc_core::Error::new( + jsonrpc_core::ErrorCode::InternalError, + )) } } }