This commit is contained in:
Aniket Prajapati 2023-01-05 16:24:00 +05:30
parent 3ffe1470b8
commit 7396bb4146
No known key found for this signature in database
GPG Key ID: D4346D8C9C5398F2
5 changed files with 46 additions and 15 deletions

View File

@ -8,6 +8,7 @@ use crate::{
use std::{ops::Deref, str::FromStr, sync::Arc};
use anyhow::bail;
use log::info;
use reqwest::Url;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
@ -72,9 +73,10 @@ impl LiteBridge {
}
/// List for `JsonRpc` requests
pub async fn start_services(
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
self,
addr: impl ToSocketAddrs,
http_addr: T,
ws_addr: T,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let tx_sender = self.tx_sender.clone();
@ -82,17 +84,36 @@ impl LiteBridge {
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen();
let handle = ServerBuilder::default()
.build(addr)
let ws_server_handle = ServerBuilder::default()
.ws_only()
.build(ws_addr.clone())
.await?
.start(self.clone().into_rpc())?;
let http_server_handle = ServerBuilder::default()
.http_only()
.build(http_addr.clone())
.await?
.start(self.into_rpc())?;
let server = tokio::spawn(async move {
handle.stopped().await;
bail!("server stopped");
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let mut services = vec![server, finalized_block_listenser, confirmed_block_listenser];
let http_server = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
});
let mut services = vec![
ws_server,
http_server,
finalized_block_listenser,
confirmed_block_listenser,
];
if let Some(tx_sender) = tx_sender {
services.push(tx_sender.execute());
@ -214,10 +235,11 @@ impl LiteRpcServer for LiteBridge {
fn signature_subscribe(
&self,
sink: SubscriptionSink,
mut sink: SubscriptionSink,
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult {
sink.accept()?;
self.get_block_listner(commitment_config)
.signature_subscribe(signature, sink);
Ok(())

View File

@ -9,7 +9,9 @@ pub struct Args {
#[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))]
pub ws_addr: String,
#[arg(short, long, default_value_t = String::from("127.0.0.1:8890"))]
pub lite_rpc_addr: String,
pub lite_rpc_http_addr: String,
#[arg(short, long, default_value_t = String::from("127.0.0.1:8891"))]
pub lite_rpc_ws_addr: String,
#[arg(short, long, default_value_t = false)]
pub batch_transactions: bool,
}

View File

@ -18,8 +18,9 @@ pub async fn main() -> anyhow::Result<()> {
let Args {
rpc_addr,
ws_addr,
lite_rpc_addr,
batch_transactions,
lite_rpc_ws_addr,
lite_rpc_http_addr,
} = Args::parse();
let light_bridge = LiteBridge::new(
@ -29,7 +30,9 @@ pub async fn main() -> anyhow::Result<()> {
)
.await?;
let services = light_bridge.start_services(lite_rpc_addr).await?;
let services = light_bridge
.start_services(lite_rpc_http_addr, lite_rpc_ws_addr)
.await?;
let services = futures::future::try_join_all(services);
let ctrl_c_signal = tokio::signal::ctrl_c();

View File

@ -4,7 +4,7 @@ use anyhow::{bail, Context};
use dashmap::DashMap;
use futures::StreamExt;
use jsonrpsee::SubscriptionSink;
use log::info;
use log::{info, warn};
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
@ -80,7 +80,9 @@ impl BlockListener {
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
self.signature_subscribers.insert(signature, sink).unwrap();
warn!("subscribing {signature}");
let _ = self.signature_subscribers.insert(signature, sink);
}
pub fn signature_un_subscribe(&self, signature: String) {
@ -144,7 +146,8 @@ impl BlockListener {
info!("{comfirmation_status:?} {sig}");
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
if let Some((_, mut sink)) = self.signature_subscribers.remove(&sig) {
warn!("notification {}", sig);
// none if transaction succeeded
sink.send::<Option<TransactionError>>(&None).unwrap();
}

View File

@ -8,6 +8,7 @@ test('send and confirm transaction', async () => {
const toAccount = Keypair.generate().publicKey;
const airdropSignature = await connection.requestAirdrop(payer.publicKey, LAMPORTS_PER_SOL * 2);
console.log('airdrop signature ' + airdropSignature);
await connection.confirmTransaction(airdropSignature, 'finalized');
const transaction = new Transaction();