Merge pull request #114 from braydonf/address-txsignal

Process incoming transactions for the Address Module
This commit is contained in:
Chris Kleeschulte 2015-08-10 14:12:47 -04:00
commit 6f3bae027d
5 changed files with 248 additions and 50 deletions

View File

@ -9,9 +9,7 @@ var bitcore = require('bitcore');
var $ = bitcore.util.preconditions;
var BufferWriter = bitcore.encoding.BufferWriter;
var errors = require('./errors');
var levelup = chainlib.deps.levelup;
var log = chainlib.log;
var Address = bitcore.Address;
var BaseModule = require('./module');
var AddressModule = require('./modules/address');
@ -27,17 +25,13 @@ function DB(options) {
this.Transaction = Transaction;
this.network = bitcore.Networks.get(options.network) || bitcore.Networks.testnet;
// Modules to be loaded when ready
this._modules = options.modules || [];
this._modules.push(AddressModule);
this.modules = [];
// Add address module
this.addModule(AddressModule);
// Add other modules
if(options.modules && options.modules.length) {
for(var i = 0; i < options.modules.length; i++) {
this.addModule(options.modules[i]);
}
}
}
util.inherits(DB, BaseDB);

View File

@ -20,6 +20,9 @@ var AddressModule = function(options) {
this.subscriptions = {};
this.subscriptions.transaction = {};
this.subscriptions.balance = {};
this.db.bitcoind.on('tx', this.transactionHandler.bind(this));
};
inherits(AddressModule, BaseModule);
@ -56,6 +59,73 @@ AddressModule.prototype.getPublishEvents = function() {
];
};
/**
* Will process each output of a transaction from the daemon "tx" event, and construct
* an object with the data for the message to be relayed to any subscribers for an address.
*
* @param {Object} messages - An object to collect messages
* @param {Transaction} tx - Instance of the transaction
* @param {Number} outputIndex - The index of the output in the transaction
* @param {Boolean} rejected - If the transaction was rejected by the mempool
*/
AddressModule.prototype.transactionOutputHandler = function(messages, tx, outputIndex, rejected) {
var script = tx.outputs[outputIndex].script;
// If the script is invalid skip
if (!script) {
return;
}
// Find the address for the output
var address = script.toAddress(this.db.network);
if (!address && script.isPublicKeyOut()) {
var pubkey = script.chunks[0].buf;
address = Address.fromPublicKey(new PublicKey(pubkey), this.db.network);
} else if (!address){
return;
}
// Collect data to publish to address subscribers
if (messages[address]) {
messages[address].outputIndexes.push(outputIndex);
} else {
messages[address] = {
tx: tx,
outputIndexes: [outputIndex],
address: address.toString(),
rejected: rejected
};
}
};
/**
* This will handle data from the daemon "tx" event, go through each of the outputs
* and send messages to any subscribers for a particular address.
*
* @param {Object} txInfo - The data from the daemon.on('tx') event
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool
* @param {String} txInfo.hash - The hash of the transaction
*/
AddressModule.prototype.transactionHandler = function(txInfo) {
// Basic transaction format is handled by the daemon
// and we can safely assume the buffer is properly formatted.
var tx = bitcore.Transaction().fromBuffer(txInfo.buffer);
var messages = {};
var outputsLength = tx.outputs.length;
for (var i = 0; i < outputsLength; i++) {
this.transactionOutputHandler(messages, tx, i, !txInfo.mempool);
}
for (var key in messages) {
this.transactionEventHandler(messages[key]);
}
};
AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
var txs = this.db.getTransactionsFromBlock(block);
@ -74,6 +144,9 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
var inputs = tx.inputs;
var outputs = tx.outputs;
// Subscription messages
var txmessages = {};
var outputLength = outputs.length;
for (var j = 0; j < outputLength; j++) {
var output = outputs[j];
@ -109,22 +182,38 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
value: value
});
// publish events to any subscribers
this.transactionEventHandler(block, address, tx);
// Collect data for subscribers
if (txmessages[addressStr]) {
txmessages[addressStr].outputIndexes.push(outputIndex);
} else {
txmessages[addressStr] = {
tx: tx,
height: block.__height,
outputIndexes: [outputIndex],
address: addressStr,
timestamp: block.timestamp
};
}
this.balanceEventHandler(block, address);
}
// Publish events to any subscribers for this transaction
for (var addressKey in txmessages) {
this.transactionEventHandler(txmessages[addressKey]);
}
if(tx.isCoinbase()) {
continue;
}
for(var j = 0; j < inputs.length; j++) {
var input = inputs[j].toObject();
for(var k = 0; k < inputs.length; k++) {
var input = inputs[k].toObject();
operations.push({
type: action,
key: [AddressModule.PREFIXES.SPENTS, input.prevTxId, input.outputIndex].join('-'),
value: [txid, j].join(':')
value: [txid, k].join(':')
});
}
}
@ -134,11 +223,21 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
});
};
AddressModule.prototype.transactionEventHandler = function(block, address, tx) {
if(this.subscriptions.transaction[address]) {
var emitters = this.subscriptions.transaction[address];
/**
* @param {Object} obj
* @param {Transaction} obj.tx - The transaction
* @param {String} [obj.address] - The address for the subscription
* @param {Array} [obj.outputIndexes] - Indexes of the inputs that includes the address
* @param {Array} [obj.inputIndexes] - Indexes of the outputs that includes the address
* @param {Date} [obj.timestamp] - The time of the block the transaction was included
* @param {Number} [obj.height] - The height of the block the transaction was included
* @param {Boolean} [obj.rejected] - If the transaction was not accepted in the mempool
*/
AddressModule.prototype.transactionEventHandler = function(obj) {
if(this.subscriptions.transaction[obj.address]) {
var emitters = this.subscriptions.transaction[obj.address];
for(var k = 0; k < emitters.length; k++) {
emitters[k].emit('transaction', address, tx, block);
emitters[k].emit('transaction', obj);
}
}
};

View File

@ -400,6 +400,15 @@ Node.prototype._initializeDatabase = function() {
// Database
this.db.on('ready', function() {
// Add all db option modules
var modules = self.db._modules;
if(modules && modules.length) {
for(var i = 0; i < modules.length; i++) {
self.db.addModule(modules[i]);
}
}
log.info('Bitcoin Database Ready');
self.chain.initialize();
});
@ -408,6 +417,7 @@ Node.prototype._initializeDatabase = function() {
Error.captureStackTrace(err);
self.emit('error', err);
});
};
Node.prototype._initializeChain = function() {
@ -416,7 +426,7 @@ Node.prototype._initializeChain = function() {
// Chain
this.chain.on('ready', function() {
log.info('Bitcoin Chain Ready');
self._syncBitcoind();
self._syncBitcoind();
self.emit('ready');
});
@ -428,8 +438,6 @@ Node.prototype._initializeChain = function() {
Node.prototype._initialize = function() {
var self = this;
// DB References
this.db.chain = this.chain;
this.db.Block = this.Block;

View File

@ -11,11 +11,17 @@ var errors = bitcoindjs.errors;
var chainlib = require('chainlib');
var levelup = chainlib.deps.levelup;
var mockdb = {
bitcoind: {
on: sinon.stub()
}
};
describe('AddressModule', function() {
describe('#getAPIMethods', function() {
it('should return the correct methods', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var methods = am.getAPIMethods();
methods.length.should.equal(5);
});
@ -23,7 +29,7 @@ describe('AddressModule', function() {
describe('#getPublishEvents', function() {
it('will return an array of publish event objects', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
am.subscribe = sinon.spy();
am.unsubscribe = sinon.spy();
var events = am.getPublishEvents();
@ -55,14 +61,52 @@ describe('AddressModule', function() {
});
});
describe('#transactionOutputHandler', function() {
it('create a message for an address', function() {
var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex');
var tx = bitcore.Transaction().fromBuffer(txBuf);
var am = new AddressModule({db: mockdb});
var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX';
var messages = {};
am.transactionOutputHandler(messages, tx, 0, true);
should.exist(messages[address]);
var message = messages[address];
message.tx.should.equal(tx);
message.outputIndexes.should.deep.equal([0]);
message.address.should.equal(address);
message.rejected.should.equal(true);
});
});
describe('#transactionHandler', function() {
it('will pass outputs to transactionOutputHandler and call transactionEventHandler', function() {
var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex');
var am = new AddressModule({db: mockdb});
var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX';
var message = {};
am.transactionOutputHandler = function(messages) {
messages[address] = message;
};
am.transactionEventHandler = sinon.spy();
am.transactionHandler({
buffer: txBuf
});
am.transactionEventHandler.callCount.should.equal(1);
});
});
describe('#blockHandler', function() {
var block = bitcore.Block.fromString(blockData);
var am;
var db = {
getTransactionsFromBlock: function() {
return block.transactions.slice(0, 8);
},
bitcoind: {
on: sinon.stub()
}
};
var am = new AddressModule({db: db, network: 'livenet'});
var block = bitcore.Block.fromString(blockData);
var data = [
{
@ -110,6 +154,10 @@ describe('AddressModule', function() {
var key64 = data[2].key;
var value64 = data[2].value;
before(function() {
am = new AddressModule({db: db, network: 'livenet'});
});
it('should create the correct operations when updating/adding outputs', function(done) {
am.blockHandler({__height: 345003, timestamp: new Date(1424836934000)}, true, function(err, operations) {
should.not.exist(err);
@ -161,6 +209,9 @@ describe('AddressModule', function() {
var db = {
getTransactionsFromBlock: function() {
return transactions;
},
bitcoind: {
on: sinon.stub()
}
};
@ -177,6 +228,9 @@ describe('AddressModule', function() {
var db = {
getTransactionsFromBlock: function() {
return block.transactions.slice(0, 8);
},
bitcoind: {
on: sinon.stub()
}
};
var am = new AddressModule({db: db, network: 'livenet'});
@ -201,26 +255,37 @@ describe('AddressModule', function() {
describe('#transactionEventHandler', function() {
it('will emit a transaction if there is a subscriber', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
am.subscriptions.transaction = {
'1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter]
};
var block = {};
var block = {
__height: 0,
timestamp: new Date()
};
var tx = {};
emitter.on('transaction', function(address, t, b) {
address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N');
t.should.equal(tx);
b.should.equal(block);
emitter.on('transaction', function(obj) {
obj.address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N');
obj.tx.should.equal(tx);
obj.timestamp.should.equal(block.timestamp);
obj.height.should.equal(block.__height);
obj.outputIndexes.should.deep.equal([1]);
done();
});
am.transactionEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N', tx);
am.transactionEventHandler({
address: '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N',
height: block.__height,
timestamp: block.timestamp,
outputIndexes: [1],
tx: tx
});
});
});
describe('#balanceEventHandler', function() {
it('will emit a balance if there is a subscriber', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
am.subscriptions.balance = {
'1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter]
@ -240,7 +305,7 @@ describe('AddressModule', function() {
describe('#subscribe', function() {
it('will add emitters to the subscribers array (transaction)', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
@ -257,7 +322,7 @@ describe('AddressModule', function() {
am.subscriptions.transaction[address].should.deep.equal([emitter, emitter2]);
});
it('will add an emitter to the subscribers array (balance)', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
var name = 'balance';
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
@ -276,7 +341,7 @@ describe('AddressModule', function() {
describe('#unsubscribe', function() {
it('will remove emitter from subscribers array (transaction)', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
var emitter2 = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
@ -286,7 +351,7 @@ describe('AddressModule', function() {
am.subscriptions.transaction[address].should.deep.equal([emitter2]);
});
it('will remove emitter from subscribers array (balance)', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
var emitter2 = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
@ -296,7 +361,7 @@ describe('AddressModule', function() {
am.subscriptions.balance[address].should.deep.equal([emitter2]);
});
it('should unsubscribe from all addresses if no addresses are specified', function() {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var emitter = new EventEmitter();
var emitter2 = new EventEmitter();
am.subscriptions.balance = {
@ -313,7 +378,7 @@ describe('AddressModule', function() {
describe('#getBalance', function() {
it('should sum up the unspent outputs', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
var outputs = [
{satoshis: 1000}, {satoshis: 2000}, {satoshis: 3000}
];
@ -326,7 +391,7 @@ describe('AddressModule', function() {
});
it('will handle error from unspent outputs', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
am.getUnspentOutputs = sinon.stub().callsArgWith(2, new Error('error'));
am.getBalance('someaddress', false, function(err) {
should.exist(err);
@ -338,8 +403,17 @@ describe('AddressModule', function() {
});
describe('#getOutputs', function() {
var am = new AddressModule({db: {}});
var am;
var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W';
var db = {
bitcoind: {
on: sinon.stub()
}
};
before(function() {
am = new AddressModule({db: db});
});
it('should get outputs for an address', function(done) {
var readStream1 = new EventEmitter();
@ -433,7 +507,7 @@ describe('AddressModule', function() {
];
var i = 0;
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
am.getOutputs = sinon.stub().callsArgWith(2, null, outputs);
am.isUnspent = function(output, queryMempool, callback) {
callback(!outputs[i].spent);
@ -449,7 +523,7 @@ describe('AddressModule', function() {
});
});
it('should handle an error from getOutputs', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
am.getOutputs = sinon.stub().callsArgWith(2, new Error('error'));
am.getUnspentOutputs('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) {
should.exist(err);
@ -458,7 +532,7 @@ describe('AddressModule', function() {
});
});
it('should handle when there are no outputs', function(done) {
var am = new AddressModule({});
var am = new AddressModule({db: mockdb});
am.getOutputs = sinon.stub().callsArgWith(2, null, []);
am.getUnspentOutputs('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) {
should.exist(err);
@ -470,7 +544,11 @@ describe('AddressModule', function() {
});
describe('#isUnspent', function() {
var am = new AddressModule({});
var am;
before(function() {
am = new AddressModule({db: mockdb});
});
it('should give true when isSpent() gives false', function(done) {
am.isSpent = sinon.stub().callsArgWith(2, false);
@ -498,10 +576,19 @@ describe('AddressModule', function() {
});
describe('#isSpent', function() {
var am = new AddressModule({db: {}});
am.db.bitcoind = {
isSpent: sinon.stub().returns(true)
var am;
var db = {
bitcoind: {
on: sinon.stub()
}
};
before(function() {
am = new AddressModule({db: db});
am.db.bitcoind = {
isSpent: sinon.stub().returns(true),
on: sinon.stub()
};
});
it('should give true if bitcoind.isSpent gives true', function(done) {
am.isSpent('output', true, function(spent) {
@ -516,6 +603,9 @@ describe('AddressModule', function() {
var db = {
store: {
get: sinon.stub().callsArgWith(1, null, 'spendtxid:1')
},
bitcoind: {
on: sinon.stub()
}
};
var am = new AddressModule({db: db});
@ -616,6 +706,9 @@ describe('AddressModule', function() {
}
}
callback(new Error('tx ' + txid + ' not found'));
},
bitcoind: {
on: sinon.stub()
}
};
var am = new AddressModule({db: db});

View File

@ -437,6 +437,9 @@ describe('Bitcoind Node', function() {
it('will call chain.initialize() on ready event', function(done) {
var node = new Node({});
node.db = new EventEmitter();
node.db.addModule = sinon.spy();
var module = {};
node.db._modules = [module];
node.chain = {
initialize: sinon.spy()
};
@ -445,6 +448,7 @@ describe('Bitcoind Node', function() {
setImmediate(function() {
chainlib.log.info.callCount.should.equal(1);
chainlib.log.info.restore();
node.db.addModule.callCount.should.equal(1);
node.chain.initialize.callCount.should.equal(1);
done();
});