Merge pull request #57 from braydonf/feature/pub-sub

Event Bus Subscriptions
This commit is contained in:
Chris Kleeschulte 2015-07-30 12:40:37 -04:00
commit 7cf5570071
9 changed files with 399 additions and 10 deletions

View File

@ -106,7 +106,7 @@ the blockchain. One built-in module is the address module which exposes the API
### Writing a Module
A new module can be created by inheriting from `BitcoindJS.Module`, implementing the methods `blockHandler()` and `getAPIMethods()`, and any additional methods for querying the data. Here is an example:
A new module can be created by inheriting from `BitcoindJS.Module`, implementing the methods `blockHandler()`, `getAPIMethods()`, `getPublishEvents()` and any additional methods for querying the data. Here is an example:
```js
var inherits = require('util').inherits;
@ -158,6 +158,31 @@ MyModule.prototype.getAPIMethods = function() {
];
};
/**
* the bus events available for subscription
* @return {Array} array of events
*/
MyModule.prototype.getPublishEvents = function() {
return [
{
name: 'custom',
scope: this,
subscribe: this.subscribeCustom,
unsubscribe: this.unsubscribeCustom
}
]
};
/**
* Will keep track of event listeners to later publish and emit events.
*/
MyModule.prototype.subscribeCustom = function(emitter, param) {
if(!this.subscriptions[param]) {
this.subscriptions[param] = [];
}
this.subscriptions[param].push(emitter);
}
MyModule.prototype.getData = function(arg1, callback) {
// You can query the data by reading from the leveldb store on db
this.db.store.get(arg1, callback);

43
lib/bus.js Normal file
View File

@ -0,0 +1,43 @@
'use strict';
var events = require('events');
var util = require('util');
function Bus(params) {
events.EventEmitter.call(this);
this.db = params.db;
}
util.inherits(Bus, events.EventEmitter);
Bus.prototype.subscribe = function(name) {
for (var i = 0; i < this.db.modules.length; i++) {
var mod = this.db.modules[i];
var events = mod.getPublishEvents();
for (var j = 0; j < events.length; j++) {
var event = events[j];
var params = Array.prototype.slice.call(arguments).slice(1);
params.unshift(this);
if (name === event.name) {
event.subscribe.apply(event.scope, params);
}
}
}
};
Bus.prototype.unsubscribe = function(name) {
for (var i = 0; i < this.db.modules.length; i++) {
var mod = this.db.modules[i];
var events = mod.getPublishEvents();
for (var j = 0; j < events.length; j++) {
var event = events[j];
var params = Array.prototype.slice.call(arguments).slice(1);
params.unshift(this);
if (name === event.name) {
event.unsubscribe.apply(event.scope, params);
}
}
}
};
module.exports = Bus;

View File

@ -15,6 +15,18 @@ Module.prototype.blockHandler = function(block, add, callback) {
setImmediate(callback);
};
/**
* the bus events available for subscription
* @return {Array} an array of event info
*/
Module.prototype.getPublishEvents = function() {
// Example:
// return [
// ['eventname', this, this.subscribeEvent, this.unsubscribeEvent],
// ];
return [];
};
/**
* the API methods to expose
* @return {Array} return array of methods
@ -33,4 +45,4 @@ Module.prototype.getAPIMethods = function() {
//
// };
module.exports = Module;
module.exports = Module;

View File

@ -7,11 +7,17 @@ var chainlib = require('chainlib');
var log = chainlib.log;
var errors = chainlib.errors;
var bitcore = require('bitcore');
var $ = bitcore.util.preconditions;
var EventEmitter = require('events').EventEmitter;
var PublicKey = bitcore.PublicKey;
var Address = bitcore.Address;
var AddressModule = function(options) {
BaseModule.call(this, options);
this.subscriptions = {};
this.subscriptions.transaction = {};
this.subscriptions.balance = {};
};
inherits(AddressModule, BaseModule);
@ -29,6 +35,23 @@ AddressModule.prototype.getAPIMethods = function() {
];
};
AddressModule.prototype.getPublishEvents = function() {
return [
{
name: 'transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
},
{
name: 'balance',
scope: this,
subscribe: this.subscribe.bind(this, 'balance'),
unsubscribe: this.unsubscribe.bind(this, 'balance')
}
];
};
AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
var txs = this.db.getTransactionsFromBlock(block);
@ -82,6 +105,11 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
key: [AddressModule.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'),
value: [output.satoshis, script, height].join(':')
});
// publish events to any subscribers
this.transactionEventHandler(block, address, tx);
this.balanceEventHandler(block, address);
}
if(tx.isCoinbase()) {
@ -95,6 +123,57 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
});
};
AddressModule.prototype.transactionEventHandler = function(block, address, tx) {
if(this.subscriptions.transaction[address]) {
var emitters = this.subscriptions.transaction[address];
for(var k = 0; k < emitters.length; k++) {
emitters[k].emit('transaction', address, tx, block);
}
}
};
AddressModule.prototype.balanceEventHandler = function(block, address) {
if(this.subscriptions.balance[address]) {
var emitters = this.subscriptions.balance[address];
this.getBalance(address, true, function(err, balance) {
if(err) {
return this.emit(err);
}
for(var i = 0; i < emitters.length; i++) {
emitters[i].emit('balance', address, balance, block);
}
});
}
};
AddressModule.prototype.subscribe = function(name, emitter, addresses) {
$.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter');
$.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses');
for(var i = 0; i < addresses.length; i++) {
if(!this.subscriptions[name][addresses[i]]) {
this.subscriptions[name][addresses[i]] = [];
}
this.subscriptions[name][addresses[i]].push(emitter);
}
};
AddressModule.prototype.unsubscribe = function(name, emitter, addresses) {
$.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter');
$.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses');
for(var i = 0; i < addresses.length; i++) {
if(this.subscriptions[name][addresses[i]]) {
var emitters = this.subscriptions[name][addresses[i]];
var index = emitters.indexOf(emitter);
if(index > -1) {
emitters.splice(index, 1);
}
}
}
};
AddressModule.prototype.getBalance = function(address, queryMempool, callback) {
this.getUnspentOutputs(address, queryMempool, function(err, outputs) {
if(err) {
@ -202,4 +281,4 @@ AddressModule.prototype.isSpent = function(output, queryMempool, callback) {
});
};
module.exports = AddressModule;
module.exports = AddressModule;

View File

@ -16,6 +16,7 @@ var _ = bitcore.deps._;
var $ = bitcore.util.preconditions;
var genesis = require('./genesis.json');
var daemon = require('./daemon');
var Bus = require('./bus');
function Node(config) {
BaseNode.call(this, config);
@ -24,6 +25,10 @@ function Node(config) {
util.inherits(Node, BaseNode);
Node.prototype.openBus = function() {
return new Bus({db: this.db});
};
Node.prototype._loadConfiguration = function(config) {
var self = this;
this._loadBitcoinConf(config);

View File

@ -38,14 +38,14 @@
"bitcoind"
],
"dependencies": {
"async": "1.3.0",
"bindings": "^1.2.1",
"mkdirp": "0.5.0",
"nan": "1.3.0",
"tiny": "0.0.10",
"chainlib": "^0.1.1",
"errno": "^0.1.2",
"async": "1.3.0",
"memdown": "^1.0.0"
"memdown": "^1.0.0",
"mkdirp": "0.5.0",
"nan": "1.3.0",
"tiny": "0.0.10"
},
"devDependencies": {
"benchmark": "1.0.0",

61
test/bus.unit.js Normal file
View File

@ -0,0 +1,61 @@
'use strict';
var should = require('chai').should();
var sinon = require('sinon');
var Bus = require('../lib/bus');
describe('Bus', function() {
describe('#subscribe', function() {
it('will call modules subscribe function with the correct arguments', function() {
var subscribe = sinon.spy();
var db = {
modules: [
{
getPublishEvents: sinon.stub().returns([
{
name: 'test',
scope: this,
subscribe: subscribe,
}
])
}
]
};
var bus = new Bus({db: db});
bus.subscribe('test', 'a', 'b', 'c');
subscribe.callCount.should.equal(1);
subscribe.args[0][0].should.equal(bus);
subscribe.args[0][1].should.equal('a');
subscribe.args[0][2].should.equal('b');
subscribe.args[0][3].should.equal('c');
});
});
describe('#unsubscribe', function() {
it('will call modules unsubscribe function with the correct arguments', function() {
var unsubscribe = sinon.spy();
var db = {
modules: [
{
getPublishEvents: sinon.stub().returns([
{
name: 'test',
scope: this,
unsubscribe: unsubscribe
}
])
}
]
};
var bus = new Bus({db: db});
bus.unsubscribe('test', 'a', 'b', 'c');
unsubscribe.callCount.should.equal(1);
unsubscribe.args[0][0].should.equal(bus);
unsubscribe.args[0][1].should.equal('a');
unsubscribe.args[0][2].should.equal('b');
unsubscribe.args[0][3].should.equal('c');
});
});
});

View File

@ -2,8 +2,6 @@
var should = require('chai').should();
var sinon = require('sinon');
var chainlib = require('chainlib');
var levelup = chainlib.deps.levelup;
var bitcoindjs = require('../../');
var AddressModule = bitcoindjs.modules.AddressModule;
var blockData = require('../data/livenet-345003.json');
@ -21,6 +19,40 @@ describe('AddressModule', function() {
});
});
describe('#getPublishEvents', function() {
it('will return an array of publish event objects', function() {
var am = new AddressModule({});
am.subscribe = sinon.spy();
am.unsubscribe = sinon.spy();
var events = am.getPublishEvents();
var callCount = 0;
function testName(event, name) {
event.name.should.equal(name);
event.scope.should.equal(am);
var emitter = new EventEmitter();
var addresses = [];
event.subscribe(emitter, addresses);
am.subscribe.callCount.should.equal(callCount + 1);
am.subscribe.args[callCount][0].should.equal(name);
am.subscribe.args[callCount][1].should.equal(emitter);
am.subscribe.args[callCount][2].should.equal(addresses);
am.subscribe.thisValues[callCount].should.equal(am);
event.unsubscribe(emitter, addresses);
am.unsubscribe.callCount.should.equal(callCount + 1);
am.unsubscribe.args[callCount][0].should.equal(name);
am.unsubscribe.args[callCount][1].should.equal(emitter);
am.unsubscribe.args[callCount][2].should.equal(addresses);
am.unsubscribe.thisValues[callCount].should.equal(am);
callCount++;
}
events.forEach(function(event) {
testName(event, event.name);
});
});
});
describe('#blockHandler', function() {
var block = bitcore.Block.fromString(blockData);
var db = {
@ -124,6 +156,129 @@ describe('AddressModule', function() {
done();
});
});
it('will call event handlers', function() {
var block = bitcore.Block.fromString(blockData);
var db = {
getTransactionsFromBlock: function() {
return block.transactions.slice(0, 8);
}
};
var am = new AddressModule({db: db, network: 'livenet'});
am.transactionEventHandler = sinon.spy();
am.balanceEventHandler = sinon.spy();
am.blockHandler(
{
__height: 345003,
timestamp: new Date(1424836934000)
},
true,
function(err) {
if (err) {
throw err;
}
am.transactionEventHandler.callCount.should.equal(11);
am.balanceEventHandler.callCount.should.equal(11);
}
);
});
});
describe('#transactionEventHandler', function() {
it('will emit a transaction if there is a subscriber', function(done) {
var am = new AddressModule({});
var emitter = new EventEmitter();
am.subscriptions.transaction = {
'1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter]
};
var block = {};
var tx = {};
emitter.on('transaction', function(address, t, b) {
address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N');
t.should.equal(tx);
b.should.equal(block);
done();
});
am.transactionEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N', tx);
});
});
describe('#balanceEventHandler', function() {
it('will emit a balance if there is a subscriber', function(done) {
var am = new AddressModule({});
var emitter = new EventEmitter();
am.subscriptions.balance = {
'1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter]
};
var block = {};
var balance = 1000;
am.getBalance = sinon.stub().callsArgWith(2, null, balance);
emitter.on('balance', function(address, bal, b) {
address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N');
bal.should.equal(balance);
b.should.equal(block);
done();
});
am.balanceEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N');
});
});
describe('#subscribe', function() {
it('will add emitters to the subscribers array (transaction)', function() {
var am = new AddressModule({});
var emitter = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
var name = 'transaction';
am.subscribe(name, emitter, [address]);
am.subscriptions.transaction[address].should.deep.equal([emitter]);
var address2 = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W';
am.subscribe(name, emitter, [address2]);
am.subscriptions.transaction[address2].should.deep.equal([emitter]);
var emitter2 = new EventEmitter();
am.subscribe(name, emitter2, [address]);
am.subscriptions.transaction[address].should.deep.equal([emitter, emitter2]);
});
it('will add an emitter to the subscribers array (balance)', function() {
var am = new AddressModule({});
var emitter = new EventEmitter();
var name = 'balance';
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
am.subscribe(name, emitter, [address]);
am.subscriptions.balance[address].should.deep.equal([emitter]);
var address2 = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W';
am.subscribe(name, emitter, [address2]);
am.subscriptions.balance[address2].should.deep.equal([emitter]);
var emitter2 = new EventEmitter();
am.subscribe(name, emitter2, [address]);
am.subscriptions.balance[address].should.deep.equal([emitter, emitter2]);
});
});
describe('#unsubscribe', function() {
it('will remove emitter from subscribers array (transaction)', function() {
var am = new AddressModule({});
var emitter = new EventEmitter();
var emitter2 = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
am.subscriptions.transaction[address] = [emitter, emitter2];
var name = 'transaction';
am.unsubscribe(name, emitter, [address]);
am.subscriptions.transaction[address].should.deep.equal([emitter2]);
});
it('will remove emitter from subscribers array (balance)', function() {
var am = new AddressModule({});
var emitter = new EventEmitter();
var emitter2 = new EventEmitter();
var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N';
var name = 'balance';
am.subscriptions.balance[address] = [emitter, emitter2];
am.unsubscribe(name, emitter, [address]);
am.subscriptions.balance[address].should.deep.equal([emitter2]);
});
});
describe('#getBalance', function() {

View File

@ -31,6 +31,15 @@ var Node = proxyquire('../lib/node', {
chainlib.Node = OriginalNode;
describe('Bitcoind Node', function() {
describe('#openBus', function() {
it('will create a new bus', function() {
var node = new Node({});
var db = {};
node.db = db;
var bus = node.openBus();
bus.db.should.equal(db);
});
});
describe('#_loadConfiguration', function() {
it('should call the necessary methods', function() {
var node = new Node({});