Early exploratory draft

This commit is contained in:
Arya 2024-08-01 00:33:10 -04:00
parent 45261a26eb
commit 7a399735f2
7 changed files with 152 additions and 1 deletions

View File

@ -6035,6 +6035,7 @@ dependencies = [
name = "zebra-rpc"
version = "1.0.0-beta.38"
dependencies = [
"bytes",
"chrono",
"futures",
"hex",

View File

@ -27,6 +27,7 @@ indexer-rpcs = [
"tonic-reflection",
"prost",
"tokio-stream",
"bytes"
]
# Production features that activate extra dependencies, or extra features in dependencies
@ -81,6 +82,7 @@ tonic = { version = "0.12.1", optional = true }
tonic-reflection = { version = "0.12.1", optional = true }
prost = { version = "0.13.1", optional = true }
tokio-stream = { version = "0.1.15", optional = true }
bytes = { version = "1.6.1", optional = true }
tracing = "0.1.39"

View File

@ -5,6 +5,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
{
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
build_json_codec_service();
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin"))
@ -13,3 +16,22 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
fn build_json_codec_service() {
let indexer_service = tonic_build::manual::Service::builder()
.name("Indexer")
.package("json.indexer")
.method(
tonic_build::manual::Method::builder()
.name("chain_tip_change")
.route_name("ChainTipChange")
.input_type("crate::indexer::types::Empty")
.output_type("crate::indexer::types::Empty")
.codec_path("crate::indexer::codec::JsonCodec")
.server_streaming()
.build(),
)
.build();
tonic_build::manual::Builder::new().compile(&[indexer_service]);
}

View File

@ -7,4 +7,5 @@ message Empty {};
service Indexer {
// Notifies listeners of chain tip changes
rpc ChainTipChange(Empty) returns (stream Empty);
}
}

View File

@ -3,11 +3,62 @@
#[cfg(test)]
mod tests;
pub mod codec;
pub mod methods;
pub mod server;
pub mod types;
// The generated indexer proto
tonic::include_proto!("zebra.indexer.rpc");
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("indexer_descriptor");
pub mod json {
include!(concat!(env!("OUT_DIR"), "/json.indexer.Indexer.rs"));
}
use std::pin::Pin;
use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status};
struct Indexer;
// TODO: Figure out how to base which service a request is routed to by checking its content-type.
// See https://github.com/hyperium/tonic/issues/851
//
// Or just set the codec based on a feature flag, comment on tonic#851, and open an issue for picking
// a codec based on the Content-Type header later (behind another feature flag?).
// Implement `From` for all of the types defined in Rust for all of the generated protobuf types?
#[tonic::async_trait]
impl json::indexer_server::Indexer for Indexer {
type ChainTipChangeStream = Pin<Box<dyn Stream<Item = Result<types::Empty, Status>> + Send>>;
async fn chain_tip_change(
&self,
_: tonic::Request<types::Empty>,
) -> Result<Response<Self::ChainTipChangeStream>, Status> {
let (response_sender, response_receiver) = tokio::sync::mpsc::channel(133);
let response_stream = ReceiverStream::new(response_receiver);
tokio::spawn(async move {
// Notify the client of chain tip changes until the channel is closed
for _ in 0..100 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let tx = response_sender.clone();
tokio::spawn(async move { tx.send(Ok(types::Empty {})).await });
}
let _ = response_sender
.send(Err(Status::unavailable(
"chain_tip_change channel has closed",
)))
.await;
});
Ok(Response::new(Box::pin(response_stream)))
}
}

View File

@ -0,0 +1,67 @@
//! This module defines the JsonCodec that is used by the indexer RPC service.
use bytes::{Buf, BufMut};
use std::marker::PhantomData;
use tonic::{
codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
Status,
};
#[derive(Debug)]
pub struct JsonEncoder<T>(PhantomData<T>);
impl<T: serde::Serialize> Encoder for JsonEncoder<T> {
type Item = T;
type Error = Status;
fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
serde_json::to_writer(buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
}
}
#[derive(Debug)]
pub struct JsonDecoder<U>(PhantomData<U>);
impl<U: serde::de::DeserializeOwned> Decoder for JsonDecoder<U> {
type Item = U;
type Error = Status;
fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
if !buf.has_remaining() {
return Ok(None);
}
let item: Self::Item =
serde_json::from_reader(buf.reader()).map_err(|e| Status::internal(e.to_string()))?;
Ok(Some(item))
}
}
/// A [`Codec`] that implements `application/grpc+json` via the serde library.
#[derive(Debug, Clone)]
pub struct JsonCodec<T, U>(PhantomData<(T, U)>);
impl<T, U> Default for JsonCodec<T, U> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<T, U> Codec for JsonCodec<T, U>
where
T: serde::Serialize + Send + 'static,
U: serde::de::DeserializeOwned + Send + 'static,
{
type Encode = T;
type Decode = U;
type Encoder = JsonEncoder<T>;
type Decoder = JsonDecoder<U>;
fn encoder(&mut self) -> Self::Encoder {
JsonEncoder(PhantomData)
}
fn decoder(&mut self) -> Self::Decoder {
JsonDecoder(PhantomData)
}
}

View File

@ -0,0 +1,7 @@
//! Indexer types.
use serde::{Deserialize, Serialize};
/// Used for indexer RPC methods that take no arguments and/or respond without data.
#[derive(Debug, Deserialize, Serialize)]
pub struct Empty;