diff --git a/.travis.yml b/.travis.yml index 0537d69a4..3b919b32c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,11 +33,11 @@ matrix: - compiler: ": Win32" env: HOST=i686-w64-mingw32 PACKAGES="nsis gcc-mingw-w64-i686 g++-mingw-w64-i686 binutils-mingw-w64-i686 mingw-w64-dev wine bc" RUN_TESTS=true GOAL="deploy" BITCOIN_CONFIG="--enable-gui --enable-reduce-exports" MAKEJOBS="-j2" - compiler: ": 32-bit + dash" - env: HOST=i686-pc-linux-gnu PACKAGES="g++-multilib bc" RUN_TESTS=true GOAL="install" BITCOIN_CONFIG="--enable-glibc-back-compat --enable-reduce-exports LDFLAGS=-static-libstdc++" USE_SHELL="/bin/dash" + env: HOST=i686-pc-linux-gnu PACKAGES="g++-multilib bc python-zmq" PPA="ppa:chris-lea/zeromq" RUN_TESTS=true GOAL="install" BITCOIN_CONFIG="--enable-zmq --enable-glibc-back-compat --enable-reduce-exports LDFLAGS=-static-libstdc++" USE_SHELL="/bin/dash" - compiler: ": Win64" env: HOST=x86_64-w64-mingw32 PACKAGES="nsis gcc-mingw-w64-x86-64 g++-mingw-w64-x86-64 binutils-mingw-w64-x86-64 mingw-w64-dev wine bc" RUN_TESTS=true GOAL="deploy" BITCOIN_CONFIG="--enable-gui --enable-reduce-exports" MAKEJOBS="-j2" - compiler: ": bitcoind" - env: HOST=x86_64-unknown-linux-gnu PACKAGES="bc" DEP_OPTS="NO_QT=1 NO_UPNP=1 DEBUG=1" RUN_TESTS=true GOAL="install" BITCOIN_CONFIG="--enable-glibc-back-compat --enable-reduce-exports CPPFLAGS=-DDEBUG_LOCKORDER" + env: HOST=x86_64-unknown-linux-gnu PACKAGES="bc python-zmq" PPA="ppa:chris-lea/zeromq" DEP_OPTS="NO_QT=1 NO_UPNP=1 DEBUG=1" RUN_TESTS=true GOAL="install" BITCOIN_CONFIG="--enable-zmq --enable-glibc-back-compat --enable-reduce-exports CPPFLAGS=-DDEBUG_LOCKORDER" - compiler: ": No wallet" env: HOST=x86_64-unknown-linux-gnu DEP_OPTS="NO_WALLET=1" RUN_TESTS=true GOAL="install" BITCOIN_CONFIG="--enable-glibc-back-compat --enable-reduce-exports" - compiler: ": Cross-Mac" @@ -45,6 +45,7 @@ matrix: exclude: - compiler: gcc install: + - if [ -n "$PACKAGES" ]; then sudo rm -f /etc/apt/sources.list.d/travis_ci_zeromq3-source.list; fi - if [ -n "$PACKAGES" ]; then travis_retry sudo apt-get update; fi - if [ -n "$PACKAGES" ]; then travis_retry sudo apt-get install --no-install-recommends --no-upgrade -qq $PACKAGES; fi before_script: diff --git a/configure.ac b/configure.ac index ea0d39b6c..c1032d167 100644 --- a/configure.ac +++ b/configure.ac @@ -156,9 +156,15 @@ AC_ARG_ENABLE([glibc-back-compat], [use_glibc_compat=$enableval], [use_glibc_compat=no]) +AC_ARG_ENABLE([zmq], + [AS_HELP_STRING([--disable-zmq], + [disable ZMQ notifications])], + [use_zmq=$enableval], + [use_zmq=yes]) + AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], []) -# Enable debug +# Enable debug AC_ARG_ENABLE([debug], [AS_HELP_STRING([--enable-debug], [use debug compiler flags and macros (default is no)])], @@ -169,11 +175,11 @@ if test "x$enable_debug" = xyes; then if test "x$GCC" = xyes; then CFLAGS="-g3 -O0 -DDEBUG" fi - + if test "x$GXX" = xyes; then CXXFLAGS="-g3 -O0 -DDEBUG" fi -fi +fi ## TODO: Remove these hard-coded paths and flags. They are here for the sake of ## compatibility with the legacy buildsystem. @@ -693,6 +699,16 @@ if test x$use_pkgconfig = xyes; then if test x$use_qr != xno; then BITCOIN_QT_CHECK([PKG_CHECK_MODULES([QR], [libqrencode], [have_qrencode=yes], [have_qrencode=no])]) fi + + if test "x$use_zmq" = "xyes"; then + PKG_CHECK_MODULES([ZMQ],[libzmq >= 4], + [AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])], + [AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions]) + AC_MSG_WARN([libzmq version 4.x or greater not found, disabling]) + use_zmq=no]) + else + AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions]) + fi ] ) else @@ -705,6 +721,29 @@ else AC_CHECK_HEADER([openssl/ssl.h],, AC_MSG_ERROR(libssl headers missing),) AC_CHECK_LIB([ssl], [main],SSL_LIBS=-lssl, AC_MSG_ERROR(libssl missing)) + if test "x$use_zmq" = "xyes"; then + AC_CHECK_HEADER([zmq.h], + [AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])], + [AC_MSG_WARN([zmq.h not found, disabling zmq support]) + use_zmq=no + AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])]) + AC_CHECK_LIB([zmq],[zmq_ctx_shutdown],ZMQ_LIBS=-lzmq, + [AC_MSG_WARN([libzmq >= 4.0 not found, disabling zmq support]) + use_zmq=no + AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])]) + else + AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions]) + fi + + if test "x$use_zmq" = "xyes"; then + dnl Assume libzmq was built for static linking + case $host in + *mingw*) + ZMQ_CFLAGS="$ZMQ_CFLAGS -DZMQ_STATIC" + ;; + esac + fi + BITCOIN_QT_CHECK(AC_CHECK_LIB([protobuf] ,[main],[PROTOBUF_LIBS=-lprotobuf], BITCOIN_QT_FAIL(libprotobuf not found))) if test x$use_qr != xno; then BITCOIN_QT_CHECK([AC_CHECK_LIB([qrencode], [main],[QR_LIBS=-lqrencode], [have_qrencode=no])]) @@ -873,6 +912,8 @@ if test x$bitcoin_enable_qt != xno; then fi fi +AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"]) + AC_MSG_CHECKING([whether to build test_bitcoin]) if test x$use_tests = xyes; then AC_MSG_RESULT([yes]) diff --git a/contrib/debian/copyright b/contrib/debian/copyright index 04daeb024..fc94e2399 100644 --- a/contrib/debian/copyright +++ b/contrib/debian/copyright @@ -100,6 +100,19 @@ Files: depends/sources/miniupnpc-*.tar.gz Copyright: 2005-2016 Thomas BERNARD License: BSD-3clause +Files: depends/sources/zeromq-*.tar.gz +Copyright: + 1994, 1995, 1996, 1999, 2000, 2001, 2002, 2004, 2005, 2006, 2007 Free Software Foundation, Inc. + 2007-2014 iMatix Corporation + 2009-2011 250bpm s.r.o. + 2010-2011 Miru Limited + 2011 VMware, Inc. + 2012 Spotify AB + 2013 Ericsson AB + 2014 AppDynamics Inc. + 2015-2016 Brocade Communications Systems Inc. +License: LGPL-with-ZeroMQ-exception + Files: depends/sources/google*.tar.gz Copyright: 2008 Google Inc. License: BSD-3clause-Google @@ -1140,3 +1153,32 @@ Comment: License: PUB-DOM This work is in the public domain. + +License: LGPL-with-ZeroMQ-exception + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + . + On Debian systems the GNU Lesser General Public License (LGPL) is + located in '/usr/share/common-licenses/LGPL'. + . + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + -------------------------------------------------------------------------------- + SPECIAL EXCEPTION GRANTED BY COPYRIGHT HOLDERS + . + As a special exception, copyright holders give you permission to link this + library with independent modules to produce an executable, regardless of + the license terms of these independent modules, and to copy and distribute + the resulting executable under terms of your choice, provided that you also + meet, for each linked independent module, the terms and conditions of + the license of that module. An independent module is a module which is not + derived from or based on this library. If you modify this library, you must + extend this exception to your version of the library. + + Note: this exception relieves you of any obligations under sections 4 and 5 + of this license, and section 6 of the GNU General Public License. +Comment: + You should have received a copy of the GNU General Public License + along with this program. If not, see . diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py new file mode 100755 index 000000000..3dea5e3c1 --- /dev/null +++ b/contrib/zmq/zmq_sub.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python2 +# Copyright (c) 2014-2016 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import array +import binascii +import zmq +import struct + +port = 28332 + +zmqContext = zmq.Context() +zmqSubSocket = zmqContext.socket(zmq.SUB) +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") +zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) + +try: + while True: + msg = zmqSubSocket.recv_multipart() + topic = str(msg[0]) + body = msg[1] + sequence = "Unknown"; + if len(msg[-1]) == 4: + msgSequence = struct.unpack(' #include @@ -50,6 +49,10 @@ #include "libsnark/common/profiling.hpp" +#if ENABLE_ZMQ +#include "zmq/zmqnotificationinterface.h" +#endif + using namespace std; extern void ThreadSendAlert(); @@ -61,6 +64,10 @@ CWallet* pwalletMain = NULL; #endif bool fFeeEstimatesInitialized = false; +#if ENABLE_ZMQ +static CZMQNotificationInterface* pzmqNotificationInterface = NULL; +#endif + #ifdef WIN32 // Win32 LevelDB doesn't use file descriptors, and the ones used for // accessing block files don't count towards the fd_set size limit @@ -199,6 +206,15 @@ void Shutdown() if (pwalletMain) pwalletMain->Flush(true); #endif + +#if ENABLE_ZMQ + if (pzmqNotificationInterface) { + UnregisterValidationInterface(pzmqNotificationInterface); + delete pzmqNotificationInterface; + pzmqNotificationInterface = NULL; + } +#endif + #ifndef WIN32 try { boost::filesystem::remove(GetPidFile()); @@ -367,6 +383,14 @@ std::string HelpMessage(HelpMessageMode mode) #endif +#if ENABLE_ZMQ + strUsage += HelpMessageGroup(_("ZeroMQ notification options:")); + strUsage += HelpMessageOpt("-zmqpubhashblock=
", _("Enable publish hash block in
")); + strUsage += HelpMessageOpt("-zmqpubhashtx=
", _("Enable publish hash transaction in
")); + strUsage += HelpMessageOpt("-zmqpubrawblock=
", _("Enable publish raw block in
")); + strUsage += HelpMessageOpt("-zmqpubrawtx=
", _("Enable publish raw transaction in
")); +#endif + strUsage += HelpMessageGroup(_("Debugging/Testing options:")); if (showDebug) { @@ -380,7 +404,7 @@ std::string HelpMessage(HelpMessageMode mode) strUsage += HelpMessageOpt("-stopafterblockimport", strprintf("Stop running after importing blocks from disk (default: %u)", 0)); } string debugCategories = "addrman, alert, bench, coindb, db, estimatefee, lock, mempool, net, partitioncheck, pow, proxy, prune, " - "rand, reindex, rpc, selectcoins, zrpc, zrpcunsafe (implies zrpc)"; // Don't translate these and qt below + "rand, reindex, rpc, selectcoins, zmq, zrpc, zrpcunsafe (implies zrpc)"; // Don't translate these and qt below if (mode == HMM_BITCOIN_QT) debugCategories += ", qt"; strUsage += HelpMessageOpt("-debug=", strprintf(_("Output debugging information (default: %u, supplying is optional)"), 0) + ". " + @@ -1143,6 +1167,14 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler) BOOST_FOREACH(string strDest, mapMultiArgs["-seednode"]) AddOneShot(strDest); +#if ENABLE_ZMQ + pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs); + + if (pzmqNotificationInterface) { + RegisterValidationInterface(pzmqNotificationInterface); + } +#endif + // ********************************************************* Step 7: load block chain fReindex = GetBoolArg("-reindex", false); diff --git a/src/main.cpp b/src/main.cpp index d5c9198f7..74344696d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -2707,6 +2707,7 @@ bool ActivateBestChain(CValidationState &state, CBlock *pblock) { pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip)); } // Notify external listeners about the new tip. + GetMainSignals().UpdatedBlockTip(pindexNewTip); uiInterface.NotifyBlockTip(hashNewTip); } } while(pindexMostWork != chainActive.Tip()); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 3df6cd3f2..cd3e30f3d 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -13,6 +13,7 @@ CMainSignals& GetMainSignals() } void RegisterValidationInterface(CValidationInterface* pwalletIn) { + g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1)); g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2)); g_signals.EraseTransaction.connect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1)); g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); @@ -32,6 +33,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); g_signals.EraseTransaction.disconnect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1)); g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2)); + g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1)); } void UnregisterAllValidationInterfaces() { @@ -43,6 +45,7 @@ void UnregisterAllValidationInterfaces() { g_signals.UpdatedTransaction.disconnect_all_slots(); g_signals.EraseTransaction.disconnect_all_slots(); g_signals.SyncTransaction.disconnect_all_slots(); + g_signals.UpdatedBlockTip.disconnect_all_slots(); } void SyncWithWallets(const CTransaction &tx, const CBlock *pblock) { diff --git a/src/validationinterface.h b/src/validationinterface.h index 144e716bb..1855dacd7 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -31,6 +31,7 @@ void SyncWithWallets(const CTransaction& tx, const CBlock* pblock = NULL); class CValidationInterface { protected: + virtual void UpdatedBlockTip(const CBlockIndex *pindex) {} virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock) {} virtual void EraseFromWallet(const uint256 &hash) {} virtual void ChainTip(const CBlockIndex *pindex, const CBlock *pblock, ZCIncrementalMerkleTree tree, bool added) {} @@ -45,6 +46,8 @@ protected: }; struct CMainSignals { + /** Notifies listeners of updated block chain tip */ + boost::signals2::signal UpdatedBlockTip; /** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */ boost::signals2::signal SyncTransaction; /** Notifies listeners of an erased transaction (currently disabled, requires transaction replacement). */ diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp new file mode 100644 index 000000000..9f5cb3ba6 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqabstractnotifier.h" +#include "util.h" + + +CZMQAbstractNotifier::~CZMQAbstractNotifier() +{ + assert(!psocket); +} + +bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h new file mode 100644 index 000000000..77cf5141e --- /dev/null +++ b/src/zmq/zmqabstractnotifier.h @@ -0,0 +1,44 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H +#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H + +#include "zmqconfig.h" + +class CBlockIndex; +class CZMQAbstractNotifier; + +typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); + +class CZMQAbstractNotifier +{ +public: + CZMQAbstractNotifier() : psocket(0) { } + virtual ~CZMQAbstractNotifier(); + + template + static CZMQAbstractNotifier* Create() + { + return new T(); + } + + std::string GetType() const { return type; } + void SetType(const std::string &t) { type = t; } + std::string GetAddress() const { return address; } + void SetAddress(const std::string &a) { address = a; } + + virtual bool Initialize(void *pcontext) = 0; + virtual void Shutdown() = 0; + + virtual bool NotifyBlock(const CBlockIndex *pindex); + virtual bool NotifyTransaction(const CTransaction &transaction); + +protected: + void *psocket; + std::string type; + std::string address; +}; + +#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h new file mode 100644 index 000000000..6057f5d1a --- /dev/null +++ b/src/zmq/zmqconfig.h @@ -0,0 +1,24 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQCONFIG_H +#define BITCOIN_ZMQ_ZMQCONFIG_H + +#if defined(HAVE_CONFIG_H) +#include "config/bitcoin-config.h" +#endif + +#include +#include + +#if ENABLE_ZMQ +#include +#endif + +#include "primitives/block.h" +#include "primitives/transaction.h" + +void zmqError(const char *str); + +#endif // BITCOIN_ZMQ_ZMQCONFIG_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp new file mode 100644 index 000000000..4df7550d4 --- /dev/null +++ b/src/zmq/zmqnotificationinterface.cpp @@ -0,0 +1,159 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqnotificationinterface.h" +#include "zmqpublishnotifier.h" + +#include "version.h" +#include "main.h" +#include "streams.h" +#include "util.h" + +void zmqError(const char *str) +{ + LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); +} + +CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) +{ +} + +CZMQNotificationInterface::~CZMQNotificationInterface() +{ + Shutdown(); + + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + delete *i; + } +} + +CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map &args) +{ + CZMQNotificationInterface* notificationInterface = NULL; + std::map factories; + std::list notifiers; + + factories["pubhashblock"] = CZMQAbstractNotifier::Create; + factories["pubhashtx"] = CZMQAbstractNotifier::Create; + factories["pubrawblock"] = CZMQAbstractNotifier::Create; + factories["pubrawtx"] = CZMQAbstractNotifier::Create; + + for (std::map::const_iterator i=factories.begin(); i!=factories.end(); ++i) + { + std::map::const_iterator j = args.find("-zmq" + i->first); + if (j!=args.end()) + { + CZMQNotifierFactory factory = i->second; + std::string address = j->second; + CZMQAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) + { + notificationInterface = new CZMQNotificationInterface(); + notificationInterface->notifiers = notifiers; + + if (!notificationInterface->Initialize()) + { + delete notificationInterface; + notificationInterface = NULL; + } + } + + return notificationInterface; +} + +// Called at startup to conditionally set up ZMQ socket(s) +bool CZMQNotificationInterface::Initialize() +{ + LogPrint("zmq", "zmq: Initialize notification interface\n"); + assert(!pcontext); + + pcontext = zmq_init(1); + + if (!pcontext) + { + zmqError("Unable to initialize context"); + return false; + } + + std::list::iterator i=notifiers.begin(); + for (; i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->Initialize(pcontext)) + { + LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } + else + { + LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i!=notifiers.end()) + { + return false; + } + + return true; +} + +// Called during shutdown sequence +void CZMQNotificationInterface::Shutdown() +{ + LogPrint("zmq", "zmq: Shutdown notification interface\n"); + if (pcontext) + { + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + notifier->Shutdown(); + } + zmq_ctx_destroy(pcontext); + + pcontext = 0; + } +} + +void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(pindex)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h new file mode 100644 index 000000000..3ccfaf341 --- /dev/null +++ b/src/zmq/zmqnotificationinterface.h @@ -0,0 +1,37 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H +#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H + +#include "validationinterface.h" +#include +#include + +class CBlockIndex; +class CZMQAbstractNotifier; + +class CZMQNotificationInterface : public CValidationInterface +{ +public: + virtual ~CZMQNotificationInterface(); + + static CZMQNotificationInterface* CreateWithArguments(const std::map &args); + +protected: + bool Initialize(); + void Shutdown(); + + // CValidationInterface + void SyncTransaction(const CTransaction &tx, const CBlock *pblock); + void UpdatedBlockTip(const CBlockIndex *pindex); + +private: + CZMQNotificationInterface(); + + void *pcontext; + std::list notifiers; +}; + +#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 000000000..75a2523e7 --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,189 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqpublishnotifier.h" +#include "main.h" +#include "util.h" + +static std::multimap mapPublishNotifiers; + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + zmq_close(psocket); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", "zmq: Reusing socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap::iterator iterator; + std::pair iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHBLOCK, data, 32); +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHTX, data, 32); +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + CBlock block; + if(!ReadBlockFromDisk(block, pindex)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss << transaction; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h new file mode 100644 index 000000000..22f02a3d0 --- /dev/null +++ b/src/zmq/zmqpublishnotifier.h @@ -0,0 +1,55 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H +#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H + +#include "zmqabstractnotifier.h" + +class CBlockIndex; + +class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier +{ +private: + uint32_t nSequence; //! upcounting per message sequence number + +public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + + bool Initialize(void *pcontext); + void Shutdown(); +}; + +class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H