making the batching of transactions work

This commit is contained in:
Godmode Galactus 2022-12-11 18:06:39 +01:00
parent 7a4125aaa2
commit ca616bcf2b
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
4 changed files with 82 additions and 117 deletions

View File

@ -1,6 +1,5 @@
use clap::Subcommand;
use clap::Parser;
use solana_cli_config::ConfigInput;
/// Holds the configuration for a single run of the benchmark
#[derive(Parser, Debug)]
@ -13,24 +12,18 @@ use clap::Parser;
"
)]
pub struct Args {
#[clap(subcommand)]
pub command: Command,
/*
#[arg(short, long, default_value_t = String::from("8899"))]
pub port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
pub subscription_port: String,
#[arg(short, long, default_value_t = 8899)]
pub port: u16,
#[arg(short, long, default_value_t = 8900)]
pub subscription_port: u16,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
#[arg(short, long, default_value_t = String::from("http://localhost:8900"))]
pub websocket_url: String,
*/
}
/*
impl Args {
pub fn resolve_address(&mut self) {
if self.rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
self.rpc_url.as_str(),
@ -48,20 +41,4 @@ impl Args {
self.websocket_url = ws_url;
}
}
}
*/
#[derive(Subcommand, Debug)]
pub enum Command {
Run {
#[arg(short, long, default_value_t = String::from("8899"))]
port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
subscription_port: String,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
websocket_url: String,
},
Test,
}

View File

@ -329,7 +329,7 @@ impl PerformanceCounter {
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
}

View File

@ -1,4 +1,8 @@
use std::{net::SocketAddr, sync::Arc, thread::sleep};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
thread::sleep,
};
use clap::Parser;
use context::LiteRpcSubsrciptionControl;
@ -24,7 +28,7 @@ mod rpc;
use cli::Args;
fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) {
fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String) {
let rpc_url = if rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
rpc_url.as_str(),
@ -60,10 +64,8 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
notification_reciever,
));
let subscription_port = format!("127.0.0.1:{}",subscription_port)
.parse::<SocketAddr>()
.expect("Invalid subscription port");
let subscription_port =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), subscription_port);
// start websocket server
let (_trigger, websocket_service) = LitePubSubService::new(
@ -114,9 +116,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
);
let max_request_body_size: usize = 50 * (1 << 10);
let socket_addr = format!("127.0.0.1:{}",rpc_addr).parse::<SocketAddr>().unwrap();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
{
let request_processor = request_processor.clone();
let server =
@ -140,40 +140,15 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
cleaning_thread.join().unwrap();
}
fn ts_test() {
let res = std::process::Command::new("yarn")
.args(["run", "test:test-validator"])
.output()
.unwrap();
println!("{}", String::from_utf8_lossy(&res.stdout));
println!("{}", String::from_utf8_lossy(&res.stderr));
}
#[tokio::main]
pub async fn main() {
let cli_command = Args::parse();
let mut cli = Args::parse();
cli.resolve_address();
let Args {
port,
subscription_port,
rpc_url,
websocket_url,
} = cli;
match cli_command.command {
cli::Command::Run {
port,
subscription_port,
rpc_url,
websocket_url,
} => run(port, subscription_port, rpc_url, websocket_url),
cli::Command::Test => ts_test(),
}
//cli_config.resolve_address();
//println!(
// "Using rpc server {} and ws server {}",
// cli_config.rpc_url, cli_config.websocket_url
//);
//let Args {
// rpc_url: json_rpc_url,
// websocket_url,
// port: rpc_addr,
// subscription_port,
// ..
//} = &cli_config;
// start recieving notifications and broadcast them
run(port, subscription_port, rpc_url, websocket_url)
}

View File

@ -1,11 +1,11 @@
use dashmap::DashMap;
use futures::executor::block_on;
use serde::{Deserialize, Serialize};
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use solana_sdk::transaction::Transaction;
use std::{
str::FromStr,
sync::Mutex,
@ -25,7 +25,7 @@ use {
jsonrpc_derive::rpc,
solana_client::connection_cache::ConnectionCache,
solana_client::rpc_client::RpcClient,
solana_client::nonblocking::rpc_client::RpcClient as NonblockingRpcClient,
solana_client::tpu_client::TpuClient,
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
@ -34,9 +34,7 @@ use {
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::VersionedTransaction,
},
solana_tpu_client::nonblocking::tpu_client::TpuClient,
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
@ -56,7 +54,7 @@ pub struct LightRpcRequestProcessor {
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
tpu_producer_channel: Sender<Vec<u8>>,
tpu_producer_channel: Sender<Transaction>,
}
impl LightRpcRequestProcessor {
@ -66,19 +64,8 @@ impl LightRpcRequestProcessor {
notification_sender: Sender<NotificationType>,
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let nonblocking_rpc_client = Arc::new(NonblockingRpcClient::new(json_rpc_url.to_string()));
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
block_on(TpuClient::new_with_connection_cache(
nonblocking_rpc_client,
websocket_url,
TpuClientConfig {fanout_slots : 100}, // value for max fanout slots
connection_cache.clone(),
))
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
@ -104,9 +91,16 @@ impl LightRpcRequestProcessor {
&context,
CommitmentLevel::Finalized,
),
Self::build_thread_to_process_transactions(tpu_client.clone(), tpu_consumer, performance_counter.clone()),
];
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer,
performance_counter.clone(),
);
LightRpcRequestProcessor {
rpc_client,
last_valid_block_height: 0,
@ -165,14 +159,28 @@ impl LightRpcRequestProcessor {
}
fn build_thread_to_process_transactions(
tpu_client: Arc<TpuClient>,
receiver: Receiver<Vec<u8>>,
performance_counters : PerformanceCounter,
) -> JoinHandle<()> {
json_rpc_url: String,
websocket_url: String,
connection_cache: Arc<ConnectionCache>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) {
Builder::new()
.name("tpu sender".to_string())
.spawn(move || loop {
.name("thread working on confirmation block".to_string())
.spawn(move || {
let nonblocking_rpc_client =
Arc::new(RpcClient::new(json_rpc_url.to_string()));
let tpu_client = TpuClient::new_with_connection_cache(
nonblocking_rpc_client,
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
let tpu_client = Arc::new(tpu_client.unwrap());
loop {
let recv_res = receiver.recv();
println!("recieved a transaction");
match recv_res {
Ok(transaction) => {
let mut transactions_vec = vec![transaction];
@ -190,14 +198,18 @@ impl LightRpcRequestProcessor {
None => break,
}
}
let count:u64 = transactions_vec.len() as u64;
let fut_res = block_on(tpu_client.try_send_wire_transaction_batch(transactions_vec));
match fut_res
{
Ok(_) => performance_counters.total_confirmations.fetch_add( count, Ordering::Relaxed),
let count: u64 = transactions_vec.len() as u64;
let slice = transactions_vec.as_slice();
let fut_res = tpu_client.try_send_transaction_batch(slice);
match fut_res {
Ok(_) => performance_counters
.total_transactions_sent
.fetch_add(count, Ordering::Relaxed),
Err(e) => {
println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string());
performance_counters.transaction_sent_error.fetch_add(count, Ordering::Relaxed)
performance_counters
.transaction_sent_error
.fetch_add(count, Ordering::Relaxed)
}
};
}
@ -206,8 +218,8 @@ impl LightRpcRequestProcessor {
break;
}
};
})
.unwrap()
}
}).unwrap();
}
fn process_block(
@ -325,7 +337,7 @@ pub mod lite_rpc {
use std::str::FromStr;
use itertools::Itertools;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey, transaction::Transaction};
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use super::*;
@ -409,19 +421,20 @@ pub mod lite_rpc {
tx_encoding
))
})?;
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
let (_wire_transaction, transaction) =
decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string();
meta.context
.signature_status
.insert(signature.clone(), SignatureStatus::new());
meta.context.signature_status.insert(
transaction.signatures[0].to_string(),
SignatureStatus::new(),
);
match meta.tpu_producer_channel.send(wire_transaction) {
Ok(_) => Ok(transaction.signatures[0].to_string()),
Err(e) => {
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
Err(e) => {
println!("got error while sending on channel {}", e.to_string());
Err(jsonrpc_core::Error::new(jsonrpc_core::ErrorCode::InternalError))
Err(jsonrpc_core::Error::new(
jsonrpc_core::ErrorCode::InternalError,
))
}
}
}