From f08a9c66bfb2f2bfe680971c369733df9c34be05 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 1 Sep 2015 10:52:24 -0400 Subject: [PATCH] New way to persist callbacks to alleviate calling of setImmediate. --- src/libbitcoind.cc | 457 +++++++++++++++++++++------------------------ 1 file changed, 211 insertions(+), 246 deletions(-) diff --git a/src/libbitcoind.cc b/src/libbitcoind.cc index 1c6fc879..bb866707 100644 --- a/src/libbitcoind.cc +++ b/src/libbitcoind.cc @@ -112,8 +112,10 @@ static boost::thread_group threadGroup; */ struct async_tip_update_data { + uv_work_t req; size_t result; - Eternal callback; + Isolate* isolate; + Persistent callback; }; /** @@ -122,9 +124,11 @@ struct async_tip_update_data { */ struct async_block_ready_data { + uv_work_t req; std::string err_msg; std::string result; - Eternal callback; + Isolate* isolate; + Persistent callback; }; /** @@ -133,6 +137,7 @@ struct async_block_ready_data { */ struct async_node_data { + uv_work_t req; std::string err_msg; std::string result; std::string datadir; @@ -140,7 +145,8 @@ struct async_node_data { bool testnet; bool regtest; bool txindex; - Eternal callback; + Isolate* isolate; + Persistent callback; }; /** @@ -148,6 +154,7 @@ struct async_node_data { */ struct async_block_data { + uv_work_t req; std::string err_msg; std::string hash; int64_t height; @@ -155,7 +162,8 @@ struct async_block_data { uint32_t size; CBlock cblock; CBlockIndex* cblock_index; - Eternal callback; + Isolate* isolate; + Persistent callback; }; /** @@ -163,6 +171,7 @@ struct async_block_data { */ struct async_tx_data { + uv_work_t req; std::string err_msg; std::string txid; std::string blockHash; @@ -170,7 +179,8 @@ struct async_tx_data { int64_t height; bool queryMempool; CTransaction ctx; - Eternal callback; + Isolate* isolate; + Persistent callback; }; /** @@ -356,22 +366,18 @@ scan_messages(CNode* pfrom) { */ NAN_METHOD(OnTipUpdate) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); - Local callback; - callback = Local::Cast(args[0]); + async_tip_update_data *req = new async_tip_update_data(); - async_tip_update_data *data = new async_tip_update_data(); - - Eternal eternal(isolate, callback); - - data->callback = eternal; - uv_work_t *req = new uv_work_t(); - req->data = data; + Local callback = Local::Cast(args[0]); + req->callback.Reset(isolate, callback); + req->req.data = req; + req->isolate = isolate; int status = uv_queue_work(uv_default_loop(), - req, async_tip_update, + &req->req, async_tip_update, (uv_after_work_cb)async_tip_update_after); assert(status == 0); @@ -382,7 +388,7 @@ NAN_METHOD(OnTipUpdate) { static void async_tip_update(uv_work_t *req) { - async_tip_update_data *data = static_cast(req->data); + async_tip_update_data *data = reinterpret_cast(req->data); size_t lastHeight = chainActive.Height(); @@ -395,49 +401,45 @@ async_tip_update(uv_work_t *req) { } static void -async_tip_update_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_tip_update_after(uv_work_t *r) { + async_tip_update_data *req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_tip_update_data *data = static_cast(req->data); + Local cb = Local::New(isolate, req->callback); - Local cb = data->callback.Get(isolate); - const unsigned argc = 1; + TryCatch try_catch; Local result = Undefined(isolate); + if (!shutdown_complete) { - result = NanNew(data->result); + result = NanNew(req->result); } - Local argv[argc] = { + Local argv[1] = { Local::New(isolate, result) }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); if (try_catch.HasCaught()) { node::FatalException(try_catch); } - delete data; + req->callback.Reset(); delete req; } NAN_METHOD(OnBlocksReady) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); - Local callback; - callback = Local::Cast(args[0]); + async_block_ready_data *req = new async_block_ready_data(); + req->err_msg = std::string(""); + req->result = std::string(""); + req->req.data = req; + req->isolate = isolate; - async_block_ready_data *data = new async_block_ready_data(); - data->err_msg = std::string(""); - data->result = std::string(""); - - Eternal eternal(isolate, callback); - - data->callback = eternal; - uv_work_t *req = new uv_work_t(); - req->data = data; + Local callback = Local::Cast(args[0]); + req->callback.Reset(isolate, callback); int status = uv_queue_work(uv_default_loop(), - req, async_blocks_ready, + &req->req, async_blocks_ready, (uv_after_work_cb)async_blocks_ready_after); assert(status == 0); @@ -452,7 +454,7 @@ NAN_METHOD(OnBlocksReady) { static void async_blocks_ready(uv_work_t *req) { - async_block_ready_data *data = static_cast(req->data); + async_block_ready_data *data = reinterpret_cast(req->data); data->result = std::string(""); while(!chainActive.Tip()) { @@ -491,35 +493,31 @@ async_blocks_ready(uv_work_t *req) { } static void -async_blocks_ready_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_blocks_ready_after(uv_work_t *r) { + async_block_ready_data* req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_block_ready_data *data = static_cast(req->data); - Local cb = data->callback.Get(isolate); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); + + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { v8::Null(isolate), - Local::New(isolate, NanNew(data->result)) + Local::New(isolate, NanNew(req->result)) }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + + req->callback.Reset(); delete req; } @@ -529,7 +527,7 @@ async_blocks_ready_after(uv_work_t *req) { * Start the bitcoind node with AppInit2() on a separate thread. */ NAN_METHOD(StartBitcoind) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); Local callback; @@ -576,23 +574,21 @@ NAN_METHOD(StartBitcoind) { // Run bitcoind's StartNode() on a separate thread. // - async_node_data *data = new async_node_data(); - data->err_msg = std::string(""); - data->result = std::string(""); - data->datadir = datadir; - data->rpc = rpc; - data->testnet = testnet; - data->regtest = regtest; - data->txindex = txindex; + async_node_data *req = new async_node_data(); + req->err_msg = std::string(""); + req->result = std::string(""); + req->datadir = datadir; + req->rpc = rpc; + req->testnet = testnet; + req->regtest = regtest; + req->txindex = txindex; - Eternal eternal(isolate, callback); - - data->callback = eternal; - uv_work_t *req = new uv_work_t(); - req->data = data; + req->isolate = isolate; + req->callback.Reset(isolate, callback); + req->req.data = req; int status = uv_queue_work(uv_default_loop(), - req, async_start_node, + &req->req, async_start_node, (uv_after_work_cb)async_start_node_after); assert(status == 0); @@ -607,7 +603,7 @@ NAN_METHOD(StartBitcoind) { static void async_start_node(uv_work_t *req) { - async_node_data *data = static_cast(req->data); + async_node_data *data = reinterpret_cast(req->data); if (data->datadir != "") { g_data_dir = (char *)data->datadir.c_str(); } else { @@ -629,35 +625,31 @@ async_start_node(uv_work_t *req) { */ static void -async_start_node_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_start_node_after(uv_work_t *r) { + async_node_data *req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_node_data *data = static_cast(req->data); - Local cb = data->callback.Get(isolate); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); + + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { v8::Null(isolate), - Local::New(isolate, NanNew(data->result)) + Local::New(isolate, NanNew(req->result)) }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + + req->callback.Reset(); delete req; } @@ -793,7 +785,7 @@ start_node_thread(void) { */ NAN_METHOD(StopBitcoind) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); if (args.Length() < 1 || !args[0]->IsFunction()) { @@ -807,17 +799,15 @@ NAN_METHOD(StopBitcoind) { // Run bitcoind's StartShutdown() on a separate thread. // - async_node_data *data = new async_node_data(); - data->err_msg = std::string(""); - data->result = std::string(""); - Eternal eternal(isolate, callback); - data->callback = eternal; - - uv_work_t *req = new uv_work_t(); - req->data = data; + async_node_data *req = new async_node_data(); + req->err_msg = std::string(""); + req->result = std::string(""); + req->callback.Reset(isolate, callback); + req->req.data = req; + req->isolate = isolate; int status = uv_queue_work(uv_default_loop(), - req, async_stop_node, + &req->req, async_stop_node, (uv_after_work_cb)async_stop_node_after); assert(status == 0); @@ -833,7 +823,7 @@ NAN_METHOD(StopBitcoind) { static void async_stop_node(uv_work_t *req) { - async_node_data *data = static_cast(req->data); + async_node_data *data = reinterpret_cast(req->data); StartShutdown(); @@ -849,35 +839,30 @@ async_stop_node(uv_work_t *req) { */ static void -async_stop_node_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_stop_node_after(uv_work_t *r) { + async_node_data* req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_node_data* data = static_cast(req->data); - Local cb = data->callback.Get(isolate); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); + + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { Local::New(isolate, NanNull()), - Local::New(isolate, NanNew(data->result)) + Local::New(isolate, NanNew(req->result)) }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + req->callback.Reset(); delete req; } @@ -888,7 +873,7 @@ async_stop_node_after(uv_work_t *req) { */ NAN_METHOD(GetBlock) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); if (args.Length() < 2 || (!args[0]->IsString() && !args[0]->IsNumber()) @@ -897,30 +882,28 @@ NAN_METHOD(GetBlock) { "Usage: bitcoindjs.getBlock([blockhash,blockheight], callback)"); } - async_block_data *data = new async_block_data(); + async_block_data *req = new async_block_data(); if (args[0]->IsNumber()) { int64_t height = args[0]->IntegerValue(); - data->err_msg = std::string(""); - data->hash = std::string(""); - data->height = height; + req->err_msg = std::string(""); + req->hash = std::string(""); + req->height = height; } else { String::Utf8Value hash_(args[0]->ToString()); std::string hash = std::string(*hash_); - data->err_msg = std::string(""); - data->hash = hash; - data->height = -1; + req->err_msg = std::string(""); + req->hash = hash; + req->height = -1; } Local callback = Local::Cast(args[1]); - Eternal eternal(isolate, callback); - data->callback = eternal; - - uv_work_t *req = new uv_work_t(); - req->data = data; + req->req.data = req; + req->isolate = isolate; + req->callback.Reset(isolate, callback); int status = uv_queue_work(uv_default_loop(), - req, async_get_block, + &req->req, async_get_block, (uv_after_work_cb)async_get_block_after); assert(status == 0); @@ -930,7 +913,7 @@ NAN_METHOD(GetBlock) { static void async_get_block(uv_work_t *req) { - async_block_data* data = static_cast(req->data); + async_block_data* data = reinterpret_cast(req->data); CBlockIndex* pblockindex; uint256 hash = uint256S(data->hash); @@ -984,41 +967,37 @@ async_get_block(uv_work_t *req) { } static void -async_get_block_after(uv_work_t *req) { - Isolate *isolate = Isolate::GetCurrent(); +async_get_block_after(uv_work_t *r) { + async_block_data* req = reinterpret_cast(r->data); + Isolate *isolate = req->isolate; HandleScope scope(isolate); - async_block_data* data = static_cast(req->data); - Local cb = data->callback.Get(isolate); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); + + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { - Local rawNodeBuffer = node::Buffer::New(isolate, data->buffer, data->size); + Local rawNodeBuffer = node::Buffer::New(isolate, req->buffer, req->size); - delete data->buffer; - data->buffer = NULL; + delete req->buffer; + req->buffer = NULL; - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { Local::New(isolate, NanNull()), rawNodeBuffer }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + + req->callback.Reset(); delete req; } @@ -1029,7 +1008,7 @@ async_get_block_after(uv_work_t *req) { */ NAN_METHOD(GetTransaction) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); if (args.Length() < 3 || !args[0]->IsString() @@ -1043,23 +1022,21 @@ NAN_METHOD(GetTransaction) { bool queryMempool = args[1]->BooleanValue(); Local callback = Local::Cast(args[2]); - async_tx_data *data = new async_tx_data(); + async_tx_data *req = new async_tx_data(); - data->err_msg = std::string(""); - data->txid = std::string(""); + req->err_msg = std::string(""); + req->txid = std::string(""); std::string txid = std::string(*txid_); - data->txid = txid; - data->queryMempool = queryMempool; - Eternal eternal(isolate, callback); - data->callback = eternal; - - uv_work_t *req = new uv_work_t(); - req->data = data; + req->txid = txid; + req->queryMempool = queryMempool; + req->isolate = isolate; + req->req.data = req; + req->callback.Reset(isolate, callback); int status = uv_queue_work(uv_default_loop(), - req, async_get_tx, + &req->req, async_get_tx, (uv_after_work_cb)async_get_tx_after); assert(status == 0); @@ -1069,7 +1046,7 @@ NAN_METHOD(GetTransaction) { static void async_get_tx(uv_work_t *req) { - async_tx_data* data = static_cast(req->data); + async_tx_data* data = reinterpret_cast(req->data); uint256 hash = uint256S(data->txid); uint256 blockhash; @@ -1112,23 +1089,19 @@ async_get_tx(uv_work_t *req) { } static void -async_get_tx_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_get_tx_after(uv_work_t *r) { + async_tx_data* req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_tx_data* data = static_cast(req->data); - CTransaction ctx = data->ctx; - Local cb = data->callback.Get(isolate); + CTransaction ctx = req->ctx; + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { Local result = Local::New(isolate, NanNull()); @@ -1140,18 +1113,18 @@ async_get_tx_after(uv_work_t *req) { result = node::Buffer::New(isolate, stx.c_str(), stx.size()); } - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { Local::New(isolate, NanNull()), result }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + + req->callback.Reset(); delete req; } @@ -1162,7 +1135,7 @@ async_get_tx_after(uv_work_t *req) { */ NAN_METHOD(GetTransactionWithBlockInfo) { - Isolate* isolate = Isolate::GetCurrent(); + Isolate* isolate = args.GetIsolate(); HandleScope scope(isolate); if (args.Length() < 3 || !args[0]->IsString() @@ -1176,23 +1149,21 @@ NAN_METHOD(GetTransactionWithBlockInfo) { bool queryMempool = args[1]->BooleanValue(); Local callback = Local::Cast(args[2]); - async_tx_data *data = new async_tx_data(); + async_tx_data *req = new async_tx_data(); - data->err_msg = std::string(""); - data->txid = std::string(""); + req->err_msg = std::string(""); + req->txid = std::string(""); std::string txid = std::string(*txid_); - data->txid = txid; - data->queryMempool = queryMempool; - Eternal eternal(isolate, callback); - data->callback = eternal; - - uv_work_t *req = new uv_work_t(); - req->data = data; + req->txid = txid; + req->queryMempool = queryMempool; + req->req.data = req; + req->isolate = isolate; + req->callback.Reset(isolate, callback); int status = uv_queue_work(uv_default_loop(), - req, async_get_tx_and_info, + &req->req, async_get_tx_and_info, (uv_after_work_cb)async_get_tx_and_info_after); assert(status == 0); @@ -1202,7 +1173,7 @@ NAN_METHOD(GetTransactionWithBlockInfo) { static void async_get_tx_and_info(uv_work_t *req) { - async_tx_data* data = static_cast(req->data); + async_tx_data* data = reinterpret_cast(req->data); uint256 hash = uint256S(data->txid); uint256 blockHash; @@ -1260,24 +1231,20 @@ async_get_tx_and_info(uv_work_t *req) { } static void -async_get_tx_and_info_after(uv_work_t *req) { - Isolate* isolate = Isolate::GetCurrent(); +async_get_tx_and_info_after(uv_work_t *r) { + async_tx_data* req = reinterpret_cast(r->data); + Isolate* isolate = req->isolate; HandleScope scope(isolate); - async_tx_data* data = static_cast(req->data); - CTransaction ctx = data->ctx; - Local cb = data->callback.Get(isolate); + CTransaction ctx = req->ctx; + TryCatch try_catch; + Local cb = Local::New(isolate, req->callback); Local obj = NanNew(); - if (data->err_msg != "") { - Local err = Exception::Error(NanNew(data->err_msg)); - const unsigned argc = 1; - Local argv[argc] = { err }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + if (req->err_msg != "") { + Local err = Exception::Error(NanNew(req->err_msg)); + Local argv[1] = { err }; + cb->Call(isolate->GetCurrentContext()->Global(), 1, argv); } else { CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION); @@ -1285,23 +1252,21 @@ async_get_tx_and_info_after(uv_work_t *req) { std::string stx = ssTx.str(); Local rawNodeBuffer = node::Buffer::New(isolate, stx.c_str(), stx.size()); - obj->Set(NanNew("blockHash"), NanNew(data->blockHash)); - obj->Set(NanNew("height"), NanNew(data->height)); - obj->Set(NanNew("timestamp"), NanNew(data->nTime)); + obj->Set(NanNew("blockHash"), NanNew(req->blockHash)); + obj->Set(NanNew("height"), NanNew(req->height)); + obj->Set(NanNew("timestamp"), NanNew(req->nTime)); obj->Set(NanNew("buffer"), rawNodeBuffer); - const unsigned argc = 2; - Local argv[argc] = { + Local argv[2] = { Local::New(isolate, NanNull()), obj }; - TryCatch try_catch; - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - if (try_catch.HasCaught()) { - node::FatalException(try_catch); - } + cb->Call(isolate->GetCurrentContext()->Global(), 2, argv); } - delete data; + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + req->callback.Reset(); delete req; }