Initial commit, moving the project from solana to an independent repository.

This commit is contained in:
Godmode Galactus 2022-11-12 14:32:01 +01:00
parent 5db3fb1715
commit 48bef7e551
No known key found for this signature in database
9 changed files with 6992 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "deps/solana"]
path = deps/solana
url = https://github.com/blockworks-foundation/solana.git

6273
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

69
Cargo.toml Normal file
View File

@ -0,0 +1,69 @@
[package]
name = "lite-rpc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-client={path = "deps/solana/client"}
solana-sdk={path = "deps/solana/sdk"}
tokio = { version = "1.14.1", features = ["full"]}
futures = "0.3.25"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0" }
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
solana-rpc = {path = "deps/solana/rpc"}
clap = "2.33.1"
solana-clap-utils = { path = "deps/solana/clap-utils" }
solana-cli-config = { path = "deps/solana/cli-config" }
solana-pubsub-client = { path = "deps/solana/pubsub-client" }
base64 = "0.13.0"
bincode = "1.3.3"
bs58 = "0.4.0"
crossbeam-channel = "0.5"
dashmap = "4.0.2"
itertools = "0.10.5"
libc = "0.2.131"
log = "0.4.17"
rayon = "1.5.3"
regex = "1.6.0"
serde = "1.0.144"
serde_derive = "1.0.103"
serde_json = "1.0.83"
soketto = "0.7"
solana-account-decoder = { path = "deps/solana/account-decoder", version = "=1.15.0" }
solana-entry = { path = "deps/solana/entry", version = "=1.15.0" }
solana-faucet = { path = "deps/solana/faucet", version = "=1.15.0" }
solana-gossip = { path = "deps/solana/gossip", version = "=1.15.0" }
solana-ledger = { path = "deps/solana/ledger", version = "=1.15.0" }
solana-measure = { path = "deps/solana/measure", version = "=1.15.0" }
solana-metrics = { path = "deps/solana/metrics", version = "=1.15.0" }
solana-perf = { path = "deps/solana/perf", version = "=1.15.0" }
solana-poh = { path = "deps/solana/poh", version = "=1.15.0" }
solana-rayon-threadlimit = { path = "deps/solana/rayon-threadlimit", version = "=1.15.0" }
solana-rpc-client-api = { path = "deps/solana/rpc-client-api", version = "=1.15.0" }
solana-runtime = { path = "deps/solana/runtime", version = "=1.15.0" }
solana-send-transaction-service = { path = "deps/solana/send-transaction-service", version = "=1.15.0" }
solana-stake-program = { path = "deps/solana/programs/stake", version = "=1.15.0" }
solana-storage-bigtable = { path = "deps/solana/storage-bigtable", version = "=1.15.0" }
solana-streamer = { path = "deps/solana/streamer", version = "=1.15.0" }
solana-tpu-client = { path = "deps/solana/tpu-client", version = "=1.15.0", default-features = false }
solana-transaction-status = { path = "deps/solana/transaction-status", version = "=1.15.0" }
solana-version = { path = "deps/solana/version", version = "=1.15.0" }
solana-vote-program = { path = "deps/solana/programs/vote", version = "=1.15.0" }
spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
spl-token-2022 = { version = "=0.4.3", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0"
tokio-util = { version = "0.6", features = ["codec", "compat"] }
[dev-dependencies]
csv = "1.1.6"
serde = { version = "1", features = ["derive"]}
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

1
deps/solana vendored Submodule

@ -0,0 +1 @@
Subproject commit ed2f4b21577719b3068ab315c2f4504007225bf0

108
src/cli.rs Normal file
View File

@ -0,0 +1,108 @@
use {
clap::{App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::{ConfigInput},
std::{net::SocketAddr},
};
/// Holds the configuration for a single run of the benchmark
pub struct Config {
pub rpc_addr: SocketAddr,
pub subscription_port: SocketAddr,
pub json_rpc_url: String,
pub websocket_url: String,
}
impl Default for Config {
fn default() -> Config {
Config {
rpc_addr: SocketAddr::from(([127, 0, 0, 1], 8899)),
json_rpc_url: ConfigInput::default().json_rpc_url,
websocket_url: ConfigInput::default().websocket_url,
subscription_port : SocketAddr::from(([127, 0, 0, 1], 8900)),
}
}
}
/// Defines and builds the CLI args for a run of the benchmark
pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
App::new("lite rpc")
.about("a lite version of solana rpc to send and confirm transactions")
.version(version)
.arg(
Arg::with_name("json_rpc_url")
.short("u")
.long("url")
.value_name("URL_OR_MONIKER")
.takes_value(true)
.global(true)
.validator(is_url_or_moniker)
.help(
"URL for Solana's JSON RPC or moniker (or their first letter): \
[mainnet-beta, testnet, devnet, localhost]",
),
)
.arg(
Arg::with_name("websocket_url")
.long("ws")
.value_name("URL")
.takes_value(true)
.global(true)
.validator(is_url)
.help("WebSocket URL for the solana cluster"),
)
.arg(
Arg::with_name("port")
.long("port")
.short("p")
.takes_value(true)
.global(true)
.min_values(1025)
.help("Port on which which lite rpc will listen to rpc requests"),
)
.arg(
Arg::with_name("subscription_port")
.long("sub_port")
.short("sp")
.takes_value(true)
.global(true)
.min_values(1025)
.help("subscription port on which which lite rpc will use to create subscriptions")
)
}
pub fn extract_args(matches: &ArgMatches) -> Config {
let mut args = Config::default();
let config = if let Some(config_file) = matches.value_of("config_file") {
solana_cli_config::Config::load(config_file).unwrap_or_default()
} else {
solana_cli_config::Config::default()
};
let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting(
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.json_rpc_url = json_rpc_url;
let (_, websocket_url) = ConfigInput::compute_websocket_url_setting(
matches.value_of("websocket_url").unwrap_or(""),
&config.websocket_url,
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.websocket_url = websocket_url;
if let Some(port) = matches.value_of("port") {
let port: u16 = port.parse().expect("can't parse port");
args.rpc_addr = SocketAddr::from(([127, 0, 0, 1], port));
}
if let Some(port) = matches.value_of("subsription_port") {
let port: u16 = port.parse().expect("can't parse subsription_port");
args.subscription_port = SocketAddr::from(([127, 0, 0, 1], port));
} else {
let port = args.rpc_addr.port().saturating_add(1);
args.subscription_port =SocketAddr::from(([127, 0, 0, 1], port));
}
args
}

49
src/context.rs Normal file
View File

@ -0,0 +1,49 @@
use std::{sync::{atomic::AtomicU64, RwLock, Arc}, collections::HashMap};
use solana_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentLevel, CommitmentConfig};
pub struct BlockInformation {
pub block_hash: RwLock<String>,
pub block_height: AtomicU64,
pub slot: AtomicU64,
pub confirmation_level: CommitmentLevel,
}
impl BlockInformation {
pub fn new(rpc_client: Arc<RpcClient>, commitment: CommitmentLevel) -> Self {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig {
commitment: commitment,
})
.unwrap();
let (blockhash, blockheight) = rpc_client.get_latest_blockhash_with_commitment(CommitmentConfig { commitment }).unwrap();
BlockInformation{
block_hash: RwLock::new(blockhash.to_string()),
block_height: AtomicU64::new(blockheight),
slot: AtomicU64::new(slot),
confirmation_level : commitment,
}
}
}
pub struct LiteRpcContext {
pub signature_status: RwLock<HashMap<String, Option<CommitmentLevel>>>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
}
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
LiteRpcContext {
signature_status : RwLock::new(HashMap::new()),
confirmed_block_info : BlockInformation::new(rpc_client.clone(), CommitmentLevel::Confirmed),
finalized_block_info : BlockInformation::new(rpc_client.clone(), CommitmentLevel::Finalized)
}
}
}

