diff --git a/lib/model/index.js b/lib/model/index.js index 7d1c5d8..3a3fcf2 100644 --- a/lib/model/index.js +++ b/lib/model/index.js @@ -1,5 +1,9 @@ var Model = {}; Model.Wallet = require('./wallet'); +Model.Copayer = require('./copayer'); +Model.TxProposal = require('./txproposal'); +Model.Address = require('./address'); +Model.Notification = require('./notification'); module.exports = Model; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js index 1e3ec37..cb0e9bd 100644 --- a/lib/storage_mongo.js +++ b/lib/storage_mongo.js @@ -1,7 +1,6 @@ 'use strict'; var _ = require('lodash'); -var levelup = require('mongodb'); var async = require('async'); var $ = require('preconditions').singleton(); var log = require('npmlog'); @@ -11,11 +10,7 @@ var util = require('util'); var mongodb = require('mongodb'); -var Wallet = require('./model/wallet'); -var Copayer = require('./model/copayer'); -var Address = require('./model/address'); -var TxProposal = require('./model/txproposal'); -var Notification = require('./model/notification'); +var Model = require('./model'); var Storage = function(opts) { opts = opts || {}; @@ -34,87 +29,51 @@ var Storage = function(opts) { } }; -var zeroPad = function(x, length) { - return _.padLeft(parseInt(x), length, '0'); -}; - -var walletPrefix = function(id) { - return 'w!' + id; -}; - -var opKey = function(key) { - return key ? '!' + key : ''; -}; - -var MAX_TS = _.repeat('9', 14); - - -var KEY = { - WALLET: function(walletId) { - return walletPrefix(walletId) + '!main'; - }, - COPAYER: function(id) { - return 'copayer!' + id; - }, - TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!txp' + opKey(txProposalId); - }, - NOTIFICATION: function(walletId, notificationId) { - return walletPrefix(walletId) + '!not' + opKey(notificationId); - }, - PENDING_TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); - }, - ADDRESS: function(walletId, address) { - return walletPrefix(walletId) + '!addr' + opKey(address); - }, -}; - Storage.prototype.fetchWallet = function(id, cb) { - this.db.get(KEY.WALLET(id), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, Wallet.fromObj(data)); + this.db.collection('wallets').findOne({ + id: id + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return cb(null, Model.Wallet.fromObj(result)); }); }; Storage.prototype.storeWallet = function(wallet, cb) { - this.db.put(KEY.WALLET(wallet.id), wallet, cb); + this.db.collection('wallets').update({ + id: wallet.id + }, wallet, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - var ops = []; - ops.push({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet - }); - _.each(wallet.copayers, function(copayer) { - var value = { - walletId: wallet.id, - requestPubKey: copayer.requestPubKey, - }; - ops.push({ - type: 'put', - key: KEY.COPAYER(copayer.id), - value: value - }); - }); - this.db.batch(ops, cb); + return this.storeWallet(wallet, cb); }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.get(KEY.COPAYER(copayerId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, data); + this.db.collection('wallets').findOne({ + 'copayers.id': copayerId + }, { + fields: { + id: 1, + copayers: 1, + }, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var copayer = _.find(result.copayers, { + id: copayerId + }); + return cb(null, { + walletId: result.id, + requestPubKey: copayer.requestPubKey, + }); }); }; +// TODO: should be done client-side Storage.prototype._completeTxData = function(walletId, txs, cb) { var txList = [].concat(txs); this.fetchWallet(walletId, function(err, wallet) { @@ -131,12 +90,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; - this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + + this.db.collection('txs').findOne({ + id: txProposalId, + walletId: walletId + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); }); }; @@ -144,22 +105,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) { var self = this; - var txs = []; - var key = KEY.PENDING_TXP(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + this.db.collection('txs').find({ + walletId: walletId, + isPending: true + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; /** @@ -173,31 +131,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) { var self = this; - var txs = []; opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.TXP(walletId, opts.minTs); - var endkey = KEY.TXP(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection('txs').find(filter, mods).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; @@ -210,181 +167,133 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { * @param opts.limit */ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var txs = []; + var self = this; + opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - var key = KEY.NOTIFICATION(walletId, opts.minTs); - var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(Notification.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, txs); + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection('notifications').find(filter, mods).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var notifications = _.map(result, function(notification) { + return Model.Notification.fromObj(notification); }); + return cb(null, notifications); + }); }; +// TODO: remove walletId from signature Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); + this.db.collection('notifications').insert(notification, { + w: 1 + }, cb); }; - -// TODO should we store only txp.id on keys for indexing -// or the whole txp? For now, the entire record makes sense -// (faster + easier to access) +// TODO: remove walletId from signature Storage.prototype.storeTx = function(walletId, txp, cb) { - var ops = [{ - type: 'put', - key: KEY.TXP(walletId, txp.id), - value: txp, - }]; - - if (txp.isPending()) { - ops.push({ - type: 'put', - key: KEY.PENDING_TXP(walletId, txp.id), - value: txp, - }); - } else { - ops.push({ - type: 'del', - key: KEY.PENDING_TXP(walletId, txp.id), - }); - } - this.db.batch(ops, cb); + txp.isPending = txp.isPending(); // Persist attribute to use when querying + this.db.collection('txs').update({ + id: txp.id, + walletId: walletId + }, txp, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - var ops = [{ - type: 'del', - key: KEY.TXP(walletId, txProposalId), + this.db.collection('txs').findAndRemove({ + id: txProposalId, + walletId: walletId }, { - type: 'del', - key: KEY.PENDING_TXP(walletId, txProposalId), - }]; - - this.db.batch(ops, cb); -}; - -Storage.prototype._delByKey = function(key, cb) { - var self = this; - var keys = []; - this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) - .on('data', function(key) { - keys.push(key); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function(err) { - self.db.batch(_.map(keys, function(k) { - return { - key: k, - type: 'del' - }; - }), function(err) { - return cb(err); - }); - }); -}; - -Storage.prototype._removeCopayers = function(walletId, cb) { - var self = this; - - this.fetchWallet(walletId, function(err, w) { - if (err || !w) return cb(err); - - self.db.batch(_.map(w.copayers, function(c) { - return { - type: 'del', - key: KEY.COPAYER(c.id), - }; - }), cb); - }); + w: 1 + }, cb); }; Storage.prototype.removeWallet = function(walletId, cb) { var self = this; - async.series([ + async.parallel([ function(next) { - // This should be the first step. Will check the wallet exists - self._removeCopayers(walletId, next); + this.db.collections('wallets').findAndRemove({ + id: walletId + }, next); }, function(next) { - self._delByKey(walletPrefix(walletId), cb); + this.db.collections('addresses').findAndRemove({ + walletId: walletId + }, next); + }, + function(next) { + this.db.collections('txs').findAndRemove({ + walletId: walletId + }, next); + }, + function(next) { + this.db.collections('notifications').findAndRemove({ + walletId: walletId + }, next); }, ], cb); }; Storage.prototype.fetchAddresses = function(walletId, cb) { - var addresses = []; - var key = KEY.ADDRESS(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - addresses.push(Address.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, addresses); + var self = this; + + this.db.collection('addresses').find({ + walletId: walletId, + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var addresses = _.map(result, function(address) { + return Model.Address.fromObj(address); }); + return cb(null, addresses); + }); }; Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var ops = _.map([].concat(addresses), function(address) { - return { - type: 'put', - key: KEY.ADDRESS(wallet.id, address.address), - value: address, - }; + var self = this; + this.db.collection('addresses').insert([].concat(addresses), { + w: 1 + }, function(err) { + if (err) return cb(err); + self.storeWallet(wallet, cb); }); - ops.unshift({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet, - }); - - this.db.batch(ops, cb); }; Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; - this.db.readStream() - .on('data', function(data) { - fn(util.inspect(data, { - depth: 10 - })); - }) - .on('end', function() { - if (cb) return cb(); - }); + var self = this; + this.db.collections(function(err, collections) { + if (err) return cb(err); + async.eachSeries(collections, function(col, next) { + fn('--------' + col); + col.find().toArray(function(err, item) { + fn(item); + next(err); + }); + }, cb); + }); }; module.exports = Storage; diff --git a/package.json b/package.json index d670edf..66b52f3 100644 --- a/package.json +++ b/package.json @@ -52,7 +52,8 @@ "memdown": "^1.0.0", "mocha": "^1.18.2", "sinon": "^1.10.3", - "supertest": "*" + "supertest": "*", + "tingodb": "^0.3.4" }, "scripts": { "start": "node bws.js", @@ -60,14 +61,11 @@ "test": "./node_modules/.bin/mocha", "coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, - "contributors": [ - { - "name": "Ivan Socolsky", - "email": "ivan@bitpay.com" - }, - { - "name": "Matias Alejo Garcia", - "email": "ematiu@gmail.com" - } - ] + "contributors": [{ + "name": "Ivan Socolsky", + "email": "ivan@bitpay.com" + }, { + "name": "Matias Alejo Garcia", + "email": "ematiu@gmail.com" + }] } diff --git a/test/storage.js b/test/storage.js index 0378d61..143259d 100644 --- a/test/storage.js +++ b/test/storage.js @@ -1,24 +1,62 @@ 'use strict'; var _ = require('lodash'); +var async = require('async'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); var levelup = require('levelup'); var memdown = require('memdown'); -var Storage = require('../lib/storage'); +var mongodb = require('mongodb'); + +var StorageLevelDb = require('../lib/storage'); +var StorageMongoDb = require('../lib/storage_mongo'); var Model = require('../lib/model'); -describe.only('Storage', function() { - var storage; - beforeEach(function() { - var db = levelup(memdown, { - valueEncoding: 'json' +function initStorageLevelDb(cb) { + var db = levelup(memdown, { + valueEncoding: 'json' + }); + return cb(null, db); +}; + +function initStorageMongoDb(cb) { + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + should.not.exist(err); + db.dropDatabase(function(err) { + return cb(null, db); }); - storage = new Storage({ - db: db + }); +}; + + + +var Storage, initDb; + + +var useLevel = false; + +if (useLevel) { + Storage = StorageLevelDb; + initDb = initStorageLevelDb; +} else { + Storage = StorageMongoDb; + initDb = initStorageMongoDb; +} + + +describe('Storage', function() { + var storage; + beforeEach(function(done) { + initDb(function(err, db) { + should.not.exist(err); + storage = new Storage({ + db: db + }); + done(); }); }); describe('Store & fetch wallet', function() { @@ -51,4 +89,124 @@ describe.only('Storage', function() { }); }); }); + describe('Copayer lookup', function() { + it('should correctly store and fetch copayer lookup', function(done) { + var wallet = Model.Wallet.create({ + id: '123', + name: 'my wallet', + m: 2, + n: 3, + }); + _.each(_.range(3), function(i) { + var copayer = Model.Copayer.create({ + name: 'copayer ' + i, + xPubKey: 'xPubKey ' + i, + requestPubKey: 'requestPubKey ' + i, + }); + wallet.addCopayer(copayer); + }); + + should.exist(wallet); + storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) { + should.not.exist(err); + storage.fetchCopayerLookup(wallet.copayers[1].id, function(err, lookup) { + should.not.exist(err); + should.exist(lookup); + lookup.walletId.should.equal('123'); + lookup.requestPubKey.should.equal('requestPubKey 1'); + done(); + }) + }); + }); + it('should not return error if copayer not found', function(done) { + storage.fetchCopayerLookup('2', function(err, lookup) { + should.not.exist(err); + should.not.exist(lookup); + done(); + }); + }); + }); + + describe('Transaction proposals', function() { + var wallet, proposals; + + beforeEach(function(done) { + wallet = Model.Wallet.create({ + id: '123', + name: 'my wallet', + m: 2, + n: 3, + }); + _.each(_.range(3), function(i) { + var copayer = Model.Copayer.create({ + name: 'copayer ' + i, + xPubKey: 'xPubKey ' + i, + requestPubKey: 'requestPubKey ' + i, + }); + wallet.addCopayer(copayer); + }); + should.exist(wallet); + storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) { + should.not.exist(err); + + proposals = _.map(_.range(4), function(i) { + var tx = Model.TxProposal.create({ + walletId: '123', + creatorId: wallet.copayers[0].id, + amount: i + 100, + }); + if (i % 2 == 0) { + tx.status = 'rejected'; + tx.isPending().should.be.false; + } + return tx; + }); + async.each(proposals, function(tx, next) { + storage.storeTx('123', tx, next); + }, function(err) { + should.not.exist(err); + done(); + }); + }); + }); + it('should fetch tx', function(done) { + storage.fetchTx('123', proposals[0].id, function(err, tx) { + should.not.exist(err); + should.exist(tx); + tx.id.should.equal(proposals[0].id); + tx.walletId.should.equal(proposals[0].walletId); + tx.creatorName.should.equal('copayer 0'); + done(); + }); + }); + it('should fetch all pending txs', function(done) { + storage.fetchPendingTxs('123', function(err, txs) { + should.not.exist(err); + should.exist(txs); + txs.length.should.equal(2); + txs = _.sortBy(txs, 'amount'); + txs[0].amount.should.equal(101); + txs[1].amount.should.equal(103); + done(); + }); + }); + it('should remove tx', function(done) { + storage.removeTx('123', proposals[0].id, function(err) { + should.not.exist(err); + storage.fetchTx('123', proposals[0].id, function(err, tx) { + should.not.exist(err); + should.not.exist(tx); + storage.fetchTxs('123', {}, function(err, txs) { + should.not.exist(err); + should.exist(txs); + txs.length.should.equal(3); + _.any(txs, { + id: proposals[0].id + }).should.be.false; + done(); + }); + }); + }); + }); + }); });