websocket pubsub

This commit is contained in:
Aniket Prajapati 2023-01-04 18:51:00 +05:30
parent af05b7420c
commit 50d0d06b89
4 changed files with 40 additions and 7 deletions

View File

@ -10,7 +10,7 @@ use std::{ops::Deref, str::FromStr, sync::Arc};
use anyhow::bail; use anyhow::bail;
use reqwest::Url; use reqwest::Url;
use jsonrpsee::server::ServerBuilder; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use solana_client::{ use solana_client::{
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}, nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig}, rpc_config::{RpcContextConfig, RpcRequestAirdropConfig},
@ -211,6 +211,17 @@ impl LiteRpcServer for LiteBridge {
.unwrap() .unwrap()
.to_string()) .to_string())
} }
fn signature_subscribe(
&self,
sink: SubscriptionSink,
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult {
self.get_block_listner(commitment_config)
.signature_subscribe(signature, sink);
Ok(())
}
} }
impl Deref for LiteBridge { impl Deref for LiteBridge {

View File

@ -4,12 +4,12 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SendTransactionConfig { pub struct SendTransactionConfig {
// #[serde(default)] // #[serde(default)]
// pub skip_preflight: bool, // pub skip_preflight: bool,
// #[serde(default)] // #[serde(default)]
// pub preflight_commitment: CommitmentLevel, // pub preflight_commitment: CommitmentLevel,
#[serde(default)] #[serde(default)]
pub encoding: BinaryEncoding, pub encoding: BinaryEncoding,
pub max_retries: Option<u16>, pub max_retries: Option<u16>,
// pub min_context_slot: Option<Slot>, // pub min_context_slot: Option<Slot>,
} }

View File

@ -4,6 +4,7 @@ use solana_client::rpc_config::{
RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
}; };
use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo}; use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;
use crate::configs::SendTransactionConfig; use crate::configs::SendTransactionConfig;
@ -42,4 +43,7 @@ pub trait LiteRpc {
lamports: u64, lamports: u64,
config: Option<RpcRequestAirdropConfig>, config: Option<RpcRequestAirdropConfig>,
) -> Result<String>; ) -> Result<String>;
#[subscription(name = "signatureSubscribe", unsubscribe="signatureUnsubscribe", item=Option<TransactionError>)]
fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig);
} }

View File

@ -4,12 +4,14 @@ use std::sync::Arc;
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use dashmap::DashMap; use dashmap::DashMap;
use futures::StreamExt; use futures::StreamExt;
use jsonrpsee::SubscriptionSink;
use log::info; use log::info;
use solana_client::nonblocking::pubsub_client::PubsubClient; use solana_client::nonblocking::pubsub_client::PubsubClient;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter}; use solana_client::rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::transaction::TransactionError;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@ -24,6 +26,7 @@ pub struct BlockListener {
latest_block_hash: Arc<RwLock<String>>, latest_block_hash: Arc<RwLock<String>>,
block_height: Arc<AtomicU64>, block_height: Arc<AtomicU64>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
} }
impl BlockListener { impl BlockListener {
@ -44,6 +47,7 @@ impl BlockListener {
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.to_string())), latest_block_hash: Arc::new(RwLock::new(latest_block_hash.to_string())),
block_height: Arc::new(AtomicU64::new(block_height)), block_height: Arc::new(AtomicU64::new(block_height)),
commitment_config, commitment_config,
signature_subscribers: Default::default(),
}) })
} }
@ -70,6 +74,14 @@ impl BlockListener {
) )
} }
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
self.signature_subscribers.insert(signature, sink).unwrap();
}
pub fn signature_un_subscribe(&self, signature: String) {
self.signature_subscribers.remove(&signature);
}
pub fn listen(self) -> JoinHandle<anyhow::Result<()>> { pub fn listen(self) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move { tokio::spawn(async move {
info!("Subscribing to blocks"); info!("Subscribing to blocks");
@ -124,12 +136,18 @@ impl BlockListener {
for sig in signatures { for sig in signatures {
info!("{comfirmation_status:?} {sig}"); info!("{comfirmation_status:?} {sig}");
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
// none if transaction succeeded
sink.send::<Option<TransactionError>>(&None).unwrap();
}
self.blocks.insert( self.blocks.insert(
sig, sig,
TransactionStatus { TransactionStatus {
slot, slot,
confirmations: None, //TODO: talk about this confirmations: None, //TODO: talk about this
status: Ok(()), // legacy field status: Ok(()), // legacy field
err: None, err: None,
confirmation_status: Some(comfirmation_status.clone()), confirmation_status: Some(comfirmation_status.clone()),
}, },