Merge pull request #2 from aniketfuryrocks/cargo_fix

cargo fix and fmt
This commit is contained in:
galactus 2022-11-14 09:30:04 +00:00 committed by GitHub
commit b19e4bdd62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 64 deletions

View File

@ -1,8 +1,8 @@
use {
clap::{App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::{ConfigInput},
std::{net::SocketAddr},
solana_cli_config::ConfigInput,
std::net::SocketAddr,
};
/// Holds the configuration for a single run of the benchmark
@ -19,7 +19,7 @@ impl Default for Config {
rpc_addr: SocketAddr::from(([127, 0, 0, 1], 8899)),
json_rpc_url: ConfigInput::default().json_rpc_url,
websocket_url: ConfigInput::default().websocket_url,
subscription_port : SocketAddr::from(([127, 0, 0, 1], 8900)),
subscription_port: SocketAddr::from(([127, 0, 0, 1], 8900)),
}
}
}
@ -67,7 +67,7 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.takes_value(true)
.global(true)
.min_values(1025)
.help("subscription port on which which lite rpc will use to create subscriptions")
.help("subscription port on which which lite rpc will use to create subscriptions"),
)
}
@ -102,7 +102,7 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
args.subscription_port = SocketAddr::from(([127, 0, 0, 1], port));
} else {
let port = args.rpc_addr.port().saturating_add(1);
args.subscription_port =SocketAddr::from(([127, 0, 0, 1], port));
args.subscription_port = SocketAddr::from(([127, 0, 0, 1], port));
}
args
}

View File

@ -1,8 +1,9 @@
use std::{sync::{atomic::AtomicU64, RwLock, Arc}, collections::HashMap};
use solana_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentLevel, CommitmentConfig};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, RwLock},
};
pub struct BlockInformation {
pub block_hash: RwLock<String>,
@ -13,20 +14,21 @@ pub struct BlockInformation {
impl BlockInformation {
pub fn new(rpc_client: Arc<RpcClient>, commitment: CommitmentLevel) -> Self {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig {
commitment: commitment,
})
.unwrap();
let (blockhash, blockheight) = rpc_client.get_latest_blockhash_with_commitment(CommitmentConfig { commitment }).unwrap();
let (blockhash, blockheight) = rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig { commitment })
.unwrap();
BlockInformation{
BlockInformation {
block_hash: RwLock::new(blockhash.to_string()),
block_height: AtomicU64::new(blockheight),
slot: AtomicU64::new(slot),
confirmation_level : commitment,
confirmation_level: commitment,
}
}
}
@ -39,11 +41,16 @@ pub struct LiteRpcContext {
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
LiteRpcContext {
signature_status : RwLock::new(HashMap::new()),
confirmed_block_info : BlockInformation::new(rpc_client.clone(), CommitmentLevel::Confirmed),
finalized_block_info : BlockInformation::new(rpc_client.clone(), CommitmentLevel::Finalized)
signature_status: RwLock::new(HashMap::new()),
confirmed_block_info: BlockInformation::new(
rpc_client.clone(),
CommitmentLevel::Confirmed,
),
finalized_block_info: BlockInformation::new(
rpc_client.clone(),
CommitmentLevel::Finalized,
),
}
}
}

View File

