Add test for AsyncRPCQueue and AsyncRPCOperation.
This commit is contained in:
parent
017b3ede33
commit
b922924d14
|
@ -13,6 +13,14 @@
|
|||
|
||||
#include "zcash/Address.hpp"
|
||||
|
||||
#include "rpcserver.h"
|
||||
#include "asyncrpcqueue.h"
|
||||
#include "asyncrpcoperation.h"
|
||||
#include "wallet/asyncrpcoperation_sendmany.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include <fstream>
|
||||
#include <unordered_set>
|
||||
|
||||
|
@ -441,4 +449,192 @@ BOOST_AUTO_TEST_CASE(rpc_wallet_z_importexport)
|
|||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Testing Async RPC operations.
|
||||
*
|
||||
* Tip: Create mock operations by subclassing AsyncRPCOperation.
|
||||
*/
|
||||
class MockSleepOperation : public AsyncRPCOperation {
|
||||
public:
|
||||
std::chrono::milliseconds naptime;
|
||||
MockSleepOperation(int t=1000) {
|
||||
this->naptime = std::chrono::milliseconds(t);
|
||||
}
|
||||
virtual ~MockSleepOperation() {
|
||||
}
|
||||
virtual void main() {
|
||||
set_state(OperationStatus::EXECUTING);
|
||||
start_execution_clock();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(naptime));
|
||||
stop_execution_clock();
|
||||
set_result(Value("done"));
|
||||
set_state(OperationStatus::SUCCESS);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* Test Aysnc RPC queue and operations.
|
||||
*/
|
||||
BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations)
|
||||
{
|
||||
std::shared_ptr<AsyncRPCQueue> q = std::make_shared<AsyncRPCQueue>();
|
||||
BOOST_CHECK(q->getNumberOfWorkers() == 0);
|
||||
std::vector<AsyncRPCOperationId> ids = q->getAllOperationIds();
|
||||
BOOST_CHECK(ids.size()==0);
|
||||
|
||||
std::shared_ptr<AsyncRPCOperation> op1 = std::make_shared<AsyncRPCOperation>();
|
||||
q->addOperation(op1);
|
||||
BOOST_CHECK(q->getOperationCount() == 1);
|
||||
|
||||
OperationStatus status = op1->getState();
|
||||
BOOST_CHECK(status == OperationStatus::READY);
|
||||
|
||||
AsyncRPCOperationId id1 = op1->getId();
|
||||
int64_t creationTime1 = op1->getCreationTime();
|
||||
|
||||
q->addWorker();
|
||||
BOOST_CHECK(q->getNumberOfWorkers() == 1);
|
||||
|
||||
// an AsyncRPCOperation doesn't do anything so will finish immediately
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
BOOST_CHECK(q->getOperationCount() == 0);
|
||||
|
||||
// operation should be a success
|
||||
BOOST_CHECK_EQUAL(op1->isCancelled(), false);
|
||||
BOOST_CHECK_EQUAL(op1->isExecuting(), false);
|
||||
BOOST_CHECK_EQUAL(op1->isReady(), false);
|
||||
BOOST_CHECK_EQUAL(op1->isFailed(), false);
|
||||
BOOST_CHECK_EQUAL(op1->isSuccess(), true);
|
||||
BOOST_CHECK(op1->getError() == Value::null );
|
||||
BOOST_CHECK(op1->getResult().is_null() == false );
|
||||
BOOST_CHECK_EQUAL(op1->getStateAsString(), "success");
|
||||
BOOST_CHECK_NE(op1->getStateAsString(), "executing");
|
||||
|
||||
// Create a second operation which just sleeps
|
||||
std::shared_ptr<AsyncRPCOperation> op2(new MockSleepOperation(2500));
|
||||
AsyncRPCOperationId id2 = op2->getId();
|
||||
int64_t creationTime2 = op2->getCreationTime();
|
||||
|
||||
// it's different from the previous operation
|
||||
BOOST_CHECK_NE(op1.get(), op2.get());
|
||||
BOOST_CHECK_NE(id1, id2);
|
||||
BOOST_CHECK_NE(creationTime1, creationTime2);
|
||||
|
||||
// Only the first operation has been added to the queue
|
||||
std::vector<AsyncRPCOperationId> v = q->getAllOperationIds();
|
||||
std::set<AsyncRPCOperationId> opids(v.begin(), v.end());
|
||||
BOOST_CHECK(opids.size() == 1);
|
||||
BOOST_CHECK(opids.count(id1)==1);
|
||||
BOOST_CHECK(opids.count(id2)==0);
|
||||
std::shared_ptr<AsyncRPCOperation> p1 = q->getOperationForId(id1);
|
||||
BOOST_CHECK_EQUAL(p1.get(), op1.get());
|
||||
std::shared_ptr<AsyncRPCOperation> p2 = q->getOperationForId(id2);
|
||||
BOOST_CHECK(!p2); // null ptr as not added to queue yet
|
||||
|
||||
// Add operation 2 and 3 to the queue
|
||||
q->addOperation(op2);
|
||||
std::shared_ptr<AsyncRPCOperation> op3(new MockSleepOperation(1000));
|
||||
q->addOperation(op3);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
BOOST_CHECK_EQUAL(op2->isExecuting(), true);
|
||||
op2->cancel(); // too late, already executing
|
||||
op3->cancel();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
|
||||
BOOST_CHECK_EQUAL(op2->isSuccess(), true);
|
||||
BOOST_CHECK_EQUAL(op2->isCancelled(), false);
|
||||
BOOST_CHECK_EQUAL(op3->isCancelled(), true);
|
||||
|
||||
|
||||
v = q->getAllOperationIds();
|
||||
std::copy( v.begin(), v.end(), std::inserter( opids, opids.end() ) );
|
||||
BOOST_CHECK(opids.size() == 3);
|
||||
BOOST_CHECK(opids.count(id1)==1);
|
||||
BOOST_CHECK(opids.count(id2)==1);
|
||||
BOOST_CHECK(opids.count(op3->getId())==1);
|
||||
q->finishAndWait();
|
||||
}
|
||||
|
||||
|
||||
// The CountOperation will increment this global
|
||||
std::atomic<int64_t> gCounter(0);
|
||||
|
||||
class CountOperation : public AsyncRPCOperation {
|
||||
public:
|
||||
CountOperation() {}
|
||||
virtual ~CountOperation() {}
|
||||
virtual void main() {
|
||||
set_state(OperationStatus::EXECUTING);
|
||||
gCounter++;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
set_state(OperationStatus::SUCCESS);
|
||||
}
|
||||
};
|
||||
|
||||
// This tests the queue waiting for multiple workers to finish
|
||||
BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations_parallel_wait)
|
||||
{
|
||||
gCounter = 0;
|
||||
|
||||
std::shared_ptr<AsyncRPCQueue> q = std::make_shared<AsyncRPCQueue>();
|
||||
q->addWorker();
|
||||
q->addWorker();
|
||||
q->addWorker();
|
||||
q->addWorker();
|
||||
BOOST_CHECK(q->getNumberOfWorkers() == 4);
|
||||
|
||||
int64_t numOperations = 10; // 10 * 1000ms / 4 = 2.5 secs to finish
|
||||
for (int i=0; i<numOperations; i++) {
|
||||
std::shared_ptr<AsyncRPCOperation> op(new CountOperation());
|
||||
q->addOperation(op);
|
||||
}
|
||||
|
||||
std::vector<AsyncRPCOperationId> ids = q->getAllOperationIds();
|
||||
BOOST_CHECK(ids.size()==numOperations);
|
||||
q->finishAndWait();
|
||||
BOOST_CHECK_EQUAL(q->isFinishing(), true);
|
||||
BOOST_CHECK_EQUAL(numOperations, gCounter.load());
|
||||
}
|
||||
|
||||
// This tests the queue shutting down immediately
|
||||
BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations_parallel_cancel)
|
||||
{
|
||||
gCounter = 0;
|
||||
|
||||
std::shared_ptr<AsyncRPCQueue> q = std::make_shared<AsyncRPCQueue>();
|
||||
q->addWorker();
|
||||
q->addWorker();
|
||||
BOOST_CHECK(q->getNumberOfWorkers() == 2);
|
||||
|
||||
int numOperations = 10000;
|
||||
for (int i=0; i<numOperations; i++) {
|
||||
std::shared_ptr<AsyncRPCOperation> op(new CountOperation());
|
||||
q->addOperation(op);
|
||||
}
|
||||
std::vector<AsyncRPCOperationId> ids = q->getAllOperationIds();
|
||||
BOOST_CHECK(ids.size()==numOperations);
|
||||
q->closeAndWait();
|
||||
|
||||
BOOST_CHECK_NE(numOperations, gCounter.load());
|
||||
|
||||
int numSuccess = 0;
|
||||
int numCancelled = 0;
|
||||
for (auto & id : ids) {
|
||||
std::shared_ptr<AsyncRPCOperation> ptr = q->popOperationForId(id);
|
||||
if (ptr->isCancelled()) {
|
||||
numCancelled++;
|
||||
} else if (ptr->isSuccess()) {
|
||||
numSuccess++;
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_CHECK_EQUAL(numOperations, numSuccess+numCancelled);
|
||||
BOOST_CHECK_EQUAL(gCounter.load(), numSuccess);
|
||||
BOOST_CHECK(q->getOperationCount() == 0);
|
||||
ids = q->getAllOperationIds();
|
||||
BOOST_CHECK(ids.size()==0);
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
Loading…
Reference in New Issue