diff --git a/configure.ac b/configure.ac index 7c0a9f4a9..c5d08a028 100644 --- a/configure.ac +++ b/configure.ac @@ -590,17 +590,15 @@ fi if test x$use_boost = xyes; then -BOOST_LIBS="$BOOST_LDFLAGS $BOOST_SYSTEM_LIB $BOOST_FILESYSTEM_LIB $BOOST_PROGRAM_OPTIONS_LIB $BOOST_THREAD_LIB" +BOOST_LIBS="$BOOST_LDFLAGS $BOOST_SYSTEM_LIB $BOOST_FILESYSTEM_LIB $BOOST_PROGRAM_OPTIONS_LIB $BOOST_THREAD_LIB $BOOST_CHRONO_LIB" dnl Boost >= 1.50 uses sleep_for rather than the now-deprecated sleep, however dnl it was broken from 1.50 to 1.52 when backed by nanosleep. Use sleep_for if dnl a working version is available, else fall back to sleep. sleep was removed dnl after 1.56. dnl If neither is available, abort. -dnl If sleep_for is used, boost_chrono becomes a requirement. -if test x$ax_cv_boost_chrono = xyes; then TEMP_LIBS="$LIBS" -LIBS="$BOOST_LIBS $BOOST_CHRONO_LIB $LIBS" +LIBS="$BOOST_LIBS $LIBS" TEMP_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS" AC_LINK_IFELSE([AC_LANG_PROGRAM([[ @@ -613,12 +611,11 @@ AC_LINK_IFELSE([AC_LANG_PROGRAM([[ choke me #endif ]])], - [boost_sleep=yes; BOOST_LIBS="$BOOST_LIBS $BOOST_CHRONO_LIB"; + [boost_sleep=yes; AC_DEFINE(HAVE_WORKING_BOOST_SLEEP_FOR, 1, [Define this symbol if boost sleep_for works])], [boost_sleep=no]) LIBS="$TEMP_LIBS" CPPFLAGS="$TEMP_CPPFLAGS" -fi if test x$boost_sleep != xyes; then TEMP_LIBS="$LIBS" diff --git a/src/Makefile.am b/src/Makefile.am index fc8050a4a..1621b2dcd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -116,6 +116,7 @@ BITCOIN_CORE_H = \ rpcclient.h \ rpcprotocol.h \ rpcserver.h \ + scheduler.h \ script/interpreter.h \ script/script_error.h \ script/script.h \ @@ -259,6 +260,7 @@ libbitcoin_common_a_SOURCES = \ netbase.cpp \ protocol.cpp \ pubkey.cpp \ + scheduler.cpp \ script/interpreter.cpp \ script/script.cpp \ script/sign.cpp \ diff --git a/src/Makefile.test.include b/src/Makefile.test.include index 65112f6f5..099714811 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -61,6 +61,7 @@ BITCOIN_TESTS =\ test/pow_tests.cpp \ test/rpc_tests.cpp \ test/sanity_tests.cpp \ + test/scheduler_tests.cpp \ test/script_P2SH_tests.cpp \ test/script_tests.cpp \ test/scriptnum_tests.cpp \ diff --git a/src/bitcoind.cpp b/src/bitcoind.cpp index eeca8655c..cce687ac9 100644 --- a/src/bitcoind.cpp +++ b/src/bitcoind.cpp @@ -8,6 +8,7 @@ #include "init.h" #include "main.h" #include "noui.h" +#include "scheduler.h" #include "util.h" #include @@ -55,6 +56,7 @@ void WaitForShutdown(boost::thread_group* threadGroup) bool AppInit(int argc, char* argv[]) { boost::thread_group threadGroup; + CScheduler scheduler; bool fRet = false; @@ -142,7 +144,7 @@ bool AppInit(int argc, char* argv[]) #endif SoftSetBoolArg("-server", true); - fRet = AppInit2(threadGroup); + fRet = AppInit2(threadGroup, scheduler); } catch (const std::exception& e) { PrintExceptionContinue(&e, "AppInit()"); diff --git a/src/init.cpp b/src/init.cpp index 6ef0ce426..bacc93837 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -19,6 +19,7 @@ #include "net.h" #include "rpcserver.h" #include "script/standard.h" +#include "scheduler.h" #include "txdb.h" #include "ui_interface.h" #include "util.h" @@ -564,7 +565,7 @@ bool InitSanityCheck(void) /** Initialize bitcoin. * @pre Parameters should be parsed and config file should be read. */ -bool AppInit2(boost::thread_group& threadGroup) +bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler) { // ********************************************************* Step 1: setup #ifdef _MSC_VER @@ -890,6 +891,10 @@ bool AppInit2(boost::thread_group& threadGroup) threadGroup.create_thread(&ThreadScriptCheck); } + // Start the lightweight task scheduler thread + CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler); + threadGroup.create_thread(boost::bind(&TraceThread, "scheduler", serviceLoop)); + /* Start the RPC server already. It will be started in "warmup" mode * and not really process calls already (but it will signify connections * that the server is there and will be ready later). Warmup mode will @@ -1373,7 +1378,7 @@ bool AppInit2(boost::thread_group& threadGroup) LogPrintf("mapAddressBook.size() = %u\n", pwalletMain ? pwalletMain->mapAddressBook.size() : 0); #endif - StartNode(threadGroup); + StartNode(threadGroup, scheduler); #ifdef ENABLE_WALLET // Generate coins in the background diff --git a/src/init.h b/src/init.h index d451f65be..dcb2b2936 100644 --- a/src/init.h +++ b/src/init.h @@ -8,6 +8,7 @@ #include +class CScheduler; class CWallet; namespace boost @@ -20,7 +21,7 @@ extern CWallet* pwalletMain; void StartShutdown(); bool ShutdownRequested(); void Shutdown(); -bool AppInit2(boost::thread_group& threadGroup); +bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler); /** The help message mode determines what help message to show */ enum HelpMessageMode { diff --git a/src/net.cpp b/src/net.cpp index 2de04fc57..6849d7926 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -13,6 +13,7 @@ #include "chainparams.h" #include "clientversion.h" #include "primitives/transaction.h" +#include "scheduler.h" #include "ui_interface.h" #include "crypto/common.h" @@ -1590,7 +1591,7 @@ void static Discover(boost::thread_group& threadGroup) #endif } -void StartNode(boost::thread_group& threadGroup) +void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler) { uiInterface.InitMessage(_("Loading addresses...")); // Load addresses for peers.dat @@ -1640,7 +1641,7 @@ void StartNode(boost::thread_group& threadGroup) threadGroup.create_thread(boost::bind(&TraceThread, "msghand", &ThreadMessageHandler)); // Dump network addresses - threadGroup.create_thread(boost::bind(&LoopForever, "dumpaddr", &DumpAddresses, DUMP_ADDRESSES_INTERVAL * 1000)); + scheduler.scheduleEvery(&DumpAddresses, DUMP_ADDRESSES_INTERVAL); } bool StopNode() diff --git a/src/net.h b/src/net.h index 7c61a2be6..17502b97e 100644 --- a/src/net.h +++ b/src/net.h @@ -32,6 +32,7 @@ class CAddrMan; class CBlockIndex; +class CScheduler; class CNode; namespace boost { @@ -72,7 +73,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -void StartNode(boost::thread_group& threadGroup); +void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler); bool StopNode(); void SocketSendData(CNode *pnode); diff --git a/src/qt/bitcoin.cpp b/src/qt/bitcoin.cpp index 018169cfd..8740b98b7 100644 --- a/src/qt/bitcoin.cpp +++ b/src/qt/bitcoin.cpp @@ -26,6 +26,7 @@ #include "init.h" #include "main.h" #include "rpcserver.h" +#include "scheduler.h" #include "ui_interface.h" #include "util.h" @@ -178,6 +179,7 @@ signals: private: boost::thread_group threadGroup; + CScheduler scheduler; /// Pass fatal exception message to UI thread void handleRunawayException(const std::exception *e); @@ -258,7 +260,7 @@ void BitcoinCore::initialize() try { qDebug() << __func__ << ": Running AppInit2 in thread"; - int rv = AppInit2(threadGroup); + int rv = AppInit2(threadGroup, scheduler); if(rv) { /* Start a dummy RPC thread if no RPC thread is active yet diff --git a/src/scheduler.cpp b/src/scheduler.cpp new file mode 100644 index 000000000..8b55888ae --- /dev/null +++ b/src/scheduler.cpp @@ -0,0 +1,98 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "scheduler.h" + +#include +#include +#include + +CScheduler::CScheduler() : nThreadsServicingQueue(0) +{ +} + +CScheduler::~CScheduler() +{ + assert(nThreadsServicingQueue == 0); +} + + +#if BOOST_VERSION < 105000 +static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t) +{ + return boost::posix_time::from_time_t(boost::chrono::system_clock::to_time_t(t)); +} +#endif + +void CScheduler::serviceQueue() +{ + boost::unique_lock lock(newTaskMutex); + ++nThreadsServicingQueue; + + // newTaskMutex is locked throughout this loop EXCEPT + // when the thread is waiting or when the user's function + // is called. + while (1) { + try { + while (taskQueue.empty()) { + // Wait until there is something to do. + newTaskScheduled.wait(lock); + } +// Wait until either there is a new task, or until +// the time of the first item on the queue: + +// wait_until needs boost 1.50 or later; older versions have timed_wait: +#if BOOST_VERSION < 105000 + while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) { + // Keep waiting until timeout + } +#else + while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) { + // Keep waiting until timeout + } +#endif + // If there are multiple threads, the queue can empty while we're waiting (another + // thread may service the task we were waiting on). + if (taskQueue.empty()) + continue; + + Function f = taskQueue.begin()->second; + taskQueue.erase(taskQueue.begin()); + + // Unlock before calling f, so it can reschedule itself or another task + // without deadlocking: + lock.unlock(); + f(); + lock.lock(); + } catch (...) { + --nThreadsServicingQueue; + throw; + } + } +} + +void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) +{ + { + boost::unique_lock lock(newTaskMutex); + taskQueue.insert(std::make_pair(t, f)); + } + newTaskScheduled.notify_one(); +} + +void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds) +{ + schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds)); +} + +static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds) +{ + f(); + s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds); +} + +void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds) +{ + scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds); +} diff --git a/src/scheduler.h b/src/scheduler.h new file mode 100644 index 000000000..bb383ab9f --- /dev/null +++ b/src/scheduler.h @@ -0,0 +1,70 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_SCHEDULER_H +#define BITCOIN_SCHEDULER_H + +// +// NOTE: +// boost::thread / boost::function / boost::chrono should be ported to +// std::thread / std::function / std::chrono when we support C++11. +// +#include +#include +#include +#include + +// +// Simple class for background tasks that should be run +// periodically or once "after a while" +// +// Usage: +// +// CScheduler* s = new CScheduler(); +// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } +// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3); +// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s)); +// +// ... then at program shutdown, clean up the thread running serviceQueue: +// t->interrupt(); +// t->join(); +// delete t; +// delete s; // Must be done after thread is interrupted/joined. +// + +class CScheduler +{ +public: + CScheduler(); + ~CScheduler(); + + typedef boost::function Function; + + // Call func at/after time t + void schedule(Function f, boost::chrono::system_clock::time_point t); + + // Convenience method: call f once deltaSeconds from now + void scheduleFromNow(Function f, int64_t deltaSeconds); + + // Another convenience method: call f approximately + // every deltaSeconds forever, starting deltaSeconds from now. + // To be more precise: every time f is finished, it + // is rescheduled to run deltaSeconds later. If you + // need more accurate scheduling, don't use this method. + void scheduleEvery(Function f, int64_t deltaSeconds); + + // To keep things as simple as possible, there is no unschedule. + + // Services the queue 'forever'. Should be run in a thread, + // and interrupted using boost::interrupt_thread + void serviceQueue(); + +private: + std::multimap taskQueue; + boost::condition_variable newTaskScheduled; + boost::mutex newTaskMutex; + int nThreadsServicingQueue; +}; + +#endif diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp new file mode 100644 index 000000000..a26d0afae --- /dev/null +++ b/src/test/scheduler_tests.cpp @@ -0,0 +1,110 @@ +// Copyright (c) 2012-2013 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "random.h" +#include "scheduler.h" + +#include "test/test_bitcoin.h" + +#include +#include +#include +#include +#include + +BOOST_AUTO_TEST_SUITE(scheduler_tests) + +static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime) +{ + { + boost::unique_lock lock(mutex); + counter += delta; + } + boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min(); + if (rescheduleTime != noTime) { + CScheduler::Function f = boost::bind(µTask, boost::ref(s), boost::ref(mutex), boost::ref(counter), -delta + 1, noTime); + s.schedule(f, rescheduleTime); + } +} + +static void MicroSleep(uint64_t n) +{ +#if defined(HAVE_WORKING_BOOST_SLEEP_FOR) + boost::this_thread::sleep_for(boost::chrono::microseconds(n)); +#elif defined(HAVE_WORKING_BOOST_SLEEP) + boost::this_thread::sleep(boost::posix_time::microseconds(n)); +#else + //should never get here + #error missing boost sleep implementation +#endif +} + +BOOST_AUTO_TEST_CASE(manythreads) +{ + // Stress test: hundreds of microsecond-scheduled tasks, + // serviced by 10 threads. + // + // So... ten shared counters, which if all the tasks execute + // properly will sum to the number of tasks done. + // Each task adds or subtracts from one of the counters a + // random amount, and then schedules another task 0-1000 + // microseconds in the future to subtract or add from + // the counter -random_amount+1, so in the end the shared + // counters should sum to the number of initial tasks performed. + CScheduler microTasks; + + boost::thread_group microThreads; + for (int i = 0; i < 5; i++) + microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks)); + + boost::mutex counterMutex[10]; + int counter[10] = { 0 }; + boost::random::mt19937 rng(insecure_rand()); + boost::random::uniform_int_distribution<> zeroToNine(0, 9); + boost::random::uniform_int_distribution<> randomMsec(-11, 1000); + boost::random::uniform_int_distribution<> randomDelta(-1000, 1000); + + boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); + boost::chrono::system_clock::time_point now = start; + + for (int i = 0; i < 100; i++) { + boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); + boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + int whichCounter = zeroToNine(rng); + CScheduler::Function f = boost::bind(µTask, boost::ref(microTasks), + boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]), + randomDelta(rng), tReschedule); + microTasks.schedule(f, t); + } + + MicroSleep(600); + now = boost::chrono::system_clock::now(); + // More threads and more tasks: + for (int i = 0; i < 5; i++) + microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks)); + for (int i = 0; i < 100; i++) { + boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); + boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + int whichCounter = zeroToNine(rng); + CScheduler::Function f = boost::bind(µTask, boost::ref(microTasks), + boost::ref(counterMutex[whichCounter]), boost::ref(counter[whichCounter]), + randomDelta(rng), tReschedule); + microTasks.schedule(f, t); + } + + // All 2,000 tasks should be finished within 2 milliseconds. Sleep a bit longer. + MicroSleep(2100); + + microThreads.interrupt_all(); + microThreads.join_all(); + + int counterSum = 0; + for (int i = 0; i < 10; i++) { + BOOST_CHECK(counter[i] != 0); + counterSum += counter[i]; + } + BOOST_CHECK_EQUAL(counterSum, 200); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/util.h b/src/util.h index 483d9d785..4cc0faf4d 100644 --- a/src/util.h +++ b/src/util.h @@ -202,43 +202,6 @@ std::string HelpMessageOpt(const std::string& option, const std::string& message void SetThreadPriority(int nPriority); void RenameThread(const char* name); -/** - * Standard wrapper for do-something-forever thread functions. - * "Forever" really means until the thread is interrupted. - * Use it like: - * new boost::thread(boost::bind(&LoopForever, "dumpaddr", &DumpAddresses, 900000)); - * or maybe: - * boost::function f = boost::bind(&FunctionWithArg, argument); - * threadGroup.create_thread(boost::bind(&LoopForever >, "nothing", f, milliseconds)); - */ -template void LoopForever(const char* name, Callable func, int64_t msecs) -{ - std::string s = strprintf("bitcoin-%s", name); - RenameThread(s.c_str()); - LogPrintf("%s thread start\n", name); - try - { - while (1) - { - MilliSleep(msecs); - func(); - } - } - catch (const boost::thread_interrupted&) - { - LogPrintf("%s thread stop\n", name); - throw; - } - catch (const std::exception& e) { - PrintExceptionContinue(&e, name); - throw; - } - catch (...) { - PrintExceptionContinue(NULL, name); - throw; - } -} - /** * .. and a wrapper that just calls func once */