From 760f189ba41424b05bc598e5218f54e404afeaa6 Mon Sep 17 00:00:00 2001 From: Liam Vovk <63673978+vovkman@users.noreply.github.com> Date: Mon, 4 Dec 2023 16:23:39 -0500 Subject: [PATCH] client: include request in initial subscribe to gRPC endpoint to fix LB connection delay (#252) * send request with subscribe * remove unused import --- CHANGELOG.md | 2 ++ examples/rust/src/bin/client.rs | 6 +----- yellowstone-grpc-client/src/lib.rs | 25 ++++++++++++++++++++----- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f2cc8f..8d4dde9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes +- client: include request in initial subscribe to gRPC endpoint to fix LB connection delay ([#252](https://github.com/rpcpool/yellowstone-grpc/pull/252)) + ### Features ### Breaking diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index ba1f4eb..39cac8f 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -524,11 +524,7 @@ async fn geyser_subscribe( request: SubscribeRequest, resub: usize, ) -> anyhow::Result<()> { - let (mut subscribe_tx, mut stream) = client.subscribe().await?; - subscribe_tx - .send(request) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; info!("stream opened"); let mut counter = 0; diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 99e71f1..8389133 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -173,7 +173,23 @@ impl GeyserGrpcClient { impl Sink, impl Stream>, )> { - let (subscribe_tx, subscribe_rx) = mpsc::unbounded(); + self.subscribe_with_request(None).await + } + + pub async fn subscribe_with_request( + &mut self, + request: Option, + ) -> GeyserGrpcClientResult<( + impl Sink, + impl Stream>, + )> { + let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); + if let Some(request) = request { + subscribe_tx + .send(request) + .await + .map_err(GeyserGrpcClientError::SubscribeSendError)?; + } let response: Response> = self.geyser.subscribe(subscribe_rx).await?; Ok((subscribe_tx, response.into_inner())) @@ -206,14 +222,13 @@ impl GeyserGrpcClient { .await } - #[allow(clippy::too_many_arguments)] pub async fn subscribe_once2( &mut self, request: SubscribeRequest, ) -> GeyserGrpcClientResult>> { - let (mut subscribe_tx, response) = self.subscribe().await?; - subscribe_tx.send(request).await?; - Ok(response) + self.subscribe_with_request(Some(request)) + .await + .map(|(_sink, stream)| stream) } pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult {