implement mongodb storage

This commit is contained in:
Ivan Socolsky 2015-04-20 17:04:26 -03:00
parent 706079da82
commit 1b2b0dc146
4 changed files with 338 additions and 269 deletions

View File

@ -1,5 +1,9 @@
var Model = {};
Model.Wallet = require('./wallet');
Model.Copayer = require('./copayer');
Model.TxProposal = require('./txproposal');
Model.Address = require('./address');
Model.Notification = require('./notification');
module.exports = Model;

View File

@ -1,7 +1,6 @@
'use strict';
var _ = require('lodash');
var levelup = require('mongodb');
var async = require('async');
var $ = require('preconditions').singleton();
var log = require('npmlog');
@ -11,11 +10,7 @@ var util = require('util');
var mongodb = require('mongodb');
var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
var Address = require('./model/address');
var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification');
var Model = require('./model');
var Storage = function(opts) {
opts = opts || {};
@ -34,87 +29,51 @@ var Storage = function(opts) {
}
};
var zeroPad = function(x, length) {
return _.padLeft(parseInt(x), length, '0');
};
var walletPrefix = function(id) {
return 'w!' + id;
};
var opKey = function(key) {
return key ? '!' + key : '';
};
var MAX_TS = _.repeat('9', 14);
var KEY = {
WALLET: function(walletId) {
return walletPrefix(walletId) + '!main';
},
COPAYER: function(id) {
return 'copayer!' + id;
},
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
ADDRESS: function(walletId, address) {
return walletPrefix(walletId) + '!addr' + opKey(address);
},
};
Storage.prototype.fetchWallet = function(id, cb) {
this.db.get(KEY.WALLET(id), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, Wallet.fromObj(data));
this.db.collection('wallets').findOne({
id: id
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Wallet.fromObj(result));
});
};
Storage.prototype.storeWallet = function(wallet, cb) {
this.db.put(KEY.WALLET(wallet.id), wallet, cb);
this.db.collection('wallets').update({
id: wallet.id
}, wallet, {
w: 1,
upsert: true,
}, cb);
};
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var ops = [];
ops.push({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet
});
_.each(wallet.copayers, function(copayer) {
var value = {
walletId: wallet.id,
requestPubKey: copayer.requestPubKey,
};
ops.push({
type: 'put',
key: KEY.COPAYER(copayer.id),
value: value
});
});
this.db.batch(ops, cb);
return this.storeWallet(wallet, cb);
};
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.get(KEY.COPAYER(copayerId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, data);
this.db.collection('wallets').findOne({
'copayers.id': copayerId
}, {
fields: {
id: 1,
copayers: 1,
},
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var copayer = _.find(result.copayers, {
id: copayerId
});
return cb(null, {
walletId: result.id,
requestPubKey: copayer.requestPubKey,
});
});
};
// TODO: should be done client-side
Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) {
@ -131,12 +90,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) {
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return self._completeTxData(walletId, TxProposal.fromObj(data), cb);
this.db.collection('txs').findOne({
id: txProposalId,
walletId: walletId
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb);
});
};
@ -144,22 +105,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this;
var txs = [];
var key = KEY.PENDING_TXP(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
this.db.collection('txs').find({
walletId: walletId,
isPending: true
}).sort({
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
});
return self._completeTxData(walletId, txs, cb);
});
};
/**
@ -173,31 +131,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) {
Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this;
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs);
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: true,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
var filter = {
walletId: walletId
};
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection('txs').find(filter, mods).sort({
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
});
return self._completeTxData(walletId, txs, cb);
});
};
@ -210,181 +167,133 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
* @param opts.limit
*/
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = [];
var self = this;
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
var filter = {
walletId: walletId
};
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection('notifications').find(filter, mods).sort({
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var notifications = _.map(result, function(notification) {
return Model.Notification.fromObj(notification);
});
return cb(null, notifications);
});
};
// TODO: remove walletId from signature
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb);
this.db.collection('notifications').insert(notification, {
w: 1
}, cb);
};
// TODO should we store only txp.id on keys for indexing
// or the whole txp? For now, the entire record makes sense
// (faster + easier to access)
// TODO: remove walletId from signature
Storage.prototype.storeTx = function(walletId, txp, cb) {
var ops = [{
type: 'put',
key: KEY.TXP(walletId, txp.id),
value: txp,
}];
if (txp.isPending()) {
ops.push({
type: 'put',
key: KEY.PENDING_TXP(walletId, txp.id),
value: txp,
});
} else {
ops.push({
type: 'del',
key: KEY.PENDING_TXP(walletId, txp.id),
});
}
this.db.batch(ops, cb);
txp.isPending = txp.isPending(); // Persist attribute to use when querying
this.db.collection('txs').update({
id: txp.id,
walletId: walletId
}, txp, {
w: 1,
upsert: true,
}, cb);
};
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var ops = [{
type: 'del',
key: KEY.TXP(walletId, txProposalId),
this.db.collection('txs').findAndRemove({
id: txProposalId,
walletId: walletId
}, {
type: 'del',
key: KEY.PENDING_TXP(walletId, txProposalId),
}];
this.db.batch(ops, cb);
};
Storage.prototype._delByKey = function(key, cb) {
var self = this;
var keys = [];
this.db.createKeyStream({
gte: key,
lt: key + '~',
})
.on('data', function(key) {
keys.push(key);
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function(err) {
self.db.batch(_.map(keys, function(k) {
return {
key: k,
type: 'del'
};
}), function(err) {
return cb(err);
});
});
};
Storage.prototype._removeCopayers = function(walletId, cb) {
var self = this;
this.fetchWallet(walletId, function(err, w) {
if (err || !w) return cb(err);
self.db.batch(_.map(w.copayers, function(c) {
return {
type: 'del',
key: KEY.COPAYER(c.id),
};
}), cb);
});
w: 1
}, cb);
};
Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
async.series([
async.parallel([
function(next) {
// This should be the first step. Will check the wallet exists
self._removeCopayers(walletId, next);
this.db.collections('wallets').findAndRemove({
id: walletId
}, next);
},
function(next) {
self._delByKey(walletPrefix(walletId), cb);
this.db.collections('addresses').findAndRemove({
walletId: walletId
}, next);
},
function(next) {
this.db.collections('txs').findAndRemove({
walletId: walletId
}, next);
},
function(next) {
this.db.collections('notifications').findAndRemove({
walletId: walletId
}, next);
},
], cb);
};
Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = [];
var key = KEY.ADDRESS(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
addresses.push(Address.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, addresses);
var self = this;
this.db.collection('addresses').find({
walletId: walletId,
}).sort({
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var addresses = _.map(result, function(address) {
return Model.Address.fromObj(address);
});
return cb(null, addresses);
});
};
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var ops = _.map([].concat(addresses), function(address) {
return {
type: 'put',
key: KEY.ADDRESS(wallet.id, address.address),
value: address,
};
var self = this;
this.db.collection('addresses').insert([].concat(addresses), {
w: 1
}, function(err) {
if (err) return cb(err);
self.storeWallet(wallet, cb);
});
ops.unshift({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet,
});
this.db.batch(ops, cb);
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
this.db.readStream()
.on('data', function(data) {
fn(util.inspect(data, {
depth: 10
}));
})
.on('end', function() {
if (cb) return cb();
});
var self = this;
this.db.collections(function(err, collections) {
if (err) return cb(err);
async.eachSeries(collections, function(col, next) {
fn('--------' + col);
col.find().toArray(function(err, item) {
fn(item);
next(err);
});
}, cb);
});
};
module.exports = Storage;

View File

@ -52,7 +52,8 @@
"memdown": "^1.0.0",
"mocha": "^1.18.2",
"sinon": "^1.10.3",
"supertest": "*"
"supertest": "*",
"tingodb": "^0.3.4"
},
"scripts": {
"start": "node bws.js",
@ -60,14 +61,11 @@
"test": "./node_modules/.bin/mocha",
"coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
},
"contributors": [
{
"name": "Ivan Socolsky",
"email": "ivan@bitpay.com"
},
{
"name": "Matias Alejo Garcia",
"email": "ematiu@gmail.com"
}
]
"contributors": [{
"name": "Ivan Socolsky",
"email": "ivan@bitpay.com"
}, {
"name": "Matias Alejo Garcia",
"email": "ematiu@gmail.com"
}]
}

