Added optional zstd compression to account data
This commit is contained in:
parent
7984be92e2
commit
5adf0ee0af
|
@ -33,7 +33,8 @@ tokio-stream = "0.1"
|
||||||
|
|
||||||
async-stream = "0.2"
|
async-stream = "0.2"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
zstd = "0.11.2"
|
||||||
|
zstd-safe = "5.0.2"
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = { version = "0.6", features = ["compression"] }
|
tonic-build = { version = "0.6", features = ["compression"] }
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
|
||||||
|
pub fn zstd_compress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||||
|
let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap();
|
||||||
|
|
||||||
|
encoder.write_all(data)?;
|
||||||
|
|
||||||
|
encoder.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn zstd_decompress(data: &[u8], uncompressed: &mut Vec<u8>) -> Result<usize, std::io::Error> {
|
||||||
|
let mut decoder = zstd::stream::read::Decoder::new(data).unwrap();
|
||||||
|
decoder.read_to_end(uncompressed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) mod tests {
|
||||||
|
use super::*;
|
||||||
|
#[test]
|
||||||
|
fn test_zstd_compression() {
|
||||||
|
let data = vec![100; 256];
|
||||||
|
println!("Uncompressed Data = {:?}", data);
|
||||||
|
match zstd_compress(&data) {
|
||||||
|
Ok(compressed) => {
|
||||||
|
println!("Compressed Data = {:?}\n", compressed);
|
||||||
|
|
||||||
|
let mut uncompressed: Vec<u8> = Vec::new();
|
||||||
|
match zstd_decompress(&compressed, &mut uncompressed) {
|
||||||
|
Ok(res) => {
|
||||||
|
println!("Uncompressed Data = {:?}\n", uncompressed);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error = {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error compressing Data {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,8 @@
|
||||||
|
use crate::compression::zstd_compress;
|
||||||
|
|
||||||
use {
|
use {
|
||||||
crate::accounts_selector::AccountsSelector,
|
crate::accounts_selector::AccountsSelector,
|
||||||
|
crate::compression,
|
||||||
bs58,
|
bs58,
|
||||||
geyser_proto::{
|
geyser_proto::{
|
||||||
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
|
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
|
||||||
|
@ -113,6 +116,7 @@ pub struct PluginData {
|
||||||
/// Needed to catch writes that signal account closure, where
|
/// Needed to catch writes that signal account closure, where
|
||||||
/// lamports=0 and owner=system-program.
|
/// lamports=0 and owner=system-program.
|
||||||
active_accounts: RwLock<HashSet<[u8; 32]>>,
|
active_accounts: RwLock<HashSet<[u8; 32]>>,
|
||||||
|
zstd_compression: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
@ -131,6 +135,7 @@ impl std::fmt::Debug for Plugin {
|
||||||
pub struct PluginConfig {
|
pub struct PluginConfig {
|
||||||
pub bind_address: String,
|
pub bind_address: String,
|
||||||
pub service_config: geyser_service::ServiceConfig,
|
pub service_config: geyser_service::ServiceConfig,
|
||||||
|
pub zstd_compression: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PluginData {
|
impl PluginData {
|
||||||
|
@ -219,6 +224,7 @@ impl GeyserPlugin for Plugin {
|
||||||
accounts_selector,
|
accounts_selector,
|
||||||
highest_write_slot,
|
highest_write_slot,
|
||||||
active_accounts: RwLock::new(HashSet::new()),
|
active_accounts: RwLock::new(HashSet::new()),
|
||||||
|
zstd_compression: config.zstd_compression,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -285,6 +291,21 @@ impl GeyserPlugin for Plugin {
|
||||||
slot,
|
slot,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut account_data = account.data.to_vec();
|
||||||
|
|
||||||
|
//zstd compress if enabled.
|
||||||
|
if data.zstd_compression {
|
||||||
|
match zstd_compress(&account_data) {
|
||||||
|
Ok(res) => account_data = res,
|
||||||
|
Err(e) => {
|
||||||
|
println!(
|
||||||
|
"zstd_decompress compression failed = {:?} , using original data.",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
data.broadcast(UpdateOneof::AccountWrite(AccountWrite {
|
data.broadcast(UpdateOneof::AccountWrite(AccountWrite {
|
||||||
slot,
|
slot,
|
||||||
is_startup,
|
is_startup,
|
||||||
|
@ -294,7 +315,7 @@ impl GeyserPlugin for Plugin {
|
||||||
owner: account.owner.to_vec(),
|
owner: account.owner.to_vec(),
|
||||||
executable: account.executable,
|
executable: account.executable,
|
||||||
rent_epoch: account.rent_epoch,
|
rent_epoch: account.rent_epoch,
|
||||||
data: account.data.to_vec(),
|
data: account_data,
|
||||||
is_selected,
|
is_selected,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
@ -377,6 +398,8 @@ pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod tests {
|
pub(crate) mod tests {
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
use {super::*, serde_json};
|
use {super::*, serde_json};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -1,2 +1,3 @@
|
||||||
|
pub mod compression;
|
||||||
pub mod accounts_selector;
|
pub mod accounts_selector;
|
||||||
pub mod geyser_plugin_grpc;
|
pub mod geyser_plugin_grpc;
|
||||||
|
|
Loading…
Reference in New Issue