Propagate subscription failures to the caller

This commit is contained in:
Michael Vines 2022-01-26 21:35:31 -08:00
parent 8c376f58cb
commit 9d477d45c7
2 changed files with 57 additions and 43 deletions

View File

@ -63,9 +63,9 @@ impl HttpSender {
}
#[derive(Deserialize, Debug)]
struct RpcErrorObject {
code: i64,
message: String,
pub(crate) struct RpcErrorObject {
pub code: i64,
pub message: String,
}
struct StatsUpdater<'a> {

View File

@ -1,5 +1,6 @@
use {
crate::{
http_sender::RpcErrorObject,
rpc_config::{
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
@ -15,6 +16,7 @@ use {
sink::SinkExt,
stream::{BoxStream, StreamExt},
},
log::*,
serde::de::DeserializeOwned,
serde_json::{json, Map, Value},
solana_account_decoder::UiAccount,
@ -52,21 +54,19 @@ pub enum PubsubClientError {
#[error("websocket error")]
WsError(#[from] tokio_tungstenite::tungstenite::Error),
#[error("connection closed")]
ConnectionClosed,
#[error("connection closed (({0})")]
ConnectionClosed(String),
#[error("json parse error")]
JsonParseError(#[from] serde_json::error::Error),
#[error("subscribe failed: {reason}")]
SubscribeFailed {
reason: &'static str,
message: String,
},
SubscribeFailed { reason: String, message: String },
}
type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
type SubscribeResponseMsg = (mpsc::UnboundedReceiver<Value>, UnsubscribeFn);
type SubscribeResponseMsg =
Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
@ -106,10 +106,11 @@ impl PubsubClient {
let (response_tx, response_rx) = oneshot::channel();
self.subscribe_tx
.send((operation.to_string(), params, response_tx))
.map_err(|_| PubsubClientError::ConnectionClosed)?;
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
let (notifications, unsubscribe) = response_rx
.await
.map_err(|_| PubsubClientError::ConnectionClosed)?;
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
Ok((
UnboundedReceiverStream::new(notifications)
.filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
@ -225,6 +226,7 @@ impl PubsubClient {
Some(msg) => msg?,
None => break,
};
trace!("ws.next(): {:?}", &msg);
// Get text from the message
let text = match msg {
@ -237,52 +239,64 @@ impl PubsubClient {
Message::Pong(_data) => continue,
Message::Close(_frame) => break,
};
let mut json: Map<String, Value> = serde_json::from_str(&text)?;
// Subscribe/Unsubscribe response, example:
// `{"jsonrpc":"2.0","result":5308752,"id":1}`
if let Some(id) = json.get("id") {
// Request Id
let id = id.as_u64().ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "invalid `id` field", message: text.clone() }
PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.clone() }
})?;
// Check that response is unsubscribe
let err = json.get("error").map(|error_object| {
match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
Ok(rpc_error_object) => {
format!("{} ({})", rpc_error_object.message, rpc_error_object.code)
}
Err(err) => format!(
"Failed to deserialize RPC error response: {} [{}]",
serde_json::to_string(error_object).unwrap(),
err
)
}
});
if let Some(response_tx) = requests_unsubscribe.remove(&id) {
let _ = response_tx.send(()); // do not care if receiver is closed
} else {
// Subscribe Id
let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "invalid `result` field", message: text.clone() }
})?;
} else if let Some((operation, response_tx)) = requests_subscribe.remove(&id) {
match err {
Some(reason) => {
let _ = response_tx.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
},
None => {
// Subscribe Id
let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() }
})?;
// Get subscribe request details
let (operation, response_tx) = requests_subscribe.remove(&id).ok_or_else(|| {
PubsubClientError::SubscribeFailed { reason: "request for received `id` not found", message: text.clone() }
})?;
// Create notifications channel and unsubscribe function
let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
let unsubscribe_tx = unsubscribe_tx.clone();
let unsubscribe = Box::new(move || async move {
let (response_tx, response_rx) = oneshot::channel();
// do nothing if ws already closed
if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() {
let _ = response_rx.await; // channel can be closed only if ws is closed
}
}.boxed());
// Create notifications channel and unsubscribe function
let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
let unsubscribe_tx = unsubscribe_tx.clone();
let unsubscribe = Box::new(move || async move {
let (response_tx, response_rx) = oneshot::channel();
// do nothing if ws already closed
if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() {
let _ = response_rx.await; // channel can be closed only if ws is closed
}
}.boxed());
// Resolve subscribe request
match response_tx.send((notifications_rx, unsubscribe)) {
Ok(()) => {
if response_tx.send(Ok((notifications_rx, unsubscribe))).is_err() {
break;
}
subscriptions.insert(sid, notifications_tx);
}
Err((_notifications_rx, unsubscribe)) => {
unsubscribe();
}
};
}
} else {
error!("Unknown request id: {}", id);
break;
}
continue;
}