diff --git a/lib/transport/message.js b/lib/transport/message.js index dd2adbaa2..4aaec145d 100644 --- a/lib/transport/message.js +++ b/lib/transport/message.js @@ -1,62 +1,175 @@ 'use strict'; +var Buffers = require('buffers'); +var buffertools = require('buffertools'); var Put = require('bufferput'); +var util = require('util'); var Block = require('../block'); var BufferReader = require('../encoding/bufferreader'); var BufferUtil = require('../util/buffer'); +var Hash = require('../crypto/hash'); var Random = require('../crypto/random'); var CONNECTION_NONCE = Random.getPseudoRandomBuffer(8); var PROTOCOL_VERSION = 70000; -var MESSAGES = { - 'version' : Version, - 'verack': VerAck, - 'inv': Inventory, - 'ping': Ping, - 'pong': Pong, - 'addr': Addresses, - 'getaddr': GetAddresses, - 'reject': Reject -} +/** + * Static helper for consuming a data buffer until the next message. + * + * @param{Network} network - the network object + * @param{Buffer} dataBuffer - the buffer to read from + * @returns{Message|undefined} A message or undefined if there is nothing to read. + */ +var parseMessage = function(network, dataBuffer) { + if (dataBuffer.length < 20) return; -module.exports.buildMessage = function(command, payload) { - var Message = MESSAGES[command]; - try { - console.log('Message Class', Message); - return new Message().fromBuffer(payload); - } catch (err) { - console.log('Error while parrsing message', command); - console.log(err); + // Search the next magic number + if (!discardUntilNextMessage(network, dataBuffer)) return; + + var PAYLOAD_START = 16; + var payloadLen = (dataBuffer.get(PAYLOAD_START)) + + (dataBuffer.get(PAYLOAD_START + 1) << 8) + + (dataBuffer.get(PAYLOAD_START + 2) << 16) + + (dataBuffer.get(PAYLOAD_START + 3) << 24); + + var messageLength = 24 + payloadLen; + if (dataBuffer.length < messageLength) return; + + var command = dataBuffer.slice(4, 16).toString('ascii').replace(/\0+$/, ''); + var payload = dataBuffer.slice(24, messageLength); + var checksum = dataBuffer.slice(20, 24); + + var checksumConfirm = Hash.sha256sha256(payload).slice(0, 4); + if (buffertools.compare(checksumConfirm, checksum) !== 0) { + dataBuffer.skip(messageLength); + return; + } + + dataBuffer.skip(messageLength); + return Message.buildMessage(command, payload); +}; + +module.exports.parseMessage = parseMessage; + +/** + * Internal function that discards data until founds the next message. + */ +function discardUntilNextMessage(network, dataBuffer) { + var magicNumber = network.networkMagic; + + var i = 0; + for (;;) { + // check if it's the beginning of a new message + var packageNumber = dataBuffer.slice(0, 4); + if (buffertools.compare(packageNumber, magicNumber) == 0) { + dataBuffer.skip(i); + return true; + } + + // did we reach the end of the buffer? + if (i > (dataBuffer.length - 4)) { + dataBuffer.skip(i); + return false; + } + + i++; // continue scanning } } -// ====== VERSION MESSAGE ====== +/** + * Abstract Message that knows how to parse and serialize itself. + * Concret subclases should implement {fromBuffer} and {getPayload} methods. + */ +function Message() {}; + +Message.COMMANDS = {}; + +Message.buildMessage = function(command, payload) { + try { + var CommandClass = Message.COMMANDS[command]; + return new CommandClass().fromBuffer(payload); + } catch (err) { + console.log('Error while parsing message', err); + throw err; + } +}; + +/** + * Parse instance state from buffer. + * + * @param{Buffer} payload - the buffer to read from + * @returns{Message} The same message instance + */ +Message.prototype.fromBuffer = function(payload) { + return this; +}; + +/** + * Serialize the payload into a buffer. + * + * @returns{Buffer} the serialized payload + */ +Message.prototype.getPayload = function() { + return BufferUtil.EMPTY_BUFFER; +}; + +/** + * Serialize the message into a buffer. + * + * @returns{Buffer} the serialized message + */ +Message.prototype.serialize = function(network) { + var magic = network.networkMagic; + var commandBuf = new Buffer(this.command, 'ascii'); + if (commandBuf.length > 12) throw 'Command name too long'; + + var payload = this.getPayload(); + var checksum = Hash.sha256sha256(payload).slice(0, 4); + + // -- HEADER -- + var message = new Put(); + message.put(magic); + message.put(commandBuf); + message.pad(12 - commandBuf.length); // zero-padded + message.word32le(payload.length); + message.put(checksum); + + // -- BODY -- + message.put(payload); + + return message.buffer(); +} + +/** + * Version Message + * + * @param{string} subversion - version of the client + * @param{Buffer} nonce - a random 8 bytes buffer + */ function Version(subversion, nonce) { this.command = 'version'; this.version = PROTOCOL_VERSION; this.subversion = subversion || '/BitcoinX:0.1/'; this.nonce = nonce || CONNECTION_NONCE; }; +util.inherits(Version, Message); Version.prototype.fromBuffer = function(payload) { - var message = new Version(); - var parser = new BufferReader(payload); - message.version = parser.readUInt32LE(); - message.services = parser.readUInt64LEBN(); - message.timestamp = parser.readUInt64LEBN(); - message.addr_me = parser.read(26); - message.addr_you = parser.read(26); - message.nonce = parser.read(8); - message.subversion = parser.readVarintBuf(); - message.start_height = parser.readUInt32LE(); + this.version = parser.readUInt32LE(); + this.services = parser.readUInt64LEBN(); + this.timestamp = parser.readUInt64LEBN(); + this.addr_me = parser.read(26); + this.addr_you = parser.read(26); + this.nonce = parser.read(8); + this.subversion = parser.readVarintBuf(); + this.start_height = parser.readUInt32LE(); - return message; + return this; }; -Version.prototype.serialize = function() { +Version.prototype.getPayload = function() { var put = new Put(); put.word32le(this.version); // version put.word64le(1); // services @@ -71,30 +184,33 @@ Version.prototype.serialize = function() { return put.buffer(); }; -module.exports.Version = Version; +module.exports.Version = Message.COMMANDS['version'] = Version; -// ====== INV MESSAGE ====== +/** + * Inv Message + * + * @param{Array} inventory - reported elements + */ function Inventory(inventory) { this.command = 'inv'; this.inventory = inventory || []; } +util.inherits(Inventory, Message); Inventory.prototype.fromBuffer = function(payload) { - var message = new Inventory(); - var parser = new BufferReader(payload); var count = parser.readVarintNum(); for (var i = 0; i < count; i++) { - message.inventory.push({ + this.inventory.push({ type: parser.readUInt32LE(), hash: parser.read(32) }); } - return message; + return this; }; -Inventory.prototype.serialize = function() { +Inventory.prototype.getPayload = function() { var put = new Put(); put.varint(this.inventory.length); @@ -106,9 +222,13 @@ Inventory.prototype.serialize = function() { return put.buffer(); }; -module.exports.Inventory = Inventory; +module.exports.Inventory = Message.COMMANDS['inv'] = Inventory; -// ====== GETDATA MESSAGE ====== +/** + * Getdata Message + * + * @param{Array} inventory - requested elements + */ function GetData(inventory) { this.command = 'getdata'; this.inventory = inventory || []; @@ -117,24 +237,33 @@ function GetData(inventory) { util.inherits(GetData, Inventory); module.exports.GetData = GetData; -// ====== PING MESSAGE ====== +/** + * Ping Message + * + * @param{Buffer} nonce - a random 8 bytes buffer + */ function Ping(nonce) { this.command = 'ping'; this.nonce = nonce || CONNECTION_NONCE; } +util.inherits(Ping, Message); Ping.prototype.fromBuffer = function(payload) { - var nonce = new BufferReader(payload).read(8); - return new Ping(nonce); + this.nonce = new BufferReader(payload).read(8); + return this; }; -Ping.prototype.serialize = function() { +Ping.prototype.getPayload = function() { return this.nonce; }; -module.exports.Ping = Ping; +module.exports.Ping = Message.COMMANDS['ping'] = Ping; -// ====== PONG MESSAGE ====== +/** + * Pong Message + * + * @param{Buffer} nonce - a random 8 bytes buffer + */ function Pong(nonce) { this.command = 'pong'; this.nonce = nonce || CONNECTION_NONCE; @@ -143,22 +272,25 @@ function Pong(nonce) { util.inherits(Pong, Ping); module.exports.Pong = Pong; -// ====== ADDR MESSAGE ====== -function Addresses(nonce) { +/** + * Addr Message + * + * @param{Array} addresses - array of know addresses + */ +function Addresses(addresses) { this.command = 'addr'; - this.addresses = []; + this.addresses = addresses || []; } +util.inherits(Addresses, Message); -Address.prototype.fromBuffer = function(payload) { - var message = new Address(); - +Addresses.prototype.fromBuffer = function(payload) { var parser = new BufferReader(payload); var addrCount = Math.min(parser.readVarintNum(), 1000); - message.addresses = []; + this.addresses = []; for (var i = 0; i < addrCount; i++) { // TODO: Time actually depends on the version of the other peer (>=31402) - message.addresses.push({ + this.addresses.push({ time: parser.readUInt32LE(), services: parser.readUInt64LEBN(), ip: parser.read(16), @@ -166,167 +298,177 @@ Address.prototype.fromBuffer = function(payload) { }); } - return message; + return this; }; -Address.prototype.serialize = function() { +Addresses.prototype.getPayload = function() { return BufferUtil.EMPTY_BUFFER; // TODO }; -module.exports.Address = Address; +module.exports.Addresses = Message.COMMANDS['addr'] = Addresses; -// ====== GETADDR MESSAGE ====== +/** + * GetAddr Message + * + */ function GetAddresses() { this.command = 'getaddr'; } -GetAddresses.prototype.fromBuffer = function() { - return new GetAddresses(); -}; +util.inherits(GetAddresses, Message); +module.exports.GetAddresses = Message.COMMANDS['getaddr'] = GetAddresses; -GetAddresses.prototype.serialize = function() { - return BufferUtil.EMPTY_BUFFER; -}; - -module.exports.GetAddresses = GetAddresses; - -// ====== VERACK MESSAGE ====== +/** + * Verack Message + * + */ function VerAck() { this.command = 'verack'; } -VerAck.prototype.fromBuffer = function() { - return new VerAck(); -}; +util.inherits(VerAck, Message); +module.exports.VerAck = Message.COMMANDS['verack'] = VerAck; -VerAck.prototype.serialize = function() { - return BufferUtil.EMPTY_BUFFER; -}; - -module.exports.VerAck = VerAck; - -// ====== REJECT MESSAGE ====== -// TODO: Parse REJECT message +/** + * Reject Message + * + */ function Reject() { this.command = 'reject'; } +util.inherits(Reject, Message); -Reject.prototype.fromBuffer = function() { - return new Reject(); -}; +// TODO: Parse REJECT message -Reject.prototype.serialize = function() { - return BufferUtil.EMPTY_BUFFER; -}; +module.exports.Reject = Message.COMMANDS['reject'] = Reject; -module.exports.Reject = Reject; - -// ====== ALERT MESSAGE ====== -function Alert(payload) { - this.command = 'reject'; +/** + * Alert Message + * + */ +function Alert() { + this.command = 'alert'; } +util.inherits(Alert, Message); Alert.prototype.fromBuffer = function() { - var message = new Alert(); - var parser = new BufferReader(payload); - message.payload = parser.readVarintBuf(); // TODO: Use current format - message.signature = parser.readVarintBuf(); - return message; + this.payload = parser.readVarintBuf(); // TODO: Use current format + this.signature = parser.readVarintBuf(); + return this; }; -Alert.prototype.serialize = function() { +Alert.prototype.getPayload = function() { return BufferUtil.EMPTY_BUFFER; // TODO: Serialize }; -module.exports.Alert = Alert; +module.exports.Alert = Message.COMMANDS['alert'] = Alert; -// ====== HEADERS MESSAGE ====== +/** + * Headers Message + * + * @param{Array} blockheaders - array of block headers + */ function Headers(blockheaders) { this.command = 'headers'; this.headers = blockheaders || []; } +util.inherits(Headers, Message); Headers.prototype.fromBuffer = function() { - var message = new Headers(); - var parser = new BufferReader(payload); var count = parser.readVarintNum(); - message.headers = []; + this.headers = []; for (i = 0; i < count; i++) { var header = Block().fromBufferReader(parser); - message.headers.push(header); + this.headers.push(header); } - return message; + return this; }; -Headers.prototype.serialize = function() { +Headers.prototype.getPayload = function() { return BufferUtil.EMPTY_BUFFER; // TODO: Serialize }; -module.exports.Headers = Headers; +module.exports.Headers = Message.COMMANDS['headers'] = Headers; -// ====== BLOCK MESSAGE ====== +/** + * Block Message + * + * @param{Block} block + */ function Block(block) { this.command = 'block'; this.block = block; } +util.inherits(Block, Message); Block.prototype.fromBuffer = function() { var parser = new BufferReader(payload); - var block = Block().fromBufferReader(parser); - return new Block(block); + this.block = Block().fromBufferReader(parser); + return this; }; -Block.prototype.serialize = function() { +Block.prototype.getPayload = function() { return BufferUtil.EMPTY_BUFFER; // TODO: Serialize }; -module.exports.Block = Block; +module.exports.Block = Message.COMMANDS['block'] = Block; -// ====== TX MESSAGE ====== +/** + * Tx Message + * + * @param{Transaction} transaction + */ function Transaction(transaction) { this.command = 'tx'; this.transaction = transaction; } +util.inherits(Transaction, Message); Transaction.prototype.fromBuffer = function() { var parser = new BufferReader(payload); - var transaction = Transaction().fromBufferReader(parser); - return new Transaction(transaction); + this.transaction = Transaction().fromBufferReader(parser); + return this; }; -Transaction.prototype.serialize = function() { +Transaction.prototype.getPayload = function() { return BufferUtil.EMPTY_BUFFER; // TODO: Serialize }; -module.exports.Transaction = Transaction; +module.exports.Transaction = Message.COMMANDS['tx'] = Transaction; -// ====== GETBLOCKS MESSAGE ====== +/** + * Getblocks Message + * + * @param{Array} starts - array of buffers with the starting block hashes + * @param{Buffer} [stop] - hash of the last block + */ function GetBlocks(starts, stop) { this.command = 'getblocks'; this.version = PROTOCOL_VERSION; this.starts = starts || []; this.stop = stop || BufferUtil.NULL_HASH; } +util.inherits(GetBlocks, Message); GetBlocks.prototype.fromBuffer = function() { - var message = new GetBlocks(); - var parser = new BufferReader(payload); - message.version = parser.readUInt32LE(); + this.version = parser.readUInt32LE(); var startCount = Math.min(parser.readVarintNum(), 500); - message.starts = []; + this.starts = []; for (var i = 0; i < startCount; i++) { - message.starts.push(parser.read(32)); + this.starts.push(parser.read(32)); } - message.stop = parser.read(32); + this.stop = parser.read(32); + + return this; }; -GetBlocks.prototype.serialize = function() { +GetBlocks.prototype.getPayload = function() { var put = new Put(); put.word32le(this.version); put.varint(this.starts.length); @@ -346,9 +488,14 @@ GetBlocks.prototype.serialize = function() { return put.buffer(); }; -module.exports.GetBlocks = GetBlocks; +module.exports.GetBlocks = Message.COMMANDS['getblocks'] = GetBlocks; -// ====== GETHEADERS MESSAGE ====== +/** + * Getheaders Message + * + * @param{Array} starts - array of buffers with the starting block hashes + * @param{Buffer} [stop] - hash of the last block + */ function GetHeaders(starts, stop) { this.command = 'getheaders'; this.version = PROTOCOL_VERSION; @@ -357,4 +504,21 @@ function GetHeaders(starts, stop) { } util.inherits(GetHeaders, GetBlocks); -module.exports.GetHeaders = GetHeaders; +module.exports.GetHeaders = Message.COMMANDS['getheaders'] = GetHeaders; + + +// TODO: Remove this PATCH (yemel) +Buffers.prototype.skip = function (i) { + if (i == 0) return; + + if (i == this.length) { + this.buffers = []; + this.length = 0; + return; + } + + var pos = this.pos(i); + this.buffers = this.buffers.slice(pos.buf); + this.buffers[0] = new Buffer(this.buffers[0].slice(pos.offset)); + this.length -= i; +}; diff --git a/lib/transport/peer2.js b/lib/transport/peer2.js index 525e1063d..3b38bc19b 100644 --- a/lib/transport/peer2.js +++ b/lib/transport/peer2.js @@ -1,16 +1,14 @@ 'use strict'; -var util = require('util'); +var Buffers = require('buffers'); +var EventEmitter = require('events').EventEmitter; var Net = require('net'); var Put = require('bufferput'); -var Buffers = require('buffers'); -var buffertools = require('buffertools'); var Socks5Client = require('socks5-client'); -var EventEmitter = require('events').EventEmitter; +var util = require('util'); var Networks = require('../networks'); -var Hash = require('../crypto/hash'); -var Message = require('./message'); +var Messages = require('./message'); var MAX_RECEIVE_BUFFER = 10000000; @@ -143,114 +141,38 @@ Peer.prototype.disconnect = function() { }; /** - * Internal function that tries to read a message from the data buffer + * Send a Message to the remote peer. */ -Peer.prototype._readMessage = function() { - if (this.dataBuffer.length < 20) return; - var magic = this.network.networkMagic; - - // Search the next magic number - if (!this._discardUntilNextMessage()) return; - - var PAYLOAD_START = 16; - var payloadLen = (this.dataBuffer.get(PAYLOAD_START)) + - (this.dataBuffer.get(PAYLOAD_START + 1) << 8) + - (this.dataBuffer.get(PAYLOAD_START + 2) << 16) + - (this.dataBuffer.get(PAYLOAD_START + 3) << 24); - - var messageLength = 24 + payloadLen; - if (this.dataBuffer.length < messageLength) return; - - var command = this.dataBuffer.slice(4, 16).toString('ascii').replace(/\0+$/, ''); - var payload = this.dataBuffer.slice(24, messageLength); - var checksum = this.dataBuffer.slice(20, 24); - - var checksumConfirm = Hash.sha256sha256(payload).slice(0, 4); - if (buffertools.compare(checksumConfirm, checksum) !== 0) { - this.dataBuffer.skip(messageLength); - return; - } - - console.log('we have a message:', command); - var message = Message.buildMessage(command, payload); - if (message) this.emit(command, message); - console.log('Emiting message', command, message); - - this.dataBuffer.skip(messageLength); - this._readMessage(); +Peer.prototype.sendMessage = function(message) { + this.socket.write(message.serialize(this.network)); }; -/** - * Internal function that discards data until founds the next message. - */ -Peer.prototype._discardUntilNextMessage = function() { - var magicNumber = this.network.networkMagic; - - var i = 0; - for (;;) { - // check if it's the beginning of a new message - var packageNumber = this.dataBuffer.slice(0, 4); - if (buffertools.compare(packageNumber, magicNumber) == 0) { - this.dataBuffer.skip(i); - return true; - } - - // did we reach the end of the buffer? - if (i > (this.dataBuffer.length - 4)) { - this.dataBuffer.skip(i); - return false; - } - - i++; // continue scanning - } -} - /** * Internal function that sends VERSION message to the remote peer. */ Peer.prototype._sendVersion = function() { - var message = new Message.Version(); - this._sendMessage(message.command, message.serialize()); -}; - -/** - * Send a PING message to the remote peer. - */ -Peer.prototype.sendPing = function(nonce) { - var message = new Message.Pong(nonce); - this._sendMessage(message.command, message.serialize()); + var message = new Messages.Version(); + this.sendMessage(message); }; /** * Send a PONG message to the remote peer. */ -Peer.prototype.sendPong = function(nonce) { - var message = new Message.Pong(nonce); - this._sendMessage(message.command, message.serialize()); +Peer.prototype._sendPong = function(nonce) { + var message = new Messages.Pong(nonce); + this.sendMessage(message); }; /** - * Internal function that sends a message to the remote peer. + * Internal function that tries to read a message from the data buffer */ -Peer.prototype._sendMessage = function(command, payload) { - var magic = this.network.networkMagic; - var commandBuf = new Buffer(command, 'ascii'); - if (commandBuf.length > 12) throw 'Command name too long'; +Peer.prototype._readMessage = function() { + var message = Messages.parseMessage(this.network, this.dataBuffer); - var checksum = Hash.sha256sha256(payload).slice(0, 4); - - // -- HEADER -- - var message = new Put(); - message.put(magic); // magic bytes - message.put(commandBuf); // command name - message.pad(12 - commandBuf.length); // zero-padded - message.word32le(payload.length); // payload length - message.put(checksum); // checksum - - // -- BODY -- - message.put(payload); // payload data - - this.socket.write(message.buffer()); + if (message) { + this.emit(message.command, message); + this._readMessage(); + } }; /** @@ -266,21 +188,4 @@ Peer.prototype._getSocket = function() { return new Net.Socket(); }; - -// TODO: Remove this PATCH (yemel) -Buffers.prototype.skip = function (i) { - if (i == 0) return; - - if (i == this.length) { - this.buffers = []; - this.length = 0; - return; - } - - var pos = this.pos(i); - this.buffers = this.buffers.slice(pos.buf); - this.buffers[0] = new Buffer(this.buffers[0].slice(pos.offset)); - this.length -= i; -}; - module.exports = Peer;