From 19a7c07c609a536c72b4f30d886e0849bcd22b5e Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 21 Oct 2014 18:06:17 -0700 Subject: [PATCH] first working packet listener. --- lib/bitcoind.js | 5 + src/bitcoindjs.cc | 258 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 260 insertions(+), 3 deletions(-) diff --git a/lib/bitcoind.js b/lib/bitcoind.js index 1c52411d..8a7c13d8 100644 --- a/lib/bitcoind.js +++ b/lib/bitcoind.js @@ -138,6 +138,11 @@ Bitcoin.prototype.start = function(options, callback) { this.log_pipe = bitcoindjs.start(options, function(err, status) { self._started = true; + setInterval(function() { + var packets = bitcoindjs.hookPackets(); + console.log(packets); + }, 50); + [sigint, sighup, sigquit].forEach(function(signal) { process.on(signal.name, signal.listener = function() { if (process.listeners(signal.name).length > 1) { diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index bda4c51e..5f5b33f1 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -195,6 +195,7 @@ NAN_METHOD(WalletLock); NAN_METHOD(WalletEncrypt); NAN_METHOD(WalletSetTxFee); NAN_METHOD(WalletImportKey); +NAN_METHOD(HookPackets); /** * Node.js Internal Function Templates @@ -278,6 +279,21 @@ jsblock_to_cblock(const Local jsblock, CBlock& cblock); static inline void jstx_to_ctx(const Local jstx, CTransaction& ctx); +static void +hook_packets(CNodeSignals& nodeSignals); + +static void +unhook_packets(CNodeSignals& nodeSignals); + +static bool +process_packets(CNode* pfrom); + +static bool +process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived); + +bool static +process_getdata(CNode* pfrom); + extern "C" void init(Handle); @@ -311,6 +327,18 @@ struct async_node_data { Persistent callback; }; +/** + * async_hook_data + * Hook into bitcoind packet reception. + */ + +struct async_hook_data { + std::string err_msg; + std::string result; + CNode result_pfrom; + Persistent callback; +}; + /** * async_block_data */ @@ -569,6 +597,9 @@ start_node(void) { signal(SIGHUP, SIG_DFL); signal(SIGQUIT, SIG_DFL); + // Hook into packet handling + hook_packets(GetNodeSignals()); + return 0; } @@ -585,10 +616,10 @@ start_node_thread(void) { argc++; if (g_data_dir) { - const int argl = 9 + strlen(g_data_dir); - char *arg = (char *)malloc(sizeof(char) * (argl + 1)); + const int argl = 9 + strlen(g_data_dir) + 1; + char *arg = (char *)malloc(sizeof(char) * argl); int w = snprintf(arg, argl, "-datadir=%s", g_data_dir); - if (w > 9 && w <= argl) { + if (w >= 10 && w <= argl) { arg[w] = '\0'; argv[argc] = arg; argc++; @@ -715,6 +746,7 @@ NAN_METHOD(StopBitcoind) { static void async_stop_node(uv_work_t *req) { async_node_data *data = static_cast(req->data); + unhook_packets(GetNodeSignals()); StartShutdown(); data->result = std::string("stop_node(): bitcoind shutdown."); } @@ -3327,6 +3359,224 @@ jstx_to_ctx(const Local jstx, CTransaction& ctx_) { ctx.nLockTime = (unsigned int)jstx->Get(NanNew("locktime"))->Uint32Value(); } +/** + * Linked List for queued packets + */ + +typedef struct _poll_packets_list { + //CNode *pfrom; + //std::string strCommand; + //CDataStream& vRec; + //int64_t nTimeReceived; + char *strCommand; + struct _poll_packets_list *next; +} poll_packets_list; + +poll_packets_list *packets_queue_head = NULL; +poll_packets_list *packets_queue_tail = NULL; +boost::mutex poll_packets_mutex; + +/** + * HookPackets() + * bitcoind.hookPackets(callback) + * NOTE: THIS NEEDS A MUTEX XXXXXXXXXX + */ + +NAN_METHOD(HookPackets) { + NanScope(); + + Local obj = NanNew(); + poll_packets_list *cur = NULL; + poll_packets_list *next = NULL; + int i = 0; + + poll_packets_mutex.lock(); + + for (cur = packets_queue_head; cur; cur = next) { + //obj->Set(i, NanNew(cur->strCommand.c_str())); + obj->Set(i, NanNew(cur->strCommand)); + i++; + if (cur == packets_queue_head) { + packets_queue_head = NULL; + } + if (cur == packets_queue_tail) { + packets_queue_tail = NULL; + } + next = cur->next; + //delete cur->pfrom; + //delete cur->strCommand; + //delete cur; + free(cur->strCommand); + free(cur); + } + + poll_packets_mutex.unlock(); + + NanReturnValue(obj); +} + +static void +hook_packets(CNodeSignals& nodeSignals) { + nodeSignals.ProcessMessages.connect(&process_packets); +} + +static void +unhook_packets(CNodeSignals& nodeSignals) { + nodeSignals.ProcessMessages.disconnect(&process_packets); +} + +static bool +process_packets(CNode* pfrom) { + bool fOk = true; + + if (!pfrom->vRecvGetData.empty()) { + return process_getdata(pfrom); + } + + std::deque::iterator it = pfrom->vRecvMsg.begin(); + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { + // Don't bother if send buffer is too full to respond anyway + if (pfrom->nSendSize >= SendBufferSize()) { + break; + } + + // get next message + CNetMessage& msg = *it; + + // end, if an incomplete message is found + if (!msg.complete()) { + break; + } + + // at this point, any failure means we can delete the current message + it++; + + // Scan for message start + if (memcmp(msg.hdr.pchMessageStart, Params().MessageStart(), MESSAGE_START_SIZE) != 0) { + LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", msg.hdr.GetCommand(), pfrom->id); + fOk = false; + break; + } + + // Read header + CMessageHeader& hdr = msg.hdr; + if (!hdr.IsValid()) { + LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", hdr.GetCommand(), pfrom->id); + continue; + } + string strCommand = hdr.GetCommand(); + + // Message size + unsigned int nMessageSize = hdr.nMessageSize; + + // Checksum + CDataStream& vRecv = msg.vRecv; + uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); + unsigned int nChecksum = 0; + memcpy(&nChecksum, &hash, sizeof(nChecksum)); + if (nChecksum != hdr.nChecksum) { + LogPrintf("ProcessMessages(%s, %u bytes) : CHECKSUM ERROR nChecksum=%08x hdr.nChecksum=%08x\n", + strCommand, nMessageSize, nChecksum, hdr.nChecksum); + continue; + } + + // Process message + bool fRet = false; + fRet = process_packet(pfrom, strCommand, vRecv, msg.nTime); + boost::this_thread::interruption_point(); + + break; + } + + return fOk; +} + +static bool +process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived) { + // RandAddSeedPerfmon(); + + // { + // LOCK(cs_main); + // State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros(); + // } + + // printf("BITCOIND.JS: ------- %s ------\n", strCommand.c_str()); + + // if (cb_hooked) { + // const unsigned argc = 2; + // Local argv[argc] = { + // Local::New(Null()), + // Local::New(String::New(strCommand.c_str())) + // }; + // TryCatch try_catch; + // hook_cb->Call(Context::GetCurrent()->Global(), argc, argv); + // if (try_catch.HasCaught()) { + // node::FatalException(try_catch); + // } + // } + + poll_packets_mutex.lock(); + + //poll_packets_list *cur = new poll_packets_list(); + poll_packets_list *cur = (poll_packets_list *)malloc(sizeof(poll_packets_list)); + if (!packets_queue_head) { + packets_queue_head = cur; + packets_queue_tail = cur; + } else { + packets_queue_tail->next = cur; + packets_queue_tail = cur; + } + + //cur->pfrom = pfrom; + //cur->strCommand = strCommand; + //cur->vRec = vRecv; + //cur->nTimeReceived = nTimeReceived; + cur->strCommand = strdup(strCommand.c_str()); + cur->next = NULL; + + poll_packets_mutex.unlock(); + + if (strCommand == "version") { + } else if (pfrom->nVersion == 0) { + } else if (strCommand == "verack") { + } else if (strCommand == "addr") { + } else if (strCommand == "inv") { + } else if (strCommand == "getdata") { + } else if (strCommand == "getblocks") { + } else if (strCommand == "getheaders") { + } else if (strCommand == "tx") { + } else if (strCommand == "block" && !fImporting && !fReindex) { // Ignore blocks received while importing + } else if (strCommand == "getaddr") { + } else if (strCommand == "mempool") { + } else if (strCommand == "ping") { + } else if (strCommand == "pong") { + } else if (strCommand == "alert") { + } else if (strCommand == "filterload") { + } else if (strCommand == "filteradd") { + } else if (strCommand == "filterclear") { + } else if (strCommand == "reject") { + } else { + } + + // Update the last seen time for this node's address + if (pfrom->fNetworkNode) { + if (strCommand == "version" + || strCommand == "addr" + || strCommand == "inv" + || strCommand == "getdata" + || strCommand == "ping") { + ; + } + } + + return true; +} + +bool static +process_getdata(CNode* pfrom) { + return true; +} + /** * Init() * Initialize the singleton object known as bitcoindjs. @@ -3377,6 +3627,8 @@ init(Handle target) { NODE_SET_METHOD(target, "walletEncrypt", WalletEncrypt); NODE_SET_METHOD(target, "walletSetTxFee", WalletSetTxFee); NODE_SET_METHOD(target, "walletImportKey", WalletImportKey); + + NODE_SET_METHOD(target, "hookPackets", HookPackets); } NODE_MODULE(bitcoindjs, init)