From 5adf0ee0af933beb420ce9edbabc923f9c9575e5 Mon Sep 17 00:00:00 2001 From: Nader Sleiman Date: Wed, 13 Jul 2022 21:20:33 +0300 Subject: [PATCH] Added optional zstd compression to account data --- geyser-plugin-grpc/Cargo.toml | 3 +- geyser-plugin-grpc/src/compression.rs | 41 ++++++++++++++++++++ geyser-plugin-grpc/src/geyser_plugin_grpc.rs | 29 ++++++++++++-- geyser-plugin-grpc/src/lib.rs | 1 + 4 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 geyser-plugin-grpc/src/compression.rs diff --git a/geyser-plugin-grpc/Cargo.toml b/geyser-plugin-grpc/Cargo.toml index 69765e6..c4a988b 100644 --- a/geyser-plugin-grpc/Cargo.toml +++ b/geyser-plugin-grpc/Cargo.toml @@ -33,7 +33,8 @@ tokio-stream = "0.1" async-stream = "0.2" rand = "0.8" - +zstd = "0.11.2" +zstd-safe = "5.0.2" [build-dependencies] tonic-build = { version = "0.6", features = ["compression"] } diff --git a/geyser-plugin-grpc/src/compression.rs b/geyser-plugin-grpc/src/compression.rs new file mode 100644 index 0000000..82f50e6 --- /dev/null +++ b/geyser-plugin-grpc/src/compression.rs @@ -0,0 +1,41 @@ +use std::io::{Read, Write}; + +pub fn zstd_compress(data: &[u8]) -> Result, std::io::Error> { + let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap(); + + encoder.write_all(data)?; + + encoder.finish() +} + +pub fn zstd_decompress(data: &[u8], uncompressed: &mut Vec) -> Result { + let mut decoder = zstd::stream::read::Decoder::new(data).unwrap(); + decoder.read_to_end(uncompressed) +} + +pub(crate) mod tests { + use super::*; + #[test] + fn test_zstd_compression() { + let data = vec![100; 256]; + println!("Uncompressed Data = {:?}", data); + match zstd_compress(&data) { + Ok(compressed) => { + println!("Compressed Data = {:?}\n", compressed); + + let mut uncompressed: Vec = Vec::new(); + match zstd_decompress(&compressed, &mut uncompressed) { + Ok(res) => { + println!("Uncompressed Data = {:?}\n", uncompressed); + } + Err(e) => { + println!("Error = {:?}", e); + } + } + } + Err(e) => { + println!("Error compressing Data {:?}", e); + } + } + } +} diff --git a/geyser-plugin-grpc/src/geyser_plugin_grpc.rs b/geyser-plugin-grpc/src/geyser_plugin_grpc.rs index a044018..f940e5f 100644 --- a/geyser-plugin-grpc/src/geyser_plugin_grpc.rs +++ b/geyser-plugin-grpc/src/geyser_plugin_grpc.rs @@ -1,5 +1,8 @@ +use crate::compression::zstd_compress; + use { crate::accounts_selector::AccountsSelector, + crate::compression, bs58, geyser_proto::{ slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping, @@ -113,6 +116,7 @@ pub struct PluginData { /// Needed to catch writes that signal account closure, where /// lamports=0 and owner=system-program. active_accounts: RwLock>, + zstd_compression: bool, } #[derive(Default)] @@ -131,6 +135,7 @@ impl std::fmt::Debug for Plugin { pub struct PluginConfig { pub bind_address: String, pub service_config: geyser_service::ServiceConfig, + pub zstd_compression: bool, } impl PluginData { @@ -187,8 +192,8 @@ impl GeyserPlugin for Plugin { let server_broadcast = service.sender.clone(); let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service) - .accept_gzip() - .send_gzip(); + .accept_gzip() + .send_gzip(); let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.spawn(Server::builder().add_service(server).serve_with_shutdown( addr, @@ -219,6 +224,7 @@ impl GeyserPlugin for Plugin { accounts_selector, highest_write_slot, active_accounts: RwLock::new(HashSet::new()), + zstd_compression: config.zstd_compression, }); Ok(()) @@ -285,6 +291,21 @@ impl GeyserPlugin for Plugin { slot, ); + let mut account_data = account.data.to_vec(); + + //zstd compress if enabled. + if data.zstd_compression { + match zstd_compress(&account_data) { + Ok(res) => account_data = res, + Err(e) => { + println!( + "zstd_decompress compression failed = {:?} , using original data.", + e + ); + } + } + } + data.broadcast(UpdateOneof::AccountWrite(AccountWrite { slot, is_startup, @@ -294,7 +315,7 @@ impl GeyserPlugin for Plugin { owner: account.owner.to_vec(), executable: account.executable, rent_epoch: account.rent_epoch, - data: account.data.to_vec(), + data: account_data, is_selected, })); } @@ -377,6 +398,8 @@ pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin { #[cfg(test)] pub(crate) mod tests { + use std::io::Write; + use {super::*, serde_json}; #[test] diff --git a/geyser-plugin-grpc/src/lib.rs b/geyser-plugin-grpc/src/lib.rs index b3ec09f..de47172 100644 --- a/geyser-plugin-grpc/src/lib.rs +++ b/geyser-plugin-grpc/src/lib.rs @@ -1,2 +1,3 @@ +pub mod compression; pub mod accounts_selector; pub mod geyser_plugin_grpc;