diff --git a/runtime/benches/append_vec.rs b/runtime/benches/append_vec.rs index 2533ac14e..54c81f39e 100644 --- a/runtime/benches/append_vec.rs +++ b/runtime/benches/append_vec.rs @@ -1,241 +1,120 @@ #![feature(test)] - -extern crate rand; extern crate test; -use bincode::{deserialize, serialize_into, serialized_size}; use rand::{thread_rng, Rng}; -use solana_runtime::append_vec::{ - deserialize_account, get_serialized_size, serialize_account, AppendVec, -}; -use solana_sdk::account::Account; -use solana_sdk::pubkey::Pubkey; -use std::env; -use std::fs::{create_dir_all, remove_dir_all}; -use std::io::Cursor; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path}; +use solana_runtime::append_vec::AppendVec; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; use std::thread::spawn; +use std::time::Duration; use test::Bencher; -const START_SIZE: u64 = 4 * 1024 * 1024; -const INC_SIZE: u64 = 1 * 1024 * 1024; - -macro_rules! align_up { - ($addr: expr, $align: expr) => { - ($addr + ($align - 1)) & !($align - 1) - }; -} - -fn get_append_vec_bench_path(path: &str) -> PathBuf { - let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); - let mut buf = PathBuf::new(); - buf.push(&format!("{}/{}", out_dir, path)); - let _ignored = remove_dir_all(out_dir.clone()); - create_dir_all(out_dir).expect("Create directory failed"); - buf -} - #[bench] -fn append_vec_atomic_append(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_append"); - let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); +fn append_vec_append(bencher: &mut Bencher) { + let path = get_append_vec_path("bench_append"); + let vec = AppendVec::new(&path.path, true, 64 * 1024); bencher.iter(|| { - if vec.append(AtomicUsize::new(0)).is_none() { - assert!(vec.grow_file().is_ok()); - assert!(vec.append(AtomicUsize::new(0)).is_some()); + let account = create_test_account(0); + if vec.append_account(&account).is_none() { + vec.reset(); } }); - std::fs::remove_file(path).unwrap(); +} + +fn add_test_accounts(vec: &AppendVec, size: usize) -> Vec<(usize, usize)> { + (0..size) + .into_iter() + .filter_map(|sample| { + let account = create_test_account(sample); + vec.append_account(&account).map(|pos| (sample, pos)) + }) + .collect() } #[bench] -fn append_vec_atomic_random_access(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_ra"); - let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); - let size = 1_000_000; - for _ in 0..size { - if vec.append(AtomicUsize::new(0)).is_none() { - assert!(vec.grow_file().is_ok()); - assert!(vec.append(AtomicUsize::new(0)).is_some()); - } - } +fn append_vec_sequential_read(bencher: &mut Bencher) { + let path = get_append_vec_path("seq_read"); + let vec = AppendVec::new(&path.path, true, 64 * 1024); + let size = 1_000; + let mut indexes = add_test_accounts(&vec, size); bencher.iter(|| { - let index = thread_rng().gen_range(0, size as u64); - vec.get(index * std::mem::size_of::() as u64); + let (sample, pos) = indexes.pop().unwrap(); + let account = vec.get_account(pos); + let test = create_test_account(sample); + assert_eq!(*account, test); + indexes.push((sample, pos)); }); - std::fs::remove_file(path).unwrap(); } - #[bench] -fn append_vec_atomic_random_change(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_rax"); - let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); - let size = 1_000_000; - for k in 0..size { - if vec.append(AtomicUsize::new(k)).is_none() { - assert!(vec.grow_file().is_ok()); - assert!(vec.append(AtomicUsize::new(k)).is_some()); - } - } +fn append_vec_random_read(bencher: &mut Bencher) { + let path = get_append_vec_path("random_read"); + let vec = AppendVec::new(&path.path, true, 64 * 1024); + let size = 1_000; + let mut indexes = add_test_accounts(&vec, size); bencher.iter(|| { - let index = thread_rng().gen_range(0, size as u64); - let atomic1 = vec.get(index * std::mem::size_of::() as u64); - let current1 = atomic1.load(Ordering::Relaxed); - assert_eq!(current1, index as usize); - let next = current1 + 1; - let mut index = vec.append(AtomicUsize::new(next)); - if index.is_none() { - assert!(vec.grow_file().is_ok()); - index = vec.append(AtomicUsize::new(next)); - } - let atomic2 = vec.get(index.unwrap()); - let current2 = atomic2.load(Ordering::Relaxed); - assert_eq!(current2, next); + let random_index: usize = thread_rng().gen_range(0, indexes.len()); + let (sample, pos) = &indexes[random_index]; + let account = vec.get_account(*pos); + let test = create_test_account(*sample); + assert_eq!(*account, test); }); - std::fs::remove_file(path).unwrap(); } #[bench] -fn append_vec_atomic_random_read(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_read"); - let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); - let size = 1_000_000; - for _ in 0..size { - if vec.append(AtomicUsize::new(0)).is_none() { - assert!(vec.grow_file().is_ok()); - assert!(vec.append(AtomicUsize::new(0)).is_some()); - } - } - bencher.iter(|| { - let index = thread_rng().gen_range(0, size); - let atomic1 = vec.get((index * std::mem::size_of::()) as u64); - let current1 = atomic1.load(Ordering::Relaxed); - assert_eq!(current1, 0); - }); - std::fs::remove_file(path).unwrap(); -} - -#[bench] -fn append_vec_concurrent_lock_append(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_lock_append"); - let vec = Arc::new(RwLock::new(AppendVec::::new( - &path, true, START_SIZE, INC_SIZE, - ))); +fn append_vec_concurrent_append_read(bencher: &mut Bencher) { + let path = get_append_vec_path("concurrent_read"); + let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024)); let vec1 = vec.clone(); - let size = 1_000_000; - let count = Arc::new(AtomicUsize::new(0)); - let count1 = count.clone(); + let indexes: Arc>> = Arc::new(Mutex::new(vec![])); + let indexes1 = indexes.clone(); spawn(move || loop { - let mut len = count.load(Ordering::Relaxed); - { - let rlock = vec1.read().unwrap(); - loop { - if rlock.append(AtomicUsize::new(0)).is_none() { - break; - } - len = count.fetch_add(1, Ordering::Relaxed); - } - if len >= size { - break; - } - } - { - let mut wlock = vec1.write().unwrap(); - if len >= size { - break; - } - assert!(wlock.grow_file().is_ok()); + let sample = indexes1.lock().unwrap().len(); + let account = create_test_account(sample); + if let Some(pos) = vec1.append_account(&account) { + indexes1.lock().unwrap().push((sample, pos)) + } else { + break; } }); + while indexes.lock().unwrap().is_empty() { + sleep(Duration::from_millis(100)); + } bencher.iter(|| { - let _rlock = vec.read().unwrap(); - let len = count1.load(Ordering::Relaxed); - assert!(len < size * 2); + let len = indexes.lock().unwrap().len(); + let random_index: usize = thread_rng().gen_range(0, len); + let (sample, pos) = indexes.lock().unwrap().get(random_index).unwrap().clone(); + let account = vec.get_account(pos); + let test = create_test_account(sample); + assert_eq!(*account, test); }); - std::fs::remove_file(path).unwrap(); } #[bench] -fn append_vec_concurrent_get_append(bencher: &mut Bencher) { - let path = get_append_vec_bench_path("bench_get_append"); - let vec = Arc::new(RwLock::new(AppendVec::::new( - &path, true, START_SIZE, INC_SIZE, - ))); +fn append_vec_concurrent_read_append(bencher: &mut Bencher) { + let path = get_append_vec_path("concurrent_read"); + let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024)); let vec1 = vec.clone(); - let size = 1_000_000; - let count = Arc::new(AtomicUsize::new(0)); - let count1 = count.clone(); + let indexes: Arc>> = Arc::new(Mutex::new(vec![])); + let indexes1 = indexes.clone(); spawn(move || loop { - let mut len = count.load(Ordering::Relaxed); - { - let rlock = vec1.read().unwrap(); - loop { - if rlock.append(AtomicUsize::new(0)).is_none() { - break; - } - len = count.fetch_add(1, Ordering::Relaxed); - } - if len >= size { - break; - } - } - { - let mut wlock = vec1.write().unwrap(); - if len >= size { - break; - } - assert!(wlock.grow_file().is_ok()); - } + let len = indexes1.lock().unwrap().len(); + let random_index: usize = thread_rng().gen_range(0, len + 1); + let (sample, pos) = indexes1 + .lock() + .unwrap() + .get(random_index % len) + .unwrap() + .clone(); + let account = vec1.get_account(pos); + let test = create_test_account(sample); + assert_eq!(*account, test); }); bencher.iter(|| { - let rlock = vec.read().unwrap(); - let len = count1.load(Ordering::Relaxed); - if len > 0 { - let index = thread_rng().gen_range(0, len); - rlock.get((index * std::mem::size_of::()) as u64); + let sample: usize = thread_rng().gen_range(0, 256); + let account = create_test_account(sample); + if let Some(pos) = vec.append_account(&account) { + indexes.lock().unwrap().push((sample, pos)) } }); - std::fs::remove_file(path).unwrap(); -} - -#[bench] -fn bench_account_serialize(bencher: &mut Bencher) { - let num: usize = 1000; - let account = Account::new(2, 100, &Pubkey::new_rand()); - let len = get_serialized_size(&account); - let ser_len = align_up!(len + std::mem::size_of::(), std::mem::size_of::()); - let mut memory = test::black_box(vec![0; num * ser_len]); - bencher.iter(|| { - for i in 0..num { - let start = i * ser_len; - serialize_account(&mut memory[start..start + ser_len], &account, len); - } - }); - - let index = thread_rng().gen_range(0, num); - let start = index * ser_len; - let new_account = deserialize_account(&memory[start..start + ser_len], 0, num * len).unwrap(); - assert_eq!(new_account, account); -} - -#[bench] -fn bench_account_serialize_bincode(bencher: &mut Bencher) { - let num: usize = 1000; - let account = Account::new(2, 100, &Pubkey::new_rand()); - let len = serialized_size(&account).unwrap() as usize; - let mut memory = test::black_box(vec![0u8; num * len]); - bencher.iter(|| { - for i in 0..num { - let start = i * len; - let cursor = Cursor::new(&mut memory[start..start + len]); - serialize_into(cursor, &account).unwrap(); - } - }); - - let index = thread_rng().gen_range(0, len); - let start = index * len; - let new_account: Account = deserialize(&memory[start..start + len]).unwrap(); - assert_eq!(new_account, account); } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 21668adc0..847cd0bcb 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -18,7 +18,7 @@ use std::env; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Mutex, RwLock}; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; @@ -101,7 +101,7 @@ struct AccountInfo { id: AppendVecId, /// offset into the storage - offset: u64, + offset: usize, /// lamports in the account used when squashing kept for optimization /// purposes to remove accounts with zero balance. @@ -127,7 +127,7 @@ struct AccountIndex { /// Persistent storage structure holding the accounts struct AccountStorageEntry { /// storage holding the accounts - accounts: Arc>>, + accounts: AppendVec, /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have @@ -139,17 +139,12 @@ struct AccountStorageEntry { } impl AccountStorageEntry { - pub fn new(path: &str, id: usize, file_size: u64, inc_size: u64) -> Self { + pub fn new(path: &str, id: usize, file_size: u64) -> Self { let p = format!("{}/{}", path, id); let path = Path::new(&p); let _ignored = remove_dir_all(path); create_dir_all(path).expect("Create directory failed"); - let accounts = Arc::new(RwLock::new(AppendVec::::new( - &path.join(ACCOUNT_DATA_FILE), - true, - file_size, - inc_size, - ))); + let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize); AccountStorageEntry { accounts, @@ -172,7 +167,7 @@ impl AccountStorageEntry { fn remove_account(&self) { if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { - self.accounts.write().unwrap().reset(); + self.accounts.reset(); self.set_status(AccountStorageStatus::StorageAvailable); } } @@ -200,9 +195,6 @@ pub struct AccountsDB { /// Starting file size of appendvecs file_size: u64, - - /// Increment size of appendvecs - inc_size: u64, } /// This structure handles synchronization for db @@ -241,7 +233,7 @@ impl Drop for Accounts { } impl AccountsDB { - pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64, inc_size: u64) -> Self { + pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64) -> Self { let account_index = AccountIndex { account_maps: RwLock::new(HashMap::new()), }; @@ -253,7 +245,6 @@ impl AccountsDB { parents_map: RwLock::new(HashMap::new()), paths, file_size, - inc_size, }; accounts_db.add_storage(&accounts_db.paths); accounts_db.add_fork(fork, None); @@ -261,7 +252,7 @@ impl AccountsDB { } pub fn new(fork: Fork, paths: &str) -> Self { - Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE, 0) + Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE) } pub fn add_fork(&self, fork: Fork, parent: Option) { @@ -287,7 +278,6 @@ impl AccountsDB { path, self.next_id.fetch_add(1, Ordering::Relaxed), self.file_size, - self.inc_size, ) } @@ -329,10 +319,11 @@ impl AccountsDB { Some(hash(&serialize(&ordered_accounts).unwrap())) } - fn get_account(&self, id: AppendVecId, offset: u64) -> Account { - let accounts = &self.storage.read().unwrap()[id].accounts; - let av = accounts.read().unwrap(); - av.get_account(offset).unwrap() + fn get_account(&self, id: AppendVecId, offset: usize) -> Account { + self.storage.read().unwrap()[id] + .accounts + .get_account(offset) + .clone() } fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option { @@ -440,8 +431,8 @@ impl AccountsDB { id } - fn append_account(&self, account: &Account) -> (usize, u64) { - let offset: u64; + fn append_account(&self, account: &Account) -> (usize, usize) { + let offset: usize; let start = self.next_id.fetch_add(1, Ordering::Relaxed); let mut id = self.get_storage_id(start, std::usize::MAX); @@ -454,10 +445,10 @@ impl AccountsDB { } loop { - let result: Option; + let result: Option; { let av = &self.storage.read().unwrap()[id].accounts; - result = av.read().unwrap().append_account(acc); + result = av.append_account(acc); } if let Some(val) = result { offset = val; @@ -1628,7 +1619,7 @@ mod tests { fn test_account_grow_many() { let paths = get_tmp_accounts_path("many2,many3"); let size = 4096; - let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size, 0); + let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size); let mut keys = vec![]; for i in 0..9 { let key = Pubkey::new_rand(); diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 9ac2f8e1f..9bbcd80c9 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,358 +1,276 @@ -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use memmap::{Mmap, MmapMut}; +use memmap::MmapMut; use solana_sdk::account::Account; -use solana_sdk::pubkey::Pubkey; -use std::fs::{File, OpenOptions}; -use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}; -use std::marker::PhantomData; +use std::fs::OpenOptions; +use std::io::{Seek, SeekFrom, Write}; use std::mem; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; -const SIZEOF_U64: usize = mem::size_of::(); - +//Data is aligned at the next 64 byte offset. Without alignment loading the memory may +//crash on some architectures. macro_rules! align_up { ($addr: expr, $align: expr) => { ($addr + ($align - 1)) & !($align - 1) }; } -pub struct AppendVec { - data: File, - mmap: Mmap, - current_offset: AtomicUsize, - mmap_mut: Mutex, +pub struct AppendVec { + map: MmapMut, + // This mutex forces append to be single threaded, but concurrent with reads + append_offset: Mutex, + current_len: AtomicUsize, file_size: u64, - inc_size: u64, - phantom: PhantomData, } -fn get_account_size_static() -> usize { - mem::size_of::() - mem::size_of::>() -} - -pub fn get_serialized_size(account: &Account) -> usize { - get_account_size_static() + account.data.len() -} - -pub fn serialize_account(dst_slice: &mut [u8], account: &Account, len: usize) { - let mut at = 0; - - write_u64(&mut at, dst_slice, len as u64); - write_u64(&mut at, dst_slice, account.lamports); - write_bytes(&mut at, dst_slice, &account.data); - write_bytes(&mut at, dst_slice, account.owner.as_ref()); - write_bytes(&mut at, dst_slice, &[account.executable as u8]); -} - -fn read_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8], len: usize) { - let data = &src_slice[*at..*at + len]; - (&data[..]).read_exact(&mut dst_slice[..]).unwrap(); - *at += len; -} - -fn write_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8]) { - let data = &mut dst_slice[*at..*at + src_slice.len()]; - (&mut data[..]).write_all(&src_slice).unwrap(); - *at += src_slice.len(); -} - -fn read_u64(at: &mut usize, src_slice: &[u8]) -> u64 { - let data = &src_slice[*at..*at + mem::size_of::()]; - *at += mem::size_of::(); - (&data[..]).read_u64::().unwrap() -} - -fn write_u64(at: &mut usize, dst_slice: &mut [u8], value: u64) { - let data = &mut dst_slice[*at..*at + mem::size_of::()]; - (&mut data[..]).write_u64::(value).unwrap(); - *at += mem::size_of::(); -} - -pub fn deserialize_account( - src_slice: &[u8], - index: usize, - current_offset: usize, -) -> Result { - let mut at = index; - - let size = read_u64(&mut at, &src_slice); - let len = size as usize; - assert!(current_offset >= at + len); - - let lamports = read_u64(&mut at, &src_slice); - - let data_len = len - get_account_size_static(); - let mut data = vec![0; data_len]; - read_bytes(&mut at, &mut data, &src_slice, data_len); - - let mut pubkey = vec![0; mem::size_of::()]; - read_bytes(&mut at, &mut pubkey, &src_slice, mem::size_of::()); - let owner = Pubkey::new(&pubkey); - - let mut exec = vec![0; mem::size_of::()]; - read_bytes(&mut at, &mut exec, &src_slice, mem::size_of::()); - let executable: bool = exec[0] != 0; - - Ok(Account { - lamports, - data, - owner, - executable, - }) -} - -impl AppendVec -where - T: Default, -{ - pub fn new(path: &Path, create: bool, size: u64, inc: u64) -> Self { +impl AppendVec { + #[allow(clippy::mutex_atomic)] + pub fn new(file: &Path, create: bool, size: usize) -> Self { let mut data = OpenOptions::new() .read(true) .write(true) .create(create) - .open(path) + .open(file) .expect("Unable to open data file"); - data.seek(SeekFrom::Start(size)).unwrap(); + data.seek(SeekFrom::Start(size as u64)).unwrap(); data.write_all(&[0]).unwrap(); data.seek(SeekFrom::Start(0)).unwrap(); data.flush().unwrap(); - let mmap = unsafe { Mmap::map(&data).expect("failed to map the data file") }; - let mmap_mut = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; + let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; AppendVec { - data, - mmap, - current_offset: AtomicUsize::new(0), - mmap_mut: Mutex::new(mmap_mut), - file_size: size, - inc_size: inc, - phantom: PhantomData, + map, + // This mutex forces append to be single threaded, but concurrent with reads + append_offset: Mutex::new(0), + current_len: AtomicUsize::new(0), + file_size: size as u64, } } - pub fn reset(&mut self) { - let _mmap_mut = self.mmap_mut.lock().unwrap(); - self.current_offset.store(0, Ordering::Relaxed); + #[allow(clippy::mutex_atomic)] + pub fn reset(&self) { + // This mutex forces append to be single threaded, but concurrent with reads + let mut offset = self.append_offset.lock().unwrap(); + self.current_len.store(0, Ordering::Relaxed); + *offset = 0; } - #[allow(dead_code)] - pub fn get(&self, index: u64) -> &T { - let offset = self.current_offset.load(Ordering::Relaxed); - let at = index as usize; - assert!(offset >= at + mem::size_of::()); - let data = &self.mmap[at..at + mem::size_of::()]; - let ptr = data.as_ptr() as *const T; - let x: Option<&T> = unsafe { ptr.as_ref() }; - x.unwrap() + pub fn len(&self) -> usize { + self.current_len.load(Ordering::Relaxed) } - #[allow(dead_code)] - pub fn grow_file(&mut self) -> Result<()> { - if self.inc_size == 0 { - return Err(Error::new(ErrorKind::WriteZero, "Grow not supported")); - } - let mut mmap_mut = self.mmap_mut.lock().unwrap(); - let index = self.current_offset.load(Ordering::Relaxed) + mem::size_of::(); - if index as u64 + self.inc_size < self.file_size { - // grow was already called - return Ok(()); - } - let end = self.file_size + self.inc_size; - drop(mmap_mut.to_owned()); - drop(self.mmap.to_owned()); - self.data.seek(SeekFrom::Start(end))?; - self.data.write_all(&[0])?; - self.mmap = unsafe { Mmap::map(&self.data)? }; - *mmap_mut = unsafe { MmapMut::map_mut(&self.data)? }; - self.file_size = end; - Ok(()) + pub fn is_empty(&self) -> bool { + self.len() == 0 } - #[allow(dead_code)] - pub fn append(&self, val: T) -> Option { - let mmap_mut = self.mmap_mut.lock().unwrap(); - let index = self.current_offset.load(Ordering::Relaxed); + pub fn capacity(&self) -> u64 { + self.file_size + } - if (self.file_size as usize) < index + mem::size_of::() { - return None; - } - - let data = &mmap_mut[index..(index + mem::size_of::())]; + // The reason for the `mut` is to allow the account data pointer to be fixed up after + // the structure is loaded + #[allow(clippy::mut_from_ref)] + fn get_slice(&self, offset: usize, size: usize) -> &mut [u8] { + let len = self.len(); + assert!(len >= offset + size); + let data = &self.map[offset..offset + size]; unsafe { - let ptr = data.as_ptr() as *mut T; - std::ptr::write(ptr, val) + let dst = data.as_ptr() as *mut u8; + std::slice::from_raw_parts_mut(dst, size) + } + } + + fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) { + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + let pos = align_up!(*offset as usize, mem::size_of::()); + let data = &self.map[pos..(pos + len)]; + unsafe { + let dst = data.as_ptr() as *mut u8; + std::ptr::copy(src, dst, len); }; - self.current_offset - .fetch_add(mem::size_of::(), Ordering::Relaxed); - Some(index as u64) + *offset = pos + len; } - pub fn get_account(&self, index: u64) -> Result { - let index = index as usize; - deserialize_account( - &self.mmap[..], - index, - self.current_offset.load(Ordering::Relaxed), - ) - } + #[allow(clippy::mutex_atomic)] + fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option { + // This mutex forces append to be single threaded, but concurrent with reads + let mut offset = self.append_offset.lock().unwrap(); + let mut end = *offset; + for val in vals { + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + end = align_up!(end, mem::size_of::()); + end += val.1; + } - pub fn append_account(&self, account: &Account) -> Option { - let mut mmap_mut = self.mmap_mut.lock().unwrap(); - let data_at = align_up!( - self.current_offset.load(Ordering::Relaxed), - mem::size_of::() - ); - let len = get_serialized_size(account); - - if (self.file_size as usize) < data_at + len + SIZEOF_U64 { + if (self.file_size as usize) <= end { return None; } - serialize_account( - &mut mmap_mut[data_at..data_at + len + SIZEOF_U64], - &account, - len, - ); + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + let pos = align_up!(*offset, mem::size_of::()); + for val in vals { + self.append_ptr(&mut offset, val.0, val.1) + } + self.current_len.store(*offset, Ordering::Relaxed); + Some(pos) + } - self.current_offset - .store(data_at + len + SIZEOF_U64, Ordering::Relaxed); - Some(data_at as u64) + #[allow(clippy::transmute_ptr_to_ptr)] + pub fn get_account(&self, offset: usize) -> &Account { + let account: *mut Account = { + let data = self.get_slice(offset, mem::size_of::()); + unsafe { std::mem::transmute::<*const u8, *mut Account>(data.as_ptr()) } + }; + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + let data_at = align_up!(offset + mem::size_of::(), mem::size_of::()); + let account_ref: &mut Account = unsafe { &mut *account }; + let data = self.get_slice(data_at, account_ref.data.len()); + unsafe { + let mut new_data = Vec::from_raw_parts(data.as_mut_ptr(), data.len(), data.len()); + std::mem::swap(&mut account_ref.data, &mut new_data); + std::mem::forget(new_data); + }; + account_ref + } + + pub fn accounts(&self, mut start: usize) -> Vec<&Account> { + let mut accounts = vec![]; + loop { + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + let end = align_up!(start + mem::size_of::(), mem::size_of::()); + if end > self.len() { + break; + } + let first = self.get_account(start); + accounts.push(first); + //Data is aligned at the next 64 byte offset. Without alignment loading the memory may + //crash on some architectures. + let data_at = align_up!(start + mem::size_of::(), mem::size_of::()); + let next = align_up!(data_at + first.data.len(), mem::size_of::()); + start = next; + } + accounts + } + + pub fn append_account(&self, account: &Account) -> Option { + let acc_ptr = account as *const Account; + let data_len = account.data.len(); + let data_ptr = account.data.as_ptr(); + let ptrs = [ + (acc_ptr as *const u8, mem::size_of::()), + (data_ptr, data_len), + ]; + self.append_ptrs(&ptrs) + } +} + +pub mod test_utils { + use solana_sdk::account::Account; + use solana_sdk::pubkey::Pubkey; + use std::fs::{create_dir_all, remove_dir_all}; + use std::path::PathBuf; + + pub struct TempFile { + pub path: PathBuf, + } + + impl Drop for TempFile { + fn drop(&mut self) { + let mut path = PathBuf::new(); + std::mem::swap(&mut path, &mut self.path); + std::fs::remove_file(path).unwrap(); + } + } + + pub fn get_append_vec_path(path: &str) -> TempFile { + let out_dir = std::env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let mut buf = PathBuf::new(); + buf.push(&format!("{}/{}", out_dir, path)); + let _ignored = remove_dir_all(out_dir.clone()); + create_dir_all(out_dir).expect("Create directory failed"); + TempFile { path: buf } + } + + pub fn create_test_account(sample: usize) -> Account { + let data_len = sample % 256; + let mut account = Account::new(sample as u64, 0, &Pubkey::default()); + account.data = (0..data_len).map(|_| data_len as u8).collect(); + account } } #[cfg(test)] pub mod tests { + use super::test_utils::*; use super::*; use log::*; use rand::{thread_rng, Rng}; - use solana_sdk::timing::{duration_as_ms, duration_as_s}; - use std::sync::atomic::{AtomicUsize, Ordering}; + use solana_sdk::timing::duration_as_ms; use std::time::Instant; - const START_SIZE: u64 = 4 * 1024 * 1024; - const INC_SIZE: u64 = 1 * 1024 * 1024; - #[test] fn test_append_vec() { - let path = Path::new("append_vec"); - let av = AppendVec::new(path, true, START_SIZE, INC_SIZE); - let val: u64 = 5; - let index = av.append(val).unwrap(); - assert_eq!(*av.get(index), val); - let val1 = val + 1; - let index1 = av.append(val1).unwrap(); - assert_eq!(*av.get(index), val); - assert_eq!(*av.get(index1), val1); - std::fs::remove_file(path).unwrap(); + let path = get_append_vec_path("test_append"); + let av = AppendVec::new(&path.path, true, 1024 * 1024); + let account = create_test_account(0); + let index = av.append_account(&account).unwrap(); + assert_eq!(*av.get_account(index), account); } #[test] - fn test_append_vec_account() { - let path = Path::new("append_vec_account"); - let av: AppendVec = AppendVec::new(path, true, START_SIZE, INC_SIZE); - let v1 = vec![1u8; 32]; - let mut account1 = Account { - lamports: 1, - data: v1, - owner: Pubkey::default(), - executable: false, - }; + fn test_append_vec_data() { + let path = get_append_vec_path("test_append_data"); + let av = AppendVec::new(&path.path, true, 1024 * 1024); + let account = create_test_account(5); + let index = av.append_account(&account).unwrap(); + assert_eq!(*av.get_account(index), account); + let account1 = create_test_account(6); let index1 = av.append_account(&account1).unwrap(); - assert_eq!(index1, 0); - assert_eq!(av.get_account(index1).unwrap(), account1); - - let v2 = vec![4u8; 32]; - let mut account2 = Account { - lamports: 1, - data: v2, - owner: Pubkey::default(), - executable: false, - }; - let index2 = av.append_account(&account2).unwrap(); - let mut len = get_serialized_size(&account1) + SIZEOF_U64 as usize; - assert_eq!(index2, len as u64); - assert_eq!(av.get_account(index2).unwrap(), account2); - assert_eq!(av.get_account(index1).unwrap(), account1); - - account2.data.iter_mut().for_each(|e| *e *= 2); - let index3 = av.append_account(&account2).unwrap(); - len += get_serialized_size(&account2) + SIZEOF_U64 as usize; - assert_eq!(index3, len as u64); - assert_eq!(av.get_account(index3).unwrap(), account2); - - account1.data.extend([1, 2, 3, 4, 5, 6].iter().cloned()); - let index4 = av.append_account(&account1).unwrap(); - len += get_serialized_size(&account2) + SIZEOF_U64 as usize; - assert_eq!(index4, len as u64); - assert_eq!(av.get_account(index4).unwrap(), account1); - std::fs::remove_file(path).unwrap(); + assert_eq!(*av.get_account(index), account); + assert_eq!(*av.get_account(index1), account1); } #[test] - fn test_grow_append_vec() { - let path = Path::new("grow"); - let mut av = AppendVec::new(path, true, START_SIZE, INC_SIZE); - let mut val = [5u64; 32]; - let size = 100_000; - let mut offsets = vec![0; size]; + fn test_append_vec_append_many() { + let path = get_append_vec_path("test_append_many"); + let av = AppendVec::new(&path.path, true, 1024 * 1024); + let size = 1000; + let mut indexes = vec![]; + let now = Instant::now(); + for sample in 0..size { + let account = create_test_account(sample); + let pos = av.append_account(&account).unwrap(); + assert_eq!(*av.get_account(pos), account); + indexes.push(pos) + } + trace!("append time: {} ms", duration_as_ms(&now.elapsed()),); let now = Instant::now(); - for index in 0..size { - if let Some(offset) = av.append(val) { - offsets[index] = offset; - } else { - assert!(av.grow_file().is_ok()); - if let Some(offset) = av.append(val) { - offsets[index] = offset; - } else { - assert!(false); - } - } - val[0] += 1; + for _ in 0..size { + let sample = thread_rng().gen_range(0, indexes.len()); + let account = create_test_account(sample); + assert_eq!(*av.get_account(indexes[sample]), account); } - info!( - "time: {} ms {} / s", - duration_as_ms(&now.elapsed()), - ((mem::size_of::<[u64; 32]>() * size) as f32) / duration_as_s(&now.elapsed()), - ); + trace!("random read time: {} ms", duration_as_ms(&now.elapsed()),); let now = Instant::now(); - let num_reads = 100_000; - for _ in 0..num_reads { - let index = thread_rng().gen_range(0, size); - assert_eq!(av.get(offsets[index])[0], (index + 5) as u64); + assert_eq!(indexes.len(), size); + assert_eq!(indexes[0], 0); + let accounts = av.accounts(indexes[0]); + assert_eq!(accounts.len(), size); + for (sample, v) in accounts.iter().enumerate() { + let account = create_test_account(sample); + assert_eq!(**v, account) } - info!( - "time: {} ms {} / s", + trace!( + "sequential read time: {} ms", duration_as_ms(&now.elapsed()), - (num_reads as f32) / duration_as_s(&now.elapsed()), ); - std::fs::remove_file(path).unwrap(); - } - - #[test] - fn random_atomic_change() { - let path = Path::new("random"); - let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); - let size = 1_000; - for k in 0..size { - if vec.append(AtomicUsize::new(k)).is_none() { - assert!(vec.grow_file().is_ok()); - assert!(vec.append(AtomicUsize::new(0)).is_some()); - } - } - let index = thread_rng().gen_range(0, size as u64); - let atomic1 = vec.get(index * mem::size_of::() as u64); - let current1 = atomic1.load(Ordering::Relaxed); - assert_eq!(current1, index as usize); - let next = current1 + 1; - let index = vec.append(AtomicUsize::new(next)).unwrap(); - let atomic2 = vec.get(index); - let current2 = atomic2.load(Ordering::Relaxed); - assert_eq!(current2, next); - std::fs::remove_file(path).unwrap(); } }