@ -1,13 +1,16 @@
use std::sync::Arc;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, hyper, AccessControlAllowOrigin};
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use solana_perf::thread::renice_this_thread;
use crate::rpc::{lite_rpc::{self, Lite}, LightRpcRequestProcessor};
use crate::rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
};
mod cli;
mod rpc;
mod context;
mod rpc;
pub fn main() {
let matches = cli::build_args(solana_version::version!()).get_matches();
@ -17,7 +20,6 @@ pub fn main() {
json_rpc_url,
websocket_url,
rpc_addr,
subscription_port,
..
} = &cli_config;
@ -25,8 +27,7 @@ pub fn main() {
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let request_processor =
LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
@ -54,4 +55,4 @@ pub fn main() {
.start_http(&socket_addr);
println!("Starting Lite RPC node");
server.unwrap().wait();
}
}

View File

@ -1,5 +1,3 @@
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
@ -7,6 +5,7 @@ use solana_client::{
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use std::thread::{Builder, JoinHandle};
use crate::context::{BlockInformation, LiteRpcContext};
use {
bincode::config::Options,
crossbeam_channel::Receiver,
@ -28,13 +27,9 @@ use {
std::{
any::type_name,
collections::HashMap,
sync::{
atomic::{Ordering},
Arc, RwLock,
},
sync::{atomic::Ordering, Arc, RwLock},
},
};
use crate::context::{LiteRpcContext, BlockInformation};
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
@ -49,10 +44,7 @@ pub struct LightRpcRequestProcessor {
}
impl LightRpcRequestProcessor {
pub fn new(
json_rpc_url: &String,
websocket_url: &String,
) -> LightRpcRequestProcessor {
pub fn new(json_rpc_url: &String, websocket_url: &String) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url.as_str()));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
@ -74,12 +66,20 @@ impl LightRpcRequestProcessor {
// subscribe for finalized blocks
let (client_finalized, receiver_finalized) =
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
// create threads to listen for finalized and confrimed blocks
let joinables = vec![
Self::build_thread_to_process_blocks(receiver_confirmed, &context, CommitmentLevel::Confirmed),
Self::build_thread_to_process_blocks(receiver_finalized, &context, CommitmentLevel::Finalized),
];
Self::build_thread_to_process_blocks(
receiver_confirmed,
&context,
CommitmentLevel::Confirmed,
),
Self::build_thread_to_process_blocks(
receiver_finalized,
&context,
CommitmentLevel::Finalized,
),
];
LightRpcRequestProcessor {
rpc_client,
@ -89,7 +89,7 @@ impl LightRpcRequestProcessor {
context: context,
_connection_cache: connection_cache,
_joinables: Arc::new(joinables),
_subscribed_clients : Arc::new(vec![client_confirmed, client_finalized]),
_subscribed_clients: Arc::new(vec![client_confirmed, client_finalized]),
}
}
@ -114,20 +114,21 @@ impl LightRpcRequestProcessor {
)
}
fn build_thread_to_process_blocks(reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context : &Arc<LiteRpcContext>,
commitment : CommitmentLevel,) -> JoinHandle<()>{
fn build_thread_to_process_blocks(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
commitment: CommitmentLevel,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let block_info = if commitment.eq(&CommitmentLevel::Finalized) {&context.confirmed_block_info} else {&context.finalized_block_info};
Self::process_block(
reciever,
&context.signature_status,
commitment,
block_info,
);
let block_info = if commitment.eq(&CommitmentLevel::Finalized) {
&context.confirmed_block_info
} else {
&context.finalized_block_info
};
Self::process_block(reciever, &context.signature_status, commitment, block_info);
})
.unwrap()
}
@ -145,11 +146,14 @@ impl LightRpcRequestProcessor {
match block_data {
Ok(data) => {
let block_update = &data.value;
block_information.slot.store(block_update.slot, Ordering::Relaxed);
if let Some(block) = &block_update.block {
block_information
.slot
.store(block_update.slot, Ordering::Relaxed);
block_information.block_height.store(block.block_height.unwrap(), Ordering::Relaxed);
if let Some(block) = &block_update.block {
block_information
.block_height
.store(block.block_height.unwrap(), Ordering::Relaxed);
// context to update blockhash
{
let mut lock = block_information.block_hash.write().unwrap();
@ -227,7 +231,6 @@ pub mod lite_rpc {
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
}
pub struct LightRpc;
impl Lite for LightRpc {
@ -271,14 +274,21 @@ pub mod lite_rpc {
None => CommitmentLevel::Confirmed,
};
let (block_hash, slot) = match commitment {
CommitmentLevel::Finalized => {
let slot = meta.context.finalized_block_info.slot.load(Ordering::Relaxed);
CommitmentLevel::Finalized => {
let slot = meta
.context
.finalized_block_info
.slot
.load(Ordering::Relaxed);
let lock = meta.context.finalized_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
},
}
_ => {
let slot = meta.context.confirmed_block_info.slot.load(Ordering::Relaxed);
let slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
let lock = meta.context.confirmed_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
}
@ -307,9 +317,15 @@ pub mod lite_rpc {
};
let slot = if commitment.eq(&CommitmentLevel::Finalized) {
meta.context.finalized_block_info.slot.load(Ordering::Relaxed)
meta.context
.finalized_block_info
.slot
.load(Ordering::Relaxed)
} else {
meta.context.confirmed_block_info.slot.load(Ordering::Relaxed)
meta.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed)
};
match k_value {
@ -361,8 +377,10 @@ pub mod lite_rpc {
) -> Result<String> {
let pubkey = Pubkey::from_str(pubkey_str.as_str()).unwrap();
let signature = match config {
Some(c) => meta.rpc_client.request_airdrop_with_config(&pubkey, lamports, c),
None => meta.rpc_client.request_airdrop(&pubkey, lamports)
Some(c) => meta
.rpc_client
.request_airdrop_with_config(&pubkey, lamports, c),
None => meta.rpc_client.request_airdrop(&pubkey, lamports),
};
Ok(signature.unwrap().to_string())
}