Merge pull request #32 from blockworks-foundation/processing_transaction_errors

Processing transaction errors
This commit is contained in:
galactus 2023-01-06 17:51:01 +01:00 committed by GitHub
commit 9dbb268be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 2799 additions and 2609 deletions

718
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -26,31 +26,18 @@ simplelog = "0.12.0"
[dependencies]
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git" }
solana-entry = { git = "https://github.com/solana-labs/solana.git" }
solana-faucet = { git = "https://github.com/solana-labs/solana.git" }
solana-gossip = { git = "https://github.com/solana-labs/solana.git" }
solana-ledger = { git = "https://github.com/solana-labs/solana.git" }
solana-measure = { git = "https://github.com/solana-labs/solana.git" }
solana-metrics = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
solana-poh = { git = "https://github.com/solana-labs/solana.git" }
solana-rayon-threadlimit = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-stake-program = { git = "https://github.com/solana-labs/solana.git" }
solana-storage-bigtable = { git = "https://github.com/solana-labs/solana.git" }
solana-streamer = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-vote-program = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
tokio = { version = "1.14.1", features = ["full"]}
tokio-util = { version = "0.6", features = ["codec", "compat"] }
@ -81,3 +68,4 @@ spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
spl-token-2022 = { version = "0.5.0", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0.37"
chrono = "0.4.23"

View File

@ -1,4 +1,4 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey, TransactionSignature } from '@solana/web3.js';
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey, TransactionSignature, Transaction } from '@solana/web3.js';
import * as fs from 'fs';
import * as splToken from "@solana/spl-token";
import * as os from 'os';
@ -17,6 +17,10 @@ function sleep(ms: number) {
console.log("benching " + tps + " transactions per second on " + url + " for " + forSeconds + " seconds");
function delay(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
}
export async function main() {
const connection = new Connection(url, 'confirmed');
@ -31,12 +35,14 @@ export async function main() {
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
let promises_to_unpack : Promise<TransactionSignature>[][] = [];
let signatures_to_unpack : TransactionSignature[][] = [];
let time_taken_to_send = [];
for (let i = 0; i<forSeconds; ++i)
{
const start = performance.now();
let promises : Promise<TransactionSignature>[] = [];
let signatures : TransactionSignature[] = [];
const blockhash = (await connection.getLatestBlockhash()).blockhash;
for (let j=0; j<tps; ++j)
{
const toIndex = Math.floor(Math.random() * users.length);
@ -48,49 +54,50 @@ export async function main() {
const userFrom = userAccounts[fromIndex];
const userTo = userAccounts[toIndex];
if(skip_confirmations === false) {
promises.push(
splToken.transfer(
connection,
authority,
userFrom,
userTo,
users[fromIndex],
100,
)
)
const transaction = new Transaction().add(
splToken.createTransferInstruction(userFrom, userTo, users[fromIndex].publicKey, Math.ceil(Math.random() * 100))
);
transaction.recentBlockhash = blockhash;
transaction.feePayer = authority.publicKey;
const p = connection.sendTransaction(transaction, [authority, users[fromIndex]], {skipPreflight: true});
signatures.push(await p)
}
}
if (skip_confirmations === false)
{
promises_to_unpack.push(promises)
signatures_to_unpack.push(signatures)
}
const end = performance.now();
const diff = (end - start);
if (diff > 0) {
time_taken_to_send[i] = diff;
if (diff > 0 && diff < 1000) {
await sleep(1000 - diff)
}
}
console.log('finish sending transactions');
await delay(5000)
console.log('checking for confirmations');
if(skip_confirmations === false) {
const size = promises_to_unpack.length
const size = signatures_to_unpack.length
let successes : Uint32Array = new Uint32Array(size).fill(0);
let failures : Uint32Array = new Uint32Array(size).fill(0);
for (let i=0; i< size; ++i)
{
const promises = promises_to_unpack[i];
await Promise.all( promises.map( promise => {
promise.then((_fullfil)=>{
Atomics.add(successes, i, 1);
},
(_reject)=>{
Atomics.add(failures, i, 1);
})
}))
const signatures = signatures_to_unpack[i];
for (const signature of signatures) {
const confirmed = await connection.getSignatureStatus(signature);
if (confirmed != null && confirmed.value != null && confirmed.value.err == null) {
successes[i]++;
} else {
failures[i]++;
}
}
}
console.log("sucesses " + successes)
console.log("failures " + failures)
console.log("sucesses : " + successes)
console.log("failures : " + failures)
//console.log("time taken to send : " + time_taken_to_send)
}
}

View File

@ -0,0 +1,82 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey, TransactionSignature } from '@solana/web3.js';
import * as fs from 'fs';
import * as splToken from "@solana/spl-token";
import * as os from 'os';
// number of users
const tps : number = +process.argv[2];
const forSeconds : number = +process.argv[3];
// url
const url = process.argv.length > 4 ? process.argv[4] : "http://localhost:8899";
import * as InFile from "./out.json";
function sleep(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
}
console.log("benching " + tps + " transactions per second on " + url + " for " + forSeconds + " seconds");
export async function main() {
const connection = new Connection(url, 'confirmed');
const authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
let successes : Uint32Array = new Uint32Array(forSeconds).fill(0);
let failures : Uint32Array = new Uint32Array(forSeconds).fill(0);
let promises : Promise<void>[] = [];
for (let i = 0; i<forSeconds; ++i)
{
const start = performance.now();
let signatures : TransactionSignature[] = [];
for (let j=0; j<tps; ++j)
{
const toIndex = Math.floor(Math.random() * users.length);
let fromIndex = toIndex;
while (fromIndex === toIndex)
{
fromIndex = Math.floor(Math.random() * users.length);
}
const userFrom = userAccounts[fromIndex];
const userTo = userAccounts[toIndex];
const p = splToken.transfer(
connection,
authority,
userFrom,
userTo,
users[fromIndex],
100,
).then((_)=> {successes[i]++}, (_) => {failures[i]++})
promises.push(p)
}
const end = performance.now();
const diff = (end - start);
if (diff > 0) {
await sleep(1000 - diff)
}
}
for (const p of promises)
{
await p;
}
console.log("successes : " + successes);
console.log("failures : " + failures);
}
main().then(x => {
console.log('finished sucessfully')
}).catch(e => {
console.log('caught an error : ' + e)
})

View File

@ -6,7 +6,7 @@ import * as os from 'os';
// number of users
const nbUsers = +process.argv[2];
// url
const url = process.argv.length > 3 ? process.argv[3] : "http://localhost:8899";
const url = process.argv.length > 3 ? process.argv[3] : "http://0.0.0.0:8899";
// outfile
const outFile = process.argv.length > 4 ? process.argv[4] : "out.json";

View File

@ -5,7 +5,7 @@ use solana_client::{
rpc_response::Response as RpcResponse,
};
pub const LOCAL_LIGHT_RPC_ADDR: &str = "http://127.0.0.1:8890";
pub const LOCAL_LIGHT_RPC_ADDR: &str = "http://0.0.0.0:8890";
pub struct LiteClient(pub RpcClient);
@ -35,4 +35,3 @@ impl LiteClient {
.unwrap()
}
}

View File

@ -1,6 +1,5 @@
use clap::Subcommand;
use clap::Parser;
use solana_cli_config::ConfigInput;
/// Holds the configuration for a single run of the benchmark
#[derive(Parser, Debug)]
@ -13,24 +12,18 @@ use clap::Parser;
"
)]
pub struct Args {
#[clap(subcommand)]
pub command: Command,
/*
#[arg(short, long, default_value_t = String::from("8899"))]
pub port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
pub subscription_port: String,
#[arg(short, long, default_value_t = 9000)]
pub port: u16,
#[arg(short, long, default_value_t = 9001)]
pub subscription_port: u16,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
#[arg(short, long, default_value_t = String::from("ws://localhost:8900"))]
pub websocket_url: String,
*/
}
/*
impl Args {
pub fn resolve_address(&mut self) {
if self.rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
self.rpc_url.as_str(),
@ -48,20 +41,4 @@ impl Args {
self.websocket_url = ws_url;
}
}
}
*/
#[derive(Subcommand, Debug)]
pub enum Command {
Run {
#[arg(short, long, default_value_t = String::from("8899"))]
port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
subscription_port: String,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
websocket_url: String,
},
Test,
}

