Merge pull request #2 from Nader-Sl/v1.10

Adds optional zstd compression to account data
This commit is contained in:
Linus Kendall 2022-07-13 22:09:52 +02:00 committed by GitHub
commit 057cc19459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 4 deletions

View File

@ -33,7 +33,8 @@ tokio-stream = "0.1"
async-stream = "0.2"
rand = "0.8"
zstd = "0.11.2"
zstd-safe = "5.0.2"
[build-dependencies]
tonic-build = { version = "0.6", features = ["compression"] }

View File

@ -0,0 +1,41 @@
use std::io::{Read, Write};
pub fn zstd_compress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap();
encoder.write_all(data)?;
encoder.finish()
}
pub fn zstd_decompress(data: &[u8], uncompressed: &mut Vec<u8>) -> Result<usize, std::io::Error> {
let mut decoder = zstd::stream::read::Decoder::new(data).unwrap();
decoder.read_to_end(uncompressed)
}
pub(crate) mod tests {
use super::*;
#[test]
fn test_zstd_compression() {
let data = vec![100; 256]; //sample data, 256 bytes of val 100.
println!("Uncompressed Data = {:?}", data);
match zstd_compress(&data) {
Ok(compressed) => {
println!("Compressed Data = {:?}\n", compressed);
let mut uncompressed: Vec<u8> = Vec::new();
match zstd_decompress(&compressed, &mut uncompressed) {
Ok(_) => {
println!("Uncompressed Data = {:?}\n", uncompressed);
}
Err(e) => {
println!("Error = {:?}", e);
}
}
}
Err(e) => {
println!("Error compressing Data {:?}", e);
}
}
}
}

View File

@ -1,5 +1,8 @@
use crate::compression::zstd_compress;
use {
crate::accounts_selector::AccountsSelector,
crate::compression,
bs58,
geyser_proto::{
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
@ -113,6 +116,7 @@ pub struct PluginData {
/// Needed to catch writes that signal account closure, where
/// lamports=0 and owner=system-program.
active_accounts: RwLock<HashSet<[u8; 32]>>,
zstd_compression: bool,
}
#[derive(Default)]
@ -131,6 +135,7 @@ impl std::fmt::Debug for Plugin {
pub struct PluginConfig {
pub bind_address: String,
pub service_config: geyser_service::ServiceConfig,
pub zstd_compression: bool,
}
impl PluginData {
@ -187,8 +192,8 @@ impl GeyserPlugin for Plugin {
let server_broadcast = service.sender.clone();
let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service)
.accept_gzip()
.send_gzip();
.accept_gzip()
.send_gzip();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(Server::builder().add_service(server).serve_with_shutdown(
addr,
@ -219,6 +224,7 @@ impl GeyserPlugin for Plugin {
accounts_selector,
highest_write_slot,
active_accounts: RwLock::new(HashSet::new()),
zstd_compression: config.zstd_compression,
});
Ok(())
@ -285,6 +291,21 @@ impl GeyserPlugin for Plugin {
slot,
);
let mut account_data = account.data.to_vec();
//zstd compress if enabled.
if data.zstd_compression {
match zstd_compress(&account_data) {
Ok(res) => account_data = res,
Err(e) => {
println!(
"zstd_decompress compression failed = {:?} , using original data.",
e
);
}
}
}
data.broadcast(UpdateOneof::AccountWrite(AccountWrite {
slot,
is_startup,
@ -294,7 +315,7 @@ impl GeyserPlugin for Plugin {
owner: account.owner.to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data.to_vec(),
data: account_data,
is_selected,
}));
}
@ -377,6 +398,8 @@ pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
#[cfg(test)]
pub(crate) mod tests {
use std::io::Write;
use {super::*, serde_json};
#[test]

View File

@ -1,2 +1,3 @@
pub mod compression;
pub mod accounts_selector;
pub mod geyser_plugin_grpc;