View File

@ -1,24 +1,62 @@
'use strict';
var _ = require('lodash');
var async = require('async');
var chai = require('chai');
var sinon = require('sinon');
var should = chai.should();
var levelup = require('levelup');
var memdown = require('memdown');
var Storage = require('../lib/storage');
var mongodb = require('mongodb');
var StorageLevelDb = require('../lib/storage');
var StorageMongoDb = require('../lib/storage_mongo');
var Model = require('../lib/model');
describe.only('Storage', function() {
var storage;
beforeEach(function() {
var db = levelup(memdown, {
valueEncoding: 'json'
function initStorageLevelDb(cb) {
var db = levelup(memdown, {
valueEncoding: 'json'
});
return cb(null, db);
};
function initStorageMongoDb(cb) {
var url = 'mongodb://localhost:27017/bws';
mongodb.MongoClient.connect(url, function(err, db) {
should.not.exist(err);
db.dropDatabase(function(err) {
return cb(null, db);
});
storage = new Storage({
db: db
});
};
var Storage, initDb;
var useLevel = false;
if (useLevel) {
Storage = StorageLevelDb;
initDb = initStorageLevelDb;
} else {
Storage = StorageMongoDb;
initDb = initStorageMongoDb;
}
describe('Storage', function() {
var storage;
beforeEach(function(done) {
initDb(function(err, db) {
should.not.exist(err);
storage = new Storage({
db: db
});
done();
});
});
describe('Store & fetch wallet', function() {
@ -51,4 +89,124 @@ describe.only('Storage', function() {
});
});
});
describe('Copayer lookup', function() {
it('should correctly store and fetch copayer lookup', function(done) {
var wallet = Model.Wallet.create({
id: '123',
name: 'my wallet',
m: 2,
n: 3,
});
_.each(_.range(3), function(i) {
var copayer = Model.Copayer.create({
name: 'copayer ' + i,
xPubKey: 'xPubKey ' + i,
requestPubKey: 'requestPubKey ' + i,
});
wallet.addCopayer(copayer);
});
should.exist(wallet);
storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) {
should.not.exist(err);
storage.fetchCopayerLookup(wallet.copayers[1].id, function(err, lookup) {
should.not.exist(err);
should.exist(lookup);
lookup.walletId.should.equal('123');
lookup.requestPubKey.should.equal('requestPubKey 1');
done();
})
});
});
it('should not return error if copayer not found', function(done) {
storage.fetchCopayerLookup('2', function(err, lookup) {
should.not.exist(err);
should.not.exist(lookup);
done();
});
});
});
describe('Transaction proposals', function() {
var wallet, proposals;
beforeEach(function(done) {
wallet = Model.Wallet.create({
id: '123',
name: 'my wallet',
m: 2,
n: 3,
});
_.each(_.range(3), function(i) {
var copayer = Model.Copayer.create({
name: 'copayer ' + i,
xPubKey: 'xPubKey ' + i,
requestPubKey: 'requestPubKey ' + i,
});
wallet.addCopayer(copayer);
});
should.exist(wallet);
storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) {
should.not.exist(err);
proposals = _.map(_.range(4), function(i) {
var tx = Model.TxProposal.create({
walletId: '123',
creatorId: wallet.copayers[0].id,
amount: i + 100,
});
if (i % 2 == 0) {
tx.status = 'rejected';
tx.isPending().should.be.false;
}
return tx;
});
async.each(proposals, function(tx, next) {
storage.storeTx('123', tx, next);
}, function(err) {
should.not.exist(err);
done();
});
});
});
it('should fetch tx', function(done) {
storage.fetchTx('123', proposals[0].id, function(err, tx) {
should.not.exist(err);
should.exist(tx);
tx.id.should.equal(proposals[0].id);
tx.walletId.should.equal(proposals[0].walletId);
tx.creatorName.should.equal('copayer 0');
done();
});
});
it('should fetch all pending txs', function(done) {
storage.fetchPendingTxs('123', function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(2);
txs = _.sortBy(txs, 'amount');
txs[0].amount.should.equal(101);
txs[1].amount.should.equal(103);
done();
});
});
it('should remove tx', function(done) {
storage.removeTx('123', proposals[0].id, function(err) {
should.not.exist(err);
storage.fetchTx('123', proposals[0].id, function(err, tx) {
should.not.exist(err);
should.not.exist(tx);
storage.fetchTxs('123', {}, function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(3);
_.any(txs, {
id: proposals[0].id
}).should.be.false;
done();
});
});
});
});
});
});