diff --git a/.gitignore b/.gitignore index e322bd9..daa3529 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# from https://github.com/github/gitignore/blob/master/Node.gitignore lib-cov *.seed *.log @@ -11,6 +12,15 @@ lib-cov pids logs results +build + +node_modules + +# extras +*.swp +*~ +.project +peerdb.json npm-debug.log node_modules diff --git a/Gruntfile.js b/Gruntfile.js index 743d984..ab534a1 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -39,6 +39,13 @@ module.exports = function(grunt) { } } }, + mochaTest: { + options: { + reporter: 'spec', + }, + src: ['test/*.js'] + }, + nodemon: { dev: { options: { @@ -61,13 +68,6 @@ module.exports = function(grunt) { logConcurrentOutput: true } }, - mochaTest: { - options: { - reporter: 'spec', - require: 'server.js' - }, - src: ['test/*.js'] - }, env: { test: { NODE_ENV: 'test' @@ -87,7 +87,7 @@ module.exports = function(grunt) { grunt.option('force', true); //Default task(s). - grunt.registerTask('default', ['jshint', 'concurrent']); + grunt.registerTask('default', ['jshint','concurrent']); //Test task. grunt.registerTask('test', ['env:test', 'mochaTest']); diff --git a/HeaderDB.js b/HeaderDB.js new file mode 100644 index 0000000..a751cd4 --- /dev/null +++ b/HeaderDB.js @@ -0,0 +1,213 @@ +require('classtool'); + +function ClassSpec(b) { + var assert = require('assert'); + var fs = require('fs'); + var Block = require('bitcore/Block').class(); + var Deserialize = require('bitcore/Deserialize'); + var Parser = require('bitcore/util/BinaryParser').class(); + var coinUtil = require('bitcore/util/util'); + + function HeaderDB(b) { + this.network = b.network; + this.fd = null; + this.blocks = {}; + this.byHeight = []; + this.bestBlock = null; + this.cached_size = 0; + } + + HeaderDB.prototype.size = function() { + this.cached_size = Object.keys(this.blocks).length; + return this.cached_size; + }; + + HeaderDB.prototype.locator = function(block) { + if (!block) + block = this.bestBlock; + + var step = 1; + var start = 0; + var loc = []; + // see https://en.bitcoin.it/wiki/Protocol_specification#getblocks + for (var i = block.height; i > 0; i -= step, ++start) { + if (start >= 10) + step *= 2; + loc.push(this.byHeight[i]); + } + assert.equal(this.byHeight[0].toString(), + this.network.genesisBlock.hash.toString()); + loc.push(this.byHeight[0]); + console.log("Requesting more headers. Current height: " + block.height ); + return loc; + }; + + HeaderDB.prototype.add = function(block) { + var hash = block.calcHash(); + block.hash = hash; + var curWork = Deserialize.intFromCompact(block.bits); + + if (hash in this.blocks) { + var old = this.blocks[hash]; + throw new Error("duplicate block (was at height " + old.height + ")"); + } + + var bestChain = false; + + var reorg = { + oldBest: null, + conn: 0, + disconn: 0, + }; + + if (this.cached_size == 0) { + if (this.network.genesisBlock.hash.toString() != + hash.toString()) + throw new Error("Invalid genesis block"); + + block.height = 0; + block.work = curWork; + bestChain = true; + this.cached_size++; + } else { + var prevBlock = this.blocks[block.prev_hash]; + if (!prevBlock) + throw new Error("orphan block; prev not found"); + + block.height = prevBlock.height + 1; + block.work = prevBlock.work + curWork; + this.cached_size++; + + if (block.work > this.bestBlock.work) + bestChain = true; + } + + + // add to by-hash index + this.blocks[hash] = block; + + if (bestChain) { + var oldBest = this.bestBlock; + var newBest = block; + + reorg.oldBest = oldBest; + + // likely case: new best chain has greater height + if (!oldBest) { + while (newBest) { + newBest = this.blocks[newBest.prev_hash]; + reorg.conn++; + } + } else { + while (newBest && + (newBest.height > oldBest.height)) { + newBest = this.blocks[newBest.prev_hash]; + reorg.conn++; + } + } + + // unlikely: old best chain has greater height + while (oldBest && newBest && + (oldBest.height > newBest.height)) { + oldBest = this.blocks[oldBest.prev_hash]; + reorg.disconn++; + } + + // height matches, but still walking parallel + while (oldBest && newBest && (oldBest != newBest)) { + newBest = this.blocks[newBest.prev_hash]; + reorg.conn++; + + oldBest = this.blocks[oldBest.prev_hash]; + reorg.disconn++; + } + + var shuf = (reorg.conn > reorg.disconn) ? + reorg.conn : reorg.disconn; + + // reorg analyzed, updated best-chain pointer + this.bestBlock = block; + + // update by-height index + var ptr = block; + var updated = []; + for (var idx = block.height; + idx > (block.height - shuf); idx--) { + if (idx < 0) + break; + var update = [ idx, ptr ]; + updated.push(update); + ptr = this.blocks[ptr.prev_hash]; + } + + updated.reverse(); + + for (var i = 0; i < updated.length; i++) { + var update = updated[i]; + var idx = update[0]; + var ptr = update[1]; + + if (idx < this.byHeight.length) + this.byHeight[idx] = ptr.hash; + else + this.byHeight.push(ptr.hash); + } + } + return reorg; + }; + + HeaderDB.prototype.addBuf = function(buf) { + var block = new Block(); + var parser = new Parser(buf); + block.parse(parser, true); + this.add(block); + + }; + + + HeaderDB.prototype.readFile = function(filename) { + var fd = fs.openSync(filename, 'r'); + var stats = fs.fstatSync(fd); + if (stats.size % 80 != 0) + throw new Error("Corrupted header db"); + + while (1) { + var buf = new Buffer(80); + var bread = fs.readSync(fd, buf, 0, 80, null); + if (bread < 80) + break; + + this.addBuf(buf); + + if ( ! ( this.cached_size % 1000 )) { + console.log("\tblock..." + this.cached_size ) ; + } + } + + fs.closeSync(fd); + }; + + HeaderDB.prototype.writeFile = function(filename) { + var block = this.bestBlock; + var data = []; + while (block) { + var s = block.getHeader(); + data.push(s); + block = this.blocks[block.prev_hash]; + } + + data.reverse(); + + var fd = fs.openSync(filename, 'w'); + + data.forEach(function(datum) { + fs.writeSync(fd, datum, 0, 80, null); + }); + + fs.closeSync(fd); + }; + + return HeaderDB; +}; +module.defineClass(ClassSpec); + diff --git a/README.md b/README.md index eb30b46..c986815 100644 --- a/README.md +++ b/README.md @@ -58,12 +58,21 @@ $ npm install -g bower Run sync from mystery repository: $ utils/sync.js + +check utils/sync.js --help for options. + ### Blocks ``` /block/[:hash] /block/00000000a967199a2fad0877433c93df785a8d8ce062e5f9b451cd1397bdbf62 ``` +### Transactions +``` + /tx/[:txid] + /tx/525de308971eabd941b139f46c7198b5af9479325c2395db7f2fb5ae8562556c +``` + diff --git a/Sync.js b/Sync.js new file mode 100644 index 0000000..ae7d23e --- /dev/null +++ b/Sync.js @@ -0,0 +1,219 @@ +require('classtool'); + +function spec(b) { + var mongoose = require('mongoose'); + var util = require('util'); + + var RpcClient = require('bitcore/RpcClient').class(); + var networks = require('bitcore/networks'); + var async = require('async'); + + var config = require('./config/config'); + var Block = require('./app/models/Block'); + var Transaction=require('./app/models/Transaction'); + + function Sync(config) { + this.network = config.networkName == 'testnet' ? networks.testnet : networks.livenet; + } + + var progress_bar = function(string, current, total) { + console.log( util.format("\t%s %d/%d [%d%%]", + string, current, total, parseInt(100 * current/total)) + ); + } + + Sync.prototype.getNextBlock = function (blockHash,cb) { + var that = this; + + if ( !blockHash ) { + return cb(); + } + + this.rpc.getBlock(blockHash, function(err, blockInfo) { + if (err) return cb(err); + + if ( ! ( blockInfo.result.height % 1000) ) { + var h = blockInfo.result.height, + d = blockInfo.result.confirmations; + progress_bar('height', h, h + d); + } + + Block.create( blockInfo.result, function(err, inBlock) { + + // E11000 => already exists + if (err && ! err.toString().match(/E11000/)) { + return cb(err); + } + + if (inBlock) { + inBlock.explodeTransactions(function (err) { + return that.getNextBlock(blockInfo.result.nextblockhash, cb); + }); + } + else + return that.getNextBlock(blockInfo.result.nextblockhash, cb); + }); + }); + } + + Sync.prototype.syncBlocks = function (reindex, cb) { + + var that = this; + var genesisHash = this.network.genesisBlock.hash.reverse().toString('hex'); + + console.log("Syncing Blocks..."); + if (reindex) + return this.getNextBlock(genesisHash, cb); + + + Block.findOne({}, {}, { sort: { 'confirmations' : 1 } }, function(err, block) { + if (err) return cb(err); + + var nextHash = + block && block.hash + ? block.hash + : genesisHash + ; + + + console.log('\tStarting at hash: ' + nextHash); + return that.getNextBlock(nextHash, cb); + }); + } + + + Sync.prototype.syncTXs = function (reindex, cb) { + + var that = this; + + console.log("Syncing TXs..."); + if (reindex) { + // TODO? + } + + + Transaction.find({blockhash: null}, function(err, txs) { + if (err) return cb(err); + + var read = 0; + var pull = 0; + var write = 0; + var total = txs.length; + console.log("\tneed to pull %d txs", total); + + if (!total) return cb(); + + async.each(txs, + function(tx, next){ + if (! tx.txid) { + console.log("NO TXID skipping...", tx); + return next(); + } + + if ( ! ( read++ % 1000) ) + progress_bar('read', read, total); + + + that.rpc.getRawTransaction(tx.txid, 1, function(err, txInfo) { + + if ( ! ( pull++ % 1000) ) + progress_bar('\tpull', pull, total); + + if (!err && txInfo) { + Transaction.update({txid: tx.txid}, txInfo.result, function(err) { + if (err) return next(err); + + if ( ! ( write++ % 1000) ) + progress_bar('\t\twrite', write, total); + + return next(); + }); + } + else return next(); + }); + }, + function(err){ + if (err) return cb(err); + return cb(err); + } + ); + }); + } + + Sync.prototype.start = function (opts, next) { + + + mongoose.connect(config.db); + var db = mongoose.connection; + this.rpc = new RpcClient(config.bitcoind); + var that = this; + + + db.on('error', console.error.bind(console, 'connection error:')); + + db.once('open', function (){ + + async.series([ + function(cb){ + if (opts.destroy) { + console.log("Deleting Blocks..."); + return Block.remove().exec(cb); + } + return cb(); + }, + function(cb){ + if (opts.destroy) { + console.log("Deleting TXs..."); + return Transaction.remove().exec(cb); + } + return cb(); + }, + function(cb) { + + if (! opts.skip_blocks) { + that.syncBlocks(opts.reindex, function(err) { + if (err) { + return cb(err); + + } + console.log("\tBlocks done."); + + return cb(); + }); + } + else { + return cb(); + } + }, + function(cb) { + if (! opts.skip_txs) { + that.syncTXs(opts.reindex, function(err) { + if (err) { + return cb(err); + + } + return cb(); + }); + } + else { + return cb(); + } + }, + function(cb) { + db.close(); + return cb(); + }, + ], + function(err) { + if (err) { + db.close(); + return next(err); + } + return next(); + }); + }); + } + return Sync; +}; +module.defineClass(spec); + diff --git a/app/controllers/transactions.js b/app/controllers/transactions.js new file mode 100644 index 0000000..8bf4635 --- /dev/null +++ b/app/controllers/transactions.js @@ -0,0 +1,33 @@ +'use strict'; + + +var Transaction = require('../models/Transaction'); +//, _ = require('lodash'); + + + +/** + * Module dependencies. + */ + + +/** + * Find block by hash ... + */ +exports.transaction = function(req, res, next, txid) { + Transaction.fromID(txid, function(err, tx) { + if (err) return next(err); + if (!tx) return next(new Error('Failed to load TX ' + txid)); + req.transaction = tx; + next(); + }); +}; + + +/** + * Show block + */ +exports.show = function(req, res) { + res.jsonp(req.transaction); +}; + diff --git a/app/models/Block.js b/app/models/Block.js index de89280..63ef58d 100644 --- a/app/models/Block.js +++ b/app/models/Block.js @@ -3,9 +3,11 @@ /** * Module dependencies. */ -var mongoose = require('mongoose'), - Schema = mongoose.Schema; +var mongoose = require('mongoose'), + Schema = mongoose.Schema; +var async = require('async'); +var Transaction = require('./Transaction'); /** * Block Schema @@ -38,6 +40,32 @@ var BlockSchema = new Schema({ }, }); +BlockSchema.methods.explodeTransactions = function(next) { + + // console.log('exploding %s', this.hash, typeof this.tx); + + async.forEach( this.tx, + function(tx, callback) { + // console.log('procesing TX %s', tx); + Transaction.create({ txid: tx }, function(err) { + if (err && ! err.toString().match(/E11000/)) { + return callback(); + } + if (err) { + + return callback(err); + } + return callback(); + + }); + }, + function(err) { + if (err) return next(err); + return next(); + } + ); +}; + /** * Validations */ diff --git a/app/models/Transaction.js b/app/models/Transaction.js new file mode 100644 index 0000000..566a480 --- /dev/null +++ b/app/models/Transaction.js @@ -0,0 +1,64 @@ +'use strict'; + +/** + * Module dependencies. + */ +var mongoose = require('mongoose'), + Schema = mongoose.Schema; + + +/** + */ +var TransactionSchema = new Schema({ + txid: { + type: String, + index: true, + unique: true, + }, + version: Number, + locktime: Number, + vin: { + type: Array, + default: [], + }, + vout: { + type: Array, + default: [], + }, + blockhash: { + type: String, + index: true, + default: null, + }, + confirmations: Number, + time: Number, + blocktime: Number, +}); + +/** + * Statics + */ + +TransactionSchema.statics.load = function(id, cb) { + this.findOne({ + _id: id + }).exec(cb); +}; + + +TransactionSchema.statics.fromID = function(txid, cb) { + this.findOne({ + txid: txid, + }).exec(cb); +}; + +/* + * virtual + */ + +// ugly? new object every call? +TransactionSchema.virtual('date').get(function () { + return new Date(this.time); +}); + +module.exports = mongoose.model('Transaction', TransactionSchema); diff --git a/config/routes.js b/config/routes.js index 3da0264..03378b6 100644 --- a/config/routes.js +++ b/config/routes.js @@ -8,10 +8,13 @@ module.exports = function(app) { app.get('/last_blocks', index.all); //Block routes - var blocks = require('../app/controllers/blocks'); app.get('/block/:blockHash', blocks.show); - app.param('blockHash', blocks.block); + + var transactions = require('../app/controllers/transactions'); + app.get('/tx/:txid', transactions.show); + + app.param('txid', transactions.transaction); }; diff --git a/p2p.js b/p2p.js new file mode 100755 index 0000000..f7474a9 --- /dev/null +++ b/p2p.js @@ -0,0 +1,220 @@ +var fs = require('fs'); +var HeaderDB = require('./HeaderDB').class(); +var Block = require('bitcore/Block').class(); +var CoinConst = require('bitcore/const'); +var coinUtil = require('bitcore/util/util'); +var networks = require('bitcore/networks'); +var Parser = require('bitcore/util/BinaryParser').class(); + +var peerdb_fn = 'peerdb.json'; + +var peerdb = undefined; +var hdrdb = undefined; +var network = networks.testnet; +var config = { + network : network.name +}; +var PeerManager = require('bitcore/PeerManager').createClass({ + config : config +}); +var Peer = require('bitcore/Peer').class(); + +var syncState = 'init'; + +function peerdb_load() { + try { + peerdb = JSON.parse(fs.readFileSync(peerdb_fn)); + } catch (d) { + console.warn('Unable to read peer db', peerdb_fn, 'creating new one.'); + peerdb = [ { + ipv4 : '127.0.0.1', + port : 18333 + }, ]; + + fs.writeFileSync(peerdb_fn, JSON.stringify(peerdb)); + } +} + +function hdrdb_load() +{ + hdrdb = new HeaderDB({network: network}); +} + +function get_more_headers(info) { + var conn = info.conn; + var loc = hdrdb.locator(); + conn.sendGetHeaders(loc, coinUtil.NULL_HASH); +} + +function add_header(info, block) { + var hashStr = coinUtil.formatHashFull(block.calcHash()); + + try { + hdrdb.add(block); + } catch (e) { + return; + } +} + +function handle_headers(info) { + console.log("handle headers"); + var conn = info.conn; + var headers = info.message.headers; + + headers.forEach(function(hdr) { + add_header(info, hdr); + }); + + // We persist the header DB after each batch + //hdrdb.writeFile(hdrdb_fn); + + // Only one request per batch of headers we receive. + get_more_headers(info); +} + +function handle_block(info) { + console.log("handle block") + var block = info.message.block; + add_header(info, block); + + if (syncState === 'init') { + syncState = 'headers'; + get_more_headers(info); + } +} + +function handle_verack(info) { + var inv = { + type : CoinConst.MSG.BLOCK, + hash : network.genesisBlock.hash, + }; + var invs = [ inv ]; + console.log('p2psync: Asking for the genesis block'); + + // Asks for the genesis block + info.conn.sendGetData(invs); +} + +function handle_connected(data) { + var peerman = data.pm; + var peers_n = peerman.peers.length; + console.log('p2psync: Connected to ' + peers_n + ' peer' + + (peers_n != 1 ? 's' : '')); +} + +function p2psync() { + var peerman = new PeerManager(); + + peerdb.forEach(function(datum) { + var peer = new Peer(datum.ipv4, datum.port); + peerman.addPeer(peer); + }); + + peerman.on('connection', function(conn) { + conn.on('verack', handle_verack); + conn.on('block', handle_block); + conn.on('headers', handle_headers); + }); + peerman.on('connect', handle_connected); + + peerman.start(); +} + +function filesync_block_buf(blkdir, fn, buf) { + var parser = new Parser(buf); + var block = new Block(); + block.parse(parser, true); + + var hashStr = coinUtil.formatHashFull(block.calcHash()); + + try { + hdrdb.add(block); + } catch (e) { + var height = hdrdb.size(); + console.log('HeaderDB failed adding block #' + height + ', ' + hashStr); + console.log(' Reason: ' + e); + return; + } + + var height = hdrdb.size() - 1; + if ((height % 1000) == 0) + console.log('HeaderDB added block #' + height + ', ' + hashStr); +} + +function filesync_open_cb(err, fd, blkdir, fn) { + if (err) + throw err; + + var hdrbuf = new Buffer(4 * 2); + while (1) { + // read 2x 32-bit header + var bread = fs.readSync(fd, hdrbuf, 0, 4 * 2, null); + if (bread < (4 * 2)) { + console.log('Short read/EOF, ending scan of ' + fn); + break; + } + + // check magic matches + var magic = hdrbuf.slice(0, 4); + if (magic.toString() != network.magic.toString()) { + console.log('Invalid network magic, ending scan of ' + fn); + break; + } + + // block size + var blkSize = hdrbuf.readUInt32LE(4); + if (blkSize > (1 * 1024 * 1024)) + throw new Error('Invalid block size ' + blkSize); + + // read raw block data + var blkBuf = new Buffer(blkSize); + bread = fs.readSync(fd, blkBuf, 0, blkSize, null); + if (bread != blkSize) + throw new Error('Failed to read block'); + + // process block + filesync_block_buf(blkdir, fn, blkBuf); + } + + fs.closeSync(fd); + + hdrdb.writeFile(hdrdb_fn); + console.log('Wrote header db'); +} + +function filesync_block_file(blkdir, fn) { + console.log('Scanning ' + fn + ' for block data.'); + + var pathname = blkdir + '/' + fn; + fs.open(pathname, 'r', function(err, fd) { + filesync_open_cb(err, fd, blkdir, fn); + }); +} + +function cmd_filesync_rd(err, files, blkdir) { + if (err) + throw err; + + files = files.sort(); + + var scanned = 0; + files.forEach(function(fn) { + var re = /^blk\d+\.dat$/; + if (fn.match(re)) { + filesync_block_file(blkdir, fn); + scanned++; + } + }); + + console.log('Scanned ' + scanned + ' of ' + files.length + ' files in ' + + blkdir); +} + +function main() { + peerdb_load(); + hdrdb_load(); + + p2psync(); +} + +main(); diff --git a/package.json b/package.json index 0fc2f26..1dd908e 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,9 @@ "postinstall": "node node_modules/bower/bin/bower install" }, "dependencies": { + "async": "*", "classtool": "*", + "commander": "*", "express": "~3.4.7", "jade": "~1.0.2", "mongoose": "~3.8.3", diff --git a/util/sync.js b/util/sync.js index 9aff445..22d1a53 100755 --- a/util/sync.js +++ b/util/sync.js @@ -1,89 +1,34 @@ #!/usr/bin/env node process.env.NODE_ENV = process.env.NODE_ENV || 'development'; + require('buffertools').extend(); -var util = require('util'); -var RpcClient = require('../node_modules/bitcore/RpcClient').class(); -var networks = require('../node_modules/bitcore/networks'); +var SYNC_VERSION = '0.1'; +var program = require('commander'); +var Sync = require('../Sync').class(); -var Block = require('../app/models/Block'); -var config = require('../config/config'); -var mongoose = require('mongoose'); +program + .version(SYNC_VERSION) + .option('-N --network [livenet]', 'Set bitcoin network [testnet]', 'testnet') + .option('-R --reindex', 'Force reindexing', '0') + .option('-D --destroy', 'Remove current DB', '0') + .option('--skip_blocks', 'Sync blocks') + .option('--skip_txs', 'Sync transactions') + .parse(process.argv); -var networkName = process.argv[2] || 'testnet'; -var network = networkName == 'testnet' ? networks.testnet : networks.livenet; +var sync = new Sync({ networkName: program.network }); +if (program.remove) { -function getNextBlock(blockHash,cb) { +} - if ( !blockHash ) { - console.log("done"); - return cb(); +sync.start( program, function(err){ + if (err) { + console.log("CRITICAL ERROR: ", err); + } + else { + console.log('Done!'); } - - rpc.getBlock(blockHash, function(err, blockInfo) { - if (err) { - return cb(err); - } - - if ( ! ( blockInfo.result.height % 1000) ) { - var h = blockInfo.result.height, - d = blockInfo.result.confirmations; - console.log( util.format("Height: %d/%d [%d%%]", h, d, 100*h/(h+d))); - } - - Block.create( blockInfo.result, function(err, inBlock) { - - // E11000 => already exists - if (err && ! err.toString().match(/E11000/)) { - return cb(err); - } - - return getNextBlock(blockInfo.result.nextblockhash, cb); - - }); - }); - -} - -function syncBlocks(network, cb) { - - Block.findOne({}, {}, { sort: { 'confirmations' : 1 } }, function(err, block) { - if (err) { - return cb(err); - } - - - - var nextHash = - block && block.hash - ? block.hash - : network.genesisBlock.hash.reverse().toString('hex') - ; - - - console.log('Starting at hash: ' + nextHash); - getNextBlock(nextHash, cb); - }); -} - - -mongoose.connect(config.db); - -var db = mongoose.connection; -var rpc = new RpcClient(config.bitcoind); - - -db.on('error', console.error.bind(console, 'connection error:')); -db.once('open', function callback () { - syncBlocks(network, function(err) { - if (err) { - console.log(err); - } - mongoose.connection.close(); - }); }); - -