Merge branch 'main' into main

This commit is contained in:
Kittycat 2022-12-07 19:31:14 +05:30 committed by GitHub
commit 8b6b351a1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 366 additions and 50 deletions

55
Cargo.lock generated
View File

@ -243,7 +243,7 @@ dependencies = [
"asn1-rs-derive",
"asn1-rs-impl",
"displaydoc",
"nom",
"nom 7.1.1",
"num-traits",
"rusticata-macros",
"thiserror",
@ -697,7 +697,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
"nom 7.1.1",
]
[[package]]
@ -1157,7 +1157,7 @@ checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1"
dependencies = [
"asn1-rs",
"displaydoc",
"nom",
"nom 7.1.1",
"num-bigint 0.4.3",
"num-traits",
"rusticata-macros",
@ -2410,6 +2410,7 @@ dependencies = [
"jsonrpc-pubsub",
"libc",
"log",
"procinfo",
"rayon",
"regex",
"serde",
@ -2659,6 +2660,12 @@ dependencies = [
"pin-utils",
]
[[package]]
name = "nom"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff"
[[package]]
name = "nom"
version = "7.1.1"
@ -3198,6 +3205,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "procinfo"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c"
dependencies = [
"byteorder",
"libc",
"nom 2.2.1",
"rustc_version 0.2.3",
]
[[package]]
name = "prost"
version = "0.11.2"
@ -3616,6 +3635,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.3.3"
@ -3640,7 +3668,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
"nom",
"nom 7.1.1",
]
[[package]]
@ -3795,13 +3823,22 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser 0.7.0",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
"semver-parser 0.10.2",
]
[[package]]
@ -3810,6 +3847,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "semver-parser"
version = "0.10.2"
@ -6558,7 +6601,7 @@ dependencies = [
"data-encoding",
"der-parser",
"lazy_static",
"nom",
"nom 7.1.1",
"oid-registry",
"rusticata-macros",
"thiserror",

View File

@ -41,6 +41,7 @@ jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
clap = { version = "4.0.29", features = ["derive"] }
procinfo = "0.4.2"
base64 = "0.13.0"
bincode = "1.3.3"

33
src/client.rs Normal file
View File

@ -0,0 +1,33 @@
use std::ops::{Deref, DerefMut};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_request::RpcRequest;
pub struct LiteClient(pub RpcClient);
impl Deref for LiteClient {
type Target = RpcClient;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for LiteClient {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl LiteClient {
pub async fn confirm_transaction(&self, signature: String) -> bool {
self.send(
RpcRequest::Custom {
method: "confirmTransaction",
},
serde_json::json!([signature]),
)
.await
.unwrap()
}
}

View File

@ -13,8 +13,12 @@ use solana_sdk::{
signature::Signature,
};
use std::{
sync::{atomic::AtomicU64, Arc, RwLock},
time::Instant,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
use tokio::sync::broadcast;
@ -165,11 +169,6 @@ impl LiteRpcSubsrciptionControl {
Ok(notification_type) => {
let rpc_notification = match notification_type {
NotificationType::Signature(data) => {
println!(
"getting signature notification {} confirmation {}",
data.signature,
data.commitment.to_string()
);
let signature_params = SignatureSubscriptionParams {
commitment: CommitmentConfig {
commitment: data.commitment,
@ -251,3 +250,89 @@ impl LiteRpcSubsrciptionControl {
}
}
}
#[derive(Clone)]
pub struct PerformanceCounter {
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
}
impl PerformanceCounter {
pub fn new() -> Self {
Self {
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
}
}
pub fn update_per_seconds_transactions(&self) {
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
);
self.transactions_per_seconds.store(
total_transactions
- self
.last_count_for_transactions_sent
.load(Ordering::Relaxed),
Ordering::Release,
);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
}
pub fn update_sent_transactions_counter(&self) {
self.total_transactions_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
}
}
pub fn launch_performance_updating_thread(
performance_counter: PerformanceCounter,
) -> JoinHandle<()> {
Builder::new()
.name("Performance Counter".to_string())
.spawn(move || loop {
let start = Instant::now();
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
println!(
"Sent {} transactions and confrimed {} transactions",
total_transactions_per_seconds, confirmations_per_seconds
);
thread::sleep(remaining);
}
})
.unwrap()
}

View File

@ -8,11 +8,15 @@ use pubsub::LitePubSubService;
use solana_perf::thread::renice_this_thread;
use tokio::sync::broadcast;
use crate::rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
use crate::{
context::{launch_performance_updating_thread, PerformanceCounter},
rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
},
};
mod cli;
mod client;
mod context;
mod pubsub;
mod rpc;
@ -34,6 +38,9 @@ pub fn main() {
..
} = &cli_config;
let performance_counter = PerformanceCounter::new();
launch_performance_updating_thread(performance_counter.clone());
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
@ -47,8 +54,11 @@ pub fn main() {
.expect("Invalid subscription port");
// start websocket server
let (_trigger, websocket_service) =
LitePubSubService::new(pubsub_control.clone(), subscription_port);
let (_trigger, websocket_service) = LitePubSubService::new(
pubsub_control.clone(),
*subscription_port,
performance_counter.clone(),
);
// start recieving notifications and broadcast them
{
@ -64,8 +74,12 @@ pub fn main() {
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let mut request_processor =
LightRpcRequestProcessor::new(json_rpc_url, websocket_url, notification_sender);
let mut request_processor = LightRpcRequestProcessor::new(
json_rpc_url,
websocket_url,
notification_sender,
performance_counter.clone(),
);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()

View File

@ -7,7 +7,7 @@ use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::context::LiteRpcSubsrciptionControl;
use crate::context::{LiteRpcSubsrciptionControl, PerformanceCounter};
use {
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
@ -145,6 +145,7 @@ enum HandleError {
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
performance_counter: PerformanceCounter,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
@ -177,6 +178,7 @@ async fn handle_connection(
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
performance_counter.update_confirm_transaction_counter();
sender.send_text(&x.json).await?;
}
}
@ -195,6 +197,7 @@ async fn listen(
listen_address: SocketAddr,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
mut tripwire: Tripwire,
performance_counter: PerformanceCounter,
) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
loop {
@ -202,9 +205,10 @@ async fn listen(
result = listener.accept() => match result {
Ok((socket, addr)) => {
let subscription_control = subscription_control.clone();
let performance_counter = performance_counter.clone();
tokio::spawn(async move {
let handle = handle_connection(
socket, subscription_control
socket, subscription_control, performance_counter,
);
match handle.await {
Ok(()) => println!("connection closed ({:?})", addr),
@ -223,6 +227,7 @@ impl LitePubSubService {
pub fn new(
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pubsub_addr: SocketAddr,
performance_counter: PerformanceCounter,
) -> (Trigger, Self) {
let (trigger, tripwire) = Tripwire::new();
@ -234,9 +239,12 @@ impl LitePubSubService {
.enable_all()
.build()
.expect("runtime creation failed");
if let Err(err) =
runtime.block_on(listen(pubsub_addr, subscription_control, tripwire))
{
if let Err(err) = runtime.block_on(listen(
pubsub_addr,
subscription_control,
tripwire,
performance_counter,
)) {
println!("pubsub service failed: {}", err);
};
})

View File

@ -1,6 +1,7 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError, SignatureSubscription},
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
@ -11,7 +12,8 @@ use std::{
};
use crate::context::{
BlockInformation, LiteRpcContext, NotificationType, SignatureNotification, SlotNotification,
BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification,
SlotNotification,
};
use crossbeam_channel::Sender;
use {
@ -48,6 +50,7 @@ pub struct LightRpcRequestProcessor {
_connection_cache: Arc<ConnectionCache>,
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
}
impl LightRpcRequestProcessor {
@ -55,6 +58,7 @@ impl LightRpcRequestProcessor {
json_rpc_url: &str,
websocket_url: &str,
notification_sender: Sender<NotificationType>,
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
@ -101,6 +105,7 @@ impl LightRpcRequestProcessor {
_connection_cache: connection_cache,
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
performance_counter,
}
}
@ -123,21 +128,6 @@ impl LightRpcRequestProcessor {
)
}
fn subscribe_signature(
websocket_url: &String,
signature: &Signature,
commitment: CommitmentLevel,
) -> std::result::Result<SignatureSubscription, PubsubClientError> {
PubsubClient::signature_subscribe(
websocket_url.as_str(),
signature,
Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig { commitment }),
enable_received_notification: Some(false),
}),
)
}
fn build_thread_to_process_blocks(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
@ -170,7 +160,6 @@ impl LightRpcRequestProcessor {
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
) {
println!("processing blocks for {}", commitment);
loop {
let block_data = reciever.recv();
@ -206,10 +195,7 @@ impl LightRpcRequestProcessor {
for signature in signatures {
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
println!(
"found signature {} for commitment {}",
signature, commitment
);
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str())
.unwrap(),
@ -268,10 +254,23 @@ impl LightRpcRequestProcessor {
impl Metadata for LightRpcRequestProcessor {}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcPerformanceCounterResults {
pub transactions_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub total_transactions_count: u64,
pub total_confirmations_count: u64,
pub memory_used: u64,
pub nb_threads: u64,
}
pub mod lite_rpc {
use std::str::FromStr;
use itertools::Itertools;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
use solana_transaction_status::{TransactionStatus, TransactionConfirmationStatus};
use super::*;
#[rpc]
@ -309,6 +308,28 @@ pub mod lite_rpc {
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[rpc(meta, name = "getPerformanceCounters")]
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults>;
#[rpc(meta, name = "getLatestBlockhash")]
fn get_latest_blockhash(
&self,
meta: Self::Metadata,
config: Option<RpcContextConfig>,
) -> Result<RpcResponse<RpcBlockhash>>;
#[rpc(meta, name = "getSignatureStatuses")]
fn get_signature_statuses(
&self,
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
}
pub struct LightRpc;
impl Lite for LightRpc {
@ -335,8 +356,8 @@ pub mod lite_rpc {
meta.context
.signature_status
.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
meta.tpu_client.send_wire_transaction(wire_transaction);
meta.performance_counter.update_sent_transactions_counter();
Ok(transaction.signatures[0].to_string())
}
@ -347,7 +368,7 @@ pub mod lite_rpc {
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>> {
let commitment = match commitment {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
None => CommitmentLevel::Finalized,
};
let (block_hash, slot) = match commitment {
CommitmentLevel::Finalized => {
@ -379,6 +400,37 @@ pub mod lite_rpc {
})
}
fn get_latest_blockhash(
&self,
meta: Self::Metadata,
config: Option<RpcContextConfig>,
) -> Result<RpcResponse<RpcBlockhash>> {
let commitment = match config {
Some(x) => match x.commitment {
Some(x) => x.commitment,
None => CommitmentLevel::Finalized,
},
None => CommitmentLevel::Finalized,
};
let block_info = match commitment {
CommitmentLevel::Finalized => &meta.context.finalized_block_info,
_ => &meta.context.confirmed_block_info,
};
let slot = block_info.slot.load(Ordering::Relaxed);
let last_valid_block_height = block_info.block_height.load(Ordering::Relaxed);
let blockhash = block_info.block_hash.read().unwrap().clone();
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcBlockhash {
blockhash,
last_valid_block_height,
},
})
}
fn confirm_transaction(
&self,
meta: Self::Metadata,
@ -404,11 +456,12 @@ pub mod lite_rpc {
.slot
.load(Ordering::Relaxed)
};
meta.performance_counter
.update_confirm_transaction_counter();
match k_value {
Some(value) => match *value {
Some(commitment_for_signature) => {
println!("found in cache");
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: if commitment.eq(&CommitmentLevel::Finalized) {
@ -443,6 +496,51 @@ pub mod lite_rpc {
}
}
fn get_signature_statuses(
&self,
meta: Self::Metadata,
signature_strs: Vec<String>,
_config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
let confirmed_slot = meta.context.confirmed_block_info.slot.load(Ordering::Relaxed);
let status = signature_strs.iter().map(|x| {
let singature_status = meta.context.signature_status.get(x);
let k_value = singature_status;
match k_value {
Some(value) => match *value {
Some(commitment_for_signature) => {
let slot = meta.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
meta.performance_counter
.update_confirm_transaction_counter();
let status = match commitment_for_signature {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
_ => TransactionConfirmationStatus::Confirmed,
};
Some(TransactionStatus {
slot,
confirmations: Some(1),
status: Ok(()),
err: None,
confirmation_status : Some(status)
})
}
None => None,
},
None => None
}
}).collect_vec();
Ok(
RpcResponse {
context : RpcResponseContext::new(confirmed_slot),
value: status,
}
)
}
fn request_airdrop(
&self,
meta: Self::Metadata,
@ -459,6 +557,40 @@ pub mod lite_rpc {
};
Ok(signature.unwrap().to_string())
}
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults> {
let total_transactions_count = meta
.performance_counter
.total_transactions_sent
.load(Ordering::Relaxed);
let total_confirmations_count = meta
.performance_counter
.total_confirmations
.load(Ordering::Relaxed);
let transactions_per_seconds = meta
.performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let confirmations_per_seconds = meta
.performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let procinfo::pid::Statm { size, .. } = procinfo::pid::statm_self().unwrap();
let procinfo::pid::Stat { num_threads, .. } = procinfo::pid::stat_self().unwrap();
Ok(RpcPerformanceCounterResults {
confirmations_per_seconds,
transactions_per_seconds,
total_confirmations_count,
total_transactions_count,
memory_used: size as u64,
nb_threads: num_threads as u64,
})
}
}
}