Revert signature-notification format change (#12032)

* Use untagged RpcSignatureResult enum to avoid breaking downstream consumers of current signature subscriptions

* Clean up client duplication

* Clippy
This commit is contained in:
Tyera Eulberg 2020-09-03 18:14:45 -06:00 committed by GitHub
parent 2c091e4fca
commit 39246f9dd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 56 additions and 96 deletions

View File

@ -10,9 +10,10 @@ use solana_clap_utils::{
commitment::commitment_arg, input_parsers::*, input_validators::*, keypair::signer_from_path,
};
use solana_client::{
pubsub_client::{PubsubClient, SlotInfoMessage},
pubsub_client::PubsubClient,
rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient},
rpc_config::{RpcLargestAccountsConfig, RpcLargestAccountsFilter},
rpc_response::SlotInfo,
};
use solana_remote_wallet::remote_wallet::RemoteWalletManager;
use solana_sdk::{
@ -1033,7 +1034,7 @@ pub fn process_live_slots(url: &str) -> ProcessResult {
})?;
*/
let mut current: Option<SlotInfoMessage> = None;
let mut current: Option<SlotInfo> = None;
let mut message = "".to_string();
let slot_progress = new_spinner_progress_bar();

View File

@ -1,13 +1,12 @@
use crate::rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo};
use log::*;
use serde::{de::DeserializeOwned, Deserialize};
use serde::de::DeserializeOwned;
use serde_json::{
json,
value::Value::{Number, Object},
Map, Value,
};
use solana_sdk::{
commitment_config::CommitmentConfig, signature::Signature, transaction::TransactionError,
};
use solana_sdk::signature::Signature;
use std::{
marker::PhantomData,
sync::{
@ -21,7 +20,7 @@ use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url};
type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<SignatureResult>>;
type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
#[derive(Debug, Error)]
pub enum PubsubClientError {
@ -38,45 +37,6 @@ pub enum PubsubClientError {
UnexpectedMessageError,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")]
pub enum SignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult),
ReceivedSignature,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct RpcResponseContext {
pub slot: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RpcResponse<T> {
pub context: RpcResponseContext,
pub value: T,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct SlotInfoMessage {
pub parent: u64,
pub root: u64,
pub slot: u64,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcSignatureSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub enable_received_notification: Option<bool>,
}
pub struct PubsubClientSubscription<T>
where
T: DeserializeOwned,
@ -186,22 +146,16 @@ pub struct PubsubClient {}
impl PubsubClient {
pub fn slot_subscribe(
url: &str,
) -> Result<
(
PubsubClientSubscription<SlotInfoMessage>,
Receiver<SlotInfoMessage>,
),
PubsubClientError,
> {
) -> Result<(PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>), PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<SlotInfoMessage>();
let (sender, receiver) = channel::<SlotInfo>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotInfoMessage>::send_subscribe(
let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
&socket_clone,
json!({
"jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[]
@ -216,11 +170,11 @@ impl PubsubClient {
break;
}
let message: Result<SlotInfoMessage, PubsubClientError> =
let message: Result<SlotInfo, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg.clone()) {
match sender.send(msg) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
@ -236,7 +190,7 @@ impl PubsubClient {
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<SlotInfoMessage> = PubsubClientSubscription {
let result: PubsubClientSubscription<SlotInfo> = PubsubClientSubscription {
message_type: PhantomData,
operation: SLOT_OPERATION,
socket,
@ -254,13 +208,13 @@ impl PubsubClient {
) -> Result<
(
PubsubSignatureResponse,
Receiver<RpcResponse<SignatureResult>>,
Receiver<RpcResponse<RpcSignatureResult>>,
),
PubsubClientError,
> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<RpcResponse<SignatureResult>>();
let (sender, receiver) = channel::<RpcResponse<RpcSignatureResult>>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
@ -277,7 +231,7 @@ impl PubsubClient {
})
.to_string();
let subscription_id =
PubsubClientSubscription::<RpcResponse<SignatureResult>>::send_subscribe(
PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
&socket_clone,
body,
)
@ -289,7 +243,7 @@ impl PubsubClient {
break;
}
let message: Result<RpcResponse<SignatureResult>, PubsubClientError> =
let message: Result<RpcResponse<RpcSignatureResult>, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
@ -309,7 +263,7 @@ impl PubsubClient {
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<RpcResponse<SignatureResult>> =
let result: PubsubClientSubscription<RpcResponse<RpcSignatureResult>> =
PubsubClientSubscription {
message_type: PhantomData,
operation: SIGNATURE_OPERATION,

View File

@ -94,11 +94,18 @@ pub struct RpcKeyedAccount {
pub account: UiAccount,
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
pub root: Slot,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")]
#[serde(rename_all = "camelCase", untagged)]
pub enum RpcSignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult),
ReceivedSignature,
ProcessedSignature(ProcessedSignatureResult),
ReceivedSignature(ReceivedSignatureResult),
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -107,6 +114,12 @@ pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub enum ReceivedSignatureResult {
ReceivedSignature,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcContactInfo {
/// Pubkey of the node as a base-58 string

View File

@ -1,13 +1,13 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo};
use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote};
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
use solana_account_decoder::UiAccount;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult, SlotInfo},
};
#[cfg(test)]
use solana_runtime::bank_forks::BankForks;
@ -361,7 +361,7 @@ mod tests {
use jsonrpc_pubsub::{PubSubHandler, Session};
use serial_test_derive::serial;
use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding};
use solana_client::rpc_response::ProcessedSignatureResult;
use solana_client::rpc_response::{ProcessedSignatureResult, ReceivedSignatureResult};
use solana_runtime::{
bank::Bank,
bank_forks::BankForks,
@ -447,7 +447,7 @@ mod tests {
// Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver);
let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None });
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",
@ -479,7 +479,8 @@ mod tests {
.notify_signatures_received((received_slot, vec![tx.signatures[0]]));
// Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver);
let expected_res = RpcSignatureResult::ReceivedSignature;
let expected_res =
RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",

View File

@ -13,7 +13,8 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_filter::RpcFilterType,
rpc_response::{
ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult,
ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount,
RpcResponseContext, RpcSignatureResult, SlotInfo,
},
};
use solana_runtime::{
@ -47,13 +48,6 @@ use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100;
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
pub root: Slot,
}
// A more human-friendly version of Vote, with the bank state signature base58 encoded.
#[derive(Serialize, Deserialize, Debug)]
pub struct RpcVote {
@ -291,9 +285,7 @@ fn filter_signature_result(
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
(
Box::new(result.into_iter().map(|result| {
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult {
err: result.err(),
})
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() })
})),
last_notified_slot,
)
@ -986,7 +978,9 @@ impl RpcSubscriptions {
context: RpcResponseContext {
slot: *received_slot,
},
value: RpcSignatureResult::ReceivedSignature,
value: RpcSignatureResult::ReceivedSignature(
ReceivedSignatureResult::ReceivedSignature,
),
},
&sink,
);
@ -1369,8 +1363,9 @@ pub(crate) mod tests {
.notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
subscriptions.notify_subscribers(commitment_slots);
let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None });
let received_expected_res = RpcSignatureResult::ReceivedSignature;
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
let received_expected_res =
RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
struct Notification {
slot: Slot,
id: u64,

View File

@ -1,7 +1,4 @@
use solana_client::{
pubsub_client::{PubsubClient, SlotInfoMessage},
rpc_client::RpcClient,
};
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
use solana_core::{
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
validator::TestValidator,
@ -105,7 +102,7 @@ fn test_slot_subscription() {
let (mut client, receiver) =
PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
let mut errors: Vec<(SlotInfoMessage, SlotInfoMessage)> = Vec::new();
let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
for i in 0..3 {
subscriptions.notify_slot(i + 1, i, i);
@ -114,7 +111,7 @@ fn test_slot_subscription() {
match maybe_actual {
Ok(actual) => {
let expected = SlotInfoMessage {
let expected = SlotInfo {
slot: i + 1,
parent: i,
root: i,

View File

@ -285,7 +285,7 @@ fn test_rpc_subscriptions() {
let timeout = deadline.saturating_duration_since(Instant::now());
match status_receiver.recv_timeout(timeout) {
Ok((sig, result)) => {
if let RpcSignatureResult::ProcessedSignatureResult(result) = result.value {
if let RpcSignatureResult::ProcessedSignature(result) = result.value {
assert!(result.err.is_none());
assert!(signature_set.remove(&sig));
} else {

View File

@ -3,8 +3,7 @@ use gag::BufferRedirect;
use log::*;
use serial_test_derive::serial;
use solana_client::{
pubsub_client::{PubsubClient, SignatureResult},
rpc_client::RpcClient,
pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::RpcSignatureResult,
thin_client::create_client,
};
use solana_core::{
@ -179,11 +178,11 @@ fn test_local_cluster_signature_subscribe() {
let mut should_break = false;
for response in responses {
match response.value {
SignatureResult::ProcessedSignatureResult(_) => {
RpcSignatureResult::ProcessedSignature(_) => {
should_break = true;
break;
}
SignatureResult::ReceivedSignature => {
RpcSignatureResult::ReceivedSignature(_) => {
got_received_notification = true;
}
}