diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f2cc8f..8d4dde9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes +- client: include request in initial subscribe to gRPC endpoint to fix LB connection delay ([#252](https://github.com/rpcpool/yellowstone-grpc/pull/252)) + ### Features ### Breaking diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index ba1f4eb..39cac8f 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -524,11 +524,7 @@ async fn geyser_subscribe( request: SubscribeRequest, resub: usize, ) -> anyhow::Result<()> { - let (mut subscribe_tx, mut stream) = client.subscribe().await?; - subscribe_tx - .send(request) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; info!("stream opened"); let mut counter = 0; diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 99e71f1..8389133 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -173,7 +173,23 @@ impl GeyserGrpcClient { impl Sink, impl Stream>, )> { - let (subscribe_tx, subscribe_rx) = mpsc::unbounded(); + self.subscribe_with_request(None).await + } + + pub async fn subscribe_with_request( + &mut self, + request: Option, + ) -> GeyserGrpcClientResult<( + impl Sink, + impl Stream>, + )> { + let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); + if let Some(request) = request { + subscribe_tx + .send(request) + .await + .map_err(GeyserGrpcClientError::SubscribeSendError)?; + } let response: Response> = self.geyser.subscribe(subscribe_rx).await?; Ok((subscribe_tx, response.into_inner())) @@ -206,14 +222,13 @@ impl GeyserGrpcClient { .await } - #[allow(clippy::too_many_arguments)] pub async fn subscribe_once2( &mut self, request: SubscribeRequest, ) -> GeyserGrpcClientResult>> { - let (mut subscribe_tx, response) = self.subscribe().await?; - subscribe_tx.send(request).await?; - Ok(response) + self.subscribe_with_request(Some(request)) + .await + .map(|(_sink, stream)| stream) } pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult {