Squashed 'src/leveldb/' changes from 936b461..e991315

e991315 Merge upstream LevelDB 1.15.
0cfb990 Release LevelDB 1.15
02ac9f1 Merge upstream LevelDB 1.14.
0b9a89f Release LevelDB 1.14

git-subtree-dir: src/leveldb
git-subtree-split: e991315d7fe4ca84a98902578106cbffa3dcccfd
This commit is contained in:
Pieter Wuille 2013-12-12 22:08:18 +01:00
parent eed29f0f50
commit 55c6890294
31 changed files with 449 additions and 366 deletions

View File

@ -9,3 +9,4 @@ Sanjay Ghemawat <sanjay@google.com>
# Partial list of contributors: # Partial list of contributors:
Kevin Regan <kevin.d.regan@gmail.com> Kevin Regan <kevin.d.regan@gmail.com>
Johan Bilien <jobi@litl.com>

View File

@ -44,6 +44,7 @@ TESTS = \
filename_test \ filename_test \
filter_block_test \ filter_block_test \
issue178_test \ issue178_test \
issue200_test \
log_test \ log_test \
memenv_test \ memenv_test \
skiplist_test \ skiplist_test \
@ -71,7 +72,7 @@ SHARED = $(SHARED1)
else else
# Update db.h if you change these. # Update db.h if you change these.
SHARED_MAJOR = 1 SHARED_MAJOR = 1
SHARED_MINOR = 13 SHARED_MINOR = 15
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT) SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@ -154,6 +155,9 @@ filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
@ -191,14 +195,14 @@ IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBu
mkdir -p ios-x86/$(dir $@) mkdir -p ios-x86/$(dir $@)
$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@) mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@ lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o: .c.o:
mkdir -p ios-x86/$(dir $@) mkdir -p ios-x86/$(dir $@)
$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@) mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@ lipo ios-x86/$@ ios-arm/$@ -create -output $@
else else

View File

@ -137,6 +137,16 @@ case "$TARGET_OS" in
# man ld: +h internal_name # man ld: +h internal_name
PLATFORM_SHARED_LDFLAGS="-shared -Wl,+h -Wl," PLATFORM_SHARED_LDFLAGS="-shared -Wl,+h -Wl,"
;; ;;
IOS)
PLATFORM=IOS
COMMON_FLAGS="$MEMCMP_FLAG -DOS_MACOSX"
[ -z "$INSTALL_PATH" ] && INSTALL_PATH=`pwd`
PORT_FILE=port/port_posix.cc
PLATFORM_SHARED_EXT=
PLATFORM_SHARED_LDFLAGS=
PLATFORM_SHARED_CFLAGS=
PLATFORM_SHARED_VERSIONED=
;;
OS_WINDOWS_CROSSCOMPILE | NATIVE_WINDOWS) OS_WINDOWS_CROSSCOMPILE | NATIVE_WINDOWS)
PLATFORM=OS_WINDOWS PLATFORM=OS_WINDOWS
COMMON_FLAGS="-fno-builtin-memcmp -D_REENTRANT -DOS_WINDOWS -DLEVELDB_PLATFORM_WINDOWS -DWINVER=0x0500 -D__USE_MINGW_ANSI_STDIO=1" COMMON_FLAGS="-fno-builtin-memcmp -D_REENTRANT -DOS_WINDOWS -DLEVELDB_PLATFORM_WINDOWS -DWINVER=0x0500 -D__USE_MINGW_ANSI_STDIO=1"

View File

@ -75,7 +75,13 @@ class CorruptionTest {
Slice key = Key(i, &key_space); Slice key = Key(i, &key_space);
batch.Clear(); batch.Clear();
batch.Put(key, Value(i, &value_space)); batch.Put(key, Value(i, &value_space));
ASSERT_OK(db_->Write(WriteOptions(), &batch)); WriteOptions options;
// Corrupt() doesn't work without this sync on windows; stat reports 0 for
// the file size.
if (i == n - 1) {
options.sync = true;
}
ASSERT_OK(db_->Write(options, &batch));
} }
} }
@ -125,7 +131,7 @@ class CorruptionTest {
FileType type; FileType type;
std::string fname; std::string fname;
int picked_number = -1; int picked_number = -1;
for (int i = 0; i < filenames.size(); i++) { for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) && if (ParseFileName(filenames[i], &number, &type) &&
type == filetype && type == filetype &&
int(number) > picked_number) { // Pick latest file int(number) > picked_number) { // Pick latest file
@ -238,6 +244,22 @@ TEST(CorruptionTest, TableFile) {
Check(90, 99); Check(90, 99);
} }
TEST(CorruptionTest, TableFileRepair) {
options_.block_size = 2 * kValueSize; // Limit scope of corruption
options_.paranoid_checks = true;
Reopen();
Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, NULL, NULL);
dbi->TEST_CompactRange(1, NULL, NULL);
Corrupt(kTableFile, 100, 1);
RepairDB();
Reopen();
Check(95, 99);
}
TEST(CorruptionTest, TableFileIndexData) { TEST(CorruptionTest, TableFileIndexData) {
Build(10000); // Enough to build multiple Tables Build(10000); // Enough to build multiple Tables
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);

View File

@ -128,7 +128,7 @@ class RandomGenerator {
pos_ = 0; pos_ = 0;
} }
Slice Generate(int len) { Slice Generate(size_t len) {
if (pos_ + len > data_.size()) { if (pos_ + len > data_.size()) {
pos_ = 0; pos_ = 0;
assert(len < data_.size()); assert(len < data_.size());
@ -139,11 +139,11 @@ class RandomGenerator {
}; };
static Slice TrimSpace(Slice s) { static Slice TrimSpace(Slice s) {
int start = 0; size_t start = 0;
while (start < s.size() && isspace(s[start])) { while (start < s.size() && isspace(s[start])) {
start++; start++;
} }
int limit = s.size(); size_t limit = s.size();
while (limit > start && isspace(s[limit-1])) { while (limit > start && isspace(s[limit-1])) {
limit--; limit--;
} }
@ -399,7 +399,7 @@ class Benchmark {
heap_counter_(0) { heap_counter_(0) {
std::vector<std::string> files; std::vector<std::string> files;
Env::Default()->GetChildren(FLAGS_db, &files); Env::Default()->GetChildren(FLAGS_db, &files);
for (int i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) { if (Slice(files[i]).starts_with("heap-")) {
Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]); Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
} }

View File

@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0), seed_(0),
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL), manual_compaction_(NULL) {
consecutive_compaction_errors_(0) {
mem_->Ref(); mem_->Ref();
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
} }
void DBImpl::DeleteObsoleteFiles() { void DBImpl::DeleteObsoleteFiles() {
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
return;
}
// Make a set of all of the live files // Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_; std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live); versions_->AddLiveFiles(&live);
@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s; return s;
} }
Status DBImpl::CompactMemTable() { void DBImpl::CompactMemTable() {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(imm_ != NULL); assert(imm_ != NULL);
@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL; imm_ = NULL;
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
DeleteObsoleteFiles(); DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
} }
return s;
} }
void DBImpl::CompactRange(const Slice* begin, const Slice* end) { void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
} }
MutexLock l(&mutex_); MutexLock l(&mutex_);
while (!manual.done) { while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
while (manual_compaction_ != NULL) { if (manual_compaction_ == NULL) { // Idle
bg_cv_.Wait(); manual_compaction_ = &manual;
} MaybeScheduleCompaction();
manual_compaction_ = &manual; } else { // Running either my compaction or another compaction.
MaybeScheduleCompaction();
while (manual_compaction_ == &manual) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
} }
if (manual_compaction_ == &manual) {
// Cancel my manual compaction since we aborted early for some reason.
manual_compaction_ = NULL;
}
} }
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::TEST_CompactMemTable() {
@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
return s; return s;
} }
void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
bg_error_ = s;
bg_cv_.SignalAll();
}
}
void DBImpl::MaybeScheduleCompaction() { void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (bg_compaction_scheduled_) { if (bg_compaction_scheduled_) {
// Already scheduled // Already scheduled
} else if (shutting_down_.Acquire_Load()) { } else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions // DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL && } else if (imm_ == NULL &&
manual_compaction_ == NULL && manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) { !versions_->NeedsCompaction()) {
@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() { void DBImpl::BackgroundCall() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
assert(bg_compaction_scheduled_); assert(bg_compaction_scheduled_);
if (!shutting_down_.Acquire_Load()) { if (shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction(); // No more background work when shutting down.
if (s.ok()) { } else if (!bg_error_.ok()) {
// Success // No more background work after a background error.
consecutive_compaction_errors_ = 0; } else {
} else if (shutting_down_.Acquire_Load()) { BackgroundCompaction();
// Error most likely due to shutdown; do not wait
} else {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
++consecutive_compaction_errors_;
int seconds_to_sleep = 1;
for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
seconds_to_sleep *= 2;
}
env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
} }
bg_compaction_scheduled_ = false; bg_compaction_scheduled_ = false;
@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
Status DBImpl::BackgroundCompaction() { void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (imm_ != NULL) { if (imm_ != NULL) {
return CompactMemTable(); CompactMemTable();
return;
} }
Compaction* c; Compaction* c;
@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest); f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_); status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), static_cast<unsigned long long>(f->number),
@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
} else { } else {
CompactionState* compact = new CompactionState(c); CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact); status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact); CleanupCompaction(compact);
c->ReleaseInputs(); c->ReleaseInputs();
DeleteObsoleteFiles(); DeleteObsoleteFiles();
@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
} else { } else {
Log(options_.info_log, Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str()); "Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
} }
if (is_manual) { if (is_manual) {
@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
} }
manual_compaction_ = NULL; manual_compaction_ = NULL;
} }
return status;
} }
void DBImpl::CleanupCompaction(CompactionState* compact) { void DBImpl::CleanupCompaction(CompactionState* compact) {
@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(compact); status = InstallCompactionResults(compact);
} }
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp)); "compacted to: %s", versions_->LevelSummary(&tmp));
@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
{ {
mutex_.Unlock(); mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
status = logfile_->Sync(); status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
} }
if (status.ok()) { if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_); status = WriteBatchInternal::InsertInto(updates, mem_);
} }
mutex_.Lock(); mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
} }
if (updates == tmp_batch_) tmp_batch_->Clear(); if (updates == tmp_batch_) tmp_batch_->Clear();

View File

