websockets completion of broadcasting notification and rpc service

This commit is contained in:
Godmode Galactus 2022-11-27 15:23:08 +01:00
parent 85be3cc707
commit e40c7ec61e
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
4 changed files with 94 additions and 76 deletions

View File

@ -1,12 +1,13 @@
use crossbeam_channel::{Sender};
use dashmap::DashMap;
use serde::{Serialize, Deserialize};
use solana_client::{rpc_client::RpcClient, rpc_response::{RpcSignatureResult, ReceivedSignatureResult, RpcResponseContext}};
use serde::{Serialize};
use solana_client::{rpc_client::RpcClient, rpc_response::{RpcSignatureResult, ReceivedSignatureResult, RpcResponseContext, SlotInfo}};
use solana_rpc::{rpc_subscription_tracker::{SubscriptionId, SubscriptionParams, SignatureSubscriptionParams}, rpc_subscriptions::RpcNotification};
use solana_sdk::{commitment_config::{CommitmentConfig, CommitmentLevel}, signature::Signature};
use tokio::sync::broadcast;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, RwLock}, time::Instant,
sync::{atomic::AtomicU64, Arc, RwLock}, time::Instant, thread::Builder,
};
pub struct BlockInformation {
@ -39,10 +40,12 @@ pub struct LiteRpcContext {
pub signature_status: RwLock<HashMap<String, Option<CommitmentLevel>>>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender : Sender<NotificationType>,
}
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
pub fn new(rpc_client: Arc<RpcClient>,
notification_sender : Sender<NotificationType>,) -> Self {
LiteRpcContext {
signature_status: RwLock::new(HashMap::new()),
confirmed_block_info: BlockInformation::new(
@ -50,6 +53,7 @@ impl LiteRpcContext {
CommitmentLevel::Confirmed,
),
finalized_block_info: BlockInformation::new(rpc_client, CommitmentLevel::Finalized),
notification_sender,
}
}
}
@ -80,12 +84,6 @@ struct Notification<T> {
params: NotificationParams<T>,
}
pub struct LiteRpcSubsrciptionControl {
broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever : crossbeam_channel::Receiver<NotificationType>,
subscriptions : DashMap<SubscriptionParams, SubscriptionId>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Response<T> {
pub context: RpcResponseContext,
@ -129,6 +127,12 @@ pub struct LiteRpcNotification {
pub created_at: Instant,
}
pub struct LiteRpcSubsrciptionControl {
pub broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever : crossbeam_channel::Receiver<NotificationType>,
pub subscriptions : DashMap<SubscriptionParams, SubscriptionId>,
pub last_subscription_id : AtomicU64,
}
impl LiteRpcSubsrciptionControl {
pub fn new(
@ -138,6 +142,7 @@ impl LiteRpcSubsrciptionControl {
Self { broadcast_sender,
notification_reciever,
subscriptions : DashMap::new(),
last_subscription_id : AtomicU64::new(1)
}
}
@ -171,7 +176,7 @@ impl LiteRpcSubsrciptionControl {
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"signatureSubscription",
method: &"signatureNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
@ -185,14 +190,35 @@ impl LiteRpcSubsrciptionControl {
json,
} )
},
dashmap::mapref::entry::Entry::Vacant(x) => {
dashmap::mapref::entry::Entry::Vacant(_x) => {
None
}
}
},
NotificationType::Slot(slot) => {
// SubscriptionId 0 will be used for slots
None
let subscription_id = SubscriptionId::from(0);
let value = SlotInfo{
parent: 0,
slot,
root: 0,
};
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"slotNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
},
};
let json = serde_json::to_string(&notification).unwrap();
Some( LiteRpcNotification{
subscription_id : subscription_id,
created_at : Instant::now(),
is_final: false,
json,
} )
}
};
if let Some(rpc_notification) = rpc_notification {

View File

@ -1,8 +1,10 @@
use std::sync::Arc;
use context::LiteRpcSubsrciptionControl;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use solana_perf::thread::renice_this_thread;
use tokio::sync::broadcast;
use crate::rpc::{
lite_rpc::{self, Lite},
@ -24,11 +26,25 @@ pub fn main() {
..
} = &cli_config;
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(broadcast_sender, notification_reciever));
// start recieving notifications and broadcast them
{
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new().name("broadcasting thread".to_string()).spawn(move || {
pubsub_control.start_broadcasting();
}
).unwrap();
}
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let mut request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let mut request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url, notification_sender);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()

View File

