poll mempool.

This commit is contained in:
Christopher Jeffrey 2014-09-23 13:57:49 -07:00
parent 09088d4a72
commit 288cac6e63
3 changed files with 185 additions and 11 deletions

View File

@ -23,6 +23,10 @@ bitcoind.start(function(err) {
// print('Found tx:');
// print(tx);
// });
bitcoind.on('mptx', function(mptx) {
print('Found mempool tx:');
print(mptx);
});
});
});

View File

@ -132,6 +132,38 @@ Bitcoin.prototype.start = function(callback) {
this._emitted = {};
this.on('newListener', function(name) {
if (name === 'block') {
self._pollBlocks();
return;
}
if (name === 'tx') {
self._pollBlocks();
self._pollMempool();
return;
}
if (name === 'mptx') {
self._pollMempool();
return;
}
});
if (this.log_pipe !== -1) {
this.log('log pipe opened: %d', this.log_pipe);
this._pipe = new net.Socket(this.log_pipe);
this._pipe.on('data', function(data) {
return process.stdout.write('bitcoind: ' + data + '\n');
});
this._pipe.on('error', function(err) {
; // ignore for now
});
this._pipe.resume();
}
};
Bitcoin.prototype._pollBlocks = function() {
if (this._pollingBlocks) return;
this._pollingBlocks = true;
(function next() {
return bitcoindjs.pollBlocks(function(err, blocks) {
if (err) return setTimeout(next, self.pollInterval);
@ -161,18 +193,34 @@ Bitcoin.prototype.start = function(callback) {
});
});
})();
};
if (this.log_pipe !== -1) {
this.log('log pipe opened: %d', this.log_pipe);
this._pipe = new net.Socket(this.log_pipe);
this._pipe.on('data', function(data) {
return process.stdout.write('bitcoind: ' + data + '\n');
Bitcoin.prototype._pollMempool = function() {
if (this._pollingMempool) return;
this._pollingMempool = true;
(function next() {
return bitcoindjs.pollMempool(function(err, txs) {
if (err) return setTimeout(next, self.pollInterval);
return utils.forEach(txs, function(tx, nextTx) {
// XXX Bad workaround
if (self._emitted[tx.hash]) {
return setImmediate(function() {
return nextTx();
});
}
self._emitted[tx.hash] = true;
self.emit('mptx', tx);
self.emit('tx', tx);
return setImmediate(function() {
return nextTx();
});
}, function() {
return setTimeout(next, self.pollInterval);
});
});
this._pipe.on('error', function(err) {
; // ignore for now
});
this._pipe.resume();
}
})();
};
Bitcoin.prototype.getBlock = function(blockHash, callback) {

View File

@ -124,6 +124,7 @@ NAN_METHOD(StopBitcoind);
NAN_METHOD(GetBlock);
NAN_METHOD(GetTx);
NAN_METHOD(PollBlocks);
NAN_METHOD(PollMempool);
static void
async_start_node_work(uv_work_t *req);
@ -162,9 +163,15 @@ static void
async_poll_blocks_after(uv_work_t *req);
static void
ctx_to_js(const CTransaction& tx, uint256 hashBlock, Local<Object> entry);
async_poll_mempool(uv_work_t *req);
static void
async_poll_mempool_after(uv_work_t *req);
static inline void
ctx_to_js(const CTransaction& tx, uint256 hashBlock, Local<Object> entry);
static inline void
cblock_to_js(const CBlock& block, const CBlockIndex* blockindex, Local<Object> obj);
extern "C" void
@ -236,6 +243,19 @@ struct async_poll_blocks_data {
Persistent<Function> callback;
};
/**
* async_poll_mempool_data
*/
struct async_poll_mempool_data {
std::string err_msg;
int poll_saved_height;
int poll_top_height;
Persistent<Array> result_array;
Persistent<Function> callback;
};
/**
* StartBitcoind
* bitcoind.start(callback)
@ -809,6 +829,107 @@ async_poll_blocks_after(uv_work_t *req) {
delete req;
}
/**
* PollMempool(callback)
* bitcoind.pollMempool(callback)
*/
NAN_METHOD(PollMempool) {
NanScope();
if (args.Length() < 1 || !args[0]->IsFunction()) {
return NanThrowError(
"Usage: bitcoindjs.pollMempool(callback)");
}
Local<Function> callback = Local<Function>::Cast(args[0]);
async_poll_mempool_data *data = new async_poll_mempool_data();
data->poll_saved_height = -1;
data->poll_top_height = -1;
data->err_msg = std::string("");
data->callback = Persistent<Function>::New(callback);
uv_work_t *req = new uv_work_t();
req->data = data;
int status = uv_queue_work(uv_default_loop(),
req, async_poll_mempool,
(uv_after_work_cb)async_poll_mempool_after);
assert(status == 0);
NanReturnValue(Undefined());
}
static void
async_poll_mempool(uv_work_t *req) {
// async_poll_blocks_data* data = static_cast<async_poll_blocks_data*>(req->data);
// Nothing really async to do here. It's all in memory. Placeholder for now.
useconds_t usec = 20 * 1000;
usleep(usec);
}
static void
async_poll_mempool_after(uv_work_t *req) {
NanScope();
async_poll_blocks_data* data = static_cast<async_poll_blocks_data*>(req->data);
if (!data->err_msg.empty()) {
Local<Value> err = Exception::Error(String::New(data->err_msg.c_str()));
const unsigned argc = 1;
Local<Value> argv[argc] = { err };
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
} else {
int ti = 0;
Local<Array> txs = NanNew<Array>();
{
std::map<uint256, CTxMemPoolEntry>::const_iterator it = mempool.mapTx.begin();
for (; it != mempool.mapTx.end(); it++) {
const CTransaction& tx = it->second.GetTx();
// uint256 hash = it->second.GetTx().GetHash();
Local<Object> entry = NanNew<Object>();
ctx_to_js(tx, 0, entry);
txs->Set(ti, entry);
ti++;
}
}
{
std::map<COutPoint, CInPoint>::const_iterator it = mempool.mapNextTx.begin();
for (; it != mempool.mapNextTx.end(); it++) {
const CTransaction tx = *it->second.ptx;
// uint256 hash = it->second.ptx->GetHash();
Local<Object> entry = NanNew<Object>();
ctx_to_js(tx, 0, entry);
txs->Set(ti, entry);
ti++;
}
}
const unsigned argc = 2;
Local<Value> argv[argc] = {
Local<Value>::New(Null()),
Local<Value>::New(txs)
};
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}
data->callback.Dispose();
delete data;
delete req;
}
/**
* Conversions
*/
@ -1034,6 +1155,7 @@ init(Handle<Object> target) {
NODE_SET_METHOD(target, "getBlock", GetBlock);
NODE_SET_METHOD(target, "getTx", GetTx);
NODE_SET_METHOD(target, "pollBlocks", PollBlocks);
NODE_SET_METHOD(target, "pollMempool", PollMempool);
}
NODE_MODULE(bitcoindjs, init)