Creating multiple threads to send tpu batches

This commit is contained in:
Godmode Galactus 2022-12-12 09:15:46 +01:00
parent 6d14edd617
commit 6a70cf5548
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
4 changed files with 47 additions and 19 deletions

5
Cargo.lock generated
View File

@ -623,9 +623,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.22"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f"
dependencies = [
"iana-time-zone",
"js-sys",
@ -2301,6 +2301,7 @@ dependencies = [
"base64 0.13.1",
"bincode",
"bs58",
"chrono",
"clap 4.0.29",
"crossbeam-channel",
"csv",

View File

@ -62,6 +62,7 @@ spl-token-2022 = { version = "=0.4.3", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0"
tokio-util = { version = "0.6", features = ["codec", "compat"] }
chrono = "0.4.23"
[dev-dependencies]
csv = "1.1.6"

View File

@ -1,3 +1,4 @@
use chrono::DateTime;
use crossbeam_channel::Sender;
use dashmap::DashMap;
use serde::Serialize;
@ -18,7 +19,7 @@ use std::{
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::broadcast;
@ -284,14 +285,15 @@ impl LiteRpcSubsrciptionControl {
pub struct PerformanceCounter {
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub send_transactions_errors_per_seconds: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
}
impl PerformanceCounter {
@ -304,6 +306,8 @@ impl PerformanceCounter {
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_sent_errors: Arc::new(AtomicU64::new(0)),
send_transactions_errors_per_seconds: Arc::new(AtomicU64::new(0)),
}
}
@ -312,6 +316,8 @@ impl PerformanceCounter {
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
@ -323,11 +329,17 @@ impl PerformanceCounter {
.load(Ordering::Relaxed),
Ordering::Release,
);
self.send_transactions_errors_per_seconds.store(
total_errors - self.last_count_for_sent_errors.load(Ordering::Relaxed),
Ordering::Release,
);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
self.last_count_for_sent_errors
.store(total_errors, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
@ -350,16 +362,17 @@ pub fn launch_performance_updating_thread(
.confirmations_per_seconds
.load(Ordering::Acquire);
let errors_on_sent = performance_counter.transaction_sent_error.load(Ordering::Acquire);
let errors_on_sent = performance_counter.send_transactions_errors_per_seconds.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
let datetime: DateTime<chrono::Local> = SystemTime::now().into();
println!(
"Sent {} transactions sucessfully {} with errors, and confrimed {} transactions",
total_transactions_per_seconds, errors_on_sent, confirmations_per_seconds
"{} : sent {} transactions sucessfully {} with errors, and confrimed {} transactions",
datetime.format("%H:%M:%S"), total_transactions_per_seconds, errors_on_sent, confirmations_per_seconds
);
thread::sleep(remaining);
}

View File

@ -91,16 +91,29 @@ impl LightRpcRequestProcessor {
&context,
CommitmentLevel::Finalized,
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer,
performance_counter.clone(),
),
];
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer,
performance_counter.clone(),
);
LightRpcRequestProcessor {
rpc_client,
last_valid_block_height: 0,
@ -164,7 +177,7 @@ impl LightRpcRequestProcessor {
connection_cache: Arc<ConnectionCache>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) {
) -> JoinHandle<()> {
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
@ -184,7 +197,7 @@ impl LightRpcRequestProcessor {
match recv_res {
Ok(transaction) => {
let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(50000);
let mut time_remaining = Duration::from_micros(5000);
for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining);
@ -219,7 +232,7 @@ impl LightRpcRequestProcessor {
}
};
}
}).unwrap();
}).unwrap()
}
fn process_block(