diff --git a/README.md b/README.md index 219048b..8dfb666 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ Optional Arguments: * limit: Total number of records to return (return all available records if not specified). Returns: - * History of incomming and outgoing transactions of the wallet. The list is paginated using the `skip` & `limit` params. Each item has the following fields: + * History of incoming and outgoing transactions of the wallet. The list is paginated using the `skip` & `limit` params. Each item has the following fields: * action ('sent', 'received', 'moved') * amount * fees @@ -72,7 +72,7 @@ Returns: `/v1/addresses/`: Get Wallet's main addresses (does not include change addresses) Returns: - * List of Addresses object: (https://github.com/bitpay/bitcore-wallet-service/blob/master/lib/model/adddress.js)). This call is mainly provided so the client check this addresses for incomming transactions (using a service like [Insight](https://insight.is) + * List of Addresses object: (https://github.com/bitpay/bitcore-wallet-service/blob/master/lib/model/adddress.js)). This call is mainly provided so the client check this addresses for incoming transactions (using a service like [Insight](https://insight.is) `/v1/balance/`: Get Wallet's balance diff --git a/lib/blockchainexplorer.js b/lib/blockchainexplorer.js new file mode 100644 index 0000000..c11a82c --- /dev/null +++ b/lib/blockchainexplorer.js @@ -0,0 +1,57 @@ +'use strict'; + +var _ = require('lodash'); +var $ = require('preconditions').singleton(); +var log = require('npmlog'); +log.debug = log.verbose; + +var Explorers = require('bitcore-explorers'); +var request = require('request'); +var io = require('socket.io-client'); + + +function BlockChainExplorer(opts) { + $.checkArgument(opts); + var provider = opts.provider || 'insight'; + var network = opts.network || 'livenet'; + + var url; + switch (provider) { + case 'insight': + switch (network) { + default: + case 'livenet': + url = 'https://insight.bitpay.com:443'; + break; + case 'testnet': + url = 'https://test-insight.bitpay.com:443' + break; + } + var explorer = new Explorers.Insight(url, network); + explorer.getTransactions = _.bind(getTransactionsInsight, explorer, url); + explorer.initSocket = _.bind(initSocketInsight, explorer, url); + return explorer; + default: + throw new Error('Provider ' + provider + ' not supported'); + }; +}; + +function getTransactionsInsight(url, addresses, cb) { + request({ + method: "POST", + url: url + '/api/addrs/txs', + json: { + addrs: [].concat(addresses).join(',') + } + }, function(err, res, body) { + if (err || res.statusCode != 200) return cb(err || res); + return cb(null, body); + }); +}; + +function initSocketInsight(url) { + var socket = io.connect(url, {}); + return socket; +}; + +module.exports = BlockChainExplorer; diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js new file mode 100644 index 0000000..2947e72 --- /dev/null +++ b/lib/blockchainmonitor.js @@ -0,0 +1,100 @@ +'use strict'; + +var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var async = require('async'); +var log = require('npmlog'); +log.debug = log.verbose; +var Uuid = require('uuid'); +var inherits = require('inherits'); +var events = require('events'); +var nodeutil = require('util'); + +var WalletUtils = require('bitcore-wallet-utils'); +var Bitcore = WalletUtils.Bitcore; +var WalletService = require('./server'); +var BlockchainExplorer = require('./blockchainexplorer'); + +var Notification = require('./model/notification'); + +function BlockchainMonitor() { + var self = this; + this.subscriptions = {}; + this.subscriber = {}; + this.subscriber['livenet'] = self._getAddressSubscriber('insight', 'livenet'); + this.subscriber['testnet'] = self._getAddressSubscriber('insight', 'testnet'); +}; + +nodeutil.inherits(BlockchainMonitor, events.EventEmitter); + +BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) { + $.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported'); + + var explorer = new BlockchainExplorer({ + provider: provider, + network: network, + }); + + var socket = explorer.initSocket(); + + // TODO: Extract on its own class once more providers are implemented + return { + subscribe: function(address, handler) { + socket.emit('subscribe', address); + socket.on(address, handler); + }, + }; +}; + +BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) { + $.checkArgument(walletId); + + var self = this; + + if (!addresses || addresses.length == 0) return; + + function handlerFor(address, txid) { + var notification = Notification.create({ + walletId: this, + type: 'NewIncomingTx', + data: { + address: address, + txid: txid, + }, + }); + self.emit('notification', notification); + }; + + if (!self.subscriptions[walletId]) { + self.subscriptions[walletId] = { + addresses: [], + }; + }; + + var addresses = [].concat(addresses); + var network = Bitcore.Address.fromString(addresses[0]).network.name; + var subscriber = self.subscriber[network]; + _.each(addresses, function(address) { + self.subscriptions[walletId].addresses.push(address); + subscriber.subscribe(address, _.bind(handlerFor, walletId, address)); + }); +}; + +BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) { + var self = this; + + var walletId = walletService.walletId; + if (self.subscriptions[walletId]) return; + + walletService.getMainAddresses({}, function(err, addresses) { + if (err) { + delete self.subscriptions[walletId]; + return cb(new Error('Could not subscribe to addresses for wallet ' + walletId)); + } + self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address')); + return cb(); + }); +}; + + +module.exports = BlockchainMonitor; diff --git a/lib/eventbroadcaster.js b/lib/eventbroadcaster.js deleted file mode 100644 index e8ab506..0000000 --- a/lib/eventbroadcaster.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -var log = require('npmlog'); -log.debug = log.verbose; -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -function EventBroadcaster() {}; - -nodeutil.inherits(EventBroadcaster, events.EventEmitter); - -EventBroadcaster.prototype.broadcast = function(eventName, serviceInstance, args) { - this.emit(eventName, serviceInstance, args); -}; - -var _eventBroadcasterInstance; -EventBroadcaster.singleton = function() { - if (!_eventBroadcasterInstance) { - _eventBroadcasterInstance = new EventBroadcaster(); - } - return _eventBroadcasterInstance; -}; - -module.exports = EventBroadcaster.singleton(); diff --git a/lib/model/notification.js b/lib/model/notification.js index 4e45c0f..143aced 100644 --- a/lib/model/notification.js +++ b/lib/model/notification.js @@ -12,8 +12,8 @@ var Uuid = require('uuid'); * txProposalFinallyRejected - txProposalId * txProposalFinallyAccepted - txProposalId * - * newIncommingTx (amount) - * newOutgoingTx - (txProposalId, txid) + * NewIncomingTx (address, txid) + * NewOutgoingTx - (txProposalId, txid) * * data Examples: * { amount: 'xxx', address: 'xxx'} diff --git a/lib/notificationbroadcaster.js b/lib/notificationbroadcaster.js new file mode 100644 index 0000000..692c43e --- /dev/null +++ b/lib/notificationbroadcaster.js @@ -0,0 +1,25 @@ +'use strict'; + +var log = require('npmlog'); +log.debug = log.verbose; +var inherits = require('inherits'); +var events = require('events'); +var nodeutil = require('util'); + +function NotificationBroadcaster() {}; + +nodeutil.inherits(NotificationBroadcaster, events.EventEmitter); + +NotificationBroadcaster.prototype.broadcast = function(eventName, notification) { + this.emit(eventName, notification); +}; + +var _instance; +NotificationBroadcaster.singleton = function() { + if (!_instance) { + _instance = new NotificationBroadcaster(); + } + return _instance; +}; + +module.exports = NotificationBroadcaster.singleton(); diff --git a/lib/server.js b/lib/server.js index 4fcc19a..33ec1e3 100644 --- a/lib/server.js +++ b/lib/server.js @@ -10,12 +10,12 @@ var Bitcore = WalletUtils.Bitcore; var PublicKey = Bitcore.PublicKey; var HDPublicKey = Bitcore.HDPublicKey; var Address = Bitcore.Address; -var Explorers = require('bitcore-explorers'); var ClientError = require('./clienterror'); var Utils = require('./utils'); var Storage = require('./storage'); -var EventBroadcaster = require('./eventbroadcaster'); +var NotificationBroadcaster = require('./notificationbroadcaster'); +var BlockchainExplorer = require('./blockchainexplorer'); var Wallet = require('./model/wallet'); var Copayer = require('./model/copayer'); @@ -24,7 +24,7 @@ var TxProposal = require('./model/txproposal'); var Notification = require('./model/notification'); var initialized = false; -var storage, blockExplorer; +var storage, blockchainExplorer; /** @@ -36,24 +36,24 @@ function WalletService() { throw new Error('Server not initialized'); this.storage = storage; - this.blockExplorer = blockExplorer; + this.blockchainExplorer = blockchainExplorer; this.notifyTicker = 0; }; WalletService.onNotification = function(func) { - EventBroadcaster.on('notification', func); + NotificationBroadcaster.on('notification', func); }; /** * Initializes global settings for all instances. * @param {Object} opts * @param {Storage} [opts.storage] - The storage provider. - * @param {Storage} [opts.blockExplorer] - The blockExporer provider. + * @param {Storage} [opts.blockchainExplorer] - The blockchainExporer provider. */ WalletService.initialize = function(opts) { opts = opts || {}; storage = opts.storage ||  new Storage(); - blockExplorer = opts.blockExplorer; + blockchainExplorer = opts.blockchainExplorer; initialized = true; }; @@ -167,7 +167,7 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { * @param {Object} args */ WalletService.prototype._emit = function(eventName, args) { - EventBroadcaster.broadcast(eventName, this, args); + NotificationBroadcaster.broadcast(eventName, args); }; /** @@ -284,7 +284,9 @@ WalletService.prototype.createAddress = function(opts, cb) { self.storage.storeAddressAndWallet(wallet, address, function(err) { if (err) return cb(err); - self._notify('NewAddress'); + self._notify('NewAddress', { + address: address.address, + }); return cb(null, address); }); }); @@ -333,43 +335,15 @@ WalletService.prototype.verifyMessageSignature = function(opts, cb) { }; -WalletService.prototype._getBlockExplorer = function(provider, network) { - var url; - - function getTransactionsInsight(url, addresses, cb) { - var request = require('request'); - request({ - method: "POST", - url: url + '/api/addrs/txs', - json: { - addrs: [].concat(addresses).join(',') - } - }, function(err, res, body) { - if (err || res.statusCode != 200) return cb(err || res); - return cb(null, body); +WalletService.prototype._getBlockchainExplorer = function(provider, network) { + if (!this.blockchainExplorer) { + this.blockchainExplorer = new BlockchainExplorer({ + provider: provider, + network: network, }); - }; - - if (this.blockExplorer) - return this.blockExplorer; - - switch (provider) { - default: ; - case 'insight': - switch (network) { - default: - case 'livenet': - url = 'https://insight.bitpay.com:443'; - break; - case 'testnet': - url = 'https://test-insight.bitpay.com:443' - break; - } - var bc = new Explorers.Insight(url, network); - bc.getTransactions = _.bind(getTransactionsInsight, bc, url); - return bc; - break; } + + return this.blockchainExplorer; }; /** @@ -389,7 +363,7 @@ WalletService.prototype._getUtxos = function(cb) { var addressToPath = _.indexBy(addresses, 'address'); // TODO : check performance var networkName = Bitcore.Address(addressStrs[0]).toObject().network; - var bc = self._getBlockExplorer('insight', networkName); + var bc = self._getBlockchainExplorer('insight', networkName); bc.getUnspentUtxos(addressStrs, function(err, inutxos) { if (err) return cb(err); var utxos = _.map(inutxos, function(i) { @@ -695,7 +669,7 @@ WalletService.prototype._broadcastTx = function(txp, cb) { } catch (ex) { return cb(ex); } - var bc = this._getBlockExplorer('insight', txp.getNetworkName()); + var bc = this._getBlockchainExplorer('insight', txp.getNetworkName()); bc.broadcast(raw, function(err, txid) { return cb(err, txid); }) @@ -1040,7 +1014,7 @@ WalletService.prototype.getTxHistory = function(opts, cb) { var addressStrs = _.pluck(addresses, 'address'); var networkName = Bitcore.Address(addressStrs[0]).toObject().network; - var bc = self._getBlockExplorer('insight', networkName); + var bc = self._getBlockchainExplorer('insight', networkName); async.parallel([ function(next) { diff --git a/lib/wsapp.js b/lib/wsapp.js index 8196e03..d307be2 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -4,17 +4,19 @@ var $ = require('preconditions').singleton(); var _ = require('lodash'); var async = require('async'); var log = require('npmlog'); -var express = require('express'); -var querystring = require('querystring'); -var bodyParser = require('body-parser') +log.debug = log.verbose; var Uuid = require('uuid'); +var WalletUtils = require('bitcore-wallet-utils'); +var Bitcore = WalletUtils.Bitcore; var WalletService = require('./server'); +var BlockchainMonitor = require('./blockchainmonitor') + +var Notification = require('./model/notification'); -log.debug = log.verbose; log.level = 'debug'; -var subscriptions = {}; +var io, bcMonitor; var WsApp = function() {}; @@ -23,17 +25,27 @@ WsApp._unauthorized = function() { socket.disconnect(); }; +WsApp.handleNotification = function(service, notification) { + if (notification.type == 'NewAddress') { + self.subscribeAddresses(notification.walletId, notification.data.address); + } + io.to(notification.walletId).emit('notification', notification); +}; + WsApp.start = function(server) { - var self = this; + io = require('socket.io')(server); - var io = require('socket.io')(server); + bcMonitor = new BlockchainMonitor(); - WalletService.onNotification(function(serviceInstance, args) { - var room = serviceInstance.walletId || args.walletId; - if (room) { - io.to(room).emit('notification', args); + function handleNotification(notification) { + if (notification.type == 'NewAddress') { + bcMonitor.subscribeAddresses(notification.walletId, notification.data.address); } - }); + io.to(notification.walletId).emit('notification', notification); + }; + + bcMonitor.on('notification', handleNotification); + WalletService.onNotification(handleNotification); io.on('connection', function(socket) { socket.nonce = Uuid.v4(); @@ -42,11 +54,15 @@ WsApp.start = function(server) { socket.on('authorize', function(data) { if (data.message != socket.nonce) return WsApp.unauthorized(); - WalletService.getInstanceWithAuth(data, function(err, res) { + WalletService.getInstanceWithAuth(data, function(err, service) { if (err) return WsApp.unauthorized(); - socket.join(res.walletId); + socket.join(service.walletId); socket.emit('authorized'); + + bcMonitor.subscribeWallet(service, function(err) { + if (err) log.warn(err.message); + }); }); }); }); diff --git a/package.json b/package.json index 6313f7d..93d5b1d 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "bitcore-wallet-service", "description": "A service for Mutisig HD Bitcoin Wallets", "author": "BitPay Inc", - "version": "0.0.16", + "version": "0.0.17", "keywords": [ "bitcoin", "copay", @@ -19,8 +19,8 @@ }, "dependencies": { "async": "^0.9.0", - "bitcore-wallet-utils": "^0.0.7", "bitcore-explorers": "^0.9.1", + "bitcore-wallet-utils": "^0.0.7", "body-parser": "^1.11.0", "coveralls": "^2.11.2", "express": "^4.10.0", @@ -36,6 +36,7 @@ "request": "^2.53.0", "sjcl": "^1.0.2", "socket.io": "^1.3.5", + "socket.io-client": "^1.3.5", "uuid": "*" }, "devDependencies": { diff --git a/test/blockchainexplorer.js b/test/blockchainexplorer.js new file mode 100644 index 0000000..b1fbd3d --- /dev/null +++ b/test/blockchainexplorer.js @@ -0,0 +1,35 @@ +'use strict'; + +var _ = require('lodash'); +var chai = require('chai'); +var sinon = require('sinon'); +var should = chai.should(); +var BlockchainExplorer = require('../lib/blockchainexplorer'); + +describe('Blockchain explorer', function() { + describe('#constructor', function() { + it('should return a blockchain explorer with basic methods', function() { + var exp = BlockchainExplorer({ + provider: 'insight', + network: 'testnet', + }); + should.exist(exp); + exp.should.respondTo('broadcast'); + exp.should.respondTo('getTransactions'); + exp.should.respondTo('getUnspentUtxos'); + exp.should.respondTo('initSocket'); + var exp = BlockchainExplorer({ + provider: 'insight', + network: 'livenet', + }); + should.exist(exp); + }); + it('should fail on unsupported provider', function() { + (function() { + var exp = BlockchainExplorer({ + provider: 'dummy', + }); + }).should.throw('not supported'); + }); + }); +}); diff --git a/test/integration/server.js b/test/integration/server.js index b3d1c30..bd700e2 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -16,6 +16,7 @@ var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; var Storage = require('../../lib/storage'); +var BlockchainMonitor = require('../../lib/blockchainmonitor'); var Wallet = require('../../lib/model/wallet'); var TxProposal = require('../../lib/model/txproposal'); @@ -150,22 +151,22 @@ helpers.stubUtxos = function(server, wallet, amounts, cb) { }; return obj; }); - blockExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, utxos); + blockchainExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, utxos); return cb(utxos); }); }; helpers.stubBroadcast = function(txid) { - blockExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid); + blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid); }; helpers.stubBroadcastFail = function() { - blockExplorer.broadcast = sinon.stub().callsArgWith(1, 'broadcast error'); + blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, 'broadcast error'); }; helpers.stubHistory = function(txs) { - blockExplorer.getTransactions = sinon.stub().callsArgWith(1, null, txs); + blockchainExplorer.getTransactions = sinon.stub().callsArgWith(1, null, txs); }; helpers.clientSign = WalletUtils.signTxp; @@ -198,10 +199,10 @@ helpers.createAddresses = function(server, wallet, main, change, cb) { }); }; -var db, storage, blockExplorer; +var db, storage, blockchainExplorer; -describe('Copay server', function() { +describe('Wallet service', function() { beforeEach(function() { db = levelup(memdown, { valueEncoding: 'json' @@ -209,11 +210,11 @@ describe('Copay server', function() { storage = new Storage({ db: db }); - blockExplorer = sinon.stub(); + blockchainExplorer = sinon.stub(); WalletService.initialize({ storage: storage, - blockExplorer: blockExplorer, + blockchainExplorer: blockchainExplorer, }); helpers.offset = 0; }); @@ -608,7 +609,15 @@ describe('Copay server', function() { address.address.should.equal('3KxttbKQQPWmpsnXZ3rB4mgJTuLnVR7frg'); address.isChange.should.be.false; address.path.should.equal('m/2147483647/0/0'); - done(); + server.getNotifications({}, function(err, notifications) { + should.not.exist(err); + var notif = _.find(notifications, { + type: 'NewAddress' + }); + should.exist(notif); + notif.data.address.should.equal('3KxttbKQQPWmpsnXZ3rB4mgJTuLnVR7frg'); + done(); + }); }); }); @@ -691,7 +700,7 @@ describe('Copay server', function() { }); }); it('should get balance when there are no funds', function(done) { - blockExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, []); + blockchainExplorer.getUnspentUtxos = sinon.stub().callsArgWith(1, null, []); server.createAddress({}, function(err, address) { should.not.exist(err); server.getBalance({}, function(err, balance) { @@ -2465,3 +2474,97 @@ describe('Copay server', function() { }); }); }); + + +describe('Blockchain monitor', function() { + var addressSubscriber; + + beforeEach(function() { + db = levelup(memdown, { + valueEncoding: 'json' + }); + storage = new Storage({ + db: db + }); + blockchainExplorer = sinon.stub(); + + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; + + addressSubscriber = sinon.stub(); + addressSubscriber.subscribe = sinon.stub(); + sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); + }); + + afterEach(function() { + BlockchainMonitor.prototype._getAddressSubscriber.restore(); + }); + + it('should subscribe wallet', function(done) { + var monitor = new BlockchainMonitor(); + helpers.createAndJoinWallet(2, 2, function(server, wallet) { + server.createAddress({}, function(err, address1) { + should.not.exist(err); + server.createAddress({}, function(err, address2) { + should.not.exist(err); + monitor.subscribeWallet(server, function(err) { + should.not.exist(err); + addressSubscriber.subscribe.calledTwice.should.be.true; + addressSubscriber.subscribe.calledWith(address1.address).should.be.true; + addressSubscriber.subscribe.calledWith(address2.address).should.be.true; + done(); + }); + }); + }); + }); + }); + + it('should be able to subscribe new address', function(done) { + var monitor = new BlockchainMonitor(); + helpers.createAndJoinWallet(2, 2, function(server, wallet) { + server.createAddress({}, function(err, address1) { + should.not.exist(err); + monitor.subscribeWallet(server, function(err) { + should.not.exist(err); + addressSubscriber.subscribe.calledOnce.should.be.true; + addressSubscriber.subscribe.calledWith(address1.address).should.be.true; + server.createAddress({}, function(err, address2) { + should.not.exist(err); + monitor.subscribeAddresses(wallet.id, address2.address); + addressSubscriber.subscribe.calledTwice.should.be.true; + addressSubscriber.subscribe.calledWith(address2.address).should.be.true; + done(); + }); + }); + }); + }); + }); + + it('should create NewIncomingTx notification when a new tx arrives on registered address', function(done) { + var monitor = new BlockchainMonitor(); + helpers.createAndJoinWallet(2, 2, function(server, wallet) { + server.createAddress({}, function(err, address1) { + should.not.exist(err); + monitor.subscribeWallet(server, function(err) { + should.not.exist(err); + addressSubscriber.subscribe.calledOnce.should.be.true; + addressSubscriber.subscribe.getCall(0).args[0].should.equal(address1.address); + var handler = addressSubscriber.subscribe.getCall(0).args[1]; + _.isFunction(handler).should.be.true; + + monitor.on('notification', function(notification) { + notification.type.should.equal('NewIncomingTx'); + notification.data.address.should.equal(address1.address); + notification.data.txid.should.equal('txid'); + done(); + }); + + handler('txid'); + }); + }); + }); + }); +});