store txs individually
Conflicts: lib/storage.js test/integration/server.js
This commit is contained in:
parent
251c743e16
commit
8342d22041
245
lib/storage.js
245
lib/storage.js
|
@ -609,92 +609,10 @@ Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) {
|
|||
}, cb);
|
||||
};
|
||||
|
||||
Storage.prototype.softResetAllTxHistoryCache = function(cb) {
|
||||
this.db.collection(collections.CACHE).update({
|
||||
type: 'historyCacheStatus',
|
||||
}, {
|
||||
isUpdated: false,
|
||||
}, {
|
||||
multi: true,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
|
||||
|
||||
Storage.prototype.softResetTxHistoryCache = function(walletId, cb) {
|
||||
this.db.collection(collections.CACHE).update({
|
||||
walletId: walletId,
|
||||
type: 'historyCacheStatus',
|
||||
key: null
|
||||
}, {
|
||||
isUpdated: false,
|
||||
}, {
|
||||
w: 1,
|
||||
upsert: true,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
|
||||
Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
|
||||
var self = this;
|
||||
self.db.collection(collections.CACHE).remove({
|
||||
walletId: walletId,
|
||||
type: 'historyCache'
|
||||
}, {
|
||||
multi: 1
|
||||
}, function(err) {
|
||||
self.db.collection(collections.CACHE).remove({
|
||||
walletId: walletId,
|
||||
type: 'historyCacheStatus',
|
||||
key: null
|
||||
}, {
|
||||
w: 1
|
||||
}, cb);
|
||||
});
|
||||
};
|
||||
|
||||
var bucketKey = function(bucket, size) {
|
||||
return bucket + ':' + size;
|
||||
};
|
||||
|
||||
var BUCKET_SIZE = 100;
|
||||
|
||||
Storage.prototype._doGetTxHistoryCacheBucket = function(walletId, fwdIndex, cb) {
|
||||
var self = this;
|
||||
|
||||
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
|
||||
var bucketStart = bucket * BUCKET_SIZE;
|
||||
|
||||
self.db.collection(collections.CACHE).findOne({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket, BUCKET_SIZE),
|
||||
}, function(err, res1) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var h1 = res1 ? res1.history || [] : [];
|
||||
|
||||
if (h1.length < BUCKET_SIZE)
|
||||
h1[BUCKET_SIZE-1] = null;
|
||||
|
||||
self.db.collection(collections.CACHE).findOne({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket + 1, BUCKET_SIZE),
|
||||
}, function(err, res2) {
|
||||
if (err) return cb(err);
|
||||
var h2 = res2 ? res2.history || [] : [];
|
||||
|
||||
if (h2.length < BUCKET_SIZE)
|
||||
h2[BUCKET_SIZE-1] = null;
|
||||
var h = (new Array(bucketStart)).concat(h1).concat(h2);
|
||||
return cb(null, h, bucket);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
||||
// -------- --------------------------- Total
|
||||
// > Time >
|
||||
// ^to <= ^from
|
||||
// ^fwdIndex => ^end
|
||||
Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
|
||||
var self = this;
|
||||
$.checkArgument(from >= 0);
|
||||
|
@ -715,107 +633,115 @@ Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
|
|||
if (fwdIndex < 0) {
|
||||
fwdIndex = 0;
|
||||
}
|
||||
|
||||
var end = result.totalItems - from;
|
||||
|
||||
// nothing to return
|
||||
if (end <= 0)
|
||||
return cb(null, []);
|
||||
if (end <= 0) return cb(null, []);
|
||||
|
||||
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h) {
|
||||
// Cache is OK.
|
||||
self.db.collection(collections.CACHE).find({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: {
|
||||
$gte: fwdIndex,
|
||||
$lt: end
|
||||
},
|
||||
}).sort({
|
||||
key: -1,
|
||||
}).toArray(function(err, result) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (!h) //|| result.history.length < end)
|
||||
return cb();
|
||||
if (!result) return cb();
|
||||
|
||||
var res = h.slice(fwdIndex, end);
|
||||
|
||||
if (_.any(res, function(i) {
|
||||
return !i;
|
||||
})) {
|
||||
if (result.length < end - fwdIndex) {
|
||||
// some items are not yet defined.
|
||||
return cb();
|
||||
}
|
||||
return cb(null, res.reverse());
|
||||
|
||||
var txs = _.pluck(result, 'tx');
|
||||
return cb(null, txs);
|
||||
});
|
||||
})
|
||||
};
|
||||
|
||||
Storage.prototype.softResetAllTxHistoryCache = function(cb) {
|
||||
this.db.collection(collections.CACHE).update({
|
||||
type: 'historyCacheStatus',
|
||||
}, {
|
||||
isUpdated: false,
|
||||
}, {
|
||||
multi: true,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Storage.prototype._doSaveTxHistoryCache = function(walletId, fwdIndex, items, cb) {
|
||||
$.checkArgument(items.length < BUCKET_SIZE);
|
||||
Storage.prototype.softResetTxHistoryCache = function(walletId, cb) {
|
||||
this.db.collection(collections.CACHE).update({
|
||||
walletId: walletId,
|
||||
type: 'historyCacheStatus',
|
||||
key: null
|
||||
}, {
|
||||
isUpdated: false,
|
||||
}, {
|
||||
w: 1,
|
||||
upsert: true,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
|
||||
var self = this;
|
||||
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
|
||||
var bucketStart = bucket * BUCKET_SIZE;
|
||||
|
||||
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h, bucket) {
|
||||
|
||||
// Add new items
|
||||
_.each(items, function(i) {
|
||||
h[fwdIndex++] = i;
|
||||
});
|
||||
|
||||
var toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
|
||||
self.db.collection(collections.CACHE).update({
|
||||
self.db.collection(collections.CACHE).remove({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
}, {
|
||||
multi: 1
|
||||
}, function(err) {
|
||||
if (err) return cb(err);
|
||||
self.db.collection(collections.CACHE).remove({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket, BUCKET_SIZE),
|
||||
type: 'historyCacheStatus',
|
||||
key: null
|
||||
}, {
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket, BUCKET_SIZE),
|
||||
history: toSave,
|
||||
}, {
|
||||
w: 1,
|
||||
upsert: true,
|
||||
}, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
bucket++;
|
||||
bucketStart += BUCKET_SIZE;
|
||||
|
||||
toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
|
||||
self.db.collection(collections.CACHE).update({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket, BUCKET_SIZE),
|
||||
}, {
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: bucketKey(bucket, BUCKET_SIZE),
|
||||
history: toSave,
|
||||
}, {
|
||||
w: 1,
|
||||
upsert: true,
|
||||
}, cb);
|
||||
|
||||
});
|
||||
w: 1
|
||||
}, cb);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// items should be in reverse CHRONOLOGICAL order
|
||||
// firstPosition, is the
|
||||
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items, cb) {
|
||||
$.shouldBeNumber(to);
|
||||
$.checkArgument(to >= 0);
|
||||
// items should be in CHRONOLOGICAL order
|
||||
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosition, items, cb) {
|
||||
$.shouldBeNumber(firstPosition);
|
||||
$.checkArgument(firstPosition >= 0);
|
||||
$.shouldBeNumber(totalItems);
|
||||
$.checkArgument(totalItems >= 0);
|
||||
|
||||
|
||||
var fwdIndex = totalItems - to;
|
||||
if (fwdIndex < 0) fwdIndex = 0;
|
||||
|
||||
var self = this;
|
||||
|
||||
self._doSaveTxHistoryCache(walletId, fwdIndex, items.reverse(), function(err) {
|
||||
_.each(items, function(item, i) {
|
||||
item.position = firstPosition + i;
|
||||
});
|
||||
var cacheIsComplete = (firstPosition == 0);
|
||||
|
||||
// TODO: check txid uniqness?
|
||||
async.each(items, function(item, next) {
|
||||
var pos = item.position;
|
||||
delete item.position;
|
||||
self.db.collection(collections.CACHE).update({
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: pos,
|
||||
}, {
|
||||
walletId: walletId,
|
||||
type: 'historyCache',
|
||||
key: pos,
|
||||
tx: item,
|
||||
}, {
|
||||
w: 1,
|
||||
upsert: true,
|
||||
}, next);
|
||||
}, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var now = Date.now();
|
||||
>>> >>> > store txs individually
|
||||
self.db.collection(collections.CACHE).update({
|
||||
walletId: walletId,
|
||||
type: 'historyCacheStatus',
|
||||
|
@ -825,7 +751,8 @@ Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items
|
|||
type: 'historyCacheStatus',
|
||||
key: null,
|
||||
totalItems: totalItems,
|
||||
updatedOn: now,
|
||||
updatedOn: Date.now(),
|
||||
isComplete: cacheIsComplete,
|
||||
isUpdated: true,
|
||||
}, {
|
||||
w: 1,
|
||||
|
|
Loading…
Reference in New Issue