Merge branch 'v1.10-comaspring' of github.com:rpcpool/solana-accountsdb-connector into v1.10-comaspring
This commit is contained in:
commit
b04fcc24ca
|
@ -33,7 +33,8 @@ tokio-stream = "0.1"
|
|||
|
||||
async-stream = "0.2"
|
||||
rand = "0.8"
|
||||
|
||||
zstd = "0.11.2"
|
||||
zstd-safe = "5.0.2"
|
||||
[build-dependencies]
|
||||
tonic-build = { version = "0.6", features = ["compression"] }
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
use std::io::{Read, Write};
|
||||
|
||||
pub fn zstd_compress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap();
|
||||
encoder.write_all(data)?;
|
||||
encoder.finish()
|
||||
}
|
||||
|
||||
pub fn zstd_decompress(data: &[u8], uncompressed: &mut Vec<u8>) -> Result<usize, std::io::Error> {
|
||||
let mut decoder = zstd::stream::read::Decoder::new(data).unwrap();
|
||||
decoder.read_to_end(uncompressed)
|
||||
}
|
||||
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_zstd_compression() {
|
||||
let data = vec![100; 256]; //sample data, 256 bytes of val 100.
|
||||
println!("Uncompressed Data = {:?}", data);
|
||||
|
||||
match zstd_compress(&data) {
|
||||
Ok(compressed) => {
|
||||
println!("Compressed Data = {:?}\n", compressed);
|
||||
|
||||
let mut uncompressed: Vec<u8> = Vec::new();
|
||||
|
||||
match zstd_decompress(&compressed, &mut uncompressed) {
|
||||
Ok(_) => {
|
||||
println!("Uncompressed Data = {:?}\n", uncompressed);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error = {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error compressing Data {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,8 @@
|
|||
use crate::compression::zstd_compress;
|
||||
|
||||
use {
|
||||
crate::accounts_selector::AccountsSelector,
|
||||
crate::compression,
|
||||
bs58,
|
||||
geyser_proto::{
|
||||
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
|
||||
|
@ -113,6 +116,7 @@ pub struct PluginData {
|
|||
/// Needed to catch writes that signal account closure, where
|
||||
/// lamports=0 and owner=system-program.
|
||||
active_accounts: RwLock<HashSet<[u8; 32]>>,
|
||||
zstd_compression: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -131,6 +135,7 @@ impl std::fmt::Debug for Plugin {
|
|||
pub struct PluginConfig {
|
||||
pub bind_address: String,
|
||||
pub service_config: geyser_service::ServiceConfig,
|
||||
pub zstd_compression: bool,
|
||||
}
|
||||
|
||||
impl PluginData {
|
||||
|
@ -187,8 +192,8 @@ impl GeyserPlugin for Plugin {
|
|||
let server_broadcast = service.sender.clone();
|
||||
|
||||
let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service)
|
||||
.accept_gzip()
|
||||
.send_gzip();
|
||||
.accept_gzip()
|
||||
.send_gzip();
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
runtime.spawn(Server::builder().add_service(server).serve_with_shutdown(
|
||||
addr,
|
||||
|
@ -219,6 +224,7 @@ impl GeyserPlugin for Plugin {
|
|||
accounts_selector,
|
||||
highest_write_slot,
|
||||
active_accounts: RwLock::new(HashSet::new()),
|
||||
zstd_compression: config.zstd_compression,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
|
@ -285,6 +291,21 @@ impl GeyserPlugin for Plugin {
|
|||
slot,
|
||||
);
|
||||
|
||||
let mut account_data = account.data.to_vec();
|
||||
|
||||
//zstd compress if enabled.
|
||||
if data.zstd_compression {
|
||||
match zstd_compress(&account_data) {
|
||||
Ok(res) => account_data = res,
|
||||
Err(e) => {
|
||||
println!(
|
||||
"zstd_decompress compression failed = {:?} , using original data.",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data.broadcast(UpdateOneof::AccountWrite(AccountWrite {
|
||||
slot,
|
||||
is_startup,
|
||||
|
@ -294,7 +315,7 @@ impl GeyserPlugin for Plugin {
|
|||
owner: account.owner.to_vec(),
|
||||
executable: account.executable,
|
||||
rent_epoch: account.rent_epoch,
|
||||
data: account.data.to_vec(),
|
||||
data: account_data,
|
||||
is_selected,
|
||||
}));
|
||||
}
|
||||
|
@ -377,6 +398,8 @@ pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
|
|||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
use {super::*, serde_json};
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
pub mod compression;
|
||||
pub mod accounts_selector;
|
||||
pub mod geyser_plugin_grpc;
|
||||
|
|
Loading…
Reference in New Issue