From 6f56df0c6ffdbc7ea2310c76515d895f78c8aa86 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Thu, 30 Jul 2015 15:14:14 -0400 Subject: [PATCH 1/2] Added p2p message signal bindings. --- example/index.js | 6 ++- lib/daemon.js | 6 +++ src/bitcoindjs.cc | 120 ++++++++++++++++++++++++++++++++++++++++++++++ src/bitcoindjs.h | 1 + 4 files changed, 132 insertions(+), 1 deletion(-) diff --git a/example/index.js b/example/index.js index 7782b355..6d9d74a0 100755 --- a/example/index.js +++ b/example/index.js @@ -11,11 +11,15 @@ process.title = 'bitcoind.js'; /** * daemon */ - var daemon = require('../').daemon({ datadir: process.env.BITCOINDJS_DIR || '~/.bitcoin', }); +daemon.on('ready', function() { + console.log('ready'); + daemon.startTxMon(); +}); + daemon.on('error', function(err) { daemon.log('error="%s"', err.message); }); diff --git a/lib/daemon.js b/lib/daemon.js index 26b6dd19..c3dcfc84 100644 --- a/lib/daemon.js +++ b/lib/daemon.js @@ -272,6 +272,12 @@ Daemon.prototype.start = function(options, callback) { }, 1000); }; +Daemon.prototype.startTxMon = function() { + return bitcoindjs.startTxMon(function(command) { + console.log('command', command); + }); +}; + Daemon.prototype.getBlock = function(blockhash, callback) { if (daemon.stopping) return []; return bitcoindjs.getBlock(blockhash, function(err, block) { diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index 888835ee..e96e9ca7 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -29,6 +29,9 @@ extern int64_t nTimeBestReceived; * Node.js Internal Function Templates */ +static void +txmon(uv_async_t *handle); + static void async_tip_update(uv_work_t *req); @@ -71,6 +74,9 @@ async_get_tx(uv_work_t *req); static void async_get_tx_after(uv_work_t *req); +static bool +process_packets(CNode* pfrom); + extern "C" void init(Handle); @@ -78,6 +84,10 @@ init(Handle); * Private Global Variables * Used only by bitcoindjs functions. */ +static uv_mutex_t txmon_mutex; +static std::vector txmon_messages; +static uv_async_t txmon_async; +static Eternal txmon_callback; static volatile bool shutdown_complete = false; static char *g_data_dir = NULL; @@ -196,6 +206,115 @@ NAN_METHOD(VerifyScript) { static bool set_cooked(void); +NAN_METHOD(StartTxMon) { + // todo: if already running give an error + + Isolate* isolate = Isolate::GetCurrent(); + HandleScope scope(isolate); + + Local callback = Local::Cast(args[0]); + Eternal cb(isolate, callback); + txmon_callback = cb; + + CNodeSignals& nodeSignals = GetNodeSignals(); + nodeSignals.ProcessMessages.connect(&process_packets); + + uv_async_init(uv_default_loop(), &txmon_async, txmon); + + NanReturnValue(Undefined(isolate)); +}; + +static void +txmon(uv_async_t *handle) { + Isolate* isolate = Isolate::GetCurrent(); + HandleScope scope(isolate); + + uv_mutex_lock(&txmon_mutex); + + Local results = Array::New(isolate); + int arrayIndex = 0; + + BOOST_FOREACH(const std::string& message, txmon_messages) { + results->Set(arrayIndex, NanNew(message)); + arrayIndex++; + } + + const unsigned argc = 1; + Local argv[argc] = { + Local::New(isolate, results) + }; + + Local cb = txmon_callback.Get(isolate); + + cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); + + txmon_messages.clear(); + + uv_mutex_unlock(&txmon_mutex); + +} + +static bool +process_packets(CNode* pfrom) { + + bool fOk = true; + + std::deque::iterator it = pfrom->vRecvMsg.begin(); + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { + // Don't bother if send buffer is too full to respond anyway + if (pfrom->nSendSize >= SendBufferSize()) { + break; + } + + // get next message + CNetMessage& msg = *it; + + // end, if an incomplete message is found + if (!msg.complete()) { + break; + } + + // at this point, any failure means we can delete the current message + it++; + + // Scan for message start + if (memcmp(msg.hdr.pchMessageStart, + Params().MessageStart(), MESSAGE_START_SIZE) != 0) { + fOk = false; + break; + } + + // Read header + CMessageHeader& hdr = msg.hdr; + if (!hdr.IsValid(Params().MessageStart())) { + continue; + } + string strCommand = hdr.GetCommand(); + + // Message size + unsigned int nMessageSize = hdr.nMessageSize; + + // Checksum + CDataStream& vRecv = msg.vRecv; + uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); + unsigned int nChecksum = 0; + memcpy(&nChecksum, &hash, sizeof(nChecksum)); + if (nChecksum != hdr.nChecksum) { + continue; + } + + uv_mutex_lock(&txmon_mutex); + txmon_messages.push_back(strCommand); + uv_mutex_unlock(&txmon_mutex); + uv_async_send(&txmon_async); + + boost::this_thread::interruption_point(); + break; + } + + return fOk; +} + /** * Functions */ @@ -1347,6 +1466,7 @@ init(Handle target) { NODE_SET_METHOD(target, "verifyScript", VerifyScript); NODE_SET_METHOD(target, "sendTransaction", SendTransaction); NODE_SET_METHOD(target, "estimateFee", EstimateFee); + NODE_SET_METHOD(target, "startTxMon", StartTxMon); } diff --git a/src/bitcoindjs.h b/src/bitcoindjs.h index a05dca82..307f5968 100644 --- a/src/bitcoindjs.h +++ b/src/bitcoindjs.h @@ -30,3 +30,4 @@ NAN_METHOD(AddMempoolUncheckedTransaction); NAN_METHOD(VerifyScript); NAN_METHOD(SendTransaction); NAN_METHOD(EstimateFee); +NAN_METHOD(StartTxMon); From 49587b7e2dc6f2db7e5d3c4e036ed6a259811103 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Thu, 30 Jul 2015 20:22:06 -0400 Subject: [PATCH 2/2] Process tx messages - Check for tx command when processing messages - Emit tx events from the daemon - Start the txmon when the daemon starts --- example/index.js | 5 ++++- lib/daemon.js | 13 ++++++------ src/bitcoindjs.cc | 51 +++++++++++++++++++++++++++-------------------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/example/index.js b/example/index.js index 6d9d74a0..62fb0584 100755 --- a/example/index.js +++ b/example/index.js @@ -17,7 +17,10 @@ var daemon = require('../').daemon({ daemon.on('ready', function() { console.log('ready'); - daemon.startTxMon(); +}); + +daemon.on('tx', function(txid) { + console.log('txid', txid); }); daemon.on('error', function(err) { diff --git a/lib/daemon.js b/lib/daemon.js index c3dcfc84..0133bfb2 100644 --- a/lib/daemon.js +++ b/lib/daemon.js @@ -213,6 +213,13 @@ Daemon.prototype.start = function(options, callback) { bitcoindjs.onTipUpdate(onTipUpdateListener); self.emit('ready', result); + + bitcoindjs.startTxMon(function(txs) { + for(var i = 0; i < txs.length; i++) { + self.emit('tx', txs[i]); + } + }); + }); setTimeout(function callee() { @@ -272,12 +279,6 @@ Daemon.prototype.start = function(options, callback) { }, 1000); }; -Daemon.prototype.startTxMon = function() { - return bitcoindjs.startTxMon(function(command) { - console.log('command', command); - }); -}; - Daemon.prototype.getBlock = function(blockhash, callback) { if (daemon.stopping) return []; return bitcoindjs.getBlock(blockhash, function(err, block) { diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index e96e9ca7..172286f9 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -75,7 +75,7 @@ static void async_get_tx_after(uv_work_t *req); static bool -process_packets(CNode* pfrom); +process_messages(CNode* pfrom); extern "C" void init(Handle); @@ -207,8 +207,6 @@ static bool set_cooked(void); NAN_METHOD(StartTxMon) { - // todo: if already running give an error - Isolate* isolate = Isolate::GetCurrent(); HandleScope scope(isolate); @@ -217,7 +215,7 @@ NAN_METHOD(StartTxMon) { txmon_callback = cb; CNodeSignals& nodeSignals = GetNodeSignals(); - nodeSignals.ProcessMessages.connect(&process_packets); + nodeSignals.ProcessMessages.connect(&process_messages); uv_async_init(uv_default_loop(), &txmon_async, txmon); @@ -255,7 +253,7 @@ txmon(uv_async_t *handle) { } static bool -process_packets(CNode* pfrom) { +process_messages(CNode* pfrom) { bool fOk = true; @@ -278,8 +276,7 @@ process_packets(CNode* pfrom) { it++; // Scan for message start - if (memcmp(msg.hdr.pchMessageStart, - Params().MessageStart(), MESSAGE_START_SIZE) != 0) { + if (memcmp(msg.hdr.pchMessageStart, Params().MessageStart(), MESSAGE_START_SIZE) != 0) { fOk = false; break; } @@ -289,25 +286,35 @@ process_packets(CNode* pfrom) { if (!hdr.IsValid(Params().MessageStart())) { continue; } - string strCommand = hdr.GetCommand(); - // Message size - unsigned int nMessageSize = hdr.nMessageSize; + std::string strCommand = hdr.GetCommand(); + + if (strCommand == (std::string)"tx") { + + // Message size + unsigned int nMessageSize = hdr.nMessageSize; + + // Checksum + CDataStream& vRecv = msg.vRecv; + uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); + unsigned int nChecksum = 0; + memcpy(&nChecksum, &hash, sizeof(nChecksum)); + if (nChecksum != hdr.nChecksum) { + continue; + } + + CTransaction tx; + vRecv >> tx; + + string txHash = tx.GetHash().GetHex(); + + uv_mutex_lock(&txmon_mutex); + txmon_messages.push_back(txHash); + uv_mutex_unlock(&txmon_mutex); + uv_async_send(&txmon_async); - // Checksum - CDataStream& vRecv = msg.vRecv; - uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); - unsigned int nChecksum = 0; - memcpy(&nChecksum, &hash, sizeof(nChecksum)); - if (nChecksum != hdr.nChecksum) { - continue; } - uv_mutex_lock(&txmon_mutex); - txmon_messages.push_back(strCommand); - uv_mutex_unlock(&txmon_mutex); - uv_async_send(&txmon_async); - boost::this_thread::interruption_point(); break; }