Correctly dispatching subscriptions with subscription ids

This commit is contained in:
Godmode Galactus 2022-11-30 13:27:47 +01:00
parent 12535686d4
commit 40b4e40930
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
3 changed files with 72 additions and 51 deletions

View File

@ -12,9 +12,7 @@ use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
};
use stream_cancel::Tripwire;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, RwLock},
time::Instant,
};
@ -47,7 +45,7 @@ impl BlockInformation {
}
pub struct LiteRpcContext {
pub signature_status: RwLock<HashMap<String, Option<CommitmentLevel>>>,
pub signature_status: DashMap<String, Option<CommitmentLevel>>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender: Sender<NotificationType>,
@ -56,7 +54,7 @@ pub struct LiteRpcContext {
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>, notification_sender: Sender<NotificationType>) -> Self {
LiteRpcContext {
signature_status: RwLock::new(HashMap::new()),
signature_status: DashMap::new(),
confirmed_block_info: BlockInformation::new(
rpc_client.clone(),
CommitmentLevel::Confirmed,
@ -74,9 +72,16 @@ pub struct SignatureNotification {
pub error: Option<String>,
}
pub struct SlotNotification {
pub slot : u64,
pub commitment : CommitmentLevel,
pub parent : u64,
pub root : u64,
}
pub enum NotificationType {
Signature(SignatureNotification),
Slot(u64),
Slot(SlotNotification),
}
#[derive(Debug, Serialize)]
@ -149,7 +154,7 @@ impl LiteRpcSubsrciptionControl {
broadcast_sender,
notification_reciever,
subscriptions: DashMap::new(),
last_subscription_id: AtomicU64::new(1),
last_subscription_id: AtomicU64::new(2),
}
}
@ -160,6 +165,7 @@ impl LiteRpcSubsrciptionControl {
Ok(notification_type) => {
let rpc_notification = match notification_type {
NotificationType::Signature(data) => {
println!("getting signature notification {} confirmation {}", data.signature, data.commitment.to_string());
let signature_params = SignatureSubscriptionParams {
commitment: CommitmentConfig {
commitment: data.commitment,
@ -200,13 +206,13 @@ impl LiteRpcSubsrciptionControl {
dashmap::mapref::entry::Entry::Vacant(_x) => None,
}
}
NotificationType::Slot(slot) => {
NotificationType::Slot(data) => {
// SubscriptionId 0 will be used for slots
let subscription_id = SubscriptionId::from(0);
let subscription_id = if data.commitment == CommitmentLevel::Confirmed { SubscriptionId::from(0) } else {SubscriptionId::from(1)};
let value = SlotInfo {
parent: 0,
slot,
root: 0,
parent: data.parent,
slot: data.slot,
root: data.root,
};
let notification = Notification {

View File

@ -1,6 +1,7 @@
use dashmap::DashMap;
use jsonrpc_core::{ErrorCode, IoHandler};
use soketto::handshake::{server, Server};
use solana_client::pubsub_client::SlotsSubscription;
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
use std::{net::SocketAddr, str::FromStr, sync::atomic::AtomicU64, thread::JoinHandle};
use stream_cancel::{Tripwire, Trigger};
@ -41,9 +42,10 @@ pub trait LiteRpcPubSub {
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
}
#[derive(Clone)]
pub struct LiteRpcPubSubImpl {
subscription_control: Arc<LiteRpcSubsrciptionControl>,
current_subscriptions: Arc<DashMap<SubscriptionId, (AtomicU64, SubscriptionParams)>>,
pub current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionParams>>,
}
impl LiteRpcPubSubImpl {
@ -69,22 +71,19 @@ impl LiteRpcPubSubImpl {
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
self.current_subscriptions
.insert(new_subsription_id, (AtomicU64::new(1), params));
.insert(new_subsription_id, params);
println!("subscribing {}", new_subscription_id);
Ok(new_subsription_id)
}
}
}
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
let sub_id : u64 = u64::from(id);
match self.current_subscriptions.entry(id) {
dashmap::mapref::entry::Entry::Occupied(x) => {
let v = x.get();
let count = v.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
if count == 1 {
// it was the last subscription
self.subscription_control.subscriptions.remove(&v.1);
x.remove();
}
x.remove();
println!("unsubscribing {}", sub_id);
Ok(true)
}
dashmap::mapref::entry::Entry::Vacant(_) => Ok(false),
@ -160,7 +159,8 @@ async fn handle_connection(
let mut broadcast_receiver = subscription_control.broadcast_sender.subscribe();
let mut json_rpc_handler = IoHandler::new();
let rpc_impl = LiteRpcPubSubImpl::new(subscription_control);
json_rpc_handler.extend_with(rpc_impl.to_delegate());
rpc_impl.current_subscriptions.insert(SubscriptionId::from(0), SubscriptionParams::Slot);
json_rpc_handler.extend_with(rpc_impl.clone().to_delegate());
loop {
let mut data = Vec::new();
// Extra block for dropping `receive_future`.
@ -178,7 +178,10 @@ async fn handle_connection(
},
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
sender.send_text(&x.json).await?;
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
println!("sending message \n {}", x.json);
sender.send_text(&x.json).await?;
}
}
},
}
@ -187,6 +190,7 @@ async fn handle_connection(
let data_str = String::from_utf8(data).unwrap();
if let Some(response) = json_rpc_handler.handle_request(data_str.as_str()).await {
println!("sending response \n {}", response);
sender.send_text(&response).await?;
}
}

View File

@ -1,3 +1,4 @@
use dashmap::DashMap;
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError, SignatureSubscription},
tpu_client::TpuClientConfig,
@ -9,7 +10,7 @@ use std::{
thread::{Builder, JoinHandle},
};
use crate::context::{BlockInformation, LiteRpcContext, NotificationType, SignatureNotification};
use crate::context::{BlockInformation, LiteRpcContext, NotificationType, SignatureNotification, SlotNotification};
use crossbeam_channel::Sender;
use {
bincode::config::Options,
@ -163,7 +164,7 @@ impl LightRpcRequestProcessor {
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &RwLock<HashMap<String, Option<CommitmentLevel>>>,
signature_status: &DashMap<String, Option<CommitmentLevel>>,
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
@ -178,8 +179,14 @@ impl LightRpcRequestProcessor {
block_information
.slot
.store(block_update.slot, Ordering::Relaxed);
let slot_notification = SlotNotification {
commitment: commitment,
slot: block_update.slot,
parent : 0,
root : 0,
};
if let Err(e) =
notification_sender.send(NotificationType::Slot(block_update.slot))
notification_sender.send(NotificationType::Slot(slot_notification))
{
println!("Error sending slot notification error : {}", e.to_string());
}
@ -193,29 +200,35 @@ impl LightRpcRequestProcessor {
let mut lock = block_information.block_hash.write().unwrap();
*lock = block.blockhash.clone();
}
if let Some(signatures) = &block.signatures {
let mut lock = signature_status.write().unwrap();
for signature in signatures {
if lock.contains_key(signature) {
println!(
"found signature {} for commitment {}",
signature, commitment
);
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str()).unwrap(),
commitment,
slot: block_update.slot,
error: None,
};
if let Err(e) = notification_sender
.send(NotificationType::Signature(signature_notification))
{
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
println!(
"Error sending signature notification error : {}",
e.to_string()
"found signature {} for commitment {}",
signature, commitment
);
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str()).unwrap(),
commitment,
slot: block_update.slot,
error: None,
};
if let Err(e) = notification_sender
.send(NotificationType::Signature(signature_notification))
{
println!(
"Error sending signature notification error : {}",
e.to_string()
);
}
x.insert(Some(commitment));
},
dashmap::mapref::entry::Entry::Vacant(_x) => {
// do nothing transaction not sent by lite rpc
}
lock.insert(signature.clone(), Some(commitment));
}
}
} else {
@ -318,11 +331,8 @@ pub mod lite_rpc {
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
{
let mut lock = meta.context.signature_status.write().unwrap();
lock.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
}
meta.context.signature_status.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
meta.tpu_client.send_wire_transaction(wire_transaction);
Ok(transaction.signatures[0].to_string())
}
@ -372,8 +382,9 @@ pub mod lite_rpc {
signature_str: String,
commitment_cfg: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>> {
let lock = meta.context.signature_status.read().unwrap();
let k_value = lock.get_key_value(&signature_str);
let singature_status =meta.context.signature_status.get(&signature_str);
let k_value = singature_status;
let commitment = match commitment_cfg {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
@ -392,7 +403,7 @@ pub mod lite_rpc {
};
match k_value {
Some(value) => match value.1 {
Some(value) => match *value {
Some(commitment_for_signature) => {
println!("found in cache");
Ok(RpcResponse {