@ -1,42 +1,22 @@
use dashmap::DashMap;
use jsonrpc_core::ErrorCode;
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError, SignatureSubscription},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use solana_rpc::{rpc_pubsub_service::PubSubConfig, rpc_subscription_tracker::{SubscriptionControl, SubscriptionToken, SubscriptionParams, SignatureSubscriptionParams}};
use std::{thread::{Builder, JoinHandle}, sync::Mutex, str::FromStr};
use solana_rpc::{rpc_subscription_tracker::{SubscriptionParams, SignatureSubscriptionParams}};
use std::{str::FromStr, sync::atomic::AtomicU64};
use crate::context::{BlockInformation, LiteRpcContext, self};
use crate::context::{LiteRpcSubsrciptionControl};
use {
bincode::config::Options,
crossbeam_channel::Receiver,
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
response::{Response as RpcResponse, *},
},
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::VersionedTransaction,
},
solana_client::connection_cache::ConnectionCache,
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
collections::HashMap,
sync::{atomic::Ordering, Arc, RwLock},
},
jsonrpc_pubsub::{
typed::Subscriber,
sync::{Arc},
},
solana_rpc::rpc_subscription_tracker::SubscriptionId,
jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId,
};
#[rpc]
@ -67,50 +47,47 @@ pub trait LiteRpcPubSub {
pub struct LiteRpcPubSubImpl {
config: PubSubConfig,
subscription_control: SubscriptionControl,
current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
context: Arc<LiteRpcContext>,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
current_subscriptions: Arc<DashMap<SubscriptionId, (AtomicU64,SubscriptionParams)>>,
}
impl LiteRpcPubSubImpl {
pub fn new(
config : PubSubConfig,
subscription_control : SubscriptionControl,
context : Arc<LiteRpcContext>
subscription_control: Arc<LiteRpcSubsrciptionControl>,
) -> Self {
Self {
config,
current_subscriptions : Arc::new(DashMap::new()),
subscription_control,
context
}
}
fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionId> {
let token = self
.subscription_control
.subscribe(params)
.map_err(|_| Error {
code: ErrorCode::InternalError,
message: "Internal Error: Subscription refused. Node subscription limit reached"
.into(),
data: None,
})?;
let id = token.id();
self.current_subscriptions.insert(id, token);
Ok(id)
match self.subscription_control.subscriptions.entry(params.clone()) {
dashmap::mapref::entry::Entry::Occupied(x) => {
Ok(*x.get())
},
dashmap::mapref::entry::Entry::Vacant(x) => {
let new_subscription_id = self.subscription_control.last_subscription_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
self.current_subscriptions.insert(new_subsription_id, (AtomicU64::new(1), params));
Ok(new_subsription_id)
}
}
}
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
if self.current_subscriptions.remove(&id).is_some() {
Ok(true)
} else {
Err(Error {
code: ErrorCode::InvalidParams,
message: "Invalid subscription id.".into(),
data: None,
})
match self.current_subscriptions.entry(id) {
dashmap::mapref::entry::Entry::Occupied(x) => {
let v = x.get();
let count = v.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
if count == 1 { // it was the last subscription
self.subscription_control.subscriptions.remove(&v.1);
x.remove();
}
Ok(true)
},
dashmap::mapref::entry::Entry::Vacant(_) => Ok(false),
}
}
@ -134,7 +111,7 @@ impl LiteRpcPubSub for LiteRpcPubSubImpl {
let params = SignatureSubscriptionParams {
signature: param::<Signature>(&signature_str, "signature")?,
commitment: config.commitment.unwrap_or_default(),
enable_received_notification: config.enable_received_notification.unwrap_or_default(),
enable_received_notification: false,
};
self.subscribe(SubscriptionParams::Signature(params))
}
@ -145,13 +122,11 @@ impl LiteRpcPubSub for LiteRpcPubSubImpl {
// Get notification when slot is encountered
fn slot_subscribe(&self) -> Result<SubscriptionId>{
self.subscribe(SubscriptionParams::Slot)
Ok(SubscriptionId::from(0))
}
// Unsubscribe from slot notification subscription.
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
self.unsubscribe(id)
Ok(true)
}
}

View File

@ -5,7 +5,7 @@ use solana_client::{
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use std::{thread::{Builder, JoinHandle}, sync::Mutex};
use crate::context::{BlockInformation, LiteRpcContext};
use crate::context::{BlockInformation, LiteRpcContext, NotificationType};
use {
bincode::config::Options,
crossbeam_channel::Receiver,
@ -30,6 +30,7 @@ use {
sync::{atomic::Ordering, Arc, RwLock},
},
};
use crossbeam_channel::{Sender};
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
@ -44,7 +45,7 @@ pub struct LightRpcRequestProcessor {
}
impl LightRpcRequestProcessor {
pub fn new(json_rpc_url: &str, websocket_url: &str) -> LightRpcRequestProcessor {
pub fn new(json_rpc_url: &str, websocket_url: &str, notification_sender : Sender<NotificationType>,) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
@ -57,7 +58,7 @@ impl LightRpcRequestProcessor {
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone()));
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =