improve packet polling.

This commit is contained in:
Christopher Jeffrey 2014-10-23 11:46:42 -07:00
parent 639463bb1b
commit 7190fc21aa
2 changed files with 84 additions and 136 deletions

View File

@ -138,14 +138,33 @@ Bitcoin.prototype.start = function(options, callback) {
this.log_pipe = bitcoindjs.start(options, function(err, status) {
self._started = true;
// Poll for queued packets
setInterval(function() {
var packets = bitcoindjs.hookPackets();
if (!packets.length) return;
if (!packets) {
if (self.debug) {
bitcoind.error('Error polling packet queue.');
}
return;
}
if (!packets.length) {
return;
}
self.emit('packets', packets);
packets.forEach(function(packet) {
setImmediate(function() {
self.emit('packet:' + packet.name, packet);
self.emit('packet', packet);
// if (packet.name === 'block' || packet.name === 'tx') {
// self.emit(packet.name, packet.block || packet.tx);
// if (packet.tx && !packet.tx.blockhash) {
// self.emit('mptx', packet.tx);
// }
// }
});
});
}, 50);

View File

@ -3393,60 +3393,61 @@ NAN_METHOD(HookPackets) {
poll_packets_mutex.lock();
for (cur = packets_queue_head; cur; cur = next) {
CNode *pfrom = cur->pfrom;
std::string strCommand(cur->strCommand);
CDataStream vRecv = *cur->vRecv;
int64_t nTimeReceived = cur->nTimeReceived;
Local<Object> o = NanNew<Object>();
o->Set(NanNew<String>("name"), NanNew<String>(cur->strCommand));
o->Set(NanNew<String>("received"), NanNew<Number>((int64_t)cur->nTimeReceived));
o->Set(NanNew<String>("peerId"), NanNew<Number>(cur->pfrom->id));
//o->Set(NanNew<String>("peerId"), NanNew<Number>(cur->pfrom->GetId()));
o->Set(NanNew<String>("agent"), NanNew<String>(cur->pfrom->cleanSubVer.c_str()));
o->Set(NanNew<String>("name"), NanNew<String>(strCommand.c_str()));
o->Set(NanNew<String>("received"), NanNew<Number>((int64_t)nTimeReceived));
o->Set(NanNew<String>("peerId"), NanNew<Number>(pfrom->id));
// o->Set(NanNew<String>("peerId"), NanNew<Number>(pfrom->GetId()));
o->Set(NanNew<String>("userAgent"),
NanNew<String>(pfrom->cleanSubVer.c_str()));
if (strCommand == "version") {
// Each connection can only send one version message
if (cur->pfrom->nVersion != 0) {
// reject
// return false;
NanReturnValue(obj);
if (pfrom->nVersion != 0) {
NanReturnValue(Undefined());
}
bool fRelayTxes = false;
int nStartingHeight = 0;
int cleanSubVer = 0;
//std::string strSubVer(strdup(cur->pfrom->strSubVer.c_str()));
std::string strSubVer = cur->pfrom->strSubVer;
int nVersion = cur->pfrom->nVersion;
uint64_t nServices = cur->pfrom->nServices;
//std::string strSubVer(strdup(pfrom->strSubVer.c_str()));
std::string strSubVer = pfrom->strSubVer;
int nVersion = pfrom->nVersion;
uint64_t nServices = pfrom->nServices;
int64_t nTime;
CAddress addrMe;
CAddress addrFrom;
uint64_t nNonce = 1;
*cur->vRecv >> nVersion >> nServices >> nTime >> addrMe;
if (cur->pfrom->nVersion < MIN_PEER_PROTO_VERSION) {
vRecv >> nVersion >> nServices >> nTime >> addrMe;
if (pfrom->nVersion < MIN_PEER_PROTO_VERSION) {
// disconnect from peers older than this proto version
// reject
// return false;
NanReturnValue(obj);
NanReturnValue(Undefined());
}
if (nVersion == 10300) {
nVersion = 300;
}
if (!cur->vRecv->empty()) {
*cur->vRecv >> addrFrom >> nNonce;
if (!vRecv.empty()) {
vRecv >> addrFrom >> nNonce;
}
if (!cur->vRecv->empty()) {
*cur->vRecv >> LIMITED_STRING(strSubVer, 256);
if (!vRecv.empty()) {
vRecv >> LIMITED_STRING(strSubVer, 256);
//cleanSubVer = SanitizeString(strSubVer);
cleanSubVer = atoi(strSubVer.c_str());
}
if (!cur->vRecv->empty()) {
*cur->vRecv >> nStartingHeight;
if (!vRecv.empty()) {
vRecv >> nStartingHeight;
}
if (!cur->vRecv->empty()) {
if (!vRecv.empty()) {
fRelayTxes = false;
} else {
fRelayTxes = true;
@ -3454,7 +3455,6 @@ NAN_METHOD(HookPackets) {
// Disconnect if we connected to ourself
if (nNonce == nLocalHostNonce && nNonce > 1) {
// return true;
NanReturnValue(obj);
}
@ -3462,28 +3462,25 @@ NAN_METHOD(HookPackets) {
o->Set(NanNew<String>("version"), NanNew<Number>(nVersion));
o->Set(NanNew<String>("height"), NanNew<Number>(nStartingHeight));
o->Set(NanNew<String>("us"), NanNew<String>(addrMe.ToString()));
o->Set(NanNew<String>("address"), NanNew<String>(cur->pfrom->addr.ToString()));
o->Set(NanNew<String>("address"), NanNew<String>(pfrom->addr.ToString()));
o->Set(NanNew<String>("relay"), NanNew<Boolean>(fRelayTxes));
} else if (cur->pfrom->nVersion == 0) {
} else if (pfrom->nVersion == 0) {
// Must have a version message before anything else
// return false;
NanReturnValue(obj);
NanReturnValue(Undefined());
} else if (strCommand == "verack") {
o->Set(NanNew<String>("receiveVersion"), NanNew<Number>(min(cur->pfrom->nVersion, PROTOCOL_VERSION)));
o->Set(NanNew<String>("receiveVersion"), NanNew<Number>(min(pfrom->nVersion, PROTOCOL_VERSION)));
} else if (strCommand == "addr") {
vector<CAddress> vAddr;
*cur->vRecv >> vAddr;
vRecv >> vAddr;
// Don't want addr from older versions unless seeding
if (cur->pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) {
// return true;
if (pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) {
NanReturnValue(obj);
}
// Bad address size
if (vAddr.size() > 1000) {
// return false; // ?
NanReturnValue(obj);
NanReturnValue(Undefined());
}
Local<Array> array = NanNew<Array>();
@ -3522,12 +3519,11 @@ NAN_METHOD(HookPackets) {
o->Set(NanNew<String>("addresses"), array);
} else if (strCommand == "inv") {
vector<CInv> vInv;
*cur->vRecv >> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
// return false;
NanReturnValue(obj);
NanReturnValue(Undefined());
}
LOCK(cs_main);
@ -3543,9 +3539,8 @@ NAN_METHOD(HookPackets) {
//bool fAlreadyHave = AlreadyHave(inv);
// Bad size
if (cur->pfrom->nSendSize > (SendBufferSize() * 2)) {
// return false;
NanReturnValue(obj);
if (pfrom->nSendSize > (SendBufferSize() * 2)) {
NanReturnValue(Undefined());
}
Local<Object> item = NanNew<Object>();
@ -3567,12 +3562,11 @@ NAN_METHOD(HookPackets) {
o->Set(NanNew<String>("items"), array);
} else if (strCommand == "getdata") {
vector<CInv> vInv;
*cur->vRecv >> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
// return false;
NanReturnValue(obj);
NanReturnValue(Undefined());
}
o->Set(NanNew<String>("size"), NanNew<Number>(vInv.size()));
@ -3582,7 +3576,7 @@ NAN_METHOD(HookPackets) {
} else if (strCommand == "getblocks") {
CBlockLocator locator;
uint256 hashStop;
*cur->vRecv >> locator >> hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
@ -3601,7 +3595,7 @@ NAN_METHOD(HookPackets) {
} else if (strCommand == "getheaders") {
CBlockLocator locator;
uint256 hashStop;
*cur->vRecv >> locator >> hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
@ -3610,7 +3604,6 @@ NAN_METHOD(HookPackets) {
// If locator is null, return the hashStop block
BlockMap::iterator mi = mapBlockIndex.find(hashStop);
if (mi == mapBlockIndex.end()) {
// return true;
NanReturnValue(obj);
}
pindex = (*mi).second;
@ -3627,14 +3620,14 @@ NAN_METHOD(HookPackets) {
} else if (strCommand == "tx") {
// XXX Potentially check for "reject" in original code
CTransaction tx;
*cur->vRecv >> tx;
vRecv >> tx;
Local<Object> jstx = NanNew<Object>();
ctx_to_jstx(tx, 0, jstx);
// ctx_to_jstx(tx, 0, o);
o->Set(NanNew<String>("tx"), jstx);
} else if (strCommand == "block" && !fImporting && !fReindex) {
CBlock block;
*cur->vRecv >> block;
vRecv >> block;
Local<Object> jsblock = NanNew<Object>();
cblock_to_jsblock(block, NULL, jsblock);
// cblock_to_jsblock(block, NULL, o);
@ -3644,9 +3637,9 @@ NAN_METHOD(HookPackets) {
} else if (strCommand == "mempool") {
; // not much other information in getaddr as long as we know we got a getaddr
} else if (strCommand == "ping") {
if (cur->pfrom->nVersion > BIP0031_VERSION) {
if (pfrom->nVersion > BIP0031_VERSION) {
uint64_t nonce = 0;
*cur->vRecv >> nonce;
vRecv >> nonce;
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce);
assert(written == 20);
@ -3658,21 +3651,21 @@ NAN_METHOD(HookPackets) {
o->Set(NanNew<String>("nonce"), NanNew<String>(sNonce));
}
} else if (strCommand == "pong") {
int64_t pingUsecEnd = cur->nTimeReceived;
int64_t pingUsecEnd = nTimeReceived;
uint64_t nonce = 0;
size_t nAvail = cur->vRecv->in_avail();
size_t nAvail = vRecv.in_avail();
bool bPingFinished = false;
std::string sProblem;
if (nAvail >= sizeof(nonce)) {
*cur->vRecv >> nonce;
vRecv >> nonce;
// Only process pong message if there is an outstanding ping (old ping without nonce should never pong)
if (cur->pfrom->nPingNonceSent != 0) {
if (nonce == cur->pfrom->nPingNonceSent) {
if (pfrom->nPingNonceSent != 0) {
if (nonce == pfrom->nPingNonceSent) {
// Matching pong received, this ping is no longer outstanding
bPingFinished = true;
int64_t pingUsecTime = pingUsecEnd - cur->pfrom->nPingUsecStart;
int64_t pingUsecTime = pingUsecEnd - pfrom->nPingUsecStart;
if (pingUsecTime > 0) {
// Successful ping time measurement, replace previous
;
@ -3703,7 +3696,7 @@ NAN_METHOD(HookPackets) {
assert(written == 20);
char sPingNonceSent[21] = {0};
written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)cur->pfrom->nPingNonceSent);
written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)pfrom->nPingNonceSent);
assert(written == 20);
o->Set(NanNew<String>("expected"), NanNew<String>(sPingNonceSent));
@ -3721,13 +3714,13 @@ NAN_METHOD(HookPackets) {
}
} else if (strCommand == "alert") {
CAlert alert;
*cur->vRecv >> alert;
vRecv >> alert;
uint256 alertHash = alert.GetHash();
o->Set(NanNew<String>("hash"), NanNew<String>(alertHash.GetHex().c_str()));
if (cur->pfrom->setKnown.count(alertHash) == 0) {
if (pfrom->setKnown.count(alertHash) == 0) {
if (alert.ProcessAlert()) {
std::string vchMsg(alert.vchMsg.begin(), alert.vchMsg.end());
std::string vchSig(alert.vchSig.begin(), alert.vchSig.end());
@ -3746,13 +3739,13 @@ NAN_METHOD(HookPackets) {
}
} else if (strCommand == "filterload") {
CBloomFilter filter;
*cur->vRecv >> filter;
vRecv >> filter;
if (!filter.IsWithinSizeConstraints()) {
// There is no excuse for sending a too-large filter
o->Set(NanNew<String>("misbehaving"), NanNew<Boolean>(true));
} else {
LOCK(cur->pfrom->cs_filter);
LOCK(pfrom->cs_filter);
filter.UpdateEmptyFull();
//std::string svData(filter.vData.begin(), filter.vData.end());
@ -3776,15 +3769,15 @@ NAN_METHOD(HookPackets) {
}
} else if (strCommand == "filteradd") {
vector<unsigned char> vData;
*cur->vRecv >> vData;
vRecv >> vData;
// Nodes must NEVER send a data item > 520 bytes (the max size for a script data object,
// and thus, the maximum size any matched object can have) in a filteradd message
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
o->Set(NanNew<String>("misbehaving"), NanNew<Boolean>(true));
} else {
LOCK(cur->pfrom->cs_filter);
if (cur->pfrom->pfilter) {
LOCK(pfrom->cs_filter);
if (pfrom->pfilter) {
//std::string svData(vData.begin(), vData.end());
//char *cvData = svData.c_str();
//int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1;
@ -3811,7 +3804,7 @@ NAN_METHOD(HookPackets) {
}
// Update the last seen time for this node's address
if (cur->pfrom->fNetworkNode) {
if (pfrom->fNetworkNode) {
if (strCommand == "version"
|| strCommand == "addr"
|| strCommand == "inv"
@ -3833,10 +3826,9 @@ NAN_METHOD(HookPackets) {
}
next = cur->next;
// XXX Figure out what to do here:
// delete cur->pfrom; // cleaned up elsewhere? C++ I DON'T UNDERSTAND YOU
// delete cur->pfrom; // cleaned up on disconnect
free(cur->strCommand);
// delete cur->vRecv; // cleaned up elsewhere?
delete cur->vRecv;
free(cur);
}
@ -3944,73 +3936,10 @@ process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTim
packets_queue_tail = cur;
}
//cur->pfrom = pfrom;
// XXX Bad - sends version again and creates a new global peer
// XXX Figure out what to do here
CNode *pfrom_ = new CNode(pfrom->hSocket, pfrom->addr, pfrom->addrName, pfrom->fInbound);
// copy properties here:
pfrom_->nServices = pfrom->nServices;
//pfrom_->hSocket = pfrom->hSocket;
pfrom_->ssSend = pfrom->ssSend;
pfrom_->nSendSize = pfrom->nSendSize;
pfrom_->nSendOffset = pfrom->nSendOffset;
pfrom_->nSendBytes = pfrom->nSendBytes;
pfrom_->vSendMsg = pfrom->vSendMsg;
//pfrom_->cs_vSend = pfrom->cs_vSend;
pfrom_->vRecvGetData = pfrom->vRecvGetData;
pfrom_->vRecvMsg = pfrom->vRecvMsg;
//pfrom_->cs_vRecvMsg = pfrom->cs_vRecvMsg;
pfrom_->nRecvBytes = pfrom->nRecvBytes;
pfrom_->nRecvVersion = pfrom->nRecvVersion;
pfrom_->nLastSend = pfrom->nLastSend;
pfrom_->nLastRecv = pfrom->nLastRecv;
pfrom_->nTimeConnected = pfrom->nTimeConnected;
//pfrom_->addr = pfrom->addr;
//pfrom_->addrName = pfrom->addrName;
pfrom_->addrLocal = pfrom->addrLocal;
pfrom_->nVersion = pfrom->nVersion;
pfrom_->strSubVer = pfrom->strSubVer;
pfrom_->cleanSubVer = pfrom->cleanSubVer;
pfrom_->fWhitelisted = pfrom->fWhitelisted;
pfrom_->fOneShot = pfrom->fOneShot;
pfrom_->fClient = pfrom->fClient;
//pfrom_->fInbound = pfrom->fInbound;
pfrom_->fNetworkNode = pfrom->fNetworkNode;
pfrom_->fSuccessfullyConnected = pfrom->fSuccessfullyConnected;
pfrom_->fDisconnect = pfrom->fDisconnect;
pfrom_->fRelayTxes = pfrom->fRelayTxes;
pfrom_->grantOutbound = pfrom->grantOutbound;
//pfrom_->cs_filter = pfrom->cs_filter;
pfrom_->pfilter = pfrom->pfilter;
pfrom_->nRefCount = pfrom->nRefCount;
pfrom_->id = pfrom->id;
pfrom_->hashContinue = pfrom->hashContinue;
pfrom_->pindexLastGetBlocksBegin = pfrom->pindexLastGetBlocksBegin;
pfrom_->hashLastGetBlocksEnd = pfrom->hashLastGetBlocksEnd;
pfrom_->nStartingHeight = pfrom->nStartingHeight;
pfrom_->fStartSync = pfrom->fStartSync;
pfrom_->vAddrToSend = pfrom->vAddrToSend;
pfrom_->setAddrKnown = pfrom->setAddrKnown;
pfrom_->fGetAddr = pfrom->fGetAddr;
pfrom_->setKnown = pfrom->setKnown;
pfrom_->setInventoryKnown = pfrom->setInventoryKnown;
pfrom_->vInventoryToSend = pfrom->vInventoryToSend;
//pfrom_->cs_inventory = pfrom->cs_inventory;
pfrom_->mapAskFor = pfrom->mapAskFor;
pfrom_->nPingNonceSent = pfrom->nPingNonceSent;
pfrom_->nPingUsecStart = pfrom->nPingUsecStart;
pfrom_->nPingUsecTime = pfrom->nPingUsecTime;
pfrom_->fPingQueued = pfrom->fPingQueued;
cur->pfrom = pfrom_;
//cur->vRecv = &vRecv;
cur->pfrom = pfrom;
// NOTE: Copy the data stream.
CDataStream *vRecv_ = new CDataStream(vRecv.begin(), vRecv.end(), vRecv.GetType(), vRecv.GetVersion());
cur->vRecv = vRecv_;
cur->nTimeReceived = nTimeReceived;
cur->strCommand = strdup(strCommand.c_str());
cur->next = NULL;