Process tx messages

- Check for tx command when processing messages
- Emit tx events from the daemon
- Start the txmon when the daemon starts
This commit is contained in:
Braydon Fuller 2015-07-30 20:22:06 -04:00
parent 6f56df0c6f
commit 49587b7e2d
3 changed files with 40 additions and 29 deletions

View File

@ -17,7 +17,10 @@ var daemon = require('../').daemon({
daemon.on('ready', function() {
console.log('ready');
daemon.startTxMon();
});
daemon.on('tx', function(txid) {
console.log('txid', txid);
});
daemon.on('error', function(err) {

View File

@ -213,6 +213,13 @@ Daemon.prototype.start = function(options, callback) {
bitcoindjs.onTipUpdate(onTipUpdateListener);
self.emit('ready', result);
bitcoindjs.startTxMon(function(txs) {
for(var i = 0; i < txs.length; i++) {
self.emit('tx', txs[i]);
}
});
});
setTimeout(function callee() {
@ -272,12 +279,6 @@ Daemon.prototype.start = function(options, callback) {
}, 1000);
};
Daemon.prototype.startTxMon = function() {
return bitcoindjs.startTxMon(function(command) {
console.log('command', command);
});
};
Daemon.prototype.getBlock = function(blockhash, callback) {
if (daemon.stopping) return [];
return bitcoindjs.getBlock(blockhash, function(err, block) {

View File

@ -75,7 +75,7 @@ static void
async_get_tx_after(uv_work_t *req);
static bool
process_packets(CNode* pfrom);
process_messages(CNode* pfrom);
extern "C" void
init(Handle<Object>);
@ -207,8 +207,6 @@ static bool
set_cooked(void);
NAN_METHOD(StartTxMon) {
// todo: if already running give an error
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
@ -217,7 +215,7 @@ NAN_METHOD(StartTxMon) {
txmon_callback = cb;
CNodeSignals& nodeSignals = GetNodeSignals();
nodeSignals.ProcessMessages.connect(&process_packets);
nodeSignals.ProcessMessages.connect(&process_messages);
uv_async_init(uv_default_loop(), &txmon_async, txmon);
@ -255,7 +253,7 @@ txmon(uv_async_t *handle) {
}
static bool
process_packets(CNode* pfrom) {
process_messages(CNode* pfrom) {
bool fOk = true;
@ -278,8 +276,7 @@ process_packets(CNode* pfrom) {
it++;
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart,
Params().MessageStart(), MESSAGE_START_SIZE) != 0) {
if (memcmp(msg.hdr.pchMessageStart, Params().MessageStart(), MESSAGE_START_SIZE) != 0) {
fOk = false;
break;
}
@ -289,25 +286,35 @@ process_packets(CNode* pfrom) {
if (!hdr.IsValid(Params().MessageStart())) {
continue;
}
string strCommand = hdr.GetCommand();
// Message size
unsigned int nMessageSize = hdr.nMessageSize;
std::string strCommand = hdr.GetCommand();
if (strCommand == (std::string)"tx") {
// Message size
unsigned int nMessageSize = hdr.nMessageSize;
// Checksum
CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
if (nChecksum != hdr.nChecksum) {
continue;
}
CTransaction tx;
vRecv >> tx;
string txHash = tx.GetHash().GetHex();
uv_mutex_lock(&txmon_mutex);
txmon_messages.push_back(txHash);
uv_mutex_unlock(&txmon_mutex);
uv_async_send(&txmon_async);
// Checksum
CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
if (nChecksum != hdr.nChecksum) {
continue;
}
uv_mutex_lock(&txmon_mutex);
txmon_messages.push_back(strCommand);
uv_mutex_unlock(&txmon_mutex);
uv_async_send(&txmon_async);
boost::this_thread::interruption_point();
break;
}