first working packet listener.

This commit is contained in:
Christopher Jeffrey 2014-10-21 18:06:17 -07:00
parent f3f0c0b852
commit 19a7c07c60
2 changed files with 260 additions and 3 deletions

View File

@ -138,6 +138,11 @@ Bitcoin.prototype.start = function(options, callback) {
this.log_pipe = bitcoindjs.start(options, function(err, status) {
self._started = true;
setInterval(function() {
var packets = bitcoindjs.hookPackets();
console.log(packets);
}, 50);
[sigint, sighup, sigquit].forEach(function(signal) {
process.on(signal.name, signal.listener = function() {
if (process.listeners(signal.name).length > 1) {

View File

@ -195,6 +195,7 @@ NAN_METHOD(WalletLock);
NAN_METHOD(WalletEncrypt);
NAN_METHOD(WalletSetTxFee);
NAN_METHOD(WalletImportKey);
NAN_METHOD(HookPackets);
/**
* Node.js Internal Function Templates
@ -278,6 +279,21 @@ jsblock_to_cblock(const Local<Object> jsblock, CBlock& cblock);
static inline void
jstx_to_ctx(const Local<Object> jstx, CTransaction& ctx);
static void
hook_packets(CNodeSignals& nodeSignals);
static void
unhook_packets(CNodeSignals& nodeSignals);
static bool
process_packets(CNode* pfrom);
static bool
process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived);
bool static
process_getdata(CNode* pfrom);
extern "C" void
init(Handle<Object>);
@ -311,6 +327,18 @@ struct async_node_data {
Persistent<Function> callback;
};
/**
* async_hook_data
* Hook into bitcoind packet reception.
*/
struct async_hook_data {
std::string err_msg;
std::string result;
CNode result_pfrom;
Persistent<Function> callback;
};
/**
* async_block_data
*/
@ -569,6 +597,9 @@ start_node(void) {
signal(SIGHUP, SIG_DFL);
signal(SIGQUIT, SIG_DFL);
// Hook into packet handling
hook_packets(GetNodeSignals());
return 0;
}
@ -585,10 +616,10 @@ start_node_thread(void) {
argc++;
if (g_data_dir) {
const int argl = 9 + strlen(g_data_dir);
char *arg = (char *)malloc(sizeof(char) * (argl + 1));
const int argl = 9 + strlen(g_data_dir) + 1;
char *arg = (char *)malloc(sizeof(char) * argl);
int w = snprintf(arg, argl, "-datadir=%s", g_data_dir);
if (w > 9 && w <= argl) {
if (w >= 10 && w <= argl) {
arg[w] = '\0';
argv[argc] = arg;
argc++;
@ -715,6 +746,7 @@ NAN_METHOD(StopBitcoind) {
static void
async_stop_node(uv_work_t *req) {
async_node_data *data = static_cast<async_node_data*>(req->data);
unhook_packets(GetNodeSignals());
StartShutdown();
data->result = std::string("stop_node(): bitcoind shutdown.");
}
@ -3327,6 +3359,224 @@ jstx_to_ctx(const Local<Object> jstx, CTransaction& ctx_) {
ctx.nLockTime = (unsigned int)jstx->Get(NanNew<String>("locktime"))->Uint32Value();
}
/**
* Linked List for queued packets
*/
typedef struct _poll_packets_list {
//CNode *pfrom;
//std::string strCommand;
//CDataStream& vRec;
//int64_t nTimeReceived;
char *strCommand;
struct _poll_packets_list *next;
} poll_packets_list;
poll_packets_list *packets_queue_head = NULL;
poll_packets_list *packets_queue_tail = NULL;
boost::mutex poll_packets_mutex;
/**
* HookPackets()
* bitcoind.hookPackets(callback)
* NOTE: THIS NEEDS A MUTEX XXXXXXXXXX
*/
NAN_METHOD(HookPackets) {
NanScope();
Local<Array> obj = NanNew<Array>();
poll_packets_list *cur = NULL;
poll_packets_list *next = NULL;
int i = 0;
poll_packets_mutex.lock();
for (cur = packets_queue_head; cur; cur = next) {
//obj->Set(i, NanNew<String>(cur->strCommand.c_str()));
obj->Set(i, NanNew<String>(cur->strCommand));
i++;
if (cur == packets_queue_head) {
packets_queue_head = NULL;
}
if (cur == packets_queue_tail) {
packets_queue_tail = NULL;
}
next = cur->next;
//delete cur->pfrom;
//delete cur->strCommand;
//delete cur;
free(cur->strCommand);
free(cur);
}
poll_packets_mutex.unlock();
NanReturnValue(obj);
}
static void
hook_packets(CNodeSignals& nodeSignals) {
nodeSignals.ProcessMessages.connect(&process_packets);
}
static void
unhook_packets(CNodeSignals& nodeSignals) {
nodeSignals.ProcessMessages.disconnect(&process_packets);
}
static bool
process_packets(CNode* pfrom) {
bool fOk = true;
if (!pfrom->vRecvGetData.empty()) {
return process_getdata(pfrom);
}
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= SendBufferSize()) {
break;
}
// get next message
CNetMessage& msg = *it;
// end, if an incomplete message is found
if (!msg.complete()) {
break;
}
// at this point, any failure means we can delete the current message
it++;
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, Params().MessageStart(), MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", msg.hdr.GetCommand(), pfrom->id);
fOk = false;
break;
}
// Read header
CMessageHeader& hdr = msg.hdr;
if (!hdr.IsValid()) {
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", hdr.GetCommand(), pfrom->id);
continue;
}
string strCommand = hdr.GetCommand();
// Message size
unsigned int nMessageSize = hdr.nMessageSize;
// Checksum
CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
if (nChecksum != hdr.nChecksum) {
LogPrintf("ProcessMessages(%s, %u bytes) : CHECKSUM ERROR nChecksum=%08x hdr.nChecksum=%08x\n",
strCommand, nMessageSize, nChecksum, hdr.nChecksum);
continue;
}
// Process message
bool fRet = false;
fRet = process_packet(pfrom, strCommand, vRecv, msg.nTime);
boost::this_thread::interruption_point();
break;
}
return fOk;
}
static bool
process_packet(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived) {
// RandAddSeedPerfmon();
// {
// LOCK(cs_main);
// State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
// }
// printf("BITCOIND.JS: ------- %s ------\n", strCommand.c_str());
// if (cb_hooked) {
// const unsigned argc = 2;
// Local<Value> argv[argc] = {
// Local<Value>::New(Null()),
// Local<Value>::New(String::New(strCommand.c_str()))
// };
// TryCatch try_catch;
// hook_cb->Call(Context::GetCurrent()->Global(), argc, argv);
// if (try_catch.HasCaught()) {
// node::FatalException(try_catch);
// }
// }
poll_packets_mutex.lock();
//poll_packets_list *cur = new poll_packets_list();
poll_packets_list *cur = (poll_packets_list *)malloc(sizeof(poll_packets_list));
if (!packets_queue_head) {
packets_queue_head = cur;
packets_queue_tail = cur;
} else {
packets_queue_tail->next = cur;
packets_queue_tail = cur;
}
//cur->pfrom = pfrom;
//cur->strCommand = strCommand;
//cur->vRec = vRecv;
//cur->nTimeReceived = nTimeReceived;
cur->strCommand = strdup(strCommand.c_str());
cur->next = NULL;
poll_packets_mutex.unlock();
if (strCommand == "version") {
} else if (pfrom->nVersion == 0) {
} else if (strCommand == "verack") {
} else if (strCommand == "addr") {
} else if (strCommand == "inv") {
} else if (strCommand == "getdata") {
} else if (strCommand == "getblocks") {
} else if (strCommand == "getheaders") {
} else if (strCommand == "tx") {
} else if (strCommand == "block" && !fImporting && !fReindex) { // Ignore blocks received while importing
} else if (strCommand == "getaddr") {
} else if (strCommand == "mempool") {
} else if (strCommand == "ping") {
} else if (strCommand == "pong") {
} else if (strCommand == "alert") {
} else if (strCommand == "filterload") {
} else if (strCommand == "filteradd") {
} else if (strCommand == "filterclear") {
} else if (strCommand == "reject") {
} else {
}
// Update the last seen time for this node's address
if (pfrom->fNetworkNode) {
if (strCommand == "version"
|| strCommand == "addr"
|| strCommand == "inv"
|| strCommand == "getdata"
|| strCommand == "ping") {
;
}
}
return true;
}
bool static
process_getdata(CNode* pfrom) {
return true;
}
/**
* Init()
* Initialize the singleton object known as bitcoindjs.
@ -3377,6 +3627,8 @@ init(Handle<Object> target) {
NODE_SET_METHOD(target, "walletEncrypt", WalletEncrypt);
NODE_SET_METHOD(target, "walletSetTxFee", WalletSetTxFee);
NODE_SET_METHOD(target, "walletImportKey", WalletImportKey);
NODE_SET_METHOD(target, "hookPackets", HookPackets);
}
NODE_MODULE(bitcoindjs, init)