From 9709059943d461ad98784618b78581cdfbf4a360 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Fri, 1 Nov 2024 18:44:39 -0300 Subject: [PATCH] initial work --- Cargo.lock | 156 ++++++- zebra-rpc/Cargo.toml | 15 +- zebra-rpc/build.rs | 9 +- zebra-rpc/proto/endpoint.proto | 99 +++++ zebra-rpc/src/methods.rs | 379 ++++++++++++++++++ zebra-rpc/src/methods/types.rs | 1 + .../src/methods/types/get_blockchain_info.rs | 5 + zebra-rpc/src/methods/types/grpc.rs | 66 +++ zebra-rpc/src/server.rs | 173 ++++---- .../src/server/http_request_compatibility.rs | 138 ++++++- zebra-rpc/src/server/jsonrpc.rs | 50 +++ 11 files changed, 980 insertions(+), 111 deletions(-) create mode 100644 zebra-rpc/proto/endpoint.proto create mode 100644 zebra-rpc/src/methods/types/grpc.rs create mode 100644 zebra-rpc/src/server/jsonrpc.rs diff --git a/Cargo.lock b/Cargo.lock index b45ed6190..abc15e567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,12 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "debugid" version = "0.8.0" @@ -1770,6 +1776,30 @@ dependencies = [ "num-traits", ] +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heck" version = "0.3.3" @@ -1971,9 +2001,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2010,7 +2040,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "pin-project-lite", "tokio", @@ -2028,7 +2058,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -2571,7 +2601,7 @@ checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" dependencies = [ "base64 0.22.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "indexmap 2.5.0", "ipnet", @@ -2604,6 +2634,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2637,6 +2677,24 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26c4d16a3d2b0e89ec6e7d509cf791545fcb48cbc8fc2fb2e96a492defda9140" +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.10.0" @@ -3824,6 +3882,12 @@ dependencies = [ "zip32", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -4109,6 +4173,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -4573,6 +4648,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -4669,7 +4756,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -4947,6 +5034,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -5089,6 +5195,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" @@ -5239,6 +5351,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper 0.14.30", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite", + "tokio-util 0.7.12", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -6094,6 +6235,7 @@ dependencies = [ "tonic-reflection", "tower", "tracing", + "warp", "zcash_address 0.5.0", "zcash_primitives 0.17.0", "zebra-chain", @@ -6285,7 +6427,7 @@ dependencies = [ "howudoin", "http-body-util", "humantime-serde", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "indexmap 2.5.0", "indicatif", diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index d3976e15e..e5ac61fba 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -22,10 +22,6 @@ categories = [ [features] indexer-rpcs = [ - "tonic-build", - "tonic", - "tonic-reflection", - "prost", "tokio-stream", ] @@ -82,11 +78,12 @@ tokio = { version = "1.40.0", features = [ ] } tower = "0.4.13" -# indexer-rpcs dependencies -tonic = { version = "0.12.3", optional = true } -tonic-reflection = { version = "0.12.3", optional = true } -prost = { version = "0.13.3", optional = true } +# indexer-rpcs and endpoint-rpcs dependencies +tonic = "0.12.3" +tonic-reflection = "0.12.3" +prost = "0.13.3" tokio-stream = { version = "0.1.16", optional = true } +warp = "0.3.7" tracing = "0.1.39" @@ -116,7 +113,7 @@ zebra-script = { path = "../zebra-script", version = "1.0.0-beta.40" } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.40" } [build-dependencies] -tonic-build = { version = "0.12.3", optional = true } +tonic-build = "0.12.3" [dev-dependencies] insta = { version = "1.40.0", features = ["redactions", "json", "ron"] } diff --git a/zebra-rpc/build.rs b/zebra-rpc/build.rs index bbb84746f..78206f315 100644 --- a/zebra-rpc/build.rs +++ b/zebra-rpc/build.rs @@ -1,9 +1,16 @@ //! Compile proto files fn main() -> Result<(), Box> { + use std::{env, path::PathBuf}; + let out_dir = env::var("OUT_DIR").map(PathBuf::from); + + tonic_build::configure() + .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") + .file_descriptor_set_path(out_dir.unwrap().join("endpoint_descriptor.bin")) + .compile_protos(&["proto/endpoint.proto"], &[""])?; + #[cfg(feature = "indexer-rpcs")] { - use std::{env, path::PathBuf}; let out_dir = env::var("OUT_DIR").map(PathBuf::from); tonic_build::configure() .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") diff --git a/zebra-rpc/proto/endpoint.proto b/zebra-rpc/proto/endpoint.proto new file mode 100644 index 000000000..680f72f55 --- /dev/null +++ b/zebra-rpc/proto/endpoint.proto @@ -0,0 +1,99 @@ +syntax = "proto3"; +package zebra.endpoint.rpc; + +// Used by methods that take no arguments. +message Empty {}; + +service Endpoint { + // + rpc get_info(Empty) returns (GetInfo); + + // + rpc get_blockchain_info(Empty) returns (GetBlockChainInfo); + + // + rpc get_address_balance(AddressStrings) returns (AddressBalance); + + // + rpc send_raw_transaction(RawTransactionHex) returns (SentTransactionHash); +} + +// A response to a GetInfo call. +message GetInfo { + // The node version build number + string build = 1; + // The server sub-version identifier, used as the network protocol user-agent + string subversion = 2; +} + +// Define a GetBlockChainInfo message for the response +message GetBlockChainInfo { + // Current network name as defined in BIP70 (main, test, regtest) + string chain = 1; + + // The current number of blocks processed in the server, numeric + uint32 blocks = 2; + + // The hash of the currently best block, in big-endian order, hex-encoded + string best_block_hash = 3; + + // If syncing, the estimated height of the chain, else the current best height, numeric. + // + // In Zebra, this is always the height estimate, so it might be a little inaccurate. + uint32 estimated_height = 4; + + // Value pool balances + repeated ValuePoolBalance value_pools = 5; + + // Status of network upgrades + repeated UpgradeEntry upgrades = 6; + + // Branch IDs of the current and upcoming consensus rules + TipConsensusBranch consensus = 7; +} + +// Define a ValuePoolBalance message, replacing your array with repeated fields if needed +message ValuePoolBalance { + string id = 1; + double chain_value = 2; + uint64 chain_value_zat = 3; +} + +// +message UpgradeEntry { + string key = 1; + NetworkUpgradeInfo value = 2; +} + +// Define a NetworkUpgradeInfo message to represent upgrade statuses +message NetworkUpgradeInfo { + string name = 1; + uint32 activation_height = 2; + string status = 3; +} + +// Define the TipConsensusBranch message to represent consensus branches +message TipConsensusBranch { + string chain_tip = 1; + string next_block = 2; +} + +// +message AddressStrings { + repeated string addresses = 1; +} + +// +message AddressBalance { + uint64 balance = 1; +} + +// +message RawTransactionHex { + string hex = 1; +} + +// +message SentTransactionHash { + string hash = 1; +} diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 8becc5bb7..a8581752d 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -15,6 +15,7 @@ use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use tokio::{sync::broadcast, task::JoinHandle}; +use tonic::{Response, Status}; use tower::{Service, ServiceExt}; use tracing::Instrument; @@ -1547,6 +1548,12 @@ enum NetworkUpgradeStatus { Pending, } +impl std::fmt::Display for NetworkUpgradeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + /// The [`ConsensusBranchId`]s for the tip and the next block. /// /// These branch IDs are different when the next block is a network upgrade activation block. @@ -1883,3 +1890,375 @@ pub fn height_from_signed_int(index: i32, tip_height: Height) -> Result Ok(Height(sanitized_height)) } } + +/// gRPC method implementations. +#[derive(Clone)] +pub struct GrpcImpl +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + // Configuration + // + /// Zebra's application version, with build metadata. + build_version: String, + + /// Zebra's RPC user agent. + user_agent: String, + + /// The configured network for this RPC service. + network: Network, + + /// Test-only option that makes Zebra say it is at the chain tip, + /// no matter what the estimated height or local clock is. + debug_force_finished_sync: bool, + + /// Test-only option that makes RPC responses more like `zcashd`. + #[allow(dead_code)] + debug_like_zcashd: bool, + + // Services + // + /// A handle to the mempool service. + mempool: Mempool, + + /// A handle to the state service. + state: State, + + /// Allows efficient access to the best tip of the blockchain. + latest_chain_tip: Tip, + + // Tasks + // + /// A sender component of a channel used to send transactions to the mempool queue. + queue_sender: broadcast::Sender, +} + +impl GrpcImpl +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + /// Create a new instance of the RPC handler. + // + // TODO: + // - put some of the configs or services in their own struct? + #[allow(clippy::too_many_arguments)] + pub fn new( + build_version: VersionString, + user_agent: UserAgentString, + network: Network, + debug_force_finished_sync: bool, + debug_like_zcashd: bool, + mempool: Mempool, + state: State, + latest_chain_tip: Tip, + ) -> (Self, JoinHandle<()>) + where + VersionString: ToString + Clone + Send + 'static, + UserAgentString: ToString + Clone + Send + 'static, + { + let (runner, queue_sender) = Queue::start(); + + let mut build_version = build_version.to_string(); + let user_agent = user_agent.to_string(); + + // Match zcashd's version format, if the version string has anything in it + if !build_version.is_empty() && !build_version.starts_with('v') { + build_version.insert(0, 'v'); + } + + let rpc_impl = GrpcImpl { + build_version, + user_agent, + network: network.clone(), + debug_force_finished_sync, + debug_like_zcashd, + mempool: mempool.clone(), + state: state.clone(), + latest_chain_tip: latest_chain_tip.clone(), + queue_sender, + }; + + // run the process queue + let rpc_tx_queue_task_handle = tokio::spawn( + runner + .run(mempool, state, latest_chain_tip, network) + .in_current_span(), + ); + + (rpc_impl, rpc_tx_queue_task_handle) + } +} + +#[tonic::async_trait] +impl crate::server::endpoint_server::Endpoint for GrpcImpl +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + async fn get_info( + &self, + _: tonic::Request, + ) -> std::result::Result, Status> { + let response = crate::server::GetInfo { + build: self.build_version.clone(), + subversion: self.user_agent.clone(), + }; + + Ok(Response::new(response)) + } + + async fn get_blockchain_info( + &self, + _: tonic::Request, + ) -> std::result::Result, Status> { + let network = self.network.clone(); + let debug_force_finished_sync = self.debug_force_finished_sync; + let mut state = self.state.clone(); + + // `chain` field + let chain = network.bip70_network_name(); + + let request = zebra_state::ReadRequest::TipPoolValues; + let response: zebra_state::ReadResponse = state + .ready() + .and_then(|service| service.call(request)) + .await + .map_server_error() + .unwrap(); + + let zebra_state::ReadResponse::TipPoolValues { + tip_height, + tip_hash, + value_balance, + } = response + else { + unreachable!("unmatched response to a TipPoolValues request") + }; + + let request = zebra_state::ReadRequest::BlockHeader(tip_hash.into()); + let response: zebra_state::ReadResponse = state + .ready() + .and_then(|service| service.call(request)) + .await + .map_server_error() + .unwrap(); + + let zebra_state::ReadResponse::BlockHeader(block_header) = response else { + unreachable!("unmatched response to a BlockHeader request") + }; + + let tip_block_time = block_header + .ok_or_server_error("unexpectedly could not read best chain tip block header") + .unwrap() + .time; + + let now = Utc::now(); + let zebra_estimated_height = + NetworkChainTipHeightEstimator::new(tip_block_time, tip_height, &network) + .estimate_height_at(now); + + // If we're testing the mempool, force the estimated height to be the actual tip height, otherwise, + // check if the estimated height is below Zebra's latest tip height, or if the latest tip's block time is + // later than the current time on the local clock. + let estimated_height = if tip_block_time > now + || zebra_estimated_height < tip_height + || debug_force_finished_sync + { + tip_height + } else { + zebra_estimated_height + }; + + // `upgrades` object + // + // Get the network upgrades in height order, like `zcashd`. + let mut upgrades = IndexMap::new(); + for (activation_height, network_upgrade) in network.full_activation_list() { + // Zebra defines network upgrades based on incompatible consensus rule changes, + // but zcashd defines them based on ZIPs. + // + // All the network upgrades with a consensus branch ID are the same in Zebra and zcashd. + if let Some(branch_id) = network_upgrade.branch_id() { + // zcashd's RPC seems to ignore Disabled network upgrades, so Zebra does too. + let status = if tip_height >= activation_height { + NetworkUpgradeStatus::Active + } else { + NetworkUpgradeStatus::Pending + }; + + let upgrade = NetworkUpgradeInfo { + name: network_upgrade, + activation_height, + status, + }; + upgrades.insert(ConsensusBranchIdHex(branch_id), upgrade); + } + } + + // `consensus` object + let next_block_height = + (tip_height + 1).expect("valid chain tips are a lot less than Height::MAX"); + let consensus = TipConsensusBranch { + chain_tip: ConsensusBranchIdHex( + NetworkUpgrade::current(&network, tip_height) + .branch_id() + .unwrap_or(ConsensusBranchId::RPC_MISSING_ID), + ), + next_block: ConsensusBranchIdHex( + NetworkUpgrade::current(&network, next_block_height) + .branch_id() + .unwrap_or(ConsensusBranchId::RPC_MISSING_ID), + ), + }; + + let response = GetBlockChainInfo { + chain, + blocks: tip_height, + best_block_hash: tip_hash, + estimated_height: estimated_height, + value_pools: types::ValuePoolBalance::from_value_balance(value_balance), + upgrades, + consensus, + } + .into(); + + Ok(Response::new(response)) + } + + async fn get_address_balance( + &self, + address_strings: tonic::Request, + ) -> std::result::Result, Status> { + let state = self.state.clone(); + + // TODO: fix all the unwraps here and in the above calls, use status::from_error as `send_raw_transaction`. + let converted_addresses: AddressStrings = address_strings.into_inner().into(); + let valid_addresses: HashSet
= converted_addresses.valid_addresses().unwrap(); + + let request = zebra_state::ReadRequest::AddressBalance(valid_addresses); + let response = state.oneshot(request).await.map_server_error().unwrap(); + + let res = match response { + zebra_state::ReadResponse::AddressBalance(balance) => AddressBalance { + balance: u64::from(balance), + }, + _ => unreachable!("Unexpected response from state service: {response:?}"), + } + .into(); + + Ok(Response::new(res)) + } + + async fn send_raw_transaction( + &self, + raw_transaction_hex: tonic::Request, + ) -> std::result::Result, Status> { + let mempool = self.mempool.clone(); + let queue_sender = self.queue_sender.clone(); + + let raw_transaction_hex = raw_transaction_hex.into_inner().hex; + + let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| { + Status::invalid_argument("raw transaction is not specified as a hex string") + })?; + + let raw_transaction = Transaction::zcash_deserialize(&*raw_transaction_bytes) + .map_err(|_| Status::invalid_argument("raw transaction is structurally invalid"))?; + + let transaction_hash = raw_transaction.hash(); + + // send transaction to the rpc queue, ignore any error. + let unmined_transaction = UnminedTx::from(raw_transaction.clone()); + let _ = queue_sender.send(unmined_transaction); + + let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into()); + let request = mempool::Request::Queue(vec![transaction_parameter]); + + let response = mempool + .oneshot(request) + .await + .map_err(|e| Status::from_error(e))?; + + let mut queue_results = match response { + mempool::Response::Queued(results) => results, + _ => unreachable!("incorrect response variant from mempool service"), + }; + + assert_eq!( + queue_results.len(), + 1, + "mempool service returned more results than expected" + ); + + let queue_result = queue_results + .pop() + .expect("there should be exactly one item in Vec") + .inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err)) + .map_err(|e| Status::from_error(e))? + .await; + + tracing::debug!("sent transaction to mempool: {:?}", &queue_result); + + let res = queue_result + .map_err(|e| Status::from_error(Box::new(e))) + .map(|_| SentTransactionHash(transaction_hash)) + .map_server_error(); + + Ok(Response::new( + res.map_err(|e| Status::from_error(Box::new(e)))?.into(), + )) + } +} diff --git a/zebra-rpc/src/methods/types.rs b/zebra-rpc/src/methods/types.rs index d29e697f0..2e81d5151 100644 --- a/zebra-rpc/src/methods/types.rs +++ b/zebra-rpc/src/methods/types.rs @@ -1,6 +1,7 @@ //! Types used in RPC methods. mod get_blockchain_info; +pub mod grpc; mod zec; pub use get_blockchain_info::ValuePoolBalance; diff --git a/zebra-rpc/src/methods/types/get_blockchain_info.rs b/zebra-rpc/src/methods/types/get_blockchain_info.rs index a2d1e7816..76fb7fecf 100644 --- a/zebra-rpc/src/methods/types/get_blockchain_info.rs +++ b/zebra-rpc/src/methods/types/get_blockchain_info.rs @@ -69,4 +69,9 @@ impl ValuePoolBalance { Self::deferred(value_balance.deferred_amount()), ] } + + /// Returns the pool's name, ZEC balance, and zatoshi balance. + pub fn data(&self) -> (&str, Zec, Amount) { + (&self.id, self.chain_value, self.chain_value_zat) + } } diff --git a/zebra-rpc/src/methods/types/grpc.rs b/zebra-rpc/src/methods/types/grpc.rs new file mode 100644 index 000000000..342219159 --- /dev/null +++ b/zebra-rpc/src/methods/types/grpc.rs @@ -0,0 +1,66 @@ +//! gRPC types and conversions for Zebra RPC methods. +impl From for crate::server::GetBlockChainInfo { + fn from(info: crate::methods::GetBlockChainInfo) -> Self { + let value_pools: Vec = info + .value_pools + .iter() + .map(|pool| crate::server::ValuePoolBalance { + id: pool.data().0.to_string(), + chain_value: pool.data().1.lossy_zec(), + chain_value_zat: pool.data().2.into(), + }) + .collect(); + + let upgrades: Vec = info + .upgrades + .into_iter() + .map(|(key, upgrade_info)| crate::server::UpgradeEntry { + key: key.0.to_string(), + value: Some(crate::server::NetworkUpgradeInfo { + name: upgrade_info.name.to_string(), + status: upgrade_info.status.to_string(), + activation_height: upgrade_info.activation_height.0, + }), + }) + .collect(); + + let consensus = crate::server::TipConsensusBranch { + chain_tip: info.consensus.chain_tip.0.to_string(), + next_block: info.consensus.next_block.0.to_string(), + }; + + crate::server::GetBlockChainInfo { + chain: info.chain, + blocks: info.blocks.0, + best_block_hash: hex::encode(info.best_block_hash.0), + estimated_height: info.estimated_height.0, + value_pools, + upgrades, + consensus: Some(consensus), + } + } +} + +impl From for crate::methods::AddressStrings { + fn from(addresses: crate::server::AddressStrings) -> Self { + crate::methods::AddressStrings { + addresses: addresses.addresses, + } + } +} + +impl From for crate::server::AddressBalance { + fn from(balance: crate::methods::AddressBalance) -> Self { + crate::server::AddressBalance { + balance: balance.balance, + } + } +} + +impl From for crate::server::SentTransactionHash { + fn from(hash: crate::methods::SentTransactionHash) -> Self { + crate::server::SentTransactionHash { + hash: hash.0.to_string(), + } + } +} diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 73fcde65f..3b3cdfd7a 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -17,6 +17,12 @@ use tokio::task::JoinHandle; use tower::Service; use tracing::*; +use tonic::{ + transport::{Channel, Server, Uri}, + Request, +}; +use warp::Filter; + use zebra_chain::{ block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network, }; @@ -25,8 +31,9 @@ use zebra_node_services::mempool; use crate::{ config::Config, - methods::{Rpc, RpcImpl}, + methods::{GrpcImpl, Rpc, RpcImpl}, server::{ + endpoint_client::EndpointClient, endpoint_server::EndpointServer, http_request_compatibility::HttpRequestMiddleware, rpc_call_compatibility::FixRpcResponseMiddleware, }, @@ -37,11 +44,19 @@ use crate::methods::{GetBlockTemplateRpc, GetBlockTemplateRpcImpl}; pub mod cookie; pub mod http_request_compatibility; +pub mod jsonrpc; pub mod rpc_call_compatibility; #[cfg(test)] mod tests; +// The generated endpoint proto +tonic::include_proto!("zebra.endpoint.rpc"); + +/// The file descriptor set for the generated endpoint proto +pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("endpoint_descriptor"); + /// Zebra RPC Server #[derive(Clone)] pub struct RpcServer { @@ -150,33 +165,22 @@ impl RpcServer { AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { if let Some(listen_addr) = config.listen_addr { + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1() + .expect("Unable to build reflection service"); + info!("Trying to open RPC endpoint at {}...", listen_addr,); - // Create handler compatible with V1 and V2 RPC protocols - let mut io: MetaIoHandler<(), _> = - MetaIoHandler::new(Compatibility::Both, FixRpcResponseMiddleware); + // `grpc_server_listen_addr` should be a config argument + let grpc_server_listen_addr = "127.0.0.1:8080".parse().unwrap(); + let grpc_server_url: Uri = format!("http://{}", grpc_server_listen_addr) + .parse() + .unwrap(); - #[cfg(feature = "getblocktemplate-rpcs")] - { - // Initialize the getblocktemplate rpc method handler - let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new( - &network, - mining_config.clone(), - mempool.clone(), - state.clone(), - latest_chain_tip.clone(), - block_verifier_router, - sync_status, - address_book, - ); - - io.extend_with(get_block_template_rpc_impl.to_delegate()); - } - - // Initialize the rpc methods with the zebra version - let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new( + let (grpc, rpc_tx_queue_task_handle) = GrpcImpl::new( build_version.clone(), - user_agent, + user_agent.clone(), network.clone(), config.debug_force_finished_sync, #[cfg(feature = "getblocktemplate-rpcs")] @@ -188,88 +192,64 @@ impl RpcServer { latest_chain_tip, ); - io.extend_with(rpc_impl.to_delegate()); + // Start the gRPC server + let _ = tokio::spawn(async move { + let server_instance = Server::builder() + .accept_http1(true) + .add_service(reflection_service) + .add_service(EndpointServer::new(grpc)) + .serve(grpc_server_listen_addr) + .await + .expect("Unable to start RPC server"); - // If zero, automatically scale threads to the number of CPU cores - let mut parallel_cpu_threads = config.parallel_cpu_threads; - if parallel_cpu_threads == 0 { - parallel_cpu_threads = available_parallelism().map(usize::from).unwrap_or(1); - } + info!("Started gRPC server at {}", grpc_server_listen_addr); + server_instance + }); - // The server is a blocking task, which blocks on executor shutdown. - // So we need to start it in a std::thread. - // (Otherwise tokio panics on RPC port conflict, which shuts down the RPC server.) - let span = Span::current(); - let start_server = move || { - span.in_scope(|| { - let middleware = if config.enable_cookie_auth { - let cookie = Cookie::default(); - cookie::write_to_disk(&cookie, &config.cookie_dir) - .expect("Zebra must be able to write the auth cookie to the disk"); - HttpRequestMiddleware::default().with(cookie) - } else { - HttpRequestMiddleware::default() - }; - - // Use a different tokio executor from the rest of Zebra, - // so that large RPCs and any task handling bugs don't impact Zebra. - let server_instance = ServerBuilder::new(io) - .threads(parallel_cpu_threads) - // TODO: disable this security check if we see errors from lightwalletd - //.allowed_hosts(DomainsValidation::Disabled) - .request_middleware(middleware) - .start_http(&listen_addr) - .expect("Unable to start RPC server"); - - info!("{OPENED_RPC_ENDPOINT_MSG}{}", server_instance.address()); - - let close_handle = server_instance.close_handle(); - - let rpc_server_handle = RpcServer { - config, - network, - build_version: build_version.to_string(), - close_handle, - }; - - (server_instance, rpc_server_handle) - }) + // Create the middleware for the HTTP request + let middleware = if config.enable_cookie_auth { + let cookie = Cookie::default(); + cookie::write_to_disk(&cookie, &config.cookie_dir) + .expect("Zebra must be able to write the auth cookie to the disk"); + HttpRequestMiddleware::default().with(cookie) + } else { + HttpRequestMiddleware::default() }; - // Propagate panics from the std::thread - let (server_instance, rpc_server_handle) = match std::thread::spawn(start_server).join() - { - Ok(rpc_server) => rpc_server, - Err(panic_object) => panic::resume_unwind(panic_object), - }; + // Create the warp proxy server + let rpc_server_task_handle = tokio::spawn(async move { + let grpc_channel = Channel::builder(grpc_server_url) + .connect() + .await + .expect("Unable to connect to gRPC server"); - // The server is a blocking task, which blocks on executor shutdown. - // So we need to wait on it on a std::thread, inside a tokio blocking task. - // (Otherwise tokio panics when we shut down the RPC server.) - let span = Span::current(); - let wait_on_server = move || { - span.in_scope(|| { - server_instance.wait(); + let grpc_client = EndpointClient::new(grpc_channel); + let middleware = std::sync::Arc::new(middleware); - info!("Stopped RPC endpoint"); - }) - }; + let proxy = warp::post() + .and(warp::path::end()) + .and(warp::body::json()) + .and(warp::header::headers_cloned()) + .and(with_grpc_client(grpc_client)) // Pass the Arc directly + .and_then(move |request, headers, grpc_client| { + let middleware = std::sync::Arc::clone(&middleware); + async move { + middleware + .handle_request(request, headers, grpc_client) + .await + } + }); - let span = Span::current(); - let rpc_server_task_handle = tokio::task::spawn_blocking(move || { - let thread_handle = std::thread::spawn(wait_on_server); + warp::serve(proxy).run(listen_addr).await; - // Propagate panics from the inner std::thread to the outer tokio blocking task - span.in_scope(|| match thread_handle.join() { - Ok(()) => (), - Err(panic_object) => panic::resume_unwind(panic_object), - }) + info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr); }); ( rpc_server_task_handle, rpc_tx_queue_task_handle, - Some(rpc_server_handle), + //Some(rpc_server_handle), + None, ) } else { // There is no RPC port, so the RPC tasks do nothing. @@ -345,3 +325,10 @@ impl Drop for RpcServer { self.shutdown_blocking(); } } + +// Warp Filter to pass the gRPC client +fn with_grpc_client( + grpc_client: EndpointClient, +) -> impl Filter,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || grpc_client.clone()) +} diff --git a/zebra-rpc/src/server/http_request_compatibility.rs b/zebra-rpc/src/server/http_request_compatibility.rs index 89925c229..3cb8b7f3b 100644 --- a/zebra-rpc/src/server/http_request_compatibility.rs +++ b/zebra-rpc/src/server/http_request_compatibility.rs @@ -9,7 +9,11 @@ use jsonrpc_http_server::{ RequestMiddleware, RequestMiddlewareAction, }; -use super::cookie::Cookie; +use crate::server::{ + cookie::Cookie, + jsonrpc::{JsonRpcError, JsonRpcRequest, JsonRpcResponse}, + Empty, EndpointClient, Request as TonicRequest, +}; /// HTTP [`RequestMiddleware`] with compatibility workarounds. /// @@ -60,6 +64,138 @@ impl With for HttpRequestMiddleware { } } +impl HttpRequestMiddleware { + /// Check if the number of parameters matches the expected length. + pub fn check_parameters_length( + request: JsonRpcRequest, + expeted_len: usize, + ) -> Option { + if request.params().len() != expeted_len { + return Some(JsonRpcResponse::new( + serde_json::to_value("invalid number of parameters") + .expect("string to serde_json::Value conversion"), + request.id().to_string(), + )); + } + None + } + + /// Handle incoming JSON-RPC requests + pub async fn handle_request( + &self, + request: JsonRpcRequest, + headers: warp::http::HeaderMap, + mut grpc_client: EndpointClient, + ) -> Result { + tracing::trace!(?request, "original HTTP request"); + + // Check if the request is authenticated + if !self.check_credentials(&headers) { + let error = JsonRpcError { + code: 401, + message: "unauthenticated method".to_string(), + }; + + let response = JsonRpcResponse::new( + serde_json::to_value(error).unwrap(), + request.id().to_string(), + ); + + return Ok(warp::reply::json(&response)); + } + + // Match the JSON-RPC `method` field to call the appropriate gRPC method + match request.method() { + "getinfo" => { + // Check for exactly zero parameter + if let Some(response) = Self::check_parameters_length(request.clone(), 0) { + return Ok(warp::reply::json(&response)); + } + + let grpc_request = TonicRequest::new(Empty {}); + let grpc_response = grpc_client + .get_info(grpc_request) + .await + .map(|grpc_response| serde_json::to_value(grpc_response.into_inner())) + .unwrap_or_else(|e| serde_json::to_value(e.to_string())) + .map_err(|_| warp::reject::reject())?; + let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string()); + + Ok(warp::reply::json(&json_response)) + } + "getblockchaininfo" => { + // Check for exactly zero parameter + if let Some(response) = Self::check_parameters_length(request.clone(), 0) { + return Ok(warp::reply::json(&response)); + } + + let grpc_request = TonicRequest::new(Empty {}); + let grpc_response = grpc_client + .get_blockchain_info(grpc_request) + .await + .map(|grpc_response| serde_json::to_value(grpc_response.into_inner())) + .unwrap_or_else(|e| serde_json::to_value(e.to_string())) + .map_err(|_| warp::reject::reject())?; + let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string()); + + Ok(warp::reply::json(&json_response)) + } + "getaddressbalance" => { + // Check for exactly one parameter + if let Some(response) = Self::check_parameters_length(request.clone(), 1) { + return Ok(warp::reply::json(&response)); + } + + let address_params: crate::server::AddressStrings = + serde_json::from_value(request.params().get(0).cloned().unwrap_or_default()) + .map_err(|_| warp::reject::reject())?; + + let grpc_response = grpc_client + .get_address_balance(address_params) + .await + .map(|grpc_response| serde_json::to_value(grpc_response.into_inner())) + .unwrap_or_else(|e| serde_json::to_value(e.to_string())) + .map_err(|_| warp::reject::reject())?; + + let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string()); + + Ok(warp::reply::json(&json_response)) + } + "sendrawtransaction" => { + // Check for exactly one parameter + if let Some(response) = Self::check_parameters_length(request.clone(), 1) { + return Ok(warp::reply::json(&response)); + } + + let hex_param = request.params()[0].as_str().ok_or_else(|| warp::reject())?; + let grpc_request = TonicRequest::new(crate::server::RawTransactionHex { + hex: hex_param.to_string(), + }); + + let grpc_response = grpc_client + .send_raw_transaction(grpc_request) + .await + .map(|grpc_response| serde_json::to_value(grpc_response.into_inner())) + .unwrap_or_else(|e| serde_json::to_value(e.to_string())) + .map_err(|_| warp::reject::reject())?; + + let json_response = JsonRpcResponse::new(grpc_response, request.id().to_string()); + + Ok(warp::reply::json(&json_response)) + } + _ => { + let json_response = JsonRpcResponse::new( + serde_json::to_value("unsupported method") + .expect("string to serde_json::Value conversion"), + request.id().to_string(), + ); + + Ok(warp::reply::json(&json_response)) + } + } + } +} + impl RequestMiddleware for HttpRequestMiddleware { fn on_request(&self, mut request: Request) -> RequestMiddlewareAction { tracing::trace!(?request, "original HTTP request"); diff --git a/zebra-rpc/src/server/jsonrpc.rs b/zebra-rpc/src/server/jsonrpc.rs new file mode 100644 index 000000000..f9941ee5a --- /dev/null +++ b/zebra-rpc/src/server/jsonrpc.rs @@ -0,0 +1,50 @@ +//! Define JSON-RPC request and response structures + +/// The JSON-RPC request +#[derive(Debug, Clone, serde::Deserialize)] +pub struct JsonRpcRequest { + jsonrpc: String, + id: String, + method: String, + params: Vec, +} + +impl JsonRpcRequest { + /// Get the method name from the request + pub fn method(&self) -> &str { + &self.method + } + + /// Get the parameters from the request + pub fn params(&self) -> &[serde_json::Value] { + &self.params + } + + /// Get the request ID + pub fn id(&self) -> &str { + &self.id + } +} + +/// The JSON-RPC response +#[derive(serde::Serialize)] +pub struct JsonRpcResponse { + //jsonrpc: String, + result: serde_json::Value, + id: String, + //error: Option, +} + +impl JsonRpcResponse { + /// + pub fn new(result: serde_json::Value, id: String) -> Self { + Self { result, id } + } +} + +/// The JSON-RPC error +#[derive(serde::Serialize)] +pub struct JsonRpcError { + pub code: i64, + pub message: String, +}