diff --git a/bws-mq.js b/bws-mq.js deleted file mode 100644 index 033a6e8..0000000 --- a/bws-mq.js +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env node - -'use strict'; - -var _ = require('lodash'); -var io = require('socket.io'); - -var DEFAULT_PORT = 3380; - -var port = parseInt(process.argv[2]) || DEFAULT_PORT; - -var server = io(port); -server.on('connection', function(socket) { - socket.on('notification', function(data) { - server.emit('notification', data); - }); -}); - -console.log('Message queue server listening on port ' + port) diff --git a/config.js b/config.js index b6d9300..8b36020 100644 --- a/config.js +++ b/config.js @@ -23,6 +23,9 @@ var config = { // port: 3231, // }, }, + messageQueueOpts: { + url: 'http://localhost:3380', + }, blockchainExplorerOpts: { livenet: { provider: 'insight', diff --git a/lib/messagequeue.js b/lib/messagequeue.js new file mode 100644 index 0000000..fa31297 --- /dev/null +++ b/lib/messagequeue.js @@ -0,0 +1,23 @@ +'use strict'; + +var $ = require('preconditions').singleton(); +var io = require('socket.io'); +var log = require('npmlog'); +log.debug = log.verbose; + +var MessageQueue = function() {}; + +MessageQueue.start = function(opts, cb) { + opts = opts || {}; + $.checkIsNumber(opts.port, 'Invalid port number'); + + var server = io(opts.port); + server.on('connection', function(socket) { + socket.on('notification', function(data) { + server.emit('notification', data); + }); + }); + return cb(); +}; + +module.exports = MessageQueue; diff --git a/lib/server.js b/lib/server.js index 3b30cdf..1e79ae5 100644 --- a/lib/server.js +++ b/lib/server.js @@ -87,15 +87,7 @@ WalletService.initialize = function(opts, cb) { messageQueue = opts.messageQueue; return cb(); } - messageQueue = io.connect('http://localhost:3380', { - 'force new connection': true, - }); - messageQueue.on('connect', function() { - return cb(); - }); - messageQueue.on('connect_error', function(err) { - log.warn('Could not connect to message queue server'); - }); + return cb(); }; async.series([ @@ -350,7 +342,9 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { }); this.storage.storeNotification(walletId, n, function() { self._emit('notification', n); - self.messageQueue.emit('notification', n); + if (self.messageQueue) { + self.messageQueue.emit('notification', n); + } if (cb) return cb(); }); }; diff --git a/lib/wsapp.js b/lib/wsapp.js index 8d9597c..d424932 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -22,37 +22,30 @@ WsApp._unauthorized = function(socket) { socket.disconnect(); }; -// WsApp._handleNotification = function(notification) { -// console.log('*** [wsapp.js ln26] notification:', notification); // TODO - -// io.to(notification.walletId).emit('notification', notification); -// }; - -WsApp._initMessageQueue = function(cb) { - function handleNotification(notification) { - io.to(notification.walletId).emit('notification', notification); - }; - - messageQueue = require('socket.io-client').connect('http://localhost:3380', { - 'force new connection': true, - }); - messageQueue.on('connect_error', function(err) { - log.warn('Could not connect to message queue server'); - }); - messageQueue.on('notification', handleNotification); - - messageQueue.on('connect', function() { - return cb(); - }); +WsApp._handleNotification = function(notification) { + io.to(notification.walletId).emit('notification', notification); }; -WsApp.start = function(server, config, cb) { +WsApp.start = function(server, opts, cb) { + opts = opts || {}; + $.checkState(opts.messageQueueOpts); + io = require('socket.io')(server); async.series([ function(done) { - WsApp._initMessageQueue(done); + messageQueue = require('socket.io-client').connect(opts.messageQueueOpts.url, { + 'force new connection': true, + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + messageQueue.on('notification', WsApp._handleNotification); + + messageQueue.on('connect', function() { + done(); + }); }, function(done) { io.on('connection', function(socket) { diff --git a/messagequeue/bws-mq.js b/messagequeue/bws-mq.js new file mode 100644 index 0000000..c466902 --- /dev/null +++ b/messagequeue/bws-mq.js @@ -0,0 +1,16 @@ +#!/usr/bin/env node + +'use strict'; + +var MQ = require('../lib/messagequeue'); + +var DEFAULT_PORT = 3380; + +var opts = { + port: parseInt(process.argv[2]) || DEFAULT_PORT, +}; + +MQ.start(opts, function(err) { + if (err) throw err; + console.log('Message queue server listening on port ' + port) +}); diff --git a/test/integration/server.js b/test/integration/server.js index 3f9a12d..ea2f16d 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -865,7 +865,7 @@ describe('Wallet service', function() { }); }); - describe.only('#createTx', function() { + describe('#createTx', function() { var server, wallet; beforeEach(function(done) { helpers.createAndJoinWallet(2, 3, function(s, w) {