57
src/main.rs Normal file
View File

@ -0,0 +1,57 @@
use std::sync::Arc;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, hyper, AccessControlAllowOrigin};
use solana_perf::thread::renice_this_thread;
use crate::rpc::{lite_rpc::{self, Lite}, LightRpcRequestProcessor};
mod cli;
mod rpc;
mod context;
pub fn main() {
let matches = cli::build_args(solana_version::version!()).get_matches();
let cli_config = cli::extract_args(&matches);
let cli::Config {
json_rpc_url,
websocket_url,
rpc_addr,
subscription_port,
..
} = &cli_config;
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let request_processor =
LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.on_thread_start(move || renice_this_thread(0).unwrap())
.thread_name("solLiteRpcProcessor")
.enable_all()
.build()
.expect("Runtime"),
);
let max_request_body_size: usize = 50 * (1 << 10);
let socket_addr = rpc_addr.clone();
let server =
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| {
request_processor.clone()
})
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.cors_max_age(86400)
.max_request_body_size(max_request_body_size)
.start_http(&socket_addr);
println!("Starting Lite RPC node");
server.unwrap().wait();
}

431
src/rpc.rs Normal file
View File

@ -0,0 +1,431 @@
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use std::thread::{Builder, JoinHandle};
use {
bincode::config::Options,
crossbeam_channel::Receiver,
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_derive::rpc,
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
response::{Response as RpcResponse, *},
},
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::VersionedTransaction,
},
solana_tpu_client::connection_cache::ConnectionCache,
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
collections::HashMap,
sync::{
atomic::{Ordering},
Arc, RwLock,
},
},
};
use crate::context::{LiteRpcContext, BlockInformation};
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
pub rpc_client: Arc<RpcClient>,
pub tpu_client: Arc<TpuClient>,
pub last_valid_block_height: u64,
pub ws_url: String,
pub context: Arc<LiteRpcContext>,
_connection_cache: Arc<ConnectionCache>,
_joinables: Arc<Vec<JoinHandle<()>>>,
_subscribed_clients: Arc<Vec<PubsubBlockClientSubscription>>,
}
impl LightRpcRequestProcessor {
pub fn new(
json_rpc_url: &String,
websocket_url: &String,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url.as_str()));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(),
connection_cache.clone(),
)
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone()));
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap();
// subscribe for finalized blocks
let (client_finalized, receiver_finalized) =
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
// create threads to listen for finalized and confrimed blocks
let joinables = vec![
Self::build_thread_to_process_blocks(receiver_confirmed, &context, CommitmentLevel::Confirmed),
Self::build_thread_to_process_blocks(receiver_finalized, &context, CommitmentLevel::Finalized),
];
LightRpcRequestProcessor {
rpc_client,
tpu_client,
last_valid_block_height: 0,
ws_url: websocket_url.clone(),
context: context,
_connection_cache: connection_cache,
_joinables: Arc::new(joinables),
_subscribed_clients : Arc::new(vec![client_confirmed, client_finalized]),
}
}
fn subscribe_block(
websocket_url: &String,
commitment: CommitmentLevel,
) -> std::result::Result<BlockSubscription, PubsubClientError> {
PubsubClient::block_subscribe(
websocket_url.as_str(),
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig {
commitment: commitment,
}),
encoding: None,
transaction_details: Some(
solana_transaction_status::TransactionDetails::Signatures,
),
show_rewards: None,
max_supported_transaction_version: None,
}),
)
}
fn build_thread_to_process_blocks(reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context : &Arc<LiteRpcContext>,
commitment : CommitmentLevel,) -> JoinHandle<()>{
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let block_info = if commitment.eq(&CommitmentLevel::Finalized) {&context.confirmed_block_info} else {&context.finalized_block_info};
Self::process_block(
reciever,
&context.signature_status,
commitment,
block_info,
);
})
.unwrap()
}
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &RwLock<HashMap<String, Option<CommitmentLevel>>>,
commitment: CommitmentLevel,
block_information: &BlockInformation,
) {
println!("processing blocks for {}", commitment.to_string());
loop {
let block_data = reciever.recv();
match block_data {
Ok(data) => {
let block_update = &data.value;
block_information.slot.store(block_update.slot, Ordering::Relaxed);
if let Some(block) = &block_update.block {
block_information.block_height.store(block.block_height.unwrap(), Ordering::Relaxed);
// context to update blockhash
{
let mut lock = block_information.block_hash.write().unwrap();
*lock = block.blockhash.clone();
}
if let Some(signatures) = &block.signatures {
let mut lock = signature_status.write().unwrap();
for signature in signatures {
if lock.contains_key(signature) {
println!(
"found signature {} for commitment {}",
signature, commitment
);
lock.insert(signature.clone(), Some(commitment));
}
}
} else {
println!(
"Cannot get signatures at slot {} block hash {}",
block_update.slot, block.blockhash,
);
}
} else {
println!("Cannot get a block at slot {}", block_update.slot);
}
}
Err(e) => {
println!("Got error when recieving the block ({})", e.to_string());
}
}
}
}
}
impl Metadata for LightRpcRequestProcessor {}
pub mod lite_rpc {
use std::str::FromStr;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
use super::*;
#[rpc]
pub trait Lite {
type Metadata;
#[rpc(meta, name = "sendTransaction")]
fn send_transaction(
&self,
meta: Self::Metadata,
data: String,
config: Option<RpcSendTransactionConfig>,
) -> Result<String>;
#[rpc(meta, name = "getRecentBlockhash")]
fn get_recent_blockhash(
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>>;
#[rpc(meta, name = "confirmTransaction")]
fn confirm_transaction(
&self,
meta: Self::Metadata,
signature_str: String,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>>;
#[rpc(meta, name = "requestAirdrop")]
fn request_airdrop(
&self,
meta: Self::Metadata,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
}
pub struct LightRpc;
impl Lite for LightRpc {
type Metadata = LightRpcRequestProcessor;
fn send_transaction(
&self,
meta: Self::Metadata,
data: String,
config: Option<RpcSendTransactionConfig>,
) -> Result<String> {
let config = config.unwrap_or_default();
let encoding = config.encoding;
let tx_encoding = encoding.unwrap_or(UiTransactionEncoding::Base58);
let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| {
Error::invalid_params(format!(
"unsupported encoding: {}. Supported encodings: base58, base64",
tx_encoding
))
})?;
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data.clone(), binary_encoding)?;
{
let mut lock = meta.context.signature_status.write().unwrap();
lock.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0].to_string());
}
meta.tpu_client
.send_wire_transaction(wire_transaction.clone());
Ok(transaction.signatures[0].to_string())
}
fn get_recent_blockhash(
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>> {
let commitment = match commitment {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
};
let (block_hash, slot) = match commitment {
CommitmentLevel::Finalized => {
let slot = meta.context.finalized_block_info.slot.load(Ordering::Relaxed);
let lock = meta.context.finalized_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
},
_ => {
let slot = meta.context.confirmed_block_info.slot.load(Ordering::Relaxed);
let lock = meta.context.confirmed_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
}
};
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcBlockhashFeeCalculator {
blockhash: block_hash,
fee_calculator: FeeCalculator::default(),
},
})
}
fn confirm_transaction(
&self,
meta: Self::Metadata,
signature_str: String,
commitment_cfg: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>> {
let lock = meta.context.signature_status.read().unwrap();
let k_value = lock.get_key_value(&signature_str);
let commitment = match commitment_cfg {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
};
let slot = if commitment.eq(&CommitmentLevel::Finalized) {
meta.context.finalized_block_info.slot.load(Ordering::Relaxed)
} else {
meta.context.confirmed_block_info.slot.load(Ordering::Relaxed)
};
match k_value {
Some(value) => match value.1 {
Some(commitment_for_signature) => {
println!("found in cache");
return Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: if commitment.eq(&CommitmentLevel::Finalized) {
commitment_for_signature.eq(&CommitmentLevel::Finalized)
} else {
commitment_for_signature.eq(&CommitmentLevel::Finalized)
|| commitment_for_signature.eq(&CommitmentLevel::Confirmed)
},
});
}
None => {
return Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: false,
})
}
},
None => {
let signature = Signature::from_str(signature_str.as_str()).unwrap();
let ans = match commitment_cfg {
None => meta.rpc_client.confirm_transaction(&signature).unwrap(),
Some(cfg) => {
meta.rpc_client
.confirm_transaction_with_commitment(&signature, cfg)
.unwrap()
.value
}
};
return Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: ans,
});
}
};
}
fn request_airdrop(
&self,
meta: Self::Metadata,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String> {
let pubkey = Pubkey::from_str(pubkey_str.as_str()).unwrap();
let signature = match config {
Some(c) => meta.rpc_client.request_airdrop_with_config(&pubkey, lamports, c),
None => meta.rpc_client.request_airdrop(&pubkey, lamports)
};
Ok(signature.unwrap().to_string())
}
}
}
const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes
const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes
fn decode_and_deserialize<T>(
encoded: String,
encoding: TransactionBinaryEncoding,
) -> Result<(Vec<u8>, T)>
where
T: serde::de::DeserializeOwned,
{
let wire_output = match encoding {
TransactionBinaryEncoding::Base58 => {
if encoded.len() > MAX_BASE58_SIZE {
return Err(Error::invalid_params(format!(
"base58 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
type_name::<T>(),
encoded.len(),
MAX_BASE58_SIZE,
PACKET_DATA_SIZE,
)));
}
bs58::decode(encoded)
.into_vec()
.map_err(|e| Error::invalid_params(format!("invalid base58 encoding: {:?}", e)))?
}
TransactionBinaryEncoding::Base64 => {
if encoded.len() > MAX_BASE64_SIZE {
return Err(Error::invalid_params(format!(
"base64 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
type_name::<T>(),
encoded.len(),
MAX_BASE64_SIZE,
PACKET_DATA_SIZE,
)));
}
base64::decode(encoded)
.map_err(|e| Error::invalid_params(format!("invalid base64 encoding: {:?}", e)))?
}
};
if wire_output.len() > PACKET_DATA_SIZE {
return Err(Error::invalid_params(format!(
"decoded {} too large: {} bytes (max: {} bytes)",
type_name::<T>(),
wire_output.len(),
PACKET_DATA_SIZE
)));
}
bincode::options()
.with_limit(PACKET_DATA_SIZE as u64)
.with_fixint_encoding()
.allow_trailing_bytes()
.deserialize_from(&wire_output[..])
.map_err(|err| {
Error::invalid_params(format!(
"failed to deserialize {}: {}",
type_name::<T>(),
&err.to_string()
))
})
.map(|output| (wire_output, output))
}