From 2002c435890ecba191567aa042956f29fc8c70c6 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Mon, 22 Apr 2024 21:35:13 +0200 Subject: [PATCH] Getting compressed accounts with tonic gzip compression --- Cargo.lock | 3 + Cargo.toml | 3 +- accounts/src/account_service.rs | 4 +- accounts/src/inmemory_account_store.rs | 20 +++---- cluster-endpoints/Cargo.toml | 2 + .../src/grpc/grpc_accounts_streaming.rs | 8 +-- cluster-endpoints/src/grpc/grpc_utils.rs | 56 +++++++++++++++++++ cluster-endpoints/src/grpc/mod.rs | 1 + cluster-endpoints/src/grpc_subscription.rs | 4 +- lite-rpc/src/main.rs | 2 +- 10 files changed, 82 insertions(+), 21 deletions(-) create mode 100644 cluster-endpoints/src/grpc/grpc_utils.rs diff --git a/Cargo.lock b/Cargo.lock index 338b05bb..2c41574a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4668,8 +4668,10 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tonic", "tonic-health", "tracing", + "yellowstone-grpc-client", "yellowstone-grpc-proto", ] @@ -6102,6 +6104,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", + "flate2", "h2", "http", "http-body", diff --git a/Cargo.toml b/Cargo.toml index 8c74780c..0e4d9aa7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = async-trait = "0.1.68" -tonic-health = "0.10" +tonic-health = "0.10.2" +tonic = { version = "0.10.2", features = ["gzip"] } lz4 = "1.24.0" zstd = "0.11.2" \ No newline at end of file diff --git a/accounts/src/account_service.rs b/accounts/src/account_service.rs index 27d05b01..78627679 100644 --- a/accounts/src/account_service.rs +++ b/accounts/src/account_service.rs @@ -4,7 +4,7 @@ use anyhow::bail; use itertools::Itertools; use prometheus::{opts, register_int_gauge, IntGauge}; use solana_account_decoder::UiAccount; -use solana_lite_rpc_core::types::BlockStream; +use solana_lite_rpc_core::types::BlockInfoStream; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::{ @@ -78,7 +78,7 @@ impl AccountService { pub fn process_account_stream( &self, mut account_stream: AccountStream, - mut block_stream: BlockStream, + mut block_stream: BlockInfoStream, ) -> Vec { let this = self.clone(); let processed_task = tokio::spawn(async move { diff --git a/accounts/src/inmemory_account_store.rs b/accounts/src/inmemory_account_store.rs index 13adcb88..baf3654b 100644 --- a/accounts/src/inmemory_account_store.rs +++ b/accounts/src/inmemory_account_store.rs @@ -3,7 +3,6 @@ use std::{collections::HashSet, sync::Arc}; use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface}; use async_trait::async_trait; use dashmap::DashMap; -use log::warn; use prometheus::{opts, register_int_gauge, IntGauge}; use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData}; use solana_rpc_client_api::filter::RpcFilterType; @@ -93,8 +92,11 @@ impl AccountDataByCommitment { .unwrap_or(true); let mut updated = false; - if self.processed_accounts.get(&data.updated_slot).is_none() { - // processed not present for the slot + // processed not present for the slot + // grpc can send multiple inter transaction changed account states for same slot + // we have to update till we get the last + if commitment == Commitment::Processed || !self.processed_accounts.contains_key(&data.updated_slot){ + self.processed_accounts .insert(data.updated_slot, data.clone()); updated = true; @@ -300,12 +302,6 @@ impl AccountStorageInterface for InmemoryAccountStore { let commitment = self .maybe_update_slot_status(&account_data, commitment) .await; - log::info!( - "got account {} at {} {}", - account_data.pubkey.to_string(), - account_data.updated_slot, - commitment.into_commitment_level().to_string() - ); match self.account_store.entry(account_data.pubkey) { dashmap::mapref::entry::Entry::Occupied(mut occ) => { @@ -416,7 +412,11 @@ impl AccountStorageInterface for InmemoryAccountStore { status.accounts_updated.clone() } None => { - warn!("slot status not found for {} and commitment {}, should be normal during startup", slot, commitment.into_commitment_level()); + if commitment == Commitment::Confirmed { + log::warn!("slot status not found for {} and commitment {}, confirmed lagging", slot, commitment.into_commitment_level()); + } else if commitment == Commitment::Finalized { + log::error!("slot status not found for {} and commitment {}, should be normal during startup", slot, commitment.into_commitment_level()); + } let status = SlotStatus { commitment, accounts_updated: HashSet::new(), diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 2c0619e3..cbf6e6f5 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -46,3 +46,5 @@ itertools = {workspace = true} prometheus = { workspace = true } lazy_static = { workspace = true } tonic-health = { workspace = true } +tonic = {workspace = true} +yellowstone-grpc-client = "1.13.0" \ No newline at end of file diff --git a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs index 1e2be444..deec77b5 100644 --- a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs +++ b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs @@ -6,9 +6,7 @@ use std::{ time::Duration, }; -use geyser_grpc_connector::yellowstone_grpc_util::{ - connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, -}; +use geyser_grpc_connector::yellowstone_grpc_util::GeyserGrpcClientBufferConfig; use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig}; use itertools::Itertools; use solana_lite_rpc_core::{ @@ -29,6 +27,8 @@ use yellowstone_grpc_proto::geyser::{ }; use yellowstone_grpc_proto::tonic::service::Interceptor; +use super::grpc_utils::connect_with_timeout_with_buffers_and_compression; + pub fn start_account_streaming_tasks( grpc_config: GrpcSourceConfig, accounts_filters: AccountFilters, @@ -220,7 +220,7 @@ pub fn start_account_streaming_tasks( async fn create_connection( grpc_config: &GrpcSourceConfig, ) -> GeyserGrpcClientResult> { - connect_with_timeout_with_buffers( + connect_with_timeout_with_buffers_and_compression( grpc_config.grpc_addr.clone(), grpc_config.grpc_x_token.clone(), None, diff --git a/cluster-endpoints/src/grpc/grpc_utils.rs b/cluster-endpoints/src/grpc/grpc_utils.rs new file mode 100644 index 00000000..9bee247d --- /dev/null +++ b/cluster-endpoints/src/grpc/grpc_utils.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use bytes::Bytes; +use geyser_grpc_connector::{yellowstone_grpc_util::GeyserGrpcClientBufferConfig, GeyserGrpcClient, GeyserGrpcClientResult}; +use tonic::{codec::CompressionEncoding, metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, service::Interceptor, transport::ClientTlsConfig}; +use tonic_health::pb::health_client::HealthClient; +use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; +use yellowstone_grpc_client::InterceptorXToken; + + +pub async fn connect_with_timeout_with_buffers_and_compression( + endpoint: E, + x_token: Option, + tls_config: Option, + connect_timeout: Option, + request_timeout: Option, + buffer_config: GeyserGrpcClientBufferConfig, +) -> GeyserGrpcClientResult> +where + E: Into, + T: TryInto, +{ + // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10 + let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)? + .buffer_size(buffer_config.buffer_size) + .initial_connection_window_size(buffer_config.conn_window) + .initial_stream_window_size(buffer_config.stream_window); + + if let Some(tls_config) = tls_config { + endpoint = endpoint.tls_config(tls_config)?; + } + + if let Some(connect_timeout) = connect_timeout { + endpoint = endpoint.timeout(connect_timeout); + } + + if let Some(request_timeout) = request_timeout { + endpoint = endpoint.timeout(request_timeout); + } + + let x_token: Option = match x_token { + Some(x_token) => Some(x_token.try_into()?), + None => None, + }; + let interceptor = InterceptorXToken { x_token }; + + let channel = endpoint.connect_lazy(); + let client = GeyserGrpcClient::new( + HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip), + ); + Ok(client) +} diff --git a/cluster-endpoints/src/grpc/mod.rs b/cluster-endpoints/src/grpc/mod.rs index 6ba29f85..6fadb239 100644 --- a/cluster-endpoints/src/grpc/mod.rs +++ b/cluster-endpoints/src/grpc/mod.rs @@ -1 +1,2 @@ pub mod grpc_accounts_streaming; +pub mod grpc_utils; \ No newline at end of file diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 6343f24f..ead44b12 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -5,9 +5,7 @@ use crate::grpc_multiplex::{ }; use anyhow::Context; use futures::StreamExt; -use geyser_grpc_connector::yellowstone_grpc_util::{ - connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, -}; +use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig}; use geyser_grpc_connector::GrpcSourceConfig; use itertools::Itertools; use log::trace; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 2fe873c2..3d0db64c 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -210,7 +210,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let account_service = AccountService::new(account_storage, account_notification_sender); account_service - .process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe()); + .process_account_stream(account_stream.resubscribe(), blockinfo_notifier.resubscribe()); account_service .populate_from_rpc(