AppendVec PR using "/tmp" as the default directory and a random file (#3743)

* AppendVec with raw pointers
* fixed test target directory
anatoly yakovenko 2019-04-12 04:30:17 -07:00 committed by GitHub
parent 29dc139a22
commit 8b08fe265a
3 changed files with 304 additions and 513 deletions
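
The diffs below replace the generic, growable AppendVec<T> with an account-specific, mmap-backed store that is written through raw-pointer copies and read back as references. For orientation, here is a minimal round-trip sketch of the new interface (illustrative only, not part of the commit; it uses the test_utils helpers that append_vec.rs gains in this PR):

// Illustrative sketch, not part of the diff.
use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
use solana_runtime::append_vec::AppendVec;

fn append_vec_round_trip() {
    // get_append_vec_path() picks a random file name under OUT_DIR,
    // defaulting to /tmp/append_vec_tests.
    let path = get_append_vec_path("round_trip");
    // Capacity is fixed at creation; there is no grow_file() any more.
    let av = AppendVec::new(&path.path, true, 64 * 1024);
    let account = create_test_account(42);
    // append_account returns the byte offset of the stored account,
    // or None once the file is full.
    if let Some(offset) = av.append_account(&account) {
        assert_eq!(*av.get_account(offset), account);
    }
}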


@@ -1,241 +1,120 @@
 #![feature(test)]
-extern crate rand;
 extern crate test;
-use bincode::{deserialize, serialize_into, serialized_size};
 use rand::{thread_rng, Rng};
-use solana_runtime::append_vec::{
-    deserialize_account, get_serialized_size, serialize_account, AppendVec,
-};
-use solana_sdk::account::Account;
-use solana_sdk::pubkey::Pubkey;
-use std::env;
-use std::fs::{create_dir_all, remove_dir_all};
-use std::io::Cursor;
-use std::path::PathBuf;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
+use solana_runtime::append_vec::AppendVec;
+use std::sync::{Arc, Mutex};
+use std::thread::sleep;
 use std::thread::spawn;
+use std::time::Duration;
 use test::Bencher;
-const START_SIZE: u64 = 4 * 1024 * 1024;
-const INC_SIZE: u64 = 1 * 1024 * 1024;
-macro_rules! align_up {
-    ($addr: expr, $align: expr) => {
-        ($addr + ($align - 1)) & !($align - 1)
-    };
-}
-fn get_append_vec_bench_path(path: &str) -> PathBuf {
-    let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
-    let mut buf = PathBuf::new();
-    buf.push(&format!("{}/{}", out_dir, path));
-    let _ignored = remove_dir_all(out_dir.clone());
-    create_dir_all(out_dir).expect("Create directory failed");
-    buf
-}
 #[bench]
-fn append_vec_atomic_append(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_append");
-    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
+fn append_vec_append(bencher: &mut Bencher) {
+    let path = get_append_vec_path("bench_append");
+    let vec = AppendVec::new(&path.path, true, 64 * 1024);
     bencher.iter(|| {
-        if vec.append(AtomicUsize::new(0)).is_none() {
-            assert!(vec.grow_file().is_ok());
-            assert!(vec.append(AtomicUsize::new(0)).is_some());
+        let account = create_test_account(0);
+        if vec.append_account(&account).is_none() {
+            vec.reset();
         }
     });
-    std::fs::remove_file(path).unwrap();
+}
+fn add_test_accounts(vec: &AppendVec, size: usize) -> Vec<(usize, usize)> {
+    (0..size)
+        .into_iter()
+        .filter_map(|sample| {
+            let account = create_test_account(sample);
+            vec.append_account(&account).map(|pos| (sample, pos))
+        })
+        .collect()
 }
 #[bench]
-fn append_vec_atomic_random_access(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_ra");
-    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
-    let size = 1_000_000;
-    for _ in 0..size {
-        if vec.append(AtomicUsize::new(0)).is_none() {
-            assert!(vec.grow_file().is_ok());
-            assert!(vec.append(AtomicUsize::new(0)).is_some());
-        }
-    }
+fn append_vec_sequential_read(bencher: &mut Bencher) {
+    let path = get_append_vec_path("seq_read");
+    let vec = AppendVec::new(&path.path, true, 64 * 1024);
+    let size = 1_000;
+    let mut indexes = add_test_accounts(&vec, size);
     bencher.iter(|| {
-        let index = thread_rng().gen_range(0, size as u64);
-        vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
+        let (sample, pos) = indexes.pop().unwrap();
+        let account = vec.get_account(pos);
+        let test = create_test_account(sample);
+        assert_eq!(*account, test);
+        indexes.push((sample, pos));
     });
-    std::fs::remove_file(path).unwrap();
 }
 #[bench]
-fn append_vec_atomic_random_change(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_rax");
-    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
-    let size = 1_000_000;
-    for k in 0..size {
-        if vec.append(AtomicUsize::new(k)).is_none() {
-            assert!(vec.grow_file().is_ok());
-            assert!(vec.append(AtomicUsize::new(k)).is_some());
-        }
-    }
+fn append_vec_random_read(bencher: &mut Bencher) {
+    let path = get_append_vec_path("random_read");
+    let vec = AppendVec::new(&path.path, true, 64 * 1024);
+    let size = 1_000;
+    let indexes = add_test_accounts(&vec, size);
     bencher.iter(|| {
-        let index = thread_rng().gen_range(0, size as u64);
-        let atomic1 = vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
-        let current1 = atomic1.load(Ordering::Relaxed);
-        assert_eq!(current1, index as usize);
-        let next = current1 + 1;
-        let mut index = vec.append(AtomicUsize::new(next));
-        if index.is_none() {
-            assert!(vec.grow_file().is_ok());
-            index = vec.append(AtomicUsize::new(next));
-        }
-        let atomic2 = vec.get(index.unwrap());
-        let current2 = atomic2.load(Ordering::Relaxed);
-        assert_eq!(current2, next);
+        let random_index: usize = thread_rng().gen_range(0, indexes.len());
+        let (sample, pos) = &indexes[random_index];
+        let account = vec.get_account(*pos);
+        let test = create_test_account(*sample);
+        assert_eq!(*account, test);
     });
-    std::fs::remove_file(path).unwrap();
 }
 #[bench]
-fn append_vec_atomic_random_read(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_read");
-    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
-    let size = 1_000_000;
-    for _ in 0..size {
-        if vec.append(AtomicUsize::new(0)).is_none() {
-            assert!(vec.grow_file().is_ok());
-            assert!(vec.append(AtomicUsize::new(0)).is_some());
-        }
-    }
-    bencher.iter(|| {
-        let index = thread_rng().gen_range(0, size);
-        let atomic1 = vec.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
-        let current1 = atomic1.load(Ordering::Relaxed);
-        assert_eq!(current1, 0);
-    });
-    std::fs::remove_file(path).unwrap();
-}
-#[bench]
-fn append_vec_concurrent_lock_append(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_lock_append");
-    let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
-        &path, true, START_SIZE, INC_SIZE,
-    )));
+fn append_vec_concurrent_append_read(bencher: &mut Bencher) {
+    let path = get_append_vec_path("concurrent_read");
+    let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
     let vec1 = vec.clone();
-    let size = 1_000_000;
-    let count = Arc::new(AtomicUsize::new(0));
-    let count1 = count.clone();
+    let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
+    let indexes1 = indexes.clone();
     spawn(move || loop {
-        let mut len = count.load(Ordering::Relaxed);
-        {
-            let rlock = vec1.read().unwrap();
-            loop {
-                if rlock.append(AtomicUsize::new(0)).is_none() {
-                    break;
-                }
-                len = count.fetch_add(1, Ordering::Relaxed);
-            }
-            if len >= size {
-                break;
-            }
-        }
-        {
-            let mut wlock = vec1.write().unwrap();
-            if len >= size {
-                break;
-            }
-            assert!(wlock.grow_file().is_ok());
-        }
+        let sample = indexes1.lock().unwrap().len();
+        let account = create_test_account(sample);
+        if let Some(pos) = vec1.append_account(&account) {
+            indexes1.lock().unwrap().push((sample, pos))
+        } else {
+            break;
+        }
     });
+    while indexes.lock().unwrap().is_empty() {
+        sleep(Duration::from_millis(100));
+    }
     bencher.iter(|| {
-        let _rlock = vec.read().unwrap();
-        let len = count1.load(Ordering::Relaxed);
-        assert!(len < size * 2);
+        let len = indexes.lock().unwrap().len();
+        let random_index: usize = thread_rng().gen_range(0, len);
+        let (sample, pos) = indexes.lock().unwrap().get(random_index).unwrap().clone();
+        let account = vec.get_account(pos);
+        let test = create_test_account(sample);
+        assert_eq!(*account, test);
     });
-    std::fs::remove_file(path).unwrap();
 }
 #[bench]
-fn append_vec_concurrent_get_append(bencher: &mut Bencher) {
-    let path = get_append_vec_bench_path("bench_get_append");
-    let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
-        &path, true, START_SIZE, INC_SIZE,
-    )));
+fn append_vec_concurrent_read_append(bencher: &mut Bencher) {
+    let path = get_append_vec_path("concurrent_read");
+    let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
     let vec1 = vec.clone();
-    let size = 1_000_000;
-    let count = Arc::new(AtomicUsize::new(0));
-    let count1 = count.clone();
+    let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
+    let indexes1 = indexes.clone();
     spawn(move || loop {
-        let mut len = count.load(Ordering::Relaxed);
-        {
-            let rlock = vec1.read().unwrap();
-            loop {
-                if rlock.append(AtomicUsize::new(0)).is_none() {
-                    break;
-                }
-                len = count.fetch_add(1, Ordering::Relaxed);
-            }
-            if len >= size {
-                break;
-            }
-        }
-        {
-            let mut wlock = vec1.write().unwrap();
-            if len >= size {
-                break;
-            }
-            assert!(wlock.grow_file().is_ok());
-        }
+        let len = indexes1.lock().unwrap().len();
+        let random_index: usize = thread_rng().gen_range(0, len + 1);
+        let (sample, pos) = indexes1
+            .lock()
+            .unwrap()
+            .get(random_index % len)
+            .unwrap()
+            .clone();
+        let account = vec1.get_account(pos);
+        let test = create_test_account(sample);
+        assert_eq!(*account, test);
     });
     bencher.iter(|| {
-        let rlock = vec.read().unwrap();
-        let len = count1.load(Ordering::Relaxed);
-        if len > 0 {
-            let index = thread_rng().gen_range(0, len);
-            rlock.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
+        let sample: usize = thread_rng().gen_range(0, 256);
+        let account = create_test_account(sample);
+        if let Some(pos) = vec.append_account(&account) {
+            indexes.lock().unwrap().push((sample, pos))
         }
     });
-    std::fs::remove_file(path).unwrap();
-}
-#[bench]
-fn bench_account_serialize(bencher: &mut Bencher) {
-    let num: usize = 1000;
-    let account = Account::new(2, 100, &Pubkey::new_rand());
-    let len = get_serialized_size(&account);
-    let ser_len = align_up!(len + std::mem::size_of::<u64>(), std::mem::size_of::<u64>());
-    let mut memory = test::black_box(vec![0; num * ser_len]);
-    bencher.iter(|| {
-        for i in 0..num {
-            let start = i * ser_len;
-            serialize_account(&mut memory[start..start + ser_len], &account, len);
-        }
-    });
-    let index = thread_rng().gen_range(0, num);
-    let start = index * ser_len;
-    let new_account = deserialize_account(&memory[start..start + ser_len], 0, num * len).unwrap();
-    assert_eq!(new_account, account);
-}
-#[bench]
-fn bench_account_serialize_bincode(bencher: &mut Bencher) {
-    let num: usize = 1000;
-    let account = Account::new(2, 100, &Pubkey::new_rand());
-    let len = serialized_size(&account).unwrap() as usize;
-    let mut memory = test::black_box(vec![0u8; num * len]);
-    bencher.iter(|| {
-        for i in 0..num {
-            let start = i * len;
-            let cursor = Cursor::new(&mut memory[start..start + len]);
-            serialize_into(cursor, &account).unwrap();
-        }
-    });
-    let index = thread_rng().gen_range(0, len);
-    let start = index * len;
-    let new_account: Account = deserialize(&memory[start..start + len]).unwrap();
-    assert_eq!(new_account, account);
 }
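
The rewritten benches above exercise the property that matters to the accounts store: append_account and get_account both take &self, appends are serialized internally, and readers never block. A minimal sketch of that single-writer, shared-reader pattern, assuming only the API shown in this diff (function name is illustrative):

// Sketch only; mirrors what append_vec_concurrent_append_read measures.
use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
use solana_runtime::append_vec::AppendVec;
use std::sync::Arc;
use std::thread::spawn;

fn single_writer_shared_readers() {
    let path = get_append_vec_path("sketch_concurrent");
    let av = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
    let writer = av.clone();
    // One thread appends through a shared reference; appends are serialized
    // by the internal append_offset mutex, not by an outer RwLock.
    let handle = spawn(move || {
        let mut offsets = vec![];
        for sample in 0..100 {
            if let Some(offset) = writer.append_account(&create_test_account(sample)) {
                offsets.push((sample, offset));
            }
        }
        offsets
    });
    // Offsets handed over by the writer stay valid for readers holding &AppendVec.
    for (sample, offset) in handle.join().unwrap() {
        assert_eq!(*av.get_account(offset), create_test_account(sample));
    }
}

In the benches themselves the reads run while the writer is still appending; the sketch joins first only to keep the example short.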


@@ -18,7 +18,7 @@ use std::env;
 use std::fs::{create_dir_all, remove_dir_all};
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Mutex, RwLock};
 pub type InstructionAccounts = Vec<Account>;
 pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
@@ -101,7 +101,7 @@ struct AccountInfo {
     id: AppendVecId,
     /// offset into the storage
-    offset: u64,
+    offset: usize,
     /// lamports in the account used when squashing kept for optimization
     /// purposes to remove accounts with zero balance.
@@ -127,7 +127,7 @@ struct AccountIndex {
 /// Persistent storage structure holding the accounts
 struct AccountStorageEntry {
     /// storage holding the accounts
-    accounts: Arc<RwLock<AppendVec<Account>>>,
+    accounts: AppendVec,
     /// Keeps track of the number of accounts stored in a specific AppendVec.
     /// This is periodically checked to reuse the stores that do not have
@@ -139,17 +139,12 @@ struct AccountStorageEntry {
 }
 impl AccountStorageEntry {
-    pub fn new(path: &str, id: usize, file_size: u64, inc_size: u64) -> Self {
+    pub fn new(path: &str, id: usize, file_size: u64) -> Self {
         let p = format!("{}/{}", path, id);
         let path = Path::new(&p);
         let _ignored = remove_dir_all(path);
         create_dir_all(path).expect("Create directory failed");
-        let accounts = Arc::new(RwLock::new(AppendVec::<Account>::new(
-            &path.join(ACCOUNT_DATA_FILE),
-            true,
-            file_size,
-            inc_size,
-        )));
+        let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize);
         AccountStorageEntry {
             accounts,
@@ -172,7 +167,7 @@ impl AccountStorageEntry {
     fn remove_account(&self) {
         if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
-            self.accounts.write().unwrap().reset();
+            self.accounts.reset();
             self.set_status(AccountStorageStatus::StorageAvailable);
         }
     }
@@ -200,9 +195,6 @@ pub struct AccountsDB {
     /// Starting file size of appendvecs
     file_size: u64,
-    /// Increment size of appendvecs
-    inc_size: u64,
 }
 /// This structure handles synchronization for db
@@ -241,7 +233,7 @@ impl Drop for Accounts {
 }
 impl AccountsDB {
-    pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64, inc_size: u64) -> Self {
+    pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64) -> Self {
         let account_index = AccountIndex {
             account_maps: RwLock::new(HashMap::new()),
         };
@@ -253,7 +245,6 @@ impl AccountsDB {
             parents_map: RwLock::new(HashMap::new()),
             paths,
             file_size,
-            inc_size,
         };
         accounts_db.add_storage(&accounts_db.paths);
         accounts_db.add_fork(fork, None);
@@ -261,7 +252,7 @@ impl AccountsDB {
     }
     pub fn new(fork: Fork, paths: &str) -> Self {
-        Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE, 0)
+        Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE)
     }
     pub fn add_fork(&self, fork: Fork, parent: Option<Fork>) {
@@ -287,7 +278,6 @@ impl AccountsDB {
             path,
             self.next_id.fetch_add(1, Ordering::Relaxed),
             self.file_size,
-            self.inc_size,
         )
     }
@@ -329,10 +319,11 @@ impl AccountsDB {
         Some(hash(&serialize(&ordered_accounts).unwrap()))
     }
-    fn get_account(&self, id: AppendVecId, offset: u64) -> Account {
-        let accounts = &self.storage.read().unwrap()[id].accounts;
-        let av = accounts.read().unwrap();
-        av.get_account(offset).unwrap()
+    fn get_account(&self, id: AppendVecId, offset: usize) -> Account {
+        self.storage.read().unwrap()[id]
+            .accounts
+            .get_account(offset)
+            .clone()
     }
     fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option<Account> {
@@ -440,8 +431,8 @@ impl AccountsDB {
         id
     }
-    fn append_account(&self, account: &Account) -> (usize, u64) {
-        let offset: u64;
+    fn append_account(&self, account: &Account) -> (usize, usize) {
+        let offset: usize;
         let start = self.next_id.fetch_add(1, Ordering::Relaxed);
         let mut id = self.get_storage_id(start, std::usize::MAX);
@@ -454,10 +445,10 @@ impl AccountsDB {
         }
         loop {
-            let result: Option<u64>;
+            let result: Option<usize>;
             {
                 let av = &self.storage.read().unwrap()[id].accounts;
-                result = av.read().unwrap().append_account(acc);
+                result = av.append_account(acc);
             }
             if let Some(val) = result {
                 offset = val;
@@ -1628,7 +1619,7 @@ mod tests {
     fn test_account_grow_many() {
         let paths = get_tmp_accounts_path("many2,many3");
         let size = 4096;
-        let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size, 0);
+        let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size);
         let mut keys = vec![];
         for i in 0..9 {
             let key = Pubkey::new_rand();
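
Because the new AppendVec keeps its own append mutex and length atomic, the accounts store no longer wraps it in Arc<RwLock<..>>, and offsets become plain usize byte positions. A simplified, hypothetical mirror of AccountStorageEntry to illustrate the calling pattern (the name StorageEntrySketch and the methods are for illustration only, not the actual accounts code):

// Hypothetical sketch of the store/read flow enabled by the &self-based AppendVec API.
use solana_runtime::append_vec::AppendVec;
use solana_sdk::account::Account;
use std::sync::atomic::{AtomicUsize, Ordering};

struct StorageEntrySketch {
    accounts: AppendVec,   // no RwLock around the whole vec any more
    count: AtomicUsize,    // number of live accounts, as in AccountStorageEntry
}

impl StorageEntrySketch {
    // Offsets are byte positions (usize) rather than u64 element indexes.
    fn store(&self, account: &Account) -> Option<usize> {
        let offset = self.accounts.append_account(account)?;
        self.count.fetch_add(1, Ordering::Relaxed);
        Some(offset)
    }

    fn read(&self, offset: usize) -> Account {
        // AccountsDB::get_account clones out of the mmap in the same way.
        self.accounts.get_account(offset).clone()
    }
}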


@@ -1,358 +1,279 @@
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-use memmap::{Mmap, MmapMut};
+use memmap::MmapMut;
 use solana_sdk::account::Account;
-use solana_sdk::pubkey::Pubkey;
-use std::fs::{File, OpenOptions};
-use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write};
-use std::marker::PhantomData;
+use std::fs::OpenOptions;
+use std::io::{Seek, SeekFrom, Write};
 use std::mem;
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Mutex;
-const SIZEOF_U64: usize = mem::size_of::<u64>();
+//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+//crash on some architectures.
 macro_rules! align_up {
     ($addr: expr, $align: expr) => {
         ($addr + ($align - 1)) & !($align - 1)
     };
 }
-pub struct AppendVec<T> {
-    data: File,
-    mmap: Mmap,
-    current_offset: AtomicUsize,
-    mmap_mut: Mutex<MmapMut>,
+pub struct AppendVec {
+    map: MmapMut,
+    // This mutex forces append to be single threaded, but concurrent with reads
+    append_offset: Mutex<usize>,
+    current_len: AtomicUsize,
     file_size: u64,
-    inc_size: u64,
-    phantom: PhantomData<T>,
 }
-fn get_account_size_static() -> usize {
-    mem::size_of::<Account>() - mem::size_of::<Vec<u8>>()
-}
-pub fn get_serialized_size(account: &Account) -> usize {
-    get_account_size_static() + account.data.len()
-}
-pub fn serialize_account(dst_slice: &mut [u8], account: &Account, len: usize) {
-    let mut at = 0;
-    write_u64(&mut at, dst_slice, len as u64);
-    write_u64(&mut at, dst_slice, account.lamports);
-    write_bytes(&mut at, dst_slice, &account.data);
-    write_bytes(&mut at, dst_slice, account.owner.as_ref());
-    write_bytes(&mut at, dst_slice, &[account.executable as u8]);
-}
-fn read_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8], len: usize) {
-    let data = &src_slice[*at..*at + len];
-    (&data[..]).read_exact(&mut dst_slice[..]).unwrap();
-    *at += len;
-}
-fn write_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8]) {
-    let data = &mut dst_slice[*at..*at + src_slice.len()];
-    (&mut data[..]).write_all(&src_slice).unwrap();
-    *at += src_slice.len();
-}
-fn read_u64(at: &mut usize, src_slice: &[u8]) -> u64 {
-    let data = &src_slice[*at..*at + mem::size_of::<u64>()];
-    *at += mem::size_of::<u64>();
-    (&data[..]).read_u64::<LittleEndian>().unwrap()
-}
-fn write_u64(at: &mut usize, dst_slice: &mut [u8], value: u64) {
-    let data = &mut dst_slice[*at..*at + mem::size_of::<u64>()];
-    (&mut data[..]).write_u64::<LittleEndian>(value).unwrap();
-    *at += mem::size_of::<u64>();
-}
-pub fn deserialize_account(
-    src_slice: &[u8],
-    index: usize,
-    current_offset: usize,
-) -> Result<Account> {
-    let mut at = index;
-    let size = read_u64(&mut at, &src_slice);
-    let len = size as usize;
-    assert!(current_offset >= at + len);
-    let lamports = read_u64(&mut at, &src_slice);
-    let data_len = len - get_account_size_static();
-    let mut data = vec![0; data_len];
-    read_bytes(&mut at, &mut data, &src_slice, data_len);
-    let mut pubkey = vec![0; mem::size_of::<Pubkey>()];
-    read_bytes(&mut at, &mut pubkey, &src_slice, mem::size_of::<Pubkey>());
-    let owner = Pubkey::new(&pubkey);
-    let mut exec = vec![0; mem::size_of::<bool>()];
-    read_bytes(&mut at, &mut exec, &src_slice, mem::size_of::<bool>());
-    let executable: bool = exec[0] != 0;
-    Ok(Account {
-        lamports,
-        data,
-        owner,
-        executable,
-    })
-}
-impl<T> AppendVec<T>
-where
-    T: Default,
-{
-    pub fn new(path: &Path, create: bool, size: u64, inc: u64) -> Self {
+impl AppendVec {
+    #[allow(clippy::mutex_atomic)]
+    pub fn new(file: &Path, create: bool, size: usize) -> Self {
         let mut data = OpenOptions::new()
             .read(true)
             .write(true)
             .create(create)
-            .open(path)
+            .open(file)
             .expect("Unable to open data file");
-        data.seek(SeekFrom::Start(size)).unwrap();
+        data.seek(SeekFrom::Start(size as u64)).unwrap();
         data.write_all(&[0]).unwrap();
         data.seek(SeekFrom::Start(0)).unwrap();
         data.flush().unwrap();
-        let mmap = unsafe { Mmap::map(&data).expect("failed to map the data file") };
-        let mmap_mut = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
+        let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
         AppendVec {
-            data,
-            mmap,
-            current_offset: AtomicUsize::new(0),
-            mmap_mut: Mutex::new(mmap_mut),
-            file_size: size,
-            inc_size: inc,
-            phantom: PhantomData,
+            map,
+            // This mutex forces append to be single threaded, but concurrent with reads
+            append_offset: Mutex::new(0),
+            current_len: AtomicUsize::new(0),
+            file_size: size as u64,
         }
     }
-    pub fn reset(&mut self) {
-        let _mmap_mut = self.mmap_mut.lock().unwrap();
-        self.current_offset.store(0, Ordering::Relaxed);
+    #[allow(clippy::mutex_atomic)]
+    pub fn reset(&self) {
+        // This mutex forces append to be single threaded, but concurrent with reads
+        let mut offset = self.append_offset.lock().unwrap();
+        self.current_len.store(0, Ordering::Relaxed);
+        *offset = 0;
     }
-    #[allow(dead_code)]
-    pub fn get(&self, index: u64) -> &T {
-        let offset = self.current_offset.load(Ordering::Relaxed);
-        let at = index as usize;
-        assert!(offset >= at + mem::size_of::<T>());
-        let data = &self.mmap[at..at + mem::size_of::<T>()];
-        let ptr = data.as_ptr() as *const T;
-        let x: Option<&T> = unsafe { ptr.as_ref() };
-        x.unwrap()
-    }
-    #[allow(dead_code)]
-    pub fn grow_file(&mut self) -> Result<()> {
-        if self.inc_size == 0 {
-            return Err(Error::new(ErrorKind::WriteZero, "Grow not supported"));
-        }
-        let mut mmap_mut = self.mmap_mut.lock().unwrap();
-        let index = self.current_offset.load(Ordering::Relaxed) + mem::size_of::<T>();
-        if index as u64 + self.inc_size < self.file_size {
-            // grow was already called
-            return Ok(());
-        }
-        let end = self.file_size + self.inc_size;
-        drop(mmap_mut.to_owned());
-        drop(self.mmap.to_owned());
-        self.data.seek(SeekFrom::Start(end))?;
-        self.data.write_all(&[0])?;
-        self.mmap = unsafe { Mmap::map(&self.data)? };
-        *mmap_mut = unsafe { MmapMut::map_mut(&self.data)? };
-        self.file_size = end;
-        Ok(())
-    }
-    #[allow(dead_code)]
-    pub fn append(&self, val: T) -> Option<u64> {
-        let mmap_mut = self.mmap_mut.lock().unwrap();
-        let index = self.current_offset.load(Ordering::Relaxed);
-        if (self.file_size as usize) < index + mem::size_of::<T>() {
-            return None;
-        }
-        let data = &mmap_mut[index..(index + mem::size_of::<T>())];
-        unsafe {
-            let ptr = data.as_ptr() as *mut T;
-            std::ptr::write(ptr, val)
-        };
-        self.current_offset
-            .fetch_add(mem::size_of::<T>(), Ordering::Relaxed);
-        Some(index as u64)
-    }
-    pub fn get_account(&self, index: u64) -> Result<Account> {
-        let index = index as usize;
-        deserialize_account(
-            &self.mmap[..],
-            index,
-            self.current_offset.load(Ordering::Relaxed),
-        )
-    }
-    pub fn append_account(&self, account: &Account) -> Option<u64> {
-        let mut mmap_mut = self.mmap_mut.lock().unwrap();
-        let data_at = align_up!(
-            self.current_offset.load(Ordering::Relaxed),
-            mem::size_of::<u64>()
-        );
-        let len = get_serialized_size(account);
-        if (self.file_size as usize) < data_at + len + SIZEOF_U64 {
-            return None;
-        }
-        serialize_account(
-            &mut mmap_mut[data_at..data_at + len + SIZEOF_U64],
-            &account,
-            len,
-        );
-        self.current_offset
-            .store(data_at + len + SIZEOF_U64, Ordering::Relaxed);
-        Some(data_at as u64)
-    }
+    pub fn len(&self) -> usize {
+        self.current_len.load(Ordering::Relaxed)
+    }
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+    pub fn capacity(&self) -> u64 {
+        self.file_size
+    }
+    // The reason for the `mut` is to allow the account data pointer to be fixed up after
+    // the structure is loaded
+    #[allow(clippy::mut_from_ref)]
+    fn get_slice(&self, offset: usize, size: usize) -> &mut [u8] {
+        let len = self.len();
+        assert!(len >= offset + size);
+        let data = &self.map[offset..offset + size];
+        unsafe {
+            let dst = data.as_ptr() as *mut u8;
+            std::slice::from_raw_parts_mut(dst, size)
+        }
+    }
+    fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) {
+        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+        //crash on some architectures.
+        let pos = align_up!(*offset as usize, mem::size_of::<u64>());
+        let data = &self.map[pos..(pos + len)];
+        unsafe {
+            let dst = data.as_ptr() as *mut u8;
+            std::ptr::copy(src, dst, len);
+        };
+        *offset = pos + len;
+    }
+    #[allow(clippy::mutex_atomic)]
+    fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option<usize> {
+        // This mutex forces append to be single threaded, but concurrent with reads
+        let mut offset = self.append_offset.lock().unwrap();
+        let mut end = *offset;
+        for val in vals {
+            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+            //crash on some architectures.
+            end = align_up!(end, mem::size_of::<u64>());
+            end += val.1;
+        }
+        if (self.file_size as usize) <= end {
+            return None;
+        }
+        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+        //crash on some architectures.
+        let pos = align_up!(*offset, mem::size_of::<u64>());
+        for val in vals {
+            self.append_ptr(&mut offset, val.0, val.1)
+        }
+        self.current_len.store(*offset, Ordering::Relaxed);
+        Some(pos)
+    }
+    #[allow(clippy::transmute_ptr_to_ptr)]
+    pub fn get_account(&self, offset: usize) -> &Account {
+        let account: *mut Account = {
+            let data = self.get_slice(offset, mem::size_of::<Account>());
+            unsafe { std::mem::transmute::<*const u8, *mut Account>(data.as_ptr()) }
+        };
+        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+        //crash on some architectures.
+        let data_at = align_up!(offset + mem::size_of::<Account>(), mem::size_of::<u64>());
+        let account_ref: &mut Account = unsafe { &mut *account };
+        let data = self.get_slice(data_at, account_ref.data.len());
+        unsafe {
+            let mut new_data = Vec::from_raw_parts(data.as_mut_ptr(), data.len(), data.len());
+            std::mem::swap(&mut account_ref.data, &mut new_data);
+            std::mem::forget(new_data);
+        };
+        account_ref
+    }
+    pub fn accounts(&self, mut start: usize) -> Vec<&Account> {
+        let mut accounts = vec![];
+        loop {
+            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+            //crash on some architectures.
+            let end = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
+            if end > self.len() {
+                break;
+            }
+            let first = self.get_account(start);
+            accounts.push(first);
+            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
+            //crash on some architectures.
+            let data_at = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
+            let next = align_up!(data_at + first.data.len(), mem::size_of::<u64>());
+            start = next;
+        }
+        accounts
+    }
+    pub fn append_account(&self, account: &Account) -> Option<usize> {
+        let acc_ptr = account as *const Account;
+        let data_len = account.data.len();
+        let data_ptr = account.data.as_ptr();
+        let ptrs = [
+            (acc_ptr as *const u8, mem::size_of::<Account>()),
+            (data_ptr, data_len),
+        ];
+        self.append_ptrs(&ptrs)
+    }
+}
+pub mod test_utils {
+    use rand::distributions::Alphanumeric;
+    use rand::{thread_rng, Rng};
+    use solana_sdk::account::Account;
+    use solana_sdk::pubkey::Pubkey;
+    use std::fs::create_dir_all;
+    use std::path::PathBuf;
+    pub struct TempFile {
+        pub path: PathBuf,
+    }
+    impl Drop for TempFile {
+        fn drop(&mut self) {
+            let mut path = PathBuf::new();
+            std::mem::swap(&mut path, &mut self.path);
+            let _ = std::fs::remove_file(path);
+        }
+    }
+    pub fn get_append_vec_path(path: &str) -> TempFile {
+        let out_dir =
+            std::env::var("OUT_DIR").unwrap_or_else(|_| "/tmp/append_vec_tests".to_string());
+        let mut buf = PathBuf::new();
+        let rand_string: String = thread_rng().sample_iter(&Alphanumeric).take(30).collect();
+        buf.push(&format!("{}/{}{}", out_dir, path, rand_string));
+        create_dir_all(out_dir).expect("Create directory failed");
+        TempFile { path: buf }
+    }
+    pub fn create_test_account(sample: usize) -> Account {
+        let data_len = sample % 256;
+        let mut account = Account::new(sample as u64, 0, &Pubkey::default());
+        account.data = (0..data_len).map(|_| data_len as u8).collect();
+        account
+    }
 }
 #[cfg(test)]
 pub mod tests {
+    use super::test_utils::*;
     use super::*;
     use log::*;
     use rand::{thread_rng, Rng};
-    use solana_sdk::timing::{duration_as_ms, duration_as_s};
-    use std::sync::atomic::{AtomicUsize, Ordering};
+    use solana_sdk::timing::duration_as_ms;
     use std::time::Instant;
-    const START_SIZE: u64 = 4 * 1024 * 1024;
-    const INC_SIZE: u64 = 1 * 1024 * 1024;
     #[test]
     fn test_append_vec() {
-        let path = Path::new("append_vec");
-        let av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
-        let val: u64 = 5;
-        let index = av.append(val).unwrap();
-        assert_eq!(*av.get(index), val);
-        let val1 = val + 1;
-        let index1 = av.append(val1).unwrap();
-        assert_eq!(*av.get(index), val);
-        assert_eq!(*av.get(index1), val1);
-        std::fs::remove_file(path).unwrap();
+        let path = get_append_vec_path("test_append");
+        let av = AppendVec::new(&path.path, true, 1024 * 1024);
+        let account = create_test_account(0);
+        let index = av.append_account(&account).unwrap();
+        assert_eq!(*av.get_account(index), account);
     }
     #[test]
-    fn test_append_vec_account() {
-        let path = Path::new("append_vec_account");
-        let av: AppendVec<Account> = AppendVec::new(path, true, START_SIZE, INC_SIZE);
-        let v1 = vec![1u8; 32];
-        let mut account1 = Account {
-            lamports: 1,
-            data: v1,
-            owner: Pubkey::default(),
-            executable: false,
-        };
+    fn test_append_vec_data() {
+        let path = get_append_vec_path("test_append_data");
+        let av = AppendVec::new(&path.path, true, 1024 * 1024);
+        let account = create_test_account(5);
+        let index = av.append_account(&account).unwrap();
+        assert_eq!(*av.get_account(index), account);
+        let account1 = create_test_account(6);
         let index1 = av.append_account(&account1).unwrap();
-        assert_eq!(index1, 0);
-        assert_eq!(av.get_account(index1).unwrap(), account1);
-        let v2 = vec![4u8; 32];
-        let mut account2 = Account {
-            lamports: 1,
-            data: v2,
-            owner: Pubkey::default(),
-            executable: false,
-        };
-        let index2 = av.append_account(&account2).unwrap();
-        let mut len = get_serialized_size(&account1) + SIZEOF_U64 as usize;
-        assert_eq!(index2, len as u64);
-        assert_eq!(av.get_account(index2).unwrap(), account2);
-        assert_eq!(av.get_account(index1).unwrap(), account1);
-        account2.data.iter_mut().for_each(|e| *e *= 2);
-        let index3 = av.append_account(&account2).unwrap();
-        len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
-        assert_eq!(index3, len as u64);
-        assert_eq!(av.get_account(index3).unwrap(), account2);
-        account1.data.extend([1, 2, 3, 4, 5, 6].iter().cloned());
-        let index4 = av.append_account(&account1).unwrap();
-        len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
-        assert_eq!(index4, len as u64);
-        assert_eq!(av.get_account(index4).unwrap(), account1);
-        std::fs::remove_file(path).unwrap();
+        assert_eq!(*av.get_account(index), account);
+        assert_eq!(*av.get_account(index1), account1);
     }
     #[test]
-    fn test_grow_append_vec() {
-        let path = Path::new("grow");
-        let mut av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
-        let mut val = [5u64; 32];
-        let size = 100_000;
-        let mut offsets = vec![0; size];
+    fn test_append_vec_append_many() {
+        let path = get_append_vec_path("test_append_many");
+        let av = AppendVec::new(&path.path, true, 1024 * 1024);
+        let size = 1000;
+        let mut indexes = vec![];
+        let now = Instant::now();
+        for sample in 0..size {
+            let account = create_test_account(sample);
+            let pos = av.append_account(&account).unwrap();
+            assert_eq!(*av.get_account(pos), account);
+            indexes.push(pos)
+        }
+        trace!("append time: {} ms", duration_as_ms(&now.elapsed()),);
         let now = Instant::now();
-        for index in 0..size {
-            if let Some(offset) = av.append(val) {
-                offsets[index] = offset;
-            } else {
-                assert!(av.grow_file().is_ok());
-                if let Some(offset) = av.append(val) {
-                    offsets[index] = offset;
-                } else {
-                    assert!(false);
-                }
-            }
-            val[0] += 1;
-        }
-        info!(
-            "time: {} ms {} / s",
-            duration_as_ms(&now.elapsed()),
-            ((mem::size_of::<[u64; 32]>() * size) as f32) / duration_as_s(&now.elapsed()),
-        );
+        for _ in 0..size {
+            let sample = thread_rng().gen_range(0, indexes.len());
+            let account = create_test_account(sample);
+            assert_eq!(*av.get_account(indexes[sample]), account);
+        }
+        trace!("random read time: {} ms", duration_as_ms(&now.elapsed()),);
         let now = Instant::now();
-        let num_reads = 100_000;
-        for _ in 0..num_reads {
-            let index = thread_rng().gen_range(0, size);
-            assert_eq!(av.get(offsets[index])[0], (index + 5) as u64);
-        }
-        info!(
-            "time: {} ms {} / s",
-            duration_as_ms(&now.elapsed()),
-            (num_reads as f32) / duration_as_s(&now.elapsed()),
-        );
-        std::fs::remove_file(path).unwrap();
-    }
-    #[test]
-    fn random_atomic_change() {
-        let path = Path::new("random");
-        let mut vec = AppendVec::<AtomicUsize>::new(path, true, START_SIZE, INC_SIZE);
-        let size = 1_000;
-        for k in 0..size {
-            if vec.append(AtomicUsize::new(k)).is_none() {
-                assert!(vec.grow_file().is_ok());
-                assert!(vec.append(AtomicUsize::new(0)).is_some());
-            }
-        }
-        let index = thread_rng().gen_range(0, size as u64);
-        let atomic1 = vec.get(index * mem::size_of::<AtomicUsize>() as u64);
-        let current1 = atomic1.load(Ordering::Relaxed);
-        assert_eq!(current1, index as usize);
-        let next = current1 + 1;
-        let index = vec.append(AtomicUsize::new(next)).unwrap();
-        let atomic2 = vec.get(index);
-        let current2 = atomic2.load(Ordering::Relaxed);
-        assert_eq!(current2, next);
-        std::fs::remove_file(path).unwrap();
+        assert_eq!(indexes.len(), size);
+        assert_eq!(indexes[0], 0);
+        let accounts = av.accounts(indexes[0]);
+        assert_eq!(accounts.len(), size);
+        for (sample, v) in accounts.iter().enumerate() {
+            let account = create_test_account(sample);
+            assert_eq!(**v, account)
+        }
+        trace!(
+            "sequential read time: {} ms",
+            duration_as_ms(&now.elapsed()),
+        );
     }
 }
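
For reference, append_ptrs and get_account lay items out at offsets rounded up to mem::size_of::<u64>(), i.e. 8 bytes (64 bits), with the Account struct followed by its data bytes. A small, self-contained sketch of that arithmetic using the same align_up! macro (the struct and data sizes below are hypothetical, chosen only to make the numbers concrete):

// Sketch of the on-file layout implied by append_ptrs/get_account.
macro_rules! align_up {
    ($addr: expr, $align: expr) => {
        ($addr + ($align - 1)) & !($align - 1)
    };
}

fn main() {
    let align = std::mem::size_of::<u64>(); // 8 bytes
    assert_eq!(align_up!(0, align), 0);
    assert_eq!(align_up!(13, align), 16);
    assert_eq!(align_up!(16, align), 16);

    // Appending an account whose struct takes `acct_size` bytes and carries
    // `data_len` data bytes, starting from a running offset of 13:
    let (offset, acct_size, data_len) = (13usize, 48usize, 5usize); // hypothetical sizes
    let acct_at = align_up!(offset, align);               // 16: Account struct lands here
    let data_at = align_up!(acct_at + acct_size, align);  // 64: data bytes land here
    let next = data_at + data_len;                        // 69: next append aligns up to 72
    assert_eq!((acct_at, data_at, next), (16, 64, 69));
}

This is also why get_account can hand back a &Account straight out of the mmap: the struct always sits on an 8-byte boundary, and its data pointer is patched to the adjacent, aligned data region.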