diff --git a/client/src/http_sender.rs b/client/src/http_sender.rs index 56950399d..f38ce866d 100644 --- a/client/src/http_sender.rs +++ b/client/src/http_sender.rs @@ -63,9 +63,9 @@ impl HttpSender { } #[derive(Deserialize, Debug)] -struct RpcErrorObject { - code: i64, - message: String, +pub(crate) struct RpcErrorObject { + pub code: i64, + pub message: String, } struct StatsUpdater<'a> { diff --git a/client/src/nonblocking/pubsub_client.rs b/client/src/nonblocking/pubsub_client.rs index 91c06c08a..9a0dcf57b 100644 --- a/client/src/nonblocking/pubsub_client.rs +++ b/client/src/nonblocking/pubsub_client.rs @@ -1,5 +1,6 @@ use { crate::{ + http_sender::RpcErrorObject, rpc_config::{ RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, @@ -15,6 +16,7 @@ use { sink::SinkExt, stream::{BoxStream, StreamExt}, }, + log::*, serde::de::DeserializeOwned, serde_json::{json, Map, Value}, solana_account_decoder::UiAccount, @@ -52,21 +54,19 @@ pub enum PubsubClientError { #[error("websocket error")] WsError(#[from] tokio_tungstenite::tungstenite::Error), - #[error("connection closed")] - ConnectionClosed, + #[error("connection closed (({0})")] + ConnectionClosed(String), #[error("json parse error")] JsonParseError(#[from] serde_json::error::Error), #[error("subscribe failed: {reason}")] - SubscribeFailed { - reason: &'static str, - message: String, - }, + SubscribeFailed { reason: String, message: String }, } type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; -type SubscribeResponseMsg = (mpsc::UnboundedReceiver, UnsubscribeFn); +type SubscribeResponseMsg = + Result<(mpsc::UnboundedReceiver, UnsubscribeFn), PubsubClientError>; type SubscribeRequestMsg = (String, Value, oneshot::Sender); type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>; @@ -106,10 +106,11 @@ impl PubsubClient { let (response_tx, response_rx) = oneshot::channel(); self.subscribe_tx .send((operation.to_string(), params, response_tx)) - .map_err(|_| PubsubClientError::ConnectionClosed)?; + .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?; + let (notifications, unsubscribe) = response_rx .await - .map_err(|_| PubsubClientError::ConnectionClosed)?; + .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??; Ok(( UnboundedReceiverStream::new(notifications) .filter_map(|value| ready(serde_json::from_value::(value).ok())) @@ -225,6 +226,7 @@ impl PubsubClient { Some(msg) => msg?, None => break, }; + trace!("ws.next(): {:?}", &msg); // Get text from the message let text = match msg { @@ -237,52 +239,64 @@ impl PubsubClient { Message::Pong(_data) => continue, Message::Close(_frame) => break, }; + + let mut json: Map = serde_json::from_str(&text)?; // Subscribe/Unsubscribe response, example: // `{"jsonrpc":"2.0","result":5308752,"id":1}` if let Some(id) = json.get("id") { - // Request Id let id = id.as_u64().ok_or_else(|| { - PubsubClientError::SubscribeFailed { reason: "invalid `id` field", message: text.clone() } + PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.clone() } })?; - // Check that response is unsubscribe + let err = json.get("error").map(|error_object| { + match serde_json::from_value::(error_object.clone()) { + Ok(rpc_error_object) => { + format!("{} ({})", rpc_error_object.message, rpc_error_object.code) + } + Err(err) => format!( + "Failed to deserialize RPC error response: {} [{}]", + serde_json::to_string(error_object).unwrap(), + err + ) + } + }); + if let Some(response_tx) = requests_unsubscribe.remove(&id) { let _ = response_tx.send(()); // do not care if receiver is closed - } else { - // Subscribe Id - let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| { - PubsubClientError::SubscribeFailed { reason: "invalid `result` field", message: text.clone() } - })?; + } else if let Some((operation, response_tx)) = requests_subscribe.remove(&id) { + match err { + Some(reason) => { + let _ = response_tx.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()})); + }, + None => { + // Subscribe Id + let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| { + PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() } + })?; - // Get subscribe request details - let (operation, response_tx) = requests_subscribe.remove(&id).ok_or_else(|| { - PubsubClientError::SubscribeFailed { reason: "request for received `id` not found", message: text.clone() } - })?; + // Create notifications channel and unsubscribe function + let (notifications_tx, notifications_rx) = mpsc::unbounded_channel(); + let unsubscribe_tx = unsubscribe_tx.clone(); + let unsubscribe = Box::new(move || async move { + let (response_tx, response_rx) = oneshot::channel(); + // do nothing if ws already closed + if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() { + let _ = response_rx.await; // channel can be closed only if ws is closed + } + }.boxed()); - // Create notifications channel and unsubscribe function - let (notifications_tx, notifications_rx) = mpsc::unbounded_channel(); - let unsubscribe_tx = unsubscribe_tx.clone(); - let unsubscribe = Box::new(move || async move { - let (response_tx, response_rx) = oneshot::channel(); - // do nothing if ws already closed - if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() { - let _ = response_rx.await; // channel can be closed only if ws is closed - } - }.boxed()); - - // Resolve subscribe request - match response_tx.send((notifications_rx, unsubscribe)) { - Ok(()) => { + if response_tx.send(Ok((notifications_rx, unsubscribe))).is_err() { + break; + } subscriptions.insert(sid, notifications_tx); } - Err((_notifications_rx, unsubscribe)) => { - unsubscribe(); - } - }; + } + } else { + error!("Unknown request id: {}", id); + break; } - continue; }