Optimize RPC pubsub for multiple clients with the same subscription (#18943)
* reimplement rpc pubsub with a broadcast queue * update tests for new pubsub implementation * fix: fix review suggestions * chore(rpc): add additional pubsub metrics * integrate max subscriptions check into SubscriptionTracker to reduce locking * separate subscription control from tracker * limit memory usage of items in pubsub broadcast queue, improve error handling * add more pubsub metrics * add final count metrics to pubsub * add metric for total number of subscriptions * fix small review suggestions * remove by_params from SubscriptionTracker and add node_progress_watchers map instead * add subscription tracker tests * add metrics for number of pubsub notifications as a counter * ignore clippy lint in TokenCounter * fix underflow in token counter * reduce queue capacity in pubsub tests * fix(rpc): fix test timeouts * fix race in account subscription test * Add RpcSubscriptions::new_for_tests Co-authored-by: Pavel Strakhov <p.strakhov@iconic.vc> Co-authored-by: Nikita Podoliako <n.podoliako@zubr.io> Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
parent
fc2bf2d3b6
commit
65227f44dc
|
@ -4113,6 +4113,21 @@ dependencies = [
|
||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "soketto"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a74e48087dbeed4833785c2f3352b59140095dc192dce966a3bfc155020a439f"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.13.0",
|
||||||
|
"bytes 1.0.1",
|
||||||
|
"futures 0.3.17",
|
||||||
|
"httparse",
|
||||||
|
"log 0.4.14",
|
||||||
|
"rand 0.8.3",
|
||||||
|
"sha-1 0.9.6",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "solana-account-decoder"
|
name = "solana-account-decoder"
|
||||||
version = "1.8.0"
|
version = "1.8.0"
|
||||||
|
@ -4476,6 +4491,8 @@ dependencies = [
|
||||||
"itertools 0.10.1",
|
"itertools 0.10.1",
|
||||||
"jsonrpc-core",
|
"jsonrpc-core",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
|
"jsonrpc-derive",
|
||||||
|
"jsonrpc-pubsub",
|
||||||
"log 0.4.14",
|
"log 0.4.14",
|
||||||
"lru",
|
"lru",
|
||||||
"matches",
|
"matches",
|
||||||
|
@ -5318,6 +5335,7 @@ dependencies = [
|
||||||
"bincode",
|
"bincode",
|
||||||
"bs58 0.4.0",
|
"bs58 0.4.0",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
|
"dashmap",
|
||||||
"itertools 0.10.1",
|
"itertools 0.10.1",
|
||||||
"jsonrpc-core",
|
"jsonrpc-core",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
|
@ -5332,6 +5350,7 @@ dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serial_test",
|
"serial_test",
|
||||||
|
"soketto",
|
||||||
"solana-account-decoder",
|
"solana-account-decoder",
|
||||||
"solana-client",
|
"solana-client",
|
||||||
"solana-entry",
|
"solana-entry",
|
||||||
|
@ -5353,7 +5372,9 @@ dependencies = [
|
||||||
"solana-version",
|
"solana-version",
|
||||||
"solana-vote-program",
|
"solana-vote-program",
|
||||||
"spl-token",
|
"spl-token",
|
||||||
|
"stream-cancel",
|
||||||
"symlink",
|
"symlink",
|
||||||
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
@ -5910,6 +5931,17 @@ version = "0.1.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
|
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "stream-cancel"
|
||||||
|
version = "0.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7b0a9eb2715209fb8cc0d942fcdff45674bfc9f0090a0d897e85a22955ad159b"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"pin-project 1.0.7",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "strsim"
|
name = "strsim"
|
||||||
version = "0.8.0"
|
version = "0.8.0"
|
||||||
|
@ -6428,6 +6460,7 @@ checksum = "ebb7cb2f00c5ae8df755b252306272cd1790d39728363936e01827e11f0b017b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes 1.0.1",
|
"bytes 1.0.1",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
"log 0.4.14",
|
"log 0.4.14",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
|
|
|
@ -49,7 +49,7 @@ pub enum UiAccountData {
|
||||||
Binary(String, UiAccountEncoding),
|
Binary(String, UiAccountEncoding),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub enum UiAccountEncoding {
|
pub enum UiAccountEncoding {
|
||||||
Binary, // Legacy. Retained for RPC backwards compatibility
|
Binary, // Legacy. Retained for RPC backwards compatibility
|
||||||
|
@ -179,7 +179,7 @@ impl Default for UiFeeCalculator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct UiDataSliceConfig {
|
pub struct UiDataSliceConfig {
|
||||||
pub offset: usize,
|
pub offset: usize,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub enum RpcFilterType {
|
pub enum RpcFilterType {
|
||||||
DataSize(u64),
|
DataSize(u64),
|
||||||
|
@ -40,19 +40,19 @@ pub enum RpcFilterError {
|
||||||
Base58DataTooLarge,
|
Base58DataTooLarge,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub enum MemcmpEncoding {
|
pub enum MemcmpEncoding {
|
||||||
Binary,
|
Binary,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase", untagged)]
|
#[serde(rename_all = "camelCase", untagged)]
|
||||||
pub enum MemcmpEncodedBytes {
|
pub enum MemcmpEncodedBytes {
|
||||||
Binary(String),
|
Binary(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
pub struct Memcmp {
|
pub struct Memcmp {
|
||||||
/// Data offset to begin match
|
/// Data offset to begin match
|
||||||
pub offset: usize,
|
pub offset: usize,
|
||||||
|
|
|
@ -64,6 +64,8 @@ trees = "0.4.2"
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
jsonrpc-core = "18.0.0"
|
jsonrpc-core = "18.0.0"
|
||||||
jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] }
|
jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] }
|
||||||
|
jsonrpc-derive = "18.0.0"
|
||||||
|
jsonrpc-pubsub = "18.0.0"
|
||||||
matches = "0.1.9"
|
matches = "0.1.9"
|
||||||
reqwest = { version = "0.11.4", default-features = false, features = ["blocking", "rustls-tls", "json"] }
|
reqwest = { version = "0.11.4", default-features = false, features = ["blocking", "rustls-tls", "json"] }
|
||||||
serde_json = "1.0.68"
|
serde_json = "1.0.68"
|
||||||
|
@ -74,6 +76,7 @@ solana-version = { path = "../version", version = "=1.8.0" }
|
||||||
static_assertions = "1.1.0"
|
static_assertions = "1.1.0"
|
||||||
systemstat = "0.1.8"
|
systemstat = "0.1.8"
|
||||||
|
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
rustc_version = "0.4"
|
rustc_version = "0.4"
|
||||||
|
|
||||||
|
|
|
@ -1530,7 +1530,7 @@ mod tests {
|
||||||
let vote_tracker = VoteTracker::new(&bank);
|
let vote_tracker = VoteTracker::new(&bank);
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
|
@ -1649,7 +1649,7 @@ mod tests {
|
||||||
let bank = bank_forks.read().unwrap().get(0).unwrap().clone();
|
let bank = bank_forks.read().unwrap().get(0).unwrap().clone();
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
|
|
|
@ -3014,7 +3014,7 @@ pub mod tests {
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(bank_forks);
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
|
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
|
@ -3545,7 +3545,7 @@ pub mod tests {
|
||||||
&replay_vote_sender,
|
&replay_vote_sender,
|
||||||
&VerifyRecyclers::default(),
|
&VerifyRecyclers::default(),
|
||||||
);
|
);
|
||||||
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
|
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
|
@ -3613,7 +3613,7 @@ pub mod tests {
|
||||||
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
||||||
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
|
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache.clone(),
|
block_commitment_cache.clone(),
|
||||||
|
|
|
@ -452,7 +452,7 @@ pub mod tests {
|
||||||
},
|
},
|
||||||
blockstore,
|
blockstore,
|
||||||
ledger_signal_receiver,
|
ledger_signal_receiver,
|
||||||
&Arc::new(RpcSubscriptions::new(
|
&Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache.clone(),
|
block_commitment_cache.clone(),
|
||||||
|
|
|
@ -471,12 +471,12 @@ impl Validator {
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
|
|
||||||
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription(
|
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache.clone(),
|
block_commitment_cache.clone(),
|
||||||
optimistically_confirmed_bank.clone(),
|
optimistically_confirmed_bank.clone(),
|
||||||
config.pubsub_config.enable_vote_subscription,
|
&config.pubsub_config,
|
||||||
));
|
));
|
||||||
|
|
||||||
let max_slots = Arc::new(MaxSlots::default());
|
let max_slots = Arc::new(MaxSlots::default());
|
||||||
|
@ -577,12 +577,18 @@ impl Validator {
|
||||||
if config.rpc_config.minimal_api {
|
if config.rpc_config.minimal_api {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(PubSubService::new(
|
let (trigger, pubsub_service) = PubSubService::new(
|
||||||
config.pubsub_config.clone(),
|
config.pubsub_config.clone(),
|
||||||
&rpc_subscriptions,
|
&rpc_subscriptions,
|
||||||
rpc_pubsub_addr,
|
rpc_pubsub_addr,
|
||||||
&exit,
|
);
|
||||||
))
|
config
|
||||||
|
.validator_exit
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.register_exit(Box::new(move || trigger.cancel()));
|
||||||
|
|
||||||
|
Some(pubsub_service)
|
||||||
},
|
},
|
||||||
Some(OptimisticallyConfirmedBankTracker::new(
|
Some(OptimisticallyConfirmedBankTracker::new(
|
||||||
bank_notification_receiver,
|
bank_notification_receiver,
|
||||||
|
|
|
@ -98,14 +98,14 @@ fn test_slot_subscription() {
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
));
|
));
|
||||||
let pubsub_service =
|
let (trigger, pubsub_service) =
|
||||||
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
|
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
|
||||||
std::thread::sleep(Duration::from_millis(400));
|
std::thread::sleep(Duration::from_millis(400));
|
||||||
|
|
||||||
let (mut client, receiver) =
|
let (mut client, receiver) =
|
||||||
|
@ -138,6 +138,7 @@ fn test_slot_subscription() {
|
||||||
}
|
}
|
||||||
|
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
trigger.cancel();
|
||||||
client.shutdown().unwrap();
|
client.shutdown().unwrap();
|
||||||
pubsub_service.close().unwrap();
|
pubsub_service.close().unwrap();
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use jsonrpc_core::futures::StreamExt;
|
use jsonrpc_core::futures::StreamExt;
|
||||||
use jsonrpc_core_client::transports::ws;
|
use jsonrpc_core_client::transports::ws;
|
||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
use reqwest::{self, header::CONTENT_TYPE};
|
use reqwest::{self, header::CONTENT_TYPE};
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
@ -10,11 +11,12 @@ use solana_client::{
|
||||||
rpc_client::RpcClient,
|
rpc_client::RpcClient,
|
||||||
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
|
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
|
||||||
rpc_request::RpcError,
|
rpc_request::RpcError,
|
||||||
rpc_response::{Response, RpcSignatureResult, SlotUpdate},
|
rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotUpdate},
|
||||||
tpu_client::{TpuClient, TpuClientConfig},
|
tpu_client::{TpuClient, TpuClientConfig},
|
||||||
};
|
};
|
||||||
use solana_core::test_validator::TestValidator;
|
use solana_core::test_validator::TestValidator;
|
||||||
use solana_rpc::rpc_pubsub::gen_client::Client as PubsubClient;
|
use solana_rpc::rpc_pubsub::gen_client::Client as PubsubClient;
|
||||||
|
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
commitment_config::CommitmentConfig,
|
commitment_config::CommitmentConfig,
|
||||||
hash::Hash,
|
hash::Hash,
|
||||||
|
@ -256,9 +258,9 @@ fn test_rpc_subscriptions() {
|
||||||
// Track when subscriptions are ready
|
// Track when subscriptions are ready
|
||||||
let (ready_sender, ready_receiver) = channel::<()>();
|
let (ready_sender, ready_receiver) = channel::<()>();
|
||||||
// Track account notifications are received
|
// Track account notifications are received
|
||||||
let (account_sender, account_receiver) = channel::<Response<UiAccount>>();
|
let (account_sender, account_receiver) = channel::<RpcResponse<UiAccount>>();
|
||||||
// Track when status notifications are received
|
// Track when status notifications are received
|
||||||
let (status_sender, status_receiver) = channel::<(String, Response<RpcSignatureResult>)>();
|
let (status_sender, status_receiver) = channel::<(String, RpcResponse<RpcSignatureResult>)>();
|
||||||
|
|
||||||
// Create the pub sub runtime
|
// Create the pub sub runtime
|
||||||
let rt = Runtime::new().unwrap();
|
let rt = Runtime::new().unwrap();
|
||||||
|
|
|
@ -3,3 +3,61 @@ pub mod counter;
|
||||||
pub mod datapoint;
|
pub mod datapoint;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// A helper that sends the count of created tokens as a datapoint.
|
||||||
|
#[allow(clippy::redundant_allocation)]
|
||||||
|
pub struct TokenCounter(Arc<&'static str>);
|
||||||
|
|
||||||
|
impl TokenCounter {
|
||||||
|
/// Creates a new counter with the specified metrics `name`.
|
||||||
|
pub fn new(name: &'static str) -> Self {
|
||||||
|
Self(Arc::new(name))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new token for this counter. The metric's value will be equal
|
||||||
|
/// to the number of `CounterToken`s.
|
||||||
|
pub fn create_token(&self) -> CounterToken {
|
||||||
|
// new_count = strong_count
|
||||||
|
// - 1 (in TokenCounter)
|
||||||
|
// + 1 (token that's being created)
|
||||||
|
datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64));
|
||||||
|
CounterToken(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A token for `TokenCounter`.
|
||||||
|
#[allow(clippy::redundant_allocation)]
|
||||||
|
pub struct CounterToken(Arc<&'static str>);
|
||||||
|
|
||||||
|
impl Clone for CounterToken {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
// new_count = strong_count
|
||||||
|
// - 1 (in TokenCounter)
|
||||||
|
// + 1 (token that's being created)
|
||||||
|
datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64));
|
||||||
|
CounterToken(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for CounterToken {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// new_count = strong_count
|
||||||
|
// - 1 (in TokenCounter, if it still exists)
|
||||||
|
// - 1 (token that's being dropped)
|
||||||
|
datapoint_info!(
|
||||||
|
*self.0,
|
||||||
|
("count", Arc::strong_count(&self.0).saturating_sub(2), i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TokenCounter {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
datapoint_info!(
|
||||||
|
*self.0,
|
||||||
|
("count", Arc::strong_count(&self.0).saturating_sub(2), i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -208,6 +208,17 @@ fn start_client_rpc_services(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let (trigger, pubsub_service) = PubSubService::new(
|
||||||
|
replica_config.pubsub_config.clone(),
|
||||||
|
&subscriptions,
|
||||||
|
replica_config.rpc_pubsub_addr,
|
||||||
|
);
|
||||||
|
replica_config
|
||||||
|
.replica_exit
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.register_exit(Box::new(move || trigger.cancel()));
|
||||||
|
|
||||||
let (_bank_notification_sender, bank_notification_receiver) = unbounded();
|
let (_bank_notification_sender, bank_notification_receiver) = unbounded();
|
||||||
(
|
(
|
||||||
Some(JsonRpcService::new(
|
Some(JsonRpcService::new(
|
||||||
|
@ -231,18 +242,13 @@ fn start_client_rpc_services(
|
||||||
leader_schedule_cache.clone(),
|
leader_schedule_cache.clone(),
|
||||||
max_complete_transaction_status_slot,
|
max_complete_transaction_status_slot,
|
||||||
)),
|
)),
|
||||||
Some(PubSubService::new(
|
Some(pubsub_service),
|
||||||
replica_config.pubsub_config.clone(),
|
|
||||||
&subscriptions,
|
|
||||||
replica_config.rpc_pubsub_addr,
|
|
||||||
&exit,
|
|
||||||
)),
|
|
||||||
Some(OptimisticallyConfirmedBankTracker::new(
|
Some(OptimisticallyConfirmedBankTracker::new(
|
||||||
bank_notification_receiver,
|
bank_notification_receiver,
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
optimistically_confirmed_bank.clone(),
|
optimistically_confirmed_bank.clone(),
|
||||||
subscriptions.clone(),
|
subscriptions,
|
||||||
None,
|
None,
|
||||||
)),
|
)),
|
||||||
)
|
)
|
||||||
|
|
|
@ -14,6 +14,7 @@ base64 = "0.12.3"
|
||||||
bincode = "1.3.3"
|
bincode = "1.3.3"
|
||||||
bs58 = "0.4.0"
|
bs58 = "0.4.0"
|
||||||
crossbeam-channel = "0.5"
|
crossbeam-channel = "0.5"
|
||||||
|
dashmap = "4.0.2"
|
||||||
itertools = "0.10.1"
|
itertools = "0.10.1"
|
||||||
jsonrpc-core = "18.0.0"
|
jsonrpc-core = "18.0.0"
|
||||||
jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] }
|
jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] }
|
||||||
|
@ -27,6 +28,7 @@ regex = "1.5.4"
|
||||||
serde = "1.0.130"
|
serde = "1.0.130"
|
||||||
serde_derive = "1.0.103"
|
serde_derive = "1.0.103"
|
||||||
serde_json = "1.0.68"
|
serde_json = "1.0.68"
|
||||||
|
soketto = "0.6"
|
||||||
solana-account-decoder = { path = "../account-decoder", version = "=1.8.0" }
|
solana-account-decoder = { path = "../account-decoder", version = "=1.8.0" }
|
||||||
solana-client = { path = "../client", version = "=1.8.0" }
|
solana-client = { path = "../client", version = "=1.8.0" }
|
||||||
solana-entry = { path = "../entry", version = "=1.8.0" }
|
solana-entry = { path = "../entry", version = "=1.8.0" }
|
||||||
|
@ -46,8 +48,10 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.8.0"
|
||||||
solana-version = { path = "../version", version = "=1.8.0" }
|
solana-version = { path = "../version", version = "=1.8.0" }
|
||||||
solana-vote-program = { path = "../programs/vote", version = "=1.8.0" }
|
solana-vote-program = { path = "../programs/vote", version = "=1.8.0" }
|
||||||
spl-token-v2-0 = { package = "spl-token", version = "=3.2.0", features = ["no-entrypoint"] }
|
spl-token-v2-0 = { package = "spl-token", version = "=3.2.0", features = ["no-entrypoint"] }
|
||||||
|
stream-cancel = "0.8.1"
|
||||||
|
thiserror = "1.0"
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-util = { version = "0.6", features = ["codec"] }
|
tokio-util = { version = "0.6", features = ["codec", "compat"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = "0.5.1"
|
serial_test = "0.5.1"
|
||||||
|
|
|
@ -9,6 +9,7 @@ pub mod rpc_health;
|
||||||
pub mod rpc_pubsub;
|
pub mod rpc_pubsub;
|
||||||
pub mod rpc_pubsub_service;
|
pub mod rpc_pubsub_service;
|
||||||
pub mod rpc_service;
|
pub mod rpc_service;
|
||||||
|
pub mod rpc_subscription_tracker;
|
||||||
pub mod rpc_subscriptions;
|
pub mod rpc_subscriptions;
|
||||||
pub mod transaction_status_service;
|
pub mod transaction_status_service;
|
||||||
|
|
||||||
|
|
|
@ -324,7 +324,7 @@ mod tests {
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
|
|
||||||
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
|
|
|
@ -7716,7 +7716,7 @@ pub mod tests {
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let mut pending_optimistically_confirmed_banks = HashSet::new();
|
let mut pending_optimistically_confirmed_banks = HashSet::new();
|
||||||
|
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
block_commitment_cache.clone(),
|
block_commitment_cache.clone(),
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2,45 +2,60 @@
|
||||||
|
|
||||||
use {
|
use {
|
||||||
crate::{
|
crate::{
|
||||||
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl, MAX_ACTIVE_SUBSCRIPTIONS},
|
rpc_pubsub::{RpcSolPubSubImpl, RpcSolPubSubInternal},
|
||||||
rpc_subscriptions::RpcSubscriptions,
|
rpc_subscription_tracker::{
|
||||||
},
|
SubscriptionControl, SubscriptionId, SubscriptionParams, SubscriptionToken,
|
||||||
jsonrpc_pubsub::{PubSubHandler, Session},
|
|
||||||
jsonrpc_ws_server::{RequestContext, ServerBuilder},
|
|
||||||
std::{
|
|
||||||
net::SocketAddr,
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
},
|
||||||
thread::{self, sleep, Builder, JoinHandle},
|
rpc_subscriptions::{RpcNotification, RpcSubscriptions},
|
||||||
time::Duration,
|
|
||||||
},
|
},
|
||||||
|
dashmap::{mapref::entry::Entry, DashMap},
|
||||||
|
jsonrpc_core::IoHandler,
|
||||||
|
soketto::handshake::{server, Server},
|
||||||
|
solana_metrics::TokenCounter,
|
||||||
|
std::{
|
||||||
|
io,
|
||||||
|
net::SocketAddr,
|
||||||
|
str,
|
||||||
|
sync::Arc,
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
},
|
||||||
|
stream_cancel::{Trigger, Tripwire},
|
||||||
|
thiserror::Error,
|
||||||
|
tokio::{net::TcpStream, pin, select, sync::broadcast},
|
||||||
|
tokio_util::compat::TokioAsyncReadCompatExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000;
|
||||||
|
pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000;
|
||||||
|
pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100;
|
||||||
|
pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PubSubConfig {
|
pub struct PubSubConfig {
|
||||||
pub enable_vote_subscription: bool,
|
pub enable_vote_subscription: bool,
|
||||||
|
|
||||||
// See the corresponding fields in
|
|
||||||
// https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
|
|
||||||
// for a complete description of each field in this struct
|
|
||||||
pub max_connections: usize,
|
|
||||||
pub max_fragment_size: usize,
|
|
||||||
pub max_in_buffer_capacity: usize,
|
|
||||||
pub max_out_buffer_capacity: usize,
|
|
||||||
pub max_active_subscriptions: usize,
|
pub max_active_subscriptions: usize,
|
||||||
|
pub queue_capacity_items: usize,
|
||||||
|
pub queue_capacity_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PubSubConfig {
|
impl Default for PubSubConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
enable_vote_subscription: false,
|
enable_vote_subscription: false,
|
||||||
max_connections: 1000, // Arbitrary, default of 100 is too low
|
|
||||||
max_fragment_size: 50 * 1024, // 50KB
|
|
||||||
max_in_buffer_capacity: 50 * 1024, // 50KB
|
|
||||||
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
|
|
||||||
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
|
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
|
||||||
|
queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
|
||||||
|
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PubSubConfig {
|
||||||
|
pub fn default_for_tests() -> Self {
|
||||||
|
Self {
|
||||||
|
enable_vote_subscription: false,
|
||||||
|
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
|
||||||
|
queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
|
||||||
|
queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,51 +69,30 @@ impl PubSubService {
|
||||||
pubsub_config: PubSubConfig,
|
pubsub_config: PubSubConfig,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
pubsub_addr: SocketAddr,
|
pubsub_addr: SocketAddr,
|
||||||
exit: &Arc<AtomicBool>,
|
) -> (Trigger, Self) {
|
||||||
) -> Self {
|
let subscription_control = subscriptions.control().clone();
|
||||||
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
||||||
let rpc = RpcSolPubSubImpl::new(
|
|
||||||
subscriptions.clone(),
|
|
||||||
pubsub_config.max_active_subscriptions,
|
|
||||||
);
|
|
||||||
let exit_ = exit.clone();
|
|
||||||
|
|
||||||
|
let (trigger, tripwire) = Tripwire::new();
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-pubsub".to_string())
|
.name("solana-pubsub".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut io = PubSubHandler::default();
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
io.extend_with(rpc.to_delegate());
|
.enable_all()
|
||||||
|
.build()
|
||||||
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
.expect("runtime creation failed");
|
||||||
info!("New pubsub connection");
|
if let Err(err) = runtime.block_on(listen(
|
||||||
let session = Arc::new(Session::new(context.sender()));
|
pubsub_addr,
|
||||||
session.on_drop(|| {
|
pubsub_config,
|
||||||
info!("Pubsub connection dropped");
|
subscription_control,
|
||||||
});
|
tripwire,
|
||||||
session
|
)) {
|
||||||
})
|
error!("pubsub service failed: {}", err);
|
||||||
.max_connections(pubsub_config.max_connections)
|
};
|
||||||
.max_payload(pubsub_config.max_fragment_size)
|
|
||||||
.max_in_buffer_capacity(pubsub_config.max_in_buffer_capacity)
|
|
||||||
.max_out_buffer_capacity(pubsub_config.max_out_buffer_capacity)
|
|
||||||
.start(&pubsub_addr);
|
|
||||||
|
|
||||||
if let Err(e) = server {
|
|
||||||
warn!(
|
|
||||||
"Pubsub service unavailable error: {:?}. \n\
|
|
||||||
Also, check that port {} is not already in use by another application",
|
|
||||||
e,
|
|
||||||
pubsub_addr.port()
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
while !exit_.load(Ordering::Relaxed) {
|
|
||||||
sleep(Duration::from_millis(100));
|
|
||||||
}
|
|
||||||
server.unwrap().close();
|
|
||||||
})
|
})
|
||||||
.unwrap();
|
.expect("thread spawn failed");
|
||||||
Self { thread_hdl }
|
|
||||||
|
(trigger, Self { thread_hdl })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(self) -> thread::Result<()> {
|
pub fn close(self) -> thread::Result<()> {
|
||||||
|
@ -110,6 +104,244 @@ impl PubSubService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct BroadcastHandler {
|
||||||
|
current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn count_final(params: &SubscriptionParams) {
|
||||||
|
match params {
|
||||||
|
SubscriptionParams::Account(_) => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-accounts", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Logs(_) => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-logs", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Program(_) => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-programs", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Signature(_) => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-signatures", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Slot => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-slots", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::SlotsUpdates => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-slots-updates", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Root => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-roots", 1);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Vote => {
|
||||||
|
inc_new_counter_info!("rpc-pubsub-final-votes", 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BroadcastHandler {
|
||||||
|
fn handle(&self, notification: RpcNotification) -> Result<Option<Arc<String>>, Error> {
|
||||||
|
if let Entry::Occupied(entry) = self
|
||||||
|
.current_subscriptions
|
||||||
|
.entry(notification.subscription_id)
|
||||||
|
{
|
||||||
|
count_final(entry.get().params());
|
||||||
|
|
||||||
|
if notification.is_final {
|
||||||
|
entry.remove();
|
||||||
|
}
|
||||||
|
notification
|
||||||
|
.json
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(Error::NotificationIsGone)
|
||||||
|
.map(Some)
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub struct TestBroadcastReceiver {
|
||||||
|
handler: BroadcastHandler,
|
||||||
|
inner: tokio::sync::broadcast::Receiver<RpcNotification>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
impl TestBroadcastReceiver {
|
||||||
|
pub fn recv(&mut self) -> String {
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio::sync::broadcast::error::TryRecvError;
|
||||||
|
|
||||||
|
let timeout = Duration::from_millis(500);
|
||||||
|
let started = Instant::now();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.inner.try_recv() {
|
||||||
|
Ok(notification) => {
|
||||||
|
if let Some(json) = self.handler.handle(notification).expect("handler failed") {
|
||||||
|
return json.to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(TryRecvError::Empty) => {
|
||||||
|
if started.elapsed() > timeout {
|
||||||
|
panic!("TestBroadcastReceiver: no data, timeout reached");
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(50));
|
||||||
|
}
|
||||||
|
Err(err) => panic!("broadcast receiver error: {}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn test_connection(
|
||||||
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
|
) -> (RpcSolPubSubImpl, TestBroadcastReceiver) {
|
||||||
|
let current_subscriptions = Arc::new(DashMap::new());
|
||||||
|
|
||||||
|
let rpc_impl = RpcSolPubSubImpl::new(
|
||||||
|
PubSubConfig {
|
||||||
|
enable_vote_subscription: true,
|
||||||
|
queue_capacity_items: 100,
|
||||||
|
..PubSubConfig::default()
|
||||||
|
},
|
||||||
|
subscriptions.control().clone(),
|
||||||
|
Arc::clone(¤t_subscriptions),
|
||||||
|
);
|
||||||
|
let broadcast_handler = BroadcastHandler {
|
||||||
|
current_subscriptions,
|
||||||
|
};
|
||||||
|
let receiver = TestBroadcastReceiver {
|
||||||
|
inner: subscriptions.control().broadcast_receiver(),
|
||||||
|
handler: broadcast_handler,
|
||||||
|
};
|
||||||
|
(rpc_impl, receiver)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
enum Error {
|
||||||
|
#[error("handshake error: {0}")]
|
||||||
|
Handshake(#[from] soketto::handshake::Error),
|
||||||
|
#[error("connection error: {0}")]
|
||||||
|
Connection(#[from] soketto::connection::Error),
|
||||||
|
#[error("broadcast queue error: {0}")]
|
||||||
|
Broadcast(#[from] broadcast::error::RecvError),
|
||||||
|
#[error("client has lagged behind (notification is gone)")]
|
||||||
|
NotificationIsGone,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_connection(
|
||||||
|
socket: TcpStream,
|
||||||
|
subscription_control: SubscriptionControl,
|
||||||
|
config: PubSubConfig,
|
||||||
|
mut tripwire: Tripwire,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut server = Server::new(socket.compat());
|
||||||
|
let request = server.receive_request().await?;
|
||||||
|
let accept = server::Response::Accept {
|
||||||
|
key: request.key(),
|
||||||
|
protocol: None,
|
||||||
|
};
|
||||||
|
server.send_response(&accept).await?;
|
||||||
|
let (mut sender, mut receiver) = server.into_builder().finish();
|
||||||
|
|
||||||
|
let mut broadcast_receiver = subscription_control.broadcast_receiver();
|
||||||
|
let mut data = Vec::new();
|
||||||
|
let current_subscriptions = Arc::new(DashMap::new());
|
||||||
|
|
||||||
|
let mut json_rpc_handler = IoHandler::new();
|
||||||
|
let rpc_impl = RpcSolPubSubImpl::new(
|
||||||
|
config,
|
||||||
|
subscription_control,
|
||||||
|
Arc::clone(¤t_subscriptions),
|
||||||
|
);
|
||||||
|
json_rpc_handler.extend_with(rpc_impl.to_delegate());
|
||||||
|
let broadcast_handler = BroadcastHandler {
|
||||||
|
current_subscriptions,
|
||||||
|
};
|
||||||
|
loop {
|
||||||
|
// Extra block for dropping `receive_future`.
|
||||||
|
{
|
||||||
|
// soketto is not cancel safe, so we have to introduce an inner loop to poll
|
||||||
|
// `receive_data` to completion.
|
||||||
|
let receive_future = receiver.receive_data(&mut data);
|
||||||
|
pin!(receive_future);
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
result = &mut receive_future => match result {
|
||||||
|
Ok(_) => break,
|
||||||
|
Err(soketto::connection::Error::Closed) => return Ok(()),
|
||||||
|
Err(err) => return Err(err.into()),
|
||||||
|
},
|
||||||
|
result = broadcast_receiver.recv() => {
|
||||||
|
|
||||||
|
// In both possible error cases (closed or lagged) we disconnect the client.
|
||||||
|
if let Some(json) = broadcast_handler.handle(result?)? {
|
||||||
|
sender.send_text(&*json).await?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = &mut tripwire => {
|
||||||
|
warn!("disconnecting websocket client: shutting down");
|
||||||
|
return Ok(())
|
||||||
|
},
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let data_str = match str::from_utf8(&data) {
|
||||||
|
Ok(str) => str,
|
||||||
|
Err(_) => {
|
||||||
|
// Old implementation just closes the connection, so we preserve that behavior
|
||||||
|
// for now. It would be more correct to respond with an error.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(response) = json_rpc_handler.handle_request(data_str).await {
|
||||||
|
sender.send_text(&response).await?;
|
||||||
|
}
|
||||||
|
data.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen(
|
||||||
|
listen_address: SocketAddr,
|
||||||
|
config: PubSubConfig,
|
||||||
|
subscription_control: SubscriptionControl,
|
||||||
|
mut tripwire: Tripwire,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
|
||||||
|
let counter = TokenCounter::new("rpc_pubsub_connections");
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
result = listener.accept() => match result {
|
||||||
|
Ok((socket, addr)) => {
|
||||||
|
debug!("new client ({:?})", addr);
|
||||||
|
let subscription_control = subscription_control.clone();
|
||||||
|
let config = config.clone();
|
||||||
|
let tripwire = tripwire.clone();
|
||||||
|
let counter_token = counter.create_token();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let handle = handle_connection(
|
||||||
|
socket, subscription_control, config, tripwire
|
||||||
|
);
|
||||||
|
match handle.await {
|
||||||
|
Ok(()) => debug!("connection closed ({:?})", addr),
|
||||||
|
Err(err) => warn!("connection handler error ({:?}): {}", addr, err),
|
||||||
|
}
|
||||||
|
drop(counter_token); // Force moving token into the task.
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => error!("couldn't accept connection: {:?}", e),
|
||||||
|
},
|
||||||
|
_ = &mut tripwire => return Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use {
|
use {
|
||||||
|
@ -123,7 +355,7 @@ mod tests {
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
net::{IpAddr, Ipv4Addr},
|
net::{IpAddr, Ipv4Addr},
|
||||||
sync::RwLock,
|
sync::{atomic::AtomicBool, RwLock},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -136,14 +368,14 @@ mod tests {
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||||
let optimistically_confirmed_bank =
|
let optimistically_confirmed_bank =
|
||||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||||
let subscriptions = Arc::new(RpcSubscriptions::new(
|
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||||
&exit,
|
&exit,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
|
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
|
||||||
optimistically_confirmed_bank,
|
optimistically_confirmed_bank,
|
||||||
));
|
));
|
||||||
let pubsub_service =
|
let (_trigger, pubsub_service) =
|
||||||
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
|
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
|
||||||
let thread = pubsub_service.thread_hdl.thread();
|
let thread = pubsub_service.thread_hdl.thread();
|
||||||
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,731 @@
|
||||||
|
use {
|
||||||
|
crate::rpc_subscriptions::{NotificationEntry, RpcNotification},
|
||||||
|
dashmap::{mapref::entry::Entry as DashEntry, DashMap},
|
||||||
|
solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
|
||||||
|
solana_client::rpc_filter::RpcFilterType,
|
||||||
|
solana_metrics::{CounterToken, TokenCounter},
|
||||||
|
solana_runtime::{
|
||||||
|
bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
|
||||||
|
bank_forks::BankForks,
|
||||||
|
},
|
||||||
|
solana_sdk::{
|
||||||
|
clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature,
|
||||||
|
},
|
||||||
|
std::{
|
||||||
|
collections::{
|
||||||
|
hash_map::{Entry, HashMap},
|
||||||
|
HashSet,
|
||||||
|
},
|
||||||
|
fmt,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc, RwLock, Weak,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
thiserror::Error,
|
||||||
|
tokio::sync::broadcast,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
|
pub struct SubscriptionId(u64);
|
||||||
|
|
||||||
|
impl From<u64> for SubscriptionId {
|
||||||
|
fn from(value: u64) -> Self {
|
||||||
|
SubscriptionId(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SubscriptionId> for u64 {
|
||||||
|
fn from(value: SubscriptionId) -> Self {
|
||||||
|
value.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub enum SubscriptionParams {
|
||||||
|
Account(AccountSubscriptionParams),
|
||||||
|
Logs(LogsSubscriptionParams),
|
||||||
|
Program(ProgramSubscriptionParams),
|
||||||
|
Signature(SignatureSubscriptionParams),
|
||||||
|
Slot,
|
||||||
|
SlotsUpdates,
|
||||||
|
Root,
|
||||||
|
Vote,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriptionParams {
|
||||||
|
fn method(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
SubscriptionParams::Account(_) => "accountNotification",
|
||||||
|
SubscriptionParams::Logs(_) => "logsNotification",
|
||||||
|
SubscriptionParams::Program(_) => "programNotification",
|
||||||
|
SubscriptionParams::Signature(_) => "signatureNotification",
|
||||||
|
SubscriptionParams::Slot => "slotNotification",
|
||||||
|
SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
|
||||||
|
SubscriptionParams::Root => "rootNotification",
|
||||||
|
SubscriptionParams::Vote => "voteNotification",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commitment(&self) -> Option<CommitmentConfig> {
|
||||||
|
match self {
|
||||||
|
SubscriptionParams::Account(params) => Some(params.commitment),
|
||||||
|
SubscriptionParams::Logs(params) => Some(params.commitment),
|
||||||
|
SubscriptionParams::Program(params) => Some(params.commitment),
|
||||||
|
SubscriptionParams::Signature(params) => Some(params.commitment),
|
||||||
|
SubscriptionParams::Slot
|
||||||
|
| SubscriptionParams::SlotsUpdates
|
||||||
|
| SubscriptionParams::Root
|
||||||
|
| SubscriptionParams::Vote => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_commitment_watcher(&self) -> bool {
|
||||||
|
let commitment = match self {
|
||||||
|
SubscriptionParams::Account(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Logs(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Program(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Signature(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Slot
|
||||||
|
| SubscriptionParams::SlotsUpdates
|
||||||
|
| SubscriptionParams::Root
|
||||||
|
| SubscriptionParams::Vote => return false,
|
||||||
|
};
|
||||||
|
!commitment.is_confirmed()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_gossip_watcher(&self) -> bool {
|
||||||
|
let commitment = match self {
|
||||||
|
SubscriptionParams::Account(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Logs(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Program(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Signature(params) => ¶ms.commitment,
|
||||||
|
SubscriptionParams::Slot
|
||||||
|
| SubscriptionParams::SlotsUpdates
|
||||||
|
| SubscriptionParams::Root
|
||||||
|
| SubscriptionParams::Vote => return false,
|
||||||
|
};
|
||||||
|
commitment.is_confirmed()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_node_progress_watcher(&self) -> bool {
|
||||||
|
matches!(
|
||||||
|
self,
|
||||||
|
SubscriptionParams::Slot
|
||||||
|
| SubscriptionParams::SlotsUpdates
|
||||||
|
| SubscriptionParams::Root
|
||||||
|
| SubscriptionParams::Vote
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct AccountSubscriptionParams {
|
||||||
|
pub pubkey: Pubkey,
|
||||||
|
pub encoding: UiAccountEncoding,
|
||||||
|
pub data_slice: Option<UiDataSliceConfig>,
|
||||||
|
pub commitment: CommitmentConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct LogsSubscriptionParams {
|
||||||
|
pub kind: LogsSubscriptionKind,
|
||||||
|
pub commitment: CommitmentConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub enum LogsSubscriptionKind {
|
||||||
|
All,
|
||||||
|
AllWithVotes,
|
||||||
|
Single(Pubkey),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct ProgramSubscriptionParams {
|
||||||
|
pub pubkey: Pubkey,
|
||||||
|
pub filters: Vec<RpcFilterType>,
|
||||||
|
pub encoding: UiAccountEncoding,
|
||||||
|
pub data_slice: Option<UiDataSliceConfig>,
|
||||||
|
pub commitment: CommitmentConfig,
|
||||||
|
pub with_context: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct SignatureSubscriptionParams {
|
||||||
|
pub signature: Signature,
|
||||||
|
pub commitment: CommitmentConfig,
|
||||||
|
pub enable_received_notification: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
|
||||||
|
|
||||||
|
struct SubscriptionControlInner {
|
||||||
|
subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
|
||||||
|
next_id: AtomicU64,
|
||||||
|
max_active_subscriptions: usize,
|
||||||
|
sender: crossbeam_channel::Sender<NotificationEntry>,
|
||||||
|
broadcast_sender: broadcast::Sender<RpcNotification>,
|
||||||
|
counter: TokenCounter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriptionControl {
|
||||||
|
pub fn new(
|
||||||
|
max_active_subscriptions: usize,
|
||||||
|
sender: crossbeam_channel::Sender<NotificationEntry>,
|
||||||
|
broadcast_sender: broadcast::Sender<RpcNotification>,
|
||||||
|
) -> Self {
|
||||||
|
Self(Arc::new(SubscriptionControlInner {
|
||||||
|
subscriptions: DashMap::new(),
|
||||||
|
next_id: AtomicU64::new(0),
|
||||||
|
max_active_subscriptions,
|
||||||
|
sender,
|
||||||
|
broadcast_sender,
|
||||||
|
counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
|
||||||
|
self.0.broadcast_sender.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
|
||||||
|
debug!(
|
||||||
|
"Total existing subscriptions: {}",
|
||||||
|
self.0.subscriptions.len()
|
||||||
|
);
|
||||||
|
let count = self.0.subscriptions.len();
|
||||||
|
match self.0.subscriptions.entry(params) {
|
||||||
|
DashEntry::Occupied(entry) => Ok(SubscriptionToken(
|
||||||
|
entry
|
||||||
|
.get()
|
||||||
|
.upgrade()
|
||||||
|
.expect("dead subscription encountered in SubscriptionControl"),
|
||||||
|
self.0.counter.create_token(),
|
||||||
|
)),
|
||||||
|
DashEntry::Vacant(entry) => {
|
||||||
|
if count >= self.0.max_active_subscriptions {
|
||||||
|
inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
|
||||||
|
return Err(Error::TooManySubscriptions);
|
||||||
|
}
|
||||||
|
let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
|
||||||
|
let token = SubscriptionToken(
|
||||||
|
Arc::new(SubscriptionTokenInner {
|
||||||
|
control: Arc::clone(&self.0),
|
||||||
|
params: entry.key().clone(),
|
||||||
|
id,
|
||||||
|
}),
|
||||||
|
self.0.counter.create_token(),
|
||||||
|
);
|
||||||
|
let _ = self
|
||||||
|
.0
|
||||||
|
.sender
|
||||||
|
.send(NotificationEntry::Subscribed(token.0.params.clone(), id));
|
||||||
|
entry.insert(Arc::downgrade(&token.0));
|
||||||
|
datapoint_info!(
|
||||||
|
"rpc-subscription",
|
||||||
|
("total", self.0.subscriptions.len(), i64)
|
||||||
|
);
|
||||||
|
Ok(token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn total(&self) -> usize {
|
||||||
|
self.0.subscriptions.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn assert_subscribed(&self, params: &SubscriptionParams) {
|
||||||
|
assert!(self.0.subscriptions.contains_key(params));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
|
||||||
|
assert!(!self.0.subscriptions.contains_key(params));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
|
||||||
|
self.0.subscriptions.iter().any(|item| {
|
||||||
|
if let SubscriptionParams::Account(params) = item.key() {
|
||||||
|
¶ms.pubkey == pubkey
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn signature_subscribed(&self, signature: &Signature) -> bool {
|
||||||
|
self.0.subscriptions.iter().any(|item| {
|
||||||
|
if let SubscriptionParams::Signature(params) = item.key() {
|
||||||
|
¶ms.signature == signature
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SubscriptionInfo {
|
||||||
|
id: SubscriptionId,
|
||||||
|
params: SubscriptionParams,
|
||||||
|
method: &'static str,
|
||||||
|
pub last_notified_slot: RwLock<Slot>,
|
||||||
|
commitment: Option<CommitmentConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriptionInfo {
|
||||||
|
pub fn id(&self) -> SubscriptionId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn method(&self) -> &'static str {
|
||||||
|
self.method
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn params(&self) -> &SubscriptionParams {
|
||||||
|
&self.params
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commitment(&self) -> Option<CommitmentConfig> {
|
||||||
|
self.commitment
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("node subscription limit reached")]
|
||||||
|
TooManySubscriptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogsSubscriptionsIndex {
|
||||||
|
all_count: usize,
|
||||||
|
all_with_votes_count: usize,
|
||||||
|
single_count: HashMap<Pubkey, usize>,
|
||||||
|
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LogsSubscriptionsIndex {
|
||||||
|
fn add(&mut self, params: &LogsSubscriptionParams) {
|
||||||
|
match params.kind {
|
||||||
|
LogsSubscriptionKind::All => self.all_count += 1,
|
||||||
|
LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
|
||||||
|
LogsSubscriptionKind::Single(key) => {
|
||||||
|
*self.single_count.entry(key).or_default() += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.update_config();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, params: &LogsSubscriptionParams) {
|
||||||
|
match params.kind {
|
||||||
|
LogsSubscriptionKind::All => self.all_count -= 1,
|
||||||
|
LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
|
||||||
|
LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
|
||||||
|
Entry::Occupied(mut entry) => {
|
||||||
|
*entry.get_mut() -= 1;
|
||||||
|
if *entry.get() == 0 {
|
||||||
|
entry.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Entry::Vacant(_) => error!("missing entry in single_count"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
self.update_config();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_config(&self) {
|
||||||
|
let config = if self.all_with_votes_count > 0 {
|
||||||
|
TransactionLogCollectorConfig {
|
||||||
|
filter: TransactionLogCollectorFilter::AllWithVotes,
|
||||||
|
mentioned_addresses: HashSet::new(),
|
||||||
|
}
|
||||||
|
} else if self.all_count > 0 {
|
||||||
|
TransactionLogCollectorConfig {
|
||||||
|
filter: TransactionLogCollectorFilter::All,
|
||||||
|
mentioned_addresses: HashSet::new(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
TransactionLogCollectorConfig {
|
||||||
|
filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
|
||||||
|
mentioned_addresses: self.single_count.keys().copied().collect(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
*self
|
||||||
|
.bank_forks
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.root_bank()
|
||||||
|
.transaction_log_collector_config
|
||||||
|
.write()
|
||||||
|
.unwrap() = config;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SubscriptionsTracker {
|
||||||
|
logs_subscriptions_index: LogsSubscriptionsIndex,
|
||||||
|
by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
|
||||||
|
// Accounts, logs, programs, signatures (not gossip)
|
||||||
|
commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
|
||||||
|
// Accounts, logs, programs, signatures (gossip)
|
||||||
|
gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
|
||||||
|
// Slots, slots updates, roots, votes.
|
||||||
|
node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriptionsTracker {
|
||||||
|
pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
|
||||||
|
SubscriptionsTracker {
|
||||||
|
logs_subscriptions_index: LogsSubscriptionsIndex {
|
||||||
|
all_count: 0,
|
||||||
|
all_with_votes_count: 0,
|
||||||
|
single_count: HashMap::new(),
|
||||||
|
bank_forks,
|
||||||
|
},
|
||||||
|
by_signature: HashMap::new(),
|
||||||
|
commitment_watchers: HashMap::new(),
|
||||||
|
gossip_watchers: HashMap::new(),
|
||||||
|
node_progress_watchers: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe(
|
||||||
|
&mut self,
|
||||||
|
params: SubscriptionParams,
|
||||||
|
id: SubscriptionId,
|
||||||
|
last_notified_slot: impl FnOnce() -> Slot,
|
||||||
|
) {
|
||||||
|
let info = Arc::new(SubscriptionInfo {
|
||||||
|
last_notified_slot: RwLock::new(last_notified_slot()),
|
||||||
|
id,
|
||||||
|
commitment: params.commitment(),
|
||||||
|
method: params.method(),
|
||||||
|
params: params.clone(),
|
||||||
|
});
|
||||||
|
match ¶ms {
|
||||||
|
SubscriptionParams::Logs(params) => {
|
||||||
|
self.logs_subscriptions_index.add(params);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Signature(params) => {
|
||||||
|
self.by_signature
|
||||||
|
.entry(params.signature)
|
||||||
|
.or_default()
|
||||||
|
.insert(id, Arc::clone(&info));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
if info.params.is_commitment_watcher() {
|
||||||
|
self.commitment_watchers.insert(id, Arc::clone(&info));
|
||||||
|
}
|
||||||
|
if info.params.is_gossip_watcher() {
|
||||||
|
self.gossip_watchers.insert(id, Arc::clone(&info));
|
||||||
|
}
|
||||||
|
if info.params.is_node_progress_watcher() {
|
||||||
|
self.node_progress_watchers
|
||||||
|
.insert(info.params.clone(), Arc::clone(&info));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::collapsible_if)]
|
||||||
|
pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
|
||||||
|
match ¶ms {
|
||||||
|
SubscriptionParams::Logs(params) => {
|
||||||
|
self.logs_subscriptions_index.remove(params);
|
||||||
|
}
|
||||||
|
SubscriptionParams::Signature(params) => {
|
||||||
|
if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
|
||||||
|
if entry.get_mut().remove(&id).is_none() {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in by_signature)");
|
||||||
|
}
|
||||||
|
if entry.get_mut().is_empty() {
|
||||||
|
entry.remove();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in by_signature)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
if params.is_commitment_watcher() {
|
||||||
|
if self.commitment_watchers.remove(&id).is_none() {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if params.is_gossip_watcher() {
|
||||||
|
if self.gossip_watchers.remove(&id).is_none() {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if params.is_node_progress_watcher() {
|
||||||
|
if self.node_progress_watchers.remove(¶ms).is_none() {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn by_signature(
|
||||||
|
&self,
|
||||||
|
) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
|
||||||
|
&self.by_signature
|
||||||
|
}
|
||||||
|
pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
|
||||||
|
&self.commitment_watchers
|
||||||
|
}
|
||||||
|
pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
|
||||||
|
&self.gossip_watchers
|
||||||
|
}
|
||||||
|
pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
|
||||||
|
&self.node_progress_watchers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SubscriptionTokenInner {
|
||||||
|
control: Arc<SubscriptionControlInner>,
|
||||||
|
params: SubscriptionParams,
|
||||||
|
id: SubscriptionId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for SubscriptionTokenInner {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("SubscriptionTokenInner")
|
||||||
|
.field("id", &self.id)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for SubscriptionTokenInner {
|
||||||
|
#[allow(clippy::collapsible_if)]
|
||||||
|
fn drop(&mut self) {
|
||||||
|
match self.control.subscriptions.entry(self.params.clone()) {
|
||||||
|
DashEntry::Vacant(_) => {
|
||||||
|
warn!("Subscriptions inconsistency (missing entry in by_params)");
|
||||||
|
}
|
||||||
|
DashEntry::Occupied(entry) => {
|
||||||
|
let _ = self.control.sender.send(NotificationEntry::Unsubscribed(
|
||||||
|
self.params.clone(),
|
||||||
|
self.id,
|
||||||
|
));
|
||||||
|
entry.remove();
|
||||||
|
datapoint_info!(
|
||||||
|
"rpc-subscription",
|
||||||
|
("total", self.control.subscriptions.len(), i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
|
||||||
|
|
||||||
|
impl SubscriptionToken {
|
||||||
|
pub fn id(&self) -> SubscriptionId {
|
||||||
|
self.0.id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn params(&self) -> &SubscriptionParams {
|
||||||
|
&self.0.params
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::rpc_pubsub_service::PubSubConfig;
|
||||||
|
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
struct ControlWrapper {
|
||||||
|
control: SubscriptionControl,
|
||||||
|
receiver: crossbeam_channel::Receiver<NotificationEntry>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ControlWrapper {
|
||||||
|
fn new() -> Self {
|
||||||
|
let (sender, receiver) = crossbeam_channel::unbounded();
|
||||||
|
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);
|
||||||
|
|
||||||
|
let control = SubscriptionControl::new(
|
||||||
|
PubSubConfig::default().max_active_subscriptions,
|
||||||
|
sender,
|
||||||
|
broadcast_sender,
|
||||||
|
);
|
||||||
|
Self { control, receiver }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
|
||||||
|
if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap() {
|
||||||
|
assert_eq!(¶ms, expected_params);
|
||||||
|
assert_eq!(id, SubscriptionId::from(expected_id));
|
||||||
|
} else {
|
||||||
|
panic!("unexpected notification");
|
||||||
|
}
|
||||||
|
self.assert_silence();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
|
||||||
|
if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap() {
|
||||||
|
assert_eq!(¶ms, expected_params);
|
||||||
|
assert_eq!(id, SubscriptionId::from(expected_id));
|
||||||
|
} else {
|
||||||
|
panic!("unexpected notification");
|
||||||
|
}
|
||||||
|
self.assert_silence();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_silence(&self) {
|
||||||
|
assert!(self.receiver.try_recv().is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notify_subscribe() {
|
||||||
|
let control = ControlWrapper::new();
|
||||||
|
let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
control.assert_subscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
drop(token1);
|
||||||
|
control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notify_subscribe_multiple() {
|
||||||
|
let control = ControlWrapper::new();
|
||||||
|
let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
control.assert_subscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
let token2 = token1.clone();
|
||||||
|
drop(token1);
|
||||||
|
let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
drop(token3);
|
||||||
|
control.assert_silence();
|
||||||
|
drop(token2);
|
||||||
|
control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notify_subscribe_two_subscriptions() {
|
||||||
|
let control = ControlWrapper::new();
|
||||||
|
let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
control.assert_subscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
|
||||||
|
let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
|
||||||
|
signature: Signature::default(),
|
||||||
|
commitment: CommitmentConfig::processed(),
|
||||||
|
enable_received_notification: false,
|
||||||
|
});
|
||||||
|
let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
|
||||||
|
control.assert_subscribed(&signature_params, 1);
|
||||||
|
|
||||||
|
let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
|
||||||
|
drop(token_slot1);
|
||||||
|
control.assert_silence();
|
||||||
|
drop(token_slot2);
|
||||||
|
control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
|
||||||
|
drop(token_signature2);
|
||||||
|
control.assert_silence();
|
||||||
|
drop(token_signature1);
|
||||||
|
control.assert_unsubscribed(&signature_params, 1);
|
||||||
|
|
||||||
|
let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
|
||||||
|
control.assert_subscribed(&SubscriptionParams::Slot, 2);
|
||||||
|
drop(token_slot3);
|
||||||
|
control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn subscription_info() {
|
||||||
|
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||||
|
let bank = Bank::new_for_tests(&genesis_config);
|
||||||
|
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||||
|
let mut tracker = SubscriptionsTracker::new(bank_forks);
|
||||||
|
|
||||||
|
tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
|
||||||
|
let info = tracker
|
||||||
|
.node_progress_watchers
|
||||||
|
.get(&SubscriptionParams::Slot)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(info.commitment, None);
|
||||||
|
assert_eq!(info.params, SubscriptionParams::Slot);
|
||||||
|
assert_eq!(info.method, SubscriptionParams::Slot.method());
|
||||||
|
assert_eq!(info.id, SubscriptionId::from(0));
|
||||||
|
assert_eq!(*info.last_notified_slot.read().unwrap(), 0);
|
||||||
|
|
||||||
|
let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
|
||||||
|
pubkey: Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(),
|
||||||
|
commitment: CommitmentConfig::finalized(),
|
||||||
|
encoding: UiAccountEncoding::Base64Zstd,
|
||||||
|
data_slice: None,
|
||||||
|
});
|
||||||
|
tracker.subscribe(account_params.clone(), 1.into(), || 42);
|
||||||
|
|
||||||
|
let info = tracker
|
||||||
|
.commitment_watchers
|
||||||
|
.get(&SubscriptionId::from(1))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
|
||||||
|
assert_eq!(info.params, account_params);
|
||||||
|
assert_eq!(info.method, account_params.method());
|
||||||
|
assert_eq!(info.id, SubscriptionId::from(1));
|
||||||
|
assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn subscription_indexes() {
|
||||||
|
fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
|
||||||
|
(
|
||||||
|
tracker.by_signature.len(),
|
||||||
|
tracker.commitment_watchers.len(),
|
||||||
|
tracker.gossip_watchers.len(),
|
||||||
|
tracker.node_progress_watchers.len(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||||
|
let bank = Bank::new_for_tests(&genesis_config);
|
||||||
|
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||||
|
let mut tracker = SubscriptionsTracker::new(bank_forks);
|
||||||
|
|
||||||
|
tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 0, 1));
|
||||||
|
tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 0, 0));
|
||||||
|
|
||||||
|
let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
|
||||||
|
pubkey: Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(),
|
||||||
|
commitment: CommitmentConfig::finalized(),
|
||||||
|
encoding: UiAccountEncoding::Base64Zstd,
|
||||||
|
data_slice: None,
|
||||||
|
});
|
||||||
|
tracker.subscribe(account_params.clone(), 1.into(), || 0);
|
||||||
|
assert_eq!(counts(&tracker), (0, 1, 0, 0));
|
||||||
|
tracker.unsubscribe(account_params, 1.into());
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 0, 0));
|
||||||
|
|
||||||
|
let account_params2 = SubscriptionParams::Account(AccountSubscriptionParams {
|
||||||
|
pubkey: Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(),
|
||||||
|
commitment: CommitmentConfig::confirmed(),
|
||||||
|
encoding: UiAccountEncoding::Base64Zstd,
|
||||||
|
data_slice: None,
|
||||||
|
});
|
||||||
|
tracker.subscribe(account_params2.clone(), 2.into(), || 0);
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 1, 0));
|
||||||
|
tracker.unsubscribe(account_params2, 2.into());
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 0, 0));
|
||||||
|
|
||||||
|
let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
|
||||||
|
signature: Signature::default(),
|
||||||
|
commitment: CommitmentConfig::processed(),
|
||||||
|
enable_received_notification: false,
|
||||||
|
});
|
||||||
|
tracker.subscribe(signature_params.clone(), 3.into(), || 0);
|
||||||
|
assert_eq!(counts(&tracker), (1, 1, 0, 0));
|
||||||
|
tracker.unsubscribe(signature_params, 3.into());
|
||||||
|
assert_eq!(counts(&tracker), (0, 0, 0, 0));
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -4,7 +4,7 @@
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Default, Clone, Copy, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Default, Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct CommitmentConfig {
|
pub struct CommitmentConfig {
|
||||||
pub commitment: CommitmentLevel,
|
pub commitment: CommitmentLevel,
|
||||||
|
|
|
@ -1059,15 +1059,13 @@ pub fn main() {
|
||||||
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
&format!("{}-{}", VALIDATOR_PORT_RANGE.0, VALIDATOR_PORT_RANGE.1);
|
||||||
let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string();
|
let default_genesis_archive_unpacked_size = &MAX_GENESIS_ARCHIVE_UNPACKED_SIZE.to_string();
|
||||||
let default_rpc_max_multiple_accounts = &MAX_MULTIPLE_ACCOUNTS.to_string();
|
let default_rpc_max_multiple_accounts = &MAX_MULTIPLE_ACCOUNTS.to_string();
|
||||||
let default_rpc_pubsub_max_connections = PubSubConfig::default().max_connections.to_string();
|
|
||||||
let default_rpc_pubsub_max_fragment_size =
|
|
||||||
PubSubConfig::default().max_fragment_size.to_string();
|
|
||||||
let default_rpc_pubsub_max_in_buffer_capacity =
|
|
||||||
PubSubConfig::default().max_in_buffer_capacity.to_string();
|
|
||||||
let default_rpc_pubsub_max_out_buffer_capacity =
|
|
||||||
PubSubConfig::default().max_out_buffer_capacity.to_string();
|
|
||||||
let default_rpc_pubsub_max_active_subscriptions =
|
let default_rpc_pubsub_max_active_subscriptions =
|
||||||
PubSubConfig::default().max_active_subscriptions.to_string();
|
PubSubConfig::default().max_active_subscriptions.to_string();
|
||||||
|
let default_rpc_pubsub_queue_capacity_items =
|
||||||
|
PubSubConfig::default().queue_capacity_items.to_string();
|
||||||
|
let default_rpc_pubsub_queue_capacity_bytes =
|
||||||
|
PubSubConfig::default().queue_capacity_bytes.to_string();
|
||||||
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
|
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
|
||||||
.send_transaction_retry_ms
|
.send_transaction_retry_ms
|
||||||
.to_string();
|
.to_string();
|
||||||
|
@ -1737,10 +1735,10 @@ pub fn main() {
|
||||||
.value_name("NUMBER")
|
.value_name("NUMBER")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.validator(is_parsable::<usize>)
|
.validator(is_parsable::<usize>)
|
||||||
.default_value(&default_rpc_pubsub_max_connections)
|
.hidden(true)
|
||||||
.help("The maximum number of connections that RPC PubSub will support. \
|
.help("The maximum number of connections that RPC PubSub will support. \
|
||||||
This is a hard limit and no new connections beyond this limit can \
|
This is a hard limit and no new connections beyond this limit can \
|
||||||
be made until an old connection is dropped."),
|
be made until an old connection is dropped. (Obsolete)"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_pubsub_max_fragment_size")
|
Arg::with_name("rpc_pubsub_max_fragment_size")
|
||||||
|
@ -1748,9 +1746,9 @@ pub fn main() {
|
||||||
.value_name("BYTES")
|
.value_name("BYTES")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.validator(is_parsable::<usize>)
|
.validator(is_parsable::<usize>)
|
||||||
.default_value(&default_rpc_pubsub_max_fragment_size)
|
.hidden(true)
|
||||||
.help("The maximum length in bytes of acceptable incoming frames. Messages longer \
|
.help("The maximum length in bytes of acceptable incoming frames. Messages longer \
|
||||||
than this will be rejected."),
|
than this will be rejected. (Obsolete)"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_pubsub_max_in_buffer_capacity")
|
Arg::with_name("rpc_pubsub_max_in_buffer_capacity")
|
||||||
|
@ -1758,8 +1756,9 @@ pub fn main() {
|
||||||
.value_name("BYTES")
|
.value_name("BYTES")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.validator(is_parsable::<usize>)
|
.validator(is_parsable::<usize>)
|
||||||
.default_value(&default_rpc_pubsub_max_in_buffer_capacity)
|
.hidden(true)
|
||||||
.help("The maximum size in bytes to which the incoming websocket buffer can grow."),
|
.help("The maximum size in bytes to which the incoming websocket buffer can grow. \
|
||||||
|
(Obsolete)"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_pubsub_max_out_buffer_capacity")
|
Arg::with_name("rpc_pubsub_max_out_buffer_capacity")
|
||||||
|
@ -1767,8 +1766,9 @@ pub fn main() {
|
||||||
.value_name("BYTES")
|
.value_name("BYTES")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.validator(is_parsable::<usize>)
|
.validator(is_parsable::<usize>)
|
||||||
.default_value(&default_rpc_pubsub_max_out_buffer_capacity)
|
.hidden(true)
|
||||||
.help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
|
.help("The maximum size in bytes to which the outgoing websocket buffer can grow. \
|
||||||
|
(Obsolete)"),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_pubsub_max_active_subscriptions")
|
Arg::with_name("rpc_pubsub_max_active_subscriptions")
|
||||||
|
@ -1780,6 +1780,26 @@ pub fn main() {
|
||||||
.help("The maximum number of active subscriptions that RPC PubSub will accept \
|
.help("The maximum number of active subscriptions that RPC PubSub will accept \
|
||||||
across all connections."),
|
across all connections."),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_queue_capacity_items")
|
||||||
|
.long("rpc-pubsub-queue-capacity-items")
|
||||||
|
.takes_value(true)
|
||||||
|
.value_name("NUMBER")
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_queue_capacity_items)
|
||||||
|
.help("The maximum number of notifications that RPC PubSub will store \
|
||||||
|
across all connections."),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_pubsub_queue_capacity_bytes")
|
||||||
|
.long("rpc-pubsub-queue-capacity-bytes")
|
||||||
|
.takes_value(true)
|
||||||
|
.value_name("BYTES")
|
||||||
|
.validator(is_parsable::<usize>)
|
||||||
|
.default_value(&default_rpc_pubsub_queue_capacity_bytes)
|
||||||
|
.help("The maximum total size of notifications that RPC PubSub will store \
|
||||||
|
across all connections."),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rpc_send_transaction_retry_ms")
|
Arg::with_name("rpc_send_transaction_retry_ms")
|
||||||
.long("rpc-send-retry-ms")
|
.long("rpc-send-retry-ms")
|
||||||
|
@ -2569,23 +2589,21 @@ pub fn main() {
|
||||||
}),
|
}),
|
||||||
pubsub_config: PubSubConfig {
|
pubsub_config: PubSubConfig {
|
||||||
enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
|
enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"),
|
||||||
max_connections: value_t_or_exit!(matches, "rpc_pubsub_max_connections", usize),
|
|
||||||
max_fragment_size: value_t_or_exit!(matches, "rpc_pubsub_max_fragment_size", usize),
|
|
||||||
max_in_buffer_capacity: value_t_or_exit!(
|
|
||||||
matches,
|
|
||||||
"rpc_pubsub_max_in_buffer_capacity",
|
|
||||||
usize
|
|
||||||
),
|
|
||||||
max_out_buffer_capacity: value_t_or_exit!(
|
|
||||||
matches,
|
|
||||||
"rpc_pubsub_max_out_buffer_capacity",
|
|
||||||
usize
|
|
||||||
),
|
|
||||||
max_active_subscriptions: value_t_or_exit!(
|
max_active_subscriptions: value_t_or_exit!(
|
||||||
matches,
|
matches,
|
||||||
"rpc_pubsub_max_active_subscriptions",
|
"rpc_pubsub_max_active_subscriptions",
|
||||||
usize
|
usize
|
||||||
),
|
),
|
||||||
|
queue_capacity_items: value_t_or_exit!(
|
||||||
|
matches,
|
||||||
|
"rpc_pubsub_queue_capacity_items",
|
||||||
|
usize
|
||||||
|
),
|
||||||
|
queue_capacity_bytes: value_t_or_exit!(
|
||||||
|
matches,
|
||||||
|
"rpc_pubsub_queue_capacity_bytes",
|
||||||
|
usize
|
||||||
|
),
|
||||||
},
|
},
|
||||||
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
|
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
|
||||||
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
|
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
|
||||||
|
|
Loading…
Reference in New Issue