Add async WebSocket PubsubClient

This commit is contained in:
Kirill Fomichev 2021-12-26 21:04:56 +03:00 committed by Michael Vines
parent 3fab5a3b14
commit a6a8a712e5
6 changed files with 417 additions and 1 deletions

View File

@ -27,6 +27,7 @@ solana-test-validator = { path = "../test-validator", version = "=1.10.0" }
solana-transaction-status = { path = "../transaction-status", version = "=1.10.0" }
solana-version = { path = "../version", version = "=1.10.0" }
systemstat = "0.1.10"
tokio = { version = "1", features = ["full"] }
[dev-dependencies]
solana-logger = { path = "../logger", version = "=1.10.0" }

View File

@ -514,3 +514,86 @@ fn test_slot_subscription() {
assert_eq!(errors, [].to_vec());
}
#[tokio::test]
#[serial]
async fn test_slot_subscription_async() {
use {futures_util::StreamExt, solana_client::pubsub_client_async::PubsubClient};
let sync_service = Arc::new(AtomicU64::new(0));
let sync_client = Arc::clone(&sync_service);
fn wait_until(atomic: &Arc<AtomicU64>, value: u64) {
while atomic.load(Ordering::Relaxed) != value {
sleep(Duration::from_millis(1))
}
}
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
tokio::task::spawn_blocking(move || {
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
));
let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
sleep(Duration::from_millis(100));
sync_service.store(1, Ordering::Relaxed);
wait_until(&sync_service, 2);
subscriptions.notify_slot(1, 0, 0);
sync_service.store(3, Ordering::Relaxed);
wait_until(&sync_service, 4);
subscriptions.notify_slot(2, 1, 1);
sync_service.store(5, Ordering::Relaxed);
wait_until(&sync_service, 6);
exit.store(true, Ordering::Relaxed);
trigger.cancel();
pubsub_service.close().unwrap();
});
wait_until(&sync_client, 1);
let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port());
let pubsub_client = PubsubClient::connect(&url).await.unwrap();
let (mut notifications, unsubscribe) = pubsub_client.slot_subscribe().await.unwrap();
sync_client.store(2, Ordering::Relaxed);
wait_until(&sync_client, 3);
assert_eq!(
tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
Ok(Some(SlotInfo {
slot: 1,
parent: 0,
root: 0,
}))
);
sync_client.store(4, Ordering::Relaxed);
wait_until(&sync_client, 5);
assert_eq!(
tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
Ok(Some(SlotInfo {
slot: 2,
parent: 1,
root: 1,
}))
);
sync_client.store(6, Ordering::Relaxed);
unsubscribe().await;
}

View File

@ -16,6 +16,7 @@ bincode = "1.3.3"
bs58 = "0.4.0"
clap = "2.33.0"
crossbeam-channel = "0.5"
futures-util = { version = "0.3.19", optional = true }
indicatif = "0.16.2"
jsonrpc-core = "18.0.0"
log = "0.4.14"
@ -36,6 +37,8 @@ solana-version = { path = "../version", version = "=1.10.0" }
solana-vote-program = { path = "../programs/vote", version = "=1.10.0" }
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1.8", optional = true }
tokio-tungstenite = { version = "0.16.0", features = ["rustls-tls-webpki-roots"], optional = true }
tungstenite = { version = "0.16.0", features = ["rustls-tls-webpki-roots"] }
url = "2.2.2"
@ -46,3 +49,7 @@ solana-logger = { path = "../logger", version = "=1.10.0" }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[features]
default = ["async"]
async = ["futures-util", "tokio-stream", "tokio-tungstenite"]

View File

@ -10,6 +10,8 @@ pub mod nonblocking;
pub mod nonce_utils;
pub mod perf_utils;
pub mod pubsub_client;
#[cfg(feature = "async")]
pub mod pubsub_client_async;
pub mod rpc_cache;
pub mod rpc_client;
pub mod rpc_config;

View File

