feat: implementation of live-slots command (#8129)

This commit is contained in:
Sunny Gleason 2020-02-06 14:16:30 -05:00 committed by GitHub
parent a25e57c397
commit a7fa92b372
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 496 additions and 10 deletions

110
Cargo.lock generated
View File

@ -1111,6 +1111,19 @@ name = "fnv"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fs_extra"
version = "1.1.0"
@ -1593,6 +1606,14 @@ dependencies = [
"regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "input_buffer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "iovec"
version = "0.1.4"
@ -2011,6 +2032,23 @@ name = "multimap"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "native-tls"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl 0.10.27 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)",
"schannel 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"security-framework 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"security-framework-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "net2"
version = "0.2.33"
@ -2169,11 +2207,36 @@ name = "opaque-debug"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "openssl"
version = "0.10.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
"foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "openssl-probe"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "openssl-sys"
version = "0.9.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)",
"vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ordermap"
version = "0.3.5"
@ -2398,6 +2461,11 @@ name = "pin-utils"
version = "0.1.0-alpha.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pkg-config"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ppv-lite86"
version = "0.2.5"
@ -3679,6 +3747,9 @@ dependencies = [
"solana-logger 0.24.0",
"solana-net-utils 0.24.0",
"solana-sdk 0.24.0",
"thiserror 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tungstenite 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -5423,6 +5494,25 @@ dependencies = [
"cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tungstenite"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"input_buffer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "typed-arena"
version = "1.6.1"
@ -5528,6 +5618,11 @@ dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "utf-8"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "utf8-ranges"
version = "0.1.3"
@ -5541,6 +5636,11 @@ dependencies = [
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "vcpkg"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "vec_map"
version = "0.8.1"
@ -5964,6 +6064,8 @@ dependencies = [
"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33"
"checksum flate2 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "2adaffba6388640136149e18ed080b77a78611c1e1d6de75aedcdf78df5d4682"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
"checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
"checksum fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674"
"checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
@ -6017,6 +6119,7 @@ dependencies = [
"checksum indexed 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d480125acf340d6a6e59dab69ae19d6fca3a906e1eade277671272cc8f73794b"
"checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292"
"checksum indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a68371cf417889c9d7f98235b7102ea7c54fc59bcbd22f3dea785be9d27e40"
"checksum input_buffer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d"
"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
@ -6063,6 +6166,7 @@ dependencies = [
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum mirai-annotations 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7968d6cdc3c7a9632e45d738fd07fde89d04bbb0e88e7abb058871a82fa92645"
"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151"
"checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e"
"checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88"
"checksum new_debug_unreachable 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f40f005c60db6e03bae699e414c58bf9aa7ea02a2d0b9bfbcf19286cc4c82b30"
"checksum nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c8d77f3db4bce033f4d04db08079b2ef1c3d02b44e86f25d08886fafa7756ffa"
@ -6082,7 +6186,9 @@ dependencies = [
"checksum number_prefix 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"
"checksum once_cell 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "891f486f630e5c5a4916c7e16c4b24a53e78c860b646e9f8e005e4f16847bfed"
"checksum opaque-debug 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "93f5bb2e8e8dec81642920ccff6b61f1eb94fa3020c5a325c9851ff604152409"
"checksum openssl 0.10.27 (registry+https://github.com/rust-lang/crates.io-index)" = "e176a45fedd4c990e26580847a525e39e16ec32ac78957dbf62ded31b3abfd6f"
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
"checksum openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)" = "1024c0a59774200a555087a6da3f253a9095a5f344e353b212ac4c8b8e450986"
"checksum ordermap 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a86ed3f5f244b372d6b1a00b72ef7f8876d0bc6a78a4c9985c53614041512063"
"checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
"checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13"
@ -6108,6 +6214,7 @@ dependencies = [
"checksum pin-project-internal 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "44ca92f893f0656d3cba8158dd0f2b99b94de256a4a54e870bd6922fcc6c8355"
"checksum pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0af6cbca0e6e3ce8692ee19fb8d734b641899e07b68eb73e9bbbd32f1703991"
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
"checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b"
"checksum precomputed-hash 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
"checksum predicates 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53e09015b0d3f5a0ec2d4428f7559bb7b3fff341b4e159fedd1d57fac8b939ff"
@ -6301,6 +6408,7 @@ dependencies = [
"checksum trees 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "afa1821e85be4f56cc5bd08bdbc32c0e26d105c90bed9c637992f6c7f747c180"
"checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382"
"checksum try_from 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "283d3b89e1368717881a9d51dad843cc435380d8109c9e47d38780a324698d8b"
"checksum tungstenite 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e"
"checksum typed-arena 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9c0704a799d314795d3d847d519b284bae681ef9b1f3da99f7ebc7b47ba2e607"
"checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169"
"checksum unicase 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a84e5511b2a947f3ae965dcb29b13b7b1691b6e7332cf5dbc1744138d5acb7f6"
@ -6317,8 +6425,10 @@ dependencies = [
"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a"
"checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb"
"checksum users 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c72f4267aea0c3ec6d07eaabea6ead7c5ddacfafc5e22bcf8d186706851fb4cf"
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
"checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a"
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"

View File

@ -199,6 +199,9 @@ pub enum CliCommand {
commitment_config: CommitmentConfig,
},
LeaderSchedule,
LiveSlots {
url: String,
},
Ping {
lamports: u64,
interval: Duration,
@ -474,6 +477,7 @@ pub fn parse_command(matches: &ArgMatches<'_>) -> Result<CliCommandInfo, Box<dyn
require_keypair: false,
}),
("ping", Some(matches)) => parse_cluster_ping(matches),
("live-slots", Some(matches)) => parse_live_slots(matches),
("block-production", Some(matches)) => parse_show_block_production(matches),
("gossip", Some(_matches)) => Ok(CliCommandInfo {
command: CliCommand::ShowGossip,
@ -1274,6 +1278,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
process_get_transaction_count(&rpc_client, commitment_config)
}
CliCommand::LeaderSchedule => process_leader_schedule(&rpc_client),
CliCommand::LiveSlots { url } => process_live_slots(&url),
CliCommand::Ping {
lamports,
interval,

View File

@ -9,7 +9,11 @@ use clap::{value_t, value_t_or_exit, App, Arg, ArgMatches, SubCommand};
use console::{style, Emoji};
use indicatif::{ProgressBar, ProgressStyle};
use solana_clap_utils::{input_parsers::*, input_validators::*};
use solana_client::{rpc_client::RpcClient, rpc_response::RpcVoteAccountInfo};
use solana_client::{
pubsub_client::{PubsubClient, SlotInfoMessage},
rpc_client::RpcClient,
rpc_response::RpcVoteAccountInfo,
};
use solana_sdk::{
account_utils::StateMut,
clock::{self, Slot},
@ -23,6 +27,10 @@ use solana_sdk::{
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::{Duration, Instant},
};
@ -157,6 +165,19 @@ impl ClusterQuerySubCommands for App<'_, '_> {
),
),
)
.subcommand(
SubCommand::with_name("live-slots")
.about("Show information about the current slot progression")
.arg(
Arg::with_name("websocket_url")
.short("w")
.long("ws")
.value_name("URL")
.takes_value(true)
.default_value("ws://127.0.0.1:8900")
.help("WebSocket URL for PubSub RPC connection"),
),
)
.subcommand(
SubCommand::with_name("block-production")
.about("Show information about block production")
@ -246,6 +267,14 @@ pub fn parse_cluster_ping(matches: &ArgMatches<'_>) -> Result<CliCommandInfo, Cl
})
}
pub fn parse_live_slots(matches: &ArgMatches<'_>) -> Result<CliCommandInfo, CliError> {
let url: String = value_t_or_exit!(matches, "websocket_url", String);
Ok(CliCommandInfo {
command: CliCommand::LiveSlots { url },
require_keypair: false,
})
}
pub fn parse_get_block_time(matches: &ArgMatches<'_>) -> Result<CliCommandInfo, CliError> {
let slot = value_t_or_exit!(matches, "slot", u64);
Ok(CliCommandInfo {
@ -810,6 +839,63 @@ pub fn process_ping(
Ok("".to_string())
}
pub fn process_live_slots(url: &str) -> ProcessResult {
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
ctrlc::set_handler(move || {
exit_clone.store(true, Ordering::Relaxed);
})?;
let mut current: Option<SlotInfoMessage> = None;
let mut message = "".to_string();
let slot_progress = new_spinner_progress_bar();
slot_progress.set_message("Connecting...");
let (mut client, receiver) = PubsubClient::slot_subscribe(url)?;
slot_progress.set_message("Connected.");
loop {
if exit.load(Ordering::Relaxed) {
eprintln!("{}", message);
client.shutdown().unwrap();
break;
}
match receiver.recv() {
Ok(new_info) => {
message = format!("{:?}", new_info).to_owned();
slot_progress.set_message(&message);
if let Some(previous) = current {
let slot_delta: i64 = new_info.slot as i64 - previous.slot as i64;
let root_delta: i64 = new_info.root as i64 - previous.root as i64;
//
// if slot has advanced out of step with the root, we detect
// a mismatch and output the slot information
//
if slot_delta != root_delta {
let prev_root = format!(
"|<- {} <- … <- {} <- {}",
previous.root, previous.parent, previous.slot
)
.to_owned();
slot_progress.println(&prev_root);
}
}
current = Some(new_info);
}
Err(err) => {
eprintln!("disconnected: {:?}", err);
break;
}
}
}
Ok("".to_string())
}
pub fn process_show_gossip(rpc_client: &RpcClient) -> ProcessResult {
let cluster_nodes = rpc_client.get_cluster_nodes()?;

View File

@ -20,6 +20,9 @@ serde_derive = "1.0.103"
serde_json = "1.0.46"
solana-net-utils = { path = "../net-utils", version = "0.24.0" }
solana-sdk = { path = "../sdk", version = "0.24.0" }
thiserror = "1.0"
tungstenite = "0.10.1"
url = "2.1.1"
[dev-dependencies]
assert_matches = "1.3.0"

View File

@ -5,6 +5,7 @@ pub mod client_error;
mod generic_rpc_client_request;
pub mod mock_rpc_client_request;
pub mod perf_utils;
pub mod pubsub_client;
pub mod rpc_client;
pub mod rpc_client_request;
pub mod rpc_request;

220
client/src/pubsub_client.rs Normal file
View File

@ -0,0 +1,220 @@
use log::*;
use serde::{de::DeserializeOwned, Deserialize};
use serde_json::{
json,
value::Value::{Number, Object},
Map, Value,
};
use std::{
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver},
Arc, RwLock,
},
thread::JoinHandle,
};
use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url};
#[derive(Debug, Error)]
pub enum PubsubClientError {
#[error("url parse error")]
UrlParseError(#[from] ParseError),
#[error("unable to connect to server")]
ConnectionError(#[from] tungstenite::Error),
#[error("json parse error")]
JsonParseError(#[from] serde_json::error::Error),
#[error("unexpected message format")]
UnexpectedMessageError,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct SlotInfoMessage {
pub parent: u64,
pub root: u64,
pub slot: u64,
}
pub struct PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
message_type: PhantomData<T>,
operation: &'static str,
socket: Arc<RwLock<WebSocket<AutoStream>>>,
subscription_id: u64,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
}
impl<T> Drop for PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
fn drop(&mut self) {
self.send_unsubscribe()
.unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
self.socket
.write()
.unwrap()
.close(None)
.unwrap_or_else(|_| warn!("unable to close websocket"));
}
}
impl<T> PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
fn send_subscribe(
writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>,
operation: &str,
) -> Result<u64, PubsubClientError> {
let method = format!("{}Subscribe", operation);
writable_socket
.write()
.unwrap()
.write_message(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":method,"params":[]
})
.to_string(),
))?;
let message = writable_socket.write().unwrap().read_message()?;
Self::extract_subscription_id(message)
}
fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
let message_text = &message.into_text()?;
let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
if let Some(Number(x)) = json_msg.get("result") {
if let Some(x) = x.as_u64() {
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
}
pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
let method = format!("{}Unubscribe", self.operation);
self.socket
.write()
.unwrap()
.write_message(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
})
.to_string(),
))
.map_err(|err| err.into())
}
fn read_message(
writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>,
) -> Result<T, PubsubClientError> {
let message = writable_socket.write().unwrap().read_message()?;
let message_text = &message.into_text().unwrap();
let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
if let Some(Object(value_1)) = json_msg.get("params") {
if let Some(value_2) = value_1.get("result") {
let x: T = serde_json::from_value::<T>(value_2.clone()).unwrap();
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
}
pub fn shutdown(&mut self) -> std::thread::Result<()> {
if self.t_cleanup.is_some() {
info!("websocket thread - shutting down");
self.exit.store(true, Ordering::Relaxed);
let x = self.t_cleanup.take().unwrap().join();
info!("websocket thread - shut down.");
x
} else {
warn!("websocket thread - already shut down.");
Ok(())
}
}
}
const SLOT_OPERATION: &str = "slot";
pub struct PubsubClient {}
impl PubsubClient {
pub fn slot_subscribe(
url: &str,
) -> Result<
(
PubsubClientSubscription<SlotInfoMessage>,
Receiver<SlotInfoMessage>,
),
PubsubClientError,
> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<SlotInfoMessage>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotInfoMessage>::send_subscribe(
&socket_clone,
SLOT_OPERATION,
)
.unwrap();
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
let message: Result<SlotInfoMessage, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg.clone()) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
} else {
info!("receive error: {:?}", message);
break;
}
}
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<SlotInfoMessage> = PubsubClientSubscription {
message_type: PhantomData,
operation: SLOT_OPERATION,
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
}
#[cfg(test)]
mod tests {
// see core/tests/client.rs#test_slot_subscription()
}

View File

@ -1,12 +1,26 @@
use solana_client::rpc_client::RpcClient;
use solana_core::validator::new_validator_for_tests;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::KeypairUtil;
use solana_sdk::system_transaction;
use std::fs::remove_dir_all;
use std::thread::sleep;
use std::time::{Duration, Instant};
use solana_client::{
pubsub_client::{PubsubClient, SlotInfoMessage},
rpc_client::RpcClient,
};
use solana_core::{
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
validator::new_validator_for_tests,
};
use solana_sdk::{
commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::KeypairUtil,
system_transaction,
};
use std::{
fs::remove_dir_all,
net::{IpAddr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::{Duration, Instant},
};
use systemstat::Ipv4Addr;
#[test]
fn test_rpc_client() {
@ -57,3 +71,50 @@ fn test_rpc_client() {
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_slot_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
std::thread::sleep(Duration::from_millis(400));
let (mut client, receiver) =
PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
let mut errors: Vec<(SlotInfoMessage, SlotInfoMessage)> = Vec::new();
for i in 0..3 {
subscriptions.notify_slot(i + 1, i, i);
let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
match maybe_actual {
Ok(actual) => {
let expected = SlotInfoMessage {
slot: i + 1,
parent: i,
root: i,
};
if actual != expected {
errors.push((actual, expected));
}
}
Err(_err) => {
eprintln!("unexpected websocket receive timeout");
break;
}
}
}
exit.store(true, Ordering::Relaxed);
client.shutdown().unwrap();
pubsub_service.close().unwrap();
assert_eq!(errors, [].to_vec());
}