From a7fa92b372fa713dc2c49c7f3e7c0f1e74fb3e63 Mon Sep 17 00:00:00 2001 From: Sunny Gleason Date: Thu, 6 Feb 2020 14:16:30 -0500 Subject: [PATCH] feat: implementation of live-slots command (#8129) --- Cargo.lock | 110 ++++++++++++++++++ cli/src/cli.rs | 5 + cli/src/cluster_query.rs | 88 ++++++++++++++- client/Cargo.toml | 3 + client/src/lib.rs | 1 + client/src/pubsub_client.rs | 220 ++++++++++++++++++++++++++++++++++++ core/tests/client.rs | 79 +++++++++++-- 7 files changed, 496 insertions(+), 10 deletions(-) create mode 100644 client/src/pubsub_client.rs diff --git a/Cargo.lock b/Cargo.lock index e39bde975..91f43f75d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1111,6 +1111,19 @@ name = "fnv" version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "fs_extra" version = "1.1.0" @@ -1593,6 +1606,14 @@ dependencies = [ "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "iovec" version = "0.1.4" @@ -2011,6 +2032,23 @@ name = "multimap" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "native-tls" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "openssl 0.10.27 (registry+https://github.com/rust-lang/crates.io-index)", + "openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)", + "schannel 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "security-framework 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "security-framework-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "net2" version = "0.2.33" @@ -2169,11 +2207,36 @@ name = "opaque-debug" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "openssl" +version = "0.10.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "openssl-probe" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "openssl-sys" +version = "0.9.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.49 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)", + "vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ordermap" version = "0.3.5" @@ -2398,6 +2461,11 @@ name = "pin-utils" version = "0.1.0-alpha.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pkg-config" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ppv-lite86" version = "0.2.5" @@ -3679,6 +3747,9 @@ dependencies = [ "solana-logger 0.24.0", "solana-net-utils 0.24.0", "solana-sdk 0.24.0", + "thiserror 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tungstenite 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5423,6 +5494,25 @@ dependencies = [ "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tungstenite" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "input_buffer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "typed-arena" version = "1.6.1" @@ -5528,6 +5618,11 @@ dependencies = [ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "utf8-ranges" version = "0.1.3" @@ -5541,6 +5636,11 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "vcpkg" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "vec_map" version = "0.8.1" @@ -5964,6 +6064,8 @@ dependencies = [ "checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "2adaffba6388640136149e18ed080b77a78611c1e1d6de75aedcdf78df5d4682" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" +"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +"checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" @@ -6017,6 +6119,7 @@ dependencies = [ "checksum indexed 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d480125acf340d6a6e59dab69ae19d6fca3a906e1eade277671272cc8f73794b" "checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" "checksum indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a68371cf417889c9d7f98235b7102ea7c54fc59bcbd22f3dea785be9d27e40" +"checksum input_buffer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d" "checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484" @@ -6063,6 +6166,7 @@ dependencies = [ "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum mirai-annotations 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7968d6cdc3c7a9632e45d738fd07fde89d04bbb0e88e7abb058871a82fa92645" "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" +"checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum new_debug_unreachable 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f40f005c60db6e03bae699e414c58bf9aa7ea02a2d0b9bfbcf19286cc4c82b30" "checksum nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c8d77f3db4bce033f4d04db08079b2ef1c3d02b44e86f25d08886fafa7756ffa" @@ -6082,7 +6186,9 @@ dependencies = [ "checksum number_prefix 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" "checksum once_cell 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "891f486f630e5c5a4916c7e16c4b24a53e78c860b646e9f8e005e4f16847bfed" "checksum opaque-debug 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "93f5bb2e8e8dec81642920ccff6b61f1eb94fa3020c5a325c9851ff604152409" +"checksum openssl 0.10.27 (registry+https://github.com/rust-lang/crates.io-index)" = "e176a45fedd4c990e26580847a525e39e16ec32ac78957dbf62ded31b3abfd6f" "checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" +"checksum openssl-sys 0.9.54 (registry+https://github.com/rust-lang/crates.io-index)" = "1024c0a59774200a555087a6da3f253a9095a5f344e353b212ac4c8b8e450986" "checksum ordermap 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a86ed3f5f244b372d6b1a00b72ef7f8876d0bc6a78a4c9985c53614041512063" "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13" @@ -6108,6 +6214,7 @@ dependencies = [ "checksum pin-project-internal 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "44ca92f893f0656d3cba8158dd0f2b99b94de256a4a54e870bd6922fcc6c8355" "checksum pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0af6cbca0e6e3ce8692ee19fb8d734b641899e07b68eb73e9bbbd32f1703991" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" "checksum precomputed-hash 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" "checksum predicates 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53e09015b0d3f5a0ec2d4428f7559bb7b3fff341b4e159fedd1d57fac8b939ff" @@ -6301,6 +6408,7 @@ dependencies = [ "checksum trees 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "afa1821e85be4f56cc5bd08bdbc32c0e26d105c90bed9c637992f6c7f747c180" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum try_from 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "283d3b89e1368717881a9d51dad843cc435380d8109c9e47d38780a324698d8b" +"checksum tungstenite 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e" "checksum typed-arena 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9c0704a799d314795d3d847d519b284bae681ef9b1f3da99f7ebc7b47ba2e607" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum unicase 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a84e5511b2a947f3ae965dcb29b13b7b1691b6e7332cf5dbc1744138d5acb7f6" @@ -6317,8 +6425,10 @@ dependencies = [ "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" "checksum users 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c72f4267aea0c3ec6d07eaabea6ead7c5ddacfafc5e22bcf8d186706851fb4cf" +"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" "checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" "checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a" +"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" diff --git a/cli/src/cli.rs b/cli/src/cli.rs index aec539c98..71972f2c0 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -199,6 +199,9 @@ pub enum CliCommand { commitment_config: CommitmentConfig, }, LeaderSchedule, + LiveSlots { + url: String, + }, Ping { lamports: u64, interval: Duration, @@ -474,6 +477,7 @@ pub fn parse_command(matches: &ArgMatches<'_>) -> Result parse_cluster_ping(matches), + ("live-slots", Some(matches)) => parse_live_slots(matches), ("block-production", Some(matches)) => parse_show_block_production(matches), ("gossip", Some(_matches)) => Ok(CliCommandInfo { command: CliCommand::ShowGossip, @@ -1274,6 +1278,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { process_get_transaction_count(&rpc_client, commitment_config) } CliCommand::LeaderSchedule => process_leader_schedule(&rpc_client), + CliCommand::LiveSlots { url } => process_live_slots(&url), CliCommand::Ping { lamports, interval, diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index d69bec848..703879f9e 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -9,7 +9,11 @@ use clap::{value_t, value_t_or_exit, App, Arg, ArgMatches, SubCommand}; use console::{style, Emoji}; use indicatif::{ProgressBar, ProgressStyle}; use solana_clap_utils::{input_parsers::*, input_validators::*}; -use solana_client::{rpc_client::RpcClient, rpc_response::RpcVoteAccountInfo}; +use solana_client::{ + pubsub_client::{PubsubClient, SlotInfoMessage}, + rpc_client::RpcClient, + rpc_response::RpcVoteAccountInfo, +}; use solana_sdk::{ account_utils::StateMut, clock::{self, Slot}, @@ -23,6 +27,10 @@ use solana_sdk::{ use std::{ collections::{HashMap, VecDeque}, net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, thread::sleep, time::{Duration, Instant}, }; @@ -157,6 +165,19 @@ impl ClusterQuerySubCommands for App<'_, '_> { ), ), ) + .subcommand( + SubCommand::with_name("live-slots") + .about("Show information about the current slot progression") + .arg( + Arg::with_name("websocket_url") + .short("w") + .long("ws") + .value_name("URL") + .takes_value(true) + .default_value("ws://127.0.0.1:8900") + .help("WebSocket URL for PubSub RPC connection"), + ), + ) .subcommand( SubCommand::with_name("block-production") .about("Show information about block production") @@ -246,6 +267,14 @@ pub fn parse_cluster_ping(matches: &ArgMatches<'_>) -> Result) -> Result { + let url: String = value_t_or_exit!(matches, "websocket_url", String); + Ok(CliCommandInfo { + command: CliCommand::LiveSlots { url }, + require_keypair: false, + }) +} + pub fn parse_get_block_time(matches: &ArgMatches<'_>) -> Result { let slot = value_t_or_exit!(matches, "slot", u64); Ok(CliCommandInfo { @@ -810,6 +839,63 @@ pub fn process_ping( Ok("".to_string()) } +pub fn process_live_slots(url: &str) -> ProcessResult { + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + + ctrlc::set_handler(move || { + exit_clone.store(true, Ordering::Relaxed); + })?; + + let mut current: Option = None; + let mut message = "".to_string(); + + let slot_progress = new_spinner_progress_bar(); + slot_progress.set_message("Connecting..."); + let (mut client, receiver) = PubsubClient::slot_subscribe(url)?; + slot_progress.set_message("Connected."); + + loop { + if exit.load(Ordering::Relaxed) { + eprintln!("{}", message); + client.shutdown().unwrap(); + break; + } + + match receiver.recv() { + Ok(new_info) => { + message = format!("{:?}", new_info).to_owned(); + slot_progress.set_message(&message); + + if let Some(previous) = current { + let slot_delta: i64 = new_info.slot as i64 - previous.slot as i64; + let root_delta: i64 = new_info.root as i64 - previous.root as i64; + + // + // if slot has advanced out of step with the root, we detect + // a mismatch and output the slot information + // + if slot_delta != root_delta { + let prev_root = format!( + "|<- {} <- … <- {} <- {}", + previous.root, previous.parent, previous.slot + ) + .to_owned(); + slot_progress.println(&prev_root); + } + } + current = Some(new_info); + } + Err(err) => { + eprintln!("disconnected: {:?}", err); + break; + } + } + } + + Ok("".to_string()) +} + pub fn process_show_gossip(rpc_client: &RpcClient) -> ProcessResult { let cluster_nodes = rpc_client.get_cluster_nodes()?; diff --git a/client/Cargo.toml b/client/Cargo.toml index 9d35f4206..c50e416d3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -20,6 +20,9 @@ serde_derive = "1.0.103" serde_json = "1.0.46" solana-net-utils = { path = "../net-utils", version = "0.24.0" } solana-sdk = { path = "../sdk", version = "0.24.0" } +thiserror = "1.0" +tungstenite = "0.10.1" +url = "2.1.1" [dev-dependencies] assert_matches = "1.3.0" diff --git a/client/src/lib.rs b/client/src/lib.rs index 0d7ba4dae..dd4d3183b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -5,6 +5,7 @@ pub mod client_error; mod generic_rpc_client_request; pub mod mock_rpc_client_request; pub mod perf_utils; +pub mod pubsub_client; pub mod rpc_client; pub mod rpc_client_request; pub mod rpc_request; diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs new file mode 100644 index 000000000..db2233288 --- /dev/null +++ b/client/src/pubsub_client.rs @@ -0,0 +1,220 @@ +use log::*; +use serde::{de::DeserializeOwned, Deserialize}; +use serde_json::{ + json, + value::Value::{Number, Object}, + Map, Value, +}; +use std::{ + marker::PhantomData, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver}, + Arc, RwLock, + }, + thread::JoinHandle, +}; +use thiserror::Error; +use tungstenite::{client::AutoStream, connect, Message, WebSocket}; +use url::{ParseError, Url}; + +#[derive(Debug, Error)] +pub enum PubsubClientError { + #[error("url parse error")] + UrlParseError(#[from] ParseError), + + #[error("unable to connect to server")] + ConnectionError(#[from] tungstenite::Error), + + #[error("json parse error")] + JsonParseError(#[from] serde_json::error::Error), + + #[error("unexpected message format")] + UnexpectedMessageError, +} + +#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] +pub struct SlotInfoMessage { + pub parent: u64, + pub root: u64, + pub slot: u64, +} + +pub struct PubsubClientSubscription +where + T: DeserializeOwned, +{ + message_type: PhantomData, + operation: &'static str, + socket: Arc>>, + subscription_id: u64, + t_cleanup: Option>, + exit: Arc, +} + +impl Drop for PubsubClientSubscription +where + T: DeserializeOwned, +{ + fn drop(&mut self) { + self.send_unsubscribe() + .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket")); + self.socket + .write() + .unwrap() + .close(None) + .unwrap_or_else(|_| warn!("unable to close websocket")); + } +} + +impl PubsubClientSubscription +where + T: DeserializeOwned, +{ + fn send_subscribe( + writable_socket: &Arc>>, + operation: &str, + ) -> Result { + let method = format!("{}Subscribe", operation); + writable_socket + .write() + .unwrap() + .write_message(Message::Text( + json!({ + "jsonrpc":"2.0","id":1,"method":method,"params":[] + }) + .to_string(), + ))?; + let message = writable_socket.write().unwrap().read_message()?; + Self::extract_subscription_id(message) + } + + fn extract_subscription_id(message: Message) -> Result { + let message_text = &message.into_text()?; + let json_msg: Map = serde_json::from_str(message_text)?; + + if let Some(Number(x)) = json_msg.get("result") { + if let Some(x) = x.as_u64() { + return Ok(x); + } + } + + Err(PubsubClientError::UnexpectedMessageError) + } + + pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> { + let method = format!("{}Unubscribe", self.operation); + self.socket + .write() + .unwrap() + .write_message(Message::Text( + json!({ + "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id] + }) + .to_string(), + )) + .map_err(|err| err.into()) + } + + fn read_message( + writable_socket: &Arc>>, + ) -> Result { + let message = writable_socket.write().unwrap().read_message()?; + let message_text = &message.into_text().unwrap(); + let json_msg: Map = serde_json::from_str(message_text)?; + + if let Some(Object(value_1)) = json_msg.get("params") { + if let Some(value_2) = value_1.get("result") { + let x: T = serde_json::from_value::(value_2.clone()).unwrap(); + return Ok(x); + } + } + + Err(PubsubClientError::UnexpectedMessageError) + } + + pub fn shutdown(&mut self) -> std::thread::Result<()> { + if self.t_cleanup.is_some() { + info!("websocket thread - shutting down"); + self.exit.store(true, Ordering::Relaxed); + let x = self.t_cleanup.take().unwrap().join(); + info!("websocket thread - shut down."); + x + } else { + warn!("websocket thread - already shut down."); + Ok(()) + } + } +} + +const SLOT_OPERATION: &str = "slot"; + +pub struct PubsubClient {} + +impl PubsubClient { + pub fn slot_subscribe( + url: &str, + ) -> Result< + ( + PubsubClientSubscription, + Receiver, + ), + PubsubClientError, + > { + let url = Url::parse(url)?; + let (socket, _response) = connect(url)?; + let (sender, receiver) = channel::(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let subscription_id = PubsubClientSubscription::::send_subscribe( + &socket_clone, + SLOT_OPERATION, + ) + .unwrap(); + + let t_cleanup = std::thread::spawn(move || { + loop { + if exit_clone.load(Ordering::Relaxed) { + break; + } + + let message: Result = + PubsubClientSubscription::read_message(&socket_clone); + + if let Ok(msg) = message { + match sender.send(msg.clone()) { + Ok(_) => (), + Err(err) => { + info!("receive error: {:?}", err); + break; + } + } + } else { + info!("receive error: {:?}", message); + break; + } + } + + info!("websocket - exited receive loop"); + }); + + let result: PubsubClientSubscription = PubsubClientSubscription { + message_type: PhantomData, + operation: SLOT_OPERATION, + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } +} + +#[cfg(test)] +mod tests { + // see core/tests/client.rs#test_slot_subscription() +} diff --git a/core/tests/client.rs b/core/tests/client.rs index 9b6b63147..69182a220 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -1,12 +1,26 @@ -use solana_client::rpc_client::RpcClient; -use solana_core::validator::new_validator_for_tests; -use solana_sdk::commitment_config::CommitmentConfig; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::KeypairUtil; -use solana_sdk::system_transaction; -use std::fs::remove_dir_all; -use std::thread::sleep; -use std::time::{Duration, Instant}; +use solana_client::{ + pubsub_client::{PubsubClient, SlotInfoMessage}, + rpc_client::RpcClient, +}; +use solana_core::{ + rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, + validator::new_validator_for_tests, +}; +use solana_sdk::{ + commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::KeypairUtil, + system_transaction, +}; +use std::{ + fs::remove_dir_all, + net::{IpAddr, SocketAddr}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::sleep, + time::{Duration, Instant}, +}; +use systemstat::Ipv4Addr; #[test] fn test_rpc_client() { @@ -57,3 +71,50 @@ fn test_rpc_client() { server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } + +#[test] +fn test_slot_subscription() { + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); + std::thread::sleep(Duration::from_millis(400)); + + let (mut client, receiver) = + PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); + + let mut errors: Vec<(SlotInfoMessage, SlotInfoMessage)> = Vec::new(); + + for i in 0..3 { + subscriptions.notify_slot(i + 1, i, i); + + let maybe_actual = receiver.recv_timeout(Duration::from_millis(400)); + + match maybe_actual { + Ok(actual) => { + let expected = SlotInfoMessage { + slot: i + 1, + parent: i, + root: i, + }; + + if actual != expected { + errors.push((actual, expected)); + } + } + Err(_err) => { + eprintln!("unexpected websocket receive timeout"); + break; + } + } + } + + exit.store(true, Ordering::Relaxed); + client.shutdown().unwrap(); + pubsub_service.close().unwrap(); + + assert_eq!(errors, [].to_vec()); +}