From 8d39d02ee66526093220022c1c486dc393bcbf68 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Tue, 21 Jul 2015 14:59:08 -0600 Subject: [PATCH] modularize db --- lib/db.js | 278 +++++++++-------------------------------- lib/module.js | 28 +++++ lib/modules/address.js | 204 ++++++++++++++++++++++++++++++ 3 files changed, 288 insertions(+), 222 deletions(-) create mode 100644 lib/module.js create mode 100644 lib/modules/address.js diff --git a/lib/db.js b/lib/db.js index f950ca78..4f6ee98a 100644 --- a/lib/db.js +++ b/lib/db.js @@ -6,12 +6,14 @@ var BaseDB = chainlib.DB; var Transaction = require('./transaction'); var async = require('async'); var bitcore = require('bitcore'); +var $ = bitcore.util.preconditions; var BufferWriter = bitcore.encoding.BufferWriter; var errors = require('./errors'); var levelup = chainlib.deps.levelup; var log = chainlib.log; -var PublicKey = bitcore.PublicKey; var Address = bitcore.Address; +var BaseModule = require('./module'); +var AddressModule = require('./modules/address'); function DB(options) { if(!options) { @@ -25,15 +27,21 @@ function DB(options) { this.Transaction = Transaction; this.network = bitcore.Networks.get(options.network) || bitcore.Networks.testnet; + this.modules = []; + + // Add address module + this.addModule(AddressModule); + + // Add other modules + if(options.modules && options.modules.length) { + for(var i = 0; i < options.modules.length; i++) { + this.addModule(options.modules[i]); + } + } } util.inherits(DB, BaseDB); -DB.PREFIXES = { - OUTPUTS: 'outs' -}; -DB.CONCURRENCY = 10; - DB.prototype.getBlock = function(hash, callback) { var self = this; @@ -159,241 +167,67 @@ DB.prototype.getInputTotal = function(transactions) { return grandTotal; }; -DB.prototype._updateOutputs = function(block, addOutput, callback) { - var txs = this.getTransactionsFromBlock(block); - - log.debug('Processing transactions', txs); - log.debug('Updating outputs'); - - var action = 'put'; - if (!addOutput) { - action = 'del'; - } - - var operations = []; - - for (var i = 0; i < txs.length; i++) { - - var tx = txs[i]; - var txid = tx.id; - var inputs = tx.inputs; - var outputs = tx.outputs; - - for (var j = 0; j < outputs.length; j++) { - var output = outputs[j]; - - var script = output.script; - if(!script) { - log.debug('Invalid script'); - continue; - } - - if (!script.isPublicKeyHashOut() && !script.isScriptHashOut() && !script.isPublicKeyOut()) { - // ignore for now - log.debug('script was not pubkeyhashout, scripthashout, or pubkeyout'); - continue; - } - - var address; - - if(script.isPublicKeyOut()) { - var pubkey = script.chunks[0].buf; - address = Address.fromPublicKey(new PublicKey(pubkey), this.network); - } else { - address = output.script.toAddress(this.network); - } - - var outputIndex = j; - - var timestamp = block.timestamp.getTime(); - var height = block.height; - - operations.push({ - type: action, - key: [DB.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'), - value: [output.satoshis, script, height].join(':') - }); - } - - if(tx.isCoinbase()) { - continue; - } - - } - - setImmediate(function() { - callback(null, operations); - }); -}; - DB.prototype._onChainAddBlock = function(block, callback) { - - var self = this; - log.debug('DB handling new chain block'); // Remove block from mempool - self.mempool.removeBlock(block.hash); - - async.series([ - this._updateOutputs.bind(this, block, true), // add outputs - ], function(err, results) { - - if (err) { - return callback(err); - } - - var operations = []; - for (var i = 0; i < results.length; i++) { - operations = operations.concat(results[i]); - } - - log.debug('Updating the database with operations', operations); - - self.store.batch(operations, callback); - - }); - + this.mempool.removeBlock(block.hash); + this.blockHandler(block, true, callback); }; - DB.prototype._onChainRemoveBlock = function(block, callback) { + log.debug('DB removing chain block'); + this.blockHandler(block, false, callback); +}; +DB.prototype.blockHandler = function(block, add, callback) { var self = this; + var operations = []; - async.series([ - this._updateOutputs.bind(this, block, false), // remove outputs - ], function(err, results) { + async.eachSeries( + this.modules, + function(module, next) { + module['blockHandler'].call(module, block, add, function(err, ops) { + if(err) { + return next(err); + } - if (err) { - return callback(err); + operations = operations.concat(ops); + next(); + }); + }, + function(err) { + if (err) { + return callback(err); + } + + log.debug('Updating the database with operations', operations); + + self.store.batch(operations, callback); } - - var operations = []; - for (var i = 0; i < results.length; i++) { - operations = operations.concat(results[i]); - } - self.store.batch(operations, callback); - - }); - + ); }; DB.prototype.getAPIMethods = function() { - return [ - ['getTransaction', this, this.getTransaction, 2], - ['getBalance', this, this.getBalance, 2], - ['getOutputs', this, this.getOutputs, 2], - ['getUnspentOutputs', this, this.getUnspentOutputs, 2], - ['isSpent', this, this.isSpent, 2] + var methods = [ + ['getTransaction', this, this.getTransaction, 2] ]; + + for(var i = 0; i < this.modules.length; i++) { + methods = methods.concat(this.modules[i]['methods'].call(this.modules[i])); + } + + return methods; }; -DB.prototype.getBalance = function(address, queryMempool, callback) { - this.getUnspentOutputs(address, queryMempool, function(err, outputs) { - if(err) { - return callback(err); - } - - var satoshis = outputs.map(function(output) { - return output.satoshis; - }); - - var sum = satoshis.reduce(function(a, b) { - return a + b; - }, 0); - - return callback(null, sum); - }); -}; - -DB.prototype.getOutputs = function(address, queryMempool, callback) { - var self = this; - - var outputs = []; - var key = [DB.PREFIXES.OUTPUTS, address].join('-'); - - var stream = this.store.createReadStream({ - start: key, - end: key + '~' - }); - - stream.on('data', function(data) { - - var key = data.key.split('-'); - var value = data.value.split(':'); - - var output = { - address: key[1], - txid: key[3], - outputIndex: Number(key[4]), - satoshis: Number(value[0]), - script: value[1], - blockHeight: Number(value[2]) - }; - - outputs.push(output); - - }); - - var error; - - stream.on('error', function(streamError) { - if (streamError) { - error = streamError; - } - }); - - stream.on('close', function() { - if (error) { - return callback(error); - } - - if(queryMempool) { - outputs = outputs.concat(self.bitcoind.getMempoolOutputs(address)); - } - - callback(null, outputs); - }); - - return stream; - -}; - -DB.prototype.getUnspentOutputs = function(address, queryMempool, callback) { - - var self = this; - - this.getOutputs(address, queryMempool, function(err, outputs) { - if (err) { - return callback(err); - } else if(!outputs.length) { - return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); - } - - var isUnspent = function(output, callback) { - self.isUnspent(output, queryMempool, callback); - }; - - async.filter(outputs, isUnspent, function(results) { - callback(null, results); - }); - }); -}; - -DB.prototype.isUnspent = function(output, queryMempool, callback) { - this.isSpent(output, queryMempool, function(spent) { - callback(!spent); - }); -}; - -DB.prototype.isSpent = function(output, queryMempool, callback) { - var self = this; - var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; - - setImmediate(function() { - callback(self.bitcoind.isSpent(txid, output.outputIndex)); +DB.prototype.addModule = function(Module) { + var module = new Module({ + db: this, + bitcoind: this.bitcoind, + network: this.network }); + $.checkArgumentType(module, BaseModule); + this.modules.push(module); }; module.exports = DB; diff --git a/lib/module.js b/lib/module.js new file mode 100644 index 00000000..896f8a95 --- /dev/null +++ b/lib/module.js @@ -0,0 +1,28 @@ +'use strict'; + +var Module = function(options) { + this.db = options.db; + this.bitcoind = options.bitcoind; + this.network = options.network; +}; + +Module.prototype.blockHandler = function(block, add, callback) { + // implement in the child class + setImmediate(callback); +}; + +Module.prototype.methods = function() { + // Example: + // return [ + // ['getData', this, this.getData, 1] + // ]; + + return []; +}; + +// Example: +// Module.prototype.getData = function(arg1, callback) { +// +// }; + +module.exports = Module; \ No newline at end of file diff --git a/lib/modules/address.js b/lib/modules/address.js new file mode 100644 index 00000000..8d4f72e7 --- /dev/null +++ b/lib/modules/address.js @@ -0,0 +1,204 @@ +'use strict'; + +var BaseModule = require('../module'); +var inherits = require('util').inherits; +var async = require('async'); +var chainlib = require('chainlib'); +var log = chainlib.log; +var bitcore = require('bitcore'); +var PublicKey = bitcore.PublicKey; +var Address = bitcore.Address; + +var AddressModule = function(options) { + BaseModule.call(this, options); +}; + +inherits(AddressModule, BaseModule); + +AddressModule.PREFIXES = { + OUTPUTS: 'outs' +}; + +AddressModule.prototype.methods = function() { + return [ + ['getBalance', this, this.getBalance, 2], + ['getOutputs', this, this.getOutputs, 2], + ['getUnspentOutputs', this, this.getUnspentOutputs, 2], + ['isSpent', this, this.isSpent, 2] + ]; +}; + +AddressModule.prototype.blockHandler = function(block, addOutput, callback) { + var txs = this.db.getTransactionsFromBlock(block); + + log.debug('Updating output index'); + + var action = 'put'; + if (!addOutput) { + action = 'del'; + } + + var operations = []; + + for (var i = 0; i < txs.length; i++) { + + var tx = txs[i]; + var txid = tx.id; + var inputs = tx.inputs; + var outputs = tx.outputs; + + for (var j = 0; j < outputs.length; j++) { + var output = outputs[j]; + + var script = output.script; + if(!script) { + log.debug('Invalid script'); + continue; + } + + if (!script.isPublicKeyHashOut() && !script.isScriptHashOut() && !script.isPublicKeyOut()) { + // ignore for now + log.debug('script was not pubkeyhashout, scripthashout, or pubkeyout'); + continue; + } + + var address; + + if(script.isPublicKeyOut()) { + var pubkey = script.chunks[0].buf; + address = Address.fromPublicKey(new PublicKey(pubkey), this.network); + } else { + address = output.script.toAddress(this.network); + } + + var outputIndex = j; + + var timestamp = block.timestamp.getTime(); + var height = block.height; + + operations.push({ + type: action, + key: [AddressModule.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'), + value: [output.satoshis, script, height].join(':') + }); + } + + if(tx.isCoinbase()) { + continue; + } + + } + + setImmediate(function() { + callback(null, operations); + }); +}; + +AddressModule.prototype.getBalance = function(address, queryMempool, callback) { + this.getUnspentOutputs(address, queryMempool, function(err, outputs) { + if(err) { + return callback(err); + } + + var satoshis = outputs.map(function(output) { + return output.satoshis; + }); + + var sum = satoshis.reduce(function(a, b) { + return a + b; + }, 0); + + return callback(null, sum); + }); +}; + +AddressModule.prototype.getOutputs = function(address, queryMempool, callback) { + var self = this; + + var outputs = []; + var key = [AddressModule.PREFIXES.OUTPUTS, address].join('-'); + + var stream = this.db.store.createReadStream({ + start: key, + end: key + '~' + }); + + stream.on('data', function(data) { + + var key = data.key.split('-'); + var value = data.value.split(':'); + + var output = { + address: key[1], + txid: key[3], + outputIndex: Number(key[4]), + satoshis: Number(value[0]), + script: value[1], + blockHeight: Number(value[2]) + }; + + outputs.push(output); + + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + + if(queryMempool) { + outputs = outputs.concat(self.bitcoind.getMempoolOutputs(address)); + } + + callback(null, outputs); + }); + + return stream; + +}; + +AddressModule.prototype.getUnspentOutputs = function(address, queryMempool, callback) { + + var self = this; + + this.getOutputs(address, queryMempool, function(err, outputs) { + if (err) { + return callback(err); + } else if(!outputs.length) { + return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); + } + + var isUnspent = function(output, callback) { + self.isUnspent(output, queryMempool, callback); + }; + + async.filter(outputs, isUnspent, function(results) { + callback(null, results); + }); + }); +}; + +AddressModule.prototype.isUnspent = function(output, queryMempool, callback) { + this.isSpent(output, queryMempool, function(spent) { + callback(!spent); + }); +}; + +AddressModule.prototype.isSpent = function(output, queryMempool, callback) { + var self = this; + var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; + + setImmediate(function() { + callback(self.bitcoind.isSpent(txid, output.outputIndex)); + }); +}; + +module.exports = AddressModule; \ No newline at end of file