fix address streamed data

This commit is contained in:
Manuel Araoz 2014-02-14 16:35:32 -03:00
parent b3ffb86a40
commit 5c9c7aeb5c
5 changed files with 484 additions and 423 deletions

View File

@ -41,7 +41,6 @@ module.exports.broadcastBlock = function(block) {
}; };
module.exports.broadcastAddressTx = function(address, tx) { module.exports.broadcastAddressTx = function(address, tx) {
console.log('bcatx = '+address+' '+tx);
if (ios) ios.sockets. in (address).emit(address, tx); if (ios) ios.sockets. in (address).emit(address, tx);
}; };

View File

@ -5,6 +5,7 @@ require('classtool');
function spec(b) { function spec(b) {
var superclass = b.superclass || require('events').EventEmitter;
var TIMESTAMP_PREFIX = 'bts-'; // b-ts-<ts> => <hash> var TIMESTAMP_PREFIX = 'bts-'; // b-ts-<ts> => <hash>
var PREV_PREFIX = 'bpr-'; // b-prev-<hash> => <prev_hash> var PREV_PREFIX = 'bpr-'; // b-prev-<hash> => <prev_hash>
var NEXT_PREFIX = 'bne-'; // b-next-<hash> => <next_hash> var NEXT_PREFIX = 'bne-'; // b-next-<hash> => <next_hash>
@ -22,7 +23,9 @@ function spec(b) {
var Rpc = b.rpc || require('./Rpc').class(); var Rpc = b.rpc || require('./Rpc').class();
var BlockDb = function() { var BlockDb = function() {
BlockDb.super(this, arguments);
}; };
BlockDb.superclass = superclass;
BlockDb.prototype.close = function(cb) { BlockDb.prototype.close = function(cb) {
db.close(cb); db.close(cb);
@ -42,6 +45,7 @@ function spec(b) {
// the block prev to the new block, nor TIP pointer // the block prev to the new block, nor TIP pointer
// //
BlockDb.prototype.add = function(b, cb) { BlockDb.prototype.add = function(b, cb) {
var self = this;
var time_key = TIMESTAMP_PREFIX + var time_key = TIMESTAMP_PREFIX +
( b.time || Math.round(new Date().getTime() / 1000) ); ( b.time || Math.round(new Date().getTime() / 1000) );
@ -49,7 +53,12 @@ function spec(b) {
.put(time_key, b.hash) .put(time_key, b.hash)
.put(MAIN_PREFIX + b.hash, 1) .put(MAIN_PREFIX + b.hash, 1)
.put(PREV_PREFIX + b.hash, b.previousblockhash) .put(PREV_PREFIX + b.hash, b.previousblockhash)
.write(cb); .write(function(err){
if (!err) {
self.emit('new_block', {blockid: b.hash});
}
cb(err);
});
}; };
BlockDb.prototype.getTip = function(cb) { BlockDb.prototype.getTip = function(cb) {

View File

@ -6,7 +6,6 @@ function spec() {
var CoinConst = require('bitcore/const'); var CoinConst = require('bitcore/const');
var coinUtil = require('bitcore/util/util'); var coinUtil = require('bitcore/util/util');
var Sync = require('./Sync').class(); var Sync = require('./Sync').class();
var Script = require('bitcore/Script').class();
var Peer = require('bitcore/Peer').class(); var Peer = require('bitcore/Peer').class();
var config = require('../config/config'); var config = require('../config/config');
var networks = require('bitcore/networks'); var networks = require('bitcore/networks');
@ -60,24 +59,12 @@ function spec() {
}; };
PeerSync.prototype.handleTx = function(info) { PeerSync.prototype.handleTx = function(info) {
var self =this;
var tx = info.message.tx.getStandardizedObject(); var tx = info.message.tx.getStandardizedObject();
tx.outs = info.message.tx.outs;
tx.ins = info.message.tx.ins;
console.log('[p2p_sync] Handle tx: ' + tx.hash); console.log('[p2p_sync] Handle tx: ' + tx.hash);
tx.time = tx.time || Math.round(new Date().getTime() / 1000); tx.time = tx.time || Math.round(new Date().getTime() / 1000);
var to=0;
info.message.tx.outs.forEach( function(o) {
var s = new Script(o.s);
var addrs = self.sync.getAddrStr(s);
// support only for p2pubkey p2pubkeyhash and p2sh
if (addrs.length === 1) {
tx.out[to].addrStr = addrs[0];
tx.out[to].n = to;
}
to++;
});
this.sync.storeTxs([tx], function(err) { this.sync.storeTxs([tx], function(err) {
if (err) { if (err) {
console.log('[p2p_sync] Error in handle TX: ' + JSON.stringify(err)); console.log('[p2p_sync] Error in handle TX: ' + JSON.stringify(err));

View File

@ -4,31 +4,27 @@ require('classtool');
function spec() { function spec() {
var sockets = require('../app/controllers/socket.js'); var sockets = require('../app/controllers/socket.js');
var BlockDb = require('./BlockDb').class(); var BlockDb = require('./BlockDb').class();
var bitutil = require('bitcore/util/util');
// This is 0.1.2 => c++ version of base57-native var TransactionDb = require('./TransactionDb').class();
var base58 = require('base58-native');
var encodedData = require('bitcore/util/EncodedData').class({base58: base58});
var versionedData = require('bitcore/util/VersionedData').class({superclass: encodedData});
var Address = require('bitcore/Address').class({superclass: versionedData});
var TransactionDb = require('./TransactionDb').class();
var config = require('../config/config'); var config = require('../config/config');
var networks = require('bitcore/networks'); var networks = require('bitcore/networks');
var Script = require('bitcore/Script').class(); var async = require('async');
var async = require('async');
function Sync() { function Sync() {}
}
Sync.prototype.init = function(opts, cb) { Sync.prototype.init = function(opts, cb) {
var self = this; var self = this;
self.opts = opts; self.opts = opts;
this.bDb = new BlockDb(opts); this.bDb = new BlockDb(opts);
this.txDb = new TransactionDb(opts); this.txDb = new TransactionDb(opts);
this.network = config.network === 'testnet' ? networks.testnet: networks.livenet; this.txDb.on('tx_for_address', this.handleTxForAddress.bind(this));
this.txDb.on('new_tx', this.handleNewTx.bind(this));
this.bDb.on('new_block', this.handleNewBlock.bind(this));
this.network = config.network === 'testnet' ? networks.testnet : networks.livenet;
return cb(); return cb();
}; };
@ -43,8 +39,13 @@ function spec() {
Sync.prototype.destroy = function(next) { Sync.prototype.destroy = function(next) {
var self = this; var self = this;
async.series([ async.series([
function(b) { self.bDb.drop(b); },
function(b) { self.txDb.drop(b); }, function(b) {
self.bDb.drop(b);
},
function(b) {
self.txDb.drop(b);
},
], next); ], next);
}; };
@ -56,25 +57,25 @@ function spec() {
* *
* Case 1) * Case 1)
* A-B-C-D-E(TIP) * A-B-C-D-E(TIP)
* \ * \
* NEW * NEW
* *
* 1) Declare D-E orphans (and possible invalidate TXs on them) * 1) Declare D-E orphans (and possible invalidate TXs on them)
* *
* Case 2) * Case 2)
* A-B-C-D-E(TIP) * A-B-C-D-E(TIP)
* \ * \
* F-G-NEW * F-G-NEW
* 1) Set F-G as connected (mark TXs as valid) * 1) Set F-G as connected (mark TXs as valid)
* 2) Declare D-E orphans (and possible invalidate TXs on them) * 2) Declare D-E orphans (and possible invalidate TXs on them)
* *
* *
* Case 3) * Case 3)
* *
* A-B-C-D-E(TIP) ... NEW * A-B-C-D-E(TIP) ... NEW
* *
* NEW is ignored * NEW is ignored
* *
*/ */
Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) { Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
@ -88,67 +89,66 @@ function spec() {
var self = this; var self = this;
var oldTip, oldNext, needReorg = false; var oldTip, oldNext, needReorg = false;
var newPrev = b.previousblockhash; var newPrev = b.previousblockhash;
var updatedAddrs;
async.series([ async.series([
function(c) {
self.bDb.has(b.hash, function(err, val) {
return c(err ||
(val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null ));
});
},
function(c) {
if (!allowReorgs) return c();
self.bDb.has(newPrev, function(err, val) { function(c) {
if (!val && newPrev.match(/^0+$/)) return c(); self.bDb.has(b.hash, function(err, val) {
return c(err || return c(err ||
(!val ? new Error('WARN: Ignoring block with non existing prev:' + b.hash) : null )); (val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null));
}); });
}, },
function(c) { function(c) {
self.txDb.createFromBlock(b, function(err, addrs) { if (!allowReorgs) return c();
updatedAddrs = addrs;
return c(err);
});
},
function(c) {
if (!allowReorgs) return c();
self.bDb.getTip(function(err, val) {
oldTip = val;
if (oldTip && newPrev !== oldTip) needReorg = true;
return c();
});
},
function(c) {
if (!needReorg) return c();
self.bDb.getNext( newPrev, function(err, val) { self.bDb.has(newPrev, function(err, val) {
if (err) return c(err); if (!val && newPrev.match(/^0+$/)) return c();
oldNext = val; return c(err ||
return c(); (!val ? new Error('WARN: Ignoring block with non existing prev:' + b.hash) : null));
}); });
}, },
function(c) { function(c) {
self.bDb.add(b, c); self.txDb.createFromBlock(b, function(err) {
},
function(c) {
if (!needReorg) return c();
console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip);
self.processReorg(oldTip, oldNext, newPrev, c);
},
function(c) {
self.bDb.setTip(b.hash, function(err) {
if (err) return c(err);
self.bDb.setNext(newPrev, b.hash, function(err) {
return c(err); return c(err);
}); });
}); },
}], function(c) {
if (!allowReorgs) return c();
self.bDb.getTip(function(err, val) {
oldTip = val;
if (oldTip && newPrev !== oldTip) needReorg = true;
return c();
});
},
function(c) {
if (!needReorg) return c();
self.bDb.getNext(newPrev, function(err, val) {
if (err) return c(err);
oldNext = val;
return c();
});
},
function(c) {
self.bDb.add(b, c);
},
function(c) {
if (!needReorg) return c();
console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip);
self.processReorg(oldTip, oldNext, newPrev, c);
},
function(c) {
self.bDb.setTip(b.hash, function(err) {
if (err) return c(err);
self.bDb.setNext(newPrev, b.hash, function(err) {
return c(err);
});
});
}
],
function(err) { function(err) {
if (!err) self._handleBroadcast(b.hash, null, updatedAddrs); if (err && err.toString().match(/WARN/)) {
if (err && err.toString().match(/WARN/) ) { err = null;
err=null;
} }
return cb(err); return cb(err);
}); });
@ -162,40 +162,41 @@ function spec() {
var orphanizeFrom; var orphanizeFrom;
async.series([ async.series([
function(c) {
self.bDb.isMain(newPrev, function(err,val) {
if (!val) return c();
console.log('# Reorg Case 1)'); function(c) {
// case 1 self.bDb.isMain(newPrev, function(err, val) {
orphanizeFrom = oldNext; if (!val) return c();
return c(err);
});
},
function(c) {
if (orphanizeFrom) return c();
console.log('# Reorg Case 2)'); console.log('# Reorg Case 1)');
self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext) { // case 1
if (err) return c(err); orphanizeFrom = oldNext;
self.bDb.getNext(yHash, function(err, yHashNext) { return c(err);
orphanizeFrom = yHashNext; });
self.bDb.setNext(yHash, newYHashNext, function(err) { },
return c(err); function(c) {
if (orphanizeFrom) return c();
console.log('# Reorg Case 2)');
self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext) {
if (err) return c(err);
self.bDb.getNext(yHash, function(err, yHashNext) {
orphanizeFrom = yHashNext;
self.bDb.setNext(yHash, newYHashNext, function(err) {
return c(err);
});
}); });
}); });
}); },
}, function(c) {
function(c) { if (!orphanizeFrom) return c();
if (!orphanizeFrom) return c(); self.setBranchOrphan(orphanizeFrom, function(err) {
self.setBranchOrphan(orphanizeFrom, function(err) { return c(err);
return c(err); });
}); },
}, ],
], function(err) {
function(err) { return cb(err);
return cb(err); });
});
}; };
Sync.prototype.setBlockMain = function(hash, isMain, cb) { Sync.prototype.setBlockMain = function(hash, isMain, cb) {
@ -208,14 +209,16 @@ function spec() {
Sync.prototype.setBranchOrphan = function(fromHash, cb) { Sync.prototype.setBranchOrphan = function(fromHash, cb) {
var self = this, var self = this,
hashInterator = fromHash; hashInterator = fromHash;
async.whilst( async.whilst(
function() { return hashInterator; }, function() {
return hashInterator;
},
function(c) { function(c) {
self.setBlockMain(hashInterator, false, function(err) { self.setBlockMain(hashInterator, false, function(err) {
if (err) return cb(err); if (err) return cb(err);
self.bDb.getNext(hashInterator, function (err, val) { self.bDb.getNext(hashInterator, function(err, val) {
hashInterator = val; hashInterator = val;
return c(err); return c(err);
}); });
@ -225,26 +228,28 @@ function spec() {
Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) { Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) {
var self = this, var self = this,
hashInterator = fromHash, hashInterator = fromHash,
lastHash = fromHash, lastHash = fromHash,
isMain; isMain;
async.doWhilst( async.doWhilst(
function(c) { function(c) {
self.setBlockMain(hashInterator, true, function (err) { self.setBlockMain(hashInterator, true, function(err) {
if (err) return c(err); if (err) return c(err);
self.bDb.getPrev(hashInterator, function (err, val) { self.bDb.getPrev(hashInterator, function(err, val) {
if (err) return c(err); if (err) return c(err);
lastHash = hashInterator; lastHash = hashInterator;
hashInterator = val; hashInterator = val;
self.bDb.isMain(hashInterator, function (err, val) { self.bDb.isMain(hashInterator, function(err, val) {
isMain = val; isMain = val;
return c(); return c();
}); });
}); });
}); });
}, },
function() { return hashInterator && !isMain; }, function() {
return hashInterator && !isMain;
},
function(err) { function(err) {
console.log('\tFound yBlock:', hashInterator); console.log('\tFound yBlock:', hashInterator);
return cb(err, hashInterator, lastHash); return cb(err, hashInterator, lastHash);
@ -252,79 +257,39 @@ function spec() {
); );
}; };
Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) { Sync.prototype._handleBroadcast = function() {
var self = this; var self = this;
console.log('broadcast:' + self.opts.shouldBroadcast);
};
if (self.opts.shouldBroadcast) {
if (hash) {
sockets.broadcastBlock(hash);
}
if (updatedTxs) { Sync.prototype.handleTxForAddress = function(data) {
updatedTxs.forEach(function(tx) { if (this.opts.shouldBroadcast) {
sockets.broadcastTx(tx); sockets.broadcastAddressTx(data.address, data.txid);
}); }
} };
if (updatedAddrs ) { Sync.prototype.handleNewTx = function(data) {
updatedAddrs.forEach(function(addr, txs){ if (this.opts.shouldBroadcast) {
txs.forEach(function(addr, t){ sockets.broadcastTx(data.txid);
sockets.broadcastAddressTx(addr, t); }
}); };
});
} Sync.prototype.handleNewBlock = function(data) {
if (this.opts.shouldBroadcast) {
sockets.broadcastBlock(data.blockid);
} }
}; };
Sync.prototype.storeTxs = function(txs, cb) { Sync.prototype.storeTxs = function(txs, cb) {
var self = this; var self = this;
self.txDb.createFromArray(txs, null, function(err) {
self.txDb.createFromArray(txs, null, function(err, updatedAddrs) {
if (err) return cb(err); if (err) return cb(err);
self._handleBroadcast(null, txs, updatedAddrs);
return cb(err); return cb(err);
}); });
}; };
// TODO. replace with
// Script.prototype.getAddrStrs if that one get merged in bitcore
Sync.prototype.getAddrStr = function(s) {
var self = this;
var addrStrs = [];
var type = s.classify();
var addr;
switch(type) {
case Script.TX_PUBKEY:
var chunk = s.captureOne();
addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs = [ addr.toString() ];
break;
case Script.TX_PUBKEYHASH:
addr = new Address(self.network.addressPubkey, s.captureOne());
addrStrs = [ addr.toString() ];
break;
case Script.TX_SCRIPTHASH:
addr = new Address(self.network.addressScript, s.captureOne());
addrStrs = [ addr.toString() ];
break;
case Script.TX_MULTISIG:
var chunks = s.capture();
chunks.forEach(function(chunk) {
var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs.push(a.toString());
});
break;
case Script.TX_UNKNOWN:
break;
}
return addrStrs;
};
return Sync; return Sync;
} }
module.defineClass(spec); module.defineClass(spec);

View File

@ -5,36 +5,55 @@ require('classtool');
function spec(b) { function spec(b) {
var superclass = b.superclass || require('events').EventEmitter;
// blockHash -> txid mapping // blockHash -> txid mapping
var IN_BLK_PREFIX = 'txb-'; //txb-<txid>-<block> => 1/0 (connected or not) var IN_BLK_PREFIX = 'txb-'; //txb-<txid>-<block> => 1/0 (connected or not)
// Only for orphan blocks // Only for orphan blocks
var FROM_BLK_PREFIX = 'tx-'; //tx-<block>-<txid> => 1 var FROM_BLK_PREFIX = 'tx-'; //tx-<block>-<txid> => 1
// to show tx outs // to show tx outs
var OUTS_PREFIX = 'txo-'; //txo-<txid>-<n> => [addr, btc_sat] var OUTS_PREFIX = 'txo-'; //txo-<txid>-<n> => [addr, btc_sat]
var SPEND_PREFIX = 'txs-'; //txs-<txid(out)>-<n(out)>-<txid(in)>-<n(in)> = ts var SPEND_PREFIX = 'txs-'; //txs-<txid(out)>-<n(out)>-<txid(in)>-<n(in)> = ts
// to sum up addr balance (only outs, spends are gotten later) // to sum up addr balance (only outs, spends are gotten later)
var ADDR_PREFIX = 'txa-'; //txa-<addr>-<txid>-<n> => + btc_sat:ts var ADDR_PREFIX = 'txa-'; //txa-<addr>-<txid>-<n> => + btc_sat:ts
// TODO: use bitcore networks module // TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
var CONCURRENCY = 100; var CONCURRENCY = 100;
/** /**
* Module dependencies. * Module dependencies.
*/ */
var Rpc = b.rpc || require('./Rpc').class(), var Rpc = b.rpc || require('./Rpc').class(),
util = require('bitcore/util/util'), util = require('bitcore/util/util'),
levelup = require('levelup'), levelup = require('levelup'),
async = require('async'), async = require('async'),
config = require('../config/config'), config = require('../config/config'),
assert = require('assert'); assert = require('assert');
var db = b.db || levelup(config.leveldb + '/txs'); var db = b.db || levelup(config.leveldb + '/txs');
var Script = require('bitcore/Script').class();
// This is 0.1.2 => c++ version of base57-native
var base58 = require('base58-native');
var encodedData = require('bitcore/util/EncodedData').class({
// TODO: check why c++ implementation differs
//base58: base58
});
var versionedData = require('bitcore/util/VersionedData').class({
superclass: encodedData
});
var Address = require('bitcore/Address').class({
superclass: versionedData
});
var bitutil = require('bitcore/util/util');
var networks = require('bitcore/networks');
var TransactionDb = function() { var TransactionDb = function() {
TransactionDb.super(this, arguments);
this.network = config.network === 'testnet' ? networks.testnet : networks.livenet;
}; };
TransactionDb.superclass = superclass;
TransactionDb.prototype.close = function(cb) { TransactionDb.prototype.close = function(cb) {
db.close(cb); db.close(cb);
@ -43,7 +62,7 @@ function spec(b) {
TransactionDb.prototype.drop = function(cb) { TransactionDb.prototype.drop = function(cb) {
var path = config.leveldb + '/txs'; var path = config.leveldb + '/txs';
db.close(function() { db.close(function() {
require('leveldown').destroy(path, function () { require('leveldown').destroy(path, function() {
db = levelup(path); db = levelup(path);
return cb(); return cb();
}); });
@ -54,7 +73,7 @@ function spec(b) {
TransactionDb.prototype.has = function(txid, cb) { TransactionDb.prototype.has = function(txid, cb) {
var k = OUTS_PREFIX + txid; var k = OUTS_PREFIX + txid;
db.get(k, function (err,val) { db.get(k, function(err, val) {
var ret; var ret;
@ -81,26 +100,28 @@ function spec(b) {
txid: txid, txid: txid,
index: parseInt(index), index: parseInt(index),
}); });
} } else {
else { r.spendTxId = txid;
r.spendTxId = txid;
r.spendIndex = parseInt(index); r.spendIndex = parseInt(index);
r.spendTs = parseInt(ts); r.spendTs = parseInt(ts);
} }
}; };
// This is not used now // This is not used now
TransactionDb.prototype.fromTxId = function(txid, cb) { TransactionDb.prototype.fromTxId = function(txid, cb) {
var self = this; var self = this;
var k = OUTS_PREFIX + txid; var k = OUTS_PREFIX + txid;
var ret=[]; var ret = [];
var idx={}; var idx = {};
var i = 0; var i = 0;
// outs. // outs.
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
end: k + '~'
})
.on('data', function(data) {
var k = data.key.split('-'); var k = data.key.split('-');
var v = data.value.split(':'); var v = data.value.split(':');
ret.push({ ret.push({
@ -108,28 +129,31 @@ function spec(b) {
value_sat: parseInt(v[1]), value_sat: parseInt(v[1]),
index: parseInt(k[2]), index: parseInt(k[2]),
}); });
idx[parseInt(k[2])]= i++; idx[parseInt(k[2])] = i++;
}) })
.on('error', function (err) { .on('error', function(err) {
return cb(err); return cb(err);
}) })
.on('end', function () { .on('end', function() {
var k = SPEND_PREFIX + txid; var k = SPEND_PREFIX + txid;
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
end: k + '~'
})
.on('data', function(data) {
var k = data.key.split('-'); var k = data.key.split('-');
var j = idx[parseInt(k[2])]; var j = idx[parseInt(k[2])];
assert(typeof j !== 'undefined','Spent could not be stored: tx ' + txid + assert(typeof j !== 'undefined', 'Spent could not be stored: tx ' + txid +
'spend in TX:' + k[1] + ',' + k[2]+ ' j:' + j); 'spend in TX:' + k[1] + ',' + k[2] + ' j:' + j);
self._addSpendInfo(ret[j], k[3], k[4], data.value); self._addSpendInfo(ret[j], k[3], k[4], data.value);
}) })
.on('error', function (err) { .on('error', function(err) {
return cb(err); return cb(err);
}) })
.on('end', function (err) { .on('end', function(err) {
return cb(err, ret); return cb(err, ret);
}); });
}); });
@ -137,27 +161,30 @@ function spec(b) {
TransactionDb.prototype._fillSpend = function(info, cb) { TransactionDb.prototype._fillSpend = function(info, cb) {
var self = this; var self = this;
if (!info) return cb(); if (!info) return cb();
var k = SPEND_PREFIX + info.txid; var k = SPEND_PREFIX + info.txid;
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
end: k + '~'
})
.on('data', function(data) {
var k = data.key.split('-'); var k = data.key.split('-');
self._addSpendInfo(info.vout[k[2]], k[3], k[4], data.value); self._addSpendInfo(info.vout[k[2]], k[3], k[4], data.value);
}) })
.on('error', function (err) { .on('error', function(err) {
return cb(err); return cb(err);
}) })
.on('end', function (err) { .on('end', function(err) {
return cb(err); return cb(err);
}); });
}; };
TransactionDb.prototype._fillOutpoints = function(info, cb) { TransactionDb.prototype._fillOutpoints = function(info, cb) {
var self = this; var self = this;
if (!info || info.isCoinBase) return cb(); if (!info || info.isCoinBase) return cb();
@ -165,64 +192,59 @@ function spec(b) {
var incompleteInputs = 0; var incompleteInputs = 0;
async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) { async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) {
self.fromTxIdN(i.txid, i.vout, function(err, ret) { self.fromTxIdN(i.txid, i.vout, function(err, ret) {
//console.log('[TransactionDb.js.154:ret:]',ret); //TODO //console.log('[TransactionDb.js.154:ret:]',ret); //TODO
if (!ret || !ret.addr || !ret.valueSat ) { if (!ret || !ret.addr || !ret.valueSat) {
console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid); console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid);
if (ret) i.unconfirmedInput = ret.unconfirmedInput; if (ret) i.unconfirmedInput = ret.unconfirmedInput;
incompleteInputs = 1; incompleteInputs = 1;
return c_in(); // error not scalated return c_in(); // error not scalated
} }
info.firstSeenTs = ret.spendTs; info.firstSeenTs = ret.spendTs;
i.unconfirmedInput = i.unconfirmedInput; i.unconfirmedInput = i.unconfirmedInput;
i.addr = ret.addr; i.addr = ret.addr;
i.valueSat = ret.valueSat; i.valueSat = ret.valueSat;
i.value = ret.valueSat / util.COIN; i.value = ret.valueSat / util.COIN;
// Double spend? // Double spend?
if ( ret.multipleSpendAttempt || if (ret.multipleSpendAttempt || !ret.spendTxId ||
!ret.spendTxId ||
(ret.spendTxId && ret.spendTxId !== info.txid) (ret.spendTxId && ret.spendTxId !== info.txid)
) { ) {
if (ret.multipleSpendAttempts ) { if (ret.multipleSpendAttempts) {
ret.multipleSpendAttempts.each(function(mul) { ret.multipleSpendAttempts.each(function(mul) {
if (mul.spendTxId !== info.txid) { if (mul.spendTxId !== info.txid) {
i.doubleSpendTxID = ret.spendTxId; i.doubleSpendTxID = ret.spendTxId;
i.doubleSpendIndex = ret.spendIndex; i.doubleSpendIndex = ret.spendIndex;
} }
}); });
} else if (!ret.spendTxId) {
i.dbError = 'Input spend not registered';
} else {
i.doubleSpendTxID = ret.spendTxId;
i.doubleSpendIndex = ret.spendIndex;
}
} else {
i.doubleSpendTxID = null;
} }
else if (!ret.spendTxId) {
i.dbError = 'Input spend not registered';
}
else {
i.doubleSpendTxID = ret.spendTxId;
i.doubleSpendIndex = ret.spendIndex;
}
}
else {
i.doubleSpendTxID = null;
}
valueIn += i.valueSat; valueIn += i.valueSat;
return c_in(); return c_in();
});
},
function() {
if (!incompleteInputs) {
info.valueIn = valueIn / util.COIN;
info.fees = (valueIn - parseInt(info.valueOut * util.COIN)) / util.COIN;
} else {
info.incompleteInputs = 1;
}
return cb();
}); });
},
function () {
if (! incompleteInputs ) {
info.valueIn = valueIn / util.COIN;
info.fees = (valueIn - parseInt(info.valueOut * util.COIN)) / util.COIN ;
}
else {
info.incompleteInputs = 1;
}
return cb();
});
}; };
TransactionDb.prototype._getInfo = function(txid, next) { TransactionDb.prototype._getInfo = function(txid, next) {
var self = this; var self = this;
Rpc.getRpcInfo(txid, function(err, info) { Rpc.getRpcInfo(txid, function(err, info) {
if (err) return next(err); if (err) return next(err);
@ -236,13 +258,16 @@ function spec(b) {
}; };
TransactionDb.prototype.fromIdWithInfo = function (txid, cb) { TransactionDb.prototype.fromIdWithInfo = function(txid, cb) {
var self = this; var self = this;
self._getInfo(txid, function(err, info) { self._getInfo(txid, function(err, info) {
if (err) return cb(err); if (err) return cb(err);
if (!info ) return cb(); if (!info) return cb();
return cb(err, {txid: txid, info: info} ); return cb(err, {
txid: txid,
info: info
});
}); });
}; };
@ -250,37 +275,42 @@ function spec(b) {
var self = this; var self = this;
var k = OUTS_PREFIX + txid + '-' + n; var k = OUTS_PREFIX + txid + '-' + n;
db.get(k, function (err,val) { db.get(k, function(err, val) {
if (!val || (err && err.notFound) ) { if (!val || (err && err.notFound)) {
return cb(null, { unconfirmedInput: 1} ); return cb(null, {
unconfirmedInput: 1
});
} }
var a = val.split(':'); var a = val.split(':');
var ret = { var ret = {
addr: a[0], addr: a[0],
valueSat: parseInt(a[1]), valueSat: parseInt(a[1]),
}; };
// Spend? // Spend?
var k = SPEND_PREFIX + txid + '-' + n; var k = SPEND_PREFIX + txid + '-' + n;
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
var k = data.key.split('-'); end: k + '~'
self._addSpendInfo(ret, k[3], k[4], data.value);
}) })
.on('error', function (error) { .on('data', function(data) {
return cb(error); var k = data.key.split('-');
}) self._addSpendInfo(ret, k[3], k[4], data.value);
.on('end', function () { })
return cb(null,ret); .on('error', function(error) {
}); return cb(error);
})
.on('end', function() {
return cb(null, ret);
});
}); });
}; };
TransactionDb.prototype.fillConfirmations = function(o, cb) { TransactionDb.prototype.fillConfirmations = function(o, cb) {
var self = this; var self = this;
self.isConfirmed(o.txid, function(err,is) { self.isConfirmed(o.txid, function(err, is) {
if (err) return cb(err); if (err) return cb(err);
o.isConfirmed = is; o.isConfirmed = is;
@ -289,20 +319,19 @@ function spec(b) {
if (o.multipleSpendAttempts) { if (o.multipleSpendAttempts) {
async.each(o.multipleSpendAttempts, async.each(o.multipleSpendAttempts,
function (oi, e_c) { function(oi, e_c) {
self.isConfirmed(oi.spendTxId, function(err,is) { self.isConfirmed(oi.spendTxId, function(err, is) {
if (err) return; if (err) return;
if (is) { if (is) {
o.spendTxId = oi.spendTxId; o.spendTxId = oi.spendTxId;
o.index = oi.index; o.index = oi.index;
o.spendIsConfirmed = 1; o.spendIsConfirmed = 1;
} }
return e_c(); return e_c();
}); });
}, cb); }, cb);
} } else {
else { self.isConfirmed(o.spendTxId, function(err, is) {
self.isConfirmed(o.spendTxId, function(err,is) {
if (err) return cb(err); if (err) return cb(err);
o.spendIsConfirmed = is; o.spendIsConfirmed = is;
return cb(); return cb();
@ -311,14 +340,17 @@ function spec(b) {
}); });
}; };
TransactionDb.prototype.fromAddr = function(addr, cb) { TransactionDb.prototype.fromAddr = function(addr, cb) {
var self = this; var self = this;
var k = ADDR_PREFIX + addr; var k = ADDR_PREFIX + addr;
var ret=[]; var ret = [];
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
end: k + '~'
})
.on('data', function(data) {
var k = data.key.split('-'); var k = data.key.split('-');
var v = data.value.split(':'); var v = data.value.split(':');
ret.push({ ret.push({
@ -328,32 +360,35 @@ function spec(b) {
ts: parseInt(v[1]), ts: parseInt(v[1]),
}); });
}) })
.on('error', function (err) { .on('error', function(err) {
return cb(err); return cb(err);
}) })
.on('end', function () { .on('end', function() {
async.each(ret, function(o, e_c) { async.each(ret, function(o, e_c) {
var k = SPEND_PREFIX + o.txid + '-' + o.index; var k = SPEND_PREFIX + o.txid + '-' + o.index;
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
var k = data.key.split('-'); end: k + '~'
self._addSpendInfo(o, k[3], k[4], data.value);
}) })
.on('error', function (err) { .on('data', function(data) {
return e_c(err); var k = data.key.split('-');
}) self._addSpendInfo(o, k[3], k[4], data.value);
.on('end', function (err) { })
return e_c(err); .on('error', function(err) {
return e_c(err);
})
.on('end', function(err) {
return e_c(err);
});
},
function() {
async.each(ret, function(o, e_c) {
self.fillConfirmations(o, e_c);
}, function(err) {
return cb(err, ret);
}); });
},
function() {
async.each(ret, function(o, e_c){
self.fillConfirmations(o,e_c);
},function(err) {
return cb(err,ret);
}); });
});
}); });
}; };
@ -362,46 +397,102 @@ function spec(b) {
TransactionDb.prototype.removeFromTxId = function(txid, cb) { TransactionDb.prototype.removeFromTxId = function(txid, cb) {
async.series([ async.series([
function(c) {
db.createReadStream({ function(c) {
db.createReadStream({
start: OUTS_PREFIX + txid, start: OUTS_PREFIX + txid,
end: OUTS_PREFIX + txid + '~', end: OUTS_PREFIX + txid + '~',
}).pipe( }).pipe(
db.createWriteStream({type:'del'}) db.createWriteStream({
type: 'del'
})
).on('close', c); ).on('close', c);
}, },
function(c) { function(c) {
db.createReadStream({ db.createReadStream({
start: SPEND_PREFIX + txid, start: SPEND_PREFIX + txid,
end: SPEND_PREFIX + txid + '~' end: SPEND_PREFIX + txid + '~'
}) })
.pipe( .pipe(
db.createWriteStream({type:'del'}) db.createWriteStream({
type: 'del'
})
).on('close', c); ).on('close', c);
}], }
],
function(err) { function(err) {
cb(err); cb(err);
}); });
}; };
// TODO. replace with
// Script.prototype.getAddrStrs if that one get merged in bitcore
TransactionDb.prototype.getAddrStr = function(s) {
var self = this;
var addrStrs = [];
var type = s.classify();
var addr;
switch (type) {
case Script.TX_PUBKEY:
var chunk = s.captureOne();
addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs.push(addr.toString());
break;
case Script.TX_PUBKEYHASH:
addr = new Address(self.network.addressPubkey, s.captureOne());
addrStrs.push(addr.toString());
break;
case Script.TX_SCRIPTHASH:
addr = new Address(self.network.addressScript, s.captureOne());
addrStrs.push(addr.toString());
break;
case Script.TX_MULTISIG:
var chunks = s.capture();
chunks.forEach(function(chunk) {
var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs.push(a.toString());
});
break;
case Script.TX_UNKNOWN:
break;
}
return addrStrs;
};
TransactionDb.prototype.adaptTxObject = function(txInfo) { TransactionDb.prototype.adaptTxObject = function(txInfo) {
var self = this;
// adapt bitcore TX object to bitcoind JSON response // adapt bitcore TX object to bitcoind JSON response
txInfo.txid = txInfo.hash; txInfo.txid = txInfo.hash;
var count = 0;
txInfo.vin = txInfo.in.map(function (txin) {
var i = {};
var to=0;
var tx = txInfo;
tx.outs.forEach( function(o) {
var s = new Script(o.s);
var addrs = self.getAddrStr(s);
// support only for p2pubkey p2pubkeyhash and p2sh
if (addrs.length === 1) {
tx.out[to].addrStr = addrs[0];
tx.out[to].n = to;
}
to++;
});
var count = 0;
txInfo.vin = txInfo.in.map(function(txin) {
var i = {};
if (txin.coinbase) { if (txin.coinbase) {
txInfo.isCoinBase = true; txInfo.isCoinBase = true;
} } else {
else { i.txid = txin.prev_out.hash;
i.txid= txin.prev_out.hash; i.vout = txin.prev_out.n;
i.vout= txin.prev_out.n;
} }
i.n = count++; i.n = count++;
return i; return i;
@ -409,13 +500,13 @@ function spec(b) {
count = 0; count = 0;
txInfo.vout = txInfo.out.map(function (txout) { txInfo.vout = txInfo.out.map(function(txout) {
var o = {}; var o = {};
o.value = txout.value; o.value = txout.value;
o.n = count++; o.n = count++;
if (txout.addrStr){ if (txout.addrStr) {
o.scriptPubKey = {}; o.scriptPubKey = {};
o.scriptPubKey.addresses = [txout.addrStr]; o.scriptPubKey.addresses = [txout.addrStr];
} }
@ -427,26 +518,26 @@ function spec(b) {
TransactionDb.prototype.add = function(tx, blockhash, cb) { TransactionDb.prototype.add = function(tx, blockhash, cb) {
var self = this; var self = this;
var addrs = []; var addrs = [];
if (tx.hash) self.adaptTxObject(tx); if (tx.hash) self.adaptTxObject(tx);
var ts = tx.time; var ts = tx.time;
async.series([ async.series([
// Input Outpoints (mark them as spended) // Input Outpoints (mark them as spent)
function(p_c) { function(p_c) {
if (tx.isCoinBase) return p_c(); if (tx.isCoinBase) return p_c();
async.forEachLimit(tx.vin, CONCURRENCY, async.forEachLimit(tx.vin, CONCURRENCY,
function(i, next_out) { function(i, next_out) {
db.batch() db.batch()
.put( SPEND_PREFIX + i.txid + '-' + i.vout + '-' + tx.txid + '-' + i.n, .put(SPEND_PREFIX + i.txid + '-' + i.vout + '-' + tx.txid + '-' + i.n,
ts || 0) ts || 0)
.write(next_out); .write(next_out);
}, },
function (err) { function(err) {
return p_c(err); return p_c(err);
}); });
}, },
// Parse Outputs // Parse Outputs
function(p_c) { function(p_c) {
@ -454,50 +545,60 @@ function spec(b) {
function(o, next_out) { function(o, next_out) {
if (o.value && o.scriptPubKey && if (o.value && o.scriptPubKey &&
o.scriptPubKey.addresses && o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] && o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported
! o.scriptPubKey.addresses[1] // TODO : not supported ) {
){ var addr = o.scriptPubKey.addresses[0];
// This is only to broadcast (WIP) var sat = Math.round(o.value * util.COIN);
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
addrs.push(o.scriptPubKey.addresses[0]); if (addrs.indexOf(addr) === -1) {
addrs.push(addr);
} }
var addr = o.scriptPubKey.addresses[0];
var sat = Math.round(o.value * util.COIN);
// existed? // existed?
var k = OUTS_PREFIX + tx.txid + '-' + o.n; var k = OUTS_PREFIX + tx.txid + '-' + o.n;
db.get(k, function(err) { db.get(k, function(err) {
if (err && err.notFound) { if (err && err.notFound) {
db.batch() db.batch()
.put( k, addr + ':' + sat) .put(k, addr + ':' + sat)
.put( ADDR_PREFIX + addr + '-' + tx.txid + '-' + o.n, sat+':'+ts) .put(ADDR_PREFIX + addr + '-' + tx.txid + '-' + o.n, sat + ':' + ts)
.write(next_out); .write(next_out);
} } else {
else {
return next_out(); return next_out();
} }
}); });
} } else {
else {
//console.log ('WARN in TX: %s could not parse OUTPUT %d', tx.txid, o.n);
return next_out(); return next_out();
} }
}, },
function (err) { function(err) {
if (err) { if (err) {
console.log('ERR at TX %s: %s', tx.txid, err); console.log('ERR at TX %s: %s', tx.txid, err);
return cb(err); return cb(err);
} }
return p_c(); return p_c();
}); });
}, },
function (p_c) { function(p_c) {
if (!blockhash) return p_c(); if (!blockhash) {
return self.setConfirmation(tx.txid,blockhash, true, p_c); return p_c();
}
return self.setConfirmation(tx.txid, blockhash, true, p_c);
}, },
], function(err) { ], function(err) {
return cb(err, addrs); if (addrs.length > 0 && !blockhash) {
// only emit if we are processing a single tx (not from a block)
addrs.forEach(function(addr) {
self.emit('tx_for_address', {
address: addr,
txid: tx.txid
});
});
}
self.emit('new_tx', {
txid: tx.txid
});
return cb(err);
}); });
}; };
@ -510,7 +611,7 @@ function spec(b) {
db.batch() db.batch()
.put(IN_BLK_PREFIX + txId + '-' + blockHash, confirmed) .put(IN_BLK_PREFIX + txId + '-' + blockHash, confirmed)
.put(FROM_BLK_PREFIX + blockHash + '-' + txId, 1) .put(FROM_BLK_PREFIX + blockHash + '-' + txId, 1)
.write(c); .write(c);
}; };
@ -520,15 +621,18 @@ function spec(b) {
var k = IN_BLK_PREFIX + txId; var k = IN_BLK_PREFIX + txId;
var ret = false; var ret = false;
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
end: k + '~'
})
.on('data', function(data) {
if (data.value === '1') ret = true; if (data.value === '1') ret = true;
}) })
.on('error', function (err) { .on('error', function(err) {
return c(err); return c(err);
}) })
.on('end', function (err) { .on('end', function(err) {
return c(err,ret); return c(err, ret);
}); });
}; };
@ -539,21 +643,24 @@ function spec(b) {
var k = FROM_BLK_PREFIX + hash; var k = FROM_BLK_PREFIX + hash;
var k2 = IN_BLK_PREFIX; var k2 = IN_BLK_PREFIX;
// This is slow, but prevent us to create a new block->tx index. // This is slow, but prevent us to create a new block->tx index.
db.createReadStream({start: k, end: k + '~'}) db.createReadStream({
.on('data', function (data) { start: k,
var ks = data.key.split('-'); end: k + '~'
toChange.push({ })
key: k2 + ks[2] + '-' + ks[1], .on('data', function(data) {
type: 'put', var ks = data.key.split('-');
value: isMain?1:0, toChange.push({
key: k2 + ks[2] + '-' + ks[1],
type: 'put',
value: isMain ? 1 : 0,
}); });
}) })
.on('error', function (err) { .on('error', function(err) {
return cb(err); return cb(err);
}) })
.on('end', function (err) { .on('end', function(err) {
if (err) return cb(err); if (err) return cb(err);
console.log('\t%s %d Txs', isMain?'Confirming':'Invalidating',toChange.length); console.log('\t%s %d Txs', isMain ? 'Confirming' : 'Invalidating', toChange.length);
db.batch(toChange, cb); db.batch(toChange, cb);
}); });
}; };
@ -561,31 +668,25 @@ function spec(b) {
// txs can be a [hashes] or [txObjects] // txs can be a [hashes] or [txObjects]
TransactionDb.prototype.createFromArray = function(txs, blockHash, next) { TransactionDb.prototype.createFromArray = function(txs, blockHash, next) {
var self = this; var self = this;
if (!txs) return next(); if (!txs) return next();
var updatedAddrs = []; // TODO
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
if (typeof t === 'string') { if (typeof t === 'string') {
// TODO: parse it from networks.genesisTX?
if (t === genesisTXID) return each_cb();
// Is it from genesis block? (testnet==livenet) Rpc.getRpcInfo(t, function(err, inInfo) {
// TODO: parse it from networks.genesisTX? if (!inInfo) return each_cb(err);
if (t === genesisTXID) return each_cb();
Rpc.getRpcInfo(t, function(err, inInfo) { return self.add(inInfo, blockHash, each_cb);
if (!inInfo) return each_cb(err); });
} else {
return self.add(inInfo, blockHash, each_cb); return self.add(t, blockHash, each_cb);
}); }
} },
else { function(err) {
return self.add(t, blockHash, each_cb); return next(err);
} });
},
function(err) {
return next(err, updatedAddrs);
});
}; };
@ -598,17 +699,17 @@ function spec(b) {
TransactionDb.prototype.setOrphan = function(blockHash, next) { TransactionDb.prototype.setOrphan = function(blockHash, next) {
// var self = this; // var self = this;
//Get Txs //Get Txs
// TODO // TODO
//Mark Tx's output as fromOrphan //Mark Tx's output as fromOrphan
//Mark Tx's outpoiunt as fromOrphan. Undo spents //Mark Tx's outpoiunt as fromOrphan. Undo spents
return next(); return next();
}; };
return TransactionDb; return TransactionDb;
} }
module.defineClass(spec); module.defineClass(spec);