fix: signature subscribe with commitment

This commit is contained in:
aniketfuryrocks 2023-02-08 03:38:09 +05:30 committed by Godmode Galactus
parent 6e02b8daf9
commit 9212850600
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 19 additions and 8 deletions

View File

@ -390,11 +390,12 @@ impl LiteRpcServer for LiteBridge {
&self,
mut sink: SubscriptionSink,
signature: String,
_commitment_config: CommitmentConfig,
commitment_config: CommitmentConfig,
) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
sink.accept()?;
self.block_listner.signature_subscribe(signature, sink);
self.block_listner
.signature_subscribe(signature, commitment_config, sink);
Ok(())
}
}

View File

@ -63,7 +63,7 @@ pub struct BlockListener {
tx_sender: TxSender,
block_store: BlockStore,
rpc_client: Arc<RpcClient>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
pub signature_subscribers: Arc<DashMap<(String, CommitmentConfig), SubscriptionSink>>,
}
#[derive(Clone, Debug)]
@ -97,12 +97,20 @@ impl BlockListener {
num_of_sigs_commited
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
let _ = self.signature_subscribers.insert(signature, sink);
pub fn signature_subscribe(
&self,
signature: String,
commitment_config: CommitmentConfig,
sink: SubscriptionSink,
) {
let _ = self
.signature_subscribers
.insert((signature, commitment_config), sink);
}
pub fn signature_un_subscribe(&self, signature: String) {
self.signature_subscribers.remove(&signature);
pub fn signature_un_subscribe(&self, signature: String, commitment_config: CommitmentConfig) {
self.signature_subscribers
.remove(&(signature, commitment_config));
}
fn increment_invalid_block_metric(commitment_config: CommitmentConfig) {
@ -253,7 +261,9 @@ impl BlockListener {
};
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
if let Some((_sig, mut sink)) =
self.signature_subscribers.remove(&(sig, commitment_config))
{
// none if transaction succeeded
sink.send(&RpcResponse {
context: RpcResponseContext {