From f1d2009418370ea958ed5e7a2461bfef89e5444c Mon Sep 17 00:00:00 2001 From: Yemel Jardi Date: Fri, 5 Dec 2014 11:39:30 -0300 Subject: [PATCH] Add transport/connection class --- index.js | 4 + lib/transport/connection.js | 575 +++++++++++++++++++++++++++++++++++ lib/util/buffer.js | 6 + test/transport/connection.js | 41 +++ 4 files changed, 626 insertions(+) create mode 100644 lib/transport/connection.js create mode 100644 test/transport/connection.js diff --git a/index.js b/index.js index 627909f01..acc743798 100644 --- a/index.js +++ b/index.js @@ -24,6 +24,10 @@ bitcore.util.bitcoin = require('./lib/util/bitcoin'); bitcore.util.buffer = require('./lib/util/buffer'); bitcore.util.js = require('./lib/util/js'); +// transport +bitcore.transport = {}; +bitcore.transport.Connection = require('./lib/transport/connection'); + // errors thrown by the library bitcore.errors = require('./lib/errors'); diff --git a/lib/transport/connection.js b/lib/transport/connection.js new file mode 100644 index 000000000..b9495d4ff --- /dev/null +++ b/lib/transport/connection.js @@ -0,0 +1,575 @@ +'use strict'; + +var MAX_RECEIVE_BUFFER = 10000000; +var PROTOCOL_VERSION = 70000; + +var Util = require('util'); +var Put = require('bufferput'); +var Buffers = require('buffers'); +var buffertools = require('buffertools'); + +// PATCH TODO: Remove (yemel) +Buffers.prototype.skip = function (i) { + if (i == 0) { + return; + } else if (i == this.length) { + this.buffers = []; + this.length = 0; + return; + } + var pos = this.pos(i); + this.buffers = this.buffers.slice(pos.buf); + this.buffers[0] = new Buffer(this.buffers[0].slice(pos.offset)); + this.length -= i; +}; + +var networks = require('../networks'); +var Block = require('../block'); +var Transaction = require('../transaction'); +var BufferUtil = require('../util/buffer'); +var BufferReader = require('../encoding/bufferreader'); +var Hash = require('../crypto/hash'); +var Random = require('../crypto/random'); +var nonce = Random.getPseudoRandomBuffer(8); +var EventEmitter = require('events').EventEmitter; + +var BIP0031_VERSION = 60000; + +function Connection(socket, peer, opts) { + this.config = opts || {}; + + this.network = this.config.network || networks.livenet; + this.socket = socket; + this.peer = peer; + + // check for socks5 proxy options and construct a proxied socket + if (this.config.proxy) { + var Socks5Client = require('socks5-client'); + this.socket = new Socks5Client(this.config.proxy.host, this.config.proxy.port); + } + + // A connection is considered "active" once we have received verack + this.active = false; + // The version incoming packages are interpreted as + this.recvVer = 0; + // The version outgoing packages are sent as + this.sendVer = 0; + // The (claimed) height of the remote peer's block chain + this.bestHeight = 0; + // Is this an inbound connection? + this.inbound = !!this.socket.server; + // Have we sent a getaddr on this connection? + this.getaddr = false; + + // Receive buffer + this.buffers = new Buffers(); + + // Starting 20 Feb 2012, Version 0.2 is obsolete + // This is the same behavior as the official client + if (new Date().getTime() > 1329696000000) { + this.recvVer = 209; + this.sendVer = 209; + } + + this.setupHandlers(); +} +Util.inherits(Connection, EventEmitter); + +Connection.prototype.open = function(callback) { + if (typeof callback === 'function') this.once('connect', callback); + this.socket.connect(this.peer.port, this.peer.host); + return this; +}; + +Connection.prototype.setupHandlers = function() { + this.socket.addListener('connect', this.handleConnect.bind(this)); + this.socket.addListener('error', this.handleError.bind(this)); + this.socket.addListener('end', this.handleDisconnect.bind(this)); + this.socket.addListener('data', (function(data) { + var dumpLen = 35; + console.debug('[' + this.peer + '] ' + + 'Recieved ' + data.length + ' bytes of data:'); + console.debug('... ' + buffertools.toHex(data.slice(0, dumpLen > data.length ? + data.length : dumpLen)) + + (data.length > dumpLen ? '...' : '')); + }).bind(this)); + this.socket.addListener('data', this.handleData.bind(this)); +}; + +Connection.prototype.handleConnect = function() { + if (!this.inbound) { + this.sendVersion(); + } + this.emit('connect', { + conn: this, + socket: this.socket, + peer: this.peer + }); +}; + +Connection.prototype.handleError = function(err) { + if (err.errno == 110 || err.errno == 'ETIMEDOUT') { + console.info('connection timed out for ' + this.peer); + } else if (err.errno == 111 || err.errno == 'ECONNREFUSED') { + console.info('connection refused for ' + this.peer); + } else { + console.warn('connection with ' + this.peer + ' ' + err.toString()); + } + this.emit('error', { + conn: this, + socket: this.socket, + peer: this.peer, + err: err + }); +}; + +Connection.prototype.handleDisconnect = function() { + this.emit('disconnect', { + conn: this, + socket: this.socket, + peer: this.peer + }); +}; + +Connection.prototype.handleMessage = function(message) { + if (!message) { + // Parser was unable to make sense of the message, drop it + return; + } + + try { + switch (message.command) { + case 'version': + // Did we connect to ourself? + if (buffertools.compare(nonce, message.nonce) === 0) { + this.socket.end(); + return; + } + + if (this.inbound) { + this.sendVersion(); + } + + if (message.version >= 209) { + this.sendMessage('verack', new Buffer([])); + } + this.sendVer = Math.min(message.version, PROTOCOL_VERSION); + if (message.version < 209) { + this.recvVer = Math.min(message.version, PROTOCOL_VERSION); + } else { + // We won't start expecting a checksum until after we've received + // the 'verack' message. + this.once('verack', (function() { + this.recvVer = message.version; + }).bind(this)); + } + this.bestHeight = message.start_height; + break; + + case 'verack': + this.recvVer = Math.min(message.version, PROTOCOL_VERSION); + this.active = true; + break; + + case 'ping': + if ('object' === typeof message.nonce) { + this.sendPong(message.nonce); + } + break; + } + } catch (e) { + console.err('Error while handling "' + message.command + '" message from ' + + this.peer + ':\n' + + (e.stack ? e.stack : e.toString())); + return; + } + this.emit(message.command, { + conn: this, + socket: this.socket, + peer: this.peer, + message: message + }); +}; + +Connection.prototype.sendPong = function(nonce) { + this.sendMessage('pong', nonce); +}; + +Connection.prototype.sendVersion = function() { + var subversion = '/BitcoinX:0.1/'; + + var put = new Put(); + put.word32le(PROTOCOL_VERSION); // version + put.word64le(1); // services + put.word64le(Math.round(new Date().getTime() / 1000)); // timestamp + put.pad(26); // addr_me + put.pad(26); // addr_you + put.put(nonce); + put.varint(subversion.length); + put.put(new Buffer(subversion, 'ascii')); + put.word32le(0); + + this.sendMessage('version', put.buffer()); +}; + +Connection.prototype.sendGetBlocks = function(starts, stop, wantHeaders) { + // Default value for stop is 0 to get as many blocks as possible (500) + stop = stop || BufferUtil.NULL_HASH; + + var put = new Put(); + + // https://en.bitcoin.it/wiki/Protocol_specification#getblocks + put.word32le(this.sendVer); + put.varint(starts.length); + + for (var i = 0; i < starts.length; i++) { + if (starts[i].length != 32) { + throw new Error('Invalid hash length'); + } + + put.put(starts[i]); + } + + var stopBuffer = new Buffer(stop, 'binary'); + if (stopBuffer.length != 32) { + throw new Error('Invalid hash length'); + } + + put.put(stopBuffer); + + var command = 'getblocks'; + if (wantHeaders) + command = 'getheaders'; + this.sendMessage(command, put.buffer()); +}; + +Connection.prototype.sendGetHeaders = function(starts, stop) { + this.sendGetBlocks(starts, stop, true); +}; + +Connection.prototype.sendGetData = function(invs) { + var put = new Put(); + put.varint(invs.length); + for (var i = 0; i < invs.length; i++) { + put.word32le(invs[i].type); + put.put(invs[i].hash); + } + this.sendMessage('getdata', put.buffer()); +}; + +Connection.prototype.sendGetAddr = function(invs) { + var put = new Put(); + this.sendMessage('getaddr', put.buffer()); +}; + +Connection.prototype.sendInv = function(data) { + if (!Array.isArray(data)) data = [data]; + var put = new Put(); + put.varint(data.length); + data.forEach(function(value) { + if (value instanceof Block) { + // Block + put.word32le(2); // MSG_BLOCK + } else { + // Transaction + put.word32le(1); // MSG_TX + } + put.put(value.getHash()); + }); + this.sendMessage('inv', put.buffer()); +}; + +Connection.prototype.sendHeaders = function(headers) { + var put = new Put(); + put.varint(headers.length); + headers.forEach(function(header) { + put.put(header); + + // Indicate 0 transactions + put.word8(0); + }); + this.sendMessage('headers', put.buffer()); +}; + +Connection.prototype.sendTx = function(tx) { + this.sendMessage('tx', tx.serialize()); +}; + +Connection.prototype.sendBlock = function(block, txs) { + var put = new Put(); + + // Block header + put.put(block.getHeader()); + + // List of transactions + put.varint(txs.length); + txs.forEach(function(tx) { + put.put(tx.serialize()); + }); + + this.sendMessage('block', put.buffer()); +}; + +Connection.prototype.sendMessage = function(command, payload) { + try { + var magic = this.network.magic; + var commandBuf = new Buffer(command, 'ascii'); + if (commandBuf.length > 12) throw 'Command name too long'; + + var checksum; + if (this.sendVer >= 209) { + checksum = Hash.sha256sha256(payload).slice(0, 4); + } else { + checksum = new Buffer([]); + } + + var message = new Put(); // -- HEADER -- + message.put(magic); // magic bytes + message.put(commandBuf); // command name + message.pad(12 - commandBuf.length); // zero-padded + message.word32le(payload.length); // payload length + message.put(checksum); // checksum + // -- BODY -- + message.put(payload); // payload data + + var buffer = message.buffer(); + + console.debug('[' + this.peer + '] ' + + 'Sending message ' + command + ' (' + payload.length + ' bytes)'); + + this.socket.write(buffer); + } catch (err) { + // TODO: We should catch this error one level higher in order to better + // determine how to react to it. For now though, ignoring it will do. + console.err('Error while sending message to peer ' + this.peer + ': ' + + (err.stack ? err.stack : err.toString())); + } +}; + +Connection.prototype.handleData = function(data) { + this.buffers.push(data); + + if (this.buffers.length > MAX_RECEIVE_BUFFER) { + console.err('Peer ' + this.peer + ' exceeded maxreceivebuffer, disconnecting.' + + (err.stack ? err.stack : err.toString())); + this.socket.destroy(); + return; + } + + this.processData(); +}; + +Connection.prototype.processData = function() { + // If there are less than 20 bytes there can't be a message yet. + if (this.buffers.length < 20) return; + + var magic = this.network.magic; + var i = 0; + for (;;) { + if (this.buffers.get(i) === magic[0] && + this.buffers.get(i + 1) === magic[1] && + this.buffers.get(i + 2) === magic[2] && + this.buffers.get(i + 3) === magic[3]) { + if (i !== 0) { + console.debug('[' + this.peer + '] ' + + 'Received ' + i + + ' bytes of inter-message garbage: '); + console.debug('... ' + this.buffers.slice(0, i)); + + this.buffers.skip(i); + } + break; + } + + if (i > (this.buffers.length - 4)) { + this.buffers.skip(i); + return; + } + i++; + } + + var payloadLen = (this.buffers.get(16)) + + (this.buffers.get(17) << 8) + + (this.buffers.get(18) << 16) + + (this.buffers.get(19) << 24); + + var startPos = (this.recvVer >= 209) ? 24 : 20; + var endPos = startPos + payloadLen; + + if (this.buffers.length < endPos) return; + + var command = this.buffers.slice(4, 16).toString('ascii').replace(/\0+$/, ''); + var payload = this.buffers.slice(startPos, endPos); + var checksum = (this.recvVer >= 209) ? this.buffers.slice(20, 24) : null; + + console.debug('[' + this.peer + '] ' + + 'Received message ' + command + + ' (' + payloadLen + ' bytes)'); + + if (checksum !== null) { + var checksumConfirm = Hash.sha256sha256(payload).slice(0, 4); + if (buffertools.compare(checksumConfirm, checksum) !== 0) { + console.err('[' + this.peer + '] ' + + 'Checksum failed', { + cmd: command, + expected: checksumConfirm.toString('hex'), + actual: checksum.toString('hex') + }); + return; + } + } + + var message; + try { + message = this.parseMessage(command, payload); + } catch (e) { + console.err('Error while parsing message ' + command + ' from ' + + this.peer + ':\n' + + (e.stack ? e.stack : e.toString())); + } + + if (message) { + this.handleMessage(message); + } + + this.buffers.skip(endPos); + this.processData(); +}; + +Connection.prototype.parseMessage = function(command, payload) { + var parser = new BufferReader(payload); + + var data = { + command: command + }; + + var i; + + switch (command) { + case 'version': // https://en.bitcoin.it/wiki/Protocol_specification#version + data.version = parser.readUInt32LE(); + data.services = parser.readUInt64LEBN(); + data.timestamp = parser.readUInt64LEBN(); + data.addr_me = parser.read(26); + data.addr_you = parser.read(26); + data.nonce = parser.read(8); + data.subversion = parser.readVarintBuf(); + data.start_height = parser.readUInt32LE(); + break; + + case 'inv': + case 'getdata': + data.count = parser.readVarintNum(); + + data.invs = []; + for (i = 0; i < data.count; i++) { + data.invs.push({ + type: parser.readUInt32LE(), + hash: parser.read(32) + }); + } + break; + + case 'headers': + data.count = parser.readVarintNum(); + + data.headers = []; + for (i = 0; i < data.count; i++) { + var header = Block.fromBufferReader(parser); + data.headers.push(header); + } + break; + + case 'block': + var block = Block.fromBufferReader(parser); + + data.block = block; + data.version = block.version; + data.prev_hash = block.prev_hash; + data.merkle_root = block.merkle_root; + data.timestamp = block.timestamp; + data.bits = block.bits; + data.nonce = block.nonce; + + data.txs = block.txs; + + data.size = payload.length; + break; + + case 'tx': + var tx = Transaction.fromBufferReader(parser); + return { + command: command, + version: tx.version, + lock_time: tx.lock_time, + ins: tx.ins, + outs: tx.outs, + tx: tx, + }; + + case 'getblocks': + case 'getheaders': + // parse out the version + data.version = parser.readUInt32LE(); + + // TODO: Limit block locator size? + // reference implementation limits to 500 results + var startCount = parser.readVarintNum(); + + data.starts = []; + for (i = 0; i < startCount; i++) { + data.starts.push(parser.read(32)); + } + data.stop = parser.read(32); + break; + + case 'addr': + var addrCount = parser.readVarintNum(); + + // Enforce a maximum number of addresses per message + if (addrCount > 1000) { + addrCount = 1000; + } + + data.addrs = []; + for (i = 0; i < addrCount; i++) { + // TODO: Time actually depends on the version of the other peer (>=31402) + data.addrs.push({ + time: parser.readUInt32LE(), + services: parser.readUInt64LEBN(), + ip: parser.read(16), + port: parser.readUInt16BE() + }); + } + break; + + case 'alert': + data.payload = parser.readVarintBuf(); + data.signature = parser.readVarintBuf(); + break; + + case 'ping': + if (this.recvVer > BIP0031_VERSION) { + data.nonce = parser.read(8); + } + break; + + case 'getaddr': + case 'verack': + case 'reject': + // Empty message, nothing to parse + break; + + default: + console.err('Connection.parseMessage(): Command not implemented', { + cmd: command + }); + + // This tells the calling function not to issue an event + return null; + } + + return data; +}; + +module.exports = Connection; \ No newline at end of file diff --git a/lib/util/buffer.js b/lib/util/buffer.js index 73a60b336..a84023e69 100644 --- a/lib/util/buffer.js +++ b/lib/util/buffer.js @@ -2,6 +2,7 @@ var buffer = require('buffer'); var assert = require('assert'); +var buffertools = require('buffertools'); var js = require('./js'); @@ -19,6 +20,11 @@ function equals(a, b) { } module.exports = { + NULL_HASH: buffertools.fill(new Buffer(32), 0), + EMPTY_BUFFER: new Buffer(0), + ZERO_VALUE: buffertools.fill(new Buffer(8), 0), + INT64_MAX: new Buffer('ffffffffffffffff', 'hex'), + /** * Returns true if the given argument is an instance of a buffer. Tests for * both node's Buffer and Uint8Array diff --git a/test/transport/connection.js b/test/transport/connection.js new file mode 100644 index 000000000..2b52b483c --- /dev/null +++ b/test/transport/connection.js @@ -0,0 +1,41 @@ +'use strict'; + +var chai = require('chai'); +var bitcore = require('../..'); + +var should = chai.should(); + +var ConnectionModule = bitcore.transport.Connection; +var Connection; +var nop = function() {}; + +describe.only('Connection', function() { + it('should initialze the main object', function() { + should.exist(ConnectionModule); + }); + it('should be able to create class', function() { + Connection = ConnectionModule; + should.exist(Connection); + }); + it('should be able to create instance', function() { + var mSocket = {server: null, addListener: nop}, + mPeer; + var c = new Connection(mSocket, mPeer); + should.exist(c); + }); + + if (typeof process !== 'undefined' && process.versions) { //node-only tests + it('should create a proxied socket if instructed', function() { + var mPeer; + var c = new Connection(null, mPeer, { + proxy: { host: 'localhost', port: 9050 } + }); + should.exist(c.socket); + }); + }; +}); + + + + +