Getting compressed accounts with tonic gzip compression

This commit is contained in:
godmodegalactus 2024-04-22 21:35:13 +02:00
parent 2c9785b8b0
commit 2002c43589
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
10 changed files with 82 additions and 21 deletions

3
Cargo.lock generated
View File

@ -4668,8 +4668,10 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic",
"tonic-health",
"tracing",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",
]
@ -6102,6 +6104,7 @@ dependencies = [
"axum",
"base64 0.21.7",
"bytes",
"flate2",
"h2",
"http",
"http-body",

View File

@ -93,6 +93,7 @@ geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git =
async-trait = "0.1.68"
tonic-health = "0.10"
tonic-health = "0.10.2"
tonic = { version = "0.10.2", features = ["gzip"] }
lz4 = "1.24.0"
zstd = "0.11.2"

View File

@ -4,7 +4,7 @@ use anyhow::bail;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::UiAccount;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
@ -78,7 +78,7 @@ impl AccountService {
pub fn process_account_stream(
&self,
mut account_stream: AccountStream,
mut block_stream: BlockStream,
mut block_stream: BlockInfoStream,
) -> Vec<AnyhowJoinHandle> {
let this = self.clone();
let processed_task = tokio::spawn(async move {

View File

@ -3,7 +3,6 @@ use std::{collections::HashSet, sync::Arc};
use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface};
use async_trait::async_trait;
use dashmap::DashMap;
use log::warn;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData};
use solana_rpc_client_api::filter::RpcFilterType;
@ -93,8 +92,11 @@ impl AccountDataByCommitment {
.unwrap_or(true);
let mut updated = false;
if self.processed_accounts.get(&data.updated_slot).is_none() {
// processed not present for the slot
// processed not present for the slot
// grpc can send multiple inter transaction changed account states for same slot
// we have to update till we get the last
if commitment == Commitment::Processed || !self.processed_accounts.contains_key(&data.updated_slot){
self.processed_accounts
.insert(data.updated_slot, data.clone());
updated = true;
@ -300,12 +302,6 @@ impl AccountStorageInterface for InmemoryAccountStore {
let commitment = self
.maybe_update_slot_status(&account_data, commitment)
.await;
log::info!(
"got account {} at {} {}",
account_data.pubkey.to_string(),
account_data.updated_slot,
commitment.into_commitment_level().to_string()
);
match self.account_store.entry(account_data.pubkey) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
@ -416,7 +412,11 @@ impl AccountStorageInterface for InmemoryAccountStore {
status.accounts_updated.clone()
}
None => {
warn!("slot status not found for {} and commitment {}, should be normal during startup", slot, commitment.into_commitment_level());
if commitment == Commitment::Confirmed {
log::warn!("slot status not found for {} and commitment {}, confirmed lagging", slot, commitment.into_commitment_level());
} else if commitment == Commitment::Finalized {
log::error!("slot status not found for {} and commitment {}, should be normal during startup", slot, commitment.into_commitment_level());
}
let status = SlotStatus {
commitment,
accounts_updated: HashSet::new(),

View File

@ -46,3 +46,5 @@ itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
tonic = {workspace = true}
yellowstone-grpc-client = "1.13.0"

View File

@ -6,9 +6,7 @@ use std::{
time::Duration,
};
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::yellowstone_grpc_util::GeyserGrpcClientBufferConfig;
use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig};
use itertools::Itertools;
use solana_lite_rpc_core::{
@ -29,6 +27,8 @@ use yellowstone_grpc_proto::geyser::{
};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use super::grpc_utils::connect_with_timeout_with_buffers_and_compression;
pub fn start_account_streaming_tasks(
grpc_config: GrpcSourceConfig,
accounts_filters: AccountFilters,
@ -220,7 +220,7 @@ pub fn start_account_streaming_tasks(
async fn create_connection(
grpc_config: &GrpcSourceConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor + Sized>> {
connect_with_timeout_with_buffers(
connect_with_timeout_with_buffers_and_compression(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,

View File

@ -0,0 +1,56 @@
use std::time::Duration;
use bytes::Bytes;
use geyser_grpc_connector::{yellowstone_grpc_util::GeyserGrpcClientBufferConfig, GeyserGrpcClient, GeyserGrpcClientResult};
use tonic::{codec::CompressionEncoding, metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}, service::Interceptor, transport::ClientTlsConfig};
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_client::InterceptorXToken;
/// Build a lazily-connected [`GeyserGrpcClient`] with custom HTTP/2 buffer sizes
/// and gzip compression enabled in both directions.
///
/// Mirrors `geyser_grpc_connector::yellowstone_grpc_util::connect_with_timeout_with_buffers`,
/// adding `accept_compressed`/`send_compressed` (gzip) on the geyser client.
///
/// # Arguments
/// * `endpoint` - gRPC endpoint URI (anything convertible to `Bytes`).
/// * `x_token` - optional `x-token` auth header value, attached via interceptor.
/// * `tls_config` - optional TLS configuration for the channel.
/// * `connect_timeout` - max time allowed to establish the connection.
/// * `request_timeout` - max time allowed per request on the channel.
/// * `buffer_config` - HTTP/2 buffer and flow-control window sizes.
///
/// # Errors
/// Returns an error for an invalid endpoint URI, invalid TLS configuration,
/// or an `x_token` that is not a valid ASCII metadata value.
pub async fn connect_with_timeout_with_buffers_and_compression<E, T>(
    endpoint: E,
    x_token: Option<T>,
    tls_config: Option<ClientTlsConfig>,
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
    buffer_config: GeyserGrpcClientBufferConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
    E: Into<Bytes>,
    T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
    // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
    let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
        .buffer_size(buffer_config.buffer_size)
        .initial_connection_window_size(buffer_config.conn_window)
        .initial_stream_window_size(buffer_config.stream_window);

    if let Some(tls_config) = tls_config {
        endpoint = endpoint.tls_config(tls_config)?;
    }

    // BUGFIX: `Endpoint::timeout` sets the *per-request* timeout, while
    // `Endpoint::connect_timeout` sets the connection-establishment timeout.
    // The original code called `.timeout()` for both, so the connect timeout
    // was silently overwritten by the request timeout below and never applied.
    if let Some(connect_timeout) = connect_timeout {
        endpoint = endpoint.connect_timeout(connect_timeout);
    }

    if let Some(request_timeout) = request_timeout {
        endpoint = endpoint.timeout(request_timeout);
    }

    let x_token: Option<AsciiMetadataValue> = match x_token {
        Some(x_token) => Some(x_token.try_into()?),
        None => None,
    };
    let interceptor = InterceptorXToken { x_token };

    // connect_lazy: the channel is only established on first use, so this
    // function cannot fail on an unreachable endpoint.
    let channel = endpoint.connect_lazy();
    let client = GeyserGrpcClient::new(
        HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
        GeyserClient::with_interceptor(channel, interceptor)
            .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size())
            // negotiate gzip both ways: accept compressed responses and
            // compress outgoing requests
            .accept_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Gzip),
    );
    Ok(client)
}

View File

@ -1 +1,2 @@
pub mod grpc_accounts_streaming;
pub mod grpc_utils;

View File

@ -5,9 +5,7 @@ use crate::grpc_multiplex::{
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig};
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;

View File

@ -210,7 +210,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let account_service = AccountService::new(account_storage, account_notification_sender);
account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());
.process_account_stream(account_stream.resubscribe(), blockinfo_notifier.resubscribe());
account_service
.populate_from_rpc(