@ -615,5 +615,5 @@ impl PubsubClient {
#[cfg(test)]
mod tests {
// see core/tests/client.rs#test_slot_subscription()
// see client-test/test/client.rs
}

View File

@ -0,0 +1,323 @@
use {
crate::{
rpc_config::{
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::{
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
},
},
futures_util::{
future::{ready, BoxFuture, FutureExt},
sink::SinkExt,
stream::{BoxStream, StreamExt},
},
serde::de::DeserializeOwned,
serde_json::{json, Map, Value},
solana_account_decoder::UiAccount,
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
std::collections::BTreeMap,
thiserror::Error,
tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinHandle,
time::{sleep, Duration},
},
tokio_stream::wrappers::UnboundedReceiverStream,
tokio_tungstenite::{
connect_async,
tungstenite::{
protocol::frame::{coding::CloseCode, CloseFrame},
Message,
},
MaybeTlsStream, WebSocketStream,
},
url::Url,
};
pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
#[derive(Debug, Error)]
pub enum PubsubClientError {
#[error("url parse error")]
UrlParseError(#[from] url::ParseError),
#[error("unable to connect to server")]
ConnectionError(tokio_tungstenite::tungstenite::Error),
#[error("websocket error")]
WsError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("connection closed")]
ConnectionClosed,
#[error("json parse error")]
JsonParseError(#[from] serde_json::error::Error),
#[error("subscribe failed: {reason}")]
SubscribeFailed {
reason: &'static str,
message: String,
},
}
type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
type SubscribeResponseMsg = (mpsc::UnboundedReceiver<Value>, UnsubscribeFn);
type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
#[derive(Debug)]
pub struct PubsubClient {
subscribe_tx: mpsc::UnboundedSender<SubscribeRequestMsg>,
shutdown_tx: oneshot::Sender<()>,
ws: JoinHandle<PubsubClientResult>,
}
impl PubsubClient {
pub async fn connect(url: &str) -> PubsubClientResult<Self> {
let url = Url::parse(url)?;
let (ws, _response) = connect_async(url)
.await
.map_err(PubsubClientError::ConnectionError)?;
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
Ok(Self {
subscribe_tx,
shutdown_tx,
ws: tokio::spawn(PubsubClient::run_ws(ws, subscribe_rx, shutdown_rx)),
})
}
pub async fn shutdown(self) -> PubsubClientResult {
let _ = self.shutdown_tx.send(());
self.ws.await.unwrap() // WS future should not be cancelled or panicked
}
async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
where
T: DeserializeOwned + Send + 'a,
{
let (response_tx, response_rx) = oneshot::channel();
self.subscribe_tx
.send((operation.to_string(), params, response_tx))
.map_err(|_| PubsubClientError::ConnectionClosed)?;
let (notifications, unsubscribe) = response_rx
.await
.map_err(|_| PubsubClientError::ConnectionClosed)?;
Ok((
UnboundedReceiverStream::new(notifications)
.filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
.boxed(),
unsubscribe,
))
}
pub async fn account_subscribe(
&self,
pubkey: &Pubkey,
config: Option<RpcAccountInfoConfig>,
) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
let params = json!([pubkey.to_string(), config]);
self.subscribe("account", params).await
}
pub async fn block_subscribe(
&self,
filter: RpcBlockSubscribeFilter,
config: Option<RpcBlockSubscribeConfig>,
) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
self.subscribe("block", json!([filter, config])).await
}
pub async fn logs_subscribe(
&self,
filter: RpcTransactionLogsFilter,
config: RpcTransactionLogsConfig,
) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
self.subscribe("logs", json!([filter, config])).await
}
pub async fn program_subscribe(
&self,
pubkey: &Pubkey,
config: Option<RpcProgramAccountsConfig>,
) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
let params = json!([pubkey.to_string(), config]);
self.subscribe("program", params).await
}
pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
self.subscribe("vote", json!([])).await
}
pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
self.subscribe("root", json!([])).await
}
pub async fn signature_subscribe(
&self,
signature: &Signature,
config: Option<RpcSignatureSubscribeConfig>,
) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
let params = json!([signature.to_string(), config]);
self.subscribe("signature", params).await
}
pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
self.subscribe("slot", json!([])).await
}
pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
self.subscribe("slotsUpdates", json!([])).await
}
async fn run_ws(
mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
mut shutdown_rx: oneshot::Receiver<()>,
) -> PubsubClientResult {
let mut request_id: u64 = 0;
let mut requests_subscribe = BTreeMap::new();
let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
let mut subscriptions = BTreeMap::new();
let (unsubscribe_tx, mut unsubscribe_rx) = mpsc::unbounded_channel();
loop {
tokio::select! {
// Send close on shutdown signal
_ = (&mut shutdown_rx) => {
let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
ws.send(Message::Close(Some(frame))).await?;
ws.flush().await?;
break;
},
// Send `Message::Ping` each 10s if no any other communication
() = sleep(Duration::from_secs(10)) => {
ws.send(Message::Ping(Vec::new())).await?;
},
// Read message for subscribe
Some((operation, params, response_tx)) = subscribe_rx.recv() => {
request_id += 1;
let method = format!("{}Subscribe", operation);
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
ws.send(Message::Text(text)).await?;
requests_subscribe.insert(request_id, (operation, response_tx));
},
// Read message for unsubscribe
Some((operation, sid, response_tx)) = unsubscribe_rx.recv() => {
subscriptions.remove(&sid);
request_id += 1;
let method = format!("{}Unsubscribe", operation);
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
ws.send(Message::Text(text)).await?;
requests_unsubscribe.insert(request_id, response_tx);
},
// Read incoming WebSocket message
next_msg = ws.next() => {
let msg = match next_msg {
Some(msg) => msg?,
None => break,
};
// Get text from the message
let text = match msg {
Message::Text(text) => text,
Message::Binary(_data) => continue, // Ignore
Message::Ping(data) => {
ws.send(Message::Pong(data)).await?;
continue
},
Message::Pong(_data) => continue,
Message::Close(_frame) => break,
};
let mut json: Map<String, Value> = serde_json::from_str(&text)?;
// Subscribe/Unsubscribe response, example:
// `{"jsonrpc":"2.0","result":5308752,"id":1}`
if let Some(id) = json.get("id") {
// Request Id
let id = id.as_u64().ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "invalid `id` field", message: text.clone() }
})?;
// Check that response is unsubscribe
if let Some(response_tx) = requests_unsubscribe.remove(&id) {
let _ = response_tx.send(()); // do not care if receiver is closed
} else {
// Subscribe Id
let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "invalid `result` field", message: text.clone() }
})?;
// Get subscribe request details
let (operation, response_tx) = requests_subscribe.remove(&id).ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "request for received `id` not found", message: text.clone() }
})?;
// Create notifications channel and unsubscribe function
let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
let unsubscribe_tx = unsubscribe_tx.clone();
let unsubscribe = Box::new(move || async move {
let (response_tx, response_rx) = oneshot::channel();
// do nothing if ws already closed
if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() {
let _ = response_rx.await; // channel can be closed only if ws is closed
}
}.boxed());
// Resolve subscribe request
match response_tx.send((notifications_rx, unsubscribe)) {
Ok(()) => {
subscriptions.insert(sid, notifications_tx);
}
Err((_notifications_rx, unsubscribe)) => {
unsubscribe();
}
};
}
continue;
}
// Notification, example:
// `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
if let Some(Value::Object(params)) = json.get_mut("params") {
if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
let mut unsubscribe_required = false;
if let Some(notifications_tx) = subscriptions.get(&sid) {
if let Some(result) = params.remove("result") {
if notifications_tx.send(result).is_err() {
unsubscribe_required = true;
}
}
} else {
unsubscribe_required = true;
}
if unsubscribe_required {
if let Some(Value::String(method)) = json.remove("method") {
if let Some(operation) = method.strip_suffix("Notification") {
let (response_tx, _response_rx) = oneshot::channel();
let _ = unsubscribe_tx.send((operation.to_string(), sid, response_tx));
}
}
}
}
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
// see client-test/test/client.rs
}