diff --git a/Cargo.lock b/Cargo.lock index 4b18ac309..9117830a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6035,6 +6035,7 @@ dependencies = [ name = "zebra-rpc" version = "1.0.0-beta.38" dependencies = [ + "bytes", "chrono", "futures", "hex", diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index ea3adeb65..53eab1ced 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -27,6 +27,7 @@ indexer-rpcs = [ "tonic-reflection", "prost", "tokio-stream", + "bytes" ] # Production features that activate extra dependencies, or extra features in dependencies @@ -81,6 +82,7 @@ tonic = { version = "0.12.1", optional = true } tonic-reflection = { version = "0.12.1", optional = true } prost = { version = "0.13.1", optional = true } tokio-stream = { version = "0.1.15", optional = true } +bytes = { version = "1.6.1", optional = true } tracing = "0.1.39" diff --git a/zebra-rpc/build.rs b/zebra-rpc/build.rs index 75db7fd2a..e504d607b 100644 --- a/zebra-rpc/build.rs +++ b/zebra-rpc/build.rs @@ -5,6 +5,9 @@ fn main() -> Result<(), Box> { { use std::{env, path::PathBuf}; let out_dir = env::var("OUT_DIR").map(PathBuf::from); + + build_json_codec_service(); + tonic_build::configure() .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") .file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin")) @@ -13,3 +16,22 @@ fn main() -> Result<(), Box> { Ok(()) } + +fn build_json_codec_service() { + let indexer_service = tonic_build::manual::Service::builder() + .name("Indexer") + .package("json.indexer") + .method( + tonic_build::manual::Method::builder() + .name("chain_tip_change") + .route_name("ChainTipChange") + .input_type("crate::indexer::types::Empty") + .output_type("crate::indexer::types::Empty") + .codec_path("crate::indexer::codec::JsonCodec") + .server_streaming() + .build(), + ) + .build(); + + tonic_build::manual::Builder::new().compile(&[indexer_service]); +} diff --git a/zebra-rpc/proto/indexer.proto b/zebra-rpc/proto/indexer.proto index 6ce5911ba..41fa786a5 100644 --- a/zebra-rpc/proto/indexer.proto +++ b/zebra-rpc/proto/indexer.proto @@ -7,4 +7,5 @@ message Empty {}; service Indexer { // Notifies listeners of chain tip changes rpc ChainTipChange(Empty) returns (stream Empty); -} \ No newline at end of file +} + diff --git a/zebra-rpc/src/indexer.rs b/zebra-rpc/src/indexer.rs index 9f4c69312..6a0bed64f 100644 --- a/zebra-rpc/src/indexer.rs +++ b/zebra-rpc/src/indexer.rs @@ -3,11 +3,62 @@ #[cfg(test)] mod tests; +pub mod codec; pub mod methods; pub mod server; +pub mod types; // The generated indexer proto tonic::include_proto!("zebra.indexer.rpc"); pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("indexer_descriptor"); + +pub mod json { + include!(concat!(env!("OUT_DIR"), "/json.indexer.Indexer.rs")); +} + +use std::pin::Pin; + +use futures::Stream; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Response, Status}; + +struct Indexer; + +// TODO: Figure out how to base which service a request is routed to by checking its content-type. +// See https://github.com/hyperium/tonic/issues/851 +// +// Or just set the codec based on a feature flag, comment on tonic#851, and open an issue for picking +// a codec based on the Content-Type header later (behind another feature flag?). +// Implement `From` for all of the types defined in Rust for all of the generated protobuf types? + +#[tonic::async_trait] +impl json::indexer_server::Indexer for Indexer { + type ChainTipChangeStream = Pin> + Send>>; + + async fn chain_tip_change( + &self, + _: tonic::Request, + ) -> Result, Status> { + let (response_sender, response_receiver) = tokio::sync::mpsc::channel(133); + let response_stream = ReceiverStream::new(response_receiver); + + tokio::spawn(async move { + // Notify the client of chain tip changes until the channel is closed + for _ in 0..100 { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let tx = response_sender.clone(); + tokio::spawn(async move { tx.send(Ok(types::Empty {})).await }); + } + + let _ = response_sender + .send(Err(Status::unavailable( + "chain_tip_change channel has closed", + ))) + .await; + }); + + Ok(Response::new(Box::pin(response_stream))) + } +} diff --git a/zebra-rpc/src/indexer/codec.rs b/zebra-rpc/src/indexer/codec.rs new file mode 100644 index 000000000..186a51f88 --- /dev/null +++ b/zebra-rpc/src/indexer/codec.rs @@ -0,0 +1,67 @@ +//! This module defines the JsonCodec that is used by the indexer RPC service. + +use bytes::{Buf, BufMut}; +use std::marker::PhantomData; +use tonic::{ + codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}, + Status, +}; + +#[derive(Debug)] +pub struct JsonEncoder(PhantomData); + +impl Encoder for JsonEncoder { + type Item = T; + type Error = Status; + + fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + serde_json::to_writer(buf.writer(), &item).map_err(|e| Status::internal(e.to_string())) + } +} + +#[derive(Debug)] +pub struct JsonDecoder(PhantomData); + +impl Decoder for JsonDecoder { + type Item = U; + type Error = Status; + + fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result, Self::Error> { + if !buf.has_remaining() { + return Ok(None); + } + + let item: Self::Item = + serde_json::from_reader(buf.reader()).map_err(|e| Status::internal(e.to_string()))?; + Ok(Some(item)) + } +} + +/// A [`Codec`] that implements `application/grpc+json` via the serde library. +#[derive(Debug, Clone)] +pub struct JsonCodec(PhantomData<(T, U)>); + +impl Default for JsonCodec { + fn default() -> Self { + Self(PhantomData) + } +} + +impl Codec for JsonCodec +where + T: serde::Serialize + Send + 'static, + U: serde::de::DeserializeOwned + Send + 'static, +{ + type Encode = T; + type Decode = U; + type Encoder = JsonEncoder; + type Decoder = JsonDecoder; + + fn encoder(&mut self) -> Self::Encoder { + JsonEncoder(PhantomData) + } + + fn decoder(&mut self) -> Self::Decoder { + JsonDecoder(PhantomData) + } +} diff --git a/zebra-rpc/src/indexer/types.rs b/zebra-rpc/src/indexer/types.rs new file mode 100644 index 000000000..1b32e053f --- /dev/null +++ b/zebra-rpc/src/indexer/types.rs @@ -0,0 +1,7 @@ +//! Indexer types. + +use serde::{Deserialize, Serialize}; + +/// Used for indexer RPC methods that take no arguments and/or respond without data. +#[derive(Debug, Deserialize, Serialize)] +pub struct Empty;