diff --git a/storage-bigtable/Cargo.toml b/storage-bigtable/Cargo.toml
index 63848e348..4b769cc8f 100644
--- a/storage-bigtable/Cargo.toml
+++ b/storage-bigtable/Cargo.toml
@@ -9,10 +9,19 @@ homepage = "https://solana.com/"
 edition = "2018"
 
 [dependencies]
-goauth = "0.7.1"
+bincode = "1.2.1"
+goauth = "0.7.2"
 log = "0.4.8"
 smpl_jwt = "0.5.0"
 tonic = {version="0.3.0", features = ["tls", "transport"]}
+prost = "0.6.1"
+prost-types = "0.6.1"
+enum-iterator = "0.6.0"
+bzip2 = "0.3.3"
+flate2 = "1.0.14"
+serde = "1.0.112"
+serde_derive = "1.0.103"
+zstd = "0.5.1"
 
 [lib]
 crate-type = ["lib"]
diff --git a/storage-bigtable/src/compression.rs b/storage-bigtable/src/compression.rs
new file mode 100644
index 000000000..665f93b46
--- /dev/null
+++ b/storage-bigtable/src/compression.rs
@@ -0,0 +1,105 @@
+use enum_iterator::IntoEnumIterator;
+use std::io::{self, BufReader, Read, Write};
+
+#[derive(Debug, Serialize, Deserialize, IntoEnumIterator)]
+pub enum CompressionMethod {
+    NoCompression,
+    Bzip2,
+    Gzip,
+    Zstd,
+}
+
+fn decompress_reader<'a, R: Read + 'a>(
+    method: CompressionMethod,
+    stream: R,
+) -> Result<Box<dyn Read + 'a>, io::Error> {
+    let buf_reader = BufReader::new(stream);
+    let decompress_reader: Box<dyn Read> = match method {
+        CompressionMethod::Bzip2 => Box::new(bzip2::bufread::BzDecoder::new(buf_reader)),
+        CompressionMethod::Gzip => Box::new(flate2::read::GzDecoder::new(buf_reader)),
+        CompressionMethod::Zstd => Box::new(zstd::stream::read::Decoder::new(buf_reader)?),
+        CompressionMethod::NoCompression => Box::new(buf_reader),
+    };
+    Ok(decompress_reader)
+}
+
+pub fn decompress(data: &[u8]) -> Result<Vec<u8>, io::Error> {
+    let method_size = bincode::serialized_size(&CompressionMethod::NoCompression).unwrap();
+    if (data.len() as u64) < method_size {
+        return Err(io::Error::new(
+            io::ErrorKind::Other,
+            format!("data len too small: {}", data.len()),
+        ));
+    }
+    let method = bincode::deserialize(&data[..method_size as usize]).map_err(|err| {
+        io::Error::new(
+            io::ErrorKind::Other,
+            format!("method deserialize failed: {}", err),
+        )
+    })?;
+
+    let mut reader = decompress_reader(method, &data[method_size as usize..])?;
+    let mut uncompressed_data = vec![];
+    reader.read_to_end(&mut uncompressed_data)?;
+    Ok(uncompressed_data)
+}
+
+pub fn compress(method: CompressionMethod, data: &[u8]) -> Result<Vec<u8>, io::Error> {
+    let mut compressed_data = bincode::serialize(&method).unwrap();
+    compressed_data.extend(
+        match method {
+            CompressionMethod::Bzip2 => {
+                let mut e = bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::Best);
+                e.write_all(data)?;
+                e.finish()?
+            }
+            CompressionMethod::Gzip => {
+                let mut e =
+                    flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
+                e.write_all(data)?;
+                e.finish()?
+            }
+            CompressionMethod::Zstd => {
+                let mut e = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap();
+                e.write_all(data)?;
+                e.finish()?
+            }
+            CompressionMethod::NoCompression => data.to_vec(),
+        }
+        .into_iter(),
+    );
+
+    Ok(compressed_data)
+}
+
+pub fn compress_best(data: &[u8]) -> Result<Vec<u8>, io::Error> {
+    let mut candidates = vec![];
+    for method in CompressionMethod::into_enum_iter() {
+        candidates.push(compress(method, data)?);
+    }
+
+    Ok(candidates
+        .into_iter()
+        .min_by(|a, b| a.len().cmp(&b.len()))
+        .unwrap())
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_compress_uncompress() {
+        let data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
+        assert_eq!(
+            decompress(&compress_best(&data).expect("compress_best")).expect("decompress"),
+            data
+        );
+    }
+
+    #[test]
+    fn test_compress() {
+        let data = vec![0; 256];
+        assert!(compress_best(&data).expect("compress_best").len() < data.len());
+    }
+}
diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs
index b85cb05db..a441dace1 100644
--- a/storage-bigtable/src/lib.rs
+++ b/storage-bigtable/src/lib.rs
@@ -1,2 +1,6 @@
 mod access_token;
+mod compression;
 mod root_ca_certificate;
+
+#[macro_use]
+extern crate serde_derive;