Progress on sending and receiving messages
This commit is contained in:
parent
a1e8f3c596
commit
a9e54a7856
1
index.js
1
index.js
|
@ -28,6 +28,7 @@ bitcore.util.js = require('./lib/util/js');
|
|||
bitcore.transport = {};
|
||||
bitcore.transport.Connection = require('./lib/transport/connection');
|
||||
bitcore.transport.Peer = require('./lib/transport/peer');
|
||||
bitcore.transport.Peer2 = require('./lib/transport/peer2');
|
||||
bitcore.transport.PeerManager = require('./lib/transport/peermanager');
|
||||
|
||||
// errors thrown by the library
|
||||
|
|
|
@ -9,6 +9,8 @@ var _ = require('lodash');
|
|||
*/
|
||||
function Network() {}
|
||||
|
||||
var hex = function(hex) { return new Buffer(hex, 'hex'); };
|
||||
|
||||
/**
|
||||
* @instance
|
||||
* @member Network#livenet
|
||||
|
@ -22,6 +24,7 @@ _.extend(livenet, {
|
|||
scripthash: 0x05,
|
||||
xpubkey: 0x0488b21e,
|
||||
xprivkey: 0x0488ade4,
|
||||
networkMagic: hex('f9beb4d9'),
|
||||
port: 8333,
|
||||
dnsSeeds: [
|
||||
'seed.bitcoin.sipa.be',
|
||||
|
@ -46,6 +49,7 @@ _.extend(testnet, {
|
|||
scripthash: 0xc4,
|
||||
xpubkey: 0x043587cf,
|
||||
xprivkey: 0x04358394,
|
||||
networkMagic: hex('0b110907'),
|
||||
port: 18333,
|
||||
dnsSeeds: [
|
||||
'testnet-seed.bitcoin.petertodd.org',
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
'use strict';
|
||||
|
||||
var Put = require('bufferput');
|
||||
|
||||
var Random = require('../crypto/random');
|
||||
var BufferReader = require('../encoding/bufferreader');
|
||||
var Block = require('../block');
|
||||
|
||||
var CONNECTION_NONCE = Random.getPseudoRandomBuffer(8);
|
||||
var PROTOCOL_VERSION = 70000;
|
||||
|
||||
var MESSAGES = {
|
||||
'version' : Version,
|
||||
'verack': VerAck,
|
||||
'inv': Inventory,
|
||||
'ping': Ping,
|
||||
'pong': Pong
|
||||
}
|
||||
|
||||
module.exports.buildMessage = function(command, payload) {
|
||||
var Message = MESSAGES[command];
|
||||
try {
|
||||
console.log('Message Class', Message);
|
||||
return Message.fromBuffer(payload);
|
||||
} catch (err) {
|
||||
console.log('Error while parrsing message', command);
|
||||
console.log(err);
|
||||
}
|
||||
}
|
||||
|
||||
// ====== VERSION MESSAGE ======
|
||||
function Version(subversion, nonce) {
|
||||
this.command = 'version';
|
||||
this.subversion = subversion || '/BitcoinX:0.1/';
|
||||
this.nonce = nonce || CONNECTION_NONCE;
|
||||
}
|
||||
|
||||
Version.fromBuffer = function(payload) {
|
||||
var message = new Version();
|
||||
|
||||
var parser = new BufferReader(payload);
|
||||
message.version = parser.readUInt32LE();
|
||||
message.services = parser.readUInt64LEBN();
|
||||
message.timestamp = parser.readUInt64LEBN();
|
||||
message.addr_me = parser.read(26);
|
||||
message.addr_you = parser.read(26);
|
||||
message.nonce = parser.read(8);
|
||||
message.subversion = parser.readVarintBuf();
|
||||
message.start_height = parser.readUInt32LE();
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
Version.prototype.serialize = function() {
|
||||
var put = new Put();
|
||||
put.word32le(PROTOCOL_VERSION); // version
|
||||
put.word64le(1); // services
|
||||
put.word64le(Math.round(new Date().getTime() / 1000)); // timestamp
|
||||
put.pad(26); // addr_me
|
||||
put.pad(26); // addr_you
|
||||
put.put(this.nonce);
|
||||
put.varint(this.subversion.length);
|
||||
put.put(new Buffer(this.subversion, 'ascii'));
|
||||
put.word32le(0);
|
||||
|
||||
return put.buffer();
|
||||
}
|
||||
|
||||
module.exports.Version = Version;
|
||||
|
||||
// ====== INV MESSAGE ======
|
||||
function Inventory(inventory) {
|
||||
this.command = 'inv';
|
||||
this.inventory = inventory || [];
|
||||
}
|
||||
|
||||
Inventory.fromBuffer = function(payload) {
|
||||
var message = new Inventory();
|
||||
|
||||
var parser = new BufferReader(payload);
|
||||
var count = parser.readVarintNum();
|
||||
for (var i = 0; i < count; i++) {
|
||||
message.inventory.push({
|
||||
type: parser.readUInt32LE(),
|
||||
hash: parser.read(32)
|
||||
});
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
Inventory.prototype.serialize = function() {
|
||||
var put = new Put();
|
||||
|
||||
put.varint(this.inventory.length);
|
||||
this.inventory.forEach(function(value) {
|
||||
value instanceof Block ? put.word32le(2) : put.word32le(1);
|
||||
put.put(value.getHash());
|
||||
});
|
||||
|
||||
return put.buffer();
|
||||
}
|
||||
|
||||
|
||||
// ====== PING/PONG MESSAGE ======
|
||||
function Ping(nonce) {
|
||||
this.command = 'ping';
|
||||
this.nonce = nonce || CONNECTION_NONCE;
|
||||
}
|
||||
|
||||
Ping.fromBuffer = function(payload) {
|
||||
var nonce = new BufferReader(payload).read(8);
|
||||
return new Ping(nonce);
|
||||
}
|
||||
|
||||
Ping.prototype.serialize = function() {
|
||||
return this.nonce;
|
||||
}
|
||||
|
||||
function Pong(nonce) {
|
||||
this.command = 'pong';
|
||||
this.nonce = nonce || CONNECTION_NONCE;
|
||||
}
|
||||
|
||||
Pong.fromBuffer = Ping.fromBuffer;
|
||||
Pong.prototype.serialize = Ping.prototype.serialize;
|
||||
|
||||
|
||||
// ====== VARIOUS MESSAGE ======
|
||||
|
||||
|
||||
|
||||
function GetAddr() {};
|
||||
|
||||
function VerAck() {};
|
||||
VerAck.fromBuffer = function() {
|
||||
return new VerAck();
|
||||
}
|
||||
|
||||
function Reject() {};
|
||||
|
||||
function Ping(payload) {
|
||||
var parser = new BufferReader(payload);
|
||||
|
||||
this.nonce = parser.read(8);
|
||||
};
|
||||
|
||||
// ====== PING MESSAGE ======
|
||||
function Address(payload) {
|
||||
var parser = new BufferReader(payload);
|
||||
|
||||
var addrCount = parser.readVarintNum();
|
||||
addrCount = Math.min(addrCount, 1000);
|
||||
|
||||
this.addresses = [];
|
||||
for (i = 0; i < addrCount; i++) {
|
||||
// TODO: Time actually depends on the version of the other peer (>=31402)
|
||||
this.addresses.push({
|
||||
time: parser.readUInt32LE(),
|
||||
services: parser.readUInt64LEBN(),
|
||||
ip: parser.read(16), // TODO: Parse IP Address
|
||||
port: parser.readUInt16BE()
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
function GetHeaders(payload) {
|
||||
var parser = new BufferReader(payload);
|
||||
|
||||
this.version = parser.readUInt32LE();
|
||||
|
||||
var startCount = parser.readVarintNum();
|
||||
startCount = Math.min(startCount, 500);
|
||||
|
||||
this.starts = [];
|
||||
for (i = 0; i < startCount; i++) {
|
||||
this.starts.push(parser.read(32));
|
||||
}
|
||||
|
||||
this.stop = parser.read(32);
|
||||
}
|
||||
|
|
@ -0,0 +1,285 @@
|
|||
'use strict';
|
||||
|
||||
var Net = require('net');
|
||||
var Put = require('bufferput');
|
||||
var Buffers = require('buffers');
|
||||
var buffertools = require('buffertools');
|
||||
var Socks5Client = require('socks5-client');
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
|
||||
var Networks = require('../networks');
|
||||
var Hash = require('../crypto/hash');
|
||||
var Message = require('./message');
|
||||
|
||||
var MAX_RECEIVE_BUFFER = 10000000;
|
||||
|
||||
/**
|
||||
* A Peer instance represents a remote bitcoin node and allows to communicate
|
||||
* with it using the standar messages of the bitcoin p2p protocol.
|
||||
*
|
||||
* @example
|
||||
*
|
||||
* var peer = new Peer('127.0.0.1').setProxy('127.0.0.1', 9050);
|
||||
* peer.on('tx', function(tx) {
|
||||
* console.log('New transaction: ', tx.id);
|
||||
* });
|
||||
* peer.connect();
|
||||
*
|
||||
* @param {String} host - IP address of the remote host
|
||||
* @param {Number} [port] - Port number of the remote host
|
||||
* @param {Network} [network] - The context for this communication
|
||||
* @returns {Peer} A new instance of Peer.
|
||||
* @constructor
|
||||
*/
|
||||
function Peer(host, port, network) {
|
||||
if (!(this instanceof Peer)) {
|
||||
return new Peer(host, port, network);
|
||||
}
|
||||
|
||||
// overloading stuff
|
||||
if (port instanceof Object && !network) {
|
||||
network = port;
|
||||
port = undefined;
|
||||
}
|
||||
|
||||
this.host = host;
|
||||
this.status = Peer.STATUS.DISCONNECTED;
|
||||
this.network = network || Networks.livenet;
|
||||
this.port = port || this.network.port;
|
||||
|
||||
this.dataBuffer = new Buffers();
|
||||
|
||||
this.version = 0;
|
||||
this.bestHeight = 0;
|
||||
this.subversion = null;
|
||||
|
||||
// set message handlers
|
||||
var self = this;
|
||||
this.on('verack', function() {
|
||||
self.status = Peer.STATUS.READY;
|
||||
self.emit('ready');
|
||||
});
|
||||
|
||||
this.on('version', function(message) {
|
||||
self.version = message.version;
|
||||
self.subversion = message.subversion;
|
||||
self.bestHeight = message.start_height
|
||||
});
|
||||
|
||||
this.on('ping', function(message) {
|
||||
self.sendPong(message.nonce);
|
||||
});
|
||||
|
||||
}
|
||||
util.inherits(Peer, EventEmitter);
|
||||
|
||||
Peer.STATUS = {
|
||||
DISCONNECTED: 'disconnected',
|
||||
CONNECTING: 'connecting',
|
||||
CONNECTED: 'connected',
|
||||
READY: 'ready'
|
||||
};
|
||||
|
||||
/**
|
||||
* Set a socks5 proxy for the connection. Enables the use of the TOR network.
|
||||
*
|
||||
* @param {String} host - IP address of the proxy
|
||||
* @param {Number} port - Port number of the proxy
|
||||
* @returns {Peer} The same Peer instance.
|
||||
*/
|
||||
Peer.prototype.setProxy = function(host, port) {
|
||||
if (this.status != Peer.STATUS.DISCONNECTED) {
|
||||
throw Error('Invalid State');
|
||||
}
|
||||
|
||||
this.proxy = {
|
||||
host: host,
|
||||
port: port
|
||||
};
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Init the connection with the remote peer.
|
||||
*
|
||||
* @returns {Socket} The same peer instance.
|
||||
*/
|
||||
Peer.prototype.connect = function() {
|
||||
this.socket = this._getSocket();
|
||||
this.status = Peer.STATUS.CONNECTING;
|
||||
|
||||
var self = this;
|
||||
this.socket.on('connect', function(ev) {
|
||||
self.status = Peer.STATUS.CONNECTED;
|
||||
self.emit('connect');
|
||||
self._sendVersion();
|
||||
});
|
||||
|
||||
this.socket.on('error', self.disconnect.bind(this));
|
||||
this.socket.on('end', self.disconnect.bind(this));
|
||||
|
||||
this.socket.on('data', function(data) {
|
||||
self.dataBuffer.push(data);
|
||||
|
||||
if (self.dataBuffer.length > MAX_RECEIVE_BUFFER) return self.disconnect();
|
||||
self._readMessage();
|
||||
});
|
||||
|
||||
this.socket.connect(this.port, this.host);
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Disconnects the remote connection.
|
||||
*
|
||||
* @returns {Socket} The same peer instance.
|
||||
*/
|
||||
Peer.prototype.disconnect = function() {
|
||||
this.status = Peer.STATUS.DISCONNECTED;
|
||||
this.socket.destroy();
|
||||
this.emit('disconnect');
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Internal function that tries to read a message from the data buffer
|
||||
*/
|
||||
Peer.prototype._readMessage = function() {
|
||||
if (this.dataBuffer.length < 20) return;
|
||||
var magic = this.network.networkMagic;
|
||||
|
||||
// Search the next magic number
|
||||
if (!this._discardUntilNextMessage()) return;
|
||||
|
||||
var PAYLOAD_START = 16;
|
||||
var payloadLen = (this.dataBuffer.get(PAYLOAD_START)) +
|
||||
(this.dataBuffer.get(PAYLOAD_START + 1) << 8) +
|
||||
(this.dataBuffer.get(PAYLOAD_START + 2) << 16) +
|
||||
(this.dataBuffer.get(PAYLOAD_START + 3) << 24);
|
||||
|
||||
var messageLength = 24 + payloadLen;
|
||||
if (this.dataBuffer.length < messageLength) return;
|
||||
|
||||
var command = this.dataBuffer.slice(4, 16).toString('ascii').replace(/\0+$/, '');
|
||||
var payload = this.dataBuffer.slice(24, messageLength);
|
||||
var checksum = this.dataBuffer.slice(20, 24);
|
||||
|
||||
var checksumConfirm = Hash.sha256sha256(payload).slice(0, 4);
|
||||
if (buffertools.compare(checksumConfirm, checksum) !== 0) {
|
||||
this.dataBuffer.skip(messageLength);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('we have a message:', command);
|
||||
var message = Message.buildMessage(command, payload);
|
||||
if (message) this.emit(command, message);
|
||||
console.log('Emiting message', command, message);
|
||||
|
||||
this.dataBuffer.skip(messageLength);
|
||||
this._readMessage();
|
||||
};
|
||||
|
||||
/**
|
||||
* Internal function that discards data until founds the next message.
|
||||
*/
|
||||
Peer.prototype._discardUntilNextMessage = function() {
|
||||
var magicNumber = this.network.networkMagic;
|
||||
|
||||
var i = 0;
|
||||
for (;;) {
|
||||
// check if it's the beginning of a new message
|
||||
var packageNumber = this.dataBuffer.slice(0, 4);
|
||||
if (buffertools.compare(packageNumber, magicNumber) == 0) {
|
||||
this.dataBuffer.skip(i);
|
||||
return true;
|
||||
}
|
||||
|
||||
// did we reach the end of the buffer?
|
||||
if (i > (this.dataBuffer.length - 4)) {
|
||||
this.dataBuffer.skip(i);
|
||||
return false;
|
||||
}
|
||||
|
||||
i++; // continue scanning
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal function that sends VERSION message to the remote peer.
|
||||
*/
|
||||
Peer.prototype._sendVersion = function() {
|
||||
var message = new Message.Version();
|
||||
this._sendMessage(message.command, message.serialize());
|
||||
};
|
||||
|
||||
/**
|
||||
* Send a PING message to the remote peer.
|
||||
*/
|
||||
Peer.prototype.sendPing = function(nonce) {
|
||||
var message = new Message.Pong(nonce);
|
||||
this._sendMessage(message.command, message.serialize());
|
||||
};
|
||||
|
||||
/**
|
||||
* Send a PONG message to the remote peer.
|
||||
*/
|
||||
Peer.prototype.sendPong = function(nonce) {
|
||||
var message = new Message.Pong(nonce);
|
||||
this._sendMessage(message.command, message.serialize());
|
||||
};
|
||||
|
||||
/**
|
||||
* Internal function that sends a message to the remote peer.
|
||||
*/
|
||||
Peer.prototype._sendMessage = function(command, payload) {
|
||||
var magic = this.network.networkMagic;
|
||||
var commandBuf = new Buffer(command, 'ascii');
|
||||
if (commandBuf.length > 12) throw 'Command name too long';
|
||||
|
||||
var checksum = Hash.sha256sha256(payload).slice(0, 4);
|
||||
|
||||
// -- HEADER --
|
||||
var message = new Put();
|
||||
message.put(magic); // magic bytes
|
||||
message.put(commandBuf); // command name
|
||||
message.pad(12 - commandBuf.length); // zero-padded
|
||||
message.word32le(payload.length); // payload length
|
||||
message.put(checksum); // checksum
|
||||
|
||||
// -- BODY --
|
||||
message.put(payload); // payload data
|
||||
|
||||
this.socket.write(message.buffer());
|
||||
};
|
||||
|
||||
/**
|
||||
* Internal function that creates a socket using a proxy if neccesary.
|
||||
*
|
||||
* @returns {Socket} A Socket instance not yet connected.
|
||||
*/
|
||||
Peer.prototype._getSocket = function() {
|
||||
if (this.proxy) {
|
||||
return new Socks5Client(this.proxy.host, this.proxy.port);
|
||||
}
|
||||
|
||||
return new Net.Socket();
|
||||
};
|
||||
|
||||
|
||||
// TODO: Remove this PATCH (yemel)
|
||||
Buffers.prototype.skip = function (i) {
|
||||
if (i == 0) return;
|
||||
|
||||
if (i == this.length) {
|
||||
this.buffers = [];
|
||||
this.length = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
var pos = this.pos(i);
|
||||
this.buffers = this.buffers.slice(pos.buf);
|
||||
this.buffers[0] = new Buffer(this.buffers[0].slice(pos.offset));
|
||||
this.length -= i;
|
||||
};
|
||||
|
||||
module.exports = Peer;
|
|
@ -0,0 +1,73 @@
|
|||
'use strict';
|
||||
|
||||
var chai = require('chai');
|
||||
var Net = require('net');
|
||||
var Socks5Client = require('socks5-client');
|
||||
|
||||
var should = chai.should();
|
||||
var expect = chai.expect;
|
||||
|
||||
var bitcore = require('../..');
|
||||
var Peer = bitcore.transport.Peer2;
|
||||
var Networks = bitcore.Networks;
|
||||
|
||||
describe.only('Peer2', function() {
|
||||
|
||||
it('should be able to create instance', function() {
|
||||
var peer = new Peer('localhost');
|
||||
peer.host.should.equal('localhost');
|
||||
peer.network.should.equal(Networks.livenet);
|
||||
peer.port.should.equal(Networks.livenet.port);
|
||||
});
|
||||
|
||||
it('should be able to create instance setting a port', function() {
|
||||
var peer = new Peer('localhost', 8111);
|
||||
peer.host.should.equal('localhost');
|
||||
peer.network.should.equal(Networks.livenet);
|
||||
peer.port.should.equal(8111);
|
||||
});
|
||||
|
||||
it('should be able to create instance setting a network', function() {
|
||||
var peer = new Peer('localhost', Networks.testnet);
|
||||
peer.host.should.equal('localhost');
|
||||
peer.network.should.equal(Networks.testnet);
|
||||
peer.port.should.equal(Networks.testnet.port);
|
||||
});
|
||||
|
||||
it('should be able to create instance setting port and network', function() {
|
||||
var peer = new Peer('localhost', 8111, Networks.testnet);
|
||||
peer.host.should.equal('localhost');
|
||||
peer.network.should.equal(Networks.testnet);
|
||||
peer.port.should.equal(8111);
|
||||
});
|
||||
|
||||
it('should support creating instance without new', function() {
|
||||
var peer = Peer('localhost', 8111, Networks.testnet);
|
||||
peer.host.should.equal('localhost');
|
||||
peer.network.should.equal(Networks.testnet);
|
||||
peer.port.should.equal(8111);
|
||||
});
|
||||
|
||||
it('should be able to set a proxy', function() {
|
||||
var peer, peer2, socket;
|
||||
|
||||
peer = new Peer('localhost');
|
||||
expect(peer.proxy).to.be.undefined;
|
||||
socket = peer._getSocket();
|
||||
socket.should.be.instanceof(Net.Socket);
|
||||
|
||||
peer2 = peer.setProxy('127.0.0.1', 9050);
|
||||
peer2.proxy.host.should.equal('127.0.0.1');
|
||||
peer2.proxy.port.should.equal(9050);
|
||||
socket = peer2._getSocket();
|
||||
socket.should.be.instanceof(Socks5Client);
|
||||
|
||||
peer.should.equal(peer2);
|
||||
});
|
||||
|
||||
it('should create connection', function() {
|
||||
var peer = new Peer('localhost').connect();
|
||||
});
|
||||
|
||||
|
||||
});
|
Loading…
Reference in New Issue