refactor tx, update tests

This commit is contained in:
Matias Alejo Garcia 2014-01-27 12:51:11 -03:00
parent a40b87c4b6
commit 66e3dd977b
6 changed files with 378 additions and 384 deletions

View File

@ -1,14 +1,13 @@
'use strict';
/**
* Module dependencies.
*/
require('classtool');
var mongoose = require('mongoose'),
Schema = mongoose.Schema,
async = require('async'),
function spec() {
var async = require('async'),
RpcClient = require('bitcore/RpcClient').class(),
Transaction = require('bitcore/Transaction').class(),
BitcoreTransaction = require('bitcore/Transaction').class(),
Address = require('bitcore/Address').class(),
BitcoreBlock = require('bitcore/Block').class(),
networks = require('bitcore/networks'),
@ -18,183 +17,41 @@ var mongoose = require('mongoose'),
sockets = require('../controllers/socket.js'),
TransactionItem = require('./TransactionItem');
var CONCURRENCY = 5;
// TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
/**
*/
var TransactionSchema = new Schema({
// For now we keep this as short as possible
// More fields will be propably added as we move
// forward with the UX
txid: {
type: String,
index: true,
unique: true,
},
/* TODO?
orphaned: {
type: Boolean,
default: false,
},
*/
time: Number,
});
/**
* Statics
*/
TransactionSchema.statics.load = function(id, cb) {
this.findOne({
_id: id
}).exec(cb);
};
var CONCURRENCY = 15;
TransactionSchema.statics.fromId = function(txid, cb) {
this.findOne({
txid: txid,
}).exec(cb);
};
function Transaction() {
this.txid = null;
}
TransactionSchema.statics.fromIdWithInfo = function(txid, cb) {
var That = this;
this.fromId(txid, function(err, tx) {
if (err) return cb(err);
if (!tx) {
// No in mongo...but maybe in bitcoind... lets query it
tx = new That();
Transaction.fromIdWithInfo = function (txid,cb) {
var tx = new Transaction();
tx.txid = txid;
tx.fillInfo(function(err, txInfo) {
if (err) return cb(err);
if (!txInfo) return cb();
tx.save(function(err) {
return cb(err,tx);
});
});
}
else {
tx.fillInfo(function(err) {
if (err) return cb(err);
if (! tx.info ) return cb();
return cb(err,tx);
});
}
});
};
};
TransactionSchema.statics.createFromArray = function(txs, time, next) {
var that = this;
if (!txs) return next();
var mongo_txs = [];
async.forEachLimit(txs, CONCURRENCY, function(txid, cb) {
Transaction.prototype.fillInfo = function(next) {
var self = this;
that.explodeTransactionItems( txid, time, function(err, addrs) {
Transaction.queryInfo(self.txid, function(err, info) {
if (err) return next(err);
if (addrs) {
async.each(addrs, function(addr){
sockets.broadcast_address_tx(addr, {'txid': txid});
self.info = info;
return next();
});
}
that.create({txid: txid, time: time}, function(err, new_tx) {
if (err && ! err.toString().match(/E11000/)) return cb(err);
if (new_tx) {
mongo_txs.push(new_tx);
}
return cb();
});
});
},
function(err) {
return next(err, mongo_txs);
});
};
};
TransactionSchema.statics.explodeTransactionItems = function(txid, time, cb) {
var addrs = [];
// Is it from genesis block? (testnet==livenet)
// TODO: parse it from networks.genesisTX
if (txid === genesisTXID) return cb();
this.queryInfo(txid, function(err, info) {
if (err || !info) return cb(err);
var index = 0;
info.vin.forEach( function(i){
i.n = index++;
});
async.forEachLimit(info.vin, CONCURRENCY, function(i, next_in) {
if (i.addr && i.value) {
TransactionItem.create({
txid : txid,
value_sat : -1 * i.valueSat,
addr : i.addr,
index : i.n,
ts : time,
}, next_in);
if (addrs.indexOf(i.addr) === -1) {
addrs.push(i.addr);
}
}
else {
if ( !i.coinbase ) {
console.log ('WARN in TX: %s: could not parse INPUT %d', txid, i.n);
}
return next_in();
}
},
function (err) {
if (err && !err.message.match(/E11000/) ) console.log (err);
async.forEachLimit(info.vout, CONCURRENCY, function(o, next_out) {
/*
* TODO Support multisigs
*/
if (o.value && o.scriptPubKey && o.scriptPubKey.addresses && o.scriptPubKey.addresses[0]) {
TransactionItem.create({
txid : txid,
value_sat : o.valueSat,
addr : o.scriptPubKey.addresses[0], // TODO: only address 0?
index : o.n,
ts : time,
}, next_out);
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
addrs.push(o.scriptPubKey.addresses[0]);
}
}
else {
console.log ('WARN in TX: %s could not parse OUTPUT %d', txid, o.n);
return next_out();
}
},
function (err) {
if (err && ! err.toString().match(/E11000/)) return cb(err);
return cb(null, addrs);
});
});
});
};
TransactionSchema.statics.getOutpoints = function (tx, next) {
Transaction.getOutpoints = function (tx, next) {
if (tx.isCoinBase()) return next();
@ -209,7 +66,7 @@ TransactionSchema.statics.getOutpoints = function (tx, next) {
var c=0;
rpc.getRawTransaction(outHashBase64, function(err, txdata) {
var txin = new Transaction();
var txin = new BitcoreTransaction();
if (err || ! txdata.result) return cb( new Error('Input TX '+outHashBase64+' not found'));
var b = new Buffer(txdata.result,'hex');
@ -245,11 +102,11 @@ TransactionSchema.statics.getOutpoints = function (tx, next) {
return next(err);
}
);
};
};
TransactionSchema.statics.queryInfo = function(txid, cb) {
var that = this;
Transaction.queryInfo = function(txid, cb) {
var self = this;
var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ;
var rpc = new RpcClient(config.bitcoind);
@ -264,10 +121,10 @@ TransactionSchema.statics.queryInfo = function(txid, cb) {
// Transaction parsing
var b = new Buffer(txInfo.result.hex,'hex');
var tx = new Transaction();
var tx = new BitcoreTransaction();
tx.parse(b);
that.getOutpoints(tx, function(err) {
self.getOutpoints(tx, function(err) {
if (err) return cb(err);
// Copy TX relevant values to .info
@ -331,31 +188,13 @@ TransactionSchema.statics.queryInfo = function(txid, cb) {
}
info.size = b.length;
return cb(null, info);
});
});
};
};
return Transaction;
}
module.defineClass(spec);
TransactionSchema.methods.fillInfo = function(next) {
var that = this;
mongoose.model('Transaction', TransactionSchema).queryInfo(that.txid, function(err, info) {
if (err) return next(err);
that.info = info;
if (! that.info) {
return next();
}
else {
that.info.time = that.time;
return next();
}
});
};
module.exports = mongoose.model('Transaction', TransactionSchema);

View File

@ -4,8 +4,12 @@
* Module dependencies.
*/
var mongoose = require('mongoose'),
async = require('async'),
Transaction = require('./Transaction').class(),
Schema = mongoose.Schema;
var CONCURRENCY = 15;
var TransactionItemSchema = new Schema({
txid: String,
index: Number,
@ -20,6 +24,10 @@ var TransactionItemSchema = new Schema({
});
// TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
// Compound index
TransactionItemSchema.index({txid: 1, index: 1, value_sat: 1}, {unique: true, dropDups: true});
@ -51,4 +59,125 @@ TransactionItemSchema.statics.fromTxId = function(txid, cb) {
});
};
TransactionItemSchema.statics.explodeTransactionItems = function(txid, cb) {
var Self = this;
var addrs = [];
var is_new = true;
// Is it from genesis block? (testnet==livenet)
// TODO: parse it from networks.genesisTX
if (txid === genesisTXID) return cb();
Transaction.queryInfo(txid, function(err, info) {
if (err || !info) return cb(err);
var index = 0;
info.vin.forEach( function(i){
i.n = index++;
});
async.forEachLimit(info.vin, CONCURRENCY, function(i, next_in) {
if (i.addr && i.value) {
Self.create({
txid : txid,
value_sat : -1 * i.valueSat,
addr : i.addr,
index : i.n,
ts : info.time,
}, next_in);
if (addrs.indexOf(i.addr) === -1) {
addrs.push(i.addr);
}
}
else {
if ( !i.coinbase ) {
console.log ('WARN in TX: %s: could not parse INPUT %d', txid, i.n);
}
return next_in();
}
},
function (err) {
if (err) {
if (err.message.match(/E11000/)) {
is_new = false;
}
else {
console.log('ERR at TX %s: %s', txid, err);
return cb(err);
}
}
// Parse Outputs
async.forEachLimit(info.vout, CONCURRENCY, function(o, next_out) {
/*
* TODO Support multisigs
*/
if (o.value && o.scriptPubKey && o.scriptPubKey.addresses && o.scriptPubKey.addresses[0]) {
Self.create({
txid : txid,
value_sat : o.valueSat,
addr : o.scriptPubKey.addresses[0], // TODO: only address 0?
index : o.n,
ts : info.time,
}, next_out);
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
addrs.push(o.scriptPubKey.addresses[0]);
}
}
else {
console.log ('WARN in TX: %s could not parse OUTPUT %d', txid, o.n);
return next_out();
}
},
function (err) {
if (err) {
if (err.message.match(/E11000/)) {
is_new = false;
}
else {
console.log('ERR at TX %s: %s', txid, err);
return cb(err);
}
}
return cb(null, addrs, is_new);
});
});
});
};
TransactionItemSchema.statics.createFromArray = function(txs, next) {
if (!txs) return next();
var inserted_txs = [];
async.forEachLimit(txs, CONCURRENCY, function(txid, cb, was_new) {
TransactionItemSchema.explodeTransactionItems( txid, function(err, addrs) {
if (err) return next(err);
if (addrs) {
async.each(addrs, function(addr){
sockets.broadcast_address_tx(addr, {'txid': txid});
});
}
if (was_new) {
inserted_txs.push(txid);
}
return cb();
});
},
function(err) {
return next(err, inserted_txs);
});
};
module.exports = mongoose.model('TransactionItem', TransactionItemSchema);

View File

@ -62,7 +62,7 @@ function spec() {
if (this.verbose) {
console.log('[p2p_sync] Handle tx: ' + tx.hash);
}
this.sync.storeTxs([tx.hash], null, function(err) {
this.sync.storeTxs([tx.hash], function(err) {
if (err) {
console.log('[PeerSync.js.71:err:]',err); //TODO
console.log('[p2p_sync] Error in handle TX: ' + JSON.stringify(err));

View File

@ -91,12 +91,10 @@ function spec() {
};
Sync.prototype.storeTxs = function(txs, inTime, cb) {
Sync.prototype.storeTxs = function(txs, cb) {
var self = this;
var time = inTime ? inTime : Math.round(new Date().getTime() / 1000);
Transaction.createFromArray(txs, time, function(err, inserted_txs) {
Transaction.createFromArray(txs, function(err, inserted_txs) {
if (!err && inserted_txs && self.opts.broadcast_txs) {
inserted_txs.forEach(function(tx) {

View File

@ -1,20 +1,17 @@
#!/usr/bin/env node
'use strict';
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var
mongoose= require('mongoose'),
var mongoose= require('mongoose'),
assert = require('assert'),
config = require('../../config/config'),
Transaction = require('../../app/models/Transaction'),
TransactionItem = require('../../app/models/TransactionItem'),
fs = require('fs'),
util = require('util');
Transaction = require('../../app/models/Transaction').class(),
TransactionItem = require('../../app/models/TransactionItem');
var txItemsValid = JSON.parse(fs.readFileSync('test/model/txitems.json'));
mongoose.connection.on('error', function(err) { console.log(err); });
describe('Transaction', function(){
@ -122,43 +119,5 @@ describe('Transaction', function(){
});
});
txItemsValid.forEach( function(v) {
if (v.disabled) return;
it('test a exploding TX ' + v.txid, function(done) {
// Remove first
TransactionItem.remove({txid: v.txid}, function(err) {
var now = Math.round(new Date().getTime() / 1000);
Transaction.explodeTransactionItems(v.txid, now, function(err, tx) {
if (err) done(err);
TransactionItem
.fromTxId( v.txid, function(err, readItems) {
var unmatch={};
v.items.forEach(function(validItem){
unmatch[validItem.addr] =1;
});
v.items.forEach(function(validItem){
var readItem = readItems.shift();
assert.equal(readItem.addr,validItem.addr);
assert.equal(readItem.value_sat,validItem.value_sat);
assert.equal(readItem.index,validItem.index);
delete unmatch[validItem.addr];
});
var valid = util.inspect(v.items, { depth: null });
assert(!Object.keys(unmatch).length,
'\n\tUnmatchs:' + Object.keys(unmatch) + "\n\n" +valid + '\nvs.\n' + readItems);
done();
});
});
});
});
});
});

View File

@ -0,0 +1,69 @@
#!/usr/bin/env node
'use strict';
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var mongoose = require('mongoose'),
assert = require('assert'),
fs = require('fs'),
util = require('util'),
config = require('../../config/config'),
Transaction = require('../../app/models/Transaction').class(),
TransactionItem = require('../../app/models/TransactionItem');
var txItemsValid = JSON.parse(fs.readFileSync('test/model/txitems.json'));
mongoose.connection.on('error', function(err) { console.log(err); });
describe('TransactionItem', function(){
before(function(done) {
mongoose.connect(config.db);
done();
});
after(function(done) {
mongoose.connection.close();
done();
});
txItemsValid.forEach( function(v) {
if (v.disabled) return;
it('test a exploding TX ' + v.txid, function(done) {
// Remove first
TransactionItem.remove({txid: v.txid}, function(err) {
TransactionItem.explodeTransactionItems(v.txid, function(err, tx) {
if (err) done(err);
TransactionItem
.fromTxId( v.txid, function(err, readItems) {
var unmatch={};
v.items.forEach(function(validItem){
unmatch[validItem.addr] =1;
});
v.items.forEach(function(validItem){
var readItem = readItems.shift();
assert.equal(readItem.addr,validItem.addr);
assert.equal(readItem.value_sat,validItem.value_sat);
assert.equal(readItem.index,validItem.index);
delete unmatch[validItem.addr];
});
var valid = util.inspect(v.items, { depth: null });
assert(!Object.keys(unmatch).length,'\n\tUnmatchs:' + Object.keys(unmatch) + "\n\n" +valid + '\nvs.\n' + readItems);
done();
});
});
});
});
});
});