fine tuning send_transactions, updating performance counter

This commit is contained in:
godmodegalactus 2023-01-05 11:16:37 +01:00
parent 17e2d3f6f6
commit c9c358afbe
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
3 changed files with 111 additions and 144 deletions

View File

@ -36,6 +36,7 @@ export async function main() {
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
let promises_to_unpack : Promise<TransactionSignature>[][] = [];
let time_taken_to_send = [];
for (let i = 0; i<forSeconds; ++i)
{
@ -54,7 +55,7 @@ export async function main() {
const userTo = userAccounts[toIndex];
if(skip_confirmations === false) {
const transaction = new Transaction().add(
splToken.createTransferInstruction(userFrom, userTo, users[fromIndex].publicKey, 100)
splToken.createTransferInstruction(userFrom, userTo, users[fromIndex].publicKey, Math.ceil(Math.random() * 100))
);
transaction.recentBlockhash = blockhash;
transaction.feePayer = authority.publicKey;
@ -67,7 +68,8 @@ export async function main() {
}
const end = performance.now();
const diff = (end - start);
if (diff > 0) {
time_taken_to_send[i] = diff;
if (diff > 0 && diff < 1000) {
await sleep(1000 - diff)
}
}
@ -81,19 +83,20 @@ export async function main() {
for (let i=0; i< size; ++i)
{
const promises = promises_to_unpack[i];
const signatures = await Promise.all(promises);
let statuses = await connection.getSignatureStatuses(signatures, {searchTransactionHistory: false})
for (const status of statuses.value) {
if(status != null && status.confirmationStatus && status.err == null) {
successes[i] += 1;
}
else {
failures[i] += 1;
for (const promise of promises) {
const signature = await promise;
const confirmed = await connection.getSignatureStatus(signature);
if (confirmed != null && confirmed.value != null && confirmed.value.err == null) {
successes[i]++;
} else {
failures[i]++;
}
}
}
console.log("sucesses " + successes)
console.log("failures " + failures)
console.log("sucesses : " + successes)
console.log("failures : " + failures)
console.log("time taken to send : " + time_taken_to_send)
}
}

View File

@ -1,7 +1,5 @@
use chrono::DateTime;
use crossbeam_channel::Sender;
use dashmap::DashMap;
use libc::file_clone_range;
use serde::Serialize;
use solana_client::{
rpc_client::RpcClient,
@ -21,7 +19,7 @@ use std::{
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};
use tokio::sync::broadcast;
@ -277,16 +275,21 @@ pub struct PerformanceCounter {
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub finalized_per_seconds: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
pub send_transactions_errors_per_seconds: Arc<AtomicU64>,
pub total_transactions_recieved: Arc<AtomicU64>,
last_count_for_finalized: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
last_count_for_transactions_recieved: Arc<AtomicU64>,
}
pub struct PerformancePerSec {
pub finalized_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub transactions_per_seconds: u64,
pub send_transactions_errors_per_seconds: u64,
pub transaction_recieved_per_second: u64,
}
impl PerformanceCounter {
@ -295,54 +298,52 @@ impl PerformanceCounter {
total_finalized: Arc::new(AtomicU64::new(0)),
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
finalized_per_seconds: Arc::new(AtomicU64::new(0)),
total_transactions_recieved: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_finalized: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_recieved: Arc::new(AtomicU64::new(0)),
last_count_for_sent_errors: Arc::new(AtomicU64::new(0)),
send_transactions_errors_per_seconds: Arc::new(AtomicU64::new(0)),
}
}
pub fn update_per_seconds_transactions(&self) {
pub fn update_per_seconds_transactions(&self) -> PerformancePerSec {
let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed);
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
let total_transactions_recieved: u64 =
self.total_transactions_recieved.load(Ordering::Relaxed);
self.finalized_per_seconds.store(
total_finalized - self.last_count_for_finalized.load(Ordering::Relaxed),
Ordering::Release,
);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
);
self.transactions_per_seconds.store(
total_transactions
- self
.last_count_for_transactions_sent
.load(Ordering::Relaxed),
Ordering::Release,
);
self.send_transactions_errors_per_seconds.store(
total_errors - self.last_count_for_sent_errors.load(Ordering::Relaxed),
Ordering::Release,
);
let finalized_per_seconds = total_finalized
- self
.last_count_for_finalized
.swap(total_finalized, Ordering::Relaxed);
let confirmations_per_seconds = total_confirmations
- self
.last_count_for_confirmations
.swap(total_confirmations, Ordering::Relaxed);
let transactions_per_seconds = total_transactions
- self
.last_count_for_transactions_sent
.swap(total_transactions, Ordering::Relaxed);
let send_transactions_errors_per_seconds = total_errors
- self
.last_count_for_sent_errors
.swap(total_errors, Ordering::Relaxed);
let transaction_recieved_per_second = total_transactions_recieved
- self
.last_count_for_transactions_recieved
.swap(total_transactions_recieved, Ordering::Relaxed);
self.last_count_for_finalized
.store(total_finalized, Ordering::Relaxed);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
self.last_count_for_sent_errors
.store(total_errors, Ordering::Relaxed);
PerformancePerSec {
confirmations_per_seconds,
finalized_per_seconds,
send_transactions_errors_per_seconds,
transaction_recieved_per_second,
transactions_per_seconds,
}
}
}
@ -358,17 +359,10 @@ pub fn launch_performance_updating_thread(
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let finalized_per_second = performance_counter.finalized_per_seconds.load(Ordering::Acquire);
let data = performance_counter.update_per_seconds_transactions();
println!(
"At {} second, Sent {} transactions, finalized {} and confirmed {} transactions",
nb_seconds, total_transactions_per_seconds, finalized_per_second, confirmations_per_seconds
"At {} second, Recieved {}, Sent {} transactions, finalized {} and confirmed {} transactions",
nb_seconds, data.transaction_recieved_per_second, data.transactions_per_seconds, data.finalized_per_seconds, data.confirmations_per_seconds
);
let runtime = start.elapsed();
nb_seconds += 1;

View File

@ -65,10 +65,9 @@ impl LightRpcRequestProcessor {
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
let connection_cache = Arc::new(ConnectionCache::default());
println!("ws_url {}", websocket_url);
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
@ -97,24 +96,10 @@ impl LightRpcRequestProcessor {
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
&context,
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer,
performance_counter.clone(),
),
];
LightRpcRequestProcessor {
@ -177,29 +162,33 @@ impl LightRpcRequestProcessor {
fn build_thread_to_process_transactions(
json_rpc_url: String,
websocket_url: String,
connection_cache: Arc<ConnectionCache>,
context: &Arc<LiteRpcContext>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let rpc_client =
Arc::new(RpcClient::new(json_rpc_url.to_string()));
let mut connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client,
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
let tpu_client = Arc::new(tpu_client.unwrap());
let mut tpu_client = Arc::new(tpu_client.unwrap());
let mut consecutive_errors: u8 = 0;
loop {
let recv_res = receiver.recv();
match recv_res {
Ok(transaction) => {
let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(200);
let mut time_remaining = Duration::from_micros(1000);
for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining);
@ -216,15 +205,46 @@ impl LightRpcRequestProcessor {
let count: u64 = transactions_vec.len() as u64;
let slice = transactions_vec.as_slice();
let fut_res = tpu_client.try_send_transaction_batch(slice);
// insert sent transactions into signature status map
transactions_vec.iter().for_each(|x| {
let signature = x.signatures[0].to_string();
context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
});
match fut_res {
Ok(_) => performance_counters
Ok(_) => {
consecutive_errors = 0;
performance_counters
.total_transactions_sent
.fetch_add(count, Ordering::Relaxed),
.fetch_add(count, Ordering::Relaxed);
},
Err(e) => {
println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string());
consecutive_errors += 1;
if consecutive_errors > 3 {
connection_cache = Arc::new(ConnectionCache::default());
let new_tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
// reset TPU connection
tpu_client = Arc::new(new_tpu_client.unwrap());
}
performance_counters
.transaction_sent_error
.fetch_add(count, Ordering::Relaxed)
.fetch_add(count, Ordering::Relaxed);
}
};
}
@ -418,12 +438,6 @@ pub mod lite_rpc {
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[rpc(meta, name = "getPerformanceCounters")]
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults>;
#[rpc(meta, name = "getLatestBlockhash")]
fn get_latest_blockhash(
&self,
@ -462,17 +476,11 @@ pub mod lite_rpc {
tx_encoding
))
})?;
let (_wire_transaction, transaction) =
decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let transaction = decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string();
meta.context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
meta.performance_counter
.total_transactions_recieved
.fetch_add(1, Ordering::Relaxed);
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
@ -694,49 +702,12 @@ pub mod lite_rpc {
feature_set: Some(version.feature_set),
})
}
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults> {
let total_transactions_count = meta
.performance_counter
.total_transactions_sent
.load(Ordering::Relaxed);
let total_confirmations_count = meta
.performance_counter
.total_confirmations
.load(Ordering::Relaxed);
let transactions_per_seconds = meta
.performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let confirmations_per_seconds = meta
.performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let procinfo::pid::Statm { size, .. } = procinfo::pid::statm_self().unwrap();
let procinfo::pid::Stat { num_threads, .. } = procinfo::pid::stat_self().unwrap();
Ok(RpcPerformanceCounterResults {
confirmations_per_seconds,
transactions_per_seconds,
total_confirmations_count,
total_transactions_count,
memory_used: size as u64,
nb_threads: num_threads as u64,
})
}
}
}
const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes
const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes
fn decode_and_deserialize<T>(
encoded: String,
encoding: TransactionBinaryEncoding,
) -> Result<(Vec<u8>, T)>
fn decode_and_deserialize<T>(encoded: String, encoding: TransactionBinaryEncoding) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
@ -789,5 +760,4 @@ where
&err.to_string()
))
})
.map(|output| (wire_output, output))
}