processing transaction errors, updating benchmark files

This commit is contained in:
godmodegalactus 2023-01-04 14:33:36 +01:00
parent 5aa0dbef33
commit 169bd4cddb
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
9 changed files with 2099 additions and 2024 deletions

104
bench_transactions_send.ts Normal file
View File

@ -0,0 +1,104 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey, TransactionSignature, Transaction } from '@solana/web3.js';
import * as fs from 'fs';
import * as splToken from "@solana/spl-token";
import * as os from 'os';
// number of users
const tps : number = +process.argv[2];
const forSeconds : number = +process.argv[3];
// url
const url = process.argv.length > 4 ? process.argv[4] : "http://localhost:8899";
const skip_confirmations = process.argv.length > 5 ? process.argv[5] === "true": false;
import * as InFile from "./out.json";
function sleep(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
}
console.log("benching " + tps + " transactions per second on " + url + " for " + forSeconds + " seconds");
function delay(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
}
export async function main() {
const connection = new Connection(url, 'confirmed');
const authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
let promises_to_unpack : Promise<TransactionSignature>[][] = [];
for (let i = 0; i<forSeconds; ++i)
{
const start = performance.now();
let promises : Promise<TransactionSignature>[] = [];
const blockhash = (await connection.getLatestBlockhash()).blockhash;
for (let j=0; j<tps; ++j)
{
const toIndex = Math.floor(Math.random() * users.length);
let fromIndex = toIndex;
while (fromIndex === toIndex)
{
fromIndex = Math.floor(Math.random() * users.length);
}
const userFrom = userAccounts[fromIndex];
const userTo = userAccounts[toIndex];
if(skip_confirmations === false) {
const transaction = new Transaction().add(
splToken.createTransferInstruction(userFrom, userTo, users[fromIndex].publicKey, 100)
);
transaction.recentBlockhash = blockhash;
transaction.feePayer = authority.publicKey;
promises.push(connection.sendTransaction(transaction, [authority, users[fromIndex]], {skipPreflight: true}))
}
}
if (skip_confirmations === false)
{
promises_to_unpack.push(promises)
}
const end = performance.now();
const diff = (end - start);
if (diff > 0) {
await sleep(1000 - diff)
}
}
console.log('finish sending transactions');
await delay(5000)
console.log('checking for confirmations');
if(skip_confirmations === false) {
const size = promises_to_unpack.length
let successes : Uint32Array = new Uint32Array(size).fill(0);
let failures : Uint32Array = new Uint32Array(size).fill(0);
for (let i=0; i< size; ++i)
{
const promises = promises_to_unpack[i];
const signatures = await Promise.all(promises);
let statuses = await connection.getSignatureStatuses(signatures, {searchTransactionHistory: false})
for (const status of statuses.value) {
if(status != null && status.confirmationStatus && status.err == null) {
successes[i] += 1;
}
else {
failures[i] += 1;
}
}
}
console.log("sucesses " + successes)
console.log("failures " + failures)
}
}
main().then(x => {
console.log('finished sucessfully')
}).catch(e => {
console.log('caught an error : ' + e)
})

View File

@ -6,7 +6,7 @@ import * as os from 'os';
// number of users
const nbUsers = +process.argv[2];
// url
const url = process.argv.length > 3 ? process.argv[3] : "http://localhost:8899";
const url = process.argv.length > 3 ? process.argv[3] : "http://0.0.0.0:8899";
// outfile
const outFile = process.argv.length > 4 ? process.argv[4] : "out.json";

View File

@ -1,6 +1,5 @@
use clap::Subcommand;
use clap::Parser;
use solana_cli_config::ConfigInput;
/// Holds the configuration for a single run of the benchmark
#[derive(Parser, Debug)]
@ -13,9 +12,6 @@ use clap::Parser;
"
)]
pub struct Args {
#[clap(subcommand)]
pub command: Command,
/*
#[arg(short, long, default_value_t = String::from("8899"))]
pub port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
@ -24,13 +20,10 @@ pub struct Args {
pub rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
pub websocket_url: String,
*/
}
/*
impl Args {
pub fn resolve_address(&mut self) {
if self.rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
self.rpc_url.as_str(),
@ -48,20 +41,4 @@ impl Args {
self.websocket_url = ws_url;
}
}
}
*/
#[derive(Subcommand, Debug)]
pub enum Command {
Run {
#[arg(short, long, default_value_t = String::from("8899"))]
port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
subscription_port: String,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
websocket_url: String,
},
Test,
}

View File

