var imports = require('soop').imports(); var EventEmitter= imports.EventEmitter || require('events').EventEmitter; /* * Emits * 'networkChange' * when network layout has change (new/lost peers, etc) * * 'data' * when an unknown data type arrives * * Provides * send(toPeerIds, {data}, cb?) * */ function Network(opts) { opts = opts || {}; this.peerId = opts.peerId; this.apiKey = opts.apiKey || 'lwjd5qra8257b9'; this.debug = opts.debug || 3; this.maxPeers = opts.maxPeers || 5; this.connectedPeers = []; } Network.parent=EventEmitter; // Array helpers Network._arrayDiff = function(a, b) { var seen = []; var diff = []; for (var i = 0; i < b.length; i++) seen[b[i]] = true; for (var j = 0; j < a.length; j++) if (!seen[a[j]]) diff.push(a[j]); return diff; }; Network._inArray = function(el, array) { return array.indexOf(el) > -1; }; Network._arrayPushOnce = function(el, array) { var ret = false; if (!Network._inArray(el, array)) { array.push(el); ret = true; } return ret; }; Network._arrayRemove = function(el, array) { var pos = array.indexOf(el); if (pos >= 0) array.splice(pos, 1); return array; }; // DEBUG Network.prototype._showConnectedPeers = function() { console.log("### CONNECTED PEERS", this.connectedPeers); }; Network.prototype._onClose = function(peerId) { console.log('[Network.js.70] _onClose'); //TODO this.connectedPeers = Network._arrayRemove(peerId, this.connectedPeers); this._notify(); }; Network.prototype._connectToPeers = function(peerIds) { var self = this; var ret = false; var arrayDiff1= Network._arrayDiff(peerIds, this.connectedPeers); var arrayDiff = Network._arrayDiff(arrayDiff1, [this.peerId]); arrayDiff.forEach(function(peerId) { console.log('### CONNECTING TO:', peerId); self.connectTo(peerId); ret = true; }); return ret; }; Network.prototype._onData = function(data, isInbound) { var obj; try { obj = JSON.parse(data); } catch (e) { console.log('### ERROR ON DATA: "%s" ', data, isInbound, e); return; }; console.log('### RECEIVED TYPE: %s FROM %s', obj.data.type, obj.sender, obj.data); switch(obj.data.type) { case 'peerList': this._connectToPeers(obj.data.peers); this._notify(); break; case 'disconnect': this._onClose(obj.sender); break; default: this.emit('data', obj.sender, obj.data, isInbound); } }; Network.prototype._sendPeers = function(peerIds) { console.log('#### SENDING PEER LIST: ', this.connectedPeers, ' TO ', peerIds?peerIds: 'ALL'); this.send(peerIds, { type: 'peerList', peers: this.connectedPeers, }); }; Network.prototype._addPeer = function(peerId, isInbound) { var hasChanged = Network._arrayPushOnce(peerId, this.connectedPeers); if (isInbound && hasChanged) { this._sendPeers(); //broadcast peer list } else { if (isInbound) { this._sendPeers(peerId); } } }; Network.prototype._setupConnectionHandlers = function( dataConn, isInbound, openCallback, closeCallback) { var self=this; dataConn.on('open', function() { if (!Network._inArray(dataConn.peer, self.connectedPeers)) { console.log('### DATA CONNECTION READY TO: ADDING PEER: %s (inbound: %s)', dataConn.peer, isInbound); self._addPeer(dataConn.peer, isInbound); self._notify( isInbound ? dataConn.peer : null); if (typeof openCallback === 'function') openCallback(); } }); dataConn.on('data', function(data) { self._onData(data, isInbound); }); dataConn.on('error', function(e) { console.log('### DATA ERROR',e ); //TODO }); dataConn.on('close', function() { if (self.closing) return; console.log('### CLOSE RECV FROM:', dataConn.peer); self._onClose(dataConn.peer); if (typeof closeCallback === 'function') closeCallback(); }); }; Network.prototype._notify = function(newPeer) { console.log('[Network.js.168:_notify:]'); //TODO this._showConnectedPeers(); this.emit('networkChange', newPeer); }; Network.prototype._setupPeerHandlers = function(openCallback) { var self=this; var p = this.peer; p.on('open', function(peerId) { console.log('### PEER OPEN. I AM:' + peerId); self.peerId = peerId; self.connectedPeers = [peerId]; self._notify(); return openCallback(peerId); }); p.on('error', function(err) { console.log('### PEER ERROR:', err); self.peer.disconnect(); self.peer.destroy(); self.peer = null; this.emit('abort'); }); p.on('connection', function(dataConn) { console.log('### NEW INBOUND CONNECTION %d/%d', self.connectedPeers.length, self.maxPeers); if (self.connectedPeers.length >= self.maxPeers) { console.log('### PEER REJECTED. PEER MAX LIMIT REACHED'); dataConn.on('open', function() { console.log('### CLOSING CONN FROM:' + dataConn.peer); dataConn.close(); }); } else { self._setupConnectionHandlers(dataConn, true); } }); }; Network.prototype.start = function(openCallback) { // Start PeerJS Peer this.peer = new Peer(this.peerId, { key: this.apiKey, // TODO: we need our own PeerServer KEY (http://peerjs.com/peerserver) debug: this.debug, }); this._setupPeerHandlers(openCallback); }; Network.prototype._sendToOne = function(peerId, data, cb) { if (peerId !== this.peerId) { console.log('[WebRTC.js.222:peerId:]',peerId, data); //TODO var conns = this.peer.connections[peerId]; if (conns) { var str = JSON.stringify({ sender: this.peerId, data: data }); for (var i = 0; i < conns.length; i++) { var conn = conns[i]; conn.send(str); } } } if (typeof cb === 'function') cb(); }; Network.prototype.send = function(peerIds, data, cb) { var self=this; console.log('[WebRTC.js.242:peerIds:]',peerIds); //TODO if (!peerIds) { peerIds = this.connectedPeers; data.isBroadcast = 1; } console.log('[WebRTC.js.246:peerIds:]',peerIds, data); //TODO if (Array.isArray(peerIds)) { var l = peerIds.length; var i = 0; peerIds.forEach(function(peerId) { self._sendToOne(peerId, data, function () { if (++i === l && typeof cb === 'function') cb(); }); }); } else if (typeof peerIds === 'string') self._sendToOne(peerIds, data, cb); }; Network.prototype.connectTo = function(peerId, openCallback, closeCallback ) { var self = this; console.log('### STARTING TO CONNECT TO:' + peerId ); var dataConn = this.peer.connect(peerId, { // label: 'wallet', serialization: 'none', reliable: true, metadata: { message: 'hi copayer!' } }); self._setupConnectionHandlers(dataConn, false, openCallback, closeCallback); }; Network.prototype.disconnect = function(peerId, cb) { console.log('[Network.js.268:disconnect:]'); //TODO var self = this; self.closing = 1; this.send(null, { type: 'disconnect' }, function() { console.log('[Network.js.273] disconnect CB'); //TODO self.connectedPeers = []; self.peerId = null; if (self.peer) { self.peer.disconnect(); self.peer.destroy(); self.peer = null; } self.closing = 0; if (typeof cb === 'function') cb(); }); }; module.exports = require('soop')(Network);