View File

@ -6,11 +6,12 @@ use solana_client::{
rpc_response::{ProcessedSignatureResult, RpcResponseContext, RpcSignatureResult, SlotInfo},
};
use solana_rpc::rpc_subscription_tracker::{
SignatureSubscriptionParams, SubscriptionId, SubscriptionParams,
SignatureSubscriptionParams, SubscriptionParams,
};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::TransactionError,
};
use std::{
sync::{
@ -48,8 +49,14 @@ impl BlockInformation {
}
}
pub struct SignatureStatus {
pub status: Option<CommitmentLevel>,
pub error: Option<TransactionError>,
pub created: Instant,
}
pub struct LiteRpcContext {
pub signature_status: DashMap<String, Option<CommitmentLevel>>,
pub signature_status: DashMap<String, SignatureStatus>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender: Sender<NotificationType>,
@ -67,6 +74,11 @@ impl LiteRpcContext {
notification_sender,
}
}
pub fn remove_stale_data(&self, purgetime_in_seconds: u64) {
self.signature_status
.retain(|_k, v| v.created.elapsed().as_secs() < purgetime_in_seconds);
}
}
pub struct SignatureNotification {
@ -88,6 +100,8 @@ pub enum NotificationType {
Slot(SlotNotification),
}
type SubscriptionId = u64;
#[derive(Debug, Serialize)]
struct NotificationParams<T> {
result: T,
@ -199,8 +213,14 @@ impl LiteRpcSubsrciptionControl {
},
};
let json = serde_json::to_string(&notification).unwrap();
let subscription_id = *x.get();
// no more notification for this signature has been finalized
if data.commitment.eq(&CommitmentLevel::Finalized) {
x.remove();
}
Some(LiteRpcNotification {
subscription_id: *x.get(),
subscription_id,
created_at: Instant::now(),
is_final: false,
json,
@ -212,9 +232,9 @@ impl LiteRpcSubsrciptionControl {
NotificationType::Slot(data) => {
// SubscriptionId 0 will be used for slots
let subscription_id = if data.commitment == CommitmentLevel::Confirmed {
SubscriptionId::from(0)
0
} else {
SubscriptionId::from(1)
1
};
let value = SlotInfo {
parent: data.parent,
@ -253,85 +273,107 @@ impl LiteRpcSubsrciptionControl {
#[derive(Clone)]
pub struct PerformanceCounter {
pub total_finalized: Arc<AtomicU64>,
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub total_transactions_recieved: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
last_count_for_finalized: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
last_count_for_transactions_recieved: Arc<AtomicU64>,
}
pub struct PerformancePerSec {
pub finalized_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub transactions_per_seconds: u64,
pub send_transactions_errors_per_seconds: u64,
pub transaction_recieved_per_second: u64,
}
impl PerformanceCounter {
pub fn new() -> Self {
Self {
total_finalized: Arc::new(AtomicU64::new(0)),
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
total_transactions_recieved: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_finalized: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_recieved: Arc::new(AtomicU64::new(0)),
last_count_for_sent_errors: Arc::new(AtomicU64::new(0)),
}
}
pub fn update_per_seconds_transactions(&self) {
pub fn update_per_seconds_transactions(&self) -> PerformancePerSec {
let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed);
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
let total_transactions_recieved: u64 =
self.total_transactions_recieved.load(Ordering::Relaxed);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
);
self.transactions_per_seconds.store(
total_transactions
- self
.last_count_for_transactions_sent
.load(Ordering::Relaxed),
Ordering::Release,
);
let finalized_per_seconds = total_finalized
- self
.last_count_for_finalized
.swap(total_finalized, Ordering::Relaxed);
let confirmations_per_seconds = total_confirmations
- self
.last_count_for_confirmations
.swap(total_confirmations, Ordering::Relaxed);
let transactions_per_seconds = total_transactions
- self
.last_count_for_transactions_sent
.swap(total_transactions, Ordering::Relaxed);
let send_transactions_errors_per_seconds = total_errors
- self
.last_count_for_sent_errors
.swap(total_errors, Ordering::Relaxed);
let transaction_recieved_per_second = total_transactions_recieved
- self
.last_count_for_transactions_recieved
.swap(total_transactions_recieved, Ordering::Relaxed);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
}
pub fn update_sent_transactions_counter(&self) {
self.total_transactions_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
PerformancePerSec {
confirmations_per_seconds,
finalized_per_seconds,
send_transactions_errors_per_seconds,
transaction_recieved_per_second,
transactions_per_seconds,
}
}
}
const PRINT_COUNTERS : bool = true;
pub fn launch_performance_updating_thread(
performance_counter: PerformanceCounter,
) -> JoinHandle<()> {
Builder::new()
.name("Performance Counter".to_string())
.spawn(move || loop {
let start = Instant::now();
.spawn(move || {
let mut nb_seconds: u64 = 0;
loop {
let start = Instant::now();
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
println!(
"Sent {} transactions and confrimed {} transactions",
total_transactions_per_seconds, confirmations_per_seconds
);
thread::sleep(remaining);
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
let data = performance_counter.update_per_seconds_transactions();
if PRINT_COUNTERS {
println!(
"At {} second, Recieved {}, Sent {} transactions, finalized {} and confirmed {} transactions",
nb_seconds, data.transaction_recieved_per_second, data.transactions_per_seconds, data.finalized_per_seconds, data.confirmations_per_seconds
);
}
let runtime = start.elapsed();
nb_seconds += 1;
if let Some(remaining) = wait_time.checked_sub(runtime) {
thread::sleep(remaining);
}
}
})
.unwrap()

View File

@ -3,7 +3,7 @@ mod context;
mod pubsub;
mod rpc;
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, thread::sleep};
use clap::Parser;
use context::LiteRpcSubsrciptionControl;
@ -23,7 +23,7 @@ use crate::{
};
use cli::Args;
fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) {
fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String) {
let rpc_url = if rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
rpc_url.as_str(),
@ -51,7 +51,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
let performance_counter = PerformanceCounter::new();
launch_performance_updating_thread(performance_counter.clone());
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(10000);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(
@ -59,7 +59,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
notification_reciever,
));
let subscription_port = format!("127.0.0.1:{}", subscription_port)
let subscription_port = format!("0.0.0.0:{}", subscription_port)
.parse::<SocketAddr>()
.expect("Invalid subscription port");
@ -69,15 +69,17 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
subscription_port,
performance_counter.clone(),
);
{
let _broadcast_thread = {
// build broadcasting thread
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new()
.name("broadcasting thread".to_string())
.spawn(move || {
pubsub_control.start_broadcasting();
})
.unwrap();
}
.unwrap()
};
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
@ -88,9 +90,21 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
notification_sender,
performance_counter.clone(),
);
let _cleaning_thread = {
// build cleaning thread
let context = request_processor.context.clone();
std::thread::Builder::new()
.name("cleaning thread".to_string())
.spawn(move || {
context.remove_stale_data(60 * 10);
sleep(std::time::Duration::from_secs(60))
})
.unwrap()
};
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.worker_threads(64)
.on_thread_start(move || renice_this_thread(0).unwrap())
.thread_name("solLiteRpcProcessor")
.enable_all()
@ -99,7 +113,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
);
let max_request_body_size: usize = 50 * (1 << 10);
let socket_addr = format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap();
let socket_addr = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
{
let request_processor = request_processor.clone();
@ -122,39 +136,15 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
websocket_service.close().unwrap();
}
fn ts_test() {
let res = std::process::Command::new("yarn")
.args(["run", "test:test-validator"])
.output()
.unwrap();
println!("{}", String::from_utf8_lossy(&res.stdout));
println!("{}", String::from_utf8_lossy(&res.stderr));
}
pub fn main() {
let cli_command = Args::parse();
let mut cli_command = Args::parse();
cli_command.resolve_address();
match cli_command.command {
cli::Command::Run {
port,
subscription_port,
rpc_url,
websocket_url,
} => run(port, subscription_port, rpc_url, websocket_url),
cli::Command::Test => ts_test(),
}
//cli_config.resolve_address();
//println!(
// "Using rpc server {} and ws server {}",
// cli_config.rpc_url, cli_config.websocket_url
//);
//let Args {
// rpc_url: json_rpc_url,
// websocket_url,
// port: rpc_addr,
// subscription_port,
// ..
//} = &cli_config;
// start recieving notifications and broadcast them
let Args {
port,
subscription_port,
rpc_url,
websocket_url,
} = cli_command;
run(port, subscription_port, rpc_url, websocket_url)
}

View File

@ -1,8 +1,8 @@
use dashmap::DashMap;
use jsonrpc_core::{ErrorCode, IoHandler};
use soketto::handshake::{server, Server};
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle, collections::{BTreeSet}, sync::{RwLock}};
use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
@ -11,12 +11,13 @@ use crate::context::{LiteRpcSubsrciptionControl, PerformanceCounter};
use {
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
solana_rpc::rpc_subscription_tracker::SubscriptionId,
solana_rpc_client_api::config::*,
solana_sdk::signature::Signature,
std::sync::Arc,
};
type SubscriptionId = u64;
#[rpc]
pub trait LiteRpcPubSub {
// Get notification when signature is verified
@ -44,13 +45,13 @@ pub trait LiteRpcPubSub {
#[derive(Clone)]
pub struct LiteRpcPubSubImpl {
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pub current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionParams>>,
pub current_subscriptions: Arc<RwLock<BTreeSet<u64>>>,
}
impl LiteRpcPubSubImpl {
pub fn new(subscription_control: Arc<LiteRpcSubsrciptionControl>) -> Self {
Self {
current_subscriptions: Arc::new(DashMap::new()),
current_subscriptions: Arc::new(RwLock::new(BTreeSet::new())),
subscription_control,
}
}
@ -69,20 +70,34 @@ impl LiteRpcPubSubImpl {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
self.current_subscriptions
.insert(new_subsription_id, params);
Ok(new_subsription_id)
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
set.insert(new_subsription_id);
Ok(new_subsription_id)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
}
}
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
match self.current_subscriptions.entry(id) {
dashmap::mapref::entry::Entry::Occupied(x) => {
x.remove();
Ok(true)
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
if set.contains(&id) {
set.remove(&id);
return Ok(true)
}
return Ok(false)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
dashmap::mapref::entry::Entry::Vacant(_) => Ok(false),
}
}
}
@ -107,7 +122,8 @@ impl LiteRpcPubSub for LiteRpcPubSubImpl {
commitment: config.commitment.unwrap_or_default(),
enable_received_notification: false,
};
self.subscribe(SubscriptionParams::Signature(params))
let id = self.subscribe(SubscriptionParams::Signature(params));
id
}
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
@ -116,15 +132,34 @@ impl LiteRpcPubSub for LiteRpcPubSubImpl {
// Get notification when slot is encountered
fn slot_subscribe(&self) -> Result<SubscriptionId> {
self.current_subscriptions
.insert(SubscriptionId::from(0), SubscriptionParams::Slot);
Ok(SubscriptionId::from(0))
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
set.insert(0);
Ok(0)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
// Unsubscribe from slot notification subscription.
fn slot_unsubscribe(&self, _id: SubscriptionId) -> Result<bool> {
self.current_subscriptions.remove(&SubscriptionId::from(0));
Ok(true)
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
if set.contains(&0) {
set.remove(&0);
return Ok(true)
}
return Ok(false)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
}
@ -145,7 +180,7 @@ enum HandleError {
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
performance_counter: PerformanceCounter,
_performance_counter: PerformanceCounter,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
@ -177,8 +212,7 @@ async fn handle_connection(
},
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
performance_counter.update_confirm_transaction_counter();
if rpc_impl.current_subscriptions.read().unwrap().contains(&x.subscription_id) {
sender.send_text(&x.json).await?;
}
}
@ -235,7 +269,7 @@ impl LitePubSubService {
.name("solRpcPubSub".to_string())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(512)
.worker_threads(128)
.enable_all()
.build()
.expect("runtime creation failed");

View File

@ -5,15 +5,17 @@ use solana_client::{
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use solana_sdk::transaction::Transaction;
use std::{
str::FromStr,
sync::Mutex,
thread::{Builder, JoinHandle},
time::{Duration, Instant},
};
use crate::context::{
BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification,
SlotNotification,
SignatureStatus, SlotNotification,
};
use crossbeam_channel::Sender;
use {
@ -22,7 +24,8 @@ use {
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_derive::rpc,
solana_client::connection_cache::ConnectionCache,
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
solana_client::rpc_client::RpcClient,
solana_client::tpu_client::TpuClient,
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
@ -31,7 +34,6 @@ use {
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::VersionedTransaction,
},
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
@ -40,10 +42,11 @@ use {
},
};
const TPU_BATCH_SIZE: usize = 8;
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
pub rpc_client: Arc<RpcClient>,
pub tpu_client: Arc<TpuClient>,
pub last_valid_block_height: u64,
pub ws_url: String,
pub context: Arc<LiteRpcContext>,
@ -51,6 +54,7 @@ pub struct LightRpcRequestProcessor {
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
tpu_producer_channel: Sender<Transaction>,
}
impl LightRpcRequestProcessor {
@ -61,19 +65,10 @@ impl LightRpcRequestProcessor {
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url,
TpuClientConfig::default(),
connection_cache.clone(),
)
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
let connection_cache = Arc::new(ConnectionCache::default());
println!("ws_url {}", websocket_url);
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap();
@ -82,23 +77,33 @@ impl LightRpcRequestProcessor {
let (client_finalized, receiver_finalized) =
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
let (tpu_producer, tpu_consumer) = crossbeam_channel::bounded(100000);
// create threads to listen for finalized and confrimed blocks
let joinables = vec![
Self::build_thread_to_process_blocks(
receiver_confirmed,
&context,
CommitmentLevel::Confirmed,
performance_counter.clone(),
),
Self::build_thread_to_process_blocks(
receiver_finalized,
&context,
CommitmentLevel::Finalized,
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
&context,
tpu_consumer.clone(),
performance_counter.clone(),
),
];
LightRpcRequestProcessor {
rpc_client,
tpu_client,
last_valid_block_height: 0,
ws_url: websocket_url.to_string(),
context,
@ -106,6 +111,7 @@ impl LightRpcRequestProcessor {
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
performance_counter,
tpu_producer_channel: tpu_producer,
}
}
@ -119,9 +125,7 @@ impl LightRpcRequestProcessor {
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig { commitment }),
encoding: None,
transaction_details: Some(
solana_transaction_status::TransactionDetails::Signatures,
),
transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
show_rewards: None,
max_supported_transaction_version: None,
}),
@ -132,6 +136,7 @@ impl LightRpcRequestProcessor {
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
commitment: CommitmentLevel,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
@ -148,21 +153,135 @@ impl LightRpcRequestProcessor {
commitment,
&context.notification_sender,
block_info,
performance_counters,
);
})
.unwrap()
}
fn build_thread_to_process_transactions(
json_rpc_url: String,
websocket_url: String,
context: &Arc<LiteRpcContext>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let rpc_client =
Arc::new(RpcClient::new(json_rpc_url.to_string()));
let mut connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
let mut tpu_client = Arc::new(tpu_client.unwrap());
let mut consecutive_errors: u8 = 0;
loop {
let recv_res = receiver.recv();
match recv_res {
Ok(transaction) => {
let (fut_res, count) = if TPU_BATCH_SIZE > 1 {
let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(1000);
for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining);
match another {
Ok(x) => transactions_vec.push(x),
Err(_) => break,
}
match time_remaining.checked_sub(start.elapsed()) {
Some(x) => time_remaining = x,
None => break,
}
}
let count: u64 = transactions_vec.len() as u64;
let slice = transactions_vec.as_slice();
let fut_res = tpu_client.try_send_transaction_batch(slice);
// insert sent transactions into signature status map
transactions_vec.iter().for_each(|x| {
let signature = x.signatures[0].to_string();
context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
});
(fut_res, count)
} else {
let fut_res = tpu_client.try_send_transaction(&transaction);
let signature = transaction.signatures[0].to_string();
context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
(fut_res, 1)
};
match fut_res {
Ok(_) => {
consecutive_errors = 0;
performance_counters
.total_transactions_sent
.fetch_add(count, Ordering::Relaxed);
},
Err(e) => {
println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string());
consecutive_errors += 1;
if consecutive_errors > 3 {
connection_cache = Arc::new(ConnectionCache::default());
let new_tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
// reset TPU connection
tpu_client = Arc::new(new_tpu_client.unwrap());
}
performance_counters
.transaction_sent_error
.fetch_add(count, Ordering::Relaxed);
}
};
}
Err(e) => {
println!("got error on tpu channel {}", e.to_string());
break;
}
};
}
}).unwrap()
}
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &DashMap<String, Option<CommitmentLevel>>,
signature_status: &DashMap<String, SignatureStatus>,
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
performance_counters: PerformanceCounter,
) {
loop {
let block_data = reciever.recv();
match block_data {
Ok(data) => {
let block_update = &data.value;
@ -191,16 +310,28 @@ impl LightRpcRequestProcessor {
*lock = block.blockhash.clone();
}
if let Some(signatures) = &block.signatures {
for signature in signatures {
if let Some(transactions) = &block.transactions {
for transaction in transactions {
let decoded_transaction =
&transaction.transaction.decode().unwrap();
let signature = decoded_transaction.signatures[0].to_string();
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
// get signature status
let transaction_error = match &transaction.meta {
Some(x) => x.err.clone(),
None => {
println!("cannot decode transaction error");
None
}
};
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str())
.unwrap(),
commitment,
slot: block_update.slot,
error: None,
error: transaction_error.clone().map(|x| x.to_string()),
};
if let Err(e) = notification_sender.send(
NotificationType::Signature(signature_notification),
@ -210,7 +341,21 @@ impl LightRpcRequestProcessor {
e.to_string()
);
}
x.insert(Some(commitment));
if commitment.eq(&CommitmentLevel::Finalized) {
performance_counters
.total_finalized
.fetch_add(1, Ordering::Relaxed);
} else {
performance_counters
.total_confirmations
.fetch_add(1, Ordering::Relaxed);
}
x.insert(SignatureStatus {
status: Some(commitment),
error: transaction_error,
created: Instant::now(),
});
}
dashmap::mapref::entry::Entry::Vacant(_x) => {
// do nothing transaction not sent by lite rpc
@ -268,7 +413,7 @@ pub mod lite_rpc {
use std::str::FromStr;
use itertools::Itertools;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey, transaction::Transaction};
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use super::*;
@ -308,12 +453,6 @@ pub mod lite_rpc {
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[rpc(meta, name = "getPerformanceCounters")]
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults>;
#[rpc(meta, name = "getLatestBlockhash")]
fn get_latest_blockhash(
&self,
@ -352,15 +491,21 @@ pub mod lite_rpc {
tx_encoding
))
})?;
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
let transaction = decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string();
meta.performance_counter
.total_transactions_recieved
.fetch_add(1, Ordering::Relaxed);
meta.context
.signature_status
.insert(transaction.signatures[0].to_string(), None);
meta.tpu_client.send_wire_transaction(wire_transaction);
meta.performance_counter.update_sent_transactions_counter();
Ok(transaction.signatures[0].to_string())
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
Err(e) => {
println!("got error while sending on channel {}", e.to_string());
Err(jsonrpc_core::Error::new(
jsonrpc_core::ErrorCode::InternalError,
))
}
}
}
fn get_recent_blockhash(
@ -458,20 +603,21 @@ pub mod lite_rpc {
.slot
.load(Ordering::Relaxed)
};
meta.performance_counter
.update_confirm_transaction_counter();
match k_value {
Some(value) => match *value {
Some(commitment_for_signature) => Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: if commitment.eq(&CommitmentLevel::Finalized) {
commitment_for_signature.eq(&CommitmentLevel::Finalized)
Some(value) => match value.status {
Some(commitment) => {
let commitment_matches = if commitment.eq(&CommitmentLevel::Finalized) {
commitment.eq(&CommitmentLevel::Finalized)
} else {
commitment_for_signature.eq(&CommitmentLevel::Finalized)
|| commitment_for_signature.eq(&CommitmentLevel::Confirmed)
},
}),
commitment.eq(&CommitmentLevel::Finalized)
|| commitment.eq(&CommitmentLevel::Confirmed)
};
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: commitment_matches && value.error.is_none(),
})
}
None => Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: false,
@ -513,17 +659,15 @@ pub mod lite_rpc {
let singature_status = meta.context.signature_status.get(x);
let k_value = singature_status;
match k_value {
Some(value) => match *value {
Some(commitment_for_signature) => {
Some(value) => match value.status {
Some(commitment_level) => {
let slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
meta.performance_counter
.update_confirm_transaction_counter();
let status = match commitment_for_signature {
let status = match commitment_level {
CommitmentLevel::Finalized => {
TransactionConfirmationStatus::Finalized
}
@ -531,9 +675,9 @@ pub mod lite_rpc {
};
Some(TransactionStatus {
slot,
confirmations: Some(1),
confirmations: None,
status: Ok(()),
err: None,
err: value.error.clone(),
confirmation_status: Some(status),
})
}
@ -573,49 +717,12 @@ pub mod lite_rpc {
feature_set: Some(version.feature_set),
})
}
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults> {
let total_transactions_count = meta
.performance_counter
.total_transactions_sent
.load(Ordering::Relaxed);
let total_confirmations_count = meta
.performance_counter
.total_confirmations
.load(Ordering::Relaxed);
let transactions_per_seconds = meta
.performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let confirmations_per_seconds = meta
.performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let procinfo::pid::Statm { size, .. } = procinfo::pid::statm_self().unwrap();
let procinfo::pid::Stat { num_threads, .. } = procinfo::pid::stat_self().unwrap();
Ok(RpcPerformanceCounterResults {
confirmations_per_seconds,
transactions_per_seconds,
total_confirmations_count,
total_transactions_count,
memory_used: size as u64,
nb_threads: num_threads as u64,
})
}
}
}
const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes
const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes
fn decode_and_deserialize<T>(
encoded: String,
encoding: TransactionBinaryEncoding,
) -> Result<(Vec<u8>, T)>
fn decode_and_deserialize<T>(encoded: String, encoding: TransactionBinaryEncoding) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
@ -668,5 +775,4 @@ where
&err.to_string()
))
})
.map(|output| (wire_output, output))
}

View File

@ -3,8 +3,8 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::SerializableTransaction;
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use lite_bench_utils::{generate_txs, new_funded_payer, wait_till_confirmed};
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use simplelog::*;
const AMOUNT: usize = 100;

3859
yarn.lock

File diff suppressed because it is too large Load Diff