@ -206,8 +206,14 @@ impl LiteRpcSubsrciptionControl {
},
};
let json = serde_json::to_string(&notification).unwrap();
let subscription_id = *x.get();
// no more notification for this signature has been finalized
if data.commitment.eq(&CommitmentLevel::Finalized) {
x.remove();
}
Some(LiteRpcNotification {
subscription_id: *x.get(),
subscription_id,
created_at: Instant::now(),
is_final: false,
json,
@ -319,26 +325,29 @@ pub fn launch_performance_updating_thread(
) -> JoinHandle<()> {
Builder::new()
.name("Performance Counter".to_string())
.spawn(move || loop {
let start = Instant::now();
.spawn(move || {
let mut nb_seconds: u64 = 0;
loop {
let start = Instant::now();
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
println!(
"Sent {} transactions and confrimed {} transactions",
total_transactions_per_seconds, confirmations_per_seconds
"At {} second, Sent {} transactions and confrimed {} transactions",
nb_seconds, total_transactions_per_seconds, confirmations_per_seconds
);
thread::sleep(remaining);
let runtime = start.elapsed();
nb_seconds += 1;
if let Some(remaining) = wait_time.checked_sub(runtime) {
thread::sleep(remaining);
}
}
})
.unwrap()

View File

@ -122,25 +122,15 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
websocket_service.close().unwrap();
}
fn ts_test() {
let res = std::process::Command::new("yarn")
.args(["run", "test:test-validator"])
.output()
.unwrap();
println!("{}", String::from_utf8_lossy(&res.stdout));
println!("{}", String::from_utf8_lossy(&res.stderr));
}
pub fn main() {
let cli_command = Args::parse();
let mut cli_command = Args::parse();
cli_command.resolve_address();
match cli_command.command {
cli::Command::Run {
port,
subscription_port,
rpc_url,
websocket_url,
} => run(port, subscription_port, rpc_url, websocket_url),
cli::Command::Test => ts_test(),
}
let Args {
port,
subscription_port,
rpc_url,
websocket_url,
} = cli_command;
run(port, subscription_port, rpc_url, websocket_url)
}

View File

@ -2,7 +2,7 @@ use dashmap::DashMap;
use jsonrpc_core::{ErrorCode, IoHandler};
use soketto::handshake::{server, Server};
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle, time::Instant};
use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
@ -41,16 +41,22 @@ pub trait LiteRpcPubSub {
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
}
#[derive(Clone)]
pub struct SubscriptionParamsWithTime {
params: SubscriptionParams,
time: Instant,
}
#[derive(Clone)]
pub struct LiteRpcPubSubImpl {
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pub current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionParams>>,
pub current_subscriptions: DashMap<SubscriptionId, SubscriptionParamsWithTime>,
}
impl LiteRpcPubSubImpl {
pub fn new(subscription_control: Arc<LiteRpcSubsrciptionControl>) -> Self {
Self {
current_subscriptions: Arc::new(DashMap::new()),
current_subscriptions: DashMap::new(),
subscription_control,
}
}
@ -69,8 +75,13 @@ impl LiteRpcPubSubImpl {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
self.current_subscriptions
.insert(new_subsription_id, params);
self.current_subscriptions.insert(
new_subsription_id,
SubscriptionParamsWithTime {
params,
time: Instant::now(),
},
);
Ok(new_subsription_id)
}
}
@ -116,8 +127,13 @@ impl LiteRpcPubSub for LiteRpcPubSubImpl {
// Get notification when slot is encountered
fn slot_subscribe(&self) -> Result<SubscriptionId> {
self.current_subscriptions
.insert(SubscriptionId::from(0), SubscriptionParams::Slot);
self.current_subscriptions.insert(
SubscriptionId::from(0),
SubscriptionParamsWithTime {
params: SubscriptionParams::Slot,
time: Instant::now(),
},
);
Ok(SubscriptionId::from(0))
}
@ -235,7 +251,7 @@ impl LitePubSubService {
.name("solRpcPubSub".to_string())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(512)
.worker_threads(1)
.enable_all()
.build()
.expect("runtime creation failed");

View File

@ -119,9 +119,7 @@ impl LightRpcRequestProcessor {
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig { commitment }),
encoding: None,
transaction_details: Some(
solana_transaction_status::TransactionDetails::Signatures,
),
transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
show_rewards: None,
max_supported_transaction_version: None,
}),
@ -162,7 +160,6 @@ impl LightRpcRequestProcessor {
) {
loop {
let block_data = reciever.recv();
match block_data {
Ok(data) => {
let block_update = &data.value;
@ -191,25 +188,22 @@ impl LightRpcRequestProcessor {
*lock = block.blockhash.clone();
}
if let Some(signatures) = &block.signatures {
for (index, signature) in signatures.iter().enumerate() {
if let Some(transactions) = &block.transactions {
for transaction in transactions {
let decoded_transaction =
&transaction.transaction.decode().unwrap();
let signature = decoded_transaction.signatures[0].to_string();
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
// get signature status
let transaction_error = match &block.transactions {
Some(transactions) => match &transactions[index].meta {
Some(meta) => meta.err.clone(),
None => {
println!("error while getting transaction status, meta null");
None
}
},
let transaction_error = match &transaction.meta {
Some(x) => x.err.clone(),
None => {
println!("Error while getting transaction status, transactions null");
println!("cannot decode transaction error");
None
}
};
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str())
.unwrap(),

3859
yarn.lock

File diff suppressed because it is too large Load Diff