diff --git a/src/asyncrpcoperation.cpp b/src/asyncrpcoperation.cpp index 013b0fbb..7a66c451 100644 --- a/src/asyncrpcoperation.cpp +++ b/src/asyncrpcoperation.cpp @@ -25,25 +25,27 @@ std::map OperationStatusMap = { {OperationStatus::SUCCESS, "success"} }; -AsyncRPCOperation::AsyncRPCOperation() : errorCode(0), errorMessage() { +/** + * Every operation instance should have a globally unique id + */ +AsyncRPCOperation::AsyncRPCOperation() : error_code_(0), error_message_() { // Set a unique reference for each operation boost::uuids::uuid uuid = uuidgen(); std::string s = "opid-" + boost::uuids::to_string(uuid); - setId(s); + set_id(s); - setState(OperationStatus::READY); - creationTime = (int64_t)time(NULL); + set_state(OperationStatus::READY); + creation_time_ = (int64_t)time(NULL); } -AsyncRPCOperation::AsyncRPCOperation(const AsyncRPCOperation& o) : id(o.id), creationTime(o.creationTime), state(o.state.load()) +AsyncRPCOperation::AsyncRPCOperation(const AsyncRPCOperation& o) : id_(o.id_), creation_time_(o.creation_time_), state_(o.state_.load()) { } - AsyncRPCOperation& AsyncRPCOperation::operator=( const AsyncRPCOperation& other ) { - this->id = other.getId(); - this->creationTime = other.creationTime; - this->state.store(other.state.load()); + this->id_ = other.getId(); + this->creation_time_ = other.creation_time_; + this->state_.store(other.state_.load()); return *this; } @@ -51,73 +53,85 @@ AsyncRPCOperation& AsyncRPCOperation::operator=( const AsyncRPCOperation& other AsyncRPCOperation::~AsyncRPCOperation() { } +/** + * Override this cancel() method if you can interrupt main() when executing. + */ void AsyncRPCOperation::cancel() { - if (isReady()) - setState(OperationStatus::CANCELLED); + if (isReady()) { + set_state(OperationStatus::CANCELLED); + } } - -void AsyncRPCOperation::startExecutionClock() { - startTime = std::chrono::system_clock::now(); +/** + * Start timing the execution run of the code you're interested in + */ +void AsyncRPCOperation::start_execution_clock() { + start_time_ = std::chrono::system_clock::now(); } -void AsyncRPCOperation::stopExecutionClock() { - endTime = std::chrono::system_clock::now(); +/** + * Stop timing the execution run + */ +void AsyncRPCOperation::stop_execution_clock() { + end_time_ = std::chrono::system_clock::now(); } - -// Implement this method in any subclass. -// This is just an example implementation. - - +/** + * Implement this virtual method in any subclass. This is just an example implementation. + */ void AsyncRPCOperation::main() { - if (isCancelled()) + if (isCancelled()) { return; + } - setState(OperationStatus::EXECUTING); + set_state(OperationStatus::EXECUTING); - // - // Do some work here... - // + start_execution_clock(); - startExecutionClock(); + // Do some work here.. - //std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + stop_execution_clock(); - stopExecutionClock(); + // If there was an error, you might set it like this: + /* + setErrorCode(123); + setErrorMessage("Murphy's law"); + setState(OperationStatus::FAILED); + */ - - // If there was an error... -// setErrorCode(123); -// setErrorMessage("Murphy's law"); -// setState(OperationStatus::FAILED); - - - // Otherwise + // Otherwise, if the operation was a success: Value v("We have a result!"); - setResult(v); - setState(OperationStatus::SUCCESS); + set_result(v); + set_state(OperationStatus::SUCCESS); } +/** + * Return the error of the completed operation as a Value object. + */ Value AsyncRPCOperation::getError() const { - if (!isFailed()) + if (!isFailed()) { return Value::null; + } Object error; - error.push_back(Pair("code", this->errorCode)); - error.push_back(Pair("message", this->errorMessage)); + error.push_back(Pair("code", this->error_code_)); + error.push_back(Pair("message", this->error_message_)); return Value(error); } +/** + * Return the result of the completed operation as a Value object. + */ Value AsyncRPCOperation::getResult() const { - if (!isSuccess()) + if (!isSuccess()) { return Value::null; + } - return this->resultValue; + return this->result_; } -/* +/** * Returns a status Value object. * If the operation has failed, it will include an error object. * If the operation has succeeded, it will include the result value. @@ -127,7 +141,7 @@ Value AsyncRPCOperation::getStatus() const { Object obj; obj.push_back(Pair("id", this->getId())); obj.push_back(Pair("status", OperationStatusMap[status])); - obj.push_back(Pair("creation_time", this->creationTime)); + obj.push_back(Pair("creation_time", this->creation_time_)); // creation, exec time, duration, exec end, etc. Value err = this->getError(); if (!err.is_null()) { @@ -138,14 +152,16 @@ Value AsyncRPCOperation::getStatus() const { obj.push_back(Pair("result", result)); // Include execution time for successful operation - std::chrono::duration elapsed_seconds = endTime - startTime; + std::chrono::duration elapsed_seconds = end_time_ - start_time_; obj.push_back(Pair("execution_secs", elapsed_seconds.count())); } return Value(obj); } - +/** + * Return the operation state in human readable form. + */ std::string AsyncRPCOperation::getStateAsString() const { OperationStatus status = this->getState(); return OperationStatusMap[status]; diff --git a/src/asyncrpcoperation.h b/src/asyncrpcoperation.h index 7e804a5c..99c3252d 100644 --- a/src/asyncrpcoperation.h +++ b/src/asyncrpcoperation.h @@ -20,11 +20,11 @@ using namespace std; using namespace json_spirit; /** - * AsyncRPCOperations are given to the AsyncRPCQueue for processing. + * AsyncRPCOperation objects are submitted to the AsyncRPCQueue for processing. * - * How to subclass: - * Implement the main() method, this is where work is performed. + * To subclass AsyncRPCOperation, implement the main() method. * Update the operation status as work is underway and completes. + * If main() can be interrupted, inmplement the cancel() method. */ typedef std::string AsyncRPCOperationId; @@ -40,30 +40,26 @@ typedef enum class operationStateEnum { class AsyncRPCOperation { public: AsyncRPCOperation(); - - // Todo: keep or delete copy constructors and assignment? - AsyncRPCOperation(const AsyncRPCOperation& orig); - AsyncRPCOperation& operator=( const AsyncRPCOperation& other ); - virtual ~AsyncRPCOperation(); - // Implement this method in your subclass. + // You must implement this method in your subclass. virtual void main(); + // Override this method if you can interrupt execution of main() in your subclass. void cancel(); // Getters and setters OperationStatus getState() const { - return state.load(); + return state_.load(); } AsyncRPCOperationId getId() const { - return id; + return id_; } int64_t getCreationTime() const { - return creationTime; + return creation_time_; } Value getStatus() const; @@ -75,11 +71,11 @@ public: std::string getStateAsString() const; int getErrorCode() const { - return errorCode; + return error_code_; } std::string getErrorMessage() const { - return errorMessage; + return error_message_; } bool isCancelled() const { @@ -104,46 +100,47 @@ public: protected: - Value resultValue; - int errorCode; - std::string errorMessage; - std::atomic state; - std::chrono::time_point startTime, endTime; + Value result_; + int error_code_; + std::string error_message_; + std::atomic state_; + std::chrono::time_point start_time_, end_time_; - void startExecutionClock(); - void stopExecutionClock(); + void start_execution_clock(); + void stop_execution_clock(); - void setState(OperationStatus state) { - this->state.store(state); + void set_state(OperationStatus state) { + this->state_.store(state); } - void setErrorCode(int errorCode) { - this->errorCode = errorCode; + void set_error_code(int errorCode) { + this->error_code_ = errorCode; } - void setErrorMessage(std::string errorMessage) { - this->errorMessage = errorMessage; + void set_error_message(std::string errorMessage) { + this->error_message_ = errorMessage; } - void setResult(Value v) { - this->resultValue = v; + void set_result(Value v) { + this->result_ = v; } private: + + // Derived classes should write their own copy constructor and assignment operators + AsyncRPCOperation(const AsyncRPCOperation& orig); + AsyncRPCOperation& operator=( const AsyncRPCOperation& other ); - // Todo: Private for now. If copying an operation is possible, should it - // receive a new id and a new creation time? - void setId(AsyncRPCOperationId id) { - this->id = id; + void set_id(AsyncRPCOperationId id) { + this->id_ = id; } - // Todo: Ditto above. - void setCreationTime(int64_t creationTime) { - this->creationTime = creationTime; + void set_creation_time(int64_t creationTime) { + this->creation_time_ = creationTime; } - AsyncRPCOperationId id; - int64_t creationTime; + AsyncRPCOperationId id_; + int64_t creation_time_; }; #endif /* ASYNCRPCOPERATION_H */ diff --git a/src/asyncrpcqueue.cpp b/src/asyncrpcqueue.cpp index 4cc9336c..5bc96668 100644 --- a/src/asyncrpcqueue.cpp +++ b/src/asyncrpcqueue.cpp @@ -4,47 +4,41 @@ #include "asyncrpcqueue.h" -static std::atomic workerCounter(0); +static std::atomic workerCounter(0); -AsyncRPCQueue::AsyncRPCQueue() : closed(false) { +AsyncRPCQueue::AsyncRPCQueue() : closed_(false) { } -/* - * Calling thread will join on all the worker threads - */ AsyncRPCQueue::~AsyncRPCQueue() { - this->closed = true; // set this in case close() was not invoked - for (std::thread & t : this->workers) { - t.join(); - } + closeAndWait(); // join on all worker threads } -/* +/** * A worker will execute this method on a new thread */ -void AsyncRPCQueue::run(int workerId) { -// std::cout << "Launched queue worker " << workerId << std::endl; +void AsyncRPCQueue::run(size_t workerId) { while (!isClosed()) { AsyncRPCOperationId key; std::shared_ptr operation; { - std::unique_lock< std::mutex > guard(cs_lock); - while (operationIdQueue.empty() && !isClosed()) { - this->cs_condition.wait(guard); + std::unique_lock< std::mutex > guard(lock_); + while (operation_id_queue_.empty() && !isClosed()) { + this->condition_.wait(guard); } // Exit if the queue is closing. - if (isClosed()) + if (isClosed()) { break; + } // Get operation id - key = operationIdQueue.front(); - operationIdQueue.pop(); + key = operation_id_queue_.front(); + operation_id_queue_.pop(); // Search operation map - AsyncRPCOperationMap::const_iterator iter = operationMap.find(key); - if (iter != operationMap.end()) { + AsyncRPCOperationMap::const_iterator iter = operation_map_.find(key); + if (iter != operation_map_.end()) { operation = iter->second; } } @@ -57,15 +51,14 @@ void AsyncRPCQueue::run(int workerId) { operation->main(); } } -// std::cout << "Terminating queue worker " << workerId << std::endl; } -/* +/** * Add shared_ptr to operation. * * To retain polymorphic behaviour, i.e. main() method of derived classes is invoked, - * caller should create the shared_ptr like thi: + * caller should create the shared_ptr like this: * * std::shared_ptr ptr(new MyCustomAsyncRPCOperation(params)); * @@ -74,84 +67,116 @@ void AsyncRPCQueue::run(int workerId) { void AsyncRPCQueue::addOperation(const std::shared_ptr &ptrOperation) { // Don't add if queue is closed - if (isClosed()) + if (isClosed()) { return; + } AsyncRPCOperationId id = ptrOperation->getId(); - { - std::lock_guard< std::mutex > guard(cs_lock); - operationMap.emplace(id, ptrOperation); - operationIdQueue.push(id); - this->cs_condition.notify_one(); - } + std::lock_guard< std::mutex > guard(lock_); + operation_map_.emplace(id, ptrOperation); + operation_id_queue_.push(id); + this->condition_.notify_one(); } - -std::shared_ptr AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) { +/** + * Return the operation for a given operation id. + */ +std::shared_ptr AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) const { std::shared_ptr ptr; - std::lock_guard< std::mutex > guard(cs_lock); - AsyncRPCOperationMap::const_iterator iter = operationMap.find(id); - if (iter != operationMap.end()) { + std::lock_guard< std::mutex > guard(lock_); + AsyncRPCOperationMap::const_iterator iter = operation_map_.find(id); + if (iter != operation_map_.end()) { ptr = iter->second; } return ptr; } +/** + * Return the operation for a given operation id and then remove the operation from internal storage. + */ std::shared_ptr AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) { std::shared_ptr ptr = getOperationForId(id); if (ptr) { - std::lock_guard< std::mutex > guard(cs_lock); + std::lock_guard< std::mutex > guard(lock_); // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker // there will no operation in the map to execute, so nothing will happen. - operationMap.erase(id); + operation_map_.erase(id); } return ptr; } -bool AsyncRPCQueue::isClosed() { - return closed; +/** + * Return true if the queue is closed to new operations. + */ +bool AsyncRPCQueue::isClosed() const { + return closed_; } +/** + * Close the queue and cancel all existing operations + */ void AsyncRPCQueue::close() { - this->closed = true; + this->closed_ = true; cancelAllOperations(); } -/* +/** * Call cancel() on all operations */ void AsyncRPCQueue::cancelAllOperations() { - std::unique_lock< std::mutex > guard(cs_lock); - for (auto key : operationMap) { + std::unique_lock< std::mutex > guard(lock_); + for (auto key : operation_map_) { key.second->cancel(); } - this->cs_condition.notify_all(); + this->condition_.notify_all(); } -int AsyncRPCQueue::getOperationCount() { - std::unique_lock< std::mutex > guard(cs_lock); - return operationIdQueue.size(); +/** + * Return the number of operations in the queue + */ +size_t AsyncRPCQueue::getOperationCount() const { + std::unique_lock< std::mutex > guard(lock_); + return operation_id_queue_.size(); } -/* +/** * Spawn a worker thread */ void AsyncRPCQueue::addWorker() { - std::unique_lock< std::mutex > guard(cs_lock); // Todo: could just have a lock on the vector - workers.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) ); + std::unique_lock< std::mutex > guard(lock_); // Todo: could just have a lock on the vector + workers_.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) ); } -int AsyncRPCQueue::getNumberOfWorkers() { - return workers.size(); +/** + * Return the number of worker threads spawned by the queue + */ +size_t AsyncRPCQueue::getNumberOfWorkers() const { + return workers_.size(); } - -std::vector AsyncRPCQueue::getAllOperationIds() { - std::unique_lock< std::mutex > guard(cs_lock); +/** + * Return a list of all known operation ids found in internal storage. + */ +std::vector AsyncRPCQueue::getAllOperationIds() const { + std::unique_lock< std::mutex > guard(lock_); std::vector v; - for(auto & entry: operationMap) + for(auto & entry: operation_map_) { v.push_back(entry.first); + } return v; } +/** + * Calling thread will close and wait for worker threads to join. + */ +void AsyncRPCQueue::closeAndWait() { + if (!this->closed_) { + close(); + } + for (std::thread & t : this->workers_) { + if (t.joinable()) { + t.join(); + } + } +} diff --git a/src/asyncrpcqueue.h b/src/asyncrpcqueue.h index f86c4f84..3de7b5ba 100644 --- a/src/asyncrpcqueue.h +++ b/src/asyncrpcqueue.h @@ -16,7 +16,6 @@ #include #include #include - #include @@ -35,26 +34,28 @@ public: AsyncRPCQueue& operator=(AsyncRPCQueue &&) = delete; // Move assign void addWorker(); - int getNumberOfWorkers(); - bool isClosed(); + size_t getNumberOfWorkers() const; + bool isClosed() const; void close(); + void closeAndWait(); void cancelAllOperations(); - int getOperationCount(); - std::shared_ptr getOperationForId(AsyncRPCOperationId); + size_t getOperationCount() const; + std::shared_ptr getOperationForId(AsyncRPCOperationId) const; std::shared_ptr popOperationForId(AsyncRPCOperationId); void addOperation(const std::shared_ptr &ptrOperation); - std::vector getAllOperationIds(); + std::vector getAllOperationIds() const; private: // addWorker() will spawn a new thread on this method - void run(int workerId); + void run(size_t workerId); - std::mutex cs_lock; - std::condition_variable cs_condition; - bool closed; - AsyncRPCOperationMap operationMap; - std::queue operationIdQueue; - std::vector workers; + // Why this is not a recursive lock: http://www.zaval.org/resources/library/butenhof1.html + mutable std::mutex lock_; + std::condition_variable condition_; + bool closed_; + AsyncRPCOperationMap operation_map_; + std::queue operation_id_queue_; + std::vector workers_; }; #endif diff --git a/src/rpcserver.cpp b/src/rpcserver.cpp index 906ecee9..88c11aba 100644 --- a/src/rpcserver.cpp +++ b/src/rpcserver.cpp @@ -809,8 +809,8 @@ void StopRPCThreads() delete rpc_io_service; rpc_io_service = NULL; // Tells async queue to cancel all operations and shutdown. - // The async queue destructor will block and join on worker threads. - async_rpc_queue->close(); + LogPrintf("%s: waiting for async rpc workers to stop\n", __func__); + async_rpc_queue->closeAndWait(); } bool IsRPCRunning() diff --git a/src/wallet/asyncrpcoperation_sendmany.cpp b/src/wallet/asyncrpcoperation_sendmany.cpp index 70518bbe..9fd2cd08 100644 --- a/src/wallet/asyncrpcoperation_sendmany.cpp +++ b/src/wallet/asyncrpcoperation_sendmany.cpp @@ -36,15 +36,18 @@ AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany( int minDepth) : fromaddress_(fromAddress), t_outputs_(tOutputs), z_outputs_(zOutputs), mindepth_(minDepth) { - if (minDepth < 0) + if (minDepth < 0) { throw JSONRPCError(RPC_INVALID_PARAMETER, "Minconf cannot be negative"); - - if (fromAddress.size() == 0) + } + + if (fromAddress.size() == 0) { throw JSONRPCError(RPC_INVALID_PARAMETER, "From address parameter missing"); - - if (tOutputs.size() == 0 && zOutputs.size() == 0) + } + + if (tOutputs.size() == 0 && zOutputs.size() == 0) { throw JSONRPCError(RPC_INVALID_PARAMETER, "No recipients"); - + } + fromtaddr_ = CBitcoinAddress(fromAddress); isfromtaddr_ = fromtaddr_.IsValid(); isfromzaddr_ = false; @@ -56,13 +59,15 @@ AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany( PaymentAddress addr = address.Get(); // We don't need to lock on the wallet as spending key related methods are thread-safe - if (!pwalletMain->HaveSpendingKey(addr)) + if (!pwalletMain->HaveSpendingKey(addr)) { throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid from address, should be a taddr or zaddr."); - + } + SpendingKey key; - if (!pwalletMain->GetSpendingKey(addr, key)) + if (!pwalletMain->GetSpendingKey(addr, key)) { throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid from address, no spending key found for zaddr"); - + } + isfromzaddr_ = true; frompaymentaddress_ = addr; spendingkey_ = key; @@ -79,8 +84,8 @@ void AsyncRPCOperation_sendmany::main() { if (isCancelled()) return; - setState(OperationStatus::EXECUTING); - startExecutionClock(); + set_state(OperationStatus::EXECUTING); + start_execution_clock(); bool success = false; @@ -89,25 +94,25 @@ void AsyncRPCOperation_sendmany::main() { } catch (Object objError) { int code = find_value(objError, "code").get_int(); std::string message = find_value(objError, "message").get_str(); - setErrorCode(code); - setErrorMessage(message); + set_error_code(code); + set_error_message(message); } catch (runtime_error e) { - setErrorCode(-1); - setErrorMessage("runtime error: " + string(e.what())); + set_error_code(-1); + set_error_message("runtime error: " + string(e.what())); } catch (logic_error e) { - setErrorCode(-1); - setErrorMessage("logic error: " + string(e.what())); + set_error_code(-1); + set_error_message("logic error: " + string(e.what())); } catch (...) { - setErrorCode(-2); - setErrorMessage("unknown error"); + set_error_code(-2); + set_error_message("unknown error"); } - stopExecutionClock(); + stop_execution_clock(); if (success) { - setState(OperationStatus::SUCCESS); + set_state(OperationStatus::SUCCESS); } else { - setState(OperationStatus::FAILED); + set_state(OperationStatus::FAILED); } } @@ -124,12 +129,14 @@ bool AsyncRPCOperation_sendmany::main_impl() { CAmount minersFee = ASYNC_RPC_OPERATION_DEFAULT_MINERS_FEE; // Regardless of the from address, add all taddr outputs to the raw transaction. - if (isfromtaddr_ && !find_utxos()) + if (isfromtaddr_ && !find_utxos()) { throw JSONRPCError(RPC_WALLET_INSUFFICIENT_FUNDS, "Insufficient funds, no UTXOs found for taddr from address."); - - if (isfromzaddr_ && !find_unspent_notes()) + } + + if (isfromzaddr_ && !find_unspent_notes()) { throw JSONRPCError(RPC_WALLET_INSUFFICIENT_FUNDS, "Insufficient funds, no unspent notes found for zaddr from address."); - + } + CAmount t_inputs_total = 0; for (SendManyInputUTXO & t : t_inputs_) { t_inputs_total += std::get<2>(t); @@ -163,12 +170,13 @@ bool AsyncRPCOperation_sendmany::main_impl() { std::cout << "targetAmount: " << targetAmount << std::endl; #endif - if (isfromtaddr_ && (t_inputs_total < targetAmount)) + if (isfromtaddr_ && (t_inputs_total < targetAmount)) { throw JSONRPCError(RPC_WALLET_INSUFFICIENT_FUNDS, strprintf("Insufficient transparent funds, have %ld, need %ld plus fee %ld", t_inputs_total, t_outputs_total, minersFee)); - - if (isfromzaddr_ && (z_inputs_total < targetAmount)) + } + + if (isfromzaddr_ && (z_inputs_total < targetAmount)) { throw JSONRPCError(RPC_WALLET_INSUFFICIENT_FUNDS, strprintf("Insufficient protected funds, have %ld, need %ld plus fee %ld", z_inputs_total, t_outputs_total, minersFee)); - + } // If from address is a taddr, select UTXOs to spend CAmount selectedUTXOAmount = 0; @@ -177,8 +185,9 @@ bool AsyncRPCOperation_sendmany::main_impl() { for (SendManyInputUTXO & t : t_inputs_) { selectedUTXOAmount += std::get<2>(t); selectedTInputs.push_back(t); - if (selectedUTXOAmount >= targetAmount) + if (selectedUTXOAmount >= targetAmount) { break; + } } t_inputs_ = selectedTInputs; t_inputs_total = selectedUTXOAmount; @@ -273,9 +282,10 @@ bool AsyncRPCOperation_sendmany::main_impl() { if (hexMemo.size() > 0) { std::vector rawMemo = ParseHex(hexMemo.c_str()); boost::array memo = {{0x00}}; - if (rawMemo.size() > ZC_MEMO_SIZE) + if (rawMemo.size() > ZC_MEMO_SIZE) { throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Memo size of %d is too big, maximum allowed is %d", rawMemo.size(), ZC_MEMO_SIZE)); - + } + int lenMemo = rawMemo.size(); for (int i = 0; i < ZC_MEMO_SIZE && i < lenMemo; i++) { memo[i] = rawMemo[i]; @@ -294,9 +304,9 @@ bool AsyncRPCOperation_sendmany::main_impl() { // Private change will flow back to sender's zaddr, while transparent change flows to a new taddr. CAmount change = funds - fundsSpent; - if (change < 0) + if (change < 0) { throw JSONRPCError(RPC_WALLET_INSUFFICIENT_FUNDS, strprintf("Insufficient funds or internal error, spent too much leaving negative change %ld", change)); - if (change > 0) { + } else if (change > 0) { if (isfromzaddr_) { info.vjsout.push_back(JSOutput(frompaymentaddress_, change)); } else if (isfromtaddr_) { @@ -308,8 +318,9 @@ bool AsyncRPCOperation_sendmany::main_impl() { CReserveKey keyChange(pwalletMain); CPubKey vchPubKey; bool ret = keyChange.GetReservedKey(vchPubKey); - if (!ret) + if (!ret) { throw JSONRPCError(RPC_WALLET_KEYPOOL_RAN_OUT, "Could not generate a taddr to use as a change address"); // should never fail, as we just unlocked + } CScript scriptPubKey = GetScriptForDestination(vchPubKey.GetID()); CTxOut out(change, scriptPubKey); rawTx.vout.push_back(out); @@ -359,7 +370,7 @@ bool AsyncRPCOperation_sendmany::main_impl() { Object o; o.push_back(Pair("txid", txid)); //o.push_back(Pair("hex", signedtxn)); - setResult(Value(o)); + set_result(Value(o)); } else { // Test mode does not send the transaction to the network. @@ -371,7 +382,7 @@ bool AsyncRPCOperation_sendmany::main_impl() { o.push_back(Pair("test", 1)); o.push_back(Pair("txid", tx.GetTxid().ToString())); o.push_back(Pair("hex", signedtxn)); - setResult(Value(o)); + set_result(Value(o)); } return true; @@ -387,16 +398,19 @@ bool AsyncRPCOperation_sendmany::find_utxos() { pwalletMain->AvailableCoins(vecOutputs, false, NULL, true); BOOST_FOREACH(const COutput& out, vecOutputs) { - if (out.nDepth < mindepth_) + if (out.nDepth < mindepth_) { continue; + } if (setAddress.size()) { CTxDestination address; - if (!ExtractDestination(out.tx->vout[out.i].scriptPubKey, address)) + if (!ExtractDestination(out.tx->vout[out.i].scriptPubKey, address)) { continue; + } - if (!setAddress.count(address)) + if (!setAddress.count(address)) { continue; + } } // TODO: Also examine out.fSpendable ? @@ -416,13 +430,15 @@ bool AsyncRPCOperation_sendmany::find_unspent_notes() { CWalletTx wtx = p.second; // Filter the transactions before checking for notes - if (!CheckFinalTx(wtx) || wtx.GetBlocksToMaturity() > 0 || wtx.GetDepthInMainChain() < mindepth_) + if (!CheckFinalTx(wtx) || wtx.GetBlocksToMaturity() > 0 || wtx.GetDepthInMainChain() < mindepth_) { continue; + } mapNoteData_t mapNoteData = pwalletMain->FindMyNotes(wtx); - if (mapNoteData.size() == 0) + if (mapNoteData.size() == 0) { continue; + } for (auto & pair : mapNoteData) { JSOutPoint jsop = pair.first; @@ -431,8 +447,9 @@ bool AsyncRPCOperation_sendmany::find_unspent_notes() { PaymentAddress pa = nd.address; // skip notes which belong to a different payment address in the wallet - if (!(pa == frompaymentaddress_)) + if (!(pa == frompaymentaddress_)) { continue; + } int i = jsop.js; // Index into CTransaction.vjoinsplit int j = jsop.n; // Index into JSDescription.ciphertexts @@ -451,8 +468,9 @@ bool AsyncRPCOperation_sendmany::find_unspent_notes() { uint256 nullifier = plaintext.note(frompaymentaddress_).nullifier(spendingkey_); bool isSpent = pwalletMain->IsSpent(nullifier); - if (isSpent) + if (isSpent) { continue; + } z_inputs_.push_back(SendManyInputNPT(plaintext, CAmount(plaintext.value))); @@ -473,8 +491,9 @@ bool AsyncRPCOperation_sendmany::find_unspent_notes() { } } - if (z_inputs_.size() == 0) + if (z_inputs_.size() == 0) { return false; + } // sort in descending order, so big notes appear first std::sort(z_inputs_.begin(), z_inputs_.end(), [](SendManyInputNPT i, SendManyInputNPT j) -> bool { @@ -496,8 +515,9 @@ Object AsyncRPCOperation_sendmany::perform_joinsplit(AsyncJoinSplitInfo & info) // Unlock critical section - if (!(witnesses.size() == info.notes.size()) || !(info.notes.size() == info.keys.size())) + if (!(witnesses.size() == info.notes.size()) || !(info.notes.size() == info.keys.size())) { throw runtime_error("number of notes and witnesses and keys do not match"); + } for (size_t i = 0; i < witnesses.size(); i++) { if (!witnesses[i]) { @@ -548,8 +568,9 @@ Object AsyncRPCOperation_sendmany::perform_joinsplit(AsyncJoinSplitInfo & info) info.vpub_old, info.vpub_new); - if (!(jsdesc.Verify(*pzcashParams, joinSplitPubKey))) + if (!(jsdesc.Verify(*pzcashParams, joinSplitPubKey))) { throw std::runtime_error("error verifying joinsplt"); + } mtx.vjoinsplit.push_back(jsdesc); @@ -563,14 +584,18 @@ Object AsyncRPCOperation_sendmany::perform_joinsplit(AsyncJoinSplitInfo & info) dataToBeSigned.begin(), 32, joinSplitPrivKey ) == 0)) + { throw std::runtime_error("crypto_sign_detached failed"); + } // Sanity check if (!(crypto_sign_verify_detached(&mtx.joinSplitSig[0], dataToBeSigned.begin(), 32, mtx.joinSplitPubKey.begin() ) == 0)) + { throw std::runtime_error("crypto_sign_verify_detached failed"); + } CTransaction rawTx(mtx); tx_ = rawTx; @@ -615,8 +640,9 @@ void AsyncRPCOperation_sendmany::add_taddr_outputs_to_tx() { CAmount nAmount = std::get<1>(r); CBitcoinAddress address(outputAddress); - if (!address.IsValid()) + if (!address.IsValid()) { throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid output address, not a valid taddr."); + } CScript scriptPubKey = GetScriptForDestination(address.Get());