AppendVecs that can return references and read/append without locks (#3713)

* AppendVec with raw pointers

* appendvecs

* imports

* review

* review comments

* clippy
anatoly yakovenko 2019-04-11 13:16:56 -07:00 committed by GitHub
parent a28c3b0e9a
commit f669ae5868
3 changed files with 301 additions and 513 deletions
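The change replaces the old generic AppendVec<T> (RwLock-wrapped, with grow_file() and per-element get()/append()) with a fixed-capacity, memory-mapped account store: appends are serialized by an internal Mutex'ed offset but never block readers, and reads return &Account references pointing straight into the map. A minimal sketch of the new surface, assembled from the benches and tests in this diff (the "append_vec_demo" name is illustrative):

    use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
    use solana_runtime::append_vec::AppendVec;
    use std::sync::Arc;
    use std::thread::spawn;

    fn main() {
        // Temp-file helper from the new test_utils module; the file is removed on drop.
        let path = get_append_vec_path("append_vec_demo");
        let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));

        // Appends return the byte offset of the stored account, or None once the
        // preallocated file is full (reset() rewinds the write offset).
        let account = create_test_account(42);
        let offset = vec.append_account(&account).expect("appendvec is full");

        // Reads take no lock: get_account() returns a reference into the memory
        // map, so they can run concurrently with the single appender.
        let reader = {
            let vec = vec.clone();
            spawn(move || assert_eq!(*vec.get_account(offset), create_test_account(42)))
        };
        reader.join().unwrap();
    }

Because get_account() borrows from the mmap, accounts are served to readers with no copying and no lock traffic; only the writer takes the append mutex.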


@@ -1,241 +1,120 @@
#![feature(test)]
extern crate rand;
extern crate test;
use bincode::{deserialize, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_runtime::append_vec::{
deserialize_account, get_serialized_size, serialize_account, AppendVec,
};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::fs::{create_dir_all, remove_dir_all};
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
use solana_runtime::append_vec::AppendVec;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::spawn;
use std::time::Duration;
use test::Bencher;
const START_SIZE: u64 = 4 * 1024 * 1024;
const INC_SIZE: u64 = 1 * 1024 * 1024;
macro_rules! align_up {
($addr: expr, $align: expr) => {
($addr + ($align - 1)) & !($align - 1)
};
}
fn get_append_vec_bench_path(path: &str) -> PathBuf {
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let mut buf = PathBuf::new();
buf.push(&format!("{}/{}", out_dir, path));
let _ignored = remove_dir_all(out_dir.clone());
create_dir_all(out_dir).expect("Create directory failed");
buf
}
#[bench]
fn append_vec_atomic_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_append");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
fn append_vec_append(bencher: &mut Bencher) {
let path = get_append_vec_path("bench_append");
let vec = AppendVec::new(&path.path, true, 64 * 1024);
bencher.iter(|| {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
let account = create_test_account(0);
if vec.append_account(&account).is_none() {
vec.reset();
}
});
std::fs::remove_file(path).unwrap();
}
fn add_test_accounts(vec: &AppendVec, size: usize) -> Vec<(usize, usize)> {
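// Append `size` generated accounts and record (sample, offset) pairs so the read
// benches can check each stored account against a freshly created copy.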
(0..size)
.into_iter()
.filter_map(|sample| {
let account = create_test_account(sample);
vec.append_account(&account).map(|pos| (sample, pos))
})
.collect()
}
#[bench]
fn append_vec_atomic_random_access(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_ra");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
}
fn append_vec_sequential_read(bencher: &mut Bencher) {
let path = get_append_vec_path("seq_read");
let vec = AppendVec::new(&path.path, true, 64 * 1024);
let size = 1_000;
let mut indexes = add_test_accounts(&vec, size);
bencher.iter(|| {
let index = thread_rng().gen_range(0, size as u64);
vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
let (sample, pos) = indexes.pop().unwrap();
let account = vec.get_account(pos);
let test = create_test_account(sample);
assert_eq!(*account, test);
indexes.push((sample, pos));
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_atomic_random_change(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_rax");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for k in 0..size {
if vec.append(AtomicUsize::new(k)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(k)).is_some());
}
}
fn append_vec_random_read(bencher: &mut Bencher) {
let path = get_append_vec_path("random_read");
let vec = AppendVec::new(&path.path, true, 64 * 1024);
let size = 1_000;
let mut indexes = add_test_accounts(&vec, size);
bencher.iter(|| {
let index = thread_rng().gen_range(0, size as u64);
let atomic1 = vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
let current1 = atomic1.load(Ordering::Relaxed);
assert_eq!(current1, index as usize);
let next = current1 + 1;
let mut index = vec.append(AtomicUsize::new(next));
if index.is_none() {
assert!(vec.grow_file().is_ok());
index = vec.append(AtomicUsize::new(next));
}
let atomic2 = vec.get(index.unwrap());
let current2 = atomic2.load(Ordering::Relaxed);
assert_eq!(current2, next);
let random_index: usize = thread_rng().gen_range(0, indexes.len());
let (sample, pos) = &indexes[random_index];
let account = vec.get_account(*pos);
let test = create_test_account(*sample);
assert_eq!(*account, test);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_atomic_random_read(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_read");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
if vec.append(AtomicUsize::new(0)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
}
bencher.iter(|| {
let index = thread_rng().gen_range(0, size);
let atomic1 = vec.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
let current1 = atomic1.load(Ordering::Relaxed);
assert_eq!(current1, 0);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_concurrent_lock_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_lock_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));
fn append_vec_concurrent_append_read(bencher: &mut Bencher) {
let path = get_append_vec_path("concurrent_read");
let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
let vec1 = vec.clone();
let size = 1_000_000;
let count = Arc::new(AtomicUsize::new(0));
let count1 = count.clone();
let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
let indexes1 = indexes.clone();
spawn(move || loop {
let mut len = count.load(Ordering::Relaxed);
{
let rlock = vec1.read().unwrap();
loop {
if rlock.append(AtomicUsize::new(0)).is_none() {
break;
}
len = count.fetch_add(1, Ordering::Relaxed);
}
if len >= size {
break;
}
}
{
let mut wlock = vec1.write().unwrap();
if len >= size {
break;
}
assert!(wlock.grow_file().is_ok());
let sample = indexes1.lock().unwrap().len();
let account = create_test_account(sample);
if let Some(pos) = vec1.append_account(&account) {
indexes1.lock().unwrap().push((sample, pos))
} else {
break;
}
});
while indexes.lock().unwrap().is_empty() {
sleep(Duration::from_millis(100));
}
bencher.iter(|| {
let _rlock = vec.read().unwrap();
let len = count1.load(Ordering::Relaxed);
assert!(len < size * 2);
let len = indexes.lock().unwrap().len();
let random_index: usize = thread_rng().gen_range(0, len);
let (sample, pos) = indexes.lock().unwrap().get(random_index).unwrap().clone();
let account = vec.get_account(pos);
let test = create_test_account(sample);
assert_eq!(*account, test);
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn append_vec_concurrent_get_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_get_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));
fn append_vec_concurrent_read_append(bencher: &mut Bencher) {
let path = get_append_vec_path("concurrent_read");
let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
let vec1 = vec.clone();
let size = 1_000_000;
let count = Arc::new(AtomicUsize::new(0));
let count1 = count.clone();
let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
let indexes1 = indexes.clone();
spawn(move || loop {
let mut len = count.load(Ordering::Relaxed);
{
let rlock = vec1.read().unwrap();
loop {
if rlock.append(AtomicUsize::new(0)).is_none() {
break;
}
len = count.fetch_add(1, Ordering::Relaxed);
}
if len >= size {
break;
}
}
{
let mut wlock = vec1.write().unwrap();
if len >= size {
break;
}
assert!(wlock.grow_file().is_ok());
}
let len = indexes1.lock().unwrap().len();
let random_index: usize = thread_rng().gen_range(0, len + 1);
let (sample, pos) = indexes1
.lock()
.unwrap()
.get(random_index % len)
.unwrap()
.clone();
let account = vec1.get_account(pos);
let test = create_test_account(sample);
assert_eq!(*account, test);
});
bencher.iter(|| {
let rlock = vec.read().unwrap();
let len = count1.load(Ordering::Relaxed);
if len > 0 {
let index = thread_rng().gen_range(0, len);
rlock.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
let sample: usize = thread_rng().gen_range(0, 256);
let account = create_test_account(sample);
if let Some(pos) = vec.append_account(&account) {
indexes.lock().unwrap().push((sample, pos))
}
});
std::fs::remove_file(path).unwrap();
}
#[bench]
fn bench_account_serialize(bencher: &mut Bencher) {
let num: usize = 1000;
let account = Account::new(2, 100, &Pubkey::new_rand());
let len = get_serialized_size(&account);
let ser_len = align_up!(len + std::mem::size_of::<u64>(), std::mem::size_of::<u64>());
let mut memory = test::black_box(vec![0; num * ser_len]);
bencher.iter(|| {
for i in 0..num {
let start = i * ser_len;
serialize_account(&mut memory[start..start + ser_len], &account, len);
}
});
let index = thread_rng().gen_range(0, num);
let start = index * ser_len;
let new_account = deserialize_account(&memory[start..start + ser_len], 0, num * len).unwrap();
assert_eq!(new_account, account);
}
#[bench]
fn bench_account_serialize_bincode(bencher: &mut Bencher) {
let num: usize = 1000;
let account = Account::new(2, 100, &Pubkey::new_rand());
let len = serialized_size(&account).unwrap() as usize;
let mut memory = test::black_box(vec![0u8; num * len]);
bencher.iter(|| {
for i in 0..num {
let start = i * len;
let cursor = Cursor::new(&mut memory[start..start + len]);
serialize_into(cursor, &account).unwrap();
}
});
let index = thread_rng().gen_range(0, len);
let start = index * len;
let new_account: Account = deserialize(&memory[start..start + len]).unwrap();
assert_eq!(new_account, account);
}


@@ -18,7 +18,7 @@ use std::env;
use std::fs::{create_dir_all, remove_dir_all};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Mutex, RwLock};
pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
@@ -101,7 +101,7 @@ struct AccountInfo {
id: AppendVecId,
/// offset into the storage
offset: u64,
offset: usize,
/// lamports in the account used when squashing kept for optimization
/// purposes to remove accounts with zero balance.
@@ -127,7 +127,7 @@ struct AccountIndex {
/// Persistent storage structure holding the accounts
struct AccountStorageEntry {
/// storage holding the accounts
accounts: Arc<RwLock<AppendVec<Account>>>,
accounts: AppendVec,
/// Keeps track of the number of accounts stored in a specific AppendVec.
/// This is periodically checked to reuse the stores that do not have
@@ -139,17 +139,12 @@ struct AccountStorageEntry {
}
impl AccountStorageEntry {
pub fn new(path: &str, id: usize, file_size: u64, inc_size: u64) -> Self {
pub fn new(path: &str, id: usize, file_size: u64) -> Self {
let p = format!("{}/{}", path, id);
let path = Path::new(&p);
let _ignored = remove_dir_all(path);
create_dir_all(path).expect("Create directory failed");
let accounts = Arc::new(RwLock::new(AppendVec::<Account>::new(
&path.join(ACCOUNT_DATA_FILE),
true,
file_size,
inc_size,
)));
let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize);
AccountStorageEntry {
accounts,
@@ -172,7 +167,7 @@ impl AccountStorageEntry {
fn remove_account(&self) {
if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
self.accounts.write().unwrap().reset();
self.accounts.reset();
self.set_status(AccountStorageStatus::StorageAvailable);
}
}
@@ -200,9 +195,6 @@ pub struct AccountsDB {
/// Starting file size of appendvecs
file_size: u64,
/// Increment size of appendvecs
inc_size: u64,
}
/// This structure handles synchronization for db
@@ -241,7 +233,7 @@ impl Drop for Accounts {
}
impl AccountsDB {
pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64, inc_size: u64) -> Self {
pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64) -> Self {
let account_index = AccountIndex {
account_maps: RwLock::new(HashMap::new()),
};
@@ -253,7 +245,6 @@ impl AccountsDB {
parents_map: RwLock::new(HashMap::new()),
paths,
file_size,
inc_size,
};
accounts_db.add_storage(&accounts_db.paths);
accounts_db.add_fork(fork, None);
@@ -261,7 +252,7 @@ impl AccountsDB {
}
pub fn new(fork: Fork, paths: &str) -> Self {
Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE, 0)
Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE)
}
pub fn add_fork(&self, fork: Fork, parent: Option<Fork>) {
@@ -287,7 +278,6 @@ impl AccountsDB {
path,
self.next_id.fetch_add(1, Ordering::Relaxed),
self.file_size,
self.inc_size,
)
}
@@ -329,10 +319,11 @@ impl AccountsDB {
Some(hash(&serialize(&ordered_accounts).unwrap()))
}
fn get_account(&self, id: AppendVecId, offset: u64) -> Account {
let accounts = &self.storage.read().unwrap()[id].accounts;
let av = accounts.read().unwrap();
av.get_account(offset).unwrap()
fn get_account(&self, id: AppendVecId, offset: usize) -> Account {
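// The AppendVec hands back a reference into its memory map; clone it so the
// caller gets an owned Account that does not borrow the storage entry.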
self.storage.read().unwrap()[id]
.accounts
.get_account(offset)
.clone()
}
fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option<Account> {
@@ -440,8 +431,8 @@ impl AccountsDB {
id
}
fn append_account(&self, account: &Account) -> (usize, u64) {
let offset: u64;
fn append_account(&self, account: &Account) -> (usize, usize) {
let offset: usize;
let start = self.next_id.fetch_add(1, Ordering::Relaxed);
let mut id = self.get_storage_id(start, std::usize::MAX);
@@ -454,10 +445,10 @@ impl AccountsDB {
}
loop {
let result: Option<u64>;
let result: Option<usize>;
{
let av = &self.storage.read().unwrap()[id].accounts;
result = av.read().unwrap().append_account(acc);
result = av.append_account(acc);
}
if let Some(val) = result {
offset = val;
@@ -1628,7 +1619,7 @@ mod tests {
fn test_account_grow_many() {
let paths = get_tmp_accounts_path("many2,many3");
let size = 4096;
let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size, 0);
let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size);
let mut keys = vec![];
for i in 0..9 {
let key = Pubkey::new_rand();


@@ -1,358 +1,276 @@
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use memmap::{Mmap, MmapMut};
use memmap::MmapMut;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write};
use std::marker::PhantomData;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::mem;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
const SIZEOF_U64: usize = mem::size_of::<u64>();
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
macro_rules! align_up {
($addr: expr, $align: expr) => {
($addr + ($align - 1)) & !($align - 1)
};
}
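// Example: the call sites in this file align to mem::size_of::<u64>() (8 bytes),
// so align_up!(1, 8) == 8 and align_up!(9, 8) == 16; $align must be a power of
// two for the mask trick to work.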
pub struct AppendVec<T> {
data: File,
mmap: Mmap,
current_offset: AtomicUsize,
mmap_mut: Mutex<MmapMut>,
pub struct AppendVec {
map: MmapMut,
// This mutex forces append to be single threaded, but concurrent with reads
append_offset: Mutex<usize>,
current_len: AtomicUsize,
file_size: u64,
inc_size: u64,
phantom: PhantomData<T>,
}
fn get_account_size_static() -> usize {
mem::size_of::<Account>() - mem::size_of::<Vec<u8>>()
}
pub fn get_serialized_size(account: &Account) -> usize {
get_account_size_static() + account.data.len()
}
pub fn serialize_account(dst_slice: &mut [u8], account: &Account, len: usize) {
let mut at = 0;
write_u64(&mut at, dst_slice, len as u64);
write_u64(&mut at, dst_slice, account.lamports);
write_bytes(&mut at, dst_slice, &account.data);
write_bytes(&mut at, dst_slice, account.owner.as_ref());
write_bytes(&mut at, dst_slice, &[account.executable as u8]);
}
fn read_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8], len: usize) {
let data = &src_slice[*at..*at + len];
(&data[..]).read_exact(&mut dst_slice[..]).unwrap();
*at += len;
}
fn write_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8]) {
let data = &mut dst_slice[*at..*at + src_slice.len()];
(&mut data[..]).write_all(&src_slice).unwrap();
*at += src_slice.len();
}
fn read_u64(at: &mut usize, src_slice: &[u8]) -> u64 {
let data = &src_slice[*at..*at + mem::size_of::<u64>()];
*at += mem::size_of::<u64>();
(&data[..]).read_u64::<LittleEndian>().unwrap()
}
fn write_u64(at: &mut usize, dst_slice: &mut [u8], value: u64) {
let data = &mut dst_slice[*at..*at + mem::size_of::<u64>()];
(&mut data[..]).write_u64::<LittleEndian>(value).unwrap();
*at += mem::size_of::<u64>();
}
pub fn deserialize_account(
src_slice: &[u8],
index: usize,
current_offset: usize,
) -> Result<Account> {
let mut at = index;
let size = read_u64(&mut at, &src_slice);
let len = size as usize;
assert!(current_offset >= at + len);
let lamports = read_u64(&mut at, &src_slice);
let data_len = len - get_account_size_static();
let mut data = vec![0; data_len];
read_bytes(&mut at, &mut data, &src_slice, data_len);
let mut pubkey = vec![0; mem::size_of::<Pubkey>()];
read_bytes(&mut at, &mut pubkey, &src_slice, mem::size_of::<Pubkey>());
let owner = Pubkey::new(&pubkey);
let mut exec = vec![0; mem::size_of::<bool>()];
read_bytes(&mut at, &mut exec, &src_slice, mem::size_of::<bool>());
let executable: bool = exec[0] != 0;
Ok(Account {
lamports,
data,
owner,
executable,
})
}
impl<T> AppendVec<T>
where
T: Default,
{
pub fn new(path: &Path, create: bool, size: u64, inc: u64) -> Self {
impl AppendVec {
#[allow(clippy::mutex_atomic)]
pub fn new(file: &Path, create: bool, size: usize) -> Self {
let mut data = OpenOptions::new()
.read(true)
.write(true)
.create(create)
.open(path)
.open(file)
.expect("Unable to open data file");
data.seek(SeekFrom::Start(size)).unwrap();
data.seek(SeekFrom::Start(size as u64)).unwrap();
data.write_all(&[0]).unwrap();
data.seek(SeekFrom::Start(0)).unwrap();
data.flush().unwrap();
let mmap = unsafe { Mmap::map(&data).expect("failed to map the data file") };
let mmap_mut = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
AppendVec {
data,
mmap,
current_offset: AtomicUsize::new(0),
mmap_mut: Mutex::new(mmap_mut),
file_size: size,
inc_size: inc,
phantom: PhantomData,
map,
// This mutex forces append to be single threaded, but concurrent with reads
append_offset: Mutex::new(0),
current_len: AtomicUsize::new(0),
file_size: size as u64,
}
}
pub fn reset(&mut self) {
let _mmap_mut = self.mmap_mut.lock().unwrap();
self.current_offset.store(0, Ordering::Relaxed);
#[allow(clippy::mutex_atomic)]
pub fn reset(&self) {
// This mutex forces append to be single threaded, but concurrent with reads
let mut offset = self.append_offset.lock().unwrap();
self.current_len.store(0, Ordering::Relaxed);
*offset = 0;
}
#[allow(dead_code)]
pub fn get(&self, index: u64) -> &T {
let offset = self.current_offset.load(Ordering::Relaxed);
let at = index as usize;
assert!(offset >= at + mem::size_of::<T>());
let data = &self.mmap[at..at + mem::size_of::<T>()];
let ptr = data.as_ptr() as *const T;
let x: Option<&T> = unsafe { ptr.as_ref() };
x.unwrap()
pub fn len(&self) -> usize {
self.current_len.load(Ordering::Relaxed)
}
#[allow(dead_code)]
pub fn grow_file(&mut self) -> Result<()> {
if self.inc_size == 0 {
return Err(Error::new(ErrorKind::WriteZero, "Grow not supported"));
}
let mut mmap_mut = self.mmap_mut.lock().unwrap();
let index = self.current_offset.load(Ordering::Relaxed) + mem::size_of::<T>();
if index as u64 + self.inc_size < self.file_size {
// grow was already called
return Ok(());
}
let end = self.file_size + self.inc_size;
drop(mmap_mut.to_owned());
drop(self.mmap.to_owned());
self.data.seek(SeekFrom::Start(end))?;
self.data.write_all(&[0])?;
self.mmap = unsafe { Mmap::map(&self.data)? };
*mmap_mut = unsafe { MmapMut::map_mut(&self.data)? };
self.file_size = end;
Ok(())
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[allow(dead_code)]
pub fn append(&self, val: T) -> Option<u64> {
let mmap_mut = self.mmap_mut.lock().unwrap();
let index = self.current_offset.load(Ordering::Relaxed);
pub fn capacity(&self) -> u64 {
self.file_size
}
if (self.file_size as usize) < index + mem::size_of::<T>() {
return None;
}
let data = &mmap_mut[index..(index + mem::size_of::<T>())];
// The reason for the `mut` is to allow the account data pointer to be fixed up after
// the structure is loaded
#[allow(clippy::mut_from_ref)]
fn get_slice(&self, offset: usize, size: usize) -> &mut [u8] {
let len = self.len();
assert!(len >= offset + size);
let data = &self.map[offset..offset + size];
unsafe {
let ptr = data.as_ptr() as *mut T;
std::ptr::write(ptr, val)
let dst = data.as_ptr() as *mut u8;
std::slice::from_raw_parts_mut(dst, size)
}
}
fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) {
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
let pos = align_up!(*offset as usize, mem::size_of::<u64>());
let data = &self.map[pos..(pos + len)];
unsafe {
let dst = data.as_ptr() as *mut u8;
std::ptr::copy(src, dst, len);
};
self.current_offset
.fetch_add(mem::size_of::<T>(), Ordering::Relaxed);
Some(index as u64)
*offset = pos + len;
}
pub fn get_account(&self, index: u64) -> Result<Account> {
let index = index as usize;
deserialize_account(
&self.mmap[..],
index,
self.current_offset.load(Ordering::Relaxed),
)
}
#[allow(clippy::mutex_atomic)]
fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option<usize> {
// This mutex forces append to be single threaded, but concurrent with reads
let mut offset = self.append_offset.lock().unwrap();
let mut end = *offset;
for val in vals {
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
end = align_up!(end, mem::size_of::<u64>());
end += val.1;
}
pub fn append_account(&self, account: &Account) -> Option<u64> {
let mut mmap_mut = self.mmap_mut.lock().unwrap();
let data_at = align_up!(
self.current_offset.load(Ordering::Relaxed),
mem::size_of::<u64>()
);
let len = get_serialized_size(account);
if (self.file_size as usize) < data_at + len + SIZEOF_U64 {
if (self.file_size as usize) <= end {
return None;
}
serialize_account(
&mut mmap_mut[data_at..data_at + len + SIZEOF_U64],
&account,
len,
);
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
let pos = align_up!(*offset, mem::size_of::<u64>());
for val in vals {
self.append_ptr(&mut offset, val.0, val.1)
}
self.current_len.store(*offset, Ordering::Relaxed);
Some(pos)
}
self.current_offset
.store(data_at + len + SIZEOF_U64, Ordering::Relaxed);
Some(data_at as u64)
#[allow(clippy::transmute_ptr_to_ptr)]
pub fn get_account(&self, offset: usize) -> &Account {
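// Reinterpret the bytes at `offset` as an Account, then fix up its `data` Vec so
// it points at the account data stored in the map just past the struct (at the
// next u64-aligned offset) rather than at the heap buffer that was valid when
// the account was appended.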
let account: *mut Account = {
let data = self.get_slice(offset, mem::size_of::<Account>());
unsafe { std::mem::transmute::<*const u8, *mut Account>(data.as_ptr()) }
};
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
let data_at = align_up!(offset + mem::size_of::<Account>(), mem::size_of::<u64>());
let account_ref: &mut Account = unsafe { &mut *account };
let data = self.get_slice(data_at, account_ref.data.len());
unsafe {
let mut new_data = Vec::from_raw_parts(data.as_mut_ptr(), data.len(), data.len());
std::mem::swap(&mut account_ref.data, &mut new_data);
std::mem::forget(new_data);
};
account_ref
}
pub fn accounts(&self, mut start: usize) -> Vec<&Account> {
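// Walk the map starting at `start`, collecting a reference to every stored
// account until the next account struct would extend past len().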
let mut accounts = vec![];
loop {
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
let end = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
if end > self.len() {
break;
}
let first = self.get_account(start);
accounts.push(first);
//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
//crash on some architectures.
let data_at = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
let next = align_up!(data_at + first.data.len(), mem::size_of::<u64>());
start = next;
}
accounts
}
pub fn append_account(&self, account: &Account) -> Option<usize> {
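// Map layout: the Account struct itself, then its raw data bytes, each written
// at the next u64-aligned offset; the returned value is the offset of the
// struct, which is what get_account() expects.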
let acc_ptr = account as *const Account;
let data_len = account.data.len();
let data_ptr = account.data.as_ptr();
let ptrs = [
(acc_ptr as *const u8, mem::size_of::<Account>()),
(data_ptr, data_len),
];
self.append_ptrs(&ptrs)
}
}
pub mod test_utils {
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::fs::{create_dir_all, remove_dir_all};
use std::path::PathBuf;
pub struct TempFile {
pub path: PathBuf,
}
impl Drop for TempFile {
fn drop(&mut self) {
let mut path = PathBuf::new();
std::mem::swap(&mut path, &mut self.path);
std::fs::remove_file(path).unwrap();
}
}
pub fn get_append_vec_path(path: &str) -> TempFile {
let out_dir = std::env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let mut buf = PathBuf::new();
buf.push(&format!("{}/{}", out_dir, path));
let _ignored = remove_dir_all(out_dir.clone());
create_dir_all(out_dir).expect("Create directory failed");
TempFile { path: buf }
}
pub fn create_test_account(sample: usize) -> Account {
let data_len = sample % 256;
let mut account = Account::new(sample as u64, 0, &Pubkey::default());
account.data = (0..data_len).map(|_| data_len as u8).collect();
account
}
}
#[cfg(test)]
pub mod tests {
use super::test_utils::*;
use super::*;
use log::*;
use rand::{thread_rng, Rng};
use solana_sdk::timing::{duration_as_ms, duration_as_s};
use std::sync::atomic::{AtomicUsize, Ordering};
use solana_sdk::timing::duration_as_ms;
use std::time::Instant;
const START_SIZE: u64 = 4 * 1024 * 1024;
const INC_SIZE: u64 = 1 * 1024 * 1024;
#[test]
fn test_append_vec() {
let path = Path::new("append_vec");
let av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
let val: u64 = 5;
let index = av.append(val).unwrap();
assert_eq!(*av.get(index), val);
let val1 = val + 1;
let index1 = av.append(val1).unwrap();
assert_eq!(*av.get(index), val);
assert_eq!(*av.get(index1), val1);
std::fs::remove_file(path).unwrap();
let path = get_append_vec_path("test_append");
let av = AppendVec::new(&path.path, true, 1024 * 1024);
let account = create_test_account(0);
let index = av.append_account(&account).unwrap();
assert_eq!(*av.get_account(index), account);
}
#[test]
fn test_append_vec_account() {
let path = Path::new("append_vec_account");
let av: AppendVec<Account> = AppendVec::new(path, true, START_SIZE, INC_SIZE);
let v1 = vec![1u8; 32];
let mut account1 = Account {
lamports: 1,
data: v1,
owner: Pubkey::default(),
executable: false,
};
fn test_append_vec_data() {
let path = get_append_vec_path("test_append_data");
let av = AppendVec::new(&path.path, true, 1024 * 1024);
let account = create_test_account(5);
let index = av.append_account(&account).unwrap();
assert_eq!(*av.get_account(index), account);
let account1 = create_test_account(6);
let index1 = av.append_account(&account1).unwrap();
assert_eq!(index1, 0);
assert_eq!(av.get_account(index1).unwrap(), account1);
let v2 = vec![4u8; 32];
let mut account2 = Account {
lamports: 1,
data: v2,
owner: Pubkey::default(),
executable: false,
};
let index2 = av.append_account(&account2).unwrap();
let mut len = get_serialized_size(&account1) + SIZEOF_U64 as usize;
assert_eq!(index2, len as u64);
assert_eq!(av.get_account(index2).unwrap(), account2);
assert_eq!(av.get_account(index1).unwrap(), account1);
account2.data.iter_mut().for_each(|e| *e *= 2);
let index3 = av.append_account(&account2).unwrap();
len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
assert_eq!(index3, len as u64);
assert_eq!(av.get_account(index3).unwrap(), account2);
account1.data.extend([1, 2, 3, 4, 5, 6].iter().cloned());
let index4 = av.append_account(&account1).unwrap();
len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
assert_eq!(index4, len as u64);
assert_eq!(av.get_account(index4).unwrap(), account1);
std::fs::remove_file(path).unwrap();
assert_eq!(*av.get_account(index), account);
assert_eq!(*av.get_account(index1), account1);
}
#[test]
fn test_grow_append_vec() {
let path = Path::new("grow");
let mut av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
let mut val = [5u64; 32];
let size = 100_000;
let mut offsets = vec![0; size];
fn test_append_vec_append_many() {
let path = get_append_vec_path("test_append_many");
let av = AppendVec::new(&path.path, true, 1024 * 1024);
let size = 1000;
let mut indexes = vec![];
let now = Instant::now();
for sample in 0..size {
let account = create_test_account(sample);
let pos = av.append_account(&account).unwrap();
assert_eq!(*av.get_account(pos), account);
indexes.push(pos)
}
trace!("append time: {} ms", duration_as_ms(&now.elapsed()),);
let now = Instant::now();
for index in 0..size {
if let Some(offset) = av.append(val) {
offsets[index] = offset;
} else {
assert!(av.grow_file().is_ok());
if let Some(offset) = av.append(val) {
offsets[index] = offset;
} else {
assert!(false);
}
}
val[0] += 1;
for _ in 0..size {
let sample = thread_rng().gen_range(0, indexes.len());
let account = create_test_account(sample);
assert_eq!(*av.get_account(indexes[sample]), account);
}
info!(
"time: {} ms {} / s",
duration_as_ms(&now.elapsed()),
((mem::size_of::<[u64; 32]>() * size) as f32) / duration_as_s(&now.elapsed()),
);
trace!("random read time: {} ms", duration_as_ms(&now.elapsed()),);
let now = Instant::now();
let num_reads = 100_000;
for _ in 0..num_reads {
let index = thread_rng().gen_range(0, size);
assert_eq!(av.get(offsets[index])[0], (index + 5) as u64);
assert_eq!(indexes.len(), size);
assert_eq!(indexes[0], 0);
let accounts = av.accounts(indexes[0]);
assert_eq!(accounts.len(), size);
for (sample, v) in accounts.iter().enumerate() {
let account = create_test_account(sample);
assert_eq!(**v, account)
}
info!(
"time: {} ms {} / s",
trace!(
"sequential read time: {} ms",
duration_as_ms(&now.elapsed()),
(num_reads as f32) / duration_as_s(&now.elapsed()),
);
std::fs::remove_file(path).unwrap();
}
#[test]
fn random_atomic_change() {
let path = Path::new("random");
let mut vec = AppendVec::<AtomicUsize>::new(path, true, START_SIZE, INC_SIZE);
let size = 1_000;
for k in 0..size {
if vec.append(AtomicUsize::new(k)).is_none() {
assert!(vec.grow_file().is_ok());
assert!(vec.append(AtomicUsize::new(0)).is_some());
}
}
let index = thread_rng().gen_range(0, size as u64);
let atomic1 = vec.get(index * mem::size_of::<AtomicUsize>() as u64);
let current1 = atomic1.load(Ordering::Relaxed);
assert_eq!(current1, index as usize);
let next = current1 + 1;
let index = vec.append(AtomicUsize::new(next)).unwrap();
let atomic2 = vec.get(index);
let current2 = atomic2.load(Ordering::Relaxed);
assert_eq!(current2, next);
std::fs::remove_file(path).unwrap();
}
}