From f66e4c314ce3dc7fadbb85c1a0726b5ade3635a0 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 12:03:27 -0300 Subject: [PATCH 01/13] add mongodb dep --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 979d51b..d670edf 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "locker-server": "^0.1.3", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", + "mongodb": "^2.0.27", "morgan": "*", "multilevel": "^6.1.0", "npmlog": "^0.1.1", From b8501ddb1fa4683890cdd5c93df7b617815cffc8 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 12:03:50 -0300 Subject: [PATCH 02/13] unit tests for storage --- lib/model/index.js | 5 + lib/storage_mongo.js | 390 +++++++++++++++++++++++++++++++++++++++++++ test/storage.js | 54 ++++++ 3 files changed, 449 insertions(+) create mode 100644 lib/model/index.js create mode 100644 lib/storage_mongo.js create mode 100644 test/storage.js diff --git a/lib/model/index.js b/lib/model/index.js new file mode 100644 index 0000000..7d1c5d8 --- /dev/null +++ b/lib/model/index.js @@ -0,0 +1,5 @@ +var Model = {}; + +Model.Wallet = require('./wallet'); + +module.exports = Model; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js new file mode 100644 index 0000000..1e3ec37 --- /dev/null +++ b/lib/storage_mongo.js @@ -0,0 +1,390 @@ +'use strict'; + +var _ = require('lodash'); +var levelup = require('mongodb'); +var async = require('async'); +var $ = require('preconditions').singleton(); +var log = require('npmlog'); +log.debug = log.verbose; +log.disableColor(); +var util = require('util'); + +var mongodb = require('mongodb'); + +var Wallet = require('./model/wallet'); +var Copayer = require('./model/copayer'); +var Address = require('./model/address'); +var TxProposal = require('./model/txproposal'); +var Notification = require('./model/notification'); + +var Storage = function(opts) { + opts = opts || {}; + this.db = opts.db; + + if (!this.db) { + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + if (err) { + log.error('Unable to connect to the mongoDB server. Error:', err); + return; + } + this.db = db; + console.log('Connection established to ', url); + }); + } +}; + +var zeroPad = function(x, length) { + return _.padLeft(parseInt(x), length, '0'); +}; + +var walletPrefix = function(id) { + return 'w!' + id; +}; + +var opKey = function(key) { + return key ? '!' + key : ''; +}; + +var MAX_TS = _.repeat('9', 14); + + +var KEY = { + WALLET: function(walletId) { + return walletPrefix(walletId) + '!main'; + }, + COPAYER: function(id) { + return 'copayer!' + id; + }, + TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!txp' + opKey(txProposalId); + }, + NOTIFICATION: function(walletId, notificationId) { + return walletPrefix(walletId) + '!not' + opKey(notificationId); + }, + PENDING_TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); + }, + ADDRESS: function(walletId, address) { + return walletPrefix(walletId) + '!addr' + opKey(address); + }, +}; + +Storage.prototype.fetchWallet = function(id, cb) { + this.db.get(KEY.WALLET(id), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, Wallet.fromObj(data)); + }); +}; + +Storage.prototype.storeWallet = function(wallet, cb) { + this.db.put(KEY.WALLET(wallet.id), wallet, cb); +}; + +Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { + var ops = []; + ops.push({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet + }); + _.each(wallet.copayers, function(copayer) { + var value = { + walletId: wallet.id, + requestPubKey: copayer.requestPubKey, + }; + ops.push({ + type: 'put', + key: KEY.COPAYER(copayer.id), + value: value + }); + }); + this.db.batch(ops, cb); +}; + +Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.get(KEY.COPAYER(copayerId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, data); + }); +}; + +Storage.prototype._completeTxData = function(walletId, txs, cb) { + var txList = [].concat(txs); + this.fetchWallet(walletId, function(err, wallet) { + if (err) return cb(err); + _.each(txList, function(tx) { + tx.creatorName = wallet.getCopayer(tx.creatorId).name; + _.each(tx.actions, function(action) { + action.copayerName = wallet.getCopayer(action.copayerId).name; + }); + }); + return cb(null, txs); + }); +}; + +Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { + var self = this; + this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + }); +}; + + +Storage.prototype.fetchPendingTxs = function(walletId, cb) { + var self = this; + + var txs = []; + var key = KEY.PENDING_TXP(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + +/** + * fetchTxs. Times are in UNIX EPOCH (seconds) + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchTxs = function(walletId, opts, cb) { + var self = this; + + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.TXP(walletId, opts.minTs); + var endkey = KEY.TXP(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: true, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + + +/** + * fetchNotifications + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchNotifications = function(walletId, opts, cb) { + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.NOTIFICATION(walletId, opts.minTs); + var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: opts.reverse, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(Notification.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, txs); + }); +}; + + +Storage.prototype.storeNotification = function(walletId, notification, cb) { + this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); +}; + + +// TODO should we store only txp.id on keys for indexing +// or the whole txp? For now, the entire record makes sense +// (faster + easier to access) +Storage.prototype.storeTx = function(walletId, txp, cb) { + var ops = [{ + type: 'put', + key: KEY.TXP(walletId, txp.id), + value: txp, + }]; + + if (txp.isPending()) { + ops.push({ + type: 'put', + key: KEY.PENDING_TXP(walletId, txp.id), + value: txp, + }); + } else { + ops.push({ + type: 'del', + key: KEY.PENDING_TXP(walletId, txp.id), + }); + } + this.db.batch(ops, cb); +}; + +Storage.prototype.removeTx = function(walletId, txProposalId, cb) { + var ops = [{ + type: 'del', + key: KEY.TXP(walletId, txProposalId), + }, { + type: 'del', + key: KEY.PENDING_TXP(walletId, txProposalId), + }]; + + this.db.batch(ops, cb); +}; + +Storage.prototype._delByKey = function(key, cb) { + var self = this; + var keys = []; + this.db.createKeyStream({ + gte: key, + lt: key + '~', + }) + .on('data', function(key) { + keys.push(key); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function(err) { + self.db.batch(_.map(keys, function(k) { + return { + key: k, + type: 'del' + }; + }), function(err) { + return cb(err); + }); + }); +}; + +Storage.prototype._removeCopayers = function(walletId, cb) { + var self = this; + + this.fetchWallet(walletId, function(err, w) { + if (err || !w) return cb(err); + + self.db.batch(_.map(w.copayers, function(c) { + return { + type: 'del', + key: KEY.COPAYER(c.id), + }; + }), cb); + }); +}; + +Storage.prototype.removeWallet = function(walletId, cb) { + var self = this; + + async.series([ + + function(next) { + // This should be the first step. Will check the wallet exists + self._removeCopayers(walletId, next); + }, + function(next) { + self._delByKey(walletPrefix(walletId), cb); + }, + ], cb); +}; + + +Storage.prototype.fetchAddresses = function(walletId, cb) { + var addresses = []; + var key = KEY.ADDRESS(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + addresses.push(Address.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, addresses); + }); +}; + +Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { + var ops = _.map([].concat(addresses), function(address) { + return { + type: 'put', + key: KEY.ADDRESS(wallet.id, address.address), + value: address, + }; + }); + ops.unshift({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet, + }); + + this.db.batch(ops, cb); +}; + +Storage.prototype._dump = function(cb, fn) { + fn = fn || console.log; + + this.db.readStream() + .on('data', function(data) { + fn(util.inspect(data, { + depth: 10 + })); + }) + .on('end', function() { + if (cb) return cb(); + }); +}; + +module.exports = Storage; diff --git a/test/storage.js b/test/storage.js new file mode 100644 index 0000000..0378d61 --- /dev/null +++ b/test/storage.js @@ -0,0 +1,54 @@ +'use strict'; + +var _ = require('lodash'); +var chai = require('chai'); +var sinon = require('sinon'); +var should = chai.should(); +var levelup = require('levelup'); +var memdown = require('memdown'); +var Storage = require('../lib/storage'); + +var Model = require('../lib/model'); + + +describe.only('Storage', function() { + var storage; + beforeEach(function() { + var db = levelup(memdown, { + valueEncoding: 'json' + }); + storage = new Storage({ + db: db + }); + }); + describe('Store & fetch wallet', function() { + it('should correctly store and fetch wallet', function(done) { + var wallet = Model.Wallet.create({ + id: '123', + name: 'my wallet', + m: 2, + n: 3, + }); + should.exist(wallet); + storage.storeWallet(wallet, function(err) { + should.not.exist(err); + storage.fetchWallet('123', function(err, w) { + should.not.exist(err); + should.exist(w); + w.id.should.equal(wallet.id); + w.name.should.equal(wallet.name); + w.m.should.equal(wallet.m); + w.n.should.equal(wallet.n); + done(); + }) + }); + }); + it('should not return error if wallet not found', function(done) { + storage.fetchWallet('123', function(err, w) { + should.not.exist(err); + should.not.exist(w); + done(); + }); + }); + }); +}); From 706079da826b61e73e39e6b03a3b85fccf50e898 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 16:05:02 -0300 Subject: [PATCH 03/13] fix opts passed to fetchTx --- lib/server.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/server.js b/lib/server.js index 0eb4be8..c6a48e6 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1120,7 +1120,7 @@ WalletService.prototype.getTxHistory = function(opts, cb) { async.parallel([ function(next) { - self.storage.fetchTxs(self.walletId, opts, function(err, txps) { + self.storage.fetchTxs(self.walletId, {}, function(err, txps) { if (err) return next(err); next(null, txps); }); From 1b2b0dc146551f6ca0ff26e73090107a5fbdd031 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 17:04:26 -0300 Subject: [PATCH 04/13] implement mongodb storage --- lib/model/index.js | 4 + lib/storage_mongo.js | 409 +++++++++++++++++-------------------------- package.json | 20 +-- test/storage.js | 174 +++++++++++++++++- 4 files changed, 338 insertions(+), 269 deletions(-) diff --git a/lib/model/index.js b/lib/model/index.js index 7d1c5d8..3a3fcf2 100644 --- a/lib/model/index.js +++ b/lib/model/index.js @@ -1,5 +1,9 @@ var Model = {}; Model.Wallet = require('./wallet'); +Model.Copayer = require('./copayer'); +Model.TxProposal = require('./txproposal'); +Model.Address = require('./address'); +Model.Notification = require('./notification'); module.exports = Model; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js index 1e3ec37..cb0e9bd 100644 --- a/lib/storage_mongo.js +++ b/lib/storage_mongo.js @@ -1,7 +1,6 @@ 'use strict'; var _ = require('lodash'); -var levelup = require('mongodb'); var async = require('async'); var $ = require('preconditions').singleton(); var log = require('npmlog'); @@ -11,11 +10,7 @@ var util = require('util'); var mongodb = require('mongodb'); -var Wallet = require('./model/wallet'); -var Copayer = require('./model/copayer'); -var Address = require('./model/address'); -var TxProposal = require('./model/txproposal'); -var Notification = require('./model/notification'); +var Model = require('./model'); var Storage = function(opts) { opts = opts || {}; @@ -34,87 +29,51 @@ var Storage = function(opts) { } }; -var zeroPad = function(x, length) { - return _.padLeft(parseInt(x), length, '0'); -}; - -var walletPrefix = function(id) { - return 'w!' + id; -}; - -var opKey = function(key) { - return key ? '!' + key : ''; -}; - -var MAX_TS = _.repeat('9', 14); - - -var KEY = { - WALLET: function(walletId) { - return walletPrefix(walletId) + '!main'; - }, - COPAYER: function(id) { - return 'copayer!' + id; - }, - TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!txp' + opKey(txProposalId); - }, - NOTIFICATION: function(walletId, notificationId) { - return walletPrefix(walletId) + '!not' + opKey(notificationId); - }, - PENDING_TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); - }, - ADDRESS: function(walletId, address) { - return walletPrefix(walletId) + '!addr' + opKey(address); - }, -}; - Storage.prototype.fetchWallet = function(id, cb) { - this.db.get(KEY.WALLET(id), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, Wallet.fromObj(data)); + this.db.collection('wallets').findOne({ + id: id + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return cb(null, Model.Wallet.fromObj(result)); }); }; Storage.prototype.storeWallet = function(wallet, cb) { - this.db.put(KEY.WALLET(wallet.id), wallet, cb); + this.db.collection('wallets').update({ + id: wallet.id + }, wallet, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - var ops = []; - ops.push({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet - }); - _.each(wallet.copayers, function(copayer) { - var value = { - walletId: wallet.id, - requestPubKey: copayer.requestPubKey, - }; - ops.push({ - type: 'put', - key: KEY.COPAYER(copayer.id), - value: value - }); - }); - this.db.batch(ops, cb); + return this.storeWallet(wallet, cb); }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.get(KEY.COPAYER(copayerId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, data); + this.db.collection('wallets').findOne({ + 'copayers.id': copayerId + }, { + fields: { + id: 1, + copayers: 1, + }, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var copayer = _.find(result.copayers, { + id: copayerId + }); + return cb(null, { + walletId: result.id, + requestPubKey: copayer.requestPubKey, + }); }); }; +// TODO: should be done client-side Storage.prototype._completeTxData = function(walletId, txs, cb) { var txList = [].concat(txs); this.fetchWallet(walletId, function(err, wallet) { @@ -131,12 +90,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; - this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + + this.db.collection('txs').findOne({ + id: txProposalId, + walletId: walletId + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); }); }; @@ -144,22 +105,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) { var self = this; - var txs = []; - var key = KEY.PENDING_TXP(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + this.db.collection('txs').find({ + walletId: walletId, + isPending: true + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; /** @@ -173,31 +131,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) { var self = this; - var txs = []; opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.TXP(walletId, opts.minTs); - var endkey = KEY.TXP(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection('txs').find(filter, mods).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; @@ -210,181 +167,133 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { * @param opts.limit */ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var txs = []; + var self = this; + opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.NOTIFICATION(walletId, opts.minTs); - var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(Notification.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, txs); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection('notifications').find(filter, mods).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var notifications = _.map(result, function(notification) { + return Model.Notification.fromObj(notification); }); + return cb(null, notifications); + }); }; +// TODO: remove walletId from signature Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); + this.db.collection('notifications').insert(notification, { + w: 1 + }, cb); }; - -// TODO should we store only txp.id on keys for indexing -// or the whole txp? For now, the entire record makes sense -// (faster + easier to access) +// TODO: remove walletId from signature Storage.prototype.storeTx = function(walletId, txp, cb) { - var ops = [{ - type: 'put', - key: KEY.TXP(walletId, txp.id), - value: txp, - }]; - - if (txp.isPending()) { - ops.push({ - type: 'put', - key: KEY.PENDING_TXP(walletId, txp.id), - value: txp, - }); - } else { - ops.push({ - type: 'del', - key: KEY.PENDING_TXP(walletId, txp.id), - }); - } - this.db.batch(ops, cb); + txp.isPending = txp.isPending(); // Persist attribute to use when querying + this.db.collection('txs').update({ + id: txp.id, + walletId: walletId + }, txp, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - var ops = [{ - type: 'del', - key: KEY.TXP(walletId, txProposalId), + this.db.collection('txs').findAndRemove({ + id: txProposalId, + walletId: walletId }, { - type: 'del', - key: KEY.PENDING_TXP(walletId, txProposalId), - }]; - - this.db.batch(ops, cb); -}; - -Storage.prototype._delByKey = function(key, cb) { - var self = this; - var keys = []; - this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) - .on('data', function(key) { - keys.push(key); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function(err) { - self.db.batch(_.map(keys, function(k) { - return { - key: k, - type: 'del' - }; - }), function(err) { - return cb(err); - }); - }); -}; - -Storage.prototype._removeCopayers = function(walletId, cb) { - var self = this; - - this.fetchWallet(walletId, function(err, w) { - if (err || !w) return cb(err); - - self.db.batch(_.map(w.copayers, function(c) { - return { - type: 'del', - key: KEY.COPAYER(c.id), - }; - }), cb); - }); + w: 1 + }, cb); }; Storage.prototype.removeWallet = function(walletId, cb) { var self = this; - async.series([ + async.parallel([ function(next) { - // This should be the first step. Will check the wallet exists - self._removeCopayers(walletId, next); + this.db.collections('wallets').findAndRemove({ + id: walletId + }, next); }, function(next) { - self._delByKey(walletPrefix(walletId), cb); + this.db.collections('addresses').findAndRemove({ + walletId: walletId + }, next); + }, + function(next) { + this.db.collections('txs').findAndRemove({ + walletId: walletId + }, next); + }, + function(next) { + this.db.collections('notifications').findAndRemove({ + walletId: walletId + }, next); }, ], cb); }; Storage.prototype.fetchAddresses = function(walletId, cb) { - var addresses = []; - var key = KEY.ADDRESS(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - addresses.push(Address.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, addresses); + var self = this; + + this.db.collection('addresses').find({ + walletId: walletId, + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var addresses = _.map(result, function(address) { + return Model.Address.fromObj(address); }); + return cb(null, addresses); + }); }; Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var ops = _.map([].concat(addresses), function(address) { - return { - type: 'put', - key: KEY.ADDRESS(wallet.id, address.address), - value: address, - }; + var self = this; + this.db.collection('addresses').insert([].concat(addresses), { + w: 1 + }, function(err) { + if (err) return cb(err); + self.storeWallet(wallet, cb); }); - ops.unshift({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet, - }); - - this.db.batch(ops, cb); }; Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; - this.db.readStream() - .on('data', function(data) { - fn(util.inspect(data, { - depth: 10 - })); - }) - .on('end', function() { - if (cb) return cb(); - }); + var self = this; + this.db.collections(function(err, collections) { + if (err) return cb(err); + async.eachSeries(collections, function(col, next) { + fn('--------' + col); + col.find().toArray(function(err, item) { + fn(item); + next(err); + }); + }, cb); + }); }; module.exports = Storage; diff --git a/package.json b/package.json index d670edf..66b52f3 100644 --- a/package.json +++ b/package.json @@ -52,7 +52,8 @@ "memdown": "^1.0.0", "mocha": "^1.18.2", "sinon": "^1.10.3", - "supertest": "*" + "supertest": "*", + "tingodb": "^0.3.4" }, "scripts": { "start": "node bws.js", @@ -60,14 +61,11 @@ "test": "./node_modules/.bin/mocha", "coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, - "contributors": [ - { - "name": "Ivan Socolsky", - "email": "ivan@bitpay.com" - }, - { - "name": "Matias Alejo Garcia", - "email": "ematiu@gmail.com" - } - ] + "contributors": [{ + "name": "Ivan Socolsky", + "email": "ivan@bitpay.com" + }, { + "name": "Matias Alejo Garcia", + "email": "ematiu@gmail.com" + }] } diff --git a/test/storage.js b/test/storage.js index 0378d61..143259d 100644 --- a/test/storage.js +++ b/test/storage.js @@ -1,24 +1,62 @@ 'use strict'; var _ = require('lodash'); +var async = require('async'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); var levelup = require('levelup'); var memdown = require('memdown'); -var Storage = require('../lib/storage'); +var mongodb = require('mongodb'); + +var StorageLevelDb = require('../lib/storage'); +var StorageMongoDb = require('../lib/storage_mongo'); var Model = require('../lib/model'); -describe.only('Storage', function() { - var storage; - beforeEach(function() { - var db = levelup(memdown, { - valueEncoding: 'json' +function initStorageLevelDb(cb) { + var db = levelup(memdown, { + valueEncoding: 'json' + }); + return cb(null, db); +}; + +function initStorageMongoDb(cb) { + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + should.not.exist(err); + db.dropDatabase(function(err) { + return cb(null, db); }); - storage = new Storage({ - db: db + }); +}; + + + +var Storage, initDb; + + +var useLevel = false; + +if (useLevel) { + Storage = StorageLevelDb; + initDb = initStorageLevelDb; +} else { + Storage = StorageMongoDb; + initDb = initStorageMongoDb; +} + + +describe('Storage', function() { + var storage; + beforeEach(function(done) { + initDb(function(err, db) { + should.not.exist(err); + storage = new Storage({ + db: db + }); + done(); }); }); describe('Store & fetch wallet', function() { @@ -51,4 +89,124 @@ describe.only('Storage', function() { }); }); }); + describe('Copayer lookup', function() { + it('should correctly store and fetch copayer lookup', function(done) { + var wallet = Model.Wallet.create({ + id: '123', + name: 'my wallet', + m: 2, + n: 3, + }); + _.each(_.range(3), function(i) { + var copayer = Model.Copayer.create({ + name: 'copayer ' + i, + xPubKey: 'xPubKey ' + i, + requestPubKey: 'requestPubKey ' + i, + }); + wallet.addCopayer(copayer); + }); + + should.exist(wallet); + storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) { + should.not.exist(err); + storage.fetchCopayerLookup(wallet.copayers[1].id, function(err, lookup) { + should.not.exist(err); + should.exist(lookup); + lookup.walletId.should.equal('123'); + lookup.requestPubKey.should.equal('requestPubKey 1'); + done(); + }) + }); + }); + it('should not return error if copayer not found', function(done) { + storage.fetchCopayerLookup('2', function(err, lookup) { + should.not.exist(err); + should.not.exist(lookup); + done(); + }); + }); + }); + + describe('Transaction proposals', function() { + var wallet, proposals; + + beforeEach(function(done) { + wallet = Model.Wallet.create({ + id: '123', + name: 'my wallet', + m: 2, + n: 3, + }); + _.each(_.range(3), function(i) { + var copayer = Model.Copayer.create({ + name: 'copayer ' + i, + xPubKey: 'xPubKey ' + i, + requestPubKey: 'requestPubKey ' + i, + }); + wallet.addCopayer(copayer); + }); + should.exist(wallet); + storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) { + should.not.exist(err); + + proposals = _.map(_.range(4), function(i) { + var tx = Model.TxProposal.create({ + walletId: '123', + creatorId: wallet.copayers[0].id, + amount: i + 100, + }); + if (i % 2 == 0) { + tx.status = 'rejected'; + tx.isPending().should.be.false; + } + return tx; + }); + async.each(proposals, function(tx, next) { + storage.storeTx('123', tx, next); + }, function(err) { + should.not.exist(err); + done(); + }); + }); + }); + it('should fetch tx', function(done) { + storage.fetchTx('123', proposals[0].id, function(err, tx) { + should.not.exist(err); + should.exist(tx); + tx.id.should.equal(proposals[0].id); + tx.walletId.should.equal(proposals[0].walletId); + tx.creatorName.should.equal('copayer 0'); + done(); + }); + }); + it('should fetch all pending txs', function(done) { + storage.fetchPendingTxs('123', function(err, txs) { + should.not.exist(err); + should.exist(txs); + txs.length.should.equal(2); + txs = _.sortBy(txs, 'amount'); + txs[0].amount.should.equal(101); + txs[1].amount.should.equal(103); + done(); + }); + }); + it('should remove tx', function(done) { + storage.removeTx('123', proposals[0].id, function(err) { + should.not.exist(err); + storage.fetchTx('123', proposals[0].id, function(err, tx) { + should.not.exist(err); + should.not.exist(tx); + storage.fetchTxs('123', {}, function(err, txs) { + should.not.exist(err); + should.exist(txs); + txs.length.should.equal(3); + _.any(txs, { + id: proposals[0].id + }).should.be.false; + done(); + }); + }); + }); + }); + }); }); From 88c7323a0ee9c1fa7dfd32663db6e55640a51436 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 17:46:40 -0300 Subject: [PATCH 05/13] add walletId to address --- lib/model/address.js | 2 ++ lib/model/copayer.js | 6 +++++- lib/model/wallet.js | 5 ++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/model/address.js b/lib/model/address.js index a5da342..2590caf 100644 --- a/lib/model/address.js +++ b/lib/model/address.js @@ -13,6 +13,7 @@ Address.create = function(opts) { x.createdOn = Math.floor(Date.now() / 1000); x.address = opts.address; + x.walletId = opts.walletId; x.isChange = opts.isChange; x.path = opts.path; x.publicKeys = opts.publicKeys; @@ -24,6 +25,7 @@ Address.fromObj = function(obj) { x.createdOn = obj.createdOn; x.address = obj.address; + x.walletId = obj.walletId; x.isChange = obj.isChange; x.path = obj.path; x.publicKeys = obj.publicKeys; diff --git a/lib/model/copayer.js b/lib/model/copayer.js index d5a3444..1845350 100644 --- a/lib/model/copayer.js +++ b/lib/model/copayer.js @@ -60,7 +60,11 @@ Copayer.prototype.createAddress = function(wallet, isChange) { $.checkState(wallet.isComplete()); var path = this.addressManager.getNewAddressPath(isChange); - var address = Address.create(WalletUtils.deriveAddress(wallet.publicKeyRing, path, wallet.m, wallet.network)); + var raw = Address.create(WalletUtils.deriveAddress(wallet.publicKeyRing, path, wallet.m, wallet.network)); + var address = Address.create(_.extend(raw, { + walletId: wallet.id, + })); + address.isChange = isChange; return address; }; diff --git a/lib/model/wallet.js b/lib/model/wallet.js index 7cbe973..f95eda3 100644 --- a/lib/model/wallet.js +++ b/lib/model/wallet.js @@ -145,7 +145,10 @@ Wallet.prototype.createAddress = function(isChange) { $.checkState(this.isComplete()); var path = this.addressManager.getNewAddressPath(isChange); - var address = Address.create(WalletUtils.deriveAddress(this.publicKeyRing, path, this.m, this.network)); + var raw = WalletUtils.deriveAddress(this.publicKeyRing, path, this.m, this.network); + var address = Address.create(_.extend(raw, { + walletId: this.id, + })); address.isChange = isChange; return address; }; From ea3d251c0dbba528cc304a60878ac636fd68ea85 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 19:45:45 -0300 Subject: [PATCH 06/13] all tests passing --- lib/server.js | 2 +- lib/storage_mongo.js | 56 ++++---- test/integration/server.js | 262 ++++++++++++++++++++++++------------- 3 files changed, 208 insertions(+), 112 deletions(-) diff --git a/lib/server.js b/lib/server.js index c6a48e6..9f61da4 100644 --- a/lib/server.js +++ b/lib/server.js @@ -942,7 +942,7 @@ WalletService.prototype.getPendingTxs = function(opts, cb) { * @param {Object} opts.minTs (defaults to 0) * @param {Object} opts.maxTs (defaults to now) * @param {Object} opts.limit - * @returns {TxProposal[]} Transaction proposals, first newer + * @returns {TxProposal[]} Transaction proposals, newer first */ WalletService.prototype.getTxs = function(opts, cb) { var self = this; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js index cb0e9bd..19fe047 100644 --- a/lib/storage_mongo.js +++ b/lib/storage_mongo.js @@ -12,6 +12,13 @@ var mongodb = require('mongodb'); var Model = require('./model'); +var collections = { + WALLETS: 'wallets', + TXS: 'txs', + ADDRESSES: 'addresses', + NOTIFICATIONS: 'notifications', +}; + var Storage = function(opts) { opts = opts || {}; this.db = opts.db; @@ -30,7 +37,7 @@ var Storage = function(opts) { }; Storage.prototype.fetchWallet = function(id, cb) { - this.db.collection('wallets').findOne({ + this.db.collection(collections.WALLETS).findOne({ id: id }, function(err, result) { if (err) return cb(err); @@ -40,7 +47,7 @@ Storage.prototype.fetchWallet = function(id, cb) { }; Storage.prototype.storeWallet = function(wallet, cb) { - this.db.collection('wallets').update({ + this.db.collection(collections.WALLETS).update({ id: wallet.id }, wallet, { w: 1, @@ -53,7 +60,7 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.collection('wallets').findOne({ + this.db.collection(collections.WALLETS).findOne({ 'copayers.id': copayerId }, { fields: { @@ -91,7 +98,7 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; - this.db.collection('txs').findOne({ + this.db.collection(collections.TXS).findOne({ id: txProposalId, walletId: walletId }, function(err, result) { @@ -105,11 +112,11 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) { var self = this; - this.db.collection('txs').find({ + this.db.collection(collections.TXS).find({ walletId: walletId, isPending: true }).sort({ - createdOn: 1 + createdOn: -1 }).toArray(function(err, result) { if (err) return cb(err); if (!result) return cb(); @@ -145,8 +152,8 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { var mods = {}; if (_.isNumber(opts.limit)) mods.limit = opts.limit; - this.db.collection('txs').find(filter, mods).sort({ - createdOn: 1 + this.db.collection(collections.TXS).find(filter, mods).sort({ + createdOn: -1 }).toArray(function(err, result) { if (err) return cb(err); if (!result) return cb(); @@ -165,6 +172,7 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { * @param opts.minTs * @param opts.maxTs * @param opts.limit + * @param opts.reverse */ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { var self = this; @@ -183,8 +191,8 @@ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { var mods = {}; if (_.isNumber(opts.limit)) mods.limit = opts.limit; - this.db.collection('notifications').find(filter, mods).sort({ - createdOn: 1 + this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({ + id: opts.reverse ? -1 : 1, }).toArray(function(err, result) { if (err) return cb(err); if (!result) return cb(); @@ -198,7 +206,7 @@ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { // TODO: remove walletId from signature Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.collection('notifications').insert(notification, { + this.db.collection(collections.NOTIFICATIONS).insert(notification, { w: 1 }, cb); }; @@ -206,7 +214,7 @@ Storage.prototype.storeNotification = function(walletId, notification, cb) { // TODO: remove walletId from signature Storage.prototype.storeTx = function(walletId, txp, cb) { txp.isPending = txp.isPending(); // Persist attribute to use when querying - this.db.collection('txs').update({ + this.db.collection(collections.TXS).update({ id: txp.id, walletId: walletId }, txp, { @@ -216,7 +224,7 @@ Storage.prototype.storeTx = function(walletId, txp, cb) { }; Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - this.db.collection('txs').findAndRemove({ + this.db.collection(collections.TXS).findAndRemove({ id: txProposalId, walletId: walletId }, { @@ -230,22 +238,22 @@ Storage.prototype.removeWallet = function(walletId, cb) { async.parallel([ function(next) { - this.db.collections('wallets').findAndRemove({ + self.db.collection(collections.WALLETS).findAndRemove({ id: walletId }, next); }, function(next) { - this.db.collections('addresses').findAndRemove({ + self.db.collection(collections.ADDRESSES).remove({ walletId: walletId }, next); }, function(next) { - this.db.collections('txs').findAndRemove({ + self.db.collection(collections.TXS).remove({ walletId: walletId }, next); }, function(next) { - this.db.collections('notifications').findAndRemove({ + self.db.collection(collections.NOTIFICATIONS).remove({ walletId: walletId }, next); }, @@ -256,7 +264,7 @@ Storage.prototype.removeWallet = function(walletId, cb) { Storage.prototype.fetchAddresses = function(walletId, cb) { var self = this; - this.db.collection('addresses').find({ + this.db.collection(collections.ADDRESSES).find({ walletId: walletId, }).sort({ createdOn: 1 @@ -272,7 +280,9 @@ Storage.prototype.fetchAddresses = function(walletId, cb) { Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { var self = this; - this.db.collection('addresses').insert([].concat(addresses), { + var addresses = [].concat(addresses); + if (addresses.length == 0) return cb(); + this.db.collection(collections.ADDRESSES).insert(addresses, { w: 1 }, function(err) { if (err) return cb(err); @@ -282,14 +292,16 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; + cb = cb || function() {}; var self = this; this.db.collections(function(err, collections) { if (err) return cb(err); async.eachSeries(collections, function(col, next) { - fn('--------' + col); - col.find().toArray(function(err, item) { - fn(item); + col.find().toArray(function(err, items) { + fn('--------', col.s.name); + fn(items); + fn('------------------------------------------------------------------\n\n'); next(err); }); }, cb); diff --git a/test/integration/server.js b/test/integration/server.js index 9e3c728..1b05a35 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -9,13 +9,14 @@ var sinon = require('sinon'); var should = chai.should(); var levelup = require('levelup'); var memdown = require('memdown'); +var mongodb = require('mongodb'); var log = require('npmlog'); log.debug = log.verbose; var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; -var Storage = require('../../lib/storage'); +var Storage = require('../../lib/storage_mongo'); var BlockchainMonitor = require('../../lib/blockchainmonitor'); var Wallet = require('../../lib/model/wallet'); @@ -208,24 +209,56 @@ helpers.createAddresses = function(server, wallet, main, change, cb) { var db, storage, blockchainExplorer; +function openDb(cb) { + function dropDb(cb) { + db.dropDatabase(function(err) { + should.not.exist(err); + return cb(); + }); + }; + if (db) { + return dropDb(cb); + } else { + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, _db) { + should.not.exist(err); + db = _db; + return dropDb(cb); + }); + } +}; + +function closeDb(cb) { + if (db) { + db.close(true, function(err) { + should.not.exist(err); + db = null; + return cb(); + }); + } else { + return cb(); + } +}; describe('Wallet service', function() { - beforeEach(function() { - db = levelup(memdown, { - valueEncoding: 'json' - }); - storage = new Storage({ - db: db - }); - blockchainExplorer = sinon.stub(); + beforeEach(function(done) { + openDb(function() { + storage = new Storage({ + db: db + }); + blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; + done(); }); - helpers.offset = 0; }); - + after(function(done) { + closeDb(done); + }); describe('#getInstanceWithAuth', function() { beforeEach(function() {}); @@ -1172,6 +1205,7 @@ describe('Wallet service', function() { helpers.stubUtxos(server, wallet, _.range(1, 9), function() { var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 10, null, TestData.copayers[0].privKey_1H_0); server.createTx(txOpts, function(err, tx) { + should.not.exist(err); should.exist(tx); txid = tx.id; @@ -1814,9 +1848,7 @@ describe('Wallet service', function() { var server, wallet, clock; beforeEach(function(done) { - if (server) return done(); this.timeout(5000); - console.log('\tCreating TXS...'); clock = sinon.useFakeTimers(); helpers.createAndJoinWallet(1, 1, function(s, w) { server = s; @@ -1824,7 +1856,7 @@ describe('Wallet service', function() { helpers.stubUtxos(server, wallet, _.range(10), function() { var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.1, null, TestData.copayers[0].privKey_1H_0); async.eachSeries(_.range(10), function(i, next) { - clock.tick(10000); + clock.tick(10 * 1000); server.createTx(txOpts, function(err, tx) { next(); }); @@ -1883,17 +1915,18 @@ describe('Wallet service', function() { }); - it('should txs from times 50 to 70', function(done) { - server.getTxs({ - minTs: 50, - maxTs: 70, - }, function(err, txps) { - should.not.exist(err); - var times = _.pluck(txps, 'createdOn'); - times.should.deep.equal([70, 60, 50]); - done(); + it('should txs from times 50 to 70', + function(done) { + server.getTxs({ + minTs: 50, + maxTs: 70, + }, function(err, txps) { + should.not.exist(err); + var times = _.pluck(txps, 'createdOn'); + times.should.deep.equal([70, 60, 50]); + done(); + }); }); - }); }); describe('Notifications', function() { @@ -2073,62 +2106,110 @@ describe('Wallet service', function() { }); }); }); + it('should delete a wallet', function(done) { - var i = 0; - var count = function() { - return ++i; - }; - server.storage._dump(function() { - i.should.above(1); - server.removeWallet({}, function(err) { - i = 0; - server.storage._dump(function() { - server.storage._dump(); - i.should.equal(0); + server.removeWallet({}, function(err) { + should.not.exist(err); + server.getWallet({}, function(err, w) { + should.exist(err); + err.message.should.equal('Wallet not found'); + should.not.exist(w); + async.parallel([ + + function(next) { + server.storage.fetchAddresses(wallet.id, function(err, items) { + items.length.should.equal(0); + next(); + }); + }, + function(next) { + server.storage.fetchTxs(wallet.id, {}, function(err, items) { + items.length.should.equal(0); + next(); + }); + }, + function(next) { + server.storage.fetchNotifications(wallet.id, {}, function(err, items) { + items.length.should.equal(0); + next(); + }); + }, + ], function(err) { + should.not.exist(err); done(); - }, count); + }); }); - }, count); + }); }); // creates 2 wallet, and deletes only 1. it('should delete a wallet, and only that wallet', function(done) { - var i = 0; - var db = []; - var cat = function(data) { - db.push(data); - }; - server.storage._dump(function() { - var before = _.clone(db); - db.length.should.above(1); + var server2, wallet2; + async.series([ - helpers.offset = 1; - helpers.createAndJoinWallet(2, 3, function(s, w) { - server = s; - wallet = w; + function(next) { + helpers.offset = 1; + helpers.createAndJoinWallet(1, 1, function(s, w) { + server2 = s; + wallet2 = w; - helpers.stubUtxos(server, wallet, _.range(2), function() { - var txOpts = { - toAddress: '18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', - amount: helpers.toSatoshi(0.1), - }; - async.eachSeries(_.range(2), function(i, next) { - server.createTx(txOpts, function(err, tx) { - next(); - }); - }, function() { - server.removeWallet({}, function(err) { - db = []; - server.storage._dump(function() { - var after = _.clone(db); - after.should.deep.equal(before); - done(); - }, cat); - }); - }, cat); + helpers.stubUtxos(server2, wallet2, _.range(1, 3), function() { + var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.1, 'some message', TestData.copayers[1].privKey_1H_0); + async.eachSeries(_.range(2), function(i, next) { + server2.createTx(txOpts, function(err, tx) { + should.not.exist(err); + next(err); + }); + }, next); + }); }); - }); - }, cat); + }, + function(next) { + server.removeWallet({}, next); + }, + function(next) { + server.getWallet({}, function(err, wallet) { + should.exist(err); + err.message.should.contain('not found'); + next(); + }); + }, + function(next) { + server2.getWallet({}, function(err, wallet) { + should.not.exist(err); + should.exist(wallet); + wallet.id.should.equal(wallet2.id); + next(); + }); + }, + function(next) { + server2.getMainAddresses({}, function(err, addresses) { + should.not.exist(err); + should.exist(addresses); + addresses.length.should.above(0); + next(); + }); + }, + function(next) { + server2.getTxs({}, function(err, txs) { + should.not.exist(err); + should.exist(txs); + txs.length.should.equal(2); + next(); + }); + }, + function(next) { + server2.getNotifications({}, function(err, notifications) { + should.not.exist(err); + should.exist(notifications); + notifications.length.should.above(0); + next(); + }); + }, + ], function(err) { + should.not.exist(err); + done(); + }); }); }); @@ -2967,29 +3048,32 @@ describe('Wallet service', function() { describe('Blockchain monitor', function() { var addressSubscriber; - beforeEach(function() { - db = levelup(memdown, { - valueEncoding: 'json' - }); - storage = new Storage({ - db: db - }); - blockchainExplorer = sinon.stub(); - - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }); - helpers.offset = 0; - + beforeEach(function(done) { addressSubscriber = sinon.stub(); addressSubscriber.subscribe = sinon.stub(); sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); - }); + openDb(function() { + storage = new Storage({ + db: db + }); + blockchainExplorer = sinon.stub(); + + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; + + done(); + }); + }); afterEach(function() { BlockchainMonitor.prototype._getAddressSubscriber.restore(); }); + after(function(done) { + closeDb(done); + }); it('should subscribe wallet', function(done) { var monitor = new BlockchainMonitor(); From b3c33b2781e33aa908143e07d29fba26df064668 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 19:53:19 -0300 Subject: [PATCH 07/13] remove leveldb --- lib/storage.js | 446 +++++++++++++++---------------------- lib/storage_leveldb.js | 391 ++++++++++++++++++++++++++++++++ lib/storage_mongo.js | 311 -------------------------- package.json | 20 +- test/integration/server.js | 17 +- test/storage.js | 30 +-- 6 files changed, 595 insertions(+), 620 deletions(-) create mode 100644 lib/storage_leveldb.js delete mode 100644 lib/storage_mongo.js diff --git a/lib/storage.js b/lib/storage.js index f2bed0b..19fe047 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -1,121 +1,86 @@ 'use strict'; var _ = require('lodash'); -var levelup = require('levelup'); -var multilevel = require('multilevel'); -var net = require('net'); var async = require('async'); var $ = require('preconditions').singleton(); var log = require('npmlog'); -var util = require('util'); log.debug = log.verbose; log.disableColor(); +var util = require('util'); -var Wallet = require('./model/wallet'); -var Copayer = require('./model/copayer'); -var Address = require('./model/address'); -var TxProposal = require('./model/txproposal'); -var Notification = require('./model/notification'); +var mongodb = require('mongodb'); + +var Model = require('./model'); + +var collections = { + WALLETS: 'wallets', + TXS: 'txs', + ADDRESSES: 'addresses', + NOTIFICATIONS: 'notifications', +}; var Storage = function(opts) { opts = opts || {}; this.db = opts.db; if (!this.db) { - if (opts.multiLevel) { - this.db = multilevel.client(); - var con = net.connect(opts.multiLevel); - con.pipe(this.db.createRpcStream()).pipe(con); - log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); - } else { - this.db = levelup(opts.dbPath || './db/bws.db', { - valueEncoding: 'json' - }); - } + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + if (err) { + log.error('Unable to connect to the mongoDB server. Error:', err); + return; + } + this.db = db; + console.log('Connection established to ', url); + }); } }; -var zeroPad = function(x, length) { - return _.padLeft(parseInt(x), length, '0'); -}; - -var walletPrefix = function(id) { - return 'w!' + id; -}; - -var opKey = function(key) { - return key ? '!' + key : ''; -}; - -var MAX_TS = _.repeat('9', 14); - - -var KEY = { - WALLET: function(walletId) { - return walletPrefix(walletId) + '!main'; - }, - COPAYER: function(id) { - return 'copayer!' + id; - }, - TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!txp' + opKey(txProposalId); - }, - NOTIFICATION: function(walletId, notificationId) { - return walletPrefix(walletId) + '!not' + opKey(notificationId); - }, - PENDING_TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); - }, - ADDRESS: function(walletId, address) { - return walletPrefix(walletId) + '!addr' + opKey(address); - }, -}; - Storage.prototype.fetchWallet = function(id, cb) { - this.db.get(KEY.WALLET(id), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, Wallet.fromObj(data)); + this.db.collection(collections.WALLETS).findOne({ + id: id + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return cb(null, Model.Wallet.fromObj(result)); }); }; Storage.prototype.storeWallet = function(wallet, cb) { - this.db.put(KEY.WALLET(wallet.id), wallet, cb); + this.db.collection(collections.WALLETS).update({ + id: wallet.id + }, wallet, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - var ops = []; - ops.push({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet - }); - _.each(wallet.copayers, function(copayer) { - var value = { - walletId: wallet.id, - requestPubKey: copayer.requestPubKey, - }; - ops.push({ - type: 'put', - key: KEY.COPAYER(copayer.id), - value: value - }); - }); - this.db.batch(ops, cb); + return this.storeWallet(wallet, cb); }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.get(KEY.COPAYER(copayerId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, data); + this.db.collection(collections.WALLETS).findOne({ + 'copayers.id': copayerId + }, { + fields: { + id: 1, + copayers: 1, + }, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var copayer = _.find(result.copayers, { + id: copayerId + }); + return cb(null, { + walletId: result.id, + requestPubKey: copayer.requestPubKey, + }); }); }; +// TODO: should be done client-side Storage.prototype._completeTxData = function(walletId, txs, cb) { var txList = [].concat(txs); this.fetchWallet(walletId, function(err, wallet) { @@ -132,12 +97,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; - this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + + this.db.collection(collections.TXS).findOne({ + id: txProposalId, + walletId: walletId + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); }); }; @@ -145,22 +112,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) { var self = this; - var txs = []; - var key = KEY.PENDING_TXP(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + this.db.collection(collections.TXS).find({ + walletId: walletId, + isPending: true + }).sort({ + createdOn: -1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; /** @@ -174,31 +138,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) { var self = this; - var txs = []; opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.TXP(walletId, opts.minTs); - var endkey = KEY.TXP(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection(collections.TXS).find(filter, mods).sort({ + createdOn: -1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; @@ -209,183 +172,140 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { * @param opts.minTs * @param opts.maxTs * @param opts.limit + * @param opts.reverse */ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var txs = []; + var self = this; + opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.NOTIFICATION(walletId, opts.minTs); - var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(Notification.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, txs); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({ + id: opts.reverse ? -1 : 1, + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var notifications = _.map(result, function(notification) { + return Model.Notification.fromObj(notification); }); + return cb(null, notifications); + }); }; +// TODO: remove walletId from signature Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); + this.db.collection(collections.NOTIFICATIONS).insert(notification, { + w: 1 + }, cb); }; - -// TODO should we store only txp.id on keys for indexing -// or the whole txp? For now, the entire record makes sense -// (faster + easier to access) +// TODO: remove walletId from signature Storage.prototype.storeTx = function(walletId, txp, cb) { - var ops = [{ - type: 'put', - key: KEY.TXP(walletId, txp.id), - value: txp, - }]; - - if (txp.isPending()) { - ops.push({ - type: 'put', - key: KEY.PENDING_TXP(walletId, txp.id), - value: txp, - }); - } else { - ops.push({ - type: 'del', - key: KEY.PENDING_TXP(walletId, txp.id), - }); - } - this.db.batch(ops, cb); + txp.isPending = txp.isPending(); // Persist attribute to use when querying + this.db.collection(collections.TXS).update({ + id: txp.id, + walletId: walletId + }, txp, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - var ops = [{ - type: 'del', - key: KEY.TXP(walletId, txProposalId), + this.db.collection(collections.TXS).findAndRemove({ + id: txProposalId, + walletId: walletId }, { - type: 'del', - key: KEY.PENDING_TXP(walletId, txProposalId), - }]; - - this.db.batch(ops, cb); -}; - -Storage.prototype._delByKey = function(key, cb) { - var self = this; - var keys = []; - this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) - .on('data', function(key) { - keys.push(key); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function(err) { - self.db.batch(_.map(keys, function(k) { - return { - key: k, - type: 'del' - }; - }), function(err) { - return cb(err); - }); - }); -}; - -Storage.prototype._removeCopayers = function(walletId, cb) { - var self = this; - - this.fetchWallet(walletId, function(err, w) { - if (err || !w) return cb(err); - - self.db.batch(_.map(w.copayers, function(c) { - return { - type: 'del', - key: KEY.COPAYER(c.id), - }; - }), cb); - }); + w: 1 + }, cb); }; Storage.prototype.removeWallet = function(walletId, cb) { var self = this; - async.series([ + async.parallel([ function(next) { - // This should be the first step. Will check the wallet exists - self._removeCopayers(walletId, next); + self.db.collection(collections.WALLETS).findAndRemove({ + id: walletId + }, next); }, function(next) { - self._delByKey(walletPrefix(walletId), cb); + self.db.collection(collections.ADDRESSES).remove({ + walletId: walletId + }, next); + }, + function(next) { + self.db.collection(collections.TXS).remove({ + walletId: walletId + }, next); + }, + function(next) { + self.db.collection(collections.NOTIFICATIONS).remove({ + walletId: walletId + }, next); }, ], cb); }; Storage.prototype.fetchAddresses = function(walletId, cb) { - var addresses = []; - var key = KEY.ADDRESS(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - addresses.push(Address.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, addresses); + var self = this; + + this.db.collection(collections.ADDRESSES).find({ + walletId: walletId, + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var addresses = _.map(result, function(address) { + return Model.Address.fromObj(address); }); + return cb(null, addresses); + }); }; Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var ops = _.map([].concat(addresses), function(address) { - return { - type: 'put', - key: KEY.ADDRESS(wallet.id, address.address), - value: address, - }; + var self = this; + var addresses = [].concat(addresses); + if (addresses.length == 0) return cb(); + this.db.collection(collections.ADDRESSES).insert(addresses, { + w: 1 + }, function(err) { + if (err) return cb(err); + self.storeWallet(wallet, cb); }); - ops.unshift({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet, - }); - - this.db.batch(ops, cb); }; Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; + cb = cb || function() {}; - this.db.readStream() - .on('data', function(data) { - fn(util.inspect(data, { - depth: 10 - })); - }) - .on('end', function() { - if (cb) return cb(); - }); + var self = this; + this.db.collections(function(err, collections) { + if (err) return cb(err); + async.eachSeries(collections, function(col, next) { + col.find().toArray(function(err, items) { + fn('--------', col.s.name); + fn(items); + fn('------------------------------------------------------------------\n\n'); + next(err); + }); + }, cb); + }); }; module.exports = Storage; diff --git a/lib/storage_leveldb.js b/lib/storage_leveldb.js new file mode 100644 index 0000000..f2bed0b --- /dev/null +++ b/lib/storage_leveldb.js @@ -0,0 +1,391 @@ +'use strict'; + +var _ = require('lodash'); +var levelup = require('levelup'); +var multilevel = require('multilevel'); +var net = require('net'); +var async = require('async'); +var $ = require('preconditions').singleton(); +var log = require('npmlog'); +var util = require('util'); +log.debug = log.verbose; +log.disableColor(); + +var Wallet = require('./model/wallet'); +var Copayer = require('./model/copayer'); +var Address = require('./model/address'); +var TxProposal = require('./model/txproposal'); +var Notification = require('./model/notification'); + +var Storage = function(opts) { + opts = opts || {}; + this.db = opts.db; + + if (!this.db) { + if (opts.multiLevel) { + this.db = multilevel.client(); + var con = net.connect(opts.multiLevel); + con.pipe(this.db.createRpcStream()).pipe(con); + log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); + } else { + this.db = levelup(opts.dbPath || './db/bws.db', { + valueEncoding: 'json' + }); + } + } +}; + +var zeroPad = function(x, length) { + return _.padLeft(parseInt(x), length, '0'); +}; + +var walletPrefix = function(id) { + return 'w!' + id; +}; + +var opKey = function(key) { + return key ? '!' + key : ''; +}; + +var MAX_TS = _.repeat('9', 14); + + +var KEY = { + WALLET: function(walletId) { + return walletPrefix(walletId) + '!main'; + }, + COPAYER: function(id) { + return 'copayer!' + id; + }, + TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!txp' + opKey(txProposalId); + }, + NOTIFICATION: function(walletId, notificationId) { + return walletPrefix(walletId) + '!not' + opKey(notificationId); + }, + PENDING_TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); + }, + ADDRESS: function(walletId, address) { + return walletPrefix(walletId) + '!addr' + opKey(address); + }, +}; + +Storage.prototype.fetchWallet = function(id, cb) { + this.db.get(KEY.WALLET(id), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, Wallet.fromObj(data)); + }); +}; + +Storage.prototype.storeWallet = function(wallet, cb) { + this.db.put(KEY.WALLET(wallet.id), wallet, cb); +}; + +Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { + var ops = []; + ops.push({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet + }); + _.each(wallet.copayers, function(copayer) { + var value = { + walletId: wallet.id, + requestPubKey: copayer.requestPubKey, + }; + ops.push({ + type: 'put', + key: KEY.COPAYER(copayer.id), + value: value + }); + }); + this.db.batch(ops, cb); +}; + +Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.get(KEY.COPAYER(copayerId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, data); + }); +}; + +Storage.prototype._completeTxData = function(walletId, txs, cb) { + var txList = [].concat(txs); + this.fetchWallet(walletId, function(err, wallet) { + if (err) return cb(err); + _.each(txList, function(tx) { + tx.creatorName = wallet.getCopayer(tx.creatorId).name; + _.each(tx.actions, function(action) { + action.copayerName = wallet.getCopayer(action.copayerId).name; + }); + }); + return cb(null, txs); + }); +}; + +Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { + var self = this; + this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + }); +}; + + +Storage.prototype.fetchPendingTxs = function(walletId, cb) { + var self = this; + + var txs = []; + var key = KEY.PENDING_TXP(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + +/** + * fetchTxs. Times are in UNIX EPOCH (seconds) + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchTxs = function(walletId, opts, cb) { + var self = this; + + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.TXP(walletId, opts.minTs); + var endkey = KEY.TXP(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: true, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + + +/** + * fetchNotifications + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchNotifications = function(walletId, opts, cb) { + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.NOTIFICATION(walletId, opts.minTs); + var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: opts.reverse, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(Notification.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, txs); + }); +}; + + +Storage.prototype.storeNotification = function(walletId, notification, cb) { + this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); +}; + + +// TODO should we store only txp.id on keys for indexing +// or the whole txp? For now, the entire record makes sense +// (faster + easier to access) +Storage.prototype.storeTx = function(walletId, txp, cb) { + var ops = [{ + type: 'put', + key: KEY.TXP(walletId, txp.id), + value: txp, + }]; + + if (txp.isPending()) { + ops.push({ + type: 'put', + key: KEY.PENDING_TXP(walletId, txp.id), + value: txp, + }); + } else { + ops.push({ + type: 'del', + key: KEY.PENDING_TXP(walletId, txp.id), + }); + } + this.db.batch(ops, cb); +}; + +Storage.prototype.removeTx = function(walletId, txProposalId, cb) { + var ops = [{ + type: 'del', + key: KEY.TXP(walletId, txProposalId), + }, { + type: 'del', + key: KEY.PENDING_TXP(walletId, txProposalId), + }]; + + this.db.batch(ops, cb); +}; + +Storage.prototype._delByKey = function(key, cb) { + var self = this; + var keys = []; + this.db.createKeyStream({ + gte: key, + lt: key + '~', + }) + .on('data', function(key) { + keys.push(key); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function(err) { + self.db.batch(_.map(keys, function(k) { + return { + key: k, + type: 'del' + }; + }), function(err) { + return cb(err); + }); + }); +}; + +Storage.prototype._removeCopayers = function(walletId, cb) { + var self = this; + + this.fetchWallet(walletId, function(err, w) { + if (err || !w) return cb(err); + + self.db.batch(_.map(w.copayers, function(c) { + return { + type: 'del', + key: KEY.COPAYER(c.id), + }; + }), cb); + }); +}; + +Storage.prototype.removeWallet = function(walletId, cb) { + var self = this; + + async.series([ + + function(next) { + // This should be the first step. Will check the wallet exists + self._removeCopayers(walletId, next); + }, + function(next) { + self._delByKey(walletPrefix(walletId), cb); + }, + ], cb); +}; + + +Storage.prototype.fetchAddresses = function(walletId, cb) { + var addresses = []; + var key = KEY.ADDRESS(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + addresses.push(Address.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, addresses); + }); +}; + +Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { + var ops = _.map([].concat(addresses), function(address) { + return { + type: 'put', + key: KEY.ADDRESS(wallet.id, address.address), + value: address, + }; + }); + ops.unshift({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet, + }); + + this.db.batch(ops, cb); +}; + +Storage.prototype._dump = function(cb, fn) { + fn = fn || console.log; + + this.db.readStream() + .on('data', function(data) { + fn(util.inspect(data, { + depth: 10 + })); + }) + .on('end', function() { + if (cb) return cb(); + }); +}; + +module.exports = Storage; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js deleted file mode 100644 index 19fe047..0000000 --- a/lib/storage_mongo.js +++ /dev/null @@ -1,311 +0,0 @@ -'use strict'; - -var _ = require('lodash'); -var async = require('async'); -var $ = require('preconditions').singleton(); -var log = require('npmlog'); -log.debug = log.verbose; -log.disableColor(); -var util = require('util'); - -var mongodb = require('mongodb'); - -var Model = require('./model'); - -var collections = { - WALLETS: 'wallets', - TXS: 'txs', - ADDRESSES: 'addresses', - NOTIFICATIONS: 'notifications', -}; - -var Storage = function(opts) { - opts = opts || {}; - this.db = opts.db; - - if (!this.db) { - var url = 'mongodb://localhost:27017/bws'; - mongodb.MongoClient.connect(url, function(err, db) { - if (err) { - log.error('Unable to connect to the mongoDB server. Error:', err); - return; - } - this.db = db; - console.log('Connection established to ', url); - }); - } -}; - -Storage.prototype.fetchWallet = function(id, cb) { - this.db.collection(collections.WALLETS).findOne({ - id: id - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - return cb(null, Model.Wallet.fromObj(result)); - }); -}; - -Storage.prototype.storeWallet = function(wallet, cb) { - this.db.collection(collections.WALLETS).update({ - id: wallet.id - }, wallet, { - w: 1, - upsert: true, - }, cb); -}; - -Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - return this.storeWallet(wallet, cb); -}; - -Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.collection(collections.WALLETS).findOne({ - 'copayers.id': copayerId - }, { - fields: { - id: 1, - copayers: 1, - }, - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var copayer = _.find(result.copayers, { - id: copayerId - }); - return cb(null, { - walletId: result.id, - requestPubKey: copayer.requestPubKey, - }); - }); -}; - -// TODO: should be done client-side -Storage.prototype._completeTxData = function(walletId, txs, cb) { - var txList = [].concat(txs); - this.fetchWallet(walletId, function(err, wallet) { - if (err) return cb(err); - _.each(txList, function(tx) { - tx.creatorName = wallet.getCopayer(tx.creatorId).name; - _.each(tx.actions, function(action) { - action.copayerName = wallet.getCopayer(action.copayerId).name; - }); - }); - return cb(null, txs); - }); -}; - -Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { - var self = this; - - this.db.collection(collections.TXS).findOne({ - id: txProposalId, - walletId: walletId - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); - }); -}; - - -Storage.prototype.fetchPendingTxs = function(walletId, cb) { - var self = this; - - this.db.collection(collections.TXS).find({ - walletId: walletId, - isPending: true - }).sort({ - createdOn: -1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var txs = _.map(result, function(tx) { - return Model.TxProposal.fromObj(tx); - }); - return self._completeTxData(walletId, txs, cb); - }); -}; - -/** - * fetchTxs. Times are in UNIX EPOCH (seconds) - * - * @param walletId - * @param opts.minTs - * @param opts.maxTs - * @param opts.limit - */ -Storage.prototype.fetchTxs = function(walletId, opts, cb) { - var self = this; - - opts = opts || {}; - - var tsFilter = {}; - if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; - if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - - var filter = { - walletId: walletId - }; - if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; - - var mods = {}; - if (_.isNumber(opts.limit)) mods.limit = opts.limit; - - this.db.collection(collections.TXS).find(filter, mods).sort({ - createdOn: -1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var txs = _.map(result, function(tx) { - return Model.TxProposal.fromObj(tx); - }); - return self._completeTxData(walletId, txs, cb); - }); -}; - - -/** - * fetchNotifications - * - * @param walletId - * @param opts.minTs - * @param opts.maxTs - * @param opts.limit - * @param opts.reverse - */ -Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var self = this; - - opts = opts || {}; - - var tsFilter = {}; - if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; - if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - - var filter = { - walletId: walletId - }; - if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; - - var mods = {}; - if (_.isNumber(opts.limit)) mods.limit = opts.limit; - - this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({ - id: opts.reverse ? -1 : 1, - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var notifications = _.map(result, function(notification) { - return Model.Notification.fromObj(notification); - }); - return cb(null, notifications); - }); -}; - - -// TODO: remove walletId from signature -Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.collection(collections.NOTIFICATIONS).insert(notification, { - w: 1 - }, cb); -}; - -// TODO: remove walletId from signature -Storage.prototype.storeTx = function(walletId, txp, cb) { - txp.isPending = txp.isPending(); // Persist attribute to use when querying - this.db.collection(collections.TXS).update({ - id: txp.id, - walletId: walletId - }, txp, { - w: 1, - upsert: true, - }, cb); -}; - -Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - this.db.collection(collections.TXS).findAndRemove({ - id: txProposalId, - walletId: walletId - }, { - w: 1 - }, cb); -}; - -Storage.prototype.removeWallet = function(walletId, cb) { - var self = this; - - async.parallel([ - - function(next) { - self.db.collection(collections.WALLETS).findAndRemove({ - id: walletId - }, next); - }, - function(next) { - self.db.collection(collections.ADDRESSES).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.TXS).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.NOTIFICATIONS).remove({ - walletId: walletId - }, next); - }, - ], cb); -}; - - -Storage.prototype.fetchAddresses = function(walletId, cb) { - var self = this; - - this.db.collection(collections.ADDRESSES).find({ - walletId: walletId, - }).sort({ - createdOn: 1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var addresses = _.map(result, function(address) { - return Model.Address.fromObj(address); - }); - return cb(null, addresses); - }); -}; - -Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var self = this; - var addresses = [].concat(addresses); - if (addresses.length == 0) return cb(); - this.db.collection(collections.ADDRESSES).insert(addresses, { - w: 1 - }, function(err) { - if (err) return cb(err); - self.storeWallet(wallet, cb); - }); -}; - -Storage.prototype._dump = function(cb, fn) { - fn = fn || console.log; - cb = cb || function() {}; - - var self = this; - this.db.collections(function(err, collections) { - if (err) return cb(err); - async.eachSeries(collections, function(col, next) { - col.find().toArray(function(err, items) { - fn('--------', col.s.name); - fn(items); - fn('------------------------------------------------------------------\n\n'); - next(err); - }); - }, cb); - }); -}; - -module.exports = Storage; diff --git a/package.json b/package.json index 66b52f3..48953f0 100644 --- a/package.json +++ b/package.json @@ -26,15 +26,12 @@ "coveralls": "^2.11.2", "express": "^4.10.0", "inherits": "^2.0.1", - "leveldown": "^0.10.0", - "levelup": "^0.19.0", "locker": "^0.1.0", "locker-server": "^0.1.3", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", "mongodb": "^2.0.27", "morgan": "*", - "multilevel": "^6.1.0", "npmlog": "^0.1.1", "preconditions": "^1.0.7", "read": "^1.0.5", @@ -61,11 +58,14 @@ "test": "./node_modules/.bin/mocha", "coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, - "contributors": [{ - "name": "Ivan Socolsky", - "email": "ivan@bitpay.com" - }, { - "name": "Matias Alejo Garcia", - "email": "ematiu@gmail.com" - }] + "contributors": [ + { + "name": "Ivan Socolsky", + "email": "ivan@bitpay.com" + }, + { + "name": "Matias Alejo Garcia", + "email": "ematiu@gmail.com" + } + ] } diff --git a/test/integration/server.js b/test/integration/server.js index 1b05a35..1017c08 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -7,22 +7,23 @@ var inspect = require('util').inspect; var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var levelup = require('levelup'); -var memdown = require('memdown'); -var mongodb = require('mongodb'); var log = require('npmlog'); log.debug = log.verbose; +var mongodb = require('mongodb'); + var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; -var Storage = require('../../lib/storage_mongo'); +var Storage = require('../../lib/storage'); var BlockchainMonitor = require('../../lib/blockchainmonitor'); -var Wallet = require('../../lib/model/wallet'); -var TxProposal = require('../../lib/model/txproposal'); -var Address = require('../../lib/model/address'); -var Copayer = require('../../lib/model/copayer'); +var Model = require('../../lib/model'); +var Wallet = Model.Wallet; +var TxProposal = Model.TxProposal; +var Address = Model.Address; +var Copayer = Model.Copayer; + var WalletService = require('../../lib/server'); var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var TestData = require('../testdata'); diff --git a/test/storage.js b/test/storage.js index 143259d..e2ed216 100644 --- a/test/storage.js +++ b/test/storage.js @@ -5,24 +5,13 @@ var async = require('async'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var levelup = require('levelup'); -var memdown = require('memdown'); var mongodb = require('mongodb'); - -var StorageLevelDb = require('../lib/storage'); -var StorageMongoDb = require('../lib/storage_mongo'); +var Storage = require('../lib/storage'); var Model = require('../lib/model'); -function initStorageLevelDb(cb) { - var db = levelup(memdown, { - valueEncoding: 'json' - }); - return cb(null, db); -}; - -function initStorageMongoDb(cb) { +function initDb(cb) { var url = 'mongodb://localhost:27017/bws'; mongodb.MongoClient.connect(url, function(err, db) { should.not.exist(err); @@ -33,21 +22,6 @@ function initStorageMongoDb(cb) { }; - -var Storage, initDb; - - -var useLevel = false; - -if (useLevel) { - Storage = StorageLevelDb; - initDb = initStorageLevelDb; -} else { - Storage = StorageMongoDb; - initDb = initStorageMongoDb; -} - - describe('Storage', function() { var storage; beforeEach(function(done) { From 498392e72cbb4f876323c7f40f6ec25b39e40086 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 20:46:45 -0300 Subject: [PATCH 08/13] refactor storage initialization --- bws.js | 4 +- config.js | 20 +++++----- lib/server.js | 3 +- lib/storage.js | 30 +++++++++------ multilevel/clientMultilevel.js | 70 ---------------------------------- multilevel/multilevel.js | 17 --------- test/integration/server.js | 40 ++++++++++--------- test/storage.js | 7 +++- 8 files changed, 59 insertions(+), 132 deletions(-) delete mode 100755 multilevel/clientMultilevel.js delete mode 100755 multilevel/multilevel.js diff --git a/bws.js b/bws.js index 271cafe..b0e5247 100755 --- a/bws.js +++ b/bws.js @@ -51,7 +51,7 @@ var start = function() { }); }; -if (config.cluster && (!config.storageOpts.multiLevel || !config.lockOpts.lockerServer)) - throw 'When running in cluster mode, multilevel and locker server need to be configured'; +if (config.cluster && !config.lockOpts.lockerServer) + throw 'When running in cluster mode, locker server need to be configured'; start(); diff --git a/config.js b/config.js index 1507220..b31a895 100644 --- a/config.js +++ b/config.js @@ -1,7 +1,7 @@ var config = { basePath: '/bws/api', disableLogs: false, - port: 3232, + port: 3232, // Uncomment to make BWS a forking server // cluster: true, // Uncomment to use the nr of availalbe CPUs @@ -12,19 +12,17 @@ var config = { // certificateFile: 'cert.pem', storageOpts: { - dbPath: './db', - // Uncomment to use multilevel server - // multiLevel: { - // host: 'localhost', - // port: 3230, - // }, + mongoDb: { + host: 'localhost', + port: 27017, + }, }, lockOpts: { // To use locker-server, uncomment this: - // lockerServer: { - // host: 'localhost', - // port: 3231, - // }, + // lockerServer: { + // host: 'localhost', + // port: 3231, + // }, }, blockchainExplorerOpts: { livenet: { diff --git a/lib/server.js b/lib/server.js index 9f61da4..2d5a8c6 100644 --- a/lib/server.js +++ b/lib/server.js @@ -57,7 +57,8 @@ WalletService.onNotification = function(func) { WalletService.initialize = function(opts) { opts = opts || {}; lock = opts.lock || new Lock(opts.lockOpts); - storage = opts.storage || new Storage(opts.storageOpts); + // TODO: This method needs to be async + storage = opts.storage || new Storage().connect(opts.storageOpts, function() {}); blockchainExplorer = opts.blockchainExplorer; blockchainExplorerOpts = opts.blockchainExplorerOpts; initialized = true; diff --git a/lib/storage.js b/lib/storage.js index 19fe047..f7105e4 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -19,21 +19,27 @@ var collections = { NOTIFICATIONS: 'notifications', }; -var Storage = function(opts) { +var Storage = function() {}; + +Storage.prototype.connect = function(opts, cb) { + var self = this; + opts = opts || {}; this.db = opts.db; - if (!this.db) { - var url = 'mongodb://localhost:27017/bws'; - mongodb.MongoClient.connect(url, function(err, db) { - if (err) { - log.error('Unable to connect to the mongoDB server. Error:', err); - return; - } - this.db = db; - console.log('Connection established to ', url); - }); - } + if (this.db) return cb(); + + var config = opts.mongoDb || {}; + var url = 'mongodb://' + (config.host || 'localhost') + ':' + (config.port ||  27017) + '/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + if (err) { + log.error('Unable to connect to the mongoDB server.'); + return cb(err); + } + self.db = db; + console.log('Connection established to ', url); + return cb(); + }); }; Storage.prototype.fetchWallet = function(id, cb) { diff --git a/multilevel/clientMultilevel.js b/multilevel/clientMultilevel.js deleted file mode 100755 index f2d028a..0000000 --- a/multilevel/clientMultilevel.js +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env node - -var multilevel = require('multilevel'); -var net = require('net'); -var moment = require('moment'); - -var PORT = 3230; - -var otherDate; - -//trying to parse optional parameter to get stats on any given date -try { - otherDate = process.argv[2] && moment(process.argv[2]).isValid() ? moment(process.argv[2]) : null; -} catch (e) { - console.log('Enter the date in the format YYYY-MM-DD.'); -} - -var db = multilevel.client(); -var con = net.connect(PORT); -con.pipe(db.createRpcStream()).pipe(con); - - -var Today = otherDate || moment(); -var TotalTx = 0; -var TotalAmount = 0; -var TotalNewWallets = 0; - -var IsToday = function(date) { - if (!date) return false; - var date = moment(date * 1000); - return (date >= Today.startOf('day') && date <= Today.endOf('day')); -} - -var TotalTxpForToday = function(data) { - if (!data) return; - if (data.key.indexOf('!txp!') < 0) return; - if (!data.value || !IsToday(data.value.createdOn)) return; - TotalTx++; - TotalAmount = TotalAmount + data.value.amount; -}; - -var TotalNewWalletForToday = function(data) { - if (!data) return; - if (data.key.indexOf('!main') < 0) return; - if (!data.value || !IsToday(data.value.createdOn)) return; - TotalNewWallets++; -}; - -var PrintStats = function() { - console.log('Stats for date : ', Today.format("YYYY-MM-DD")); - console.log('New wallets : ', TotalNewWallets); - console.log('Total tx : ', TotalTx); - console.log('Total amount in tx (satoshis) : ', TotalAmount); -}; - -var ProcessData = function(data) { - TotalTxpForToday(data); - TotalNewWalletForToday(data); -}; - -// streams -db.createReadStream().on('data', function(data) { - ProcessData(data); -}).on('error', function(err) { - console.log('Error : ', err); - process.exit(code = 1); -}).on('close', function() { - PrintStats(); - process.exit(code = 0); -}); diff --git a/multilevel/multilevel.js b/multilevel/multilevel.js deleted file mode 100755 index 742638a..0000000 --- a/multilevel/multilevel.js +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env node - -var multilevel = require('multilevel'); -var net = require('net'); -var level = require('levelup'); - -var db = level('./db', { - valueEncoding: 'json' -}); -var HOST = 'localhost'; -var PORT = 3230; - - -console.log('Server started at port ' + PORT + '...'); -net.createServer(function(con) { - con.pipe(multilevel.server(db)).pipe(con); -}).listen(PORT, HOST); diff --git a/test/integration/server.js b/test/integration/server.js index 1017c08..5a0140e 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -244,17 +244,20 @@ function closeDb(cb) { describe('Wallet service', function() { beforeEach(function(done) { openDb(function() { - storage = new Storage({ + storage = new Storage(); + storage.connect({ db: db - }); - blockchainExplorer = sinon.stub(); + }, function(err) { + should.not.exist(err); + blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; + done(); }); - helpers.offset = 0; - done(); }); }); after(function(done) { @@ -3055,18 +3058,21 @@ describe('Blockchain monitor', function() { sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); openDb(function() { - storage = new Storage({ + storage = new Storage(); + storage.connect({ db: db - }); - blockchainExplorer = sinon.stub(); + }, function(err) { + should.not.exist(err); + blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }); - helpers.offset = 0; + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; - done(); + done(); + }); }); }); afterEach(function() { diff --git a/test/storage.js b/test/storage.js index e2ed216..1748f58 100644 --- a/test/storage.js +++ b/test/storage.js @@ -27,10 +27,13 @@ describe('Storage', function() { beforeEach(function(done) { initDb(function(err, db) { should.not.exist(err); - storage = new Storage({ + storage = new Storage(); + storage.connect({ db: db + }, function(err) { + should.not.exist(err); + done(); }); - done(); }); }); describe('Store & fetch wallet', function() { From d7ea3e48bb2652106161ff40b2a66359e31c39d5 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 20 Apr 2015 21:11:10 -0300 Subject: [PATCH 09/13] cleanup --- lib/storage.js | 6 ++- test/integration/server.js | 107 +++++++++++++++++++------------------ test/storage.js | 9 ++-- 3 files changed, 61 insertions(+), 61 deletions(-) diff --git a/lib/storage.js b/lib/storage.js index f7105e4..c9c5180 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -19,13 +19,15 @@ var collections = { NOTIFICATIONS: 'notifications', }; -var Storage = function() {}; +var Storage = function(opts) { + opts = opts || {}; + this.db = opts.db; +}; Storage.prototype.connect = function(opts, cb) { var self = this; opts = opts || {}; - this.db = opts.db; if (this.db) return cb(); diff --git a/test/integration/server.js b/test/integration/server.js index 5a0140e..b51e49e 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -211,53 +211,51 @@ helpers.createAddresses = function(server, wallet, main, change, cb) { var db, storage, blockchainExplorer; function openDb(cb) { - function dropDb(cb) { - db.dropDatabase(function(err) { - should.not.exist(err); - return cb(); - }); - }; - if (db) { - return dropDb(cb); - } else { - var url = 'mongodb://localhost:27017/bws'; - mongodb.MongoClient.connect(url, function(err, _db) { - should.not.exist(err); - db = _db; - return dropDb(cb); - }); - } + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, _db) { + should.not.exist(err); + db = _db; + return cb(); + }); +}; + +function resetDb(cb) { + if (!db) return cb(); + db.dropDatabase(function(err) { + should.not.exist(err); + return cb(); + }); }; function closeDb(cb) { - if (db) { - db.close(true, function(err) { - should.not.exist(err); - db = null; - return cb(); - }); - } else { + if (!db) return cb(); + db.close(true, function(err) { + should.not.exist(err); + db = null; return cb(); - } + }); }; -describe('Wallet service', function() { - beforeEach(function(done) { - openDb(function() { - storage = new Storage(); - storage.connect({ - db: db - }, function(err) { - should.not.exist(err); - blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }); - helpers.offset = 0; - done(); +describe('Wallet service', function() { + before(function(done) { + openDb(function() { + storage = new Storage({ + db: db }); + done(); + }); + }); + beforeEach(function(done) { + resetDb(function() { + blockchainExplorer = sinon.stub(); + + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, + }); + helpers.offset = 0; + done(); }); }); after(function(done) { @@ -3052,27 +3050,30 @@ describe('Wallet service', function() { describe('Blockchain monitor', function() { var addressSubscriber; + before(function(done) { + openDb(function() { + storage = new Storage({ + db: db + }); + done(); + }); + }); + beforeEach(function(done) { addressSubscriber = sinon.stub(); addressSubscriber.subscribe = sinon.stub(); sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); - openDb(function() { - storage = new Storage(); - storage.connect({ - db: db - }, function(err) { - should.not.exist(err); - blockchainExplorer = sinon.stub(); + resetDb(function() { + blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }); - helpers.offset = 0; - - done(); + WalletService.initialize({ + storage: storage, + blockchainExplorer: blockchainExplorer, }); + helpers.offset = 0; + + done(); }); }); afterEach(function() { diff --git a/test/storage.js b/test/storage.js index 1748f58..f3240b7 100644 --- a/test/storage.js +++ b/test/storage.js @@ -12,7 +12,7 @@ var Model = require('../lib/model'); function initDb(cb) { - var url = 'mongodb://localhost:27017/bws'; + var url = 'mongodb://localhost:27017'; mongodb.MongoClient.connect(url, function(err, db) { should.not.exist(err); db.dropDatabase(function(err) { @@ -27,13 +27,10 @@ describe('Storage', function() { beforeEach(function(done) { initDb(function(err, db) { should.not.exist(err); - storage = new Storage(); - storage.connect({ + storage = new Storage({ db: db - }, function(err) { - should.not.exist(err); - done(); }); + done(); }); }); describe('Store & fetch wallet', function() { From b84c1dc178480963ed8ccb9b6be4084efd1ef6cf Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Tue, 21 Apr 2015 14:43:35 -0300 Subject: [PATCH 10/13] async initialize --- bws.js | 35 ++++++++++++++++++++--------------- lib/expressapp.js | 9 ++++++--- lib/server.js | 37 +++++++++++++++++++++++++++++++++---- lib/storage.js | 12 +++++++++++- test/integration/server.js | 27 ++++++++------------------- 5 files changed, 78 insertions(+), 42 deletions(-) diff --git a/bws.js b/bws.js index b0e5247..3b166fb 100755 --- a/bws.js +++ b/bws.js @@ -28,30 +28,35 @@ if (config.https) { serverOpts.cert = fs.readFileSync(config.certificateFile || './ssl/certificate.pem'); } -var start = function() { +var start = function(cb) { var server; if (config.cluster) { server = sticky(clusterInstances, function() { - var app = ExpressApp.start(config); - var server = config.https ? serverModule.createServer(serverOpts, app) : + ExpressApp.start(config, function(err, app) { + var server = config.https ? serverModule.createServer(serverOpts, app) : + serverModule.Server(app); + WsApp.start(server, config); + return server; + }); + }); + return cb(server); + } else { + ExpressApp.start(config, function(err, app) { + server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); WsApp.start(server, config); - return server; + return cb(server); }); - } else { - var app = ExpressApp.start(config); - server = config.https ? serverModule.createServer(serverOpts, app) : - serverModule.Server(app); - WsApp.start(server, config); - } - server.listen(port, function(err) { - if (err) console.log('ERROR: ', err); - log.info('Bitcore Wallet Service running on port ' + port); - }); + }; }; if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; -start(); +start(function(server) { + server.listen(port, function(err) { + if (err) console.log('ERROR: ', err); + log.info('Bitcore Wallet Service running on port ' + port); + }); +}); diff --git a/lib/expressapp.js b/lib/expressapp.js index 3e6bbe4..318387e 100644 --- a/lib/expressapp.js +++ b/lib/expressapp.js @@ -22,11 +22,11 @@ var ExpressApp = function() {}; * @param opts.WalletService options for WalletService class * @param opts.basePath * @param opts.disableLogs + * @param {Callback} cb */ -ExpressApp.start = function(opts) { +ExpressApp.start = function(opts, cb) { opts = opts || {}; - WalletService.initialize(opts); var app = express(); app.use(function(req, res, next) { res.setHeader('Access-Control-Allow-Origin', '*'); @@ -316,7 +316,10 @@ ExpressApp.start = function(opts) { app.use(opts.basePath || '/bws/api', router); - return app; + + WalletService.initialize(opts, function(err) { + return cb(err,app); + }); }; module.exports = ExpressApp; diff --git a/lib/server.js b/lib/server.js index 2d5a8c6..71caba6 100644 --- a/lib/server.js +++ b/lib/server.js @@ -53,15 +53,44 @@ WalletService.onNotification = function(func) { * @param {Object} opts * @param {Storage} [opts.storage] - The storage provider. * @param {Storage} [opts.blockchainExplorer] - The blockchainExporer provider. + * @param {Callback} cb */ -WalletService.initialize = function(opts) { +WalletService.initialize = function(opts, cb) { + $.shouldBeFunction(cb); + + opts = opts || {}; lock = opts.lock || new Lock(opts.lockOpts); - // TODO: This method needs to be async - storage = opts.storage || new Storage().connect(opts.storageOpts, function() {}); blockchainExplorer = opts.blockchainExplorer; blockchainExplorerOpts = opts.blockchainExplorerOpts; - initialized = true; + + if (initialized) + return cb(); + + if (opts.storage) { + storage = opts.storage; + initialized = true; + return cb(); + } else { + var newStorage = new Storage(); + newStorage.connect(opts.storageOpts, function(err) { + if (err) return cb(err); + storage = newStorage; + initialized = true; + return cb(); + }); + } +}; + + +WalletService.shutDown = function(cb) { + if (initialized) { + storage.disconnect(function(err) { + if (err) return cb(err); + initialized = false; + return cb(); + }); + } }; WalletService.getInstance = function() { diff --git a/lib/storage.js b/lib/storage.js index c9c5180..350fb9a 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -29,7 +29,7 @@ Storage.prototype.connect = function(opts, cb) { opts = opts || {}; - if (this.db) return cb(); + if (this.db) return cb(null); var config = opts.mongoDb || {}; var url = 'mongodb://' + (config.host || 'localhost') + ':' + (config.port ||  27017) + '/bws'; @@ -40,6 +40,16 @@ Storage.prototype.connect = function(opts, cb) { } self.db = db; console.log('Connection established to ', url); + return cb(null); + }); +}; + + +Storage.prototype.disconnect = function(cb) { + var self = this; + this.db.close(true, function(err) { + if (err) return cb(err); + self.db = null; return cb(); }); }; diff --git a/test/integration/server.js b/test/integration/server.js index b51e49e..086970a 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -227,15 +227,6 @@ function resetDb(cb) { }); }; -function closeDb(cb) { - if (!db) return cb(); - db.close(true, function(err) { - should.not.exist(err); - db = null; - return cb(); - }); -}; - describe('Wallet service', function() { before(function(done) { @@ -249,21 +240,20 @@ describe('Wallet service', function() { beforeEach(function(done) { resetDb(function() { blockchainExplorer = sinon.stub(); - WalletService.initialize({ storage: storage, blockchainExplorer: blockchainExplorer, + }, function() { + helpers.offset = 0; + done(); }); - helpers.offset = 0; - done(); }); }); after(function(done) { - closeDb(done); + WalletService.shutDown(done); }); describe('#getInstanceWithAuth', function() { - beforeEach(function() {}); it('should get server instance for existing copayer', function(done) { @@ -3066,21 +3056,20 @@ describe('Blockchain monitor', function() { resetDb(function() { blockchainExplorer = sinon.stub(); - WalletService.initialize({ storage: storage, blockchainExplorer: blockchainExplorer, + }, function() { + helpers.offset = 0; + done(); }); - helpers.offset = 0; - - done(); }); }); afterEach(function() { BlockchainMonitor.prototype._getAddressSubscriber.restore(); }); after(function(done) { - closeDb(done); + WalletService.shutDown(done); }); it('should subscribe wallet', function(done) { From 7a0ec9f11190041d26438f5309865c05356233c6 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 21 Apr 2015 16:24:01 -0300 Subject: [PATCH 11/13] first attempt at integrating tingodb --- lib/storage.js | 29 +++++++++++++++++++++++------ test/integration/server.js | 15 +++++++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/lib/storage.js b/lib/storage.js index 350fb9a..c9e6ac4 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -77,14 +77,9 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { return this.storeWallet(wallet, cb); }; -Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { +Storage.prototype.fetchCopayerLookup2 = function(copayerId, cb) { this.db.collection(collections.WALLETS).findOne({ 'copayers.id': copayerId - }, { - fields: { - id: 1, - copayers: 1, - }, }, function(err, result) { if (err) return cb(err); if (!result) return cb(); @@ -98,6 +93,28 @@ Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { }); }; +Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.collection(collections.WALLETS).find({}).toArray(function(err, result) { + if (err) return cb(err); + + result = _.find(result, function(w) { + return _.any(w.copayers, { + id: copayerId + }); + }); + + + if (!result) return cb(); + var copayer = _.find(result.copayers, { + id: copayerId + }); + return cb(null, { + walletId: result.id, + requestPubKey: copayer.requestPubKey, + }); + }); +}; + // TODO: should be done client-side Storage.prototype._completeTxData = function(walletId, txs, cb) { var txList = [].concat(txs); diff --git a/test/integration/server.js b/test/integration/server.js index 086970a..87cae74 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -10,7 +10,8 @@ var should = chai.should(); var log = require('npmlog'); log.debug = log.verbose; -var mongodb = require('mongodb'); +var fs = require('fs'); +var tingodb = require('tingodb')(); var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); @@ -211,10 +212,13 @@ helpers.createAddresses = function(server, wallet, main, change, cb) { var db, storage, blockchainExplorer; function openDb(cb) { - var url = 'mongodb://localhost:27017/bws'; - mongodb.MongoClient.connect(url, function(err, _db) { - should.not.exist(err); - db = _db; + var tingodb = require('tingodb')(); + var dbDir = './db/test/'; + fs.mkdir(dbDir, function(err) { + if (err && err.code != 'EEXIST') { + throw new Error('Could not create test db directory at ./db/test/'); + } + db = new tingodb.Db(dbDir, {}); return cb(); }); }; @@ -222,7 +226,6 @@ function openDb(cb) { function resetDb(cb) { if (!db) return cb(); db.dropDatabase(function(err) { - should.not.exist(err); return cb(); }); }; From 95b0b72416fca0f880ed7dcdacc7fb077c720ffc Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 21 Apr 2015 23:16:18 -0300 Subject: [PATCH 12/13] all tests passing with tingodb memStore --- lib/storage.js | 70 ++++++++++++++++---------------------- test/integration/server.js | 15 +++----- test/storage.js | 31 ++++++++++------- 3 files changed, 53 insertions(+), 63 deletions(-) diff --git a/lib/storage.js b/lib/storage.js index c9e6ac4..c1f1a2c 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -17,6 +17,7 @@ var collections = { TXS: 'txs', ADDRESSES: 'addresses', NOTIFICATIONS: 'notifications', + COPAYERS_LOOKUP: 'copayers_lookup', }; var Storage = function(opts) { @@ -74,44 +75,38 @@ Storage.prototype.storeWallet = function(wallet, cb) { }; Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - return this.storeWallet(wallet, cb); -}; + var self = this; -Storage.prototype.fetchCopayerLookup2 = function(copayerId, cb) { - this.db.collection(collections.WALLETS).findOne({ - 'copayers.id': copayerId - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var copayer = _.find(result.copayers, { - id: copayerId - }); - return cb(null, { - walletId: result.id, + var copayerLookups = _.map(wallet.copayers, function(copayer) { + return { + copayerId: copayer.id, + walletId: wallet.id, requestPubKey: copayer.requestPubKey, + }; + }); + + this.db.collection(collections.COPAYERS_LOOKUP).remove({ + walletId: wallet.id + }, { + w: 1 + }, function(err) { + if (err) return cb(err); + self.db.collection(collections.COPAYERS_LOOKUP).insert(copayerLookups, { + w: 1 + }, function(err) { + if (err) return cb(err); + return self.storeWallet(wallet, cb); }); }); }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.collection(collections.WALLETS).find({}).toArray(function(err, result) { + this.db.collection(collections.COPAYERS_LOOKUP).findOne({ + copayerId: copayerId + }, function(err, result) { if (err) return cb(err); - - result = _.find(result, function(w) { - return _.any(w.copayers, { - id: copayerId - }); - }); - - if (!result) return cb(); - var copayer = _.find(result.copayers, { - id: copayerId - }); - return cb(null, { - walletId: result.id, - requestPubKey: copayer.requestPubKey, - }); + return cb(null, result); }); }; @@ -278,18 +273,11 @@ Storage.prototype.removeWallet = function(walletId, cb) { }, next); }, function(next) { - self.db.collection(collections.ADDRESSES).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.TXS).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.NOTIFICATIONS).remove({ - walletId: walletId + var otherCollections = _.without(_.values(collections), collections.WALLETS); + async.each(otherCollections, function(col, next) { + self.db.collection(col).remove({ + walletId: walletId + }, next); }, next); }, ], cb); diff --git a/test/integration/server.js b/test/integration/server.js index 87cae74..165fc31 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -11,7 +11,9 @@ var log = require('npmlog'); log.debug = log.verbose; var fs = require('fs'); -var tingodb = require('tingodb')(); +var tingodb = require('tingodb')({ + memStore: true +}); var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); @@ -212,15 +214,8 @@ helpers.createAddresses = function(server, wallet, main, change, cb) { var db, storage, blockchainExplorer; function openDb(cb) { - var tingodb = require('tingodb')(); - var dbDir = './db/test/'; - fs.mkdir(dbDir, function(err) { - if (err && err.code != 'EEXIST') { - throw new Error('Could not create test db directory at ./db/test/'); - } - db = new tingodb.Db(dbDir, {}); - return cb(); - }); + db = new tingodb.Db('./db/test', {}); + return cb(); }; function resetDb(cb) { diff --git a/test/storage.js b/test/storage.js index f3240b7..5f5ff87 100644 --- a/test/storage.js +++ b/test/storage.js @@ -5,34 +5,41 @@ var async = require('async'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var mongodb = require('mongodb'); +var tingodb = require('tingodb')({ + memStore: true +}); var Storage = require('../lib/storage'); var Model = require('../lib/model'); +var db, storage; -function initDb(cb) { - var url = 'mongodb://localhost:27017'; - mongodb.MongoClient.connect(url, function(err, db) { - should.not.exist(err); - db.dropDatabase(function(err) { - return cb(null, db); - }); +function openDb(cb) { + db = new tingodb.Db('./db/test', {}); + return cb(); +}; + +function resetDb(cb) { + if (!db) return cb(); + db.dropDatabase(function(err) { + return cb(); }); }; describe('Storage', function() { - var storage; - beforeEach(function(done) { - initDb(function(err, db) { - should.not.exist(err); + before(function(done) { + openDb(function() { storage = new Storage({ db: db }); done(); }); }); + beforeEach(function(done) { + resetDb(done); + }); + describe('Store & fetch wallet', function() { it('should correctly store and fetch wallet', function(done) { var wallet = Model.Wallet.create({ From 4864e81fba8de6e4d33af142df5f82ce8451b356 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 22 Apr 2015 10:20:20 -0300 Subject: [PATCH 13/13] migration script --- lib/storage_leveldb.js | 14 ++-------- scripts/level2mongo.js | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 11 deletions(-) create mode 100644 scripts/level2mongo.js diff --git a/lib/storage_leveldb.js b/lib/storage_leveldb.js index f2bed0b..8553287 100644 --- a/lib/storage_leveldb.js +++ b/lib/storage_leveldb.js @@ -2,7 +2,6 @@ var _ = require('lodash'); var levelup = require('levelup'); -var multilevel = require('multilevel'); var net = require('net'); var async = require('async'); var $ = require('preconditions').singleton(); @@ -22,16 +21,9 @@ var Storage = function(opts) { this.db = opts.db; if (!this.db) { - if (opts.multiLevel) { - this.db = multilevel.client(); - var con = net.connect(opts.multiLevel); - con.pipe(this.db.createRpcStream()).pipe(con); - log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); - } else { - this.db = levelup(opts.dbPath || './db/bws.db', { - valueEncoding: 'json' - }); - } + this.db = levelup(opts.dbPath || './db/bws.db', { + valueEncoding: 'json' + }); } }; diff --git a/scripts/level2mongo.js b/scripts/level2mongo.js new file mode 100644 index 0000000..125d484 --- /dev/null +++ b/scripts/level2mongo.js @@ -0,0 +1,63 @@ +'use strict'; + +var LevelStorage = require('../lib/storage_leveldb'); +var MongoStorage = require('../lib/storage'); + + +var level = new LevelStorage({ + dbPath: './db/bws.db', +}); + +var mongo = new MongoStorage(); +mongo.connect({ + host: 'localhost', + port: '27017' +}, function(err) { + if (err) throw err; + mongo.db.dropDatabase(function(err) { + if (err) throw err; + run(function(err) { + if (err) throw err; + console.log('All data successfully migrated'); + process.exit(0); + // mongo._dump(function() { + // process.exit(0); + // }); + }); + }); +}); + + +function run(cb) { + level.db.readStream() + .on('data', function(data) { + migrate(data.key, data.value, function(err) { + if (err) throw err; + }); + }) + .on('error', function(err) { + return cb(err); + }) + .on('end', function() { + return cb(); + }); +}; + +function migrate(key, value, cb) { + if (key.match(/^copayer!/)) { + value.copayerId = key.substring(key.indexOf('!') + 1); + mongo.db.collection('copayers_lookup').insert(value, cb); + } else if (key.match(/!addr!/)) { + value.walletId = key.substring(2, key.indexOf('!addr')); + mongo.db.collection('addresses').insert(value, cb); + } else if (key.match(/!not!/)) { + mongo.db.collection('notifications').insert(value, cb); + } else if (key.match(/!p?txp!/)) { + value.isPending = key.indexOf('!ptxp!') != -1; + mongo.db.collection('txs').insert(value, cb); + } else if (key.match(/!main$/)) { + mongo.db.collection('wallets').insert(value, cb); + } else { + return cb(new Error('Invalid key ' + key)); + } +};