Remove RpcClient code duplication (#9952)

This commit is contained in:
Michael Vines 2020-05-10 08:51:53 -07:00 committed by GitHub
parent 405e39fb9f
commit af6a8f5fac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 199 additions and 356 deletions

View File

@ -697,16 +697,11 @@ impl Archiver {
RpcClient::new_socket(rpc_peers[node_index].rpc) RpcClient::new_socket(rpc_peers[node_index].rpc)
}; };
Ok(rpc_client Ok(rpc_client
.send( .send::<u64>(
&RpcRequest::GetSlotsPerSegment, RpcRequest::GetSlotsPerSegment,
serde_json::json!([client_commitment]), serde_json::json!([client_commitment]),
0, 0,
) )
.map_err(|err| {
warn!("Error while making rpc request {:?}", err);
ArchiverError::ClientError(err)
})?
.as_u64()
.unwrap()) .unwrap())
} else { } else {
Err(ArchiverError::NoRpcPeers) Err(ArchiverError::NoRpcPeers)
@ -749,21 +744,14 @@ impl Archiver {
let node_index = thread_rng().gen_range(0, rpc_peers.len()); let node_index = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_socket(rpc_peers[node_index].rpc) RpcClient::new_socket(rpc_peers[node_index].rpc)
}; };
let response = rpc_client
.send(
&RpcRequest::GetStorageTurn,
serde_json::value::Value::Null,
0,
)
.map_err(|err| {
warn!("Error while making rpc request {:?}", err);
ArchiverError::ClientError(err)
})?;
let RpcStorageTurn { let RpcStorageTurn {
blockhash: storage_blockhash, blockhash: storage_blockhash,
slot: turn_slot, slot: turn_slot,
} = serde_json::from_value::<RpcStorageTurn>(response) } = rpc_client.send(
.map_err(ArchiverError::JsonError)?; RpcRequest::GetStorageTurn,
serde_json::value::Value::Null,
0,
)?;
let turn_blockhash = storage_blockhash.parse().map_err(|err| { let turn_blockhash = storage_blockhash.parse().map_err(|err| {
io::Error::new( io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,

View File

@ -50,28 +50,29 @@ impl Into<TransportError> for ClientErrorKind {
#[derive(Error, Debug)] #[derive(Error, Debug)]
#[error("{kind}")] #[error("{kind}")]
pub struct ClientError { pub struct ClientError {
command: Option<&'static str>, request: Option<rpc_request::RpcRequest>,
#[source] #[source]
kind: ClientErrorKind, kind: ClientErrorKind,
} }
impl ClientError { impl ClientError {
pub fn new_with_command(kind: ClientErrorKind, command: &'static str) -> Self { pub fn new_with_request(kind: ClientErrorKind, request: rpc_request::RpcRequest) -> Self {
Self { Self {
command: Some(command), request: Some(request),
kind, kind,
} }
} }
pub fn into_with_command(self, command: &'static str) -> Self { pub fn into_with_request(self, request: rpc_request::RpcRequest) -> Self {
Self { Self {
command: Some(command), request: Some(request),
..self ..self
} }
} }
pub fn command(&self) -> Option<&'static str> { pub fn request(&self) -> Option<&rpc_request::RpcRequest> {
self.command self.request.as_ref()
} }
pub fn kind(&self) -> &ClientErrorKind { pub fn kind(&self) -> &ClientErrorKind {
@ -82,7 +83,7 @@ impl ClientError {
impl From<ClientErrorKind> for ClientError { impl From<ClientErrorKind> for ClientError {
fn from(kind: ClientErrorKind) -> Self { fn from(kind: ClientErrorKind) -> Self {
Self { Self {
command: None, request: None,
kind, kind,
} }
} }
@ -91,7 +92,7 @@ impl From<ClientErrorKind> for ClientError {
impl From<TransportError> for ClientError { impl From<TransportError> for ClientError {
fn from(err: TransportError) -> Self { fn from(err: TransportError) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -106,7 +107,7 @@ impl Into<TransportError> for ClientError {
impl From<std::io::Error> for ClientError { impl From<std::io::Error> for ClientError {
fn from(err: std::io::Error) -> Self { fn from(err: std::io::Error) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -115,7 +116,7 @@ impl From<std::io::Error> for ClientError {
impl From<reqwest::Error> for ClientError { impl From<reqwest::Error> for ClientError {
fn from(err: reqwest::Error) -> Self { fn from(err: reqwest::Error) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -124,7 +125,7 @@ impl From<reqwest::Error> for ClientError {
impl From<rpc_request::RpcError> for ClientError { impl From<rpc_request::RpcError> for ClientError {
fn from(err: rpc_request::RpcError) -> Self { fn from(err: rpc_request::RpcError) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -133,7 +134,7 @@ impl From<rpc_request::RpcError> for ClientError {
impl From<serde_json::error::Error> for ClientError { impl From<serde_json::error::Error> for ClientError {
fn from(err: serde_json::error::Error) -> Self { fn from(err: serde_json::error::Error) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -142,7 +143,7 @@ impl From<serde_json::error::Error> for ClientError {
impl From<SignerError> for ClientError { impl From<SignerError> for ClientError {
fn from(err: SignerError) -> Self { fn from(err: SignerError) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }
@ -151,7 +152,7 @@ impl From<SignerError> for ClientError {
impl From<TransactionError> for ClientError { impl From<TransactionError> for ClientError {
fn from(err: TransactionError) -> Self { fn from(err: TransactionError) -> Self {
Self { Self {
command: None, request: None,
kind: err.into(), kind: err.into(),
} }
} }

View File

@ -3,7 +3,7 @@ use crate::{client_error::Result, rpc_request::RpcRequest};
pub(crate) trait GenericRpcClientRequest { pub(crate) trait GenericRpcClientRequest {
fn send( fn send(
&self, &self,
request: &RpcRequest, request: RpcRequest,
params: serde_json::Value, params: serde_json::Value,
retries: usize, retries: usize,
) -> Result<serde_json::Value>; ) -> Result<serde_json::Value>;

View File

@ -40,11 +40,11 @@ impl MockRpcClientRequest {
impl GenericRpcClientRequest for MockRpcClientRequest { impl GenericRpcClientRequest for MockRpcClientRequest {
fn send( fn send(
&self, &self,
request: &RpcRequest, request: RpcRequest,
params: serde_json::Value, params: serde_json::Value,
_retries: usize, _retries: usize,
) -> Result<serde_json::Value> { ) -> Result<serde_json::Value> {
if let Some(value) = self.mocks.write().unwrap().remove(request) { if let Some(value) = self.mocks.write().unwrap().remove(&request) {
return Ok(value); return Ok(value);
} }
if self.url == "fails" { if self.url == "fails" {

View File

@ -96,32 +96,25 @@ impl RpcClient {
pub fn send_transaction(&self, transaction: &Transaction) -> ClientResult<Signature> { pub fn send_transaction(&self, transaction: &Transaction) -> ClientResult<Signature> {
let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string(); let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string();
let response =
self.client
.send(&RpcRequest::SendTransaction, json!([serialized_encoded]), 5)?;
match response.as_str() { let signature_base58_str: String =
None => { self.send(RpcRequest::SendTransaction, json!([serialized_encoded]), 5)?;
Err(RpcError::ForUser("Received result of an unexpected type".to_string()).into())
} let signature = signature_base58_str
Some(signature_base58_str) => { .parse::<Signature>()
let signature = signature_base58_str.parse::<Signature>().map_err(|err| { .map_err(|err| Into::<ClientError>::into(RpcError::ParseError(err.to_string())))?;
Into::<ClientError>::into(RpcError::ParseError(err.to_string())) // A mismatching RPC response signature indicates an issue with the RPC node, and
})?; // should not be passed along to confirmation methods. The transaction may or may
// A mismatching RPC response signature indicates an issue with the RPC node, and // not have been submitted to the cluster, so callers should verify the success of
// should not be passed along to confirmation methods. The transaction may or may // the correct transaction signature independently.
// not have been submitted to the cluster, so callers should verify the success of if signature != transaction.signatures[0] {
// the correct transaction signature independently. Err(RpcError::RpcRequestError(format!(
if signature != transaction.signatures[0] { "RPC node returned mismatched signature {:?}, expected {:?}",
Err(RpcError::RpcRequestError(format!( signature, transaction.signatures[0]
"RPC node returned mismatched signature {:?}, expected {:?}", ))
signature, transaction.signatures[0] .into())
)) } else {
.into()) Ok(transaction.signatures[0])
} else {
Ok(transaction.signatures[0])
}
}
} }
} }
@ -137,11 +130,7 @@ impl RpcClient {
signatures: &[Signature], signatures: &[Signature],
) -> RpcResult<Vec<Option<TransactionStatus>>> { ) -> RpcResult<Vec<Option<TransactionStatus>>> {
let signatures: Vec<_> = signatures.iter().map(|s| s.to_string()).collect(); let signatures: Vec<_> = signatures.iter().map(|s| s.to_string()).collect();
let signature_status = self.send(RpcRequest::GetSignatureStatuses, json!([signatures]), 5)
self.client
.send(&RpcRequest::GetSignatureStatuses, json!([signatures]), 5)?;
Ok(serde_json::from_value(signature_status)
.map_err(|err| ClientError::new_with_command(err.into(), "GetSignatureStatuses"))?)
} }
pub fn get_signature_status_with_commitment( pub fn get_signature_status_with_commitment(
@ -149,14 +138,11 @@ impl RpcClient {
signature: &Signature, signature: &Signature,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<Option<transaction::Result<()>>> { ) -> ClientResult<Option<transaction::Result<()>>> {
let signature_status = self.client.send( let result: Response<Vec<Option<TransactionStatus>>> = self.send(
&RpcRequest::GetSignatureStatuses, RpcRequest::GetSignatureStatuses,
json!([[signature.to_string()]]), json!([[signature.to_string()]]),
5, 5,
)?; )?;
let result: Response<Vec<Option<TransactionStatus>>> =
serde_json::from_value(signature_status)
.map_err(|err| ClientError::new_with_command(err.into(), "GetSignatureStatuses"))?;
Ok(result.value[0] Ok(result.value[0]
.clone() .clone()
.filter(|result| result.satisfies_commitment(commitment_config)) .filter(|result| result.satisfies_commitment(commitment_config))
@ -169,16 +155,13 @@ impl RpcClient {
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
search_transaction_history: bool, search_transaction_history: bool,
) -> ClientResult<Option<transaction::Result<()>>> { ) -> ClientResult<Option<transaction::Result<()>>> {
let signature_status = self.client.send( let result: Response<Vec<Option<TransactionStatus>>> = self.send(
&RpcRequest::GetSignatureStatuses, RpcRequest::GetSignatureStatuses,
json!([[signature.to_string()], { json!([[signature.to_string()], {
"searchTransactionHistory": search_transaction_history "searchTransactionHistory": search_transaction_history
}]), }]),
5, 5,
)?; )?;
let result: Response<Vec<Option<TransactionStatus>>> =
serde_json::from_value(signature_status)
.map_err(|err| ClientError::new_with_command(err.into(), "GetSignatureStatuses"))?;
Ok(result.value[0] Ok(result.value[0]
.clone() .clone()
.filter(|result| result.satisfies_commitment(commitment_config)) .filter(|result| result.satisfies_commitment(commitment_config))
@ -193,13 +176,7 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<Slot> { ) -> ClientResult<Slot> {
let response = self self.send(RpcRequest::GetSlot, json!([commitment_config]), 0)
.client
.send(&RpcRequest::GetSlot, json!([commitment_config]), 0)
.map_err(|err| err.into_with_command("GetSlot"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetSlot"))
} }
pub fn total_supply(&self) -> ClientResult<u64> { pub fn total_supply(&self) -> ClientResult<u64> {
@ -210,13 +187,7 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<u64> { ) -> ClientResult<u64> {
let response = self self.send(RpcRequest::GetTotalSupply, json!([commitment_config]), 0)
.client
.send(&RpcRequest::GetTotalSupply, json!([commitment_config]), 0)
.map_err(|err| err.into_with_command("GetTotalSupply"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetTotalSupply"))
} }
pub fn get_vote_accounts(&self) -> ClientResult<RpcVoteAccountStatus> { pub fn get_vote_accounts(&self) -> ClientResult<RpcVoteAccountStatus> {
@ -227,23 +198,11 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<RpcVoteAccountStatus> { ) -> ClientResult<RpcVoteAccountStatus> {
let response = self self.send(RpcRequest::GetVoteAccounts, json!([commitment_config]), 0)
.client
.send(&RpcRequest::GetVoteAccounts, json!([commitment_config]), 0)
.map_err(|err| err.into_with_command("GetVoteAccounts"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetVoteAccounts"))
} }
pub fn get_cluster_nodes(&self) -> ClientResult<Vec<RpcContactInfo>> { pub fn get_cluster_nodes(&self) -> ClientResult<Vec<RpcContactInfo>> {
let response = self self.send(RpcRequest::GetClusterNodes, Value::Null, 0)
.client
.send(&RpcRequest::GetClusterNodes, Value::Null, 0)
.map_err(|err| err.into_with_command("GetClusterNodes"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetClusterNodes"))
} }
pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult<ConfirmedBlock> { pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult<ConfirmedBlock> {
@ -255,13 +214,7 @@ impl RpcClient {
slot: Slot, slot: Slot,
encoding: TransactionEncoding, encoding: TransactionEncoding,
) -> ClientResult<ConfirmedBlock> { ) -> ClientResult<ConfirmedBlock> {
let response = self self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding]), 0)
.client
.send(&RpcRequest::GetConfirmedBlock, json!([slot, encoding]), 0)
.map_err(|err| err.into_with_command("GetConfirmedBlock"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetConfirmedBlock"))
} }
pub fn get_confirmed_blocks( pub fn get_confirmed_blocks(
@ -269,17 +222,11 @@ impl RpcClient {
start_slot: Slot, start_slot: Slot,
end_slot: Option<Slot>, end_slot: Option<Slot>,
) -> ClientResult<Vec<Slot>> { ) -> ClientResult<Vec<Slot>> {
let response = self self.send(
.client RpcRequest::GetConfirmedBlocks,
.send( json!([start_slot, end_slot]),
&RpcRequest::GetConfirmedBlocks, 0,
json!([start_slot, end_slot]), )
0,
)
.map_err(|err| err.into_with_command("GetConfirmedBlocks"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetConfirmedBlocks"))
} }
pub fn get_confirmed_signatures_for_address( pub fn get_confirmed_signatures_for_address(
@ -288,19 +235,11 @@ impl RpcClient {
start_slot: Slot, start_slot: Slot,
end_slot: Slot, end_slot: Slot,
) -> ClientResult<Vec<Signature>> { ) -> ClientResult<Vec<Signature>> {
let response = self let signatures_base58_str: Vec<String> = self.send(
.client RpcRequest::GetConfirmedSignaturesForAddress,
.send( json!([address.to_string(), start_slot, end_slot]),
&RpcRequest::GetConfirmedSignaturesForAddress, 0,
json!([address.to_string(), start_slot, end_slot]), )?;
0,
)
.map_err(|err| err.into_with_command("GetConfirmedSignaturesForAddress"))?;
let signatures_base58_str: Vec<String> =
serde_json::from_value(response).map_err(|err| {
ClientError::new_with_command(err.into(), "GetConfirmedSignaturesForAddress")
})?;
let mut signatures = vec![]; let mut signatures = vec![];
for signature_base58_str in signatures_base58_str { for signature_base58_str in signatures_base58_str {
@ -318,23 +257,16 @@ impl RpcClient {
signature: &Signature, signature: &Signature,
encoding: TransactionEncoding, encoding: TransactionEncoding,
) -> ClientResult<ConfirmedTransaction> { ) -> ClientResult<ConfirmedTransaction> {
let response = self self.send(
.client RpcRequest::GetConfirmedTransaction,
.send( json!([signature.to_string(), encoding]),
&RpcRequest::GetConfirmedTransaction, 0,
json!([signature.to_string(), encoding]), )
0,
)
.map_err(|err| err.into_with_command("GetConfirmedTransaction"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetConfirmedTransaction"))
} }
pub fn get_block_time(&self, slot: Slot) -> ClientResult<UnixTimestamp> { pub fn get_block_time(&self, slot: Slot) -> ClientResult<UnixTimestamp> {
let response = self let request = RpcRequest::GetBlockTime;
.client let response = self.client.send(request, json!([slot]), 0);
.send(&RpcRequest::GetBlockTime, json!([slot]), 0);
response response
.map(|result_json| { .map(|result_json| {
@ -342,11 +274,11 @@ impl RpcClient {
return Err(RpcError::ForUser(format!("Block Not Found: slot={}", slot)).into()); return Err(RpcError::ForUser(format!("Block Not Found: slot={}", slot)).into());
} }
let result = serde_json::from_value(result_json) let result = serde_json::from_value(result_json)
.map_err(|err| ClientError::new_with_command(err.into(), "GetBlockTime"))?; .map_err(|err| ClientError::new_with_request(err.into(), request))?;
trace!("Response block timestamp {:?} {:?}", slot, result); trace!("Response block timestamp {:?} {:?}", slot, result);
Ok(result) Ok(result)
}) })
.map_err(|err| err.into_with_command("GetBlockTime"))? .map_err(|err| err.into_with_request(request))?
} }
pub fn get_epoch_info(&self) -> ClientResult<RpcEpochInfo> { pub fn get_epoch_info(&self) -> ClientResult<RpcEpochInfo> {
@ -357,13 +289,7 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<RpcEpochInfo> { ) -> ClientResult<RpcEpochInfo> {
let response = self self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0)
.client
.send(&RpcRequest::GetEpochInfo, json!([commitment_config]), 0)
.map_err(|err| err.into_with_command("GetEpochInfo"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetEpochInfo"))
} }
pub fn get_leader_schedule( pub fn get_leader_schedule(
@ -378,75 +304,38 @@ impl RpcClient {
slot: Option<Slot>, slot: Option<Slot>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<Option<RpcLeaderSchedule>> { ) -> ClientResult<Option<RpcLeaderSchedule>> {
let response = self self.send(
.client RpcRequest::GetLeaderSchedule,
.send( json!([slot, commitment_config]),
&RpcRequest::GetLeaderSchedule, 0,
json!([slot, commitment_config]), )
0,
)
.map_err(|err| err.into_with_command("GetLeaderSchedule"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetLeaderSchedule"))
} }
pub fn get_epoch_schedule(&self) -> ClientResult<EpochSchedule> { pub fn get_epoch_schedule(&self) -> ClientResult<EpochSchedule> {
let response = self self.send(RpcRequest::GetEpochSchedule, Value::Null, 0)
.client
.send(&RpcRequest::GetEpochSchedule, Value::Null, 0)
.map_err(|err| err.into_with_command("GetEpochSchedule"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetEpochSchedule"))
} }
pub fn get_identity(&self) -> ClientResult<Pubkey> { pub fn get_identity(&self) -> ClientResult<Pubkey> {
let response = self let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null, 0)?;
.client
.send(&RpcRequest::GetIdentity, Value::Null, 0)
.map_err(|err| err.into_with_command("GetIdentity"))?;
serde_json::from_value(response) rpc_identity.identity.parse::<Pubkey>().map_err(|_| {
.map_err(|err| ClientError::new_with_command(err.into(), "GetIdentity")) ClientError::new_with_request(
.and_then(|rpc_identity: RpcIdentity| { RpcError::ParseError("Pubkey".to_string()).into(),
rpc_identity.identity.parse::<Pubkey>().map_err(|_| { RpcRequest::GetIdentity,
ClientError::new_with_command( )
RpcError::ParseError("Pubkey".to_string()).into(), })
"GetIdentity",
)
})
})
} }
pub fn get_inflation(&self) -> ClientResult<Inflation> { pub fn get_inflation(&self) -> ClientResult<Inflation> {
let response = self self.send(RpcRequest::GetInflation, Value::Null, 0)
.client
.send(&RpcRequest::GetInflation, Value::Null, 0)
.map_err(|err| err.into_with_command("GetInflation"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetInflation"))
} }
pub fn get_version(&self) -> ClientResult<RpcVersionInfo> { pub fn get_version(&self) -> ClientResult<RpcVersionInfo> {
let response = self self.send(RpcRequest::GetVersion, Value::Null, 0)
.client
.send(&RpcRequest::GetVersion, Value::Null, 0)
.map_err(|err| err.into_with_command("GetVersion"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetVersion"))
} }
pub fn minimum_ledger_slot(&self) -> ClientResult<Slot> { pub fn minimum_ledger_slot(&self) -> ClientResult<Slot> {
let response = self self.send(RpcRequest::MinimumLedgerSlot, Value::Null, 0)
.client
.send(&RpcRequest::MinimumLedgerSlot, Value::Null, 0)
.map_err(|err| err.into_with_command("MinimumLedgerSlot"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "MinimumLedgerSlot"))
} }
pub fn send_and_confirm_transaction( pub fn send_and_confirm_transaction(
@ -600,18 +489,15 @@ impl RpcClient {
pubkey: &Pubkey, pubkey: &Pubkey,
retries: usize, retries: usize,
) -> Result<Option<u64>, Box<dyn error::Error>> { ) -> Result<Option<u64>, Box<dyn error::Error>> {
let request = RpcRequest::GetBalance;
let balance_json = self let balance_json = self
.client .client
.send( .send(request, json!([pubkey.to_string()]), retries)
&RpcRequest::GetBalance, .map_err(|err| err.into_with_request(request))?;
json!([pubkey.to_string()]),
retries,
)
.map_err(|err| err.into_with_command("RetryGetBalance"))?;
Ok(Some( Ok(Some(
serde_json::from_value::<Response<u64>>(balance_json) serde_json::from_value::<Response<u64>>(balance_json)
.map_err(|err| ClientError::new_with_command(err.into(), "RetryGetBalance"))? .map_err(|err| ClientError::new_with_request(err.into(), request))?
.value, .value,
)) ))
} }
@ -628,7 +514,7 @@ impl RpcClient {
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> RpcResult<Option<Account>> { ) -> RpcResult<Option<Account>> {
let response = self.client.send( let response = self.client.send(
&RpcRequest::GetAccountInfo, RpcRequest::GetAccountInfo,
json!([pubkey.to_string(), commitment_config]), json!([pubkey.to_string(), commitment_config]),
0, 0,
); );
@ -664,18 +550,14 @@ impl RpcClient {
} }
pub fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> ClientResult<u64> { pub fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> ClientResult<u64> {
let request = RpcRequest::GetMinimumBalanceForRentExemption;
let minimum_balance_json = self let minimum_balance_json = self
.client .client
.send( .send(request, json!([data_len]), 0)
&RpcRequest::GetMinimumBalanceForRentExemption, .map_err(|err| err.into_with_request(request))?;
json!([data_len]),
0,
)
.map_err(|err| err.into_with_command("GetMinimumBalanceForRentExemption"))?;
let minimum_balance: u64 = serde_json::from_value(minimum_balance_json).map_err(|err| { let minimum_balance: u64 = serde_json::from_value(minimum_balance_json)
ClientError::new_with_command(err.into(), "GetMinimumBalanceForRentExemption") .map_err(|err| ClientError::new_with_request(err.into(), request))?;
})?;
trace!( trace!(
"Response minimum balance {:?} {:?}", "Response minimum balance {:?} {:?}",
data_len, data_len,
@ -696,39 +578,25 @@ impl RpcClient {
pubkey: &Pubkey, pubkey: &Pubkey,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> RpcResult<u64> { ) -> RpcResult<u64> {
let balance_json = self self.send(
.client RpcRequest::GetBalance,
.send( json!([pubkey.to_string(), commitment_config]),
&RpcRequest::GetBalance, 0,
json!([pubkey.to_string(), commitment_config]), )
0,
)
.map_err(|err| err.into_with_command("GetBalance"))?;
serde_json::from_value::<Response<u64>>(balance_json)
.map_err(|err| ClientError::new_with_command(err.into(), "GetBalance"))
} }
pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult<Vec<(Pubkey, Account)>> { pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult<Vec<(Pubkey, Account)>> {
let response = self let accounts: Vec<RpcKeyedAccount> = self.send(
.client RpcRequest::GetProgramAccounts,
.send( json!([pubkey.to_string()]),
&RpcRequest::GetProgramAccounts, 0,
json!([pubkey.to_string()]), )?;
0,
)
.map_err(|err| err.into_with_command("GetProgramAccounts"))?;
let accounts: Vec<RpcKeyedAccount> =
serde_json::from_value::<Vec<RpcKeyedAccount>>(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetProgramAccounts"))?;
let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::new(); let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::new();
for RpcKeyedAccount { pubkey, account } in accounts.into_iter() { for RpcKeyedAccount { pubkey, account } in accounts.into_iter() {
let pubkey = pubkey.parse().map_err(|_| { let pubkey = pubkey.parse().map_err(|_| {
ClientError::new_with_command( ClientError::new_with_request(
RpcError::ParseError("Pubkey".to_string()).into(), RpcError::ParseError("Pubkey".to_string()).into(),
"GetProgramAccounts", RpcRequest::GetProgramAccounts,
) )
})?; })?;
pubkey_accounts.push((pubkey, account.decode().unwrap())); pubkey_accounts.push((pubkey, account.decode().unwrap()));
@ -745,17 +613,11 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ClientResult<u64> { ) -> ClientResult<u64> {
let response = self self.send(
.client RpcRequest::GetTransactionCount,
.send( json!([commitment_config]),
&RpcRequest::GetTransactionCount, 0,
json!([commitment_config]), )
0,
)
.map_err(|err| err.into_with_command("GetTransactionCount"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetTransactionCount"))
} }
pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> { pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> {
@ -768,15 +630,6 @@ impl RpcClient {
&self, &self,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> RpcResult<(Hash, FeeCalculator)> { ) -> RpcResult<(Hash, FeeCalculator)> {
let response = self
.client
.send(
&RpcRequest::GetRecentBlockhash,
json!([commitment_config]),
0,
)
.map_err(|err| err.into_with_command("GetRecentBlockhash"))?;
let Response { let Response {
context, context,
value: value:
@ -784,12 +637,16 @@ impl RpcClient {
blockhash, blockhash,
fee_calculator, fee_calculator,
}, },
} = serde_json::from_value::<Response<RpcBlockhashFeeCalculator>>(response) } = self.send::<Response<RpcBlockhashFeeCalculator>>(
.map_err(|err| ClientError::new_with_command(err.into(), "GetRecentBlockhash"))?; RpcRequest::GetRecentBlockhash,
json!([commitment_config]),
0,
)?;
let blockhash = blockhash.parse().map_err(|_| { let blockhash = blockhash.parse().map_err(|_| {
ClientError::new_with_command( ClientError::new_with_request(
RpcError::ParseError("Hash".to_string()).into(), RpcError::ParseError("Hash".to_string()).into(),
"GetRecentBlockhash", RpcRequest::GetRecentBlockhash,
) )
})?; })?;
Ok(Response { Ok(Response {
@ -802,31 +659,25 @@ impl RpcClient {
&self, &self,
blockhash: &Hash, blockhash: &Hash,
) -> ClientResult<Option<FeeCalculator>> { ) -> ClientResult<Option<FeeCalculator>> {
let response = self let Response { value, .. } = self.send::<Response<Option<RpcFeeCalculator>>>(
.client RpcRequest::GetFeeCalculatorForBlockhash,
.send( json!([blockhash.to_string()]),
&RpcRequest::GetFeeCalculatorForBlockhash, 0,
json!([blockhash.to_string()]), )?;
0,
)
.map_err(|err| err.into_with_command("GetFeeCalculatorForBlockhash"))?;
let Response { value, .. } = serde_json::from_value::<Response<Option<RpcFeeCalculator>>>(
response,
)
.map_err(|e| ClientError::new_with_command(e.into(), "GetFeeCalculatorForBlockhash"))?;
Ok(value.map(|rf| rf.fee_calculator)) Ok(value.map(|rf| rf.fee_calculator))
} }
pub fn get_fee_rate_governor(&self) -> RpcResult<FeeRateGovernor> { pub fn get_fee_rate_governor(&self) -> RpcResult<FeeRateGovernor> {
let response = self
.client
.send(&RpcRequest::GetFeeRateGovernor, Value::Null, 0)
.map_err(|err| err.into_with_command("GetFeeRateGovernor"))?;
let Response { let Response {
context, context,
value: RpcFeeRateGovernor { fee_rate_governor }, value: RpcFeeRateGovernor { fee_rate_governor },
} = serde_json::from_value::<Response<RpcFeeRateGovernor>>(response) } = self.send::<Response<RpcFeeRateGovernor>>(
.map_err(|e| ClientError::new_with_command(e.into(), "GetFeeRateGovernor"))?; RpcRequest::GetFeeRateGovernor,
Value::Null,
0,
)?;
Ok(Response { Ok(Response {
context, context,
value: fee_rate_governor, value: fee_rate_governor,
@ -860,18 +711,11 @@ impl RpcClient {
} }
pub fn get_genesis_hash(&self) -> ClientResult<Hash> { pub fn get_genesis_hash(&self) -> ClientResult<Hash> {
let response = self let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null, 0)?;
.client let hash = hash_str.parse().map_err(|_| {
.send(&RpcRequest::GetGenesisHash, Value::Null, 0) ClientError::new_with_request(
.map_err(|err| err.into_with_command("GetGenesisHash"))?;
let hash = serde_json::from_value::<String>(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetGenesisHash"))?;
let hash = hash.parse().map_err(|_| {
ClientError::new_with_command(
RpcError::ParseError("Hash".to_string()).into(), RpcError::ParseError("Hash".to_string()).into(),
"GetGenesisHash", RpcRequest::GetGenesisHash,
) )
})?; })?;
Ok(hash) Ok(hash)
@ -927,7 +771,7 @@ impl RpcClient {
return balance_result.ok(); return balance_result.ok();
} }
trace!( trace!(
"retry_get_balance[{}] {:?} {:?}", "wait_for_balance_with_commitment [{}] {:?} {:?}",
run, run,
balance_result, balance_result,
expected_balance expected_balance
@ -1060,23 +904,18 @@ impl RpcClient {
&self, &self,
signature: &Signature, signature: &Signature,
) -> ClientResult<usize> { ) -> ClientResult<usize> {
let response = self let result: Response<Vec<Option<TransactionStatus>>> = self.send(
.client RpcRequest::GetSignatureStatuses,
.send( json!([[signature.to_string()]]),
&RpcRequest::GetSignatureStatuses, 5,
json!([[signature.to_string()]]), )?;
5,
)
.map_err(|err| err.into_with_command("GetSignatureStatuses"))?;
let result: Response<Vec<Option<TransactionStatus>>> = serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "GetSignatureStatuses"))?;
let confirmations = result.value[0] let confirmations = result.value[0]
.clone() .clone()
.ok_or_else(|| { .ok_or_else(|| {
ClientError::new_with_command( ClientError::new_with_request(
ClientErrorKind::Custom("signature not found".to_string()), ClientErrorKind::Custom("signature not found".to_string()),
"GetSignatureStatuses", RpcRequest::GetSignatureStatuses,
) )
})? })?
.confirmations .confirmations
@ -1180,17 +1019,20 @@ impl RpcClient {
} }
pub fn validator_exit(&self) -> ClientResult<bool> { pub fn validator_exit(&self) -> ClientResult<bool> {
let response = self self.send(RpcRequest::ValidatorExit, Value::Null, 0)
.client
.send(&RpcRequest::ValidatorExit, Value::Null, 0)
.map_err(|err| err.into_with_command("ValidatorExit"))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_command(err.into(), "ValidatorExit"))
} }
pub fn send(&self, request: &RpcRequest, params: Value, retries: usize) -> ClientResult<Value> { pub fn send<T>(&self, request: RpcRequest, params: Value, retries: usize) -> ClientResult<T>
where
T: serde::de::DeserializeOwned,
{
assert!(params.is_array() || params.is_null()); assert!(params.is_array() || params.is_null());
self.client.send(request, params, retries) let response = self
.client
.send(request, params, retries)
.map_err(|err| err.into_with_request(request))?;
serde_json::from_value(response)
.map_err(|err| ClientError::new_with_request(err.into(), request))
} }
} }
@ -1260,21 +1102,23 @@ mod tests {
let rpc_addr = receiver.recv().unwrap(); let rpc_addr = receiver.recv().unwrap();
let rpc_client = RpcClient::new_socket(rpc_addr); let rpc_client = RpcClient::new_socket(rpc_addr);
let balance = rpc_client.send( let balance: u64 = rpc_client
&RpcRequest::GetBalance, .send(
json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]), RpcRequest::GetBalance,
0, json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]),
); 0,
assert_eq!(balance.unwrap().as_u64().unwrap(), 50); )
.unwrap();
assert_eq!(balance, 50);
let blockhash = rpc_client.send(&RpcRequest::GetRecentBlockhash, Value::Null, 0); let blockhash: String = rpc_client
assert_eq!( .send(RpcRequest::GetRecentBlockhash, Value::Null, 0)
blockhash.unwrap().as_str().unwrap(), .unwrap();
"deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx" assert_eq!(blockhash, "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx");
);
// Send erroneous parameter // Send erroneous parameter
let blockhash = rpc_client.send(&RpcRequest::GetRecentBlockhash, json!(["parameter"]), 0); let blockhash: ClientResult<String> =
rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"]), 0);
assert_eq!(blockhash.is_err(), true); assert_eq!(blockhash.is_err(), true);
} }
@ -1308,12 +1152,14 @@ mod tests {
let rpc_addr = receiver.recv().unwrap(); let rpc_addr = receiver.recv().unwrap();
let rpc_client = RpcClient::new_socket(rpc_addr); let rpc_client = RpcClient::new_socket(rpc_addr);
let balance = rpc_client.send( let balance: u64 = rpc_client
&RpcRequest::GetBalance, .send(
json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"]), RpcRequest::GetBalance,
10, json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"]),
); 10,
assert_eq!(balance.unwrap().as_u64().unwrap(), 5); )
.unwrap();
assert_eq!(balance, 5);
} }
#[test] #[test]

View File

@ -31,7 +31,7 @@ impl RpcClientRequest {
impl GenericRpcClientRequest for RpcClientRequest { impl GenericRpcClientRequest for RpcClientRequest {
fn send( fn send(
&self, &self,
request: &RpcRequest, request: RpcRequest,
params: serde_json::Value, params: serde_json::Value,
mut retries: usize, mut retries: usize,
) -> Result<serde_json::Value> { ) -> Result<serde_json::Value> {

View File

@ -1,7 +1,8 @@
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::fmt;
use thiserror::Error; use thiserror::Error;
#[derive(Debug, PartialEq, Eq, Hash)] #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum RpcRequest { pub enum RpcRequest {
DeregisterNode, DeregisterNode,
ValidatorExit, ValidatorExit,
@ -42,12 +43,8 @@ pub enum RpcRequest {
MinimumLedgerSlot, MinimumLedgerSlot,
} }
pub const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 256; impl fmt::Display for RpcRequest {
pub const MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS_SLOT_RANGE: u64 = 10_000; fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl RpcRequest {
pub(crate) fn build_request_json(&self, id: u64, params: Value) -> Value {
let jsonrpc = "2.0";
let method = match self { let method = match self {
RpcRequest::DeregisterNode => "deregisterNode", RpcRequest::DeregisterNode => "deregisterNode",
RpcRequest::ValidatorExit => "validatorExit", RpcRequest::ValidatorExit => "validatorExit",
@ -87,10 +84,21 @@ impl RpcRequest {
RpcRequest::GetMinimumBalanceForRentExemption => "getMinimumBalanceForRentExemption", RpcRequest::GetMinimumBalanceForRentExemption => "getMinimumBalanceForRentExemption",
RpcRequest::MinimumLedgerSlot => "minimumLedgerSlot", RpcRequest::MinimumLedgerSlot => "minimumLedgerSlot",
}; };
write!(f, "{}", method)
}
}
pub const MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS: usize = 256;
pub const MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS_SLOT_RANGE: u64 = 10_000;
impl RpcRequest {
pub(crate) fn build_request_json(self, id: u64, params: Value) -> Value {
let jsonrpc = "2.0";
json!({ json!({
"jsonrpc": jsonrpc, "jsonrpc": jsonrpc,
"id": id, "id": id,
"method": method, "method": format!("{}", self),
"params": params, "params": params,
}) })
} }