From ddccf28022d16a76862ef119d96c67fce205150b Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Sat, 10 Dec 2022 15:33:57 +0100 Subject: [PATCH] sending transactions in batches in a new thread --- src/context.rs | 48 +++++++++++++++++++----- src/main.rs | 27 +++++++++++--- src/rpc.rs | 99 +++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 141 insertions(+), 33 deletions(-) diff --git a/src/context.rs b/src/context.rs index 66ef583d..9fd06326 100644 --- a/src/context.rs +++ b/src/context.rs @@ -48,8 +48,32 @@ impl BlockInformation { } } +pub struct SignatureStatus { + pub status: Option, + pub error: Option, + pub created: Instant, +} + +impl SignatureStatus { + pub fn new() -> Self { + Self { + status: None, + error: None, + created: Instant::now(), + } + } + + pub fn new_from_commitment(commitment: CommitmentLevel) -> Self { + Self { + status: Some(commitment), + error: None, + created: Instant::now(), + } + } +} + pub struct LiteRpcContext { - pub signature_status: DashMap>, + pub signature_status: DashMap, pub finalized_block_info: BlockInformation, pub confirmed_block_info: BlockInformation, pub notification_sender: Sender, @@ -67,6 +91,11 @@ impl LiteRpcContext { notification_sender, } } + + pub fn remove_stale_data(&self, purgetime_in_seconds: u64) { + self.signature_status + .retain(|_k, v| v.created.elapsed().as_secs() < purgetime_in_seconds); + } } pub struct SignatureNotification { @@ -259,6 +288,8 @@ pub struct PerformanceCounter { pub confirmations_per_seconds: Arc, pub transactions_per_seconds: Arc, + pub transaction_sent_error: Arc, + last_count_for_confirmations: Arc, last_count_for_transactions_sent: Arc, } @@ -272,6 +303,7 @@ impl PerformanceCounter { transactions_per_seconds: Arc::new(AtomicU64::new(0)), last_count_for_confirmations: Arc::new(AtomicU64::new(0)), last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)), + transaction_sent_error: Arc::new(AtomicU64::new(0)), } } @@ -297,11 +329,7 @@ impl PerformanceCounter { self.last_count_for_transactions_sent .store(total_transactions, Ordering::Relaxed); } - - pub fn update_sent_transactions_counter(&self) { - self.total_transactions_sent.fetch_add(1, Ordering::Relaxed); - } - + pub fn update_confirm_transaction_counter(&self) { self.total_confirmations.fetch_add(1, Ordering::Relaxed); } @@ -321,15 +349,17 @@ pub fn launch_performance_updating_thread( let confirmations_per_seconds = performance_counter .confirmations_per_seconds .load(Ordering::Acquire); + + let errors_on_sent = performance_counter.transaction_sent_error.load(Ordering::Acquire); + let total_transactions_per_seconds = performance_counter .transactions_per_seconds .load(Ordering::Acquire); - let runtime = start.elapsed(); if let Some(remaining) = wait_time.checked_sub(runtime) { println!( - "Sent {} transactions and confrimed {} transactions", - total_transactions_per_seconds, confirmations_per_seconds + "Sent {} transactions sucessfully {} with errors, and confrimed {} transactions", + total_transactions_per_seconds, errors_on_sent, confirmations_per_seconds ); thread::sleep(remaining); } diff --git a/src/main.rs b/src/main.rs index 1ab9b1a3..338fe24a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, thread::sleep}; use clap::Parser; use context::LiteRpcSubsrciptionControl; @@ -52,7 +52,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: let performance_counter = PerformanceCounter::new(); launch_performance_updating_thread(performance_counter.clone()); - let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128); + let (broadcast_sender, _broadcast_receiver) = broadcast::channel(10000); let (notification_sender, notification_reciever) = crossbeam_channel::unbounded(); let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new( @@ -71,15 +71,16 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: subscription_port, performance_counter.clone(), ); - { + let broadcast_thread = { + // build broadcasting thread let pubsub_control = pubsub_control.clone(); std::thread::Builder::new() .name("broadcasting thread".to_string()) .spawn(move || { pubsub_control.start_broadcasting(); }) - .unwrap(); - } + .unwrap() + }; let mut io = MetaIoHandler::default(); let lite_rpc = lite_rpc::LightRpc; io.extend_with(lite_rpc.to_delegate()); @@ -90,6 +91,18 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: notification_sender, performance_counter.clone(), ); + let cleaning_thread = { + // build cleaning thread + let context = request_processor.context.clone(); + std::thread::Builder::new() + .name("cleaning thread".to_string()) + .spawn(move || { + context.remove_stale_data(60 * 10); + sleep(std::time::Duration::from_secs(60 * 5)) + }) + .unwrap() + }; + let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() .worker_threads(1) @@ -111,7 +124,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: request_processor.clone() }) .event_loop_executor(runtime.handle().clone()) - .threads(1) + .threads(4) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Any, ])) @@ -123,6 +136,8 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: } request_processor.free(); websocket_service.close().unwrap(); + broadcast_thread.join().unwrap(); + cleaning_thread.join().unwrap(); } fn ts_test() { diff --git a/src/rpc.rs b/src/rpc.rs index 8fed9bad..e88304d3 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,4 +1,5 @@ use dashmap::DashMap; +use futures::executor::block_on; use serde::{Deserialize, Serialize}; use solana_client::{ pubsub_client::{BlockSubscription, PubsubClientError}, @@ -9,11 +10,12 @@ use std::{ str::FromStr, sync::Mutex, thread::{Builder, JoinHandle}, + time::Duration, }; use crate::context::{ BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification, - SlotNotification, + SignatureStatus, SlotNotification, }; use crossbeam_channel::Sender; use { @@ -22,7 +24,8 @@ use { jsonrpc_core::{Error, Metadata, Result}, jsonrpc_derive::rpc, solana_client::connection_cache::ConnectionCache, - solana_client::{rpc_client::RpcClient, tpu_client::TpuClient}, + solana_client::rpc_client::RpcClient, + solana_client::nonblocking::rpc_client::RpcClient as NonblockingRpcClient, solana_perf::packet::PACKET_DATA_SIZE, solana_rpc_client_api::{ config::*, @@ -33,6 +36,7 @@ use { signature::Signature, transaction::VersionedTransaction, }, + solana_tpu_client::nonblocking::tpu_client::TpuClient, solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding}, std::{ any::type_name, @@ -40,10 +44,11 @@ use { }, }; +const TPU_BATCH_SIZE: usize = 64; + #[derive(Clone)] pub struct LightRpcRequestProcessor { pub rpc_client: Arc, - pub tpu_client: Arc, pub last_valid_block_height: u64, pub ws_url: String, pub context: Arc, @@ -51,6 +56,7 @@ pub struct LightRpcRequestProcessor { joinables: Arc>>>, subscribed_clients: Arc>>, performance_counter: PerformanceCounter, + tpu_producer_channel: Sender>, } impl LightRpcRequestProcessor { @@ -60,15 +66,17 @@ impl LightRpcRequestProcessor { notification_sender: Sender, performance_counter: PerformanceCounter, ) -> LightRpcRequestProcessor { + + let nonblocking_rpc_client = Arc::new(NonblockingRpcClient::new(json_rpc_url.to_string())); let rpc_client = Arc::new(RpcClient::new(json_rpc_url)); let connection_cache = Arc::new(ConnectionCache::default()); let tpu_client = Arc::new( - TpuClient::new_with_connection_cache( - rpc_client.clone(), + block_on(TpuClient::new_with_connection_cache( + nonblocking_rpc_client, websocket_url, - TpuClientConfig::default(), + TpuClientConfig {fanout_slots : 100}, // value for max fanout slots connection_cache.clone(), - ) + )) .unwrap(), ); @@ -82,6 +90,8 @@ impl LightRpcRequestProcessor { let (client_finalized, receiver_finalized) = Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap(); + let (tpu_producer, tpu_consumer) = crossbeam_channel::bounded(100000); + // create threads to listen for finalized and confrimed blocks let joinables = vec![ Self::build_thread_to_process_blocks( @@ -94,11 +104,11 @@ impl LightRpcRequestProcessor { &context, CommitmentLevel::Finalized, ), + Self::build_thread_to_process_transactions(tpu_client.clone(), tpu_consumer, performance_counter.clone()), ]; LightRpcRequestProcessor { rpc_client, - tpu_client, last_valid_block_height: 0, ws_url: websocket_url.to_string(), context, @@ -106,6 +116,7 @@ impl LightRpcRequestProcessor { joinables: Arc::new(Mutex::new(joinables)), subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])), performance_counter, + tpu_producer_channel: tpu_producer, } } @@ -153,9 +164,55 @@ impl LightRpcRequestProcessor { .unwrap() } + fn build_thread_to_process_transactions( + tpu_client: Arc, + receiver: Receiver>, + performance_counters : PerformanceCounter, + ) -> JoinHandle<()> { + Builder::new() + .name("tpu sender".to_string()) + .spawn(move || loop { + let recv_res = receiver.recv(); + match recv_res { + Ok(transaction) => { + let mut transactions_vec = vec![transaction]; + let mut time_remaining = Duration::from_micros(50000); + for _i in 1..TPU_BATCH_SIZE { + let start = std::time::Instant::now(); + let another = receiver.recv_timeout(time_remaining); + + match another { + Ok(x) => transactions_vec.push(x), + Err(_) => break, + } + match time_remaining.checked_sub(start.elapsed()) { + Some(x) => time_remaining = x, + None => break, + } + } + let count:u64 = transactions_vec.len() as u64; + let fut_res = block_on(tpu_client.try_send_wire_transaction_batch(transactions_vec)); + match fut_res + { + Ok(_) => performance_counters.total_confirmations.fetch_add( count, Ordering::Relaxed), + Err(e) => { + println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string()); + performance_counters.transaction_sent_error.fetch_add(count, Ordering::Relaxed) + } + }; + } + Err(e) => { + println!("got error on tpu channel {}", e.to_string()); + break; + } + }; + }) + .unwrap() + } + fn process_block( reciever: Receiver>, - signature_status: &DashMap>, + signature_status: &DashMap, commitment: CommitmentLevel, notification_sender: &crossbeam_channel::Sender, block_information: &BlockInformation, @@ -210,7 +267,7 @@ impl LightRpcRequestProcessor { e.to_string() ); } - x.insert(Some(commitment)); + x.insert(SignatureStatus::new_from_commitment(commitment)); } dashmap::mapref::entry::Entry::Vacant(_x) => { // do nothing transaction not sent by lite rpc @@ -355,12 +412,18 @@ pub mod lite_rpc { let (wire_transaction, transaction) = decode_and_deserialize::(data, binary_encoding)?; - meta.context - .signature_status - .insert(transaction.signatures[0].to_string(), None); - meta.tpu_client.send_wire_transaction(wire_transaction); - meta.performance_counter.update_sent_transactions_counter(); - Ok(transaction.signatures[0].to_string()) + meta.context.signature_status.insert( + transaction.signatures[0].to_string(), + SignatureStatus::new(), + ); + + match meta.tpu_producer_channel.send(wire_transaction) { + Ok(_) => Ok(transaction.signatures[0].to_string()), + Err(e) => { + println!("got error while sending on channel {}", e.to_string()); + Err(jsonrpc_core::Error::new(jsonrpc_core::ErrorCode::InternalError)) + } + } } fn get_recent_blockhash( @@ -462,7 +525,7 @@ pub mod lite_rpc { .update_confirm_transaction_counter(); match k_value { - Some(value) => match *value { + Some(value) => match (*value).status { Some(commitment_for_signature) => Ok(RpcResponse { context: RpcResponseContext::new(slot), value: if commitment.eq(&CommitmentLevel::Finalized) { @@ -513,7 +576,7 @@ pub mod lite_rpc { let singature_status = meta.context.signature_status.get(x); let k_value = singature_status; match k_value { - Some(value) => match *value { + Some(value) => match (*value).status { Some(commitment_for_signature) => { let slot = meta .context