Merge pull request #44 from matiu/bug/04sync

limit concurrency
This commit is contained in:
Mario Colque 2014-01-14 13:16:10 -08:00
commit fd90910fc5
2 changed files with 10 additions and 5 deletions

View File

@ -17,6 +17,7 @@ var mongoose = require('mongoose'),
config = require('../../config/config'), config = require('../../config/config'),
TransactionItem = require('./TransactionItem'); TransactionItem = require('./TransactionItem');
var CONCURRENCY = 5;
/** /**
*/ */
@ -122,7 +123,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
i.n = index++; i.n = index++;
}); });
async.each(t.info.vin, function(i, next_in) { async.forEachLimit(t.info.vin, CONCURRENCY, function(i, next_in) {
if (i.addr && i.value) { if (i.addr && i.value) {
//console.log("Creating IN %s %d", i.addr, i.valueSat); //console.log("Creating IN %s %d", i.addr, i.valueSat);
@ -143,7 +144,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
}, },
function (err) { function (err) {
if (err) console.log (err); if (err) console.log (err);
async.each(t.info.vout, function(o, next_out) { async.forEachLimit(t.info.vout, CONCURRENCY, function(o, next_out) {
/* /*
* TODO Support multisigs * TODO Support multisigs
@ -180,7 +181,7 @@ TransactionSchema.methods.fillInputValues = function (tx, next) {
var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ; var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ;
var that = this; var that = this;
async.each(tx.ins, function(i, cb) { async.forEachLimit(tx.ins, CONCURRENCY, function(i, cb) {
var outHash = i.getOutpointHash(); var outHash = i.getOutpointHash();
var outIndex = i.getOutpointIndex(); var outIndex = i.getOutpointIndex();

View File

@ -14,6 +14,7 @@ function spec() {
var Transaction = require('../app/models/Transaction'); var Transaction = require('../app/models/Transaction');
var TransactionItem = require('../app/models/TransactionItem'); var TransactionItem = require('../app/models/TransactionItem');
var sockets = require('../app/views/sockets/main.js'); var sockets = require('../app/views/sockets/main.js');
var CONCURRENCY = 1;
function Sync(config) { function Sync(config) {
this.tx_count =0; this.tx_count =0;
@ -64,12 +65,14 @@ function spec() {
var that=this; var that=this;
Transaction.createFromArray(txids, function(err, inserted_txs) { Transaction.createFromArray(txids, function(err, inserted_txs) {
if (err) return cb(err); if (err) return cb(err);
async.each(inserted_txs, function(new_tx, next) { async.forEachLimit(inserted_txs, CONCURRENCY, function(new_tx, next) {
var txid = new_tx.txid; var txid = new_tx.txid;
if (that.opts.broadcast_txs) { if (that.opts.broadcast_txs) {
sockets.broadcast_tx(new_tx); sockets.broadcast_tx(new_tx);
} }
// This will trigger an RPC call // This will trigger an RPC call
Transaction.explodeTransactionItems( txid, function(err) { Transaction.explodeTransactionItems( txid, function(err) {
that.tx_count++; that.tx_count++;
@ -184,7 +187,8 @@ function spec() {
if (!total) return cb(); if (!total) return cb();
async.each(txs, function(tx, next) {
async.forEachLimit(txs, CONCURRENCY, function(tx, next) {
if (read++ % 1000 === 0) progress_bar('read', read, total); if (read++ % 1000 === 0) progress_bar('read', read, total);
if (!tx.txid) { if (!tx.txid) {