upgrade jsonrpsee and fixed 1 bug of early return

This commit is contained in:
aniketfuryrocks 2023-04-20 02:38:55 +05:30
parent 0f429ef727
commit 099fd80e34
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
5 changed files with 432 additions and 281 deletions

660
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -20,8 +20,8 @@ solana-client = "1.15.2"
solana-net-utils = "1.15.2" solana-net-utils = "1.15.2"
solana-pubsub-client = "1.15.2" solana-pubsub-client = "1.15.2"
solana-streamer = "1.15.2" solana-streamer = "1.15.2"
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.93" serde_json = "1.0.96"
tokio = { version = "1.27.0", features = ["full", "fs"]} tokio = { version = "1.27.0", features = ["full", "fs"]}
bincode = "1.3.3" bincode = "1.3.3"
bs58 = "0.4.0" bs58 = "0.4.0"
@ -29,15 +29,15 @@ base64 = "0.21.0"
thiserror = "1.0.40" thiserror = "1.0.40"
futures = "0.3.28" futures = "0.3.28"
bytes = "1.4.0" bytes = "1.4.0"
anyhow = "1.0.69" anyhow = "1.0.70"
log = "0.4.17" log = "0.4.17"
clap = { version = "4.1.6", features = ["derive"] } clap = { version = "4.2.4", features = ["derive"] }
dashmap = "5.4.0" dashmap = "5.4.0"
const_env = "0.1.2" const_env = "0.1.2"
jsonrpsee = { version = "0.16.2", features = ["macros", "full"] } jsonrpsee = { version = "0.17.0", features = ["macros", "full"] }
tracing-subscriber = "0.3.16" tracing-subscriber = "0.3.16"
chrono = "0.4.24" chrono = "0.4.24"
tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4"] } tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
native-tls = "0.2.11" native-tls = "0.2.11"
postgres-native-tls = "0.5.0" postgres-native-tls = "0.5.0"
prometheus = "0.13.3" prometheus = "0.13.3"

View File

@ -17,7 +17,7 @@ use anyhow::bail;
use log::{error, info}; use log::{error, info};
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter}; use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
@ -444,16 +444,18 @@ impl LiteRpcServer for LiteBridge {
Ok(airdrop_sig) Ok(airdrop_sig)
} }
fn signature_subscribe( async fn signature_subscribe(
&self, &self,
mut sink: SubscriptionSink, pending: PendingSubscriptionSink,
signature: String, signature: String,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> SubscriptionResult { ) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc(); RPC_SIGNATURE_SUBSCRIBE.inc();
sink.accept()?; let sink = pending.accept().await?;
self.block_listner self.block_listner
.signature_subscribe(signature, commitment_config, sink); .signature_subscribe(signature, commitment_config, sink);
Ok(()) Ok(())
} }
} }

View File

@ -1,3 +1,4 @@
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use solana_rpc_client_api::config::{ use solana_rpc_client_api::config::{
RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
@ -51,5 +52,9 @@ pub trait LiteRpc {
) -> Result<String>; ) -> Result<String>;
#[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse<serde_json::Value>)] #[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse<serde_json::Value>)]
fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig); async fn signature_subscribe(
&self,
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult;
} }

View File

@ -6,7 +6,7 @@ use std::{
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use dashmap::DashMap; use dashmap::DashMap;
use jsonrpsee::SubscriptionSink; use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use log::{info, trace, warn}; use log::{info, trace, warn};
use prometheus::{ use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
@ -331,17 +331,23 @@ impl BlockListener {
}; };
// subscribers // subscribers
if let Some((_sig, (mut sink, _))) = if let Some((_sig, (sink, _))) =
self.signature_subscribers.remove(&(sig, commitment_config)) self.signature_subscribers.remove(&(sig, commitment_config))
{ {
// none if transaction succeeded // none if transaction succeeded
sink.send(&RpcResponse { let _res = sink
context: RpcResponseContext { .send(
slot, SubscriptionMessage::from_json(&RpcResponse {
api_version: None, context: RpcResponseContext {
}, slot,
value: serde_json::json!({ "err": err }), api_version: None,
})?; },
value: serde_json::json!({ "err": err }),
})
.unwrap(),
)
.await;
NUMBER_OF_SIGNATURE_SUBSCRIBERS.dec(); NUMBER_OF_SIGNATURE_SUBSCRIBERS.dec();
} }
} }