tools: remove `ordering_key` from `PubsubMessage` (#257)

This commit is contained in:
Kirill Fomichev 2023-12-06 22:14:31 -05:00 committed by GitHub
parent ceac499c8f
commit 96477484b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 26 deletions

View File

@ -12,12 +12,19 @@ The minor version will be incremented upon a breaking change and the patch versi
### Fixes ### Fixes
- client: include request in initial subscribe to gRPC endpoint to fix LB connection delay ([#252](https://github.com/rpcpool/yellowstone-grpc/pull/252))
### Features ### Features
### Breaking ### Breaking
## 2023-12-06
- yellowstone-grpc-tools-1.0.0-rc.8+solana.1.17.6
### Fixes
- client: include request in initial subscribe to gRPC endpoint to fix LB connection delay ([#252](https://github.com/rpcpool/yellowstone-grpc/pull/252))
- tools: remove `ordering_key` from `PubsubMessage` ([#257](https://github.com/rpcpool/yellowstone-grpc/pull/257))
## 2023-11-24 ## 2023-11-24
- yellowstone-grpc-geyser-1.11.1+solana.1.17.6 - yellowstone-grpc-geyser-1.11.1+solana.1.17.6

2
Cargo.lock generated
View File

@ -5041,7 +5041,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-tools" name = "yellowstone-grpc-tools"
version = "1.0.0-rc.7+solana.1.17.6" version = "1.0.0-rc.8+solana.1.17.6"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",

View File

@ -5,7 +5,7 @@ members = [
"yellowstone-grpc-client", # 1.12.0+solana.1.17.6 "yellowstone-grpc-client", # 1.12.0+solana.1.17.6
"yellowstone-grpc-geyser", # 1.11.1+solana.1.17.6 "yellowstone-grpc-geyser", # 1.11.1+solana.1.17.6
"yellowstone-grpc-proto", # 1.11.0+solana.1.17.6 "yellowstone-grpc-proto", # 1.11.0+solana.1.17.6
"yellowstone-grpc-tools", # 1.0.0-rc.7+solana.1.17.6 "yellowstone-grpc-tools", # 1.0.0-rc.8+solana.1.17.6
] ]
[workspace.package] [workspace.package]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "yellowstone-grpc-tools" name = "yellowstone-grpc-tools"
version = "1.0.0-rc.7+solana.1.17.6" version = "1.0.0-rc.8+solana.1.17.6"
authors = { workspace = true } authors = { workspace = true }
edition = { workspace = true } edition = { workspace = true }
description = "Yellowstone gRPC Tools" description = "Yellowstone gRPC Tools"

View File

@ -1,10 +1,16 @@
{ {
"prometheus": "127.0.0.1:8873", "prometheus": "127.0.0.1:8873",
// "with_auth": true, // Auth with DefaultTokenSourceProvider
// "with_credentials": "path_to_json_file_with_creds", // or use envs: GOOGLE_APPLICATION_CREDENTIALS / GOOGLE_APPLICATION_CREDENTIALS_JSON
"grpc2pubsub": { "grpc2pubsub": {
"endpoint": "http://127.0.0.1:10000", "endpoint": "http://127.0.0.1:10000",
"x_token": null, "x_token": null,
"request": { "request": {
"slots": ["client"], "slots": {
"client": {
"filter_by_commitment": null
}
},
"blocks": { "blocks": {
"client": { "client": {
"account_include": [], "account_include": [],

View File

@ -119,8 +119,6 @@ impl ArgsAction {
// Receive-send loop // Receive-send loop
let mut send_tasks = JoinSet::new(); let mut send_tasks = JoinSet::new();
let mut msg_slot = 0;
let mut msg_id = 0;
'outer: loop { 'outer: loop {
let sleep = sleep(Duration::from_millis(config.bulk_max_wait_ms as u64)); let sleep = sleep(Duration::from_millis(config.bulk_max_wait_ms as u64));
tokio::pin!(sleep); tokio::pin!(sleep);
@ -154,25 +152,9 @@ impl ArgsAction {
Some(value) => value, Some(value) => value,
None => unreachable!("Expect valid message"), None => unreachable!("Expect valid message"),
}; };
let slot = match message {
UpdateOneof::Account(msg) => msg.slot,
UpdateOneof::Slot(msg) => msg.slot,
UpdateOneof::Transaction(msg) => msg.slot,
UpdateOneof::Block(msg) => msg.slot,
UpdateOneof::Ping(_) => continue,
UpdateOneof::Pong(_) => continue,
UpdateOneof::BlockMeta(msg) => msg.slot,
UpdateOneof::Entry(msg) => msg.slot,
};
if msg_slot != slot {
msg_slot = slot;
msg_id = 0;
}
msg_id += 1;
messages.push(PubsubMessage { messages.push(PubsubMessage {
data: payload, data: payload,
ordering_key: format!("{msg_slot}-{msg_id}"),
..Default::default() ..Default::default()
}); });
prom_kind.push(GprcMessageKind::from(message)); prom_kind.push(GprcMessageKind::from(message));

View File

@ -12,14 +12,17 @@ use {
#[serde(default)] #[serde(default)]
pub struct Config { pub struct Config {
pub prometheus: Option<SocketAddr>, pub prometheus: Option<SocketAddr>,
pub auth: Option<String>, pub with_auth: Option<bool>,
pub with_credentials: Option<String>,
pub grpc2pubsub: Option<ConfigGrpc2PubSub>, pub grpc2pubsub: Option<ConfigGrpc2PubSub>,
} }
impl Config { impl Config {
pub async fn create_client(&self) -> anyhow::Result<Client> { pub async fn create_client(&self) -> anyhow::Result<Client> {
let mut config = ClientConfig::default(); let mut config = ClientConfig::default();
if let Some(creds) = match self.auth.clone() { if matches!(self.with_auth, Some(true)) {
config = config.with_auth().await?;
} else if let Some(creds) = match self.with_credentials.clone() {
Some(filepath) => CredentialsFile::new_from_file(filepath).await.map(Some), Some(filepath) => CredentialsFile::new_from_file(filepath).await.map(Some),
None => { None => {
if std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok() if std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok()