geyser: do not disconnect if subscribe stream is closed (#140)

This commit is contained in:
Kirill Fomichev 2023-06-06 13:54:34 -04:00
parent 9e0775068d
commit 092a94fccf
No known key found for this signature in database
GPG Key ID: 6AA0144D5E0C0C0A
4 changed files with 11 additions and 6 deletions

2
Cargo.lock generated
View File

@ -4290,7 +4290,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-geyser" name = "yellowstone-grpc-geyser"
version = "0.8.0+solana.1.15.2" version = "0.8.1+solana.1.15.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64 0.21.0", "base64 0.21.0",

View File

@ -2,7 +2,7 @@
members = [ members = [
"examples/rust", # 1.2.0+solana.1.15.2 "examples/rust", # 1.2.0+solana.1.15.2
"yellowstone-grpc-client", # 1.2.0+solana.1.15.2 "yellowstone-grpc-client", # 1.2.0+solana.1.15.2
"yellowstone-grpc-geyser", # 0.8.0+solana.1.15.2 "yellowstone-grpc-geyser", # 0.8.1+solana.1.15.2
"yellowstone-grpc-proto", # 1.2.0+solana.1.15.2 "yellowstone-grpc-proto", # 1.2.0+solana.1.15.2
] ]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "yellowstone-grpc-geyser" name = "yellowstone-grpc-geyser"
version = "0.8.0+solana.1.15.2" version = "0.8.1+solana.1.15.2"
authors = ["Triton One"] authors = ["Triton One"]
edition = "2021" edition = "2021"
description = "Yellowstone gRPC Geyser Plugin" description = "Yellowstone gRPC Geyser Plugin"

View File

@ -609,6 +609,7 @@ impl GrpcService {
} }
ClientMessage::Drop { id } => { ClientMessage::Drop { id } => {
if clients.remove(&id).is_some() { if clients.remove(&id).is_some() {
info!("{id}, client removed");
CONNECTIONS_TOTAL.dec(); CONNECTIONS_TOTAL.dec();
} }
} }
@ -692,11 +693,15 @@ impl Geyser for GrpcService {
.await; .await;
} }
} }
Ok(None) => break, Ok(None) => {
Err(_error) => break, break;
}
Err(_error) => {
let _ = new_clients_tx.send(ClientMessage::Drop { id });
break;
}
} }
} }
let _ = new_clients_tx.send(ClientMessage::Drop { id });
}); });
Ok(Response::new(ReceiverStream::new(stream_rx))) Ok(Response::new(ReceiverStream::new(stream_rx)))