diff --git a/lib/bitcoind.js b/lib/bitcoind.js index d69f63d7..bddb7c14 100644 --- a/lib/bitcoind.js +++ b/lib/bitcoind.js @@ -138,14 +138,33 @@ Bitcoin.prototype.start = function(options, callback) { this.log_pipe = bitcoindjs.start(options, function(err, status) { self._started = true; + // Poll for queued packets setInterval(function() { var packets = bitcoindjs.hookPackets(); - if (!packets.length) return; + + if (!packets) { + if (self.debug) { + bitcoind.error('Error polling packet queue.'); + } + return; + } + + if (!packets.length) { + return; + } + self.emit('packets', packets); + packets.forEach(function(packet) { setImmediate(function() { self.emit('packet:' + packet.name, packet); self.emit('packet', packet); + // if (packet.name === 'block' || packet.name === 'tx') { + // self.emit(packet.name, packet.block || packet.tx); + // if (packet.tx && !packet.tx.blockhash) { + // self.emit('mptx', packet.tx); + // } + // } }); }); }, 50); diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index 9072b87b..258127ac 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -3393,60 +3393,61 @@ NAN_METHOD(HookPackets) { poll_packets_mutex.lock(); + for (cur = packets_queue_head; cur; cur = next) { + CNode *pfrom = cur->pfrom; std::string strCommand(cur->strCommand); + CDataStream vRecv = *cur->vRecv; + int64_t nTimeReceived = cur->nTimeReceived; Local o = NanNew(); - o->Set(NanNew("name"), NanNew(cur->strCommand)); - o->Set(NanNew("received"), NanNew((int64_t)cur->nTimeReceived)); - o->Set(NanNew("peerId"), NanNew(cur->pfrom->id)); - //o->Set(NanNew("peerId"), NanNew(cur->pfrom->GetId())); - o->Set(NanNew("agent"), NanNew(cur->pfrom->cleanSubVer.c_str())); + o->Set(NanNew("name"), NanNew(strCommand.c_str())); + o->Set(NanNew("received"), NanNew((int64_t)nTimeReceived)); + o->Set(NanNew("peerId"), NanNew(pfrom->id)); + // o->Set(NanNew("peerId"), NanNew(pfrom->GetId())); + o->Set(NanNew("userAgent"), + NanNew(pfrom->cleanSubVer.c_str())); if (strCommand == "version") { // Each connection can only send one version message - if (cur->pfrom->nVersion != 0) { - // reject - // return false; - NanReturnValue(obj); + if (pfrom->nVersion != 0) { + NanReturnValue(Undefined()); } bool fRelayTxes = false; int nStartingHeight = 0; int cleanSubVer = 0; - //std::string strSubVer(strdup(cur->pfrom->strSubVer.c_str())); - std::string strSubVer = cur->pfrom->strSubVer; - int nVersion = cur->pfrom->nVersion; - uint64_t nServices = cur->pfrom->nServices; + //std::string strSubVer(strdup(pfrom->strSubVer.c_str())); + std::string strSubVer = pfrom->strSubVer; + int nVersion = pfrom->nVersion; + uint64_t nServices = pfrom->nServices; int64_t nTime; CAddress addrMe; CAddress addrFrom; uint64_t nNonce = 1; - *cur->vRecv >> nVersion >> nServices >> nTime >> addrMe; - if (cur->pfrom->nVersion < MIN_PEER_PROTO_VERSION) { + vRecv >> nVersion >> nServices >> nTime >> addrMe; + if (pfrom->nVersion < MIN_PEER_PROTO_VERSION) { // disconnect from peers older than this proto version - // reject - // return false; - NanReturnValue(obj); + NanReturnValue(Undefined()); } if (nVersion == 10300) { nVersion = 300; } - if (!cur->vRecv->empty()) { - *cur->vRecv >> addrFrom >> nNonce; + if (!vRecv.empty()) { + vRecv >> addrFrom >> nNonce; } - if (!cur->vRecv->empty()) { - *cur->vRecv >> LIMITED_STRING(strSubVer, 256); + if (!vRecv.empty()) { + vRecv >> LIMITED_STRING(strSubVer, 256); //cleanSubVer = SanitizeString(strSubVer); cleanSubVer = atoi(strSubVer.c_str()); } - if (!cur->vRecv->empty()) { - *cur->vRecv >> nStartingHeight; + if (!vRecv.empty()) { + vRecv >> nStartingHeight; } - if (!cur->vRecv->empty()) { + if (!vRecv.empty()) { fRelayTxes = false; } else { fRelayTxes = true; @@ -3454,7 +3455,6 @@ NAN_METHOD(HookPackets) { // Disconnect if we connected to ourself if (nNonce == nLocalHostNonce && nNonce > 1) { - // return true; NanReturnValue(obj); } @@ -3462,28 +3462,25 @@ NAN_METHOD(HookPackets) { o->Set(NanNew("version"), NanNew(nVersion)); o->Set(NanNew("height"), NanNew(nStartingHeight)); o->Set(NanNew("us"), NanNew(addrMe.ToString())); - o->Set(NanNew("address"), NanNew(cur->pfrom->addr.ToString())); + o->Set(NanNew("address"), NanNew(pfrom->addr.ToString())); o->Set(NanNew("relay"), NanNew(fRelayTxes)); - } else if (cur->pfrom->nVersion == 0) { + } else if (pfrom->nVersion == 0) { // Must have a version message before anything else - // return false; - NanReturnValue(obj); + NanReturnValue(Undefined()); } else if (strCommand == "verack") { - o->Set(NanNew("receiveVersion"), NanNew(min(cur->pfrom->nVersion, PROTOCOL_VERSION))); + o->Set(NanNew("receiveVersion"), NanNew(min(pfrom->nVersion, PROTOCOL_VERSION))); } else if (strCommand == "addr") { vector vAddr; - *cur->vRecv >> vAddr; + vRecv >> vAddr; // Don't want addr from older versions unless seeding - if (cur->pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) { - // return true; + if (pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) { NanReturnValue(obj); } // Bad address size if (vAddr.size() > 1000) { - // return false; // ? - NanReturnValue(obj); + NanReturnValue(Undefined()); } Local array = NanNew(); @@ -3522,12 +3519,11 @@ NAN_METHOD(HookPackets) { o->Set(NanNew("addresses"), array); } else if (strCommand == "inv") { vector vInv; - *cur->vRecv >> vInv; + vRecv >> vInv; // Bad size if (vInv.size() > MAX_INV_SZ) { - // return false; - NanReturnValue(obj); + NanReturnValue(Undefined()); } LOCK(cs_main); @@ -3543,9 +3539,8 @@ NAN_METHOD(HookPackets) { //bool fAlreadyHave = AlreadyHave(inv); // Bad size - if (cur->pfrom->nSendSize > (SendBufferSize() * 2)) { - // return false; - NanReturnValue(obj); + if (pfrom->nSendSize > (SendBufferSize() * 2)) { + NanReturnValue(Undefined()); } Local item = NanNew(); @@ -3567,12 +3562,11 @@ NAN_METHOD(HookPackets) { o->Set(NanNew("items"), array); } else if (strCommand == "getdata") { vector vInv; - *cur->vRecv >> vInv; + vRecv >> vInv; // Bad size if (vInv.size() > MAX_INV_SZ) { - // return false; - NanReturnValue(obj); + NanReturnValue(Undefined()); } o->Set(NanNew("size"), NanNew(vInv.size())); @@ -3582,7 +3576,7 @@ NAN_METHOD(HookPackets) { } else if (strCommand == "getblocks") { CBlockLocator locator; uint256 hashStop; - *cur->vRecv >> locator >> hashStop; + vRecv >> locator >> hashStop; LOCK(cs_main); @@ -3601,7 +3595,7 @@ NAN_METHOD(HookPackets) { } else if (strCommand == "getheaders") { CBlockLocator locator; uint256 hashStop; - *cur->vRecv >> locator >> hashStop; + vRecv >> locator >> hashStop; LOCK(cs_main); @@ -3610,7 +3604,6 @@ NAN_METHOD(HookPackets) { // If locator is null, return the hashStop block BlockMap::iterator mi = mapBlockIndex.find(hashStop); if (mi == mapBlockIndex.end()) { - // return true; NanReturnValue(obj); } pindex = (*mi).second; @@ -3627,14 +3620,14 @@ NAN_METHOD(HookPackets) { } else if (strCommand == "tx") { // XXX Potentially check for "reject" in original code CTransaction tx; - *cur->vRecv >> tx; + vRecv >> tx; Local jstx = NanNew(); ctx_to_jstx(tx, 0, jstx); // ctx_to_jstx(tx, 0, o); o->Set(NanNew("tx"), jstx); } else if (strCommand == "block" && !fImporting && !fReindex) { CBlock block; - *cur->vRecv >> block; + vRecv >> block; Local jsblock = NanNew(); cblock_to_jsblock(block, NULL, jsblock); // cblock_to_jsblock(block, NULL, o); @@ -3644,9 +3637,9 @@ NAN_METHOD(HookPackets) { } else if (strCommand == "mempool") { ; // not much other information in getaddr as long as we know we got a getaddr } else if (strCommand == "ping") { - if (cur->pfrom->nVersion > BIP0031_VERSION) { + if (pfrom->nVersion > BIP0031_VERSION) { uint64_t nonce = 0; - *cur->vRecv >> nonce; + vRecv >> nonce; char sNonce[21] = {0}; int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce); assert(written == 20); @@ -3658,21 +3651,21 @@ NAN_METHOD(HookPackets) { o->Set(NanNew("nonce"), NanNew(sNonce)); } } else if (strCommand == "pong") { - int64_t pingUsecEnd = cur->nTimeReceived; + int64_t pingUsecEnd = nTimeReceived; uint64_t nonce = 0; - size_t nAvail = cur->vRecv->in_avail(); + size_t nAvail = vRecv.in_avail(); bool bPingFinished = false; std::string sProblem; if (nAvail >= sizeof(nonce)) { - *cur->vRecv >> nonce; + vRecv >> nonce; // Only process pong message if there is an outstanding ping (old ping without nonce should never pong) - if (cur->pfrom->nPingNonceSent != 0) { - if (nonce == cur->pfrom->nPingNonceSent) { + if (pfrom->nPingNonceSent != 0) { + if (nonce == pfrom->nPingNonceSent) { // Matching pong received, this ping is no longer outstanding bPingFinished = true; - int64_t pingUsecTime = pingUsecEnd - cur->pfrom->nPingUsecStart; + int64_t pingUsecTime = pingUsecEnd - pfrom->nPingUsecStart; if (pingUsecTime > 0) { // Successful ping time measurement, replace previous ; @@ -3703,7 +3696,7 @@ NAN_METHOD(HookPackets) { assert(written == 20); char sPingNonceSent[21] = {0}; - written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)cur->pfrom->nPingNonceSent); + written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)pfrom->nPingNonceSent); assert(written == 20); o->Set(NanNew("expected"), NanNew(sPingNonceSent)); @@ -3721,13 +3714,13 @@ NAN_METHOD(HookPackets) { } } else if (strCommand == "alert") { CAlert alert; - *cur->vRecv >> alert; + vRecv >> alert; uint256 alertHash = alert.GetHash(); o->Set(NanNew("hash"), NanNew(alertHash.GetHex().c_str())); - if (cur->pfrom->setKnown.count(alertHash) == 0) { + if (pfrom->setKnown.count(alertHash) == 0) { if (alert.ProcessAlert()) { std::string vchMsg(alert.vchMsg.begin(), alert.vchMsg.end()); std::string vchSig(alert.vchSig.begin(), alert.vchSig.end()); @@ -3746,13 +3739,13 @@ NAN_METHOD(HookPackets) { } } else if (strCommand == "filterload") { CBloomFilter filter; - *cur->vRecv >> filter; + vRecv >> filter; if (!filter.IsWithinSizeConstraints()) { // There is no excuse for sending a too-large filter o->Set(NanNew("misbehaving"), NanNew(true)); } else { - LOCK(cur->pfrom->cs_filter); + LOCK(pfrom->cs_filter); filter.UpdateEmptyFull(); //std::string svData(filter.vData.begin(), filter.vData.end()); @@ -3776,15 +3769,15 @@ NAN_METHOD(HookPackets) { } } else if (strCommand == "filteradd") { vector vData; - *cur->vRecv >> vData; + vRecv >> vData; // Nodes must NEVER send a data item > 520 bytes (the max size for a script data object, // and thus, the maximum size any matched object can have) in a filteradd message if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { o->Set(NanNew("misbehaving"), NanNew(true)); } else { - LOCK(cur->pfrom->cs_filter); - if (cur->pfrom->pfilter) { + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) { //std::string svData(vData.begin(), vData.end()); //char *cvData = svData.c_str(); //int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1; @@ -3811,7 +3804,7 @@ NAN_METHOD(HookPackets) { } // Update the last seen time for this node's address - if (cur->pfrom->fNetworkNode) { + if (pfrom->fNetworkNode) { if (strCommand == "version" || strCommand == "addr" || strCommand == "inv" @@ -3833,10 +3826,9 @@ NAN_METHOD(HookPackets) { } next = cur->next; - // XXX Figure out what to do here: - // delete cur->pfrom; // cleaned up elsewhere? C++ I DON'T UNDERSTAND YOU + // delete cur->pfrom; // cleaned up on disconnect free(cur->strCommand); - // delete cur->vRecv; // cleaned up elsewhere? + delete cur->vRecv; free(cur); } @@ -3944,73 +3936,10 @@ process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTim packets_queue_tail = cur; } - //cur->pfrom = pfrom; - - // XXX Bad - sends version again and creates a new global peer - // XXX Figure out what to do here - CNode *pfrom_ = new CNode(pfrom->hSocket, pfrom->addr, pfrom->addrName, pfrom->fInbound); - - // copy properties here: - pfrom_->nServices = pfrom->nServices; - //pfrom_->hSocket = pfrom->hSocket; - pfrom_->ssSend = pfrom->ssSend; - pfrom_->nSendSize = pfrom->nSendSize; - pfrom_->nSendOffset = pfrom->nSendOffset; - pfrom_->nSendBytes = pfrom->nSendBytes; - pfrom_->vSendMsg = pfrom->vSendMsg; - //pfrom_->cs_vSend = pfrom->cs_vSend; - pfrom_->vRecvGetData = pfrom->vRecvGetData; - pfrom_->vRecvMsg = pfrom->vRecvMsg; - //pfrom_->cs_vRecvMsg = pfrom->cs_vRecvMsg; - pfrom_->nRecvBytes = pfrom->nRecvBytes; - pfrom_->nRecvVersion = pfrom->nRecvVersion; - pfrom_->nLastSend = pfrom->nLastSend; - pfrom_->nLastRecv = pfrom->nLastRecv; - pfrom_->nTimeConnected = pfrom->nTimeConnected; - //pfrom_->addr = pfrom->addr; - //pfrom_->addrName = pfrom->addrName; - pfrom_->addrLocal = pfrom->addrLocal; - pfrom_->nVersion = pfrom->nVersion; - pfrom_->strSubVer = pfrom->strSubVer; - pfrom_->cleanSubVer = pfrom->cleanSubVer; - pfrom_->fWhitelisted = pfrom->fWhitelisted; - pfrom_->fOneShot = pfrom->fOneShot; - pfrom_->fClient = pfrom->fClient; - //pfrom_->fInbound = pfrom->fInbound; - pfrom_->fNetworkNode = pfrom->fNetworkNode; - pfrom_->fSuccessfullyConnected = pfrom->fSuccessfullyConnected; - pfrom_->fDisconnect = pfrom->fDisconnect; - pfrom_->fRelayTxes = pfrom->fRelayTxes; - pfrom_->grantOutbound = pfrom->grantOutbound; - //pfrom_->cs_filter = pfrom->cs_filter; - pfrom_->pfilter = pfrom->pfilter; - pfrom_->nRefCount = pfrom->nRefCount; - pfrom_->id = pfrom->id; - pfrom_->hashContinue = pfrom->hashContinue; - pfrom_->pindexLastGetBlocksBegin = pfrom->pindexLastGetBlocksBegin; - pfrom_->hashLastGetBlocksEnd = pfrom->hashLastGetBlocksEnd; - pfrom_->nStartingHeight = pfrom->nStartingHeight; - pfrom_->fStartSync = pfrom->fStartSync; - pfrom_->vAddrToSend = pfrom->vAddrToSend; - pfrom_->setAddrKnown = pfrom->setAddrKnown; - pfrom_->fGetAddr = pfrom->fGetAddr; - pfrom_->setKnown = pfrom->setKnown; - pfrom_->setInventoryKnown = pfrom->setInventoryKnown; - pfrom_->vInventoryToSend = pfrom->vInventoryToSend; - //pfrom_->cs_inventory = pfrom->cs_inventory; - pfrom_->mapAskFor = pfrom->mapAskFor; - pfrom_->nPingNonceSent = pfrom->nPingNonceSent; - pfrom_->nPingUsecStart = pfrom->nPingUsecStart; - pfrom_->nPingUsecTime = pfrom->nPingUsecTime; - pfrom_->fPingQueued = pfrom->fPingQueued; - - cur->pfrom = pfrom_; - - //cur->vRecv = &vRecv; - + cur->pfrom = pfrom; + // NOTE: Copy the data stream. CDataStream *vRecv_ = new CDataStream(vRecv.begin(), vRecv.end(), vRecv.GetType(), vRecv.GetVersion()); cur->vRecv = vRecv_; - cur->nTimeReceived = nTimeReceived; cur->strCommand = strdup(strCommand.c_str()); cur->next = NULL;