modularize db
This commit is contained in:
parent
078577499b
commit
8d39d02ee6
278
lib/db.js
278
lib/db.js
|
@ -6,12 +6,14 @@ var BaseDB = chainlib.DB;
|
|||
var Transaction = require('./transaction');
|
||||
var async = require('async');
|
||||
var bitcore = require('bitcore');
|
||||
var $ = bitcore.util.preconditions;
|
||||
var BufferWriter = bitcore.encoding.BufferWriter;
|
||||
var errors = require('./errors');
|
||||
var levelup = chainlib.deps.levelup;
|
||||
var log = chainlib.log;
|
||||
var PublicKey = bitcore.PublicKey;
|
||||
var Address = bitcore.Address;
|
||||
var BaseModule = require('./module');
|
||||
var AddressModule = require('./modules/address');
|
||||
|
||||
function DB(options) {
|
||||
if(!options) {
|
||||
|
@ -25,15 +27,21 @@ function DB(options) {
|
|||
this.Transaction = Transaction;
|
||||
|
||||
this.network = bitcore.Networks.get(options.network) || bitcore.Networks.testnet;
|
||||
this.modules = [];
|
||||
|
||||
// Add address module
|
||||
this.addModule(AddressModule);
|
||||
|
||||
// Add other modules
|
||||
if(options.modules && options.modules.length) {
|
||||
for(var i = 0; i < options.modules.length; i++) {
|
||||
this.addModule(options.modules[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
util.inherits(DB, BaseDB);
|
||||
|
||||
DB.PREFIXES = {
|
||||
OUTPUTS: 'outs'
|
||||
};
|
||||
DB.CONCURRENCY = 10;
|
||||
|
||||
DB.prototype.getBlock = function(hash, callback) {
|
||||
var self = this;
|
||||
|
||||
|
@ -159,241 +167,67 @@ DB.prototype.getInputTotal = function(transactions) {
|
|||
return grandTotal;
|
||||
};
|
||||
|
||||
DB.prototype._updateOutputs = function(block, addOutput, callback) {
|
||||
var txs = this.getTransactionsFromBlock(block);
|
||||
|
||||
log.debug('Processing transactions', txs);
|
||||
log.debug('Updating outputs');
|
||||
|
||||
var action = 'put';
|
||||
if (!addOutput) {
|
||||
action = 'del';
|
||||
}
|
||||
|
||||
var operations = [];
|
||||
|
||||
for (var i = 0; i < txs.length; i++) {
|
||||
|
||||
var tx = txs[i];
|
||||
var txid = tx.id;
|
||||
var inputs = tx.inputs;
|
||||
var outputs = tx.outputs;
|
||||
|
||||
for (var j = 0; j < outputs.length; j++) {
|
||||
var output = outputs[j];
|
||||
|
||||
var script = output.script;
|
||||
if(!script) {
|
||||
log.debug('Invalid script');
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!script.isPublicKeyHashOut() && !script.isScriptHashOut() && !script.isPublicKeyOut()) {
|
||||
// ignore for now
|
||||
log.debug('script was not pubkeyhashout, scripthashout, or pubkeyout');
|
||||
continue;
|
||||
}
|
||||
|
||||
var address;
|
||||
|
||||
if(script.isPublicKeyOut()) {
|
||||
var pubkey = script.chunks[0].buf;
|
||||
address = Address.fromPublicKey(new PublicKey(pubkey), this.network);
|
||||
} else {
|
||||
address = output.script.toAddress(this.network);
|
||||
}
|
||||
|
||||
var outputIndex = j;
|
||||
|
||||
var timestamp = block.timestamp.getTime();
|
||||
var height = block.height;
|
||||
|
||||
operations.push({
|
||||
type: action,
|
||||
key: [DB.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'),
|
||||
value: [output.satoshis, script, height].join(':')
|
||||
});
|
||||
}
|
||||
|
||||
if(tx.isCoinbase()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
setImmediate(function() {
|
||||
callback(null, operations);
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype._onChainAddBlock = function(block, callback) {
|
||||
|
||||
var self = this;
|
||||
|
||||
log.debug('DB handling new chain block');
|
||||
|
||||
// Remove block from mempool
|
||||
self.mempool.removeBlock(block.hash);
|
||||
|
||||
async.series([
|
||||
this._updateOutputs.bind(this, block, true), // add outputs
|
||||
], function(err, results) {
|
||||
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
var operations = [];
|
||||
for (var i = 0; i < results.length; i++) {
|
||||
operations = operations.concat(results[i]);
|
||||
}
|
||||
|
||||
log.debug('Updating the database with operations', operations);
|
||||
|
||||
self.store.batch(operations, callback);
|
||||
|
||||
});
|
||||
|
||||
this.mempool.removeBlock(block.hash);
|
||||
this.blockHandler(block, true, callback);
|
||||
};
|
||||
|
||||
|
||||
DB.prototype._onChainRemoveBlock = function(block, callback) {
|
||||
log.debug('DB removing chain block');
|
||||
this.blockHandler(block, false, callback);
|
||||
};
|
||||
|
||||
DB.prototype.blockHandler = function(block, add, callback) {
|
||||
var self = this;
|
||||
var operations = [];
|
||||
|
||||
async.series([
|
||||
this._updateOutputs.bind(this, block, false), // remove outputs
|
||||
], function(err, results) {
|
||||
async.eachSeries(
|
||||
this.modules,
|
||||
function(module, next) {
|
||||
module['blockHandler'].call(module, block, add, function(err, ops) {
|
||||
if(err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
if (err) {
|
||||
return callback(err);
|
||||
operations = operations.concat(ops);
|
||||
next();
|
||||
});
|
||||
},
|
||||
function(err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
log.debug('Updating the database with operations', operations);
|
||||
|
||||
self.store.batch(operations, callback);
|
||||
}
|
||||
|
||||
var operations = [];
|
||||
for (var i = 0; i < results.length; i++) {
|
||||
operations = operations.concat(results[i]);
|
||||
}
|
||||
self.store.batch(operations, callback);
|
||||
|
||||
});
|
||||
|
||||
);
|
||||
};
|
||||
|
||||
DB.prototype.getAPIMethods = function() {
|
||||
return [
|
||||
['getTransaction', this, this.getTransaction, 2],
|
||||
['getBalance', this, this.getBalance, 2],
|
||||
['getOutputs', this, this.getOutputs, 2],
|
||||
['getUnspentOutputs', this, this.getUnspentOutputs, 2],
|
||||
['isSpent', this, this.isSpent, 2]
|
||||
var methods = [
|
||||
['getTransaction', this, this.getTransaction, 2]
|
||||
];
|
||||
|
||||
for(var i = 0; i < this.modules.length; i++) {
|
||||
methods = methods.concat(this.modules[i]['methods'].call(this.modules[i]));
|
||||
}
|
||||
|
||||
return methods;
|
||||
};
|
||||
|
||||
DB.prototype.getBalance = function(address, queryMempool, callback) {
|
||||
this.getUnspentOutputs(address, queryMempool, function(err, outputs) {
|
||||
if(err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
var satoshis = outputs.map(function(output) {
|
||||
return output.satoshis;
|
||||
});
|
||||
|
||||
var sum = satoshis.reduce(function(a, b) {
|
||||
return a + b;
|
||||
}, 0);
|
||||
|
||||
return callback(null, sum);
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype.getOutputs = function(address, queryMempool, callback) {
|
||||
var self = this;
|
||||
|
||||
var outputs = [];
|
||||
var key = [DB.PREFIXES.OUTPUTS, address].join('-');
|
||||
|
||||
var stream = this.store.createReadStream({
|
||||
start: key,
|
||||
end: key + '~'
|
||||
});
|
||||
|
||||
stream.on('data', function(data) {
|
||||
|
||||
var key = data.key.split('-');
|
||||
var value = data.value.split(':');
|
||||
|
||||
var output = {
|
||||
address: key[1],
|
||||
txid: key[3],
|
||||
outputIndex: Number(key[4]),
|
||||
satoshis: Number(value[0]),
|
||||
script: value[1],
|
||||
blockHeight: Number(value[2])
|
||||
};
|
||||
|
||||
outputs.push(output);
|
||||
|
||||
});
|
||||
|
||||
var error;
|
||||
|
||||
stream.on('error', function(streamError) {
|
||||
if (streamError) {
|
||||
error = streamError;
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('close', function() {
|
||||
if (error) {
|
||||
return callback(error);
|
||||
}
|
||||
|
||||
if(queryMempool) {
|
||||
outputs = outputs.concat(self.bitcoind.getMempoolOutputs(address));
|
||||
}
|
||||
|
||||
callback(null, outputs);
|
||||
});
|
||||
|
||||
return stream;
|
||||
|
||||
};
|
||||
|
||||
DB.prototype.getUnspentOutputs = function(address, queryMempool, callback) {
|
||||
|
||||
var self = this;
|
||||
|
||||
this.getOutputs(address, queryMempool, function(err, outputs) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
} else if(!outputs.length) {
|
||||
return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []);
|
||||
}
|
||||
|
||||
var isUnspent = function(output, callback) {
|
||||
self.isUnspent(output, queryMempool, callback);
|
||||
};
|
||||
|
||||
async.filter(outputs, isUnspent, function(results) {
|
||||
callback(null, results);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype.isUnspent = function(output, queryMempool, callback) {
|
||||
this.isSpent(output, queryMempool, function(spent) {
|
||||
callback(!spent);
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype.isSpent = function(output, queryMempool, callback) {
|
||||
var self = this;
|
||||
var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid;
|
||||
|
||||
setImmediate(function() {
|
||||
callback(self.bitcoind.isSpent(txid, output.outputIndex));
|
||||
DB.prototype.addModule = function(Module) {
|
||||
var module = new Module({
|
||||
db: this,
|
||||
bitcoind: this.bitcoind,
|
||||
network: this.network
|
||||
});
|
||||
$.checkArgumentType(module, BaseModule);
|
||||
this.modules.push(module);
|
||||
};
|
||||
|
||||
module.exports = DB;
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
'use strict';
|
||||
|
||||
var Module = function(options) {
|
||||
this.db = options.db;
|
||||
this.bitcoind = options.bitcoind;
|
||||
this.network = options.network;
|
||||
};
|
||||
|
||||
Module.prototype.blockHandler = function(block, add, callback) {
|
||||
// implement in the child class
|
||||
setImmediate(callback);
|
||||
};
|
||||
|
||||
Module.prototype.methods = function() {
|
||||
// Example:
|
||||
// return [
|
||||
// ['getData', this, this.getData, 1]
|
||||
// ];
|
||||
|
||||
return [];
|
||||
};
|
||||
|
||||
// Example:
|
||||
// Module.prototype.getData = function(arg1, callback) {
|
||||
//
|
||||
// };
|
||||
|
||||
module.exports = Module;
|
|
@ -0,0 +1,204 @@
|
|||
'use strict';
|
||||
|
||||
var BaseModule = require('../module');
|
||||
var inherits = require('util').inherits;
|
||||
var async = require('async');
|
||||
var chainlib = require('chainlib');
|
||||
var log = chainlib.log;
|
||||
var bitcore = require('bitcore');
|
||||
var PublicKey = bitcore.PublicKey;
|
||||
var Address = bitcore.Address;
|
||||
|
||||
var AddressModule = function(options) {
|
||||
BaseModule.call(this, options);
|
||||
};
|
||||
|
||||
inherits(AddressModule, BaseModule);
|
||||
|
||||
AddressModule.PREFIXES = {
|
||||
OUTPUTS: 'outs'
|
||||
};
|
||||
|
||||
AddressModule.prototype.methods = function() {
|
||||
return [
|
||||
['getBalance', this, this.getBalance, 2],
|
||||
['getOutputs', this, this.getOutputs, 2],
|
||||
['getUnspentOutputs', this, this.getUnspentOutputs, 2],
|
||||
['isSpent', this, this.isSpent, 2]
|
||||
];
|
||||
};
|
||||
|
||||
AddressModule.prototype.blockHandler = function(block, addOutput, callback) {
|
||||
var txs = this.db.getTransactionsFromBlock(block);
|
||||
|
||||
log.debug('Updating output index');
|
||||
|
||||
var action = 'put';
|
||||
if (!addOutput) {
|
||||
action = 'del';
|
||||
}
|
||||
|
||||
var operations = [];
|
||||
|
||||
for (var i = 0; i < txs.length; i++) {
|
||||
|
||||
var tx = txs[i];
|
||||
var txid = tx.id;
|
||||
var inputs = tx.inputs;
|
||||
var outputs = tx.outputs;
|
||||
|
||||
for (var j = 0; j < outputs.length; j++) {
|
||||
var output = outputs[j];
|
||||
|
||||
var script = output.script;
|
||||
if(!script) {
|
||||
log.debug('Invalid script');
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!script.isPublicKeyHashOut() && !script.isScriptHashOut() && !script.isPublicKeyOut()) {
|
||||
// ignore for now
|
||||
log.debug('script was not pubkeyhashout, scripthashout, or pubkeyout');
|
||||
continue;
|
||||
}
|
||||
|
||||
var address;
|
||||
|
||||
if(script.isPublicKeyOut()) {
|
||||
var pubkey = script.chunks[0].buf;
|
||||
address = Address.fromPublicKey(new PublicKey(pubkey), this.network);
|
||||
} else {
|
||||
address = output.script.toAddress(this.network);
|
||||
}
|
||||
|
||||
var outputIndex = j;
|
||||
|
||||
var timestamp = block.timestamp.getTime();
|
||||
var height = block.height;
|
||||
|
||||
operations.push({
|
||||
type: action,
|
||||
key: [AddressModule.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'),
|
||||
value: [output.satoshis, script, height].join(':')
|
||||
});
|
||||
}
|
||||
|
||||
if(tx.isCoinbase()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
setImmediate(function() {
|
||||
callback(null, operations);
|
||||
});
|
||||
};
|
||||
|
||||
AddressModule.prototype.getBalance = function(address, queryMempool, callback) {
|
||||
this.getUnspentOutputs(address, queryMempool, function(err, outputs) {
|
||||
if(err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
var satoshis = outputs.map(function(output) {
|
||||
return output.satoshis;
|
||||
});
|
||||
|
||||
var sum = satoshis.reduce(function(a, b) {
|
||||
return a + b;
|
||||
}, 0);
|
||||
|
||||
return callback(null, sum);
|
||||
});
|
||||
};
|
||||
|
||||
AddressModule.prototype.getOutputs = function(address, queryMempool, callback) {
|
||||
var self = this;
|
||||
|
||||
var outputs = [];
|
||||
var key = [AddressModule.PREFIXES.OUTPUTS, address].join('-');
|
||||
|
||||
var stream = this.db.store.createReadStream({
|
||||
start: key,
|
||||
end: key + '~'
|
||||
});
|
||||
|
||||
stream.on('data', function(data) {
|
||||
|
||||
var key = data.key.split('-');
|
||||
var value = data.value.split(':');
|
||||
|
||||
var output = {
|
||||
address: key[1],
|
||||
txid: key[3],
|
||||
outputIndex: Number(key[4]),
|
||||
satoshis: Number(value[0]),
|
||||
script: value[1],
|
||||
blockHeight: Number(value[2])
|
||||
};
|
||||
|
||||
outputs.push(output);
|
||||
|
||||
});
|
||||
|
||||
var error;
|
||||
|
||||
stream.on('error', function(streamError) {
|
||||
if (streamError) {
|
||||
error = streamError;
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('close', function() {
|
||||
if (error) {
|
||||
return callback(error);
|
||||
}
|
||||
|
||||
if(queryMempool) {
|
||||
outputs = outputs.concat(self.bitcoind.getMempoolOutputs(address));
|
||||
}
|
||||
|
||||
callback(null, outputs);
|
||||
});
|
||||
|
||||
return stream;
|
||||
|
||||
};
|
||||
|
||||
AddressModule.prototype.getUnspentOutputs = function(address, queryMempool, callback) {
|
||||
|
||||
var self = this;
|
||||
|
||||
this.getOutputs(address, queryMempool, function(err, outputs) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
} else if(!outputs.length) {
|
||||
return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []);
|
||||
}
|
||||
|
||||
var isUnspent = function(output, callback) {
|
||||
self.isUnspent(output, queryMempool, callback);
|
||||
};
|
||||
|
||||
async.filter(outputs, isUnspent, function(results) {
|
||||
callback(null, results);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
AddressModule.prototype.isUnspent = function(output, queryMempool, callback) {
|
||||
this.isSpent(output, queryMempool, function(spent) {
|
||||
callback(!spent);
|
||||
});
|
||||
};
|
||||
|
||||
AddressModule.prototype.isSpent = function(output, queryMempool, callback) {
|
||||
var self = this;
|
||||
var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid;
|
||||
|
||||
setImmediate(function() {
|
||||
callback(self.bitcoind.isSpent(txid, output.outputIndex));
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = AddressModule;
|
Loading…
Reference in New Issue