diff --git a/app/controllers/socket.js b/app/controllers/socket.js index 50d5733..825713b 100644 --- a/app/controllers/socket.js +++ b/app/controllers/socket.js @@ -41,7 +41,6 @@ module.exports.broadcastBlock = function(block) { }; module.exports.broadcastAddressTx = function(address, tx) { - console.log('bcatx = '+address+' '+tx); if (ios) ios.sockets. in (address).emit(address, tx); }; diff --git a/lib/BlockDb.js b/lib/BlockDb.js index b664bc6..b994054 100644 --- a/lib/BlockDb.js +++ b/lib/BlockDb.js @@ -5,6 +5,7 @@ require('classtool'); function spec(b) { + var superclass = b.superclass || require('events').EventEmitter; var TIMESTAMP_PREFIX = 'bts-'; // b-ts- => var PREV_PREFIX = 'bpr-'; // b-prev- => var NEXT_PREFIX = 'bne-'; // b-next- => @@ -22,7 +23,9 @@ function spec(b) { var Rpc = b.rpc || require('./Rpc').class(); var BlockDb = function() { + BlockDb.super(this, arguments); }; + BlockDb.superclass = superclass; BlockDb.prototype.close = function(cb) { db.close(cb); @@ -42,6 +45,7 @@ function spec(b) { // the block prev to the new block, nor TIP pointer // BlockDb.prototype.add = function(b, cb) { + var self = this; var time_key = TIMESTAMP_PREFIX + ( b.time || Math.round(new Date().getTime() / 1000) ); @@ -49,7 +53,12 @@ function spec(b) { .put(time_key, b.hash) .put(MAIN_PREFIX + b.hash, 1) .put(PREV_PREFIX + b.hash, b.previousblockhash) - .write(cb); + .write(function(err){ + if (!err) { + self.emit('new_block', {blockid: b.hash}); + } + cb(err); + }); }; BlockDb.prototype.getTip = function(cb) { diff --git a/lib/PeerSync.js b/lib/PeerSync.js index 5680939..f10fafb 100644 --- a/lib/PeerSync.js +++ b/lib/PeerSync.js @@ -6,7 +6,6 @@ function spec() { var CoinConst = require('bitcore/const'); var coinUtil = require('bitcore/util/util'); var Sync = require('./Sync').class(); - var Script = require('bitcore/Script').class(); var Peer = require('bitcore/Peer').class(); var config = require('../config/config'); var networks = require('bitcore/networks'); @@ -60,24 +59,12 @@ function spec() { }; PeerSync.prototype.handleTx = function(info) { - var self =this; var tx = info.message.tx.getStandardizedObject(); + tx.outs = info.message.tx.outs; + tx.ins = info.message.tx.ins; console.log('[p2p_sync] Handle tx: ' + tx.hash); tx.time = tx.time || Math.round(new Date().getTime() / 1000); - var to=0; - info.message.tx.outs.forEach( function(o) { - var s = new Script(o.s); - var addrs = self.sync.getAddrStr(s); - - // support only for p2pubkey p2pubkeyhash and p2sh - if (addrs.length === 1) { - tx.out[to].addrStr = addrs[0]; - tx.out[to].n = to; - } - to++; - }); - this.sync.storeTxs([tx], function(err) { if (err) { console.log('[p2p_sync] Error in handle TX: ' + JSON.stringify(err)); diff --git a/lib/Sync.js b/lib/Sync.js index f182e5b..c162ce3 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -4,31 +4,27 @@ require('classtool'); function spec() { - var sockets = require('../app/controllers/socket.js'); - var BlockDb = require('./BlockDb').class(); - var bitutil = require('bitcore/util/util'); + var sockets = require('../app/controllers/socket.js'); + var BlockDb = require('./BlockDb').class(); - // This is 0.1.2 => c++ version of base57-native - var base58 = require('base58-native'); - var encodedData = require('bitcore/util/EncodedData').class({base58: base58}); - var versionedData = require('bitcore/util/VersionedData').class({superclass: encodedData}); - var Address = require('bitcore/Address').class({superclass: versionedData}); - var TransactionDb = require('./TransactionDb').class(); + var TransactionDb = require('./TransactionDb').class(); var config = require('../config/config'); var networks = require('bitcore/networks'); - var Script = require('bitcore/Script').class(); - var async = require('async'); + var async = require('async'); - function Sync() { - } + function Sync() {} Sync.prototype.init = function(opts, cb) { var self = this; self.opts = opts; - this.bDb = new BlockDb(opts); - this.txDb = new TransactionDb(opts); - this.network = config.network === 'testnet' ? networks.testnet: networks.livenet; + this.bDb = new BlockDb(opts); + this.txDb = new TransactionDb(opts); + this.txDb.on('tx_for_address', this.handleTxForAddress.bind(this)); + this.txDb.on('new_tx', this.handleNewTx.bind(this)); + this.bDb.on('new_block', this.handleNewBlock.bind(this)); + + this.network = config.network === 'testnet' ? networks.testnet : networks.livenet; return cb(); }; @@ -43,8 +39,13 @@ function spec() { Sync.prototype.destroy = function(next) { var self = this; async.series([ - function(b) { self.bDb.drop(b); }, - function(b) { self.txDb.drop(b); }, + + function(b) { + self.bDb.drop(b); + }, + function(b) { + self.txDb.drop(b); + }, ], next); }; @@ -56,25 +57,25 @@ function spec() { * * Case 1) * A-B-C-D-E(TIP) - * \ + * \ * NEW * * 1) Declare D-E orphans (and possible invalidate TXs on them) * * Case 2) * A-B-C-D-E(TIP) - * \ + * \ * F-G-NEW * 1) Set F-G as connected (mark TXs as valid) - * 2) Declare D-E orphans (and possible invalidate TXs on them) + * 2) Declare D-E orphans (and possible invalidate TXs on them) * * * Case 3) * * A-B-C-D-E(TIP) ... NEW * - * NEW is ignored - * + * NEW is ignored + * */ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) { @@ -88,67 +89,66 @@ function spec() { var self = this; var oldTip, oldNext, needReorg = false; var newPrev = b.previousblockhash; - var updatedAddrs; async.series([ - function(c) { - self.bDb.has(b.hash, function(err, val) { - return c(err || - (val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null )); - }); - }, - function(c) { - if (!allowReorgs) return c(); - self.bDb.has(newPrev, function(err, val) { - if (!val && newPrev.match(/^0+$/)) return c(); - return c(err || - (!val ? new Error('WARN: Ignoring block with non existing prev:' + b.hash) : null )); - }); - }, - function(c) { - self.txDb.createFromBlock(b, function(err, addrs) { - updatedAddrs = addrs; - return c(err); - }); - }, - function(c) { - if (!allowReorgs) return c(); - self.bDb.getTip(function(err, val) { - oldTip = val; - if (oldTip && newPrev !== oldTip) needReorg = true; - return c(); - }); - }, - function(c) { - if (!needReorg) return c(); + function(c) { + self.bDb.has(b.hash, function(err, val) { + return c(err || + (val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null)); + }); + }, + function(c) { + if (!allowReorgs) return c(); - self.bDb.getNext( newPrev, function(err, val) { - if (err) return c(err); - oldNext = val; - return c(); - }); - }, - function(c) { - self.bDb.add(b, c); - }, - function(c) { - if (!needReorg) return c(); - console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip); - self.processReorg(oldTip, oldNext, newPrev, c); - }, - function(c) { - self.bDb.setTip(b.hash, function(err) { - if (err) return c(err); - self.bDb.setNext(newPrev, b.hash, function(err) { + self.bDb.has(newPrev, function(err, val) { + if (!val && newPrev.match(/^0+$/)) return c(); + return c(err || + (!val ? new Error('WARN: Ignoring block with non existing prev:' + b.hash) : null)); + }); + }, + function(c) { + self.txDb.createFromBlock(b, function(err) { return c(err); }); - }); - }], + }, + function(c) { + if (!allowReorgs) return c(); + self.bDb.getTip(function(err, val) { + oldTip = val; + if (oldTip && newPrev !== oldTip) needReorg = true; + return c(); + }); + }, + function(c) { + if (!needReorg) return c(); + + self.bDb.getNext(newPrev, function(err, val) { + if (err) return c(err); + oldNext = val; + return c(); + }); + }, + function(c) { + self.bDb.add(b, c); + }, + function(c) { + if (!needReorg) return c(); + console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip); + self.processReorg(oldTip, oldNext, newPrev, c); + }, + function(c) { + self.bDb.setTip(b.hash, function(err) { + if (err) return c(err); + self.bDb.setNext(newPrev, b.hash, function(err) { + return c(err); + }); + }); + } + ], function(err) { - if (!err) self._handleBroadcast(b.hash, null, updatedAddrs); - if (err && err.toString().match(/WARN/) ) { - err=null; + if (err && err.toString().match(/WARN/)) { + err = null; } return cb(err); }); @@ -162,40 +162,41 @@ function spec() { var orphanizeFrom; async.series([ - function(c) { - self.bDb.isMain(newPrev, function(err,val) { - if (!val) return c(); - console.log('# Reorg Case 1)'); - // case 1 - orphanizeFrom = oldNext; - return c(err); - }); - }, - function(c) { - if (orphanizeFrom) return c(); + function(c) { + self.bDb.isMain(newPrev, function(err, val) { + if (!val) return c(); - console.log('# Reorg Case 2)'); - self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext) { - if (err) return c(err); - self.bDb.getNext(yHash, function(err, yHashNext) { - orphanizeFrom = yHashNext; - self.bDb.setNext(yHash, newYHashNext, function(err) { - return c(err); + console.log('# Reorg Case 1)'); + // case 1 + orphanizeFrom = oldNext; + return c(err); + }); + }, + function(c) { + if (orphanizeFrom) return c(); + + console.log('# Reorg Case 2)'); + self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext) { + if (err) return c(err); + self.bDb.getNext(yHash, function(err, yHashNext) { + orphanizeFrom = yHashNext; + self.bDb.setNext(yHash, newYHashNext, function(err) { + return c(err); + }); }); }); - }); - }, - function(c) { - if (!orphanizeFrom) return c(); - self.setBranchOrphan(orphanizeFrom, function(err) { - return c(err); - }); - }, - ], - function(err) { - return cb(err); - }); + }, + function(c) { + if (!orphanizeFrom) return c(); + self.setBranchOrphan(orphanizeFrom, function(err) { + return c(err); + }); + }, + ], + function(err) { + return cb(err); + }); }; Sync.prototype.setBlockMain = function(hash, isMain, cb) { @@ -208,14 +209,16 @@ function spec() { Sync.prototype.setBranchOrphan = function(fromHash, cb) { var self = this, - hashInterator = fromHash; + hashInterator = fromHash; async.whilst( - function() { return hashInterator; }, + function() { + return hashInterator; + }, function(c) { self.setBlockMain(hashInterator, false, function(err) { if (err) return cb(err); - self.bDb.getNext(hashInterator, function (err, val) { + self.bDb.getNext(hashInterator, function(err, val) { hashInterator = val; return c(err); }); @@ -225,26 +228,28 @@ function spec() { Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) { var self = this, - hashInterator = fromHash, - lastHash = fromHash, - isMain; + hashInterator = fromHash, + lastHash = fromHash, + isMain; async.doWhilst( function(c) { - self.setBlockMain(hashInterator, true, function (err) { + self.setBlockMain(hashInterator, true, function(err) { if (err) return c(err); - self.bDb.getPrev(hashInterator, function (err, val) { + self.bDb.getPrev(hashInterator, function(err, val) { if (err) return c(err); - lastHash = hashInterator; + lastHash = hashInterator; hashInterator = val; - self.bDb.isMain(hashInterator, function (err, val) { + self.bDb.isMain(hashInterator, function(err, val) { isMain = val; return c(); }); }); }); }, - function() { return hashInterator && !isMain; }, + function() { + return hashInterator && !isMain; + }, function(err) { console.log('\tFound yBlock:', hashInterator); return cb(err, hashInterator, lastHash); @@ -252,79 +257,39 @@ function spec() { ); }; - Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) { + Sync.prototype._handleBroadcast = function() { var self = this; + console.log('broadcast:' + self.opts.shouldBroadcast); + }; - if (self.opts.shouldBroadcast) { - if (hash) { - sockets.broadcastBlock(hash); - } - if (updatedTxs) { - updatedTxs.forEach(function(tx) { - sockets.broadcastTx(tx); - }); - } + Sync.prototype.handleTxForAddress = function(data) { + if (this.opts.shouldBroadcast) { + sockets.broadcastAddressTx(data.address, data.txid); + } + }; - if (updatedAddrs ) { - updatedAddrs.forEach(function(addr, txs){ - txs.forEach(function(addr, t){ - sockets.broadcastAddressTx(addr, t); - }); - }); - } + Sync.prototype.handleNewTx = function(data) { + if (this.opts.shouldBroadcast) { + sockets.broadcastTx(data.txid); + } + }; + + Sync.prototype.handleNewBlock = function(data) { + if (this.opts.shouldBroadcast) { + sockets.broadcastBlock(data.blockid); } }; Sync.prototype.storeTxs = function(txs, cb) { var self = this; - - self.txDb.createFromArray(txs, null, function(err, updatedAddrs) { + self.txDb.createFromArray(txs, null, function(err) { if (err) return cb(err); - - self._handleBroadcast(null, txs, updatedAddrs); return cb(err); }); }; - // TODO. replace with - // Script.prototype.getAddrStrs if that one get merged in bitcore - Sync.prototype.getAddrStr = function(s) { - var self = this; - - var addrStrs = []; - var type = s.classify(); - var addr; - - switch(type) { - case Script.TX_PUBKEY: - var chunk = s.captureOne(); - addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); - addrStrs = [ addr.toString() ]; - break; - case Script.TX_PUBKEYHASH: - addr = new Address(self.network.addressPubkey, s.captureOne()); - addrStrs = [ addr.toString() ]; - break; - case Script.TX_SCRIPTHASH: - addr = new Address(self.network.addressScript, s.captureOne()); - addrStrs = [ addr.toString() ]; - break; - case Script.TX_MULTISIG: - var chunks = s.capture(); - chunks.forEach(function(chunk) { - var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); - addrStrs.push(a.toString()); - }); - break; - case Script.TX_UNKNOWN: - break; - } - - return addrStrs; - }; return Sync; } module.defineClass(spec); - diff --git a/lib/TransactionDb.js b/lib/TransactionDb.js index 720e62f..d676890 100644 --- a/lib/TransactionDb.js +++ b/lib/TransactionDb.js @@ -5,36 +5,55 @@ require('classtool'); function spec(b) { + var superclass = b.superclass || require('events').EventEmitter; // blockHash -> txid mapping - var IN_BLK_PREFIX = 'txb-'; //txb-- => 1/0 (connected or not) + var IN_BLK_PREFIX = 'txb-'; //txb-- => 1/0 (connected or not) // Only for orphan blocks - var FROM_BLK_PREFIX = 'tx-'; //tx-- => 1 + var FROM_BLK_PREFIX = 'tx-'; //tx-- => 1 // to show tx outs - var OUTS_PREFIX = 'txo-'; //txo-- => [addr, btc_sat] - var SPEND_PREFIX = 'txs-'; //txs---- = ts + var OUTS_PREFIX = 'txo-'; //txo-- => [addr, btc_sat] + var SPEND_PREFIX = 'txs-'; //txs---- = ts // to sum up addr balance (only outs, spends are gotten later) - var ADDR_PREFIX = 'txa-'; //txa--- => + btc_sat:ts + var ADDR_PREFIX = 'txa-'; //txa--- => + btc_sat:ts // TODO: use bitcore networks module var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; var CONCURRENCY = 100; /** - * Module dependencies. - */ + * Module dependencies. + */ var Rpc = b.rpc || require('./Rpc').class(), - util = require('bitcore/util/util'), - levelup = require('levelup'), - async = require('async'), - config = require('../config/config'), - assert = require('assert'); + util = require('bitcore/util/util'), + levelup = require('levelup'), + async = require('async'), + config = require('../config/config'), + assert = require('assert'); var db = b.db || levelup(config.leveldb + '/txs'); + var Script = require('bitcore/Script').class(); + // This is 0.1.2 => c++ version of base57-native + var base58 = require('base58-native'); + var encodedData = require('bitcore/util/EncodedData').class({ + // TODO: check why c++ implementation differs + //base58: base58 + }); + var versionedData = require('bitcore/util/VersionedData').class({ + superclass: encodedData + }); + var Address = require('bitcore/Address').class({ + superclass: versionedData + }); + var bitutil = require('bitcore/util/util'); + var networks = require('bitcore/networks'); var TransactionDb = function() { + TransactionDb.super(this, arguments); + this.network = config.network === 'testnet' ? networks.testnet : networks.livenet; }; + TransactionDb.superclass = superclass; TransactionDb.prototype.close = function(cb) { db.close(cb); @@ -43,7 +62,7 @@ function spec(b) { TransactionDb.prototype.drop = function(cb) { var path = config.leveldb + '/txs'; db.close(function() { - require('leveldown').destroy(path, function () { + require('leveldown').destroy(path, function() { db = levelup(path); return cb(); }); @@ -54,7 +73,7 @@ function spec(b) { TransactionDb.prototype.has = function(txid, cb) { var k = OUTS_PREFIX + txid; - db.get(k, function (err,val) { + db.get(k, function(err, val) { var ret; @@ -81,26 +100,28 @@ function spec(b) { txid: txid, index: parseInt(index), }); - } - else { - r.spendTxId = txid; + } else { + r.spendTxId = txid; r.spendIndex = parseInt(index); - r.spendTs = parseInt(ts); + r.spendTs = parseInt(ts); } }; // This is not used now - TransactionDb.prototype.fromTxId = function(txid, cb) { + TransactionDb.prototype.fromTxId = function(txid, cb) { var self = this; var k = OUTS_PREFIX + txid; - var ret=[]; - var idx={}; + var ret = []; + var idx = {}; var i = 0; // outs. - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { var k = data.key.split('-'); var v = data.value.split(':'); ret.push({ @@ -108,28 +129,31 @@ function spec(b) { value_sat: parseInt(v[1]), index: parseInt(k[2]), }); - idx[parseInt(k[2])]= i++; + idx[parseInt(k[2])] = i++; }) - .on('error', function (err) { + .on('error', function(err) { return cb(err); }) - .on('end', function () { + .on('end', function() { var k = SPEND_PREFIX + txid; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { var k = data.key.split('-'); var j = idx[parseInt(k[2])]; - assert(typeof j !== 'undefined','Spent could not be stored: tx ' + txid + - 'spend in TX:' + k[1] + ',' + k[2]+ ' j:' + j); + assert(typeof j !== 'undefined', 'Spent could not be stored: tx ' + txid + + 'spend in TX:' + k[1] + ',' + k[2] + ' j:' + j); - self._addSpendInfo(ret[j], k[3], k[4], data.value); + self._addSpendInfo(ret[j], k[3], k[4], data.value); }) - .on('error', function (err) { + .on('error', function(err) { return cb(err); }) - .on('end', function (err) { + .on('end', function(err) { return cb(err, ret); }); }); @@ -137,27 +161,30 @@ function spec(b) { TransactionDb.prototype._fillSpend = function(info, cb) { - var self = this; + var self = this; if (!info) return cb(); var k = SPEND_PREFIX + info.txid; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { var k = data.key.split('-'); - self._addSpendInfo(info.vout[k[2]], k[3], k[4], data.value); + self._addSpendInfo(info.vout[k[2]], k[3], k[4], data.value); }) - .on('error', function (err) { + .on('error', function(err) { return cb(err); }) - .on('end', function (err) { + .on('end', function(err) { return cb(err); }); }; TransactionDb.prototype._fillOutpoints = function(info, cb) { - var self = this; + var self = this; if (!info || info.isCoinBase) return cb(); @@ -165,64 +192,59 @@ function spec(b) { var incompleteInputs = 0; async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) { - self.fromTxIdN(i.txid, i.vout, function(err, ret) { - //console.log('[TransactionDb.js.154:ret:]',ret); //TODO - if (!ret || !ret.addr || !ret.valueSat ) { - console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid); - if (ret) i.unconfirmedInput = ret.unconfirmedInput; - incompleteInputs = 1; - return c_in(); // error not scalated - } + self.fromTxIdN(i.txid, i.vout, function(err, ret) { + //console.log('[TransactionDb.js.154:ret:]',ret); //TODO + if (!ret || !ret.addr || !ret.valueSat) { + console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid); + if (ret) i.unconfirmedInput = ret.unconfirmedInput; + incompleteInputs = 1; + return c_in(); // error not scalated + } - info.firstSeenTs = ret.spendTs; - i.unconfirmedInput = i.unconfirmedInput; - i.addr = ret.addr; - i.valueSat = ret.valueSat; - i.value = ret.valueSat / util.COIN; + info.firstSeenTs = ret.spendTs; + i.unconfirmedInput = i.unconfirmedInput; + i.addr = ret.addr; + i.valueSat = ret.valueSat; + i.value = ret.valueSat / util.COIN; - // Double spend? - if ( ret.multipleSpendAttempt || - !ret.spendTxId || + // Double spend? + if (ret.multipleSpendAttempt || !ret.spendTxId || (ret.spendTxId && ret.spendTxId !== info.txid) ) { - if (ret.multipleSpendAttempts ) { - ret.multipleSpendAttempts.each(function(mul) { - if (mul.spendTxId !== info.txid) { - i.doubleSpendTxID = ret.spendTxId; - i.doubleSpendIndex = ret.spendIndex; - } - }); + if (ret.multipleSpendAttempts) { + ret.multipleSpendAttempts.each(function(mul) { + if (mul.spendTxId !== info.txid) { + i.doubleSpendTxID = ret.spendTxId; + i.doubleSpendIndex = ret.spendIndex; + } + }); + } else if (!ret.spendTxId) { + i.dbError = 'Input spend not registered'; + } else { + i.doubleSpendTxID = ret.spendTxId; + i.doubleSpendIndex = ret.spendIndex; + } + } else { + i.doubleSpendTxID = null; } - else if (!ret.spendTxId) { - i.dbError = 'Input spend not registered'; - } - else { - i.doubleSpendTxID = ret.spendTxId; - i.doubleSpendIndex = ret.spendIndex; - } - } - else { - i.doubleSpendTxID = null; - } - valueIn += i.valueSat; - return c_in(); + valueIn += i.valueSat; + return c_in(); + }); + }, + function() { + if (!incompleteInputs) { + info.valueIn = valueIn / util.COIN; + info.fees = (valueIn - parseInt(info.valueOut * util.COIN)) / util.COIN; + } else { + info.incompleteInputs = 1; + } + return cb(); }); - }, - function () { - if (! incompleteInputs ) { - info.valueIn = valueIn / util.COIN; - info.fees = (valueIn - parseInt(info.valueOut * util.COIN)) / util.COIN ; - } - else { - info.incompleteInputs = 1; - } - return cb(); - }); }; TransactionDb.prototype._getInfo = function(txid, next) { - var self = this; + var self = this; Rpc.getRpcInfo(txid, function(err, info) { if (err) return next(err); @@ -236,13 +258,16 @@ function spec(b) { }; - TransactionDb.prototype.fromIdWithInfo = function (txid, cb) { + TransactionDb.prototype.fromIdWithInfo = function(txid, cb) { var self = this; self._getInfo(txid, function(err, info) { if (err) return cb(err); - if (!info ) return cb(); - return cb(err, {txid: txid, info: info} ); + if (!info) return cb(); + return cb(err, { + txid: txid, + info: info + }); }); }; @@ -250,37 +275,42 @@ function spec(b) { var self = this; var k = OUTS_PREFIX + txid + '-' + n; - db.get(k, function (err,val) { - if (!val || (err && err.notFound) ) { - return cb(null, { unconfirmedInput: 1} ); + db.get(k, function(err, val) { + if (!val || (err && err.notFound)) { + return cb(null, { + unconfirmedInput: 1 + }); } var a = val.split(':'); var ret = { - addr: a[0], + addr: a[0], valueSat: parseInt(a[1]), }; // Spend? var k = SPEND_PREFIX + txid + '-' + n; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { - var k = data.key.split('-'); - self._addSpendInfo(ret, k[3], k[4], data.value); + db.createReadStream({ + start: k, + end: k + '~' }) - .on('error', function (error) { - return cb(error); - }) - .on('end', function () { - return cb(null,ret); - }); + .on('data', function(data) { + var k = data.key.split('-'); + self._addSpendInfo(ret, k[3], k[4], data.value); + }) + .on('error', function(error) { + return cb(error); + }) + .on('end', function() { + return cb(null, ret); + }); }); }; - TransactionDb.prototype.fillConfirmations = function(o, cb) { + TransactionDb.prototype.fillConfirmations = function(o, cb) { var self = this; - self.isConfirmed(o.txid, function(err,is) { + self.isConfirmed(o.txid, function(err, is) { if (err) return cb(err); o.isConfirmed = is; @@ -289,20 +319,19 @@ function spec(b) { if (o.multipleSpendAttempts) { async.each(o.multipleSpendAttempts, - function (oi, e_c) { - self.isConfirmed(oi.spendTxId, function(err,is) { + function(oi, e_c) { + self.isConfirmed(oi.spendTxId, function(err, is) { if (err) return; if (is) { o.spendTxId = oi.spendTxId; - o.index = oi.index; + o.index = oi.index; o.spendIsConfirmed = 1; } return e_c(); }); }, cb); - } - else { - self.isConfirmed(o.spendTxId, function(err,is) { + } else { + self.isConfirmed(o.spendTxId, function(err, is) { if (err) return cb(err); o.spendIsConfirmed = is; return cb(); @@ -311,14 +340,17 @@ function spec(b) { }); }; - TransactionDb.prototype.fromAddr = function(addr, cb) { + TransactionDb.prototype.fromAddr = function(addr, cb) { var self = this; var k = ADDR_PREFIX + addr; - var ret=[]; + var ret = []; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { var k = data.key.split('-'); var v = data.value.split(':'); ret.push({ @@ -328,32 +360,35 @@ function spec(b) { ts: parseInt(v[1]), }); }) - .on('error', function (err) { + .on('error', function(err) { return cb(err); }) - .on('end', function () { + .on('end', function() { async.each(ret, function(o, e_c) { - var k = SPEND_PREFIX + o.txid + '-' + o.index; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { - var k = data.key.split('-'); - self._addSpendInfo(o, k[3], k[4], data.value); + var k = SPEND_PREFIX + o.txid + '-' + o.index; + db.createReadStream({ + start: k, + end: k + '~' }) - .on('error', function (err) { - return e_c(err); - }) - .on('end', function (err) { - return e_c(err); + .on('data', function(data) { + var k = data.key.split('-'); + self._addSpendInfo(o, k[3], k[4], data.value); + }) + .on('error', function(err) { + return e_c(err); + }) + .on('end', function(err) { + return e_c(err); + }); + }, + function() { + async.each(ret, function(o, e_c) { + self.fillConfirmations(o, e_c); + }, function(err) { + return cb(err, ret); }); - }, - function() { - async.each(ret, function(o, e_c){ - self.fillConfirmations(o,e_c); - },function(err) { - return cb(err,ret); }); - }); }); }; @@ -362,46 +397,102 @@ function spec(b) { TransactionDb.prototype.removeFromTxId = function(txid, cb) { async.series([ - function(c) { - db.createReadStream({ + + function(c) { + db.createReadStream({ start: OUTS_PREFIX + txid, end: OUTS_PREFIX + txid + '~', }).pipe( - db.createWriteStream({type:'del'}) + db.createWriteStream({ + type: 'del' + }) ).on('close', c); - }, - function(c) { - db.createReadStream({ + }, + function(c) { + db.createReadStream({ start: SPEND_PREFIX + txid, end: SPEND_PREFIX + txid + '~' }) - .pipe( - db.createWriteStream({type:'del'}) + .pipe( + db.createWriteStream({ + type: 'del' + }) ).on('close', c); - }], + } + ], function(err) { cb(err); - }); - + }); + }; + // TODO. replace with + // Script.prototype.getAddrStrs if that one get merged in bitcore + TransactionDb.prototype.getAddrStr = function(s) { + var self = this; + + var addrStrs = []; + var type = s.classify(); + var addr; + + switch (type) { + case Script.TX_PUBKEY: + var chunk = s.captureOne(); + addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); + addrStrs.push(addr.toString()); + break; + case Script.TX_PUBKEYHASH: + addr = new Address(self.network.addressPubkey, s.captureOne()); + addrStrs.push(addr.toString()); + break; + case Script.TX_SCRIPTHASH: + addr = new Address(self.network.addressScript, s.captureOne()); + addrStrs.push(addr.toString()); + break; + case Script.TX_MULTISIG: + var chunks = s.capture(); + chunks.forEach(function(chunk) { + var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); + addrStrs.push(a.toString()); + }); + break; + case Script.TX_UNKNOWN: + break; + } + + return addrStrs; + }; TransactionDb.prototype.adaptTxObject = function(txInfo) { - + var self = this; // adapt bitcore TX object to bitcoind JSON response txInfo.txid = txInfo.hash; - var count = 0; - txInfo.vin = txInfo.in.map(function (txin) { - var i = {}; + var to=0; + var tx = txInfo; + tx.outs.forEach( function(o) { + var s = new Script(o.s); + var addrs = self.getAddrStr(s); + + // support only for p2pubkey p2pubkeyhash and p2sh + if (addrs.length === 1) { + tx.out[to].addrStr = addrs[0]; + tx.out[to].n = to; + } + to++; + }); + + var count = 0; + txInfo.vin = txInfo.in.map(function(txin) { + var i = {}; + if (txin.coinbase) { txInfo.isCoinBase = true; - } - else { - i.txid= txin.prev_out.hash; - i.vout= txin.prev_out.n; + } else { + i.txid = txin.prev_out.hash; + i.vout = txin.prev_out.n; } i.n = count++; return i; @@ -409,13 +500,13 @@ function spec(b) { count = 0; - txInfo.vout = txInfo.out.map(function (txout) { + txInfo.vout = txInfo.out.map(function(txout) { var o = {}; - + o.value = txout.value; o.n = count++; - if (txout.addrStr){ + if (txout.addrStr) { o.scriptPubKey = {}; o.scriptPubKey.addresses = [txout.addrStr]; } @@ -427,26 +518,26 @@ function spec(b) { TransactionDb.prototype.add = function(tx, blockhash, cb) { var self = this; - var addrs = []; + var addrs = []; if (tx.hash) self.adaptTxObject(tx); var ts = tx.time; async.series([ - // Input Outpoints (mark them as spended) + // Input Outpoints (mark them as spent) function(p_c) { if (tx.isCoinBase) return p_c(); async.forEachLimit(tx.vin, CONCURRENCY, function(i, next_out) { db.batch() - .put( SPEND_PREFIX + i.txid + '-' + i.vout + '-' + tx.txid + '-' + i.n, - ts || 0) + .put(SPEND_PREFIX + i.txid + '-' + i.vout + '-' + tx.txid + '-' + i.n, + ts || 0) .write(next_out); }, - function (err) { + function(err) { return p_c(err); - }); + }); }, // Parse Outputs function(p_c) { @@ -454,50 +545,60 @@ function spec(b) { function(o, next_out) { if (o.value && o.scriptPubKey && o.scriptPubKey.addresses && - o.scriptPubKey.addresses[0] && - ! o.scriptPubKey.addresses[1] // TODO : not supported - ){ - // This is only to broadcast (WIP) - if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { - addrs.push(o.scriptPubKey.addresses[0]); + o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported + ) { + var addr = o.scriptPubKey.addresses[0]; + var sat = Math.round(o.value * util.COIN); + + if (addrs.indexOf(addr) === -1) { + addrs.push(addr); } - var addr = o.scriptPubKey.addresses[0]; - var sat = Math.round(o.value * util.COIN); - // existed? - var k = OUTS_PREFIX + tx.txid + '-' + o.n; + var k = OUTS_PREFIX + tx.txid + '-' + o.n; db.get(k, function(err) { - if (err && err.notFound) { + if (err && err.notFound) { db.batch() - .put( k, addr + ':' + sat) - .put( ADDR_PREFIX + addr + '-' + tx.txid + '-' + o.n, sat+':'+ts) + .put(k, addr + ':' + sat) + .put(ADDR_PREFIX + addr + '-' + tx.txid + '-' + o.n, sat + ':' + ts) .write(next_out); - } - else { + } else { return next_out(); } }); - } - else { - //console.log ('WARN in TX: %s could not parse OUTPUT %d', tx.txid, o.n); + } else { return next_out(); } }, - function (err) { + function(err) { if (err) { - console.log('ERR at TX %s: %s', tx.txid, err); + console.log('ERR at TX %s: %s', tx.txid, err); return cb(err); } return p_c(); - }); + }); }, - function (p_c) { - if (!blockhash) return p_c(); - return self.setConfirmation(tx.txid,blockhash, true, p_c); + function(p_c) { + if (!blockhash) { + return p_c(); + } + return self.setConfirmation(tx.txid, blockhash, true, p_c); }, ], function(err) { - return cb(err, addrs); + if (addrs.length > 0 && !blockhash) { + // only emit if we are processing a single tx (not from a block) + addrs.forEach(function(addr) { + self.emit('tx_for_address', { + address: addr, + txid: tx.txid + }); + }); + } + self.emit('new_tx', { + txid: tx.txid + }); + + return cb(err); }); }; @@ -510,7 +611,7 @@ function spec(b) { db.batch() .put(IN_BLK_PREFIX + txId + '-' + blockHash, confirmed) - .put(FROM_BLK_PREFIX + blockHash + '-' + txId, 1) + .put(FROM_BLK_PREFIX + blockHash + '-' + txId, 1) .write(c); }; @@ -520,15 +621,18 @@ function spec(b) { var k = IN_BLK_PREFIX + txId; var ret = false; - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { if (data.value === '1') ret = true; }) - .on('error', function (err) { + .on('error', function(err) { return c(err); }) - .on('end', function (err) { - return c(err,ret); + .on('end', function(err) { + return c(err, ret); }); }; @@ -539,21 +643,24 @@ function spec(b) { var k = FROM_BLK_PREFIX + hash; var k2 = IN_BLK_PREFIX; // This is slow, but prevent us to create a new block->tx index. - db.createReadStream({start: k, end: k + '~'}) - .on('data', function (data) { - var ks = data.key.split('-'); - toChange.push({ - key: k2 + ks[2] + '-' + ks[1], - type: 'put', - value: isMain?1:0, + db.createReadStream({ + start: k, + end: k + '~' + }) + .on('data', function(data) { + var ks = data.key.split('-'); + toChange.push({ + key: k2 + ks[2] + '-' + ks[1], + type: 'put', + value: isMain ? 1 : 0, }); }) - .on('error', function (err) { + .on('error', function(err) { return cb(err); }) - .on('end', function (err) { + .on('end', function(err) { if (err) return cb(err); - console.log('\t%s %d Txs', isMain?'Confirming':'Invalidating',toChange.length); + console.log('\t%s %d Txs', isMain ? 'Confirming' : 'Invalidating', toChange.length); db.batch(toChange, cb); }); }; @@ -561,31 +668,25 @@ function spec(b) { // txs can be a [hashes] or [txObjects] TransactionDb.prototype.createFromArray = function(txs, blockHash, next) { var self = this; - if (!txs) return next(); - var updatedAddrs = []; // TODO - async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { - if (typeof t === 'string') { + if (typeof t === 'string') { + // TODO: parse it from networks.genesisTX? + if (t === genesisTXID) return each_cb(); - // Is it from genesis block? (testnet==livenet) - // TODO: parse it from networks.genesisTX? - if (t === genesisTXID) return each_cb(); + Rpc.getRpcInfo(t, function(err, inInfo) { + if (!inInfo) return each_cb(err); - Rpc.getRpcInfo(t, function(err, inInfo) { - if (!inInfo) return each_cb(err); - - return self.add(inInfo, blockHash, each_cb); - }); - } - else { - return self.add(t, blockHash, each_cb); - } - }, - function(err) { - return next(err, updatedAddrs); - }); + return self.add(inInfo, blockHash, each_cb); + }); + } else { + return self.add(t, blockHash, each_cb); + } + }, + function(err) { + return next(err); + }); }; @@ -598,17 +699,17 @@ function spec(b) { TransactionDb.prototype.setOrphan = function(blockHash, next) { -// var self = this; + // var self = this; //Get Txs -// TODO + // TODO - //Mark Tx's output as fromOrphan - //Mark Tx's outpoiunt as fromOrphan. Undo spents + //Mark Tx's output as fromOrphan + //Mark Tx's outpoiunt as fromOrphan. Undo spents return next(); }; - return TransactionDb; + return TransactionDb; } module.defineClass(spec);