Tonic retry client (#53)

* Feat: client with default retry policy

Once the server connection is broken, client will reconnect to the server, then retry to send payload SubscribeRequest to server

* Refactor: backoff as default crates, simplify cargo.toml

---------

Co-authored-by: Nikita Baksalyar <nikita.baksalyar@gmail.com>
This commit is contained in:
cairo 2023-02-17 14:36:50 +08:00 committed by GitHub
parent 00ba4e96da
commit ad73f64a8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 250 additions and 41 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@
config-test.json
*.sw?
.idea/

16
Cargo.lock generated
View File

@ -240,6 +240,20 @@ dependencies = [
"tower-service",
]
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"futures-core",
"getrandom 0.2.8",
"instant",
"pin-project-lite",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "base64"
version = "0.12.3"
@ -2624,6 +2638,7 @@ name = "solana-geyser-grpc"
version = "0.5.2+solana.1.14.15"
dependencies = [
"anyhow",
"backoff",
"bincode",
"cargo-lock",
"clap",
@ -2641,6 +2656,7 @@ dependencies = [
"solana-logger",
"solana-sdk",
"solana-transaction-status",
"thiserror",
"tokio",
"tokio-stream",
"tonic",

View File

@ -26,6 +26,8 @@ solana-transaction-status = "=1.14.15"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
tokio-stream = "0.1.11"
tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] }
backoff = { version = "0.4.0", features = ["tokio"] }
thiserror = "1.0"
[build-dependencies]
anyhow = "1.0.62"

View File

@ -1,16 +1,20 @@
use {
backoff::{future::retry, ExponentialBackoff},
clap::Parser,
futures::stream::{once, StreamExt},
solana_geyser_grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate,
},
std::collections::HashMap,
thiserror::Error,
tonic::{
metadata::MetadataValue,
transport::{channel::ClientTlsConfig, Endpoint, Uri},
Request,
codec::Streaming,
metadata::{Ascii, MetadataValue},
service::interceptor::InterceptedService,
transport::{channel::ClientTlsConfig, Channel, Endpoint},
Request, Response, Status,
},
};
@ -73,8 +77,158 @@ struct Args {
blocks_meta: bool,
}
const XTOKEN_LENGTH: usize = 28;
#[derive(Debug, Error)]
pub enum Error {
#[error("XToken: {0}")]
XToken(String),
#[error("Invalid URI {0}")]
InvalidUri(String),
#[error("RetrySubscribe")]
RetrySubscribe(anyhow::Error),
}
#[derive(Debug)]
struct RetryChannel {
x_token: Option<MetadataValue<Ascii>>,
channel: Channel,
}
impl RetryChannel {
/// Establish a channel to tonic endpoint
/// The channel does not attempt to connect to the endpoint until first use
pub fn new(endpoint_str: String, x_token_str: Option<String>) -> Result<Self, Error> {
let endpoint: Endpoint;
let x_token: Option<MetadataValue<Ascii>>;
// the client should fail immediately if the x-token is invalid
match x_token_str {
// x-token length is 28
Some(token_str) if token_str.len() == XTOKEN_LENGTH => {
match token_str.parse::<MetadataValue<Ascii>>() {
Ok(metadata) => x_token = Some(metadata),
Err(_) => return Err(Error::XToken(token_str)),
}
}
Some(token_str) => return Err(Error::XToken(token_str)),
None => return Err(Error::XToken("".to_owned())),
}
let res = Channel::from_shared(endpoint_str.clone());
match res {
Err(e) => {
println!("{}", e);
return Err(Error::InvalidUri(endpoint_str));
}
Ok(_endpoint) => {
if _endpoint.uri().scheme_str() == Some("https") {
match _endpoint.tls_config(ClientTlsConfig::new()) {
Err(e) => {
println!("{}", e);
return Err(Error::InvalidUri(endpoint_str));
}
Ok(e) => endpoint = e,
}
} else {
endpoint = _endpoint;
}
}
}
let channel = endpoint.connect_lazy();
Ok(Self { x_token, channel })
}
/// Create a new GeyserClient client with Auth interceptor
/// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however
/// creating new clients is cheap and thus can be used as a work around for ease of use.
pub fn client(&self) -> RetryClient<impl FnMut(Request<()>) -> InterceptedRequestResult + '_> {
let client = GeyserClient::with_interceptor(
self.channel.clone(),
move |mut req: tonic::Request<()>| {
if let Some(x_token) = self.x_token.clone() {
req.metadata_mut().insert("x-token", x_token);
}
Ok(req)
},
);
RetryClient { client }
}
pub async fn subscribe_retry(
&self,
slots: &SlotsFilterMap,
accounts: &AccountFilterMap,
transactions: &TransactionsFilterMap,
blocks: &BlocksFilterMap,
blocks_meta: &BlocksMetaFilterMap,
) -> anyhow::Result<()> {
// The default exponential backoff strategy intervals:
// [500ms, 750ms, 1.125s, 1.6875s, 2.53125s, 3.796875s, 5.6953125s,
// 8.5s, 12.8s, 19.2s, 28.8s, 43.2s, 64.8s, 97s, ... ]
retry(ExponentialBackoff::default(), move || async {
println!("Retry to connect to the server");
let mut client = self.client();
client
.subscribe(slots, accounts, transactions, blocks, blocks_meta)
.await?;
Ok(())
})
.await
}
}
type InterceptedRequestResult = std::result::Result<Request<()>, Status>;
pub struct RetryClient<F: FnMut(Request<()>) -> InterceptedRequestResult> {
client: GeyserClient<InterceptedService<Channel, F>>,
}
type SlotsFilterMap = HashMap<String, SubscribeRequestFilterSlots>;
type AccountFilterMap = HashMap<String, SubscribeRequestFilterAccounts>;
type TransactionsFilterMap = HashMap<String, SubscribeRequestFilterTransactions>;
type BlocksFilterMap = HashMap<String, SubscribeRequestFilterBlocks>;
type BlocksMetaFilterMap = HashMap<String, SubscribeRequestFilterBlocksMeta>;
impl<F: FnMut(Request<()>) -> InterceptedRequestResult> RetryClient<F> {
async fn subscribe(
&mut self,
slots: &SlotsFilterMap,
accounts: &AccountFilterMap,
transactions: &TransactionsFilterMap,
blocks: &BlocksFilterMap,
blocks_meta: &BlocksMetaFilterMap,
) -> anyhow::Result<()> {
let request = SubscribeRequest {
slots: slots.clone(),
accounts: accounts.clone(),
transactions: transactions.clone(),
blocks: blocks.clone(),
blocks_meta: blocks_meta.clone(),
};
println!("Going to send request: {:?}", request);
let response: Response<Streaming<SubscribeUpdate>> =
self.client.subscribe(once(async move { request })).await?;
let mut stream: Streaming<SubscribeUpdate> = response.into_inner();
println!("stream opened");
while let Some(message) = stream.next().await {
match message {
Ok(message) => println!("new message: {:?}", message),
Err(error) => eprintln!("error: {:?}", error),
}
}
println!("stream closed");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() -> Result<(), Error> {
let args = Args::parse();
let mut accounts = HashMap::new();
@ -88,12 +242,12 @@ async fn main() -> anyhow::Result<()> {
);
}
let mut slots = HashMap::new();
let mut slots: SlotsFilterMap = HashMap::new();
if args.slots {
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
}
let mut transactions = HashMap::new();
let mut transactions: TransactionsFilterMap = HashMap::new();
if args.transactions {
transactions.insert(
"client".to_string(),
@ -107,51 +261,87 @@ async fn main() -> anyhow::Result<()> {
);
}
let mut blocks = HashMap::new();
let mut blocks: BlocksFilterMap = HashMap::new();
if args.blocks {
blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {});
}
let mut blocks_meta = HashMap::new();
let mut blocks_meta: BlocksMetaFilterMap = HashMap::new();
if args.blocks_meta {
blocks_meta.insert("client".to_owned(), SubscribeRequestFilterBlocksMeta {});
}
let is_https = args.endpoint.parse::<Uri>()?.scheme_str() == Some("https");
let mut endpoint = Endpoint::from_shared(args.endpoint.clone())?;
if is_https {
endpoint = endpoint.tls_config(ClientTlsConfig::new())?;
// Client with retry policy
let res: Result<RetryChannel, Error> =
RetryChannel::new(args.endpoint.clone(), args.x_token.clone());
if let Err(e) = res {
eprintln!("Error: {}", e);
return Err(e);
}
// let mut client = GeyserClient::connect(endpoint).await?;
let token: Option<MetadataValue<_>> = args.x_token.map(|token| token.parse()).transpose()?;
let conn = endpoint.connect().await?;
let mut client = GeyserClient::with_interceptor(conn, move |mut req: Request<()>| {
if let Some(token) = token.clone() {
req.metadata_mut().insert("x-token", token);
}
Ok(req)
});
let request = SubscribeRequest {
slots,
accounts,
transactions,
blocks,
blocks_meta,
};
println!("Going to send request: {:?}", request);
let response = client.subscribe(once(async move { request })).await?;
let mut stream = response.into_inner();
println!("stream opened");
while let Some(message) = stream.next().await {
match message {
Ok(message) => println!("new message: {:?}", message),
Err(error) => eprintln!("error: {:?}", error),
}
let res: anyhow::Result<()> = res
.unwrap()
.subscribe_retry(&slots, &accounts, &transactions, &blocks, &blocks_meta)
.await;
if let Err(e) = res {
println!("Error: {}", e);
return Err(Error::RetrySubscribe(e));
}
println!("stream closed");
Ok(())
}
#[cfg(test)]
mod tests {
use super::{Error, RetryChannel};
#[tokio::test]
async fn test_channel_https_success() {
let endpoint = "https://ams17.rpcpool.com:443".to_owned();
let x_token = "1000000000000000000000000007".to_owned();
let res: Result<RetryChannel, Error> = RetryChannel::new(endpoint, Some(x_token));
assert!(res.is_ok())
}
#[tokio::test]
async fn test_channel_http_success() {
let endpoint = "http://127.0.0.1:10000".to_owned();
let x_token = "1234567891012141618202224268".to_owned();
let res: Result<RetryChannel, Error> = RetryChannel::new(endpoint, Some(x_token));
assert!(res.is_ok())
}
#[tokio::test]
async fn test_channel_invalid_token_some() {
let endpoint = "http://127.0.0.1:10000".to_owned();
let x_token = "123".to_owned();
let res: Result<RetryChannel, Error> = RetryChannel::new(endpoint, Some(x_token.clone()));
assert!(res.is_err());
if let Err(Error::XToken(_)) = res {
assert!(true);
} else {
assert!(false);
}
}
#[tokio::test]
async fn test_channel_invalid_token_none() {
let endpoint = "http://127.0.0.1:10000".to_owned();
let x_token = None;
assert!(matches!(
RetryChannel::new(endpoint, x_token),
Err(Error::XToken(_))
));
}
#[tokio::test]
async fn test_channel_invalid_uri() {
let endpoint = "sites/files/images/picture.png".to_owned();
let x_token = "1234567891012141618202224268".to_owned();
assert!(matches!(
RetryChannel::new(endpoint, Some(x_token)),
Err(Error::InvalidUri(_))
));
}
}