Fixing minor bugs

This commit is contained in:
godmodegalactus 2023-01-04 20:52:11 +01:00
parent 1e631c6199
commit 17e2d3f6f6
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
4 changed files with 19 additions and 16 deletions

View File

@ -278,7 +278,7 @@ pub struct PerformanceCounter {
pub total_transactions_sent: Arc<AtomicU64>, pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>, pub transaction_sent_error: Arc<AtomicU64>,
pub finalized_per_seconds: Arc<AtomicU64>, pub finalized_per_seconds: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>, pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>, pub transactions_per_seconds: Arc<AtomicU64>,
pub send_transactions_errors_per_seconds: Arc<AtomicU64>, pub send_transactions_errors_per_seconds: Arc<AtomicU64>,

View File

@ -104,7 +104,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String
let runtime = Arc::new( let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) .worker_threads(32)
.on_thread_start(move || renice_this_thread(0).unwrap()) .on_thread_start(move || renice_this_thread(0).unwrap())
.thread_name("solLiteRpcProcessor") .thread_name("solLiteRpcProcessor")
.enable_all() .enable_all()
@ -122,7 +122,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String
request_processor.clone() request_processor.clone()
}) })
.event_loop_executor(runtime.handle().clone()) .event_loop_executor(runtime.handle().clone())
.threads(4) .threads(32)
.cors(DomainsValidation::AllowOnly(vec![ .cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any, AccessControlAllowOrigin::Any,
])) ]))

View File

@ -250,7 +250,7 @@ impl LitePubSubService {
.name("solRpcPubSub".to_string()) .name("solRpcPubSub".to_string())
.spawn(move || { .spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread() let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) .worker_threads(8)
.enable_all() .enable_all()
.build() .build()
.expect("runtime creation failed"); .expect("runtime creation failed");

View File

@ -42,7 +42,7 @@ use {
}, },
}; };
const TPU_BATCH_SIZE: usize = 64; const TPU_BATCH_SIZE: usize = 32;
#[derive(Clone)] #[derive(Clone)]
pub struct LightRpcRequestProcessor { pub struct LightRpcRequestProcessor {
@ -199,7 +199,7 @@ impl LightRpcRequestProcessor {
match recv_res { match recv_res {
Ok(transaction) => { Ok(transaction) => {
let mut transactions_vec = vec![transaction]; let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(5000); let mut time_remaining = Duration::from_micros(200);
for _i in 1..TPU_BATCH_SIZE { for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining); let another = receiver.recv_timeout(time_remaining);
@ -306,12 +306,15 @@ impl LightRpcRequestProcessor {
e.to_string() e.to_string()
); );
} }
if commitment.eq(&CommitmentLevel::Finalized) { if commitment.eq(&CommitmentLevel::Finalized) {
performance_counters.finalized_per_seconds.fetch_add(1, Ordering::Relaxed); performance_counters
.total_finalized
.fetch_add(1, Ordering::Relaxed);
} else { } else {
performance_counters.confirmations_per_seconds.fetch_add(1, Ordering::Relaxed); performance_counters
} .total_confirmations
.fetch_add(1, Ordering::Relaxed);
}
x.insert(SignatureStatus { x.insert(SignatureStatus {
status: Some(commitment), status: Some(commitment),
@ -462,13 +465,14 @@ pub mod lite_rpc {
let (_wire_transaction, transaction) = let (_wire_transaction, transaction) =
decode_and_deserialize::<Transaction>(data, binary_encoding)?; decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string(); let signature = transaction.signatures[0].to_string();
meta.context meta.context.signature_status.insert(
.signature_status signature.clone(),
.insert(signature.clone(), SignatureStatus{ SignatureStatus {
status: None, status: None,
error: None, error: None,
created: Instant::now(), created: Instant::now(),
}); },
);
match meta.tpu_producer_channel.send(transaction) { match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature), Ok(_) => Ok(signature),
@ -588,8 +592,7 @@ pub mod lite_rpc {
}; };
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext::new(slot), context: RpcResponseContext::new(slot),
value: commitment_matches value: commitment_matches && value.error.is_none(),
&& value.error.is_none(),
}) })
} }
None => Ok(RpcResponse { None => Ok(RpcResponse {