@ -87,8 +87,8 @@ class DBImpl : public DB {
// Compact the in-memory write buffer to disk. Switches to a new // Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. // log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable() // Errors are recorded in bg_error_.
EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number, Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit, VersionEdit* edit,
@ -102,10 +102,12 @@ class DBImpl : public DB {
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
WriteBatch* BuildBatchGroup(Writer** last_writer); WriteBatch* BuildBatchGroup(Writer** last_writer);
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db); static void BGWork(void* db);
void BackgroundCall(); void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact) void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact) Status DoCompactionWork(CompactionState* compact)
@ -170,7 +172,6 @@ class DBImpl : public DB {
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
Status bg_error_; Status bg_error_;
int consecutive_compaction_errors_;
// Per level compaction stats. stats_[level] stores the stats for // Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level". // compactions that produced data for the specified "level".

View File

@ -161,12 +161,13 @@ void DBIter::Next() {
saved_key_.clear(); saved_key_.clear();
return; return;
} }
// saved_key_ already contains the key to skip past.
} else {
// Store in saved_key_ the current key so we skip it below.
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
} }
// Temporarily use saved_key_ as storage for key to skip. FindNextUserEntry(true, &saved_key_);
std::string* skip = &saved_key_;
SaveKey(ExtractUserKey(iter_->key()), skip);
FindNextUserEntry(true, skip);
} }
void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {

View File

@ -57,8 +57,11 @@ void DelayMilliseconds(int millis) {
// Special Env used to delay background operations // Special Env used to delay background operations
class SpecialEnv : public EnvWrapper { class SpecialEnv : public EnvWrapper {
public: public:
// sstable Sync() calls are blocked while this pointer is non-NULL. // sstable/log Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_; port::AtomicPointer delay_data_sync_;
// sstable/log Sync() calls return an error.
port::AtomicPointer data_sync_error_;
// Simulate no-space errors while this pointer is non-NULL. // Simulate no-space errors while this pointer is non-NULL.
port::AtomicPointer no_space_; port::AtomicPointer no_space_;
@ -75,11 +78,9 @@ class SpecialEnv : public EnvWrapper {
bool count_random_reads_; bool count_random_reads_;
AtomicCounter random_read_counter_; AtomicCounter random_read_counter_;
AtomicCounter sleep_counter_;
AtomicCounter sleep_time_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) { explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL); delay_data_sync_.Release_Store(NULL);
data_sync_error_.Release_Store(NULL);
no_space_.Release_Store(NULL); no_space_.Release_Store(NULL);
non_writable_.Release_Store(NULL); non_writable_.Release_Store(NULL);
count_random_reads_ = false; count_random_reads_ = false;
@ -88,17 +89,17 @@ class SpecialEnv : public EnvWrapper {
} }
Status NewWritableFile(const std::string& f, WritableFile** r) { Status NewWritableFile(const std::string& f, WritableFile** r) {
class SSTableFile : public WritableFile { class DataFile : public WritableFile {
private: private:
SpecialEnv* env_; SpecialEnv* env_;
WritableFile* base_; WritableFile* base_;
public: public:
SSTableFile(SpecialEnv* env, WritableFile* base) DataFile(SpecialEnv* env, WritableFile* base)
: env_(env), : env_(env),
base_(base) { base_(base) {
} }
~SSTableFile() { delete base_; } ~DataFile() { delete base_; }
Status Append(const Slice& data) { Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != NULL) { if (env_->no_space_.Acquire_Load() != NULL) {
// Drop writes on the floor // Drop writes on the floor
@ -110,7 +111,10 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); } Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); } Status Flush() { return base_->Flush(); }
Status Sync() { Status Sync() {
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) { if (env_->data_sync_error_.Acquire_Load() != NULL) {
return Status::IOError("simulated data sync error");
}
while (env_->delay_data_sync_.Acquire_Load() != NULL) {
DelayMilliseconds(100); DelayMilliseconds(100);
} }
return base_->Sync(); return base_->Sync();
@ -147,8 +151,9 @@ class SpecialEnv : public EnvWrapper {
Status s = target()->NewWritableFile(f, r); Status s = target()->NewWritableFile(f, r);
if (s.ok()) { if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) { if (strstr(f.c_str(), ".ldb") != NULL ||
*r = new SSTableFile(this, *r); strstr(f.c_str(), ".log") != NULL) {
*r = new DataFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != NULL) { } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
*r = new ManifestFile(this, *r); *r = new ManifestFile(this, *r);
} }
@ -179,12 +184,6 @@ class SpecialEnv : public EnvWrapper {
} }
return s; return s;
} }
virtual void SleepForMicroseconds(int micros) {
sleep_counter_.Increment();
sleep_time_counter_.IncrementBy(micros);
}
}; };
class DBTest { class DBTest {
@ -322,7 +321,7 @@ class DBTest {
} }
// Check reverse iteration results are the reverse of forward results // Check reverse iteration results are the reverse of forward results
int matched = 0; size_t matched = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_LT(matched, forward.size()); ASSERT_LT(matched, forward.size());
ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]); ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
@ -484,6 +483,24 @@ class DBTest {
} }
return false; return false;
} }
// Returns number of files renamed.
int RenameLDBToSST() {
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
uint64_t number;
FileType type;
int files_renamed = 0;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
const std::string from = TableFileName(dbname_, number);
const std::string to = SSTTableFileName(dbname_, number);
ASSERT_OK(env_->RenameFile(from, to));
files_renamed++;
}
}
return files_renamed;
}
}; };
TEST(DBTest, Empty) { TEST(DBTest, Empty) {
@ -525,11 +542,11 @@ TEST(DBTest, GetFromImmutableLayer) {
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo")); ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls env_->delay_data_sync_.Release_Store(env_); // Block sync calls
Put("k1", std::string(100000, 'x')); // Fill memtable Put("k1", std::string(100000, 'x')); // Fill memtable
Put("k2", std::string(100000, 'y')); // Trigger compaction Put("k2", std::string(100000, 'y')); // Trigger compaction
ASSERT_EQ("v1", Get("foo")); ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls env_->delay_data_sync_.Release_Store(NULL); // Release sync calls
} while (ChangeOptions()); } while (ChangeOptions());
} }
@ -1516,41 +1533,13 @@ TEST(DBTest, NoSpace) {
Compact("a", "z"); Compact("a", "z");
const int num_files = CountFiles(); const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors env_->no_space_.Release_Store(env_); // Force out-of-space errors
env_->sleep_counter_.Reset(); for (int i = 0; i < 10; i++) {
for (int i = 0; i < 5; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) { for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL); dbfull()->TEST_CompactRange(level, NULL, NULL);
} }
} }
env_->no_space_.Release_Store(NULL); env_->no_space_.Release_Store(NULL);
ASSERT_LT(CountFiles(), num_files + 3); ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors
ASSERT_GE(env_->sleep_counter_.Read(), 5);
}
TEST(DBTest, ExponentialBackoff) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
env_->non_writable_.Release_Store(env_); // Force errors for new files
env_->sleep_counter_.Reset();
env_->sleep_time_counter_.Reset();
for (int i = 0; i < 5; i++) {
dbfull()->TEST_CompactRange(2, NULL, NULL);
}
env_->non_writable_.Release_Store(NULL);
// Wait for compaction to finish
DelayMilliseconds(1000);
ASSERT_GE(env_->sleep_counter_.Read(), 5);
ASSERT_LT(env_->sleep_counter_.Read(), 10);
ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
} }
TEST(DBTest, NonWritableFileSystem) { TEST(DBTest, NonWritableFileSystem) {
@ -1573,6 +1562,37 @@ TEST(DBTest, NonWritableFileSystem) {
env_->non_writable_.Release_Store(NULL); env_->non_writable_.Release_Store(NULL);
} }
TEST(DBTest, WriteSyncError) {
// Check that log sync errors cause the DB to disallow future writes.
// (a) Cause log sync calls to fail
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
env_->data_sync_error_.Release_Store(env_);
// (b) Normal write should succeed
WriteOptions w;
ASSERT_OK(db_->Put(w, "k1", "v1"));
ASSERT_EQ("v1", Get("k1"));
// (c) Do a sync write; should fail
w.sync = true;
ASSERT_TRUE(!db_->Put(w, "k2", "v2").ok());
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("NOT_FOUND", Get("k2"));
// (d) make sync behave normally
env_->data_sync_error_.Release_Store(NULL);
// (e) Do a non-sync write; should fail
w.sync = false;
ASSERT_TRUE(!db_->Put(w, "k3", "v3").ok());
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("NOT_FOUND", Get("k2"));
ASSERT_EQ("NOT_FOUND", Get("k3"));
}
TEST(DBTest, ManifestWriteError) { TEST(DBTest, ManifestWriteError) {
// Test for the following problem: // Test for the following problem:
// (a) Compaction produces file F // (a) Compaction produces file F
@ -1632,6 +1652,22 @@ TEST(DBTest, MissingSSTFile) {
<< s.ToString(); << s.ToString();
} }
TEST(DBTest, StillReadSST) {
ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo"));
// Dump the memtable to disk.
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("bar", Get("foo"));
Close();
ASSERT_GT(RenameLDBToSST(), 0);
Options options = CurrentOptions();
options.paranoid_checks = true;
Status s = TryReopen(&options);
ASSERT_TRUE(s.ok());
ASSERT_EQ("bar", Get("foo"));
}
TEST(DBTest, FilesDeletedAfterCompaction) { TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2")); ASSERT_OK(Put("foo", "v2"));
Compact("a", "z"); Compact("a", "z");
@ -1663,7 +1699,7 @@ TEST(DBTest, BloomFilter) {
dbfull()->TEST_CompactMemTable(); dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks // Prevent auto compactions triggered by seeks
env_->delay_sstable_sync_.Release_Store(env_); env_->delay_data_sync_.Release_Store(env_);
// Lookup present keys. Should rarely read from small sstable. // Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset(); env_->random_read_counter_.Reset();
@ -1684,7 +1720,7 @@ TEST(DBTest, BloomFilter) {
fprintf(stderr, "%d missing => %d reads\n", N, reads); fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100); ASSERT_LE(reads, 3*N/100);
env_->delay_sstable_sync_.Release_Store(NULL); env_->delay_data_sync_.Release_Store(NULL);
Close(); Close();
delete options.block_cache; delete options.block_cache;
delete options.filter_policy; delete options.filter_policy;
@ -1744,7 +1780,7 @@ static void MTThreadBody(void* arg) {
ASSERT_EQ(k, key); ASSERT_EQ(k, key);
ASSERT_GE(w, 0); ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads); ASSERT_LT(w, kNumThreads);
ASSERT_LE(c, reinterpret_cast<uintptr_t>( ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load())); t->state->counter[w].Acquire_Load()));
} }
} }

