From c3e1250fdee8b28d01c909fb3b78dd4c216751b3 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Tue, 14 Jan 2014 18:09:45 -0300 Subject: [PATCH] limit concurrency --- app/models/Transaction.js | 7 ++++--- lib/Sync.js | 8 ++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/app/models/Transaction.js b/app/models/Transaction.js index 5bd351e7..0e71ec7c 100644 --- a/app/models/Transaction.js +++ b/app/models/Transaction.js @@ -17,6 +17,7 @@ var mongoose = require('mongoose'), config = require('../../config/config'), TransactionItem = require('./TransactionItem'); +var CONCURRENCY = 5; /** */ @@ -122,7 +123,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) { i.n = index++; }); - async.each(t.info.vin, function(i, next_in) { + async.forEachLimit(t.info.vin, CONCURRENCY, function(i, next_in) { if (i.addr && i.value) { //console.log("Creating IN %s %d", i.addr, i.valueSat); @@ -143,7 +144,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) { }, function (err) { if (err) console.log (err); - async.each(t.info.vout, function(o, next_out) { + async.forEachLimit(t.info.vout, CONCURRENCY, function(o, next_out) { /* * TODO Support multisigs @@ -180,7 +181,7 @@ TransactionSchema.methods.fillInputValues = function (tx, next) { var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ; var that = this; - async.each(tx.ins, function(i, cb) { + async.forEachLimit(tx.ins, CONCURRENCY, function(i, cb) { var outHash = i.getOutpointHash(); var outIndex = i.getOutpointIndex(); diff --git a/lib/Sync.js b/lib/Sync.js index a25fab2c..35c66ab1 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -14,6 +14,7 @@ function spec() { var Transaction = require('../app/models/Transaction'); var TransactionItem = require('../app/models/TransactionItem'); var sockets = require('../app/views/sockets/main.js'); + var CONCURRENCY = 1; function Sync(config) { this.tx_count =0; @@ -64,12 +65,14 @@ function spec() { var that=this; Transaction.createFromArray(txids, function(err, inserted_txs) { if (err) return cb(err); - async.each(inserted_txs, function(new_tx, next) { + async.forEachLimit(inserted_txs, CONCURRENCY, function(new_tx, next) { var txid = new_tx.txid; if (that.opts.broadcast_txs) { sockets.broadcast_tx(new_tx); } + + // This will trigger an RPC call Transaction.explodeTransactionItems( txid, function(err) { that.tx_count++; @@ -184,7 +187,8 @@ function spec() { if (!total) return cb(); - async.each(txs, function(tx, next) { + + async.forEachLimit(txs, CONCURRENCY, function(tx, next) { if (read++ % 1000 === 0) progress_bar('read', read, total); if (!tx.txid) {