client: include request in initial subscribe to gRPC endpoint to fix LB connection delay (#252)

* send request with subscribe

* remove unused import
This commit is contained in:
Liam Vovk 2023-12-04 16:23:39 -05:00 committed by Kirill Fomichev
parent 6590208d4d
commit 760f189ba4
No known key found for this signature in database
GPG Key ID: 6AA0144D5E0C0C0A
3 changed files with 23 additions and 10 deletions

View File

@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi
### Fixes
- client: include request in initial subscribe to gRPC endpoint to fix LB connection delay ([#252](https://github.com/rpcpool/yellowstone-grpc/pull/252))
### Features
### Breaking

View File

@ -524,11 +524,7 @@ async fn geyser_subscribe(
request: SubscribeRequest,
resub: usize,
) -> anyhow::Result<()> {
let (mut subscribe_tx, mut stream) = client.subscribe().await?;
subscribe_tx
.send(request)
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;
let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;
info!("stream opened");
let mut counter = 0;

View File

@ -173,7 +173,23 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
impl Sink<SubscribeRequest, Error = mpsc::SendError>,
impl Stream<Item = Result<SubscribeUpdate, Status>>,
)> {
let (subscribe_tx, subscribe_rx) = mpsc::unbounded();
self.subscribe_with_request(None).await
}
pub async fn subscribe_with_request(
&mut self,
request: Option<SubscribeRequest>,
) -> GeyserGrpcClientResult<(
impl Sink<SubscribeRequest, Error = mpsc::SendError>,
impl Stream<Item = Result<SubscribeUpdate, Status>>,
)> {
let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
if let Some(request) = request {
subscribe_tx
.send(request)
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;
}
let response: Response<Streaming<SubscribeUpdate>> =
self.geyser.subscribe(subscribe_rx).await?;
Ok((subscribe_tx, response.into_inner()))
@ -206,14 +222,13 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn subscribe_once2(
&mut self,
request: SubscribeRequest,
) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
let (mut subscribe_tx, response) = self.subscribe().await?;
subscribe_tx.send(request).await?;
Ok(response)
self.subscribe_with_request(Some(request))
.await
.map(|(_sink, stream)| stream)
}
pub async fn ping(&mut self, count: i32) -> GeyserGrpcClientResult<PongResponse> {