initial work

This commit is contained in:
Alfredo Garcia 2024-11-01 18:44:39 -03:00
parent 46c6b6eb38
commit 9709059943
11 changed files with 980 additions and 111 deletions

View File

@ -1155,6 +1155,12 @@ dependencies = [
"syn 2.0.79",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "debugid"
version = "0.8.0"
@ -1770,6 +1776,30 @@ dependencies = [
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
]
[[package]]
name = "heck"
version = "0.3.3"
@ -1971,9 +2001,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.4.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a"
dependencies = [
"bytes",
"futures-channel",
@ -2010,7 +2040,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper 1.5.0",
"hyper-util",
"pin-project-lite",
"tokio",
@ -2028,7 +2058,7 @@ dependencies = [
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.4.1",
"hyper 1.5.0",
"pin-project-lite",
"socket2",
"tokio",
@ -2571,7 +2601,7 @@ checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6"
dependencies = [
"base64 0.22.1",
"http-body-util",
"hyper 1.4.1",
"hyper 1.5.0",
"hyper-util",
"indexmap 2.5.0",
"ipnet",
@ -2604,6 +2634,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -2637,6 +2677,24 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c4d16a3d2b0e89ec6e7d509cf791545fcb48cbc8fc2fb2e96a492defda9140"
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "multimap"
version = "0.10.0"
@ -3824,6 +3882,12 @@ dependencies = [
"zip32",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -4109,6 +4173,17 @@ dependencies = [
"version_check",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"digest",
]
[[package]]
name = "sha2"
version = "0.10.8"
@ -4573,6 +4648,18 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.6.10"
@ -4669,7 +4756,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.5.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@ -4947,6 +5034,25 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -5089,6 +5195,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.2"
@ -5239,6 +5351,35 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http 0.2.12",
"hyper 0.14.30",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tungstenite",
"tokio-util 0.7.12",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -6094,6 +6235,7 @@ dependencies = [
"tonic-reflection",
"tower",
"tracing",
"warp",
"zcash_address 0.5.0",
"zcash_primitives 0.17.0",
"zebra-chain",
@ -6285,7 +6427,7 @@ dependencies = [
"howudoin",
"http-body-util",
"humantime-serde",
"hyper 1.4.1",
"hyper 1.5.0",
"hyper-util",
"indexmap 2.5.0",
"indicatif",

View File

@ -22,10 +22,6 @@ categories = [
[features]
indexer-rpcs = [
"tonic-build",
"tonic",
"tonic-reflection",
"prost",
"tokio-stream",
]
@ -82,11 +78,12 @@ tokio = { version = "1.40.0", features = [
] }
tower = "0.4.13"
# indexer-rpcs dependencies
tonic = { version = "0.12.3", optional = true }
tonic-reflection = { version = "0.12.3", optional = true }
prost = { version = "0.13.3", optional = true }
# indexer-rpcs and endpoint-rpcs dependencies
tonic = "0.12.3"
tonic-reflection = "0.12.3"
prost = "0.13.3"
tokio-stream = { version = "0.1.16", optional = true }
warp = "0.3.7"
tracing = "0.1.39"
@ -116,7 +113,7 @@ zebra-script = { path = "../zebra-script", version = "1.0.0-beta.40" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.40" }
[build-dependencies]
tonic-build = { version = "0.12.3", optional = true }
tonic-build = "0.12.3"
[dev-dependencies]
insta = { version = "1.40.0", features = ["redactions", "json", "ron"] }

View File

@ -1,9 +1,16 @@
//! Compile proto files
fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.file_descriptor_set_path(out_dir.unwrap().join("endpoint_descriptor.bin"))
.compile_protos(&["proto/endpoint.proto"], &[""])?;
#[cfg(feature = "indexer-rpcs")]
{
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")

View File

@ -0,0 +1,99 @@
syntax = "proto3";
package zebra.endpoint.rpc;
// Used by methods that take no arguments.
message Empty {};
service Endpoint {
//
rpc get_info(Empty) returns (GetInfo);
//
rpc get_blockchain_info(Empty) returns (GetBlockChainInfo);
//
rpc get_address_balance(AddressStrings) returns (AddressBalance);
//
rpc send_raw_transaction(RawTransactionHex) returns (SentTransactionHash);
}
// A response to a GetInfo call.
message GetInfo {
// The node version build number
string build = 1;
// The server sub-version identifier, used as the network protocol user-agent
string subversion = 2;
}
// Define a GetBlockChainInfo message for the response
message GetBlockChainInfo {
// Current network name as defined in BIP70 (main, test, regtest)
string chain = 1;
// The current number of blocks processed in the server, numeric
uint32 blocks = 2;
// The hash of the currently best block, in big-endian order, hex-encoded
string best_block_hash = 3;
// If syncing, the estimated height of the chain, else the current best height, numeric.
//
// In Zebra, this is always the height estimate, so it might be a little inaccurate.
uint32 estimated_height = 4;
// Value pool balances
repeated ValuePoolBalance value_pools = 5;
// Status of network upgrades
repeated UpgradeEntry upgrades = 6;
// Branch IDs of the current and upcoming consensus rules
TipConsensusBranch consensus = 7;
}
// Define a ValuePoolBalance message, replacing your array with repeated fields if needed
message ValuePoolBalance {
string id = 1;
double chain_value = 2;
uint64 chain_value_zat = 3;
}
//
message UpgradeEntry {
string key = 1;
NetworkUpgradeInfo value = 2;
}
// Define a NetworkUpgradeInfo message to represent upgrade statuses
message NetworkUpgradeInfo {
string name = 1;
uint32 activation_height = 2;
string status = 3;
}
// Define the TipConsensusBranch message to represent consensus branches
message TipConsensusBranch {
string chain_tip = 1;
string next_block = 2;
}
//
message AddressStrings {
repeated string addresses = 1;
}
//
message AddressBalance {
uint64 balance = 1;
}
//
message RawTransactionHex {
string hex = 1;
}
//
message SentTransactionHash {
string hash = 1;
}

View File

@ -15,6 +15,7 @@ use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast, task::JoinHandle};
use tonic::{Response, Status};
use tower::{Service, ServiceExt};
use tracing::Instrument;
@ -1547,6 +1548,12 @@ enum NetworkUpgradeStatus {
Pending,
}
impl std::fmt::Display for NetworkUpgradeStatus {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
/// The [`ConsensusBranchId`]s for the tip and the next block.
///
/// These branch IDs are different when the next block is a network upgrade activation block.
@ -1883,3 +1890,375 @@ pub fn height_from_signed_int(index: i32, tip_height: Height) -> Result<Height>
Ok(Height(sanitized_height))
}
}
/// gRPC method implementations.
#[derive(Clone)]
pub struct GrpcImpl<Mempool, State, Tip>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
State::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
// Configuration
//
/// Zebra's application version, with build metadata.
build_version: String,
/// Zebra's RPC user agent.
user_agent: String,
/// The configured network for this RPC service.
network: Network,
/// Test-only option that makes Zebra say it is at the chain tip,
/// no matter what the estimated height or local clock is.
debug_force_finished_sync: bool,
/// Test-only option that makes RPC responses more like `zcashd`.
#[allow(dead_code)]
debug_like_zcashd: bool,
// Services
//
/// A handle to the mempool service.
mempool: Mempool,
/// A handle to the state service.
state: State,
/// Allows efficient access to the best tip of the blockchain.
latest_chain_tip: Tip,
// Tasks
//
/// A sender component of a channel used to send transactions to the mempool queue.
queue_sender: broadcast::Sender<UnminedTx>,
}
impl<Mempool, State, Tip> GrpcImpl<Mempool, State, Tip>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
State::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
/// Create a new instance of the RPC handler.
//
// TODO:
// - put some of the configs or services in their own struct?
#[allow(clippy::too_many_arguments)]
pub fn new<VersionString, UserAgentString>(
build_version: VersionString,
user_agent: UserAgentString,
network: Network,
debug_force_finished_sync: bool,
debug_like_zcashd: bool,
mempool: Mempool,
state: State,
latest_chain_tip: Tip,
) -> (Self, JoinHandle<()>)
where
VersionString: ToString + Clone + Send + 'static,
UserAgentString: ToString + Clone + Send + 'static,
{
let (runner, queue_sender) = Queue::start();
let mut build_version = build_version.to_string();
let user_agent = user_agent.to_string();
// Match zcashd's version format, if the version string has anything in it
if !build_version.is_empty() && !build_version.starts_with('v') {
build_version.insert(0, 'v');
}
let rpc_impl = GrpcImpl {
build_version,
user_agent,
network: network.clone(),
debug_force_finished_sync,
debug_like_zcashd,
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
queue_sender,
};
// run the process queue
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);
(rpc_impl, rpc_tx_queue_task_handle)
}
}
#[tonic::async_trait]
impl<Mempool, State, Tip> crate::server::endpoint_server::Endpoint for GrpcImpl<Mempool, State, Tip>
where
Mempool: Service<
mempool::Request,
Response = mempool::Response,
Error = zebra_node_services::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Mempool::Future: Send,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
State::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
async fn get_info(
&self,
_: tonic::Request<crate::server::Empty>,
) -> std::result::Result<Response<crate::server::GetInfo>, Status> {
let response = crate::server::GetInfo {
build: self.build_version.clone(),
subversion: self.user_agent.clone(),
};
Ok(Response::new(response))
}
async fn get_blockchain_info(
&self,
_: tonic::Request<crate::server::Empty>,
) -> std::result::Result<Response<crate::server::GetBlockChainInfo>, Status> {
let network = self.network.clone();
let debug_force_finished_sync = self.debug_force_finished_sync;
let mut state = self.state.clone();
// `chain` field
let chain = network.bip70_network_name();
let request = zebra_state::ReadRequest::TipPoolValues;
let response: zebra_state::ReadResponse = state
.ready()
.and_then(|service| service.call(request))
.await
.map_server_error()
.unwrap();
let zebra_state::ReadResponse::TipPoolValues {
tip_height,
tip_hash,
value_balance,
} = response
else {
unreachable!("unmatched response to a TipPoolValues request")
};
let request = zebra_state::ReadRequest::BlockHeader(tip_hash.into());
let response: zebra_state::ReadResponse = state
.ready()
.and_then(|service| service.call(request))
.await
.map_server_error()
.unwrap();
let zebra_state::ReadResponse::BlockHeader(block_header) = response else {
unreachable!("unmatched response to a BlockHeader request")
};
let tip_block_time = block_header
.ok_or_server_error("unexpectedly could not read best chain tip block header")
.unwrap()
.time;
let now = Utc::now();
let zebra_estimated_height =
NetworkChainTipHeightEstimator::new(tip_block_time, tip_height, &network)
.estimate_height_at(now);
// If we're testing the mempool, force the estimated height to be the actual tip height, otherwise,
// check if the estimated height is below Zebra's latest tip height, or if the latest tip's block time is
// later than the current time on the local clock.
let estimated_height = if tip_block_time > now
|| zebra_estimated_height < tip_height
|| debug_force_finished_sync
{
tip_height
} else {
zebra_estimated_height
};
// `upgrades` object
//
// Get the network upgrades in height order, like `zcashd`.
let mut upgrades = IndexMap::new();
for (activation_height, network_upgrade) in network.full_activation_list() {
// Zebra defines network upgrades based on incompatible consensus rule changes,
// but zcashd defines them based on ZIPs.
//
// All the network upgrades with a consensus branch ID are the same in Zebra and zcashd.
if let Some(branch_id) = network_upgrade.branch_id() {
// zcashd's RPC seems to ignore Disabled network upgrades, so Zebra does too.
let status = if tip_height >= activation_height {
NetworkUpgradeStatus::Active
} else {
NetworkUpgradeStatus::Pending
};
let upgrade = NetworkUpgradeInfo {
name: network_upgrade,
activation_height,
status,
};
upgrades.insert(ConsensusBranchIdHex(branch_id), upgrade);
}
}
// `consensus` object
let next_block_height =
(tip_height + 1).expect("valid chain tips are a lot less than Height::MAX");
let consensus = TipConsensusBranch {
chain_tip: ConsensusBranchIdHex(
NetworkUpgrade::current(&network, tip_height)
.branch_id()
.unwrap_or(ConsensusBranchId::RPC_MISSING_ID),
),
next_block: ConsensusBranchIdHex(
NetworkUpgrade::current(&network, next_block_height)
.branch_id()
.unwrap_or(ConsensusBranchId::RPC_MISSING_ID),
),
};
let response = GetBlockChainInfo {
chain,
blocks: tip_height,
best_block_hash: tip_hash,
estimated_height: estimated_height,
value_pools: types::ValuePoolBalance::from_value_balance(value_balance),
upgrades,
consensus,
}
.into();
Ok(Response::new(response))
}
async fn get_address_balance(
&self,
address_strings: tonic::Request<crate::server::AddressStrings>,
) -> std::result::Result<Response<crate::server::AddressBalance>, Status> {
let state = self.state.clone();
// TODO: fix all the unwraps here and in the above calls, use status::from_error as `send_raw_transaction`.
let converted_addresses: AddressStrings = address_strings.into_inner().into();
let valid_addresses: HashSet<Address> = converted_addresses.valid_addresses().unwrap();
let request = zebra_state::ReadRequest::AddressBalance(valid_addresses);
let response = state.oneshot(request).await.map_server_error().unwrap();
let res = match response {
zebra_state::ReadResponse::AddressBalance(balance) => AddressBalance {
balance: u64::from(balance),
},
_ => unreachable!("Unexpected response from state service: {response:?}"),
}
.into();
Ok(Response::new(res))
}
async fn send_raw_transaction(
&self,
raw_transaction_hex: tonic::Request<crate::server::RawTransactionHex>,
) -> std::result::Result<Response<crate::server::SentTransactionHash>, Status> {
let mempool = self.mempool.clone();
let queue_sender = self.queue_sender.clone();
let raw_transaction_hex = raw_transaction_hex.into_inner().hex;
let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| {
Status::invalid_argument("raw transaction is not specified as a hex string")
})?;
let raw_transaction = Transaction::zcash_deserialize(&*raw_transaction_bytes)
.map_err(|_| Status::invalid_argument("raw transaction is structurally invalid"))?;
let transaction_hash = raw_transaction.hash();
// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(unmined_transaction);
let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);
let response = mempool
.oneshot(request)
.await
.map_err(|e| Status::from_error(e))?;
let mut queue_results = match response {
mempool::Response::Queued(results) => results,
_ => unreachable!("incorrect response variant from mempool service"),
};
assert_eq!(
queue_results.len(),
1,
"mempool service returned more results than expected"
);
let queue_result = queue_results
.pop()
.expect("there should be exactly one item in Vec")
.inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err))
.map_err(|e| Status::from_error(e))?
.await;
tracing::debug!("sent transaction to mempool: {:?}", &queue_result);
let res = queue_result
.map_err(|e| Status::from_error(Box::new(e)))
.map(|_| SentTransactionHash(transaction_hash))
.map_server_error();
Ok(Response::new(
res.map_err(|e| Status::from_error(Box::new(e)))?.into(),
))
}
}

View File

@ -1,6 +1,7 @@
//! Types used in RPC methods.
mod get_blockchain_info;
pub mod grpc;
mod zec;
pub use get_blockchain_info::ValuePoolBalance;

View File

@ -69,4 +69,9 @@ impl ValuePoolBalance {
Self::deferred(value_balance.deferred_amount()),
]
}
/// Returns the pool's name, ZEC balance, and zatoshi balance.
pub fn data(&self) -> (&str, Zec<NonNegative>, Amount<NonNegative>) {
(&self.id, self.chain_value, self.chain_value_zat)
}
}

View File

@ -0,0 +1,66 @@
//! gRPC types and conversions for Zebra RPC methods.
impl From<crate::methods::GetBlockChainInfo> for crate::server::GetBlockChainInfo {
fn from(info: crate::methods::GetBlockChainInfo) -> Self {
let value_pools: Vec<crate::server::ValuePoolBalance> = info
.value_pools
.iter()
.map(|pool| crate::server::ValuePoolBalance {
id: pool.data().0.to_string(),
chain_value: pool.data().1.lossy_zec(),
chain_value_zat: pool.data().2.into(),
})
.collect();
let upgrades: Vec<crate::server::UpgradeEntry> = info
.upgrades
.into_iter()
.map(|(key, upgrade_info)| crate::server::UpgradeEntry {
key: key.0.to_string(),
value: Some(crate::server::NetworkUpgradeInfo {
name: upgrade_info.name.to_string(),
status: upgrade_info.status.to_string(),
activation_height: upgrade_info.activation_height.0,
}),
})
.collect();
let consensus = crate::server::TipConsensusBranch {
chain_tip: info.consensus.chain_tip.0.to_string(),
next_block: info.consensus.next_block.0.to_string(),
};
crate::server::GetBlockChainInfo {
chain: info.chain,
blocks: info.blocks.0,
best_block_hash: hex::encode(info.best_block_hash.0),
estimated_height: info.estimated_height.0,
value_pools,
upgrades,
consensus: Some(consensus),
}
}
}
impl From<crate::server::AddressStrings> for crate::methods::AddressStrings {
fn from(addresses: crate::server::AddressStrings) -> Self {
crate::methods::AddressStrings {
addresses: addresses.addresses,
}
}
}
impl From<crate::methods::AddressBalance> for crate::server::AddressBalance {
fn from(balance: crate::methods::AddressBalance) -> Self {
crate::server::AddressBalance {
balance: balance.balance,
}
}
}
impl From<crate::methods::SentTransactionHash> for crate::server::SentTransactionHash {
fn from(hash: crate::methods::SentTransactionHash) -> Self {
crate::server::SentTransactionHash {
hash: hash.0.to_string(),
}
}
}

View File

@ -17,6 +17,12 @@ use tokio::task::JoinHandle;
use tower::Service;
use tracing::*;
use tonic::{
transport::{Channel, Server, Uri},
Request,
};
use warp::Filter;
use zebra_chain::{
block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network,
};
@ -25,8 +31,9 @@ use zebra_node_services::mempool;
use crate::{
config::Config,
methods::{Rpc, RpcImpl},
methods::{GrpcImpl, Rpc, RpcImpl},
server::{
endpoint_client::EndpointClient, endpoint_server::EndpointServer,
http_request_compatibility::HttpRequestMiddleware,
rpc_call_compatibility::FixRpcResponseMiddleware,
},
@ -37,11 +44,19 @@ use crate::methods::{GetBlockTemplateRpc, GetBlockTemplateRpcImpl};
pub mod cookie;
pub mod http_request_compatibility;
pub mod jsonrpc;
pub mod rpc_call_compatibility;
#[cfg(test)]
mod tests;
// The generated endpoint proto
tonic::include_proto!("zebra.endpoint.rpc");
/// The file descriptor set for the generated endpoint proto
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("endpoint_descriptor");
/// Zebra RPC Server
#[derive(Clone)]
pub struct RpcServer {
@ -150,33 +165,22 @@ impl RpcServer {
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
if let Some(listen_addr) = config.listen_addr {
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build_v1()
.expect("Unable to build reflection service");
info!("Trying to open RPC endpoint at {}...", listen_addr,);
// Create handler compatible with V1 and V2 RPC protocols
let mut io: MetaIoHandler<(), _> =
MetaIoHandler::new(Compatibility::Both, FixRpcResponseMiddleware);
// `grpc_server_listen_addr` should be a config argument
let grpc_server_listen_addr = "127.0.0.1:8080".parse().unwrap();
let grpc_server_url: Uri = format!("http://{}", grpc_server_listen_addr)
.parse()
.unwrap();
#[cfg(feature = "getblocktemplate-rpcs")]
{
// Initialize the getblocktemplate rpc method handler
let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new(
&network,
mining_config.clone(),
mempool.clone(),
state.clone(),
latest_chain_tip.clone(),
block_verifier_router,
sync_status,
address_book,
);
io.extend_with(get_block_template_rpc_impl.to_delegate());
}
// Initialize the rpc methods with the zebra version
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
let (grpc, rpc_tx_queue_task_handle) = GrpcImpl::new(
build_version.clone(),
user_agent,
user_agent.clone(),
network.clone(),
config.debug_force_finished_sync,
#[cfg(feature = "getblocktemplate-rpcs")]
@ -188,88 +192,64 @@ impl RpcServer {
latest_chain_tip,
);
io.extend_with(rpc_impl.to_delegate());
// Start the gRPC server
let _ = tokio::spawn(async move {
let server_instance = Server::builder()
.accept_http1(true)
.add_service(reflection_service)
.add_service(EndpointServer::new(grpc))
.serve(grpc_server_listen_addr)
.await
.expect("Unable to start RPC server");
// If zero, automatically scale threads to the number of CPU cores
let mut parallel_cpu_threads = config.parallel_cpu_threads;
if parallel_cpu_threads == 0 {
parallel_cpu_threads = available_parallelism().map(usize::from).unwrap_or(1);
}
info!("Started gRPC server at {}", grpc_server_listen_addr);
server_instance
});
// The server is a blocking task, which blocks on executor shutdown.
// So we need to start it in a std::thread.
// (Otherwise tokio panics on RPC port conflict, which shuts down the RPC server.)
let span = Span::current();
let start_server = move || {
span.in_scope(|| {
let middleware = if config.enable_cookie_auth {
let cookie = Cookie::default();
cookie::write_to_disk(&cookie, &config.cookie_dir)
.expect("Zebra must be able to write the auth cookie to the disk");
HttpRequestMiddleware::default().with(cookie)
} else {
HttpRequestMiddleware::default()
};
// Use a different tokio executor from the rest of Zebra,
// so that large RPCs and any task handling bugs don't impact Zebra.
let server_instance = ServerBuilder::new(io)
.threads(parallel_cpu_threads)
// TODO: disable this security check if we see errors from lightwalletd
//.allowed_hosts(DomainsValidation::Disabled)
.request_middleware(middleware)
.start_http(&listen_addr)
.expect("Unable to start RPC server");
info!("{OPENED_RPC_ENDPOINT_MSG}{}", server_instance.address());
let close_handle = server_instance.close_handle();
let rpc_server_handle = RpcServer {
config,
network,
build_version: build_version.to_string(),
close_handle,
};
(server_instance, rpc_server_handle)
})
// Create the middleware for the HTTP request
let middleware = if config.enable_cookie_auth {
let cookie = Cookie::default();
cookie::write_to_disk(&cookie, &config.cookie_dir)
.expect("Zebra must be able to write the auth cookie to the disk");
HttpRequestMiddleware::default().with(cookie)
} else {
HttpRequestMiddleware::default()
};
// Propagate panics from the std::thread
let (server_instance, rpc_server_handle) = match std::thread::spawn(start_server).join()
{
Ok(rpc_server) => rpc_server,
Err(panic_object) => panic::resume_unwind(panic_object),
};
// Create the warp proxy server
let rpc_server_task_handle = tokio::spawn(async move {
let grpc_channel = Channel::builder(grpc_server_url)
.connect()
.await
.expect("Unable to connect to gRPC server");
// The server is a blocking task, which blocks on executor shutdown.
// So we need to wait on it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.)
let span = Span::current();
let wait_on_server = move || {
span.in_scope(|| {
server_instance.wait();
let grpc_client = EndpointClient::new(grpc_channel);
let middleware = std::sync::Arc::new(middleware);
info!("Stopped RPC endpoint");
})
};
let proxy = warp::post()
.and(warp::path::end())
.and(warp::body::json())
.and(warp::header::headers_cloned())
.and(with_grpc_client(grpc_client)) // Pass the Arc directly
.and_then(move |request, headers, grpc_client| {
let middleware = std::sync::Arc::clone(&middleware);
async move {
middleware
.handle_request(request, headers, grpc_client)
.await
}
});
let span = Span::current();
let rpc_server_task_handle = tokio::task::spawn_blocking(move || {
let thread_handle = std::thread::spawn(wait_on_server);
warp::serve(proxy).run(listen_addr).await;
// Propagate panics from the inner std::thread to the outer tokio blocking task
span.in_scope(|| match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
})
info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr);
});
(
rpc_server_task_handle,
rpc_tx_queue_task_handle,
Some(rpc_server_handle),
//Some(rpc_server_handle),
None,
)
} else {
// There is no RPC port, so the RPC tasks do nothing.
@ -345,3 +325,10 @@ impl Drop for RpcServer {
self.shutdown_blocking();
}
}
// Warp Filter to pass the gRPC client
fn with_grpc_client(
grpc_client: EndpointClient<Channel>,
) -> impl Filter<Extract = (EndpointClient<Channel>,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || grpc_client.clone())
}

View File

@ -9,7 +9,11 @@ use jsonrpc_http_server::{
RequestMiddleware, RequestMiddlewareAction,
};
use super::cookie::Cookie;
use crate::server::{
cookie::Cookie,
jsonrpc::{JsonRpcError, JsonRpcRequest, JsonRpcResponse},
Empty, EndpointClient, Request as TonicRequest,
};
/// HTTP [`RequestMiddleware`] with compatibility workarounds.
///
@ -60,6 +64,138 @@ impl With<Cookie> for HttpRequestMiddleware {
}
}
impl HttpRequestMiddleware {
/// Check if the number of parameters matches the expected length.
pub fn check_parameters_length(
request: JsonRpcRequest,
expeted_len: usize,
) -> Option<JsonRpcResponse> {
if request.params().len() != expeted_len {
return Some(JsonRpcResponse::new(
serde_json::to_value("invalid number of parameters")
.expect("string to serde_json::Value conversion"),
request.id().to_string(),
));
}
None
}
/// Handle incoming JSON-RPC requests
pub async fn handle_request(
&self,
request: JsonRpcRequest,
headers: warp::http::HeaderMap,
mut grpc_client: EndpointClient<tonic::transport::Channel>,
) -> Result<impl warp::Reply, warp::Rejection> {
tracing::trace!(?request, "original HTTP request");
// Check if the request is authenticated
if !self.check_credentials(&headers) {
let error = JsonRpcError {
code: 401,
message: "unauthenticated method".to_string(),
};
let response = JsonRpcResponse::new(
serde_json::to_value(error).unwrap(),
request.id().to_string(),
);
return Ok(warp::reply::json(&response));
}
// Match the JSON-RPC `method` field to call the appropriate gRPC method
match request.method() {
"getinfo" => {
// Check for exactly zero parameter
if let Some(response) = Self::check_parameters_length(request.clone(), 0) {
return Ok(warp::reply::json(&response));
}
let grpc_request = TonicRequest::new(Empty {});
let grpc_response = grpc_client
.get_info(grpc_request)
.await
.map(|grpc_response| serde_json::to_value(grpc_response.into_inner()))
.unwrap_or_else(|e| serde_json::to_value(e.to_string()))
.map_err(|_| warp::reject::reject())?;
let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string());
Ok(warp::reply::json(&json_response))
}
"getblockchaininfo" => {
// Check for exactly zero parameter
if let Some(response) = Self::check_parameters_length(request.clone(), 0) {
return Ok(warp::reply::json(&response));
}
let grpc_request = TonicRequest::new(Empty {});
let grpc_response = grpc_client
.get_blockchain_info(grpc_request)
.await
.map(|grpc_response| serde_json::to_value(grpc_response.into_inner()))
.unwrap_or_else(|e| serde_json::to_value(e.to_string()))
.map_err(|_| warp::reject::reject())?;
let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string());
Ok(warp::reply::json(&json_response))
}
"getaddressbalance" => {
// Check for exactly one parameter
if let Some(response) = Self::check_parameters_length(request.clone(), 1) {
return Ok(warp::reply::json(&response));
}
let address_params: crate::server::AddressStrings =
serde_json::from_value(request.params().get(0).cloned().unwrap_or_default())
.map_err(|_| warp::reject::reject())?;
let grpc_response = grpc_client
.get_address_balance(address_params)
.await
.map(|grpc_response| serde_json::to_value(grpc_response.into_inner()))
.unwrap_or_else(|e| serde_json::to_value(e.to_string()))
.map_err(|_| warp::reject::reject())?;
let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string());
Ok(warp::reply::json(&json_response))
}
"sendrawtransaction" => {
// Check for exactly one parameter
if let Some(response) = Self::check_parameters_length(request.clone(), 1) {
return Ok(warp::reply::json(&response));
}
let hex_param = request.params()[0].as_str().ok_or_else(|| warp::reject())?;
let grpc_request = TonicRequest::new(crate::server::RawTransactionHex {
hex: hex_param.to_string(),
});
let grpc_response = grpc_client
.send_raw_transaction(grpc_request)
.await
.map(|grpc_response| serde_json::to_value(grpc_response.into_inner()))
.unwrap_or_else(|e| serde_json::to_value(e.to_string()))
.map_err(|_| warp::reject::reject())?;
let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string());
Ok(warp::reply::json(&json_response))
}
_ => {
let json_response = JsonRpcResponse::new(
serde_json::to_value("unsupported method")
.expect("string to serde_json::Value conversion"),
request.id().to_string(),
);
Ok(warp::reply::json(&json_response))
}
}
}
}
impl RequestMiddleware for HttpRequestMiddleware {
fn on_request(&self, mut request: Request<Body>) -> RequestMiddlewareAction {
tracing::trace!(?request, "original HTTP request");

View File

@ -0,0 +1,50 @@
//! Define JSON-RPC request and response structures
/// The JSON-RPC request
#[derive(Debug, Clone, serde::Deserialize)]
pub struct JsonRpcRequest {
jsonrpc: String,
id: String,
method: String,
params: Vec<serde_json::Value>,
}
impl JsonRpcRequest {
/// Get the method name from the request
pub fn method(&self) -> &str {
&self.method
}
/// Get the parameters from the request
pub fn params(&self) -> &[serde_json::Value] {
&self.params
}
/// Get the request ID
pub fn id(&self) -> &str {
&self.id
}
}
/// The JSON-RPC response
#[derive(serde::Serialize)]
pub struct JsonRpcResponse {
//jsonrpc: String,
result: serde_json::Value,
id: String,
//error: Option<String>,
}
impl JsonRpcResponse {
///
pub fn new(result: serde_json::Value, id: String) -> Self {
Self { result, id }
}
}
/// The JSON-RPC error
#[derive(serde::Serialize)]
pub struct JsonRpcError {
pub code: i64,
pub message: String,
}