diff --git a/configure.ac b/configure.ac index 70ba71601..a9086b9fb 100644 --- a/configure.ac +++ b/configure.ac @@ -106,6 +106,12 @@ AC_ARG_ENABLE([upnp-default], [use_upnp_default=$enableval], [use_upnp_default=no]) +AC_ARG_ENABLE([proton], + [AS_HELP_STRING([--disable-proton], + [disable Proton (AMQP messaging)])], + [use_proton=$enableval], + [use_proton=yes]) + AC_ARG_ENABLE(tests, AS_HELP_STRING([--enable-tests],[compile tests (default is yes)]), [use_tests=$enableval], @@ -534,6 +540,23 @@ if test x$enable_wallet != xno; then BITCOIN_FIND_BDB62 fi +dnl Check Qpid Proton headers and library exist +if test x$use_proton = xyes; then + AC_CHECK_HEADERS([proton/connection.hpp], + [], + [AC_MSG_WARN([Proton headers not found, disabling Proton support]) + use_proton=no]) + AC_CHECK_LIB([qpid-proton-cpp], [main], + [PROTON_LIBS="-lqpid-proton-cpp -lqpid-proton"], + [AC_MSG_WARN([Proton libraries not found, disabling Proton support]) + use_proton=no]) +fi +if test x$use_proton = xyes; then + AC_DEFINE(ENABLE_PROTON, 1, [Define to 1 to enable Proton functions]) +else + AC_DEFINE(ENABLE_PROTON, 0, [Define to 1 to enable Proton functions]) +fi + dnl Check for libminiupnpc (optional) if test x$use_upnp != xno; then AC_CHECK_HEADERS( @@ -883,6 +906,8 @@ fi AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"]) +AM_CONDITIONAL([ENABLE_PROTON], [test "x$use_proton" = "xyes"]) + AC_MSG_CHECKING([whether to build test_bitcoin]) if test x$use_tests = xyes; then AC_MSG_RESULT([yes]) @@ -941,6 +966,7 @@ AC_SUBST(GMP_LIBS) AC_SUBST(GMPXX_LIBS) AC_SUBST(LIBSNARK_LIBS) AC_SUBST(LIBZCASH_LIBS) +AC_SUBST(PROTON_LIBS) AC_CONFIG_FILES([Makefile src/Makefile doc/man/Makefile share/setup.nsi src/test/buildenv.py]) AC_CONFIG_FILES([qa/pull-tester/run-bitcoind-for-test.sh],[chmod +x qa/pull-tester/run-bitcoind-for-test.sh]) AC_CONFIG_FILES([qa/pull-tester/tests-config.sh],[chmod +x qa/pull-tester/tests-config.sh]) diff --git a/contrib/amqp/amqp_sub.py b/contrib/amqp/amqp_sub.py new file mode 100644 index 000000000..bc51e8428 --- /dev/null +++ b/contrib/amqp/amqp_sub.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python2 +# Copyright (c) 2017 The Zcash developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +# Requirements: +# pip install python-qpid-proton + +import binascii +from proton.handlers import MessagingHandler +from proton.reactor import Container + +port = 5672 + +class Server(MessagingHandler): + def __init__(self, url): + super(Server, self).__init__() + self.url = url + self.senders = {} + + def on_start(self, event): + print "Listening on:", self.url + self.container = event.container + self.acceptor = event.container.listen(self.url) + + def on_message(self, event): + m = event.message + topic = m.subject + body = m.body + sequence = str( m.properties['x-opt-sequence-number'] ) + if topic == "hashablock": + print '- HASH BLOCK ('+sequence+') -' + print binascii.hexlify(body) + elif topic == "hashtx": + print '- HASH TX ('+sequence+') -' + print binascii.hexlify(body) + elif topic == "rawblock": + print '- RAW BLOCK HEADER ('+sequence+') -' + print binascii.hexlify(body[:80]) + elif topic == "rawtx": + print '- RAW TX ('+sequence+') -' + print binascii.hexlify(body) + +try: + Container(Server("127.0.0.1:%i" % port)).run() +except KeyboardInterrupt: + pass + diff --git a/contrib/debian/copyright b/contrib/debian/copyright index aea954e07..f1316160a 100644 --- a/contrib/debian/copyright +++ b/contrib/debian/copyright @@ -59,6 +59,10 @@ Files: depends/sources/google*.tar.gz Copyright: 2008 Google Inc. License: BSD-3clause-Google +Files: depends/sources/qpid-proton-*.tar.gz +Copyright: 2012-2017 The Apache Software Foundation +License: Apache-Qpid-Proton-with-BSD-Subcomponents + License: Boost-Software-License-1.0 Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by @@ -1091,3 +1095,220 @@ License: LGPL-with-ZeroMQ-exception Comment: You should have received a copy of the GNU General Public License along with this program. If not, see . + +License: Apache-Qpid-Proton-with-BSD-Subcomponents + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + . + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + . + 1. Definitions. + . + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + . + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + . + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + . + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + . + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + . + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + . + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + . + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + . + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + . + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + . + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + . + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + . + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + . + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + . + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + . + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + . + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + . + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + . + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + . + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + . + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + . + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + . + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + . + END OF TERMS AND CONDITIONS + . + APPENDIX: How to apply the Apache License to your work. + . + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + . + Copyright [yyyy] [name of copyright owner] + . + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + . + http://www.apache.org/licenses/LICENSE-2.0 + . + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + . + . + PROTON SUBCOMPONENTS: + . + Proton includes freegetopt with a separate BSD license. Your use + of the source code for freegetopt is subject to the terms and + conditions of its license in examples/include/pncompat/internal/LICENSE. + . + The setup scripts for the python bindings include files derived by + PyZMQ and are licensed with a separate Modified BSD license. Use of + the source code in these setup files are subject to the terms and + conditions in the license: + proton-c/bindings/python/setuputils/PYZMQ_LICENSE.BSD. + diff --git a/depends/packages/packages.mk b/depends/packages/packages.mk index 9d025a6b2..33f45aea3 100644 --- a/depends/packages/packages.mk +++ b/depends/packages/packages.mk @@ -1,6 +1,6 @@ rust_packages := rust librustzcash zcash_packages := libsnark libgmp libsodium -packages := boost openssl libevent zeromq $(zcash_packages) googletest googlemock +packages := boost openssl libevent zeromq $(zcash_packages) googletest googlemock proton native_packages := native_ccache wallet_packages=bdb diff --git a/depends/packages/proton.mk b/depends/packages/proton.mk new file mode 100644 index 000000000..fa1779ead --- /dev/null +++ b/depends/packages/proton.mk @@ -0,0 +1,30 @@ +package=proton +$(package)_version=0.17.0 +$(package)_download_path=http://apache.cs.utah.edu/qpid/proton/$($(package)_version) +$(package)_file_name=qpid-proton-$($(package)_version).tar.gz +$(package)_sha256_hash=6ffd26d3d0e495bfdb5d9fefc5349954e6105ea18cc4bb191161d27742c5a01a +$(package)_dependencies= + +define $(package)_preprocess_cmds + sed -i.old 's/qpid-proton SHARED/qpid-proton STATIC/' proton-c/CMakeLists.txt && \ + sed -i.old 's/SASL/_DO_NOT_BUILD_SASL_/' proton-c/CMakeLists.txt && \ + sed -i.old 's/qpid-proton-core SHARED/qpid-proton-core STATIC/' proton-c/CMakeLists.txt && \ + sed -i.old 's/find_package(OpenSSL)/#find_package(OpenSSL)/' proton-c/CMakeLists.txt && \ + sed -i.old 's/qpid-proton-cpp SHARED/qpid-proton-cpp STATIC/' proton-c/bindings/cpp/CMakeLists.txt && \ + sed -i.old 's/DEFAULT_GO ON/DEFAULT_GO OFF/' proton-c/bindings/CMakeLists.txt && \ + mkdir build +endef + +define $(package)_config_cmds + cd build; cmake .. -DCMAKE_CXX_STANDARD=11 -DCMAKE_INSTALL_PREFIX=/ -DSYSINSTALL_BINDINGS=ON -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_PYTHON=OFF -DBUILD_PHP=OFF -DBUILD_JAVA=OFF -DBUILD_PERL=OFF -DBUILD_RUBY=OFF -DDEFAULT_GO=OFF +endef + +define $(package)_build_cmds + cd build; $(MAKE) VERBOSE=1 all && \ + cp CMakeCache.txt /tmp/CMakeCache.txt +endef + +define $(package)_stage_cmds + cd build; $(MAKE) VERBOSE=1 DESTDIR=$($(package)_staging_prefix_dir) install +endef + diff --git a/doc/amqp.md b/doc/amqp.md new file mode 100644 index 000000000..f733fa514 --- /dev/null +++ b/doc/amqp.md @@ -0,0 +1,123 @@ +# Block and Transaction Broadcasting With AMQP 1.0 (Experimental Feature) + +[AMQP](https://www.amqp.org/) is an enterprise-level message queuing +protocol for the reliable passing of real-time data and business +transactions between applications. AMQP supports both broker and +brokerless messaging. AMQP 1.0 is an open standard and has been +ratified as ISO/IEC 19464. + +The Zcash daemon can be configured to act as a trusted "border +router", implementing the Zcash P2P protocol and relay, making +consensus decisions, maintaining the local blockchain database, +broadcasting locally generated transactions into the network, and +providing a queryable RPC interface to interact on a polled basis for +requesting blockchain related data. However, there exists only a +limited service to notify external software of events like the arrival +of new blocks or transactions. + +The AMQP facility implements a notification interface through a set +of specific notifiers. Currently there are notifiers that publish +blocks and transactions. This read-only facility requires only the +connection of a corresponding AMQP subscriber port in receiving +software. + +Currently the facility is not authenticated nor is there any two-way +protocol involvement. Therefore, subscribers should validate the +received data since it may be out of date, incomplete or even invalid. + +Because AMQP is message oriented, subscribers receive transactions +and blocks all-at-once and do not need to implement any sort of +buffering or reassembly. + +## Prerequisites + +The AMQP feature in Zcash requires [Qpid Proton](https://qpid.apache.org/proton/) +version 0.17 or newer, which you will need to install if you are not +using the depends system. Typically, it is packaged by distributions as +something like *libqpid-proton*. The C++ wrapper for AMQP *is* required. + +In order to run the example Python client scripts in contrib/ one must +also install *python-qpid-proton*, though this is not necessary for +daemon operation. + +## Enabling + +By default, the AMQP feature is automatically compiled in if the +necessary prerequisites are found. To disable, use --disable-proton +during the *configure* step of building zcashd: + + $ ./configure --disable-proton (other options) + +To actually enable operation, one must set the appropriate options on +the commandline or in the configuration file. + +## Usage + +AMQP support is currently an experimental feature, so you must pass +the option: + + -experimentalfeatures + +Currently, the following notifications are supported: + + -amqppubhashtx=address + -amqppubhashblock=address + -amqppubrawblock=address + -amqppubrawtx=address + +The address must be a valid AMQP address, where the same address can be +used in more than notification. Note that SSL and SASL addresses are +not currently supported. + +Launch zcashd like this: + + $ zcashd -amqppubhashtx=amqp://127.0.0.1:5672 + +Or this: + + $ zcashd -amqppubhashtx=amqp://127.0.0.1:5672 \ + -amqppubrawtx=amqp://127.0.0.1:5672 \ + -amqppubrawblock=amqp://127.0.0.1:5672 \ + -amqppubhashblock=amqp://127.0.0.1:5672 \ + -debug=amqp + +The debug category `amqp` enables AMQP-related logging. + +Each notification has a topic and body, where the header corresponds +to the notification type. For instance, for the notification `-amqpubhashtx` +the topic is `hashtx` (no null terminator) and the body is the hexadecimal +transaction hash (32 bytes). This transaction hash and the block hash +found in `hashblock` are in RPC byte order. + +These options can also be provided in zcash.conf. + +Please see `contrib/amqp/amqp_sub.py` for a working example of an +AMQP server listening for messages. + +## Remarks + +From the perspective of zcashd, the local end of an AMQP link is write-only. + +No information is broadcast that wasn't already received from the public +P2P network. + +No authentication or authorization is done on peers that zcashd connects +to; it is assumed that the AMQP link is exposed only to trusted entities, +using other means such as firewalling. + +TLS support may be added once OpenSSL has been removed from the Zcash +project and alternative TLS implementations have been evaluated. + +SASL support may be added in a future update for secure communication. + +Note that when the block chain tip changes, a reorganisation may occur +and just the tip will be notified. It is up to the subscriber to +retrieve the chain from the last known block to the new tip. + +At present, zcashd does not try to resend a notification if there was +a problem confirming receipt. Support for delivery guarantees such as +*at-least-once* and *exactly-once* will be added in in a future update. + +Currently, zcashd appends an up-counting sequence number to each notification +which allows listeners to detect lost notifications. + diff --git a/qa/pull-tester/rpc-tests.sh b/qa/pull-tester/rpc-tests.sh index 08ff3fe7a..f90bf9760 100755 --- a/qa/pull-tester/rpc-tests.sh +++ b/qa/pull-tester/rpc-tests.sh @@ -65,6 +65,10 @@ if [ "x$ENABLE_ZMQ" = "x1" ]; then testScripts+=('zmq_test.py') fi +if [ "x$ENABLE_PROTON" = "x1" ]; then + testScripts+=('proton_test.py') +fi + extArg="-extended" passOn=${@#$extArg} diff --git a/qa/pull-tester/tests-config.sh.in b/qa/pull-tester/tests-config.sh.in index 1cb9ee06b..cc76e8ad8 100755 --- a/qa/pull-tester/tests-config.sh.in +++ b/qa/pull-tester/tests-config.sh.in @@ -11,6 +11,7 @@ EXEEXT="@EXEEXT@" @BUILD_BITCOIN_UTILS_TRUE@ENABLE_UTILS=1 @BUILD_BITCOIND_TRUE@ENABLE_BITCOIND=1 @ENABLE_ZMQ_TRUE@ENABLE_ZMQ=1 +@ENABLE_PROTON_TRUE@ENABLE_PROTON=1 REAL_BITCOIND="$BUILDDIR/src/zcashd${EXEEXT}" REAL_BITCOINCLI="$BUILDDIR/src/zcash-cli${EXEEXT}" diff --git a/qa/rpc-tests/proton_test.py b/qa/rpc-tests/proton_test.py new file mode 100755 index 000000000..b895c087f --- /dev/null +++ b/qa/rpc-tests/proton_test.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python2 +# Copyright (c) 2017 The Zcash developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +# +# Test Proton interface (provides AMQP 1.0 messaging support). +# +# Requirements: +# Python library for Qpid Proton: +# https://pypi.python.org/pypi/python-qpid-proton +# To install: +# pip install python-qpid-proton +# + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import * +from proton.handlers import MessagingHandler +from proton.reactor import Container +import binascii +import struct +import threading + +try: + import http.client as httplib +except ImportError: + import httplib +try: + import urllib.parse as urlparse +except ImportError: + import urlparse + + +class Server(MessagingHandler): + + def __init__(self, url, limit): + super(Server, self).__init__() + self.url = url + self.counter = limit + self.blockhashes = [] + self.txids = [] + self.blockseq = -1 + self.txidseq = -1 + + def on_start(self, event): + print "Proton listening on:", self.url + self.container = event.container + self.acceptor = event.container.listen(self.url) + + def on_message(self, event): + m = event.message + hash = bytes_to_hex_str(m.body) + sequence = m.properties['x-opt-sequence-number'] + if m.subject == "hashtx": + self.txids.append(hash) + + # Test that sequence id is incrementing + assert(sequence == 1 + self.txidseq) + self.txidseq = sequence + elif m.subject == "hashblock": + self.blockhashes.append(hash) + + # Test that sequence id is incrementing + assert(sequence == 1 + self.blockseq) + self.blockseq = sequence + + self.counter = self.counter - 1 + if self.counter == 0: + self.container.stop() + + +class ProtonTest (BitcoinTestFramework): + + port = 25672 + numblocks = 10 # must be even, as two nodes generate equal number + assert(numblocks % 2 == 0) + + def setup_nodes(self): + + # Launch proton server in background thread + # It terminates after receiving numblocks * 2 messages (one for coinbase, one for block) + self.server = Server("127.0.0.1:%i" % self.port, self.numblocks * 2) + self.container = Container(self.server) + self.t1 = threading.Thread(target=self.container.run) + self.t1.start() + + return start_nodes(4, self.options.tmpdir, extra_args=[ + ['-experimentalfeatures', '-debug=amqp', '-amqppubhashtx=amqp://127.0.0.1:'+str(self.port), + '-amqppubhashblock=amqp://127.0.0.1:'+str(self.port)], + [], + [], + [] + ]) + + def run_test(self): + self.sync_all() + baseheight = self.nodes[0].getblockcount() # 200 blocks already mined + + # generate some blocks + self.nodes[0].generate(self.numblocks/2) + self.sync_all() + self.nodes[1].generate(self.numblocks/2) + self.sync_all() + + # wait for server to finish + self.t1.join() + + # sequence numbers have already been checked in the server's message handler + + # sanity check that we have the right number of block hashes and coinbase txids + assert_equal(len(self.server.blockhashes), self.numblocks) + assert_equal(len(self.server.txids), self.numblocks) + + # verify that each block has the correct coinbase txid + for i in xrange(0, self.numblocks): + height = baseheight + i + 1 + blockhash = self.nodes[0].getblockhash(height) + assert_equal(blockhash, self.server.blockhashes[i]) + resp = self.nodes[0].getblock(blockhash) + coinbase = resp["tx"][0] + assert_equal(coinbase, self.server.txids[i]) + + +if __name__ == '__main__': + ProtonTest().main() diff --git a/src/Makefile.am b/src/Makefile.am index 33c1fba2d..e2ce181b1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,6 +55,9 @@ endif if ENABLE_ZMQ EXTRA_LIBRARIES += libbitcoin_zmq.a endif +if ENABLE_PROTON +EXTRA_LIBRARIES += libbitcoin_proton.a +endif if BUILD_BITCOIN_LIBS lib_LTLIBRARIES = libzcashconsensus.la @@ -92,6 +95,11 @@ BITCOIN_CORE_H = \ addrman.h \ alert.h \ amount.h \ + amqp/amqpabstractnotifier.h \ + amqp/amqpconfig.h \ + amqp/amqpnotificationinterface.h \ + amqp/amqppublishnotifier.h \ + amqp/amqpsender.h \ arith_uint256.h \ asyncrpcoperation.h \ asyncrpcqueue.h \ @@ -243,6 +251,15 @@ libbitcoin_zmq_a_SOURCES = \ zmq/zmqpublishnotifier.cpp endif +if ENABLE_PROTON +LIBBITCOIN_PROTON=libbitcoin_proton.a + +libbitcoin_proton_a_CPPFLAGS = $(BITCOIN_INCLUDES) +libbitcoin_proton_a_SOURCES = \ + amqp/amqpabstractnotifier.cpp \ + amqp/amqpnotificationinterface.cpp \ + amqp/amqppublishnotifier.cpp +endif # wallet: zcashd, but only linked when wallet enabled libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES) @@ -401,6 +418,10 @@ zcashd_LDADD += \ $(LIBBITCOIN_CRYPTO) \ $(LIBZCASH_LIBS) +if ENABLE_PROTON +zcashd_LDADD += $(LIBBITCOIN_PROTON) $(PROTON_LIBS) +endif + # bitcoin-cli binary # zcash_cli_SOURCES = bitcoin-cli.cpp zcash_cli_CPPFLAGS = $(BITCOIN_INCLUDES) $(EVENT_CFLAGS) diff --git a/src/Makefile.gtest.include b/src/Makefile.gtest.include index 3752e0d48..471ecd008 100644 --- a/src/Makefile.gtest.include +++ b/src/Makefile.gtest.include @@ -53,6 +53,11 @@ endif zcash_gtest_LDADD += $(LIBZCASH_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(LIBZCASH) $(LIBZCASH_LIBS) +if ENABLE_PROTON +zcash_gtest_LDADD += $(LIBBITCOIN_PROTON) $(PROTON_LIBS) +endif + + zcash_gtest_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static zcash-gtest_check: zcash-gtest FORCE diff --git a/src/Makefile.test.include b/src/Makefile.test.include index 7de8c392f..10cecd59b 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -112,6 +112,10 @@ if ENABLE_ZMQ test_test_bitcoin_LDADD += $(ZMQ_LIBS) endif +if ENABLE_PROTON +test_test_bitcoin_LDADD += $(PROTON_LIBS) +endif + nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES) $(BITCOIN_TESTS): $(GENERATED_TEST_FILES) diff --git a/src/amqp/amqpabstractnotifier.cpp b/src/amqp/amqpabstractnotifier.cpp new file mode 100644 index 000000000..57686ef1d --- /dev/null +++ b/src/amqp/amqpabstractnotifier.cpp @@ -0,0 +1,21 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "amqpabstractnotifier.h" +#include "util.h" + + +AMQPAbstractNotifier::~AMQPAbstractNotifier() +{ +} + +bool AMQPAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool AMQPAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) +{ + return true; +} diff --git a/src/amqp/amqpabstractnotifier.h b/src/amqp/amqpabstractnotifier.h new file mode 100644 index 000000000..c993a2b3e --- /dev/null +++ b/src/amqp/amqpabstractnotifier.h @@ -0,0 +1,43 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H +#define ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H + +#include "amqpconfig.h" + +class CBlockIndex; +class AMQPAbstractNotifier; + +typedef AMQPAbstractNotifier* (*AMQPNotifierFactory)(); + +class AMQPAbstractNotifier +{ +public: + AMQPAbstractNotifier() { } + virtual ~AMQPAbstractNotifier(); + + template + static AMQPAbstractNotifier* Create() + { + return new T(); + } + + std::string GetType() const { return type; } + void SetType(const std::string &t) { type = t; } + std::string GetAddress() const { return address; } + void SetAddress(const std::string &a) { address = a; } + + virtual bool Initialize() = 0; + virtual void Shutdown() = 0; + + virtual bool NotifyBlock(const CBlockIndex *pindex); + virtual bool NotifyTransaction(const CTransaction &transaction); + +protected: + std::string type; + std::string address; +}; + +#endif // ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H diff --git a/src/amqp/amqpconfig.h b/src/amqp/amqpconfig.h new file mode 100644 index 000000000..dcc5f7709 --- /dev/null +++ b/src/amqp/amqpconfig.h @@ -0,0 +1,33 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ZCASH_AMQP_AMQPCONFIG_H +#define ZCASH_AMQP_AMQPCONFIG_H + +#if defined(HAVE_CONFIG_H) +#include "config/bitcoin-config.h" +#endif + +#include +#include + +#if ENABLE_PROTON +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include "primitives/block.h" +#include "primitives/transaction.h" + +#endif // ZCASH_AMQP_AMQPCONFIG_H diff --git a/src/amqp/amqpnotificationinterface.cpp b/src/amqp/amqpnotificationinterface.cpp new file mode 100644 index 000000000..66f5398ca --- /dev/null +++ b/src/amqp/amqpnotificationinterface.cpp @@ -0,0 +1,136 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "amqpnotificationinterface.h" +#include "amqppublishnotifier.h" + +#include "version.h" +#include "main.h" +#include "streams.h" +#include "util.h" + +// AMQP 1.0 Support +// +// The boost::signals2 signals and slot system is thread safe, so CValidationInterface listeners +// can be invoked from any thread. +// +// Currently signals are fired from main.cpp so the callbacks should be invoked on the same thread. +// It should be safe to share objects responsible for sending, as they should not be run concurrently +// across different threads. +// +// Developers should be mindful of where notifications are fired to avoid potential race conditions. +// For example, different signals targeting the same address could be fired from different threads +// in different parts of the system around the same time. +// +// Like the ZMQ notification interface, if a notifier fails to send a message, the notifier is shut down. +// + +AMQPNotificationInterface::AMQPNotificationInterface() +{ +} + +AMQPNotificationInterface::~AMQPNotificationInterface() +{ + Shutdown(); + + for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { + delete *i; + } +} + +AMQPNotificationInterface* AMQPNotificationInterface::CreateWithArguments(const std::map &args) +{ + AMQPNotificationInterface* notificationInterface = nullptr; + std::map factories; + std::list notifiers; + + factories["pubhashblock"] = AMQPAbstractNotifier::Create; + factories["pubhashtx"] = AMQPAbstractNotifier::Create; + factories["pubrawblock"] = AMQPAbstractNotifier::Create; + factories["pubrawtx"] = AMQPAbstractNotifier::Create; + + for (std::map::const_iterator i=factories.begin(); i!=factories.end(); ++i) { + std::map::const_iterator j = args.find("-amqp" + i->first); + if (j!=args.end()) { + AMQPNotifierFactory factory = i->second; + std::string address = j->second; + AMQPAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) { + notificationInterface = new AMQPNotificationInterface(); + notificationInterface->notifiers = notifiers; + + if (!notificationInterface->Initialize()) { + delete notificationInterface; + notificationInterface = nullptr; + } + } + + return notificationInterface; +} + +// Called at startup to conditionally set up +bool AMQPNotificationInterface::Initialize() +{ + LogPrint("amqp", "amqp: Initialize notification interface\n"); + + std::list::iterator i = notifiers.begin(); + for (; i != notifiers.end(); ++i) { + AMQPAbstractNotifier *notifier = *i; + if (notifier->Initialize()) { + LogPrint("amqp", "amqp: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } else { + LogPrint("amqp", "amqp: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i != notifiers.end()) { + return false; + } + + return true; +} + +// Called during shutdown sequence +void AMQPNotificationInterface::Shutdown() +{ + LogPrint("amqp", "amqp: Shutdown notification interface\n"); + + for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { + AMQPAbstractNotifier *notifier = *i; + notifier->Shutdown(); + } +} + +void AMQPNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex) +{ + for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ) { + AMQPAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(pindex)) { + i++; + } else { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void AMQPNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) +{ + for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ) { + AMQPAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) { + i++; + } else { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} diff --git a/src/amqp/amqpnotificationinterface.h b/src/amqp/amqpnotificationinterface.h new file mode 100644 index 000000000..0c07ce235 --- /dev/null +++ b/src/amqp/amqpnotificationinterface.h @@ -0,0 +1,36 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H +#define ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H + +#include "validationinterface.h" +#include +#include + +class CBlockIndex; +class AMQPAbstractNotifier; + +class AMQPNotificationInterface : public CValidationInterface +{ +public: + virtual ~AMQPNotificationInterface(); + + static AMQPNotificationInterface* CreateWithArguments(const std::map &args); + +protected: + bool Initialize(); + void Shutdown(); + + // CValidationInterface + void SyncTransaction(const CTransaction &tx, const CBlock *pblock); + void UpdatedBlockTip(const CBlockIndex *pindex); + +private: + AMQPNotificationInterface(); + + std::list notifiers; +}; + +#endif // ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H diff --git a/src/amqp/amqppublishnotifier.cpp b/src/amqp/amqppublishnotifier.cpp new file mode 100644 index 000000000..589eb151f --- /dev/null +++ b/src/amqp/amqppublishnotifier.cpp @@ -0,0 +1,177 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "amqppublishnotifier.h" +#include "main.h" +#include "util.h" + +#include "amqpsender.h" + +#include +#include + +static std::multimap mapPublishNotifiers; + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; + +// Invoke this method from a new thread to run the proton container event loop. +void AMQPAbstractPublishNotifier::SpawnProtonContainer() +{ + try { + proton::default_container(*handler_).run(); + } + catch (const proton::error_condition &e) { + LogPrint("amqp", "amqp: container error: %s\n", e.what()); + } + catch (const std::runtime_error &e) { + LogPrint("amqp", "amqp: runtime error: %s\n", e.what()); + } + catch (const std::exception &e) { + LogPrint("amqp", "amqp: exception: %s\n", e.what()); + } + catch (...) { + LogPrint("amqp", "amqp: unknown error\n"); + } + handler_->terminate(); +} + +bool AMQPAbstractPublishNotifier::Initialize() +{ + std::multimap::iterator i = mapPublishNotifiers.find(address); + + if (i == mapPublishNotifiers.end()) { + try { + handler_ = std::make_shared(address); + thread_ = std::make_shared(&AMQPAbstractPublishNotifier::SpawnProtonContainer, this); + } + catch (std::exception &e) { + LogPrint("amqp", "amqp: initialization error: %s\n", e.what()); + return false; + } + mapPublishNotifiers.insert(std::make_pair(address, this)); + } else { + // copy the shared ptrs to the message handler and the thread where the proton container is running + handler_ = i->second->handler_; + thread_ = i->second->thread_; + mapPublishNotifiers.insert(std::make_pair(address, this)); + } + + return true; +} + + +void AMQPAbstractPublishNotifier::Shutdown() +{ + LogPrint("amqp", "amqp: Shutdown notifier %s at %s\n", GetType(), GetAddress()); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap::iterator iterator; + std::pair iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) { + if (it->second == this) { + mapPublishNotifiers.erase(it); + break; + } + } + + // terminate the connection if this is the last publisher using this address + if (count == 1) { + handler_->terminate(); + if (thread_.get() != nullptr) { + if (thread_->joinable()) { + thread_->join(); + } + } + } +} + + +bool AMQPAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + try { + proton::binary content; + const char *p = (const char *)data; + content.assign(p, p + size); + + proton::message message(content); + message.subject(std::string(command)); + proton::message::property_map & props = message.properties(); + props.put("x-opt-sequence-number", sequence_); + handler_->publish(message); + + } catch (proton::error_condition &e) { + LogPrint("amqp", "amqp: error : %s\n", e.what()); + return false; + } + catch (const std::runtime_error &e) { + LogPrint("amqp", "amqp: runtime error: %s\n", e.what()); + return false; + } + catch (const std::exception &e) { + LogPrint("amqp", "amqp: exception: %s\n", e.what()); + return false; + } + catch (...) { + LogPrint("amqp", "amqp: unknown error\n"); + return false; + } + + sequence_++; + + return true; +} + +bool AMQPPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint("amqp", "amqp: Publish hashblock %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHBLOCK, data, 32); +} + +bool AMQPPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("amqp", "amqp: Publish hashtx %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHTX, data, 32); +} + +bool AMQPPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + LogPrint("amqp", "amqp: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + CBlock block; + if(!ReadBlockFromDisk(block, pindex)) { + LogPrint("amqp", "amqp: Can't read block from disk"); + return false; + } + + ss << block; + } + + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); +} + +bool AMQPPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("amqp", "amqp: Publish rawtx %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss << transaction; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); +} diff --git a/src/amqp/amqppublishnotifier.h b/src/amqp/amqppublishnotifier.h new file mode 100644 index 000000000..08b3aba08 --- /dev/null +++ b/src/amqp/amqppublishnotifier.h @@ -0,0 +1,56 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H +#define ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H + +#include "amqpabstractnotifier.h" +#include "amqpconfig.h" +#include "amqpsender.h" + +#include +#include + +class CBlockIndex; + +class AMQPAbstractPublishNotifier : public AMQPAbstractNotifier +{ +private: + uint64_t sequence_; // memory only, per notifier instance: upcounting message sequence number + + std::shared_ptr thread_; // proton container thread, may be shared between notifiers + std::shared_ptr handler_; // proton container message handler, may be shared between notifiers + +public: + bool SendMessage(const char *command, const void* data, size_t size); + bool Initialize(); + void Shutdown(); + void SpawnProtonContainer(); +}; + +class AMQPPublishHashBlockNotifier : public AMQPAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class AMQPPublishHashTransactionNotifier : public AMQPAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +class AMQPPublishRawBlockNotifier : public AMQPAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class AMQPPublishRawTransactionNotifier : public AMQPAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +#endif // ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H diff --git a/src/amqp/amqpsender.h b/src/amqp/amqpsender.h new file mode 100644 index 000000000..7fa85d89c --- /dev/null +++ b/src/amqp/amqpsender.h @@ -0,0 +1,115 @@ +// Copyright (c) 2017 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ZCASH_AMQP_AMQPSENDER_H +#define ZCASH_AMQP_AMQPSENDER_H + +#include "amqpconfig.h" + +#include +#include +#include +#include + +class AMQPSender : public proton::messaging_handler { + private: + std::deque messages_; + proton::url url_; + proton::connection conn_; + proton::sender sender_; + std::mutex lock_; + std::atomic terminated_ = {false}; + + public: + + AMQPSender(const std::string& url) : url_(url) {} + + // Callback to initialize the container when run() is invoked + void on_container_start(proton::container& c) override { + proton::duration t(10000); // milliseconds + proton::connection_options opts = proton::connection_options().idle_timeout(t); + conn_ = c.connect(url_, opts); + sender_ = conn_.open_sender(url_.path()); + } + + // Remote end signals when the local end can send (i.e. has credit) + void on_sendable(proton::sender &s) override { + dispatch(); + } + + // Publish message by adding to queue and trying to dispatch it + void publish(const proton::message &m) { + add_message(m); + dispatch(); + } + + // Add message to queue + void add_message(const proton::message &m) { + std::lock_guard guard(lock_); + messages_.push_back(m); + } + + // Send messages in queue + void dispatch() { + std::lock_guard guard(lock_); + + if (isTerminated()) { + throw std::runtime_error("amqp connection was terminated"); + } + + if (!conn_.active()) { + throw std::runtime_error("amqp connection is not active"); + } + + while (messages_.size() > 0) { + if (sender_.credit()) { + const proton::message& m = messages_.front(); + sender_.send(m); + messages_.pop_front(); + } else { + break; + } + } + } + + // Close connection to remote end. Container event-loop, by default, will auto-stop. + void terminate() { + std::lock_guard guard(lock_); + conn_.close(); + terminated_.store(true); + } + + bool isTerminated() const { + return terminated_.load(); + } + + void on_transport_error(proton::transport &t) override { + t.connection().close(); + throw t.error(); + } + + void on_connection_error(proton::connection &c) override { + c.close(); + throw c.error(); + } + + void on_session_error(proton::session &s) override { + s.connection().close(); + throw s.error(); + } + + void on_receiver_error(proton::receiver &r) override { + r.connection().close(); + throw r.error(); + } + + void on_sender_error(proton::sender &s) override { + s.connection().close(); + throw s.error(); + } + +}; + + +#endif //ZCASH_AMQP_AMQPSENDER_H diff --git a/src/init.cpp b/src/init.cpp index 66f7005eb..f8a684bc3 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -58,6 +58,10 @@ #include "zmq/zmqnotificationinterface.h" #endif +#if ENABLE_PROTON +#include "amqp/amqpnotificationinterface.h" +#endif + using namespace std; extern void ThreadSendAlert(); @@ -73,6 +77,10 @@ bool fFeeEstimatesInitialized = false; static CZMQNotificationInterface* pzmqNotificationInterface = NULL; #endif +#if ENABLE_PROTON +static AMQPNotificationInterface* pAMQPNotificationInterface = NULL; +#endif + #ifdef WIN32 // Win32 LevelDB doesn't use file descriptors, and the ones used for // accessing block files don't count towards the fd_set size limit @@ -233,6 +241,14 @@ void Shutdown() } #endif +#if ENABLE_PROTON + if (pAMQPNotificationInterface) { + UnregisterValidationInterface(pAMQPNotificationInterface); + delete pAMQPNotificationInterface; + pAMQPNotificationInterface = NULL; + } +#endif + #ifndef WIN32 try { boost::filesystem::remove(GetPidFile()); @@ -408,6 +424,14 @@ std::string HelpMessage(HelpMessageMode mode) strUsage += HelpMessageOpt("-zmqpubrawtx=
", _("Enable publish raw transaction in
")); #endif +#if ENABLE_PROTON + strUsage += HelpMessageGroup(_("AMQP 1.0 notification options:")); + strUsage += HelpMessageOpt("-amqppubhashblock=
", _("Enable publish hash block in
")); + strUsage += HelpMessageOpt("-amqppubhashtx=
", _("Enable publish hash transaction in
")); + strUsage += HelpMessageOpt("-amqppubrawblock=
", _("Enable publish raw block in
")); + strUsage += HelpMessageOpt("-amqppubrawtx=
", _("Enable publish raw transaction in
")); +#endif + strUsage += HelpMessageGroup(_("Debugging/Testing options:")); if (showDebug) { @@ -1230,6 +1254,21 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler) } #endif +#if ENABLE_PROTON + pAMQPNotificationInterface = AMQPNotificationInterface::CreateWithArguments(mapArgs); + + if (pAMQPNotificationInterface) { + + // AMQP support is currently an experimental feature, so fail if user configured AMQP notifications + // without enabling experimental features. + if (!fExperimentalMode) { + return InitError(_("AMQP support requires -experimentalfeatures.")); + } + + RegisterValidationInterface(pAMQPNotificationInterface); + } +#endif + // ********************************************************* Step 7: load block chain fReindex = GetBoolArg("-reindex", false);