diff --git a/Cargo.lock b/Cargo.lock index 99cc64f92b..d1d3e32ebd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,6 +229,11 @@ name = "bs58" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "build_const" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bv" version = "0.11.0" @@ -370,6 +375,14 @@ dependencies = [ "libc 0.2.50 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crc32fast" version = "1.1.2" @@ -1973,6 +1986,7 @@ dependencies = [ "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3021,6 +3035,7 @@ dependencies = [ "checksum block-padding 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4fc4358306e344bf9775d0197fd00d2603e5afb0771bb353538630f022068ea3" "checksum bloom 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d00ac8e5056d6d65376a3c1aa5c7c34850d6949ace17f0266953a254eb3d6fe8" "checksum bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0de79cfb98e7aa9988188784d8664b4b5dad6eaaa0863b91d9a4ed871d4f7a42" +"checksum build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39" "checksum bv 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cd4ae9e585e783756cd14b0ea21863acdfbb6383664ac2f7c9ef8d180a14727" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" "checksum byte-tools 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "980479e6fde23246dfb54d47580d66b4e99202e7579c5eaa9fe10ecb5ebd2182" @@ -3039,6 +3054,7 @@ dependencies = [ "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum core-foundation 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "286e0b41c3a20da26536c6000a280585d519fd07b3956b43aed8a79e9edce980" "checksum core-foundation-sys 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "716c271e8613ace48344f723b60b900a93150271e5be206212d052bbc0883efa" +"checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" "checksum crc32fast 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e91d5240c6975ef33aeb5f148f35275c25eda8e8a5f95abe421978b05b8bf192" "checksum crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "137bc235f622ffaa0428e3854e24acb53291fc0b3ff6fb2cb75a8be6fb02f06b" "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" diff --git a/core/Cargo.toml b/core/Cargo.toml index 42d566c724..1abd63c9c0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,13 +17,14 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git chacha = [] cuda = [] erasure = [] -kvstore = ["memmap"] +kvstore = ["crc", "memmap"] [dependencies] bincode = "1.1.2" bs58 = "0.2.0" byteorder = "1.3.1" chrono = { version = "0.4.0", features = ["serde"] } +crc = { version = "1.8.1", optional = true } hashbrown = "0.1.8" indexmap = "1.0" itertools = "0.8.0" diff --git a/core/src/kvstore/io_utils.rs b/core/src/kvstore/io_utils.rs index a2b8b3af95..cd9f117591 100644 --- a/core/src/kvstore/io_utils.rs +++ b/core/src/kvstore/io_utils.rs @@ -1,7 +1,9 @@ +use byteorder::{BigEndian, ByteOrder}; +use crc::crc32; use memmap::Mmap; - +use std::cmp; use std::fs::File; -use std::io::{self, BufWriter, Seek, SeekFrom, Write}; +use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write}; use std::ops::Deref; use std::sync::{Arc, RwLock}; @@ -25,12 +27,125 @@ pub struct SharedWriter { pos: u64, } +#[derive(Debug)] +pub struct CRCWriter { + writer: W, + buffer: Vec, + position: usize, + capacity: usize, +} + +#[derive(Debug)] +pub struct CRCReader { + reader: R, + buffer: Vec, + position: usize, + chunk_size: usize, +} + impl SharedWriter { pub fn new(buf: Arc>>) -> SharedWriter { SharedWriter { buf, pos: 0 } } } +impl CRCWriter { + #[allow(dead_code)] + pub fn new(inner: W, chunk_size: usize) -> CRCWriter { + if chunk_size <= 8 { + panic!("chunk_size must be > 8"); + } + + CRCWriter { + writer: inner, + buffer: vec![0; chunk_size], + position: 0, + capacity: chunk_size - 8, + } + } + + #[allow(dead_code)] + pub fn into_inner(mut self) -> io::Result { + self.flush()?; + Ok(self.writer) + } + + #[allow(dead_code)] + pub fn get_ref(&self) -> &W { + &self.writer + } + + #[allow(dead_code)] + pub fn get_mut(&mut self) -> &mut W { + &mut self.writer + } +} + +impl CRCReader { + #[allow(dead_code)] + pub fn new(inner: R, chunk_size: usize) -> CRCReader { + if chunk_size <= 8 { + panic!("chunk_size must be > 8"); + } + + CRCReader { + reader: inner, + buffer: vec![0; chunk_size - 8], + position: chunk_size, + chunk_size, + } + } + + #[allow(dead_code)] + pub fn into_inner(self) -> R { + self.reader + } + + fn load_block(&mut self) -> io::Result<()> { + self.buffer.clear(); + self.position = 0; + + let mut block_buffer = vec![0; self.chunk_size]; + let mut block_position = 0; + + while block_position < self.chunk_size { + let bytes_read = self.reader.read(&mut block_buffer[block_position..])?; + if bytes_read == 0 { + break; + } + block_position += bytes_read + } + + if block_position < self.chunk_size { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(block_position, self.chunk_size); + + let stored_digest = BigEndian::read_u32(&block_buffer[0..4]); + let payload_len = BigEndian::read_u32(&block_buffer[4..8]) as usize; + if payload_len + 8 > block_buffer.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "CRCReader: invalid block size", + )); + } + let payload = &block_buffer[8..8 + payload_len]; + let computed_digest = crc32::checksum_ieee(&block_buffer[4..8 + payload_len]); + + if computed_digest != stored_digest { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "CRCReader: CRC validation failed", + )); + } + + self.buffer.extend_from_slice(payload); + + Ok(()) + } +} + impl Deref for MemMap { type Target = [u8]; @@ -48,6 +163,69 @@ impl Deref for MemMap { } } +impl Write for CRCWriter +where + W: Write, +{ + fn write(&mut self, buffer: &[u8]) -> io::Result { + let mut written = 0; + + while written < buffer.len() { + let batch_len = (&mut self.buffer[8 + self.position..]).write(&buffer[written..])?; + + self.position += batch_len; + written += batch_len; + + if self.position >= self.capacity { + self.flush()?; + } + } + + Ok(written) + } + + fn flush(&mut self) -> io::Result<()> { + BigEndian::write_u32(&mut self.buffer[4..8], self.position as u32); + let total_len = self.position + 8; + + // crc over length + payload + let digest = crc32::checksum_ieee(&self.buffer[4..total_len]); + + BigEndian::write_u32(&mut self.buffer[0..4], digest); + self.writer.write_all(&self.buffer)?; + + self.position = 0; + Ok(()) + } +} + +impl Read for CRCReader +where + R: Read, +{ + fn read(&mut self, buffer: &mut [u8]) -> io::Result { + let mut write_position = 0; + + while write_position < buffer.len() { + if self.position >= self.buffer.len() { + self.load_block()?; + } + + let bytes_available = self.buffer.len() - self.position; + let space_remaining = buffer.len() - write_position; + let copy_len = cmp::min(bytes_available, space_remaining); + + (&mut buffer[write_position..write_position + copy_len]) + .copy_from_slice(&self.buffer[self.position..self.position + copy_len]); + + write_position += copy_len; + self.position += copy_len; + } + + Ok(write_position) + } +} + impl Write for SharedWriter { fn write(&mut self, buf: &[u8]) -> io::Result { use std::cmp; @@ -129,3 +307,117 @@ impl Seek for Writer { } } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_crc_write() { + let block_sizes = &[256, 512, 1024, 2048]; + let byte_counts = &[8, 128, 1024, 1024 * 8]; + + for &block_size in block_sizes { + for &n_bytes in byte_counts { + let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); + let buffer = Vec::new(); + + let mut writer = CRCWriter::new(buffer, block_size); + writer.write_all(&bytes).unwrap(); + + let buffer = writer.into_inner().unwrap(); + + let space_per_block = block_size - 8; + let n_full_blocks = n_bytes / space_per_block; + let blocks_expected = n_full_blocks + (n_bytes % space_per_block != 0) as usize; + let expected_len = blocks_expected * block_size; + + assert_eq!(buffer.len(), expected_len); + assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]); + } + } + } + + #[test] + fn test_crc_io() { + const BLK_SIZE: usize = 1024; + let bytes: Vec<_> = (0..512 * 1024).map(|x| (x % 255) as u8).collect(); + let buffer = Vec::new(); + + let mut writer = CRCWriter::new(buffer, BLK_SIZE); + writer.write_all(&bytes).unwrap(); + + let buffer = writer.into_inner().unwrap(); + assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]); + + let mut reader = CRCReader::new(&buffer[..], BLK_SIZE); + + let mut retrieved = Vec::with_capacity(512 * 1024); + let read_buffer = &mut [0; 1024]; + while let Ok(amt) = reader.read(read_buffer) { + if amt == 0 { + break; + } + retrieved.extend_from_slice(&read_buffer[..amt]); + } + + assert_eq!(&retrieved[..8], &[0, 1, 2, 3, 4, 5, 6, 7]); + + assert_eq!(bytes.len(), retrieved.len()); + assert_eq!(bytes, retrieved); + } + + #[test] + fn test_crc_validation() { + const BLK_SIZE: usize = 1024; + let n_bytes = 512 * 1024; + let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); + let buffer = Vec::new(); + + let mut writer = CRCWriter::new(buffer, BLK_SIZE); + writer.write_all(&bytes).unwrap(); + + let mut buffer = writer.into_inner().unwrap(); + buffer[BLK_SIZE / 2] += 1; + + let mut reader = CRCReader::new(&buffer[..], BLK_SIZE); + + let mut retrieved = vec![]; + let res = reader.read_to_end(&mut retrieved); + assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData); + } + + #[test] + fn test_crc_size_mismatch() { + const BLK_SIZE: usize = 1024; + let n_bytes = 512 * 1024; + let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); + let buffer = Vec::new(); + + let mut writer = CRCWriter::new(buffer, BLK_SIZE); + writer.write_all(&bytes).unwrap(); + + let mut buffer = writer.into_inner().unwrap(); + buffer.drain((n_bytes - 512)..n_bytes); + + for &size_diff in &[100, 1, 25, BLK_SIZE - 9] { + let mut reader = CRCReader::new(&buffer[..], BLK_SIZE - size_diff); + + let mut retrieved = vec![]; + let res = reader.read_to_end(&mut retrieved); + assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData); + } + } + + #[should_panic] + #[test] + fn test_crc_writer_invalid_chunk_size() { + let _ = CRCWriter::new(Vec::new(), 8); + } + + #[should_panic] + #[test] + fn test_crc_reader_invalid_chunk_size() { + let _ = CRCReader::new(io::empty(), 8); + } +}