View File

@ -30,6 +30,11 @@ std::string LogFileName(const std::string& name, uint64_t number) {
} }
std::string TableFileName(const std::string& name, uint64_t number) { std::string TableFileName(const std::string& name, uint64_t number) {
assert(number > 0);
return MakeFileName(name, number, "ldb");
}
std::string SSTTableFileName(const std::string& name, uint64_t number) {
assert(number > 0); assert(number > 0);
return MakeFileName(name, number, "sst"); return MakeFileName(name, number, "sst");
} }
@ -71,7 +76,7 @@ std::string OldInfoLogFileName(const std::string& dbname) {
// dbname/LOG // dbname/LOG
// dbname/LOG.old // dbname/LOG.old
// dbname/MANIFEST-[0-9]+ // dbname/MANIFEST-[0-9]+
// dbname/[0-9]+.(log|sst) // dbname/[0-9]+.(log|sst|ldb)
bool ParseFileName(const std::string& fname, bool ParseFileName(const std::string& fname,
uint64_t* number, uint64_t* number,
FileType* type) { FileType* type) {
@ -106,7 +111,7 @@ bool ParseFileName(const std::string& fname,
Slice suffix = rest; Slice suffix = rest;
if (suffix == Slice(".log")) { if (suffix == Slice(".log")) {
*type = kLogFile; *type = kLogFile;
} else if (suffix == Slice(".sst")) { } else if (suffix == Slice(".sst") || suffix == Slice(".ldb")) {
*type = kTableFile; *type = kTableFile;
} else if (suffix == Slice(".dbtmp")) { } else if (suffix == Slice(".dbtmp")) {
*type = kTempFile; *type = kTempFile;

View File

@ -37,6 +37,11 @@ extern std::string LogFileName(const std::string& dbname, uint64_t number);
// "dbname". // "dbname".
extern std::string TableFileName(const std::string& dbname, uint64_t number); extern std::string TableFileName(const std::string& dbname, uint64_t number);
// Return the legacy file name for an sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".
extern std::string SSTTableFileName(const std::string& dbname, uint64_t number);
// Return the name of the descriptor file for the db named by // Return the name of the descriptor file for the db named by
// "dbname" and the specified incarnation number. The result will be // "dbname" and the specified incarnation number. The result will be
// prefixed with "dbname". // prefixed with "dbname".

View File

@ -27,6 +27,7 @@ TEST(FileNameTest, Parse) {
{ "100.log", 100, kLogFile }, { "100.log", 100, kLogFile },
{ "0.log", 0, kLogFile }, { "0.log", 0, kLogFile },
{ "0.sst", 0, kTableFile }, { "0.sst", 0, kTableFile },
{ "0.ldb", 0, kTableFile },
{ "CURRENT", 0, kCurrentFile }, { "CURRENT", 0, kCurrentFile },
{ "LOCK", 0, kDBLockFile }, { "LOCK", 0, kDBLockFile },
{ "MANIFEST-2", 2, kDescriptorFile }, { "MANIFEST-2", 2, kDescriptorFile },

View File

@ -244,60 +244,133 @@ class Repairer {
void ExtractMetaData() { void ExtractMetaData() {
std::vector<TableInfo> kept; std::vector<TableInfo> kept;
for (size_t i = 0; i < table_numbers_.size(); i++) { for (size_t i = 0; i < table_numbers_.size(); i++) {
TableInfo t; ScanTable(table_numbers_[i]);
t.meta.number = table_numbers_[i];
Status status = ScanTable(&t);
if (!status.ok()) {
std::string fname = TableFileName(dbname_, table_numbers_[i]);
Log(options_.info_log, "Table #%llu: ignoring %s",
(unsigned long long) table_numbers_[i],
status.ToString().c_str());
ArchiveFile(fname);
} else {
tables_.push_back(t);
}
} }
} }
Status ScanTable(TableInfo* t) { Iterator* NewTableIterator(const FileMetaData& meta) {
std::string fname = TableFileName(dbname_, t->meta.number); // Same as compaction iterators: if paranoid_checks are on, turn
int counter = 0; // on checksum verification.
Status status = env_->GetFileSize(fname, &t->meta.file_size); ReadOptions r;
if (status.ok()) { r.verify_checksums = options_.paranoid_checks;
Iterator* iter = table_cache_->NewIterator( return table_cache_->NewIterator(r, meta.number, meta.file_size);
ReadOptions(), t->meta.number, t->meta.file_size); }
bool empty = true;
ParsedInternalKey parsed;
t->max_sequence = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long) t->meta.number,
EscapeString(key).c_str());
continue;
}
counter++; void ScanTable(uint64_t number) {
if (empty) { TableInfo t;
empty = false; t.meta.number = number;
t->meta.smallest.DecodeFrom(key); std::string fname = TableFileName(dbname_, number);
} Status status = env_->GetFileSize(fname, &t.meta.file_size);
t->meta.largest.DecodeFrom(key); if (!status.ok()) {
if (parsed.sequence > t->max_sequence) { // Try alternate file name.
t->max_sequence = parsed.sequence; fname = SSTTableFileName(dbname_, number);
} Status s2 = env_->GetFileSize(fname, &t.meta.file_size);
if (s2.ok()) {
status = Status::OK();
} }
if (!iter->status().ok()) {
status = iter->status();
}
delete iter;
} }
if (!status.ok()) {
ArchiveFile(TableFileName(dbname_, number));
ArchiveFile(SSTTableFileName(dbname_, number));
Log(options_.info_log, "Table #%llu: dropped: %s",
(unsigned long long) t.meta.number,
status.ToString().c_str());
return;
}
// Extract metadata by scanning through table.
int counter = 0;
Iterator* iter = NewTableIterator(t.meta);
bool empty = true;
ParsedInternalKey parsed;
t.max_sequence = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long) t.meta.number,
EscapeString(key).c_str());
continue;
}
counter++;
if (empty) {
empty = false;
t.meta.smallest.DecodeFrom(key);
}
t.meta.largest.DecodeFrom(key);
if (parsed.sequence > t.max_sequence) {
t.max_sequence = parsed.sequence;
}
}
if (!iter->status().ok()) {
status = iter->status();
}
delete iter;
Log(options_.info_log, "Table #%llu: %d entries %s", Log(options_.info_log, "Table #%llu: %d entries %s",
(unsigned long long) t->meta.number, (unsigned long long) t.meta.number,
counter, counter,
status.ToString().c_str()); status.ToString().c_str());
return status;
if (status.ok()) {
tables_.push_back(t);
} else {
RepairTable(fname, t); // RepairTable archives input file.
}
}
void RepairTable(const std::string& src, TableInfo t) {
// We will copy src contents to a new table and then rename the
// new table over the source.
// Create builder.
std::string copy = TableFileName(dbname_, next_file_number_++);
WritableFile* file;
Status s = env_->NewWritableFile(copy, &file);
if (!s.ok()) {
return;
}
TableBuilder* builder = new TableBuilder(options_, file);
// Copy data.
Iterator* iter = NewTableIterator(t.meta);
int counter = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
builder->Add(iter->key(), iter->value());
counter++;
}
delete iter;
ArchiveFile(src);
if (counter == 0) {
builder->Abandon(); // Nothing to save
} else {
s = builder->Finish();
if (s.ok()) {
t.meta.file_size = builder->FileSize();
}
}
delete builder;
builder = NULL;
if (s.ok()) {
s = file->Close();
}
delete file;
file = NULL;
if (counter > 0 && s.ok()) {
std::string orig = TableFileName(dbname_, t.meta.number);
s = env_->RenameFile(copy, orig);
if (s.ok()) {
Log(options_.info_log, "Table #%llu: %d entries repaired",
(unsigned long long) t.meta.number, counter);
tables_.push_back(t);
}
}
if (!s.ok()) {
env_->DeleteFile(copy);
}
} }
Status WriteDescriptor() { Status WriteDescriptor() {

View File

@ -54,6 +54,12 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
RandomAccessFile* file = NULL; RandomAccessFile* file = NULL;
Table* table = NULL; Table* table = NULL;
s = env_->NewRandomAccessFile(fname, &file); s = env_->NewRandomAccessFile(fname, &file);
if (!s.ok()) {
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
s = Status::OK();
}
}
if (s.ok()) { if (s.ok()) {
s = Table::Open(*options_, file, file_size, &table); s = Table::Open(*options_, file, file_size, &table);
} }

View File

@ -876,12 +876,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
} }
if (!s.ok()) { if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
if (ManifestContains(record)) {
Log(options_->info_log,
"MANIFEST contains log record despite error; advancing to new "
"version to prevent mismatch between in-memory and logged state");
s = Status::OK();
}
} }
} }
@ -889,8 +883,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// new CURRENT file that points to it. // new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) { if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_); s = SetCurrentFile(env_, dbname_, manifest_file_number_);
// No need to double-check MANIFEST in case of error since it
// will be discarded below.
} }
mu->Lock(); mu->Lock();
@ -1124,31 +1116,6 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
return scratch->buffer; return scratch->buffer;
} }
// Return true iff the manifest contains the specified record.
bool VersionSet::ManifestContains(const std::string& record) const {
std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
SequentialFile* file = NULL;
Status s = env_->NewSequentialFile(fname, &file);
if (!s.ok()) {
Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
return false;
}
log::Reader reader(file, NULL, true/*checksum*/, 0);
Slice r;
std::string scratch;
bool result = false;
while (reader.ReadRecord(&r, &scratch)) {
if (r == Slice(record)) {
result = true;
break;
}
}
delete file;
Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
return result;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0; uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) { for (int level = 0; level < config::kNumLevels; level++) {

View File

@ -292,8 +292,6 @@ class VersionSet {
void AppendVersion(Version* v); void AppendVersion(Version* v);
bool ManifestContains(const std::string& record) const;
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
const Options* const options_; const Options* const options_;

View File

@ -11,7 +11,7 @@
The implementation of leveldb is similar in spirit to the The implementation of leveldb is similar in spirit to the
representation of a single representation of a single
<a href="http://labs.google.com/papers/bigtable.html"> <a href="http://research.google.com/archive/bigtable.html">
Bigtable tablet (section 5.3)</a>. Bigtable tablet (section 5.3)</a>.
However the organization of the files that make up the representation However the organization of the files that make up the representation
is somewhat different and is explained below. is somewhat different and is explained below.

View File

@ -14,7 +14,7 @@ namespace leveldb {
// Update Makefile if you change these // Update Makefile if you change these
static const int kMajorVersion = 1; static const int kMajorVersion = 1;
static const int kMinorVersion = 13; static const int kMinorVersion = 15;
struct Options; struct Options;
struct ReadOptions; struct ReadOptions;

View File

@ -13,9 +13,9 @@
#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_ #ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
#define STORAGE_LEVELDB_INCLUDE_ENV_H_ #define STORAGE_LEVELDB_INCLUDE_ENV_H_
#include <cstdarg>
#include <string> #include <string>
#include <vector> #include <vector>
#include <stdarg.h>
#include <stdint.h> #include <stdint.h>
#include "leveldb/status.h" #include "leveldb/status.h"

59
issues/issue200_test.cc Normal file
View File

@ -0,0 +1,59 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Test for issue 200: when iterator switches direction from backward
// to forward, the current key can be yielded unexpectedly if a new
// mutation has been added just before the current key.
#include "leveldb/db.h"
#include "util/testharness.h"
namespace leveldb {
class Issue200 { };
TEST(Issue200, Test) {
// Get rid of any state from an old run.
std::string dbpath = test::TmpDir() + "/leveldb_issue200_test";
DestroyDB(dbpath, Options());
DB *db;
Options options;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbpath, &db));
WriteOptions write_options;
ASSERT_OK(db->Put(write_options, "1", "b"));
ASSERT_OK(db->Put(write_options, "2", "c"));
ASSERT_OK(db->Put(write_options, "3", "d"));
ASSERT_OK(db->Put(write_options, "4", "e"));
ASSERT_OK(db->Put(write_options, "5", "f"));
ReadOptions read_options;
Iterator *iter = db->NewIterator(read_options);
// Add an element that should not be reflected in the iterator.
ASSERT_OK(db->Put(write_options, "25", "cd"));
iter->Seek("5");
ASSERT_EQ(iter->key().ToString(), "5");
iter->Prev();
ASSERT_EQ(iter->key().ToString(), "4");
iter->Prev();
ASSERT_EQ(iter->key().ToString(), "3");
iter->Next();
ASSERT_EQ(iter->key().ToString(), "4");
iter->Next();
ASSERT_EQ(iter->key().ToString(), "5");
delete iter;
delete db;
DestroyDB(dbpath, options);
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

View File

@ -50,6 +50,13 @@ namespace port {
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx // http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
#define LEVELDB_HAVE_MEMORY_BARRIER #define LEVELDB_HAVE_MEMORY_BARRIER
// Mac OS
#elif defined(OS_MACOSX)
inline void MemoryBarrier() {
OSMemoryBarrier();
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// Gcc on x86 // Gcc on x86
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__) #elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() { inline void MemoryBarrier() {
@ -68,13 +75,6 @@ inline void MemoryBarrier() {
} }
#define LEVELDB_HAVE_MEMORY_BARRIER #define LEVELDB_HAVE_MEMORY_BARRIER
// Mac OS
#elif defined(OS_MACOSX)
inline void MemoryBarrier() {
OSMemoryBarrier();
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// ARM Linux // ARM Linux
#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__) #elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__)
typedef void (*LinuxKernelMemoryBarrierFunc)(void); typedef void (*LinuxKernelMemoryBarrierFunc)(void);

View File

@ -29,7 +29,7 @@ class TestHashFilter : public FilterPolicy {
virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const { virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const {
uint32_t h = Hash(key.data(), key.size(), 1); uint32_t h = Hash(key.data(), key.size(), 1);
for (int i = 0; i + 4 <= filter.size(); i += 4) { for (size_t i = 0; i + 4 <= filter.size(); i += 4) {
if (h == DecodeFixed32(filter.data() + i)) { if (h == DecodeFixed32(filter.data() + i)) {
return true; return true;
} }

View File

@ -40,7 +40,7 @@ char* Arena::AllocateFallback(size_t bytes) {
} }
char* Arena::AllocateAligned(size_t bytes) { char* Arena::AllocateAligned(size_t bytes) {
const int align = sizeof(void*); // We'll align to pointer size const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
assert((align & (align-1)) == 0); // Pointer size should be a power of 2 assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1); size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod); size_t slop = (current_mod == 0 ? 0 : align - current_mod);

View File

@ -5,9 +5,9 @@
#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_ #ifndef STORAGE_LEVELDB_UTIL_ARENA_H_
#define STORAGE_LEVELDB_UTIL_ARENA_H_ #define STORAGE_LEVELDB_UTIL_ARENA_H_
#include <cstddef>
#include <vector> #include <vector>
#include <assert.h> #include <assert.h>
#include <stddef.h>
#include <stdint.h> #include <stdint.h>
namespace leveldb { namespace leveldb {

View File

@ -40,7 +40,7 @@ TEST(ArenaTest, Simple) {
r = arena.Allocate(s); r = arena.Allocate(s);
} }
for (int b = 0; b < s; b++) { for (size_t b = 0; b < s; b++) {
// Fill the "i"th allocation with a known bit pattern // Fill the "i"th allocation with a known bit pattern
r[b] = i % 256; r[b] = i % 256;
} }
@ -51,10 +51,10 @@ TEST(ArenaTest, Simple) {
ASSERT_LE(arena.MemoryUsage(), bytes * 1.10); ASSERT_LE(arena.MemoryUsage(), bytes * 1.10);
} }
} }
for (int i = 0; i < allocated.size(); i++) { for (size_t i = 0; i < allocated.size(); i++) {
size_t num_bytes = allocated[i].first; size_t num_bytes = allocated[i].first;
const char* p = allocated[i].second; const char* p = allocated[i].second;
for (int b = 0; b < num_bytes; b++) { for (size_t b = 0; b < num_bytes; b++) {
// Check the "i"th allocation for the known bit pattern // Check the "i"th allocation for the known bit pattern
ASSERT_EQ(int(p[b]) & 0xff, i % 256); ASSERT_EQ(int(p[b]) & 0xff, i % 256);
} }

View File

@ -126,7 +126,8 @@ TEST(BloomTest, VaryingLengths) {
} }
Build(); Build();
ASSERT_LE(FilterSize(), (length * 10 / 8) + 40) << length; ASSERT_LE(FilterSize(), static_cast<size_t>((length * 10 / 8) + 40))
<< length;
// All added keys must match // All added keys must match
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {

View File

@ -112,13 +112,13 @@ TEST(Coding, Varint64) {
} }
std::string s; std::string s;
for (int i = 0; i < values.size(); i++) { for (size_t i = 0; i < values.size(); i++) {
PutVarint64(&s, values[i]); PutVarint64(&s, values[i]);
} }
const char* p = s.data(); const char* p = s.data();
const char* limit = p + s.size(); const char* limit = p + s.size();
for (int i = 0; i < values.size(); i++) { for (size_t i = 0; i < values.size(); i++) {
ASSERT_TRUE(p < limit); ASSERT_TRUE(p < limit);
uint64_t actual; uint64_t actual;
const char* start = p; const char* start = p;
@ -143,7 +143,7 @@ TEST(Coding, Varint32Truncation) {
std::string s; std::string s;
PutVarint32(&s, large_value); PutVarint32(&s, large_value);
uint32_t result; uint32_t result;
for (int len = 0; len < s.size() - 1; len++) { for (size_t len = 0; len < s.size() - 1; len++) {
ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + len, &result) == NULL); ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + len, &result) == NULL);
} }
ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + s.size(), &result) != NULL); ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + s.size(), &result) != NULL);
@ -162,7 +162,7 @@ TEST(Coding, Varint64Truncation) {
std::string s; std::string s;
PutVarint64(&s, large_value); PutVarint64(&s, large_value);
uint64_t result; uint64_t result;
for (int len = 0; len < s.size() - 1; len++) { for (size_t len = 0; len < s.size() - 1; len++) {
ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + len, &result) == NULL); ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + len, &result) == NULL);
} }
ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + s.size(), &result) != NULL); ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + s.size(), &result) != NULL);

View File

@ -176,147 +176,43 @@ class PosixMmapReadableFile: public RandomAccessFile {
} }
}; };
// We preallocate up to an extra megabyte and use memcpy to append new class PosixWritableFile : public WritableFile {
// data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
private: private:
std::string filename_; std::string filename_;
int fd_; FILE* file_;
size_t page_size_;
size_t map_size_; // How much extra memory to map at a time
char* base_; // The mapped region
char* limit_; // Limit of the mapped region
char* dst_; // Where to write next (in range [base_,limit_])
char* last_sync_; // Where have we synced up to
uint64_t file_offset_; // Offset of base_ in file
// Have we done an munmap of unsynced data?
bool pending_sync_;
// Roundup x to a multiple of y
static size_t Roundup(size_t x, size_t y) {
return ((x + y - 1) / y) * y;
}
size_t TruncateToPageBoundary(size_t s) {
s -= (s & (page_size_ - 1));
assert((s % page_size_) == 0);
return s;
}
bool UnmapCurrentRegion() {
bool result = true;
if (base_ != NULL) {
if (last_sync_ < limit_) {
// Defer syncing this data until next Sync() call, if any
pending_sync_ = true;
}
if (munmap(base_, limit_ - base_) != 0) {
result = false;
}
file_offset_ += limit_ - base_;
base_ = NULL;
limit_ = NULL;
last_sync_ = NULL;
dst_ = NULL;
// Increase the amount we map the next time, but capped at 1MB
if (map_size_ < (1<<20)) {
map_size_ *= 2;
}
}
return result;
}
bool MapNewRegion() {
assert(base_ == NULL);
if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
return false;
}
void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
fd_, file_offset_);
if (ptr == MAP_FAILED) {
return false;
}
base_ = reinterpret_cast<char*>(ptr);
limit_ = base_ + map_size_;
dst_ = base_;
last_sync_ = base_;
return true;
}
public: public:
PosixMmapFile(const std::string& fname, int fd, size_t page_size) PosixWritableFile(const std::string& fname, FILE* f)
: filename_(fname), : filename_(fname), file_(f) { }
fd_(fd),
page_size_(page_size),
map_size_(Roundup(65536, page_size)),
base_(NULL),
limit_(NULL),
dst_(NULL),
last_sync_(NULL),
file_offset_(0),
pending_sync_(false) {
assert((page_size & (page_size - 1)) == 0);
}
~PosixWritableFile() {
~PosixMmapFile() { if (file_ != NULL) {
if (fd_ >= 0) { // Ignoring any potential errors
PosixMmapFile::Close(); fclose(file_);
} }
} }
virtual Status Append(const Slice& data) { virtual Status Append(const Slice& data) {
const char* src = data.data(); size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
size_t left = data.size(); if (r != data.size()) {
while (left > 0) { return IOError(filename_, errno);
assert(base_ <= dst_);
assert(dst_ <= limit_);
size_t avail = limit_ - dst_;
if (avail == 0) {
if (!UnmapCurrentRegion() ||
!MapNewRegion()) {
return IOError(filename_, errno);
}
}
size_t n = (left <= avail) ? left : avail;
memcpy(dst_, src, n);
dst_ += n;
src += n;
left -= n;
} }
return Status::OK(); return Status::OK();
} }
virtual Status Close() { virtual Status Close() {
Status s; Status result;
size_t unused = limit_ - dst_; if (fclose(file_) != 0) {
if (!UnmapCurrentRegion()) { result = IOError(filename_, errno);
s = IOError(filename_, errno);
} else if (unused > 0) {
// Trim the extra space at the end of the file
if (ftruncate(fd_, file_offset_ - unused) < 0) {
s = IOError(filename_, errno);
}
} }
file_ = NULL;
if (close(fd_) < 0) { return result;
if (s.ok()) {
s = IOError(filename_, errno);
}
}
fd_ = -1;
base_ = NULL;
limit_ = NULL;
return s;
} }
virtual Status Flush() { virtual Status Flush() {
if (fflush_unlocked(file_) != 0) {
return IOError(filename_, errno);
}
return Status::OK(); return Status::OK();
} }
@ -353,26 +249,10 @@ class PosixMmapFile : public WritableFile {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (fflush_unlocked(file_) != 0 ||
if (pending_sync_) { fdatasync(fileno(file_)) != 0) {
// Some unmapped data was not synced s = Status::IOError(filename_, strerror(errno));
pending_sync_ = false;
if (fdatasync(fd_) < 0) {
s = IOError(filename_, errno);
}
} }
if (dst_ > last_sync_) {
// Find the beginnings of the pages that contain the first and last
// bytes to be synced.
size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
last_sync_ = dst_;
if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
s = IOError(filename_, errno);
}
}
return s; return s;
} }
}; };
@ -463,12 +343,12 @@ class PosixEnv : public Env {
virtual Status NewWritableFile(const std::string& fname, virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) { WritableFile** result) {
Status s; Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); FILE* f = fopen(fname.c_str(), "w");
if (fd < 0) { if (f == NULL) {
*result = NULL; *result = NULL;
s = IOError(fname, errno); s = IOError(fname, errno);
} else { } else {
*result = new PosixMmapFile(fname, fd, page_size_); *result = new PosixWritableFile(fname, f);
} }
return s; return s;
} }
@ -631,7 +511,6 @@ class PosixEnv : public Env {
return NULL; return NULL;
} }
size_t page_size_;
pthread_mutex_t mu_; pthread_mutex_t mu_;
pthread_cond_t bgsignal_; pthread_cond_t bgsignal_;
pthread_t bgthread_; pthread_t bgthread_;
@ -646,8 +525,7 @@ class PosixEnv : public Env {
MmapLimiter mmap_limit_; MmapLimiter mmap_limit_;
}; };
PosixEnv::PosixEnv() : page_size_(getpagesize()), PosixEnv::PosixEnv() : started_bgthread_(false) {
started_bgthread_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
} }

View File

@ -38,7 +38,7 @@ int RunAllTests() {
int num = 0; int num = 0;
if (tests != NULL) { if (tests != NULL) {
for (int i = 0; i < tests->size(); i++) { for (size_t i = 0; i < tests->size(); i++) {
const Test& t = (*tests)[i]; const Test& t = (*tests)[i];
if (matcher != NULL) { if (matcher != NULL) {
std::string name = t.base; std::string name = t.base;

View File

@ -32,7 +32,7 @@ std::string RandomKey(Random* rnd, int len) {
extern Slice CompressibleString(Random* rnd, double compressed_fraction, extern Slice CompressibleString(Random* rnd, double compressed_fraction,
int len, std::string* dst) { size_t len, std::string* dst) {
int raw = static_cast<int>(len * compressed_fraction); int raw = static_cast<int>(len * compressed_fraction);
if (raw < 1) raw = 1; if (raw < 1) raw = 1;
std::string raw_data; std::string raw_data;

View File

@ -24,7 +24,7 @@ extern std::string RandomKey(Random* rnd, int len);
// "N*compressed_fraction" bytes and return a Slice that references // "N*compressed_fraction" bytes and return a Slice that references
// the generated data. // the generated data.
extern Slice CompressibleString(Random* rnd, double compressed_fraction, extern Slice CompressibleString(Random* rnd, double compressed_fraction,
int len, std::string* dst); size_t len, std::string* dst);
// A wrapper that allows injection of errors. // A wrapper that allows injection of errors.
class ErrorEnv : public EnvWrapper { class ErrorEnv : public EnvWrapper {