Merge pull request #14 from blockworks-foundation/adding_ws_subscriptions

Adding ws subscriptions
This commit is contained in:
galactus 2022-11-30 16:58:21 +01:00 committed by GitHub
commit cf4beed0f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1053 additions and 265 deletions

491
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,32 +6,32 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-client = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-sdk = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-clap-utils = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-cli-config = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-pubsub-client = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-account-decoder = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-entry = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-faucet = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-gossip = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-ledger = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-measure = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-metrics = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-perf = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-poh = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-rayon-threadlimit = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-rpc-client-api = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-runtime = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-send-transaction-service = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-stake-program = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-storage-bigtable = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-streamer = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-tpu-client = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-transaction-status = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-version = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-vote-program = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-rpc = { git = "https://github.com/blockworks-foundation/solana.git" }
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git" }
solana-entry = { git = "https://github.com/solana-labs/solana.git" }
solana-faucet = { git = "https://github.com/solana-labs/solana.git" }
solana-gossip = { git = "https://github.com/solana-labs/solana.git" }
solana-ledger = { git = "https://github.com/solana-labs/solana.git" }
solana-measure = { git = "https://github.com/solana-labs/solana.git" }
solana-metrics = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
solana-poh = { git = "https://github.com/solana-labs/solana.git" }
solana-rayon-threadlimit = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-stake-program = { git = "https://github.com/solana-labs/solana.git" }
solana-storage-bigtable = { git = "https://github.com/solana-labs/solana.git" }
solana-streamer = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-vote-program = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
tokio = { version = "1.14.1", features = ["full"]}
futures = "0.3.25"
@ -40,7 +40,7 @@ jsonrpc-core-client = { version = "18.0.0" }
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
clap = "2.33.1"
clap = { version = "4.0.29", features = ["derive"] }
base64 = "0.13.0"
bincode = "1.3.3"

View File

@ -5,6 +5,7 @@ The lite-rpc server will not have any ledger or banks.
While sending transaction the lite rpc server will send the transaction to next few leader (FANOUT) and then use different strategies to confirm the transaction.
The rpc server will also have a websocket port which is reponsible for just subscribing to slots and signatures.
The lite rpc server will be optimized to confirm transactions which are forwarded to leader using the same server.
This project is currently based on an unstable feature of block subscription of solana which can be enabled using `--rpc-pubsub-enable-block-subscription` while running rpc node.
### Confirmation strategies
1) Subscribing to blocks changes and checking the confirmations. (Under development)
@ -15,12 +16,12 @@ The lite rpc server will be optimized to confirm transactions which are forwarde
`cargo build`
## Run
For RPC node : `http://localhost:8899`
Websocket : `http://localhost:8900` (Optional)
Port : `9000` Listening port for LiteRpc server
Subscription Port : `9001` Listening port of websocket subscriptions for LiteRpc server
* For RPC node : `http://localhost:8899`,
* Websocket : `http://localhost:8900` (Optional),
* Port : `9000` Listening port for LiteRpc server,
* Subscription Port : `9001` Listening port of websocket subscriptions for LiteRpc server,
```
cargo run --bin lite-rpc --url http://localhost:8899 --websocket_url http://localhost:8900 --port 9000 --subscription_port 9001
```
cargo run --bin lite-rpc -- --port 9000 --subscription-port 9001 --url http://localhost:8899
```

View File

@ -1,108 +1,43 @@
use {
clap::{App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::ConfigInput,
std::net::SocketAddr,
};
use {clap::Parser, solana_cli_config::ConfigInput, std::net::SocketAddr};
/// Holds the configuration for a single run of the benchmark
pub struct Config {
pub rpc_addr: SocketAddr,
#[derive(Parser, Debug)]
#[command(
version,
about = "A lite version of solana rpc to send and confirm transactions.",
long_about = "Lite rpc is optimized to send and confirm transactions for solana blockchain. \
When it recieves a transaction it will directly send it to next few leaders. It then adds the signature into internal map. It listen to block subscriptions for confirmed and finalized blocks. \
It also has a websocket port for subscription to onSlotChange and onSignature subscriptions. \
"
)]
pub struct Args {
#[arg(short, long, default_value_t = SocketAddr::from(([127, 0, 0, 1], 8899)))]
pub port: SocketAddr,
#[arg(short, long, default_value_t = SocketAddr::from(([127, 0, 0, 1], 8900)))]
pub subscription_port: SocketAddr,
pub json_rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
pub websocket_url: String,
}
impl Default for Config {
fn default() -> Config {
Config {
rpc_addr: SocketAddr::from(([127, 0, 0, 1], 8899)),
json_rpc_url: ConfigInput::default().json_rpc_url,
websocket_url: ConfigInput::default().websocket_url,
subscription_port: SocketAddr::from(([127, 0, 0, 1], 8900)),
impl Args {
pub fn resolve_address(&mut self) {
if self.rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
self.rpc_url.as_str(),
&ConfigInput::default().json_rpc_url,
);
self.rpc_url = rpc_url;
}
if self.websocket_url.is_empty() {
let (_, ws_url) = ConfigInput::compute_websocket_url_setting(
&self.websocket_url.as_str(),
"",
self.rpc_url.as_str(),
"",
);
self.websocket_url = ws_url;
}
}
}
/// Defines and builds the CLI args for a run of the benchmark
pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
App::new("lite rpc")
.about("a lite version of solana rpc to send and confirm transactions")
.version(version)
.arg(
Arg::with_name("json_rpc_url")
.short("u")
.long("url")
.value_name("URL_OR_MONIKER")
.takes_value(true)
.global(true)
.validator(is_url_or_moniker)
.help(
"URL for Solana's JSON RPC or moniker (or their first letter): \
[mainnet-beta, testnet, devnet, localhost]",
),
)
.arg(
Arg::with_name("websocket_url")
.long("ws")
.value_name("URL")
.takes_value(true)
.global(true)
.validator(is_url)
.help("WebSocket URL for the solana cluster"),
)
.arg(
Arg::with_name("port")
.long("port")
.short("p")
.takes_value(true)
.global(true)
.min_values(1025)
.help("Port on which which lite rpc will listen to rpc requests"),
)
.arg(
Arg::with_name("subscription_port")
.long("sub_port")
.short("sp")
.takes_value(true)
.global(true)
.min_values(1025)
.help("subscription port on which which lite rpc will use to create subscriptions"),
)
}
pub fn extract_args(matches: &ArgMatches) -> Config {
let mut args = Config::default();
let config = if let Some(config_file) = matches.value_of("config_file") {
solana_cli_config::Config::load(config_file).unwrap_or_default()
} else {
solana_cli_config::Config::default()
};
let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting(
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.json_rpc_url = json_rpc_url;
let (_, websocket_url) = ConfigInput::compute_websocket_url_setting(
matches.value_of("websocket_url").unwrap_or(""),
&config.websocket_url,
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.websocket_url = websocket_url;
if let Some(port) = matches.value_of("port") {
let port: u16 = port.parse().expect("can't parse port");
args.rpc_addr = SocketAddr::from(([127, 0, 0, 1], port));
}
if let Some(port) = matches.value_of("subsription_port") {
let port: u16 = port.parse().expect("can't parse subsription_port");
args.subscription_port = SocketAddr::from(([127, 0, 0, 1], port));
} else {
let port = args.rpc_addr.port().saturating_add(1);
args.subscription_port = SocketAddr::from(([127, 0, 0, 1], port));
}
args
}

View File

@ -1,9 +1,22 @@
use solana_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, RwLock},
use crossbeam_channel::Sender;
use dashmap::DashMap;
use serde::Serialize;
use solana_client::{
rpc_client::RpcClient,
rpc_response::{ProcessedSignatureResult, RpcResponseContext, RpcSignatureResult, SlotInfo},
};
use solana_rpc::rpc_subscription_tracker::{
SignatureSubscriptionParams, SubscriptionId, SubscriptionParams,
};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
};
use std::{
sync::{atomic::AtomicU64, Arc, RwLock},
time::Instant,
};
use tokio::sync::broadcast;
pub struct BlockInformation {
pub block_hash: RwLock<String>,
@ -32,20 +45,209 @@ impl BlockInformation {
}
pub struct LiteRpcContext {
pub signature_status: RwLock<HashMap<String, Option<CommitmentLevel>>>,
pub signature_status: DashMap<String, Option<CommitmentLevel>>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender: Sender<NotificationType>,
}
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
pub fn new(rpc_client: Arc<RpcClient>, notification_sender: Sender<NotificationType>) -> Self {
LiteRpcContext {
signature_status: RwLock::new(HashMap::new()),
signature_status: DashMap::new(),
confirmed_block_info: BlockInformation::new(
rpc_client.clone(),
CommitmentLevel::Confirmed,
),
finalized_block_info: BlockInformation::new(rpc_client, CommitmentLevel::Finalized),
notification_sender,
}
}
}
pub struct SignatureNotification {
pub signature: Signature,
pub commitment: CommitmentLevel,
pub slot: u64,
pub error: Option<String>,
}
pub struct SlotNotification {
pub slot: u64,
pub commitment: CommitmentLevel,
pub parent: u64,
pub root: u64,
}
pub enum NotificationType {
Signature(SignatureNotification),
Slot(SlotNotification),
}
#[derive(Debug, Serialize)]
struct NotificationParams<T> {
result: T,
subscription: SubscriptionId,
}
#[derive(Debug, Serialize)]
struct Notification<T> {
jsonrpc: Option<jsonrpc_core::Version>,
method: &'static str,
params: NotificationParams<T>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Response<T> {
pub context: RpcResponseContext,
pub value: T,
}
#[derive(Debug, Clone, PartialEq)]
struct RpcNotificationResponse<T> {
context: RpcNotificationContext,
value: T,
}
impl<T> From<RpcNotificationResponse<T>> for Response<T> {
fn from(notification: RpcNotificationResponse<T>) -> Self {
let RpcNotificationResponse {
context: RpcNotificationContext { slot },
value,
} = notification;
Self {
context: RpcResponseContext {
slot,
api_version: None,
},
value,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RpcNotificationContext {
slot: u64,
}
#[derive(Debug, Clone)]
pub struct LiteRpcNotification {
pub subscription_id: SubscriptionId,
pub is_final: bool,
pub json: String,
pub created_at: Instant,
}
pub struct LiteRpcSubsrciptionControl {
pub broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
pub subscriptions: DashMap<SubscriptionParams, SubscriptionId>,
pub last_subscription_id: AtomicU64,
}
impl LiteRpcSubsrciptionControl {
pub fn new(
broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
) -> Self {
Self {
broadcast_sender,
notification_reciever,
subscriptions: DashMap::new(),
last_subscription_id: AtomicU64::new(2),
}
}
pub fn start_broadcasting(&self) {
loop {
let notification = self.notification_reciever.recv();
match notification {
Ok(notification_type) => {
let rpc_notification = match notification_type {
NotificationType::Signature(data) => {
println!(
"getting signature notification {} confirmation {}",
data.signature,
data.commitment.to_string()
);
let signature_params = SignatureSubscriptionParams {
commitment: CommitmentConfig {
commitment: data.commitment,
},
signature: data.signature,
enable_received_notification: false,
};
let param = SubscriptionParams::Signature(signature_params);
match self.subscriptions.entry(param) {
dashmap::mapref::entry::Entry::Occupied(x) => {
let subscription_id = *x.get();
let slot = data.slot;
let value = Response::from(RpcNotificationResponse {
context: RpcNotificationContext { slot },
value: RpcSignatureResult::ProcessedSignature(
ProcessedSignatureResult { err: None },
),
});
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"signatureNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
},
};
let json = serde_json::to_string(&notification).unwrap();
Some(LiteRpcNotification {
subscription_id: *x.get(),
created_at: Instant::now(),
is_final: false,
json,
})
}
dashmap::mapref::entry::Entry::Vacant(_x) => None,
}
}
NotificationType::Slot(data) => {
// SubscriptionId 0 will be used for slots
let subscription_id = if data.commitment == CommitmentLevel::Confirmed {
SubscriptionId::from(0)
} else {
SubscriptionId::from(1)
};
let value = SlotInfo {
parent: data.parent,
slot: data.slot,
root: data.root,
};
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"slotNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
},
};
let json = serde_json::to_string(&notification).unwrap();
Some(LiteRpcNotification {
subscription_id: subscription_id,
created_at: Instant::now(),
is_final: false,
json,
})
}
};
if let Some(rpc_notification) = rpc_notification {
self.broadcast_sender.send(rpc_notification).unwrap();
}
}
Err(_e) => {
break;
}
}
}
}
}

View File

@ -1,8 +1,12 @@
use std::sync::Arc;
use clap::Parser;
use context::LiteRpcSubsrciptionControl;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use pubsub::LitePubSubService;
use solana_perf::thread::renice_this_thread;
use tokio::sync::broadcast;
use crate::rpc::{
lite_rpc::{self, Lite},
@ -10,24 +14,54 @@ use crate::rpc::{
};
mod cli;
mod context;
mod pubsub;
mod rpc;
pub fn main() {
let matches = cli::build_args(solana_version::version!()).get_matches();
let cli_config = cli::extract_args(&matches);
use cli::Args;
let cli::Config {
json_rpc_url,
pub fn main() {
let mut cli_config = Args::parse();
cli_config.resolve_address();
println!(
"Using rpc server {} and ws server {}",
cli_config.rpc_url, cli_config.websocket_url
);
let Args {
rpc_url: json_rpc_url,
websocket_url,
rpc_addr,
port: rpc_addr,
subscription_port,
..
} = &cli_config;
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(
broadcast_sender,
notification_reciever,
));
// start websocket server
let (_trigger, websocket_service) =
LitePubSubService::new(pubsub_control.clone(), *subscription_port);
// start recieving notifications and broadcast them
{
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new()
.name("broadcasting thread".to_string())
.spawn(move || {
pubsub_control.start_broadcasting();
})
.unwrap();
}
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let mut request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let mut request_processor =
LightRpcRequestProcessor::new(json_rpc_url, websocket_url, notification_sender);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
@ -59,4 +93,5 @@ pub fn main() {
server.unwrap().wait();
}
request_processor.free();
websocket_service.close().unwrap();
}

255
src/pubsub.rs Normal file
View File

@ -0,0 +1,255 @@
use dashmap::DashMap;
use jsonrpc_core::{ErrorCode, IoHandler};
use soketto::handshake::{server, Server};
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle};
use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::context::LiteRpcSubsrciptionControl;
use {
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
solana_rpc::rpc_subscription_tracker::SubscriptionId,
solana_rpc_client_api::config::*,
solana_sdk::signature::Signature,
std::sync::Arc,
};
#[rpc]
pub trait LiteRpcPubSub {
// Get notification when signature is verified
// Accepts signature parameter as base-58 encoded string
#[rpc(name = "signatureSubscribe")]
fn signature_subscribe(
&self,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SubscriptionId>;
// Unsubscribe from signature notification subscription.
#[rpc(name = "signatureUnsubscribe")]
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
// Get notification when slot is encountered
#[rpc(name = "slotSubscribe")]
fn slot_subscribe(&self) -> Result<SubscriptionId>;
// Unsubscribe from slot notification subscription.
#[rpc(name = "slotUnsubscribe")]
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
}
#[derive(Clone)]
pub struct LiteRpcPubSubImpl {
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pub current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionParams>>,
}
impl LiteRpcPubSubImpl {
pub fn new(subscription_control: Arc<LiteRpcSubsrciptionControl>) -> Self {
Self {
current_subscriptions: Arc::new(DashMap::new()),
subscription_control,
}
}
fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionId> {
match self
.subscription_control
.subscriptions
.entry(params.clone())
{
dashmap::mapref::entry::Entry::Occupied(x) => Ok(*x.get()),
dashmap::mapref::entry::Entry::Vacant(x) => {
let new_subscription_id = self
.subscription_control
.last_subscription_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
self.current_subscriptions
.insert(new_subsription_id, params);
Ok(new_subsription_id)
}
}
}
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
match self.current_subscriptions.entry(id) {
dashmap::mapref::entry::Entry::Occupied(x) => {
x.remove();
Ok(true)
}
dashmap::mapref::entry::Entry::Vacant(_) => Ok(false),
}
}
}
fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> {
param_str.parse::<T>().map_err(|_e| Error {
code: ErrorCode::InvalidParams,
message: format!("Invalid Request: Invalid {} provided", thing),
data: None,
})
}
impl LiteRpcPubSub for LiteRpcPubSubImpl {
fn signature_subscribe(
&self,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SubscriptionId> {
let config = config.unwrap_or_default();
let params = SignatureSubscriptionParams {
signature: param::<Signature>(&signature_str, "signature")?,
commitment: config.commitment.unwrap_or_default(),
enable_received_notification: false,
};
self.subscribe(SubscriptionParams::Signature(params))
}
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
self.unsubscribe(id)
}
// Get notification when slot is encountered
fn slot_subscribe(&self) -> Result<SubscriptionId> {
self.current_subscriptions
.insert(SubscriptionId::from(0), SubscriptionParams::Slot);
Ok(SubscriptionId::from(0))
}
// Unsubscribe from slot notification subscription.
fn slot_unsubscribe(&self, _id: SubscriptionId) -> Result<bool> {
self.current_subscriptions.remove(&SubscriptionId::from(0));
Ok(true)
}
}
pub struct LitePubSubService {
thread_hdl: JoinHandle<()>,
}
#[derive(Debug, thiserror::Error)]
enum HandleError {
#[error("handshake error: {0}")]
Handshake(#[from] soketto::handshake::Error),
#[error("connection error: {0}")]
Connection(#[from] soketto::connection::Error),
#[error("broadcast queue error: {0}")]
Broadcast(#[from] tokio::sync::broadcast::error::RecvError),
}
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
let accept = server::Response::Accept {
key: request.key(),
protocol: None,
};
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut broadcast_receiver = subscription_control.broadcast_sender.subscribe();
let mut json_rpc_handler = IoHandler::new();
let rpc_impl = LiteRpcPubSubImpl::new(subscription_control);
json_rpc_handler.extend_with(rpc_impl.clone().to_delegate());
loop {
let mut data = Vec::new();
// Extra block for dropping `receive_future`.
{
// soketto is not cancel safe, so we have to introduce an inner loop to poll
// `receive_data` to completion.
let receive_future = receiver.receive_data(&mut data);
pin!(receive_future);
loop {
select! {
result = &mut receive_future => match result {
Ok(_) => break,
Err(soketto::connection::Error::Closed) => return Ok(()),
Err(err) => return Err(err.into()),
},
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
sender.send_text(&x.json).await?;
}
}
},
}
}
}
let data_str = String::from_utf8(data).unwrap();
if let Some(response) = json_rpc_handler.handle_request(data_str.as_str()).await {
sender.send_text(&response).await?;
}
}
}
async fn listen(
listen_address: SocketAddr,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
mut tripwire: Tripwire,
) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
loop {
select! {
result = listener.accept() => match result {
Ok((socket, addr)) => {
let subscription_control = subscription_control.clone();
tokio::spawn(async move {
let handle = handle_connection(
socket, subscription_control
);
match handle.await {
Ok(()) => println!("connection closed ({:?})", addr),
Err(err) => println!("connection handler error ({:?}): {}", addr, err),
}
});
},
Err(e) => println!("couldn't accept connection: {:?}", e),
},
_ = &mut tripwire => return Ok(()),
}
}
}
impl LitePubSubService {
pub fn new(
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pubsub_addr: SocketAddr,
) -> (Trigger, Self) {
let (trigger, tripwire) = Tripwire::new();
let thread_hdl = std::thread::Builder::new()
.name("solRpcPubSub".to_string())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(512)
.enable_all()
.build()
.expect("runtime creation failed");
if let Err(err) =
runtime.block_on(listen(pubsub_addr, subscription_control, tripwire))
{
println!("pubsub service failed: {}", err);
};
})
.expect("thread spawn failed");
(trigger, Self { thread_hdl })
}
pub fn close(self) -> std::thread::Result<()> {
self.join()
}
pub fn join(self) -> std::thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -1,16 +1,25 @@
use dashmap::DashMap;
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError, SignatureSubscription},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use std::{thread::{Builder, JoinHandle}, sync::Mutex};
use std::{
str::FromStr,
sync::Mutex,
thread::{Builder, JoinHandle},
};
use crate::context::{BlockInformation, LiteRpcContext};
use crate::context::{
BlockInformation, LiteRpcContext, NotificationType, SignatureNotification, SlotNotification,
};
use crossbeam_channel::Sender;
use {
bincode::config::Options,
crossbeam_channel::Receiver,
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_derive::rpc,
solana_client::connection_cache::ConnectionCache,
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
@ -22,12 +31,10 @@ use {
signature::Signature,
transaction::VersionedTransaction,
},
solana_client::connection_cache::ConnectionCache,
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
collections::HashMap,
sync::{atomic::Ordering, Arc, RwLock},
sync::{atomic::Ordering, Arc},
},
};
@ -44,7 +51,11 @@ pub struct LightRpcRequestProcessor {
}
impl LightRpcRequestProcessor {
pub fn new(json_rpc_url: &str, websocket_url: &str) -> LightRpcRequestProcessor {
pub fn new(
json_rpc_url: &str,
websocket_url: &str,
notification_sender: Sender<NotificationType>,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
@ -57,7 +68,7 @@ impl LightRpcRequestProcessor {
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone()));
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
@ -115,15 +126,13 @@ impl LightRpcRequestProcessor {
fn subscribe_signature(
websocket_url: &String,
signature: &Signature,
commitment:CommitmentLevel
commitment: CommitmentLevel,
) -> std::result::Result<SignatureSubscription, PubsubClientError> {
PubsubClient::signature_subscribe(
websocket_url.as_str(),
signature,
Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig {
commitment,
}),
commitment: Some(CommitmentConfig { commitment }),
enable_received_notification: Some(false),
}),
)
@ -143,15 +152,22 @@ impl LightRpcRequestProcessor {
} else {
&context.finalized_block_info
};
Self::process_block(reciever, &context.signature_status, commitment, block_info);
Self::process_block(
reciever,
&context.signature_status,
commitment,
&context.notification_sender,
block_info,
);
})
.unwrap()
}
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &RwLock<HashMap<String, Option<CommitmentLevel>>>,
signature_status: &DashMap<String, Option<CommitmentLevel>>,
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
) {
println!("processing blocks for {}", commitment);
@ -164,6 +180,17 @@ impl LightRpcRequestProcessor {
block_information
.slot
.store(block_update.slot, Ordering::Relaxed);
let slot_notification = SlotNotification {
commitment: commitment,
slot: block_update.slot,
parent: 0,
root: 0,
};
if let Err(e) =
notification_sender.send(NotificationType::Slot(slot_notification))
{
println!("Error sending slot notification error : {}", e.to_string());
}
if let Some(block) = &block_update.block {
block_information
@ -174,15 +201,35 @@ impl LightRpcRequestProcessor {
let mut lock = block_information.block_hash.write().unwrap();
*lock = block.blockhash.clone();
}
if let Some(signatures) = &block.signatures {
let mut lock = signature_status.write().unwrap();
for signature in signatures {
if lock.contains_key(signature) {
println!(
"found signature {} for commitment {}",
signature, commitment
);
lock.insert(signature.clone(), Some(commitment));
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
println!(
"found signature {} for commitment {}",
signature, commitment
);
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str())
.unwrap(),
commitment,
slot: block_update.slot,
error: None,
};
if let Err(e) = notification_sender.send(
NotificationType::Signature(signature_notification),
) {
println!(
"Error sending signature notification error : {}",
e.to_string()
);
}
x.insert(Some(commitment));
}
dashmap::mapref::entry::Entry::Vacant(_x) => {
// do nothing transaction not sent by lite rpc
}
}
}
} else {
@ -210,7 +257,7 @@ impl LightRpcRequestProcessor {
subscribed_client.send_unsubscribe().unwrap();
subscribed_client.shutdown().unwrap();
}
let joinables = &mut self.joinables.lock().unwrap();
let len = joinables.len();
for _i in 0..len {
@ -285,11 +332,10 @@ pub mod lite_rpc {
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
{
let mut lock = meta.context.signature_status.write().unwrap();
lock.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
}
meta.context
.signature_status
.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
meta.tpu_client.send_wire_transaction(wire_transaction);
Ok(transaction.signatures[0].to_string())
}
@ -339,8 +385,9 @@ pub mod lite_rpc {
signature_str: String,
commitment_cfg: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>> {
let lock = meta.context.signature_status.read().unwrap();
let k_value = lock.get_key_value(&signature_str);
let singature_status = meta.context.signature_status.get(&signature_str);
let k_value = singature_status;
let commitment = match commitment_cfg {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
@ -359,7 +406,7 @@ pub mod lite_rpc {
};
match k_value {
Some(value) => match value.1 {
Some(value) => match *value {
Some(commitment_for_signature) => {
println!("found in cache");
Ok(RpcResponse {