bitcoind: add addresstxid event

This commit is contained in:
Braydon Fuller 2016-05-17 20:20:11 -04:00
parent a48bcaf900
commit 4df9b5f6cf
3 changed files with 136 additions and 8 deletions

View File

@ -2,6 +2,8 @@
var events = require('events');
var util = require('util');
var bitcore = require('bitcore-lib');
var _ = bitcore.deps._;
/**
* The bus represents a connection to node, decoupled from the transport layer, that can

View File

@ -5,6 +5,7 @@ var spawn = require('child_process').spawn;
var util = require('util');
var mkdirp = require('mkdirp');
var bitcore = require('bitcore-lib');
var Address = bitcore.Address;
var zmq = require('zmq');
var async = require('async');
var LRU = require('lru-cache');
@ -44,6 +45,7 @@ function Bitcoin(options) {
this.subscriptions = {};
this.subscriptions.rawtransaction = [];
this.subscriptions.hashblock = [];
this.subscriptions.address = {};
// set initial settings
this._initDefaults(options);
@ -189,13 +191,19 @@ Bitcoin.prototype.getPublishEvents = function() {
scope: this,
subscribe: this.subscribe.bind(this, 'hashblock'),
unsubscribe: this.unsubscribe.bind(this, 'hashblock')
},
{
name: 'bitcoind/addresstxid',
scope: this,
subscribe: this.subscribeAddress.bind(this),
unsubscribe: this.unsubscribeAddress.bind(this)
}
];
};
Bitcoin.prototype.subscribe = function(name, emitter) {
this.subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribing:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
log.info(emitter.remoteAddress, 'subscribe:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
};
Bitcoin.prototype.unsubscribe = function(name, emitter) {
@ -203,7 +211,54 @@ Bitcoin.prototype.unsubscribe = function(name, emitter) {
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribing:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
log.info(emitter.remoteAddress, 'unsubscribe:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
};
Bitcoin.prototype.subscribeAddress = function(emitter, addresses) {
for(var i = 0; i < addresses.length; i++) {
var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex');
if(!this.subscriptions.address[hashHex]) {
this.subscriptions.address[hashHex] = [];
}
this.subscriptions.address[hashHex].push(emitter);
}
log.info(emitter.remoteAddress, 'subscribe:', 'bitcoind/addresstxid', 'total:', _.size(this.subscriptions.address));
};
Bitcoin.prototype.unsubscribeAddress = function(emitter, addresses) {
if(!addresses) {
return this.unsubscribeAddressAll(emitter);
}
for(var i = 0; i < addresses.length; i++) {
var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex');
if(this.subscriptions.address[hashHex]) {
var emitters = this.subscriptions.address[hashHex];
var index = emitters.indexOf(emitter);
if(index > -1) {
emitters.splice(index, 1);
}
}
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'bitcoind/addresstxid', 'total:', _.size(this.subscriptions.address));
};
/**
* A helper function for the `unsubscribe` method to unsubscribe from all addresses.
* @param {String} name - The name of the event
* @param {EventEmitter} emitter - An instance of an event emitter
*/
Bitcoin.prototype.unsubscribeAddressAll = function(emitter) {
for(var hashHex in this.subscriptions.address) {
var emitters = this.subscriptions.address[hashHex];
var index = emitters.indexOf(emitter);
if(index > -1) {
emitters.splice(index, 1);
}
if (emitters.length === 0) {
delete this.subscriptions.address[hashHex];
}
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'bitcoind/addresstxid', 'total:', _.size(this.subscriptions.address));
};
Bitcoin.prototype._getDefaultConfig = function() {
@ -477,12 +532,72 @@ Bitcoin.prototype._updateTip = function(node, message) {
});
}
}
};
Bitcoin.prototype._getAddressHashesFromInput = function(input, addressHashes) {
if (!input.script) {
return;
}
var hashBuffer;
var script = input.script;
if (script.isPublicKeyHashIn()) {
hashBuffer = bitcore.crypto.Hash.sha256ripemd160(input.script.chunks[1].buf);
} else if (script.isScriptHashIn()) {
hashBuffer = bitcore.crypto.Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf);
} else {
return;
}
addressHashes.push(hashBuffer.toString('hex'));
};
Bitcoin.prototype._getAddressHashesFromOutput = function(output, addressHashes) {
if (!output.script) {
return;
}
var script = output.script;
var hashBuffer;
if (script.isPublicKeyHashOut()) {
hashBuffer = script.chunks[2].buf;
} else if (script.isScriptHashOut()) {
hashBuffer = script.chunks[1].buf;
} else {
return;
}
addressHashes.push(hashBuffer.toString('hex'));
};
Bitcoin.prototype._getAddressHashesFromTransaction = function(transaction) {
var addressHashes = [];
for (var i = 0; i < transaction.inputs.length; i++) {
var input = transaction.inputs[i];
this._getAddressHashesFromInput(input, addressHashes);
}
for (var j = 0; j < transaction.outputs.length; j++) {
var output = transaction.outputs[j];
this._getAddressHashesFromOutput(output, addressHashes);
}
return addressHashes;
};
Bitcoin.prototype._notifyAddressTxidSubscribers = function(txid, transaction) {
var addressHashes = this._getAddressHashesFromTransaction(transaction);
for (var i = 0; i < addressHashes.length; i++) {
if(this.subscriptions.address[addressHashes[i]]) {
var emitters = this.subscriptions.address[addressHashes[i]];
for(var j = 0; j < emitters.length; j++) {
emitters[j].emit('bitcoind/addresstxid', txid);
}
}
}
};
Bitcoin.prototype._zmqTransactionHandler = function(node, message) {
var self = this;
var id = bitcore.crypto.Hash.sha256sha256(message).toString('binary');
var hash = bitcore.crypto.Hash.sha256sha256(message);
var id = hash.toString('binary');
if (!self.zmqKnownTransactions.get(id)) {
self.zmqKnownTransactions.set(id, true);
self.emit('tx', message);
@ -491,6 +606,12 @@ Bitcoin.prototype._zmqTransactionHandler = function(node, message) {
for (var i = 0; i < this.subscriptions.rawtransaction.length; i++) {
this.subscriptions.rawtransaction[i].emit('bitcoind/rawtransaction', message.toString('hex'));
}
var tx = bitcore.Transaction();
tx.fromString(message);
var txid = bitcore.util.buffer.reverse(hash).toString('hex');
self._notifyAddressTxidSubscribers(txid, tx);
}
};

View File

@ -74,6 +74,7 @@ describe('Bitcoin Service', function() {
it('will set subscriptions', function() {
var bitcoind = new BitcoinService(baseConfig);
bitcoind.subscriptions.should.deep.equal({
address: {},
rawtransaction: [],
hashblock: []
});
@ -100,7 +101,7 @@ describe('Bitcoin Service', function() {
var bitcoind = new BitcoinService(baseConfig);
var events = bitcoind.getPublishEvents();
should.exist(events);
events.length.should.equal(2);
events.length.should.equal(3);
events[0].name.should.equal('bitcoind/rawtransaction');
events[0].scope.should.equal(bitcoind);
events[0].subscribe.should.be.a('function');
@ -109,6 +110,10 @@ describe('Bitcoin Service', function() {
events[1].scope.should.equal(bitcoind);
events[1].subscribe.should.be.a('function');
events[1].unsubscribe.should.be.a('function');
events[2].name.should.equal('bitcoind/addresstxid');
events[2].scope.should.equal(bitcoind);
events[2].subscribe.should.be.a('function');
events[2].unsubscribe.should.be.a('function');
});
it('will call subscribe/unsubscribe with correct args', function() {
var bitcoind = new BitcoinService(baseConfig);
@ -718,7 +723,7 @@ describe('Bitcoin Service', function() {
describe('#_zmqTransactionHandler', function() {
it('will emit to subscribers', function(done) {
var bitcoind = new BitcoinService(baseConfig);
var expectedBuffer = new Buffer('abcdef', 'hex');
var expectedBuffer = new Buffer(txhex, 'hex');
var emitter = new EventEmitter();
bitcoind.subscriptions.rawtransaction.push(emitter);
emitter.on('bitcoind/rawtransaction', function(hex) {
@ -731,7 +736,7 @@ describe('Bitcoin Service', function() {
});
it('will NOT emit to subscribers more than once for the same tx', function(done) {
var bitcoind = new BitcoinService(baseConfig);
var expectedBuffer = new Buffer('abcdef', 'hex');
var expectedBuffer = new Buffer(txhex, 'hex');
var emitter = new EventEmitter();
bitcoind.subscriptions.rawtransaction.push(emitter);
emitter.on('bitcoind/rawtransaction', function() {
@ -743,7 +748,7 @@ describe('Bitcoin Service', function() {
});
it('will emit "tx" event', function(done) {
var bitcoind = new BitcoinService(baseConfig);
var expectedBuffer = new Buffer('abcdef', 'hex');
var expectedBuffer = new Buffer(txhex, 'hex');
bitcoind.on('tx', function(buffer) {
buffer.should.be.instanceof(Buffer);
buffer.toString('hex').should.equal(expectedBuffer.toString('hex'));
@ -754,7 +759,7 @@ describe('Bitcoin Service', function() {
});
it('will NOT emit "tx" event more than once for the same tx', function(done) {
var bitcoind = new BitcoinService(baseConfig);
var expectedBuffer = new Buffer('abcdef', 'hex');
var expectedBuffer = new Buffer(txhex, 'hex');
bitcoind.on('tx', function() {
done();
});