POC working commit

This commit is contained in:
HenryNguyen5 2018-02-10 21:09:40 -05:00
parent d4c07160f0
commit 2c1474cb34
15 changed files with 576 additions and 309 deletions

View File

@ -13,7 +13,6 @@ import {
NodeRemovedAction,
TypeKeys
} from 'actions/nodeBalancer';
import { Omit } from 'react-router';
export const balancerFlush = (): BalancerFlushAction => ({
type: TypeKeys.BALANCER_FLUSH
@ -56,15 +55,12 @@ export const workerKilled = (payload: WorkerKilledAction['payload']): WorkerKill
payload
});
export const nodeCallRequested = (() => {
let i = 0;
return (
payload: Omit<NodeCallRequestedAction['payload'], 'callId'>
): NodeCallRequestedAction => ({
type: TypeKeys.NODE_CALL_REQUESTED,
payload: { ...payload, callId: i++ }
});
})();
export const nodeCallRequested = (
payload: NodeCallRequestedAction['payload']
): NodeCallRequestedAction => ({
type: TypeKeys.NODE_CALL_REQUESTED,
payload: { ...payload }
});
export const nodeCallTimeout = (
payload: NodeCallTimeoutAction['payload']

View File

@ -1,16 +1,17 @@
import { TypeKeys } from './constants';
import { DefaultNodeNames } from 'config/data';
import { Task } from 'redux-saga';
import { INodeStats } from 'reducers/nodeBalancer/nodes';
import { StaticNodeId } from 'types/node';
export type AllNodeNames = DefaultNodeNames | string;
export type AllNodeIds = StaticNodeId | string;
export interface NodeCall {
callId: number;
rpcMethod: string;
rpcArgs: string[];
numOfTimeouts: number;
nodeWhiteList?: AllNodeNames;
minPriorityNodeList: AllNodeIds[];
nodeWhiteList?: AllNodeIds[];
}
export interface BalancerFlushAction {
@ -20,34 +21,33 @@ export interface BalancerFlushAction {
export interface NodeOnlineAction {
type: TypeKeys.NODE_ONLINE;
payload: {
nodeName: AllNodeNames;
nodeId: AllNodeIds;
};
}
export interface NodeOfflineAction {
type: TypeKeys.NODE_OFFLINE;
payload: {
nodeName: AllNodeNames;
nodeId: AllNodeIds;
};
}
// this is for when new nodes get added dynamically
export interface NodeAddedAction {
type: TypeKeys.NODE_ADDED;
payload: {
nodeName: AllNodeNames;
nodeId: AllNodeIds;
} & INodeStats;
}
export interface NodeRemovedAction {
type: TypeKeys.NODE_REMOVED;
payload: { nodeName: AllNodeNames };
payload: { nodeId: AllNodeIds };
}
export interface WorkerSpawnedAction {
type: TypeKeys.WORKER_SPAWNED;
payload: {
nodeName: AllNodeNames;
nodeId: AllNodeIds;
workerId: string;
task: Task;
};
@ -64,8 +64,9 @@ export interface WorkerProcessingAction {
export interface WorkerKilledAction {
type: TypeKeys.WORKER_KILLED;
payload: {
nodeName: AllNodeNames;
nodeId: AllNodeIds;
workerId: string;
error: Error;
};
}
@ -76,17 +77,17 @@ export interface NodeCallRequestedAction {
export interface NodeCallTimeoutAction {
type: TypeKeys.NODE_CALL_TIMEOUT;
payload: NodeCall & { nodeName: AllNodeNames };
payload: NodeCall & { nodeId: AllNodeIds; error: Error };
}
export interface NodeCallFailedAction {
type: TypeKeys.NODE_CALL_FAILED;
payload: NodeCall;
payload: { error: string; nodeCall: NodeCall };
}
export interface NodeCallSucceededAction {
type: TypeKeys.NODE_CALL_SUCCEEDED;
payload: NodeCall;
payload: { result: string; nodeCall: NodeCall };
}
export type BalancerAction = BalancerFlushAction;

View File

@ -11,7 +11,7 @@ import { Fields } from './components';
import { setDataField, TSetDataField } from 'actions/transaction';
import { Data } from 'libs/units';
import Select from 'react-select';
import { Web3Node } from 'libs/nodes';
import Web3Node from 'libs/nodes/web3';
import RpcNode from 'libs/nodes/rpc';
interface StateProps {

View File

@ -1,17 +1,16 @@
import { INodeConstructor, INode } from 'libs/nodes/INode';
import { INode } from 'libs/nodes/INode';
import PRPCNode from './rpc';
import PInfuraNode from './infura';
import PEtherscanNode from './etherscan';
import PCustomNode from './custom';
import PWeb3Node from './web3';
import { requester } from 'sagas/node/node';
type methods = keyof RPCNode;
import { nodeCallRequester } from 'sagas/node/node';
const handler: ProxyHandler<INode> = {
get: (target, methodName: string) => {
const nodeMethods = Object.getOwnPropertyNames(Object.getPrototypeOf(target));
if (nodeMethods.includes(methodName)) {
return requester(methodName);
return nodeCallRequester(methodName);
}
}
};

View File

@ -1,4 +1,7 @@
import { EtherscanNode, InfuraNode, RPCNode } from 'libs/nodes';
import PRPCNode from 'libs/nodes/rpc';
import PEtherscanNode from 'libs/nodes/etherscan';
import PInfuraNode from 'libs/nodes/infura';
import { TypeKeys, NodeAction } from 'actions/config';
import { NonWeb3NodeConfigs, Web3NodeConfigs } from 'types/node';
@ -8,7 +11,8 @@ export const INITIAL_STATE: State = {
eth_mycrypto: {
network: 'ETH',
isCustom: false,
lib: new RPCNode('https://api.mycryptoapi.com/eth'),
lib: RPCNode('https://api.mycryptoapi.com/eth'),
pLib: new PRPCNode('https://api.mycryptoapi.com/eth'),
service: 'MyCrypto',
estimateGas: true
},
@ -16,16 +20,21 @@ export const INITIAL_STATE: State = {
network: 'ETH',
isCustom: false,
service: 'Etherscan.io',
lib: new EtherscanNode('https://api.etherscan.io/api'),
lib: EtherscanNode('https://api.etherscan.io/api'),
pLib: new PEtherscanNode('https://api.etherscan.io/api'),
estimateGas: false
},
eth_infura: {
network: 'ETH',
isCustom: false,
service: 'infura.io',
lib: new InfuraNode('https://mainnet.infura.io/mew'),
lib: InfuraNode('https://mainnet.infura.io/mew'),
pLib: new PInfuraNode('https://mainnet.infura.io/mew'),
estimateGas: false
},
}
/*
rop_infura: {
network: 'Ropsten',
isCustom: false,
@ -74,7 +83,7 @@ export const INITIAL_STATE: State = {
service: 'Expanse.tech',
lib: new RPCNode('https://node.expanse.tech/'),
estimateGas: true
}
}*/
};
export const staticNodes = (state: State = INITIAL_STATE, action: NodeAction) => {

View File

@ -10,7 +10,7 @@ import { State as SwapState, swap } from './swap';
import { State as WalletState, wallet } from './wallet';
import { State as TransactionState, transaction } from './transaction';
import { onboardStatus, State as OnboardStatusState } from './onboardStatus';
import { nodeBalancer, State as NodeBalancerState } from './nodeBalancer';
export interface AppState {
// Custom reducers
config: ConfigState;
@ -26,6 +26,7 @@ export interface AppState {
routing: any;
swap: SwapState;
transaction: TransactionState;
nodeBalancer: NodeBalancerState;
}
export default combineReducers<AppState>({
@ -39,5 +40,6 @@ export default combineReducers<AppState>({
rates,
deterministicWallets,
routing: routerReducer,
transaction
transaction,
nodeBalancer
});

View File

@ -19,64 +19,130 @@ import { TypeKeys } from 'actions/nodeBalancer/constants';
export interface INodeStats {
maxWorkers: number;
currWorkersById: string[];
timeoutThreshold: number;
timeoutThresholdMs: number;
isOffline: boolean;
requestFailures: number;
requestFailureThreshold: number;
avgResponseTime: number;
supportedMethods: (keyof RpcNode)[];
supportedMethods: { [rpcMethod in keyof RpcNode]: boolean };
}
export interface State {
[key: string]: Readonly<INodeStats>;
[nodeId: string]: Readonly<INodeStats>;
}
// handle custom node removal
const INITIAL_STATE = {};
// hard code in the nodes for now
const INITIAL_STATE: State = {
eth_mycrypto: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
},
eth_ethscan: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
},
eth_infura: {
avgResponseTime: 1,
currWorkersById: [],
timeoutThresholdMs: 1000,
isOffline: false,
maxWorkers: 5,
requestFailures: 0,
requestFailureThreshold: 2,
supportedMethods: {
client: true,
requests: true,
ping: true,
sendCallRequest: true,
getBalance: true,
estimateGas: true,
getTokenBalance: true,
getTokenBalances: true,
getTransactionCount: true,
getCurrentBlock: true,
sendRawTx: true
}
}
};
const handleWorkerKilled: Reducer<State> = (
state: State,
{ payload: { nodeName, workerId } }: WorkerKilledAction
{ payload: { nodeId, workerId } }: WorkerKilledAction
) => {
const nodeToChange = state[nodeName];
const nodeToChange = state[nodeId];
const nextNodeState = {
...nodeToChange,
currWorkersById: nodeToChange.currWorkersById.filter(id => id !== workerId)
};
return { ...state, [nodeName]: nextNodeState };
return { ...state, [nodeId]: nextNodeState };
};
const handleWorkerSpawned: Reducer<State> = (
state: State,
{ payload: { nodeName, workerId } }: WorkerSpawnedAction
{ payload: { nodeId, workerId } }: WorkerSpawnedAction
) => {
const nodeToChange = state[nodeName];
const nodeToChange = state[nodeId];
const nextNodeState = {
...nodeToChange,
currWorkersById: [...nodeToChange.currWorkersById, workerId]
};
return { ...state, [nodeName]: nextNodeState };
return { ...state, [nodeId]: nextNodeState };
};
const handleNodeOnline: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeOnlineAction
{ payload: { nodeId } }: NodeOnlineAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
[nodeId]: {
...state[nodeId],
isOffline: false
}
});
const handleNodeOffline: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeOfflineAction
{ payload: { nodeId } }: NodeOfflineAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
[nodeId]: {
...state[nodeId],
isOffline: true,
requestFailures: 0
}
@ -84,29 +150,29 @@ const handleNodeOffline: Reducer<State> = (
const handleNodeAdded: Reducer<State> = (
state: State,
{ payload: { nodeName, ...nodeStats } }: NodeAddedAction
) => ({ ...state, [nodeName]: { ...nodeStats } });
{ payload: { nodeId, ...nodeStats } }: NodeAddedAction
) => ({ ...state, [nodeId]: { ...nodeStats } });
const handleNodeRemoved: Reducer<State> = (state: State, { payload }: NodeRemovedAction) => {
const stateCopy = { ...state };
Reflect.deleteProperty(state, payload.nodeName);
Reflect.deleteProperty(state, payload.nodeId);
return stateCopy;
};
const handleNodeCallTimeout: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeCallTimeoutAction
{ payload: { nodeId } }: NodeCallTimeoutAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
requestFailures: state[nodeName].requestFailures + 1
[nodeId]: {
...state[nodeId],
requestFailures: state[nodeId].requestFailures + 1
}
});
const handleBalancerFlush: Reducer<State> = (state: State, _: BalancerFlushAction) =>
Object.entries(state).reduce(
(obj, [nodeName, nodeStats]) => ({ ...obj, [nodeName]: { ...nodeStats, requestFailures: 0 } }),
(obj, [nodeId, nodeStats]) => ({ ...obj, [nodeId]: { ...nodeStats, requestFailures: 0 } }),
{} as State
);

View File

@ -1,28 +1,29 @@
import { Task } from 'redux-saga';
import {
AllNodeNames,
AllNodeIds,
NodeCall,
WorkerKilledAction,
WorkerProcessingAction,
WorkerSpawnedAction,
NodeCallSucceededAction,
WorkerAction,
NodeCallAction
NodeCallAction,
NodeCallTimeoutAction
} from 'actions/nodeBalancer';
import { Reducer } from 'redux';
import { TypeKeys } from 'actions/nodeBalancer/constants';
interface IWorker {
task: Task;
assignedNode: AllNodeNames;
assignedNode: AllNodeIds;
currentPayload: NodeCall | null;
}
export interface State {
[key: string]: Readonly<IWorker>;
[workerId: string]: Readonly<IWorker>;
}
const INITIAL_STATE = {};
const INITIAL_STATE: State = {};
const handleWorkerKilled: Reducer<State> = (state: State, { payload }: WorkerKilledAction) => {
const stateCopy = { ...state };
@ -40,29 +41,42 @@ const handleWorkerProcessing: Reducer<State> = (
const handleWorkerSpawned: Reducer<State> = (state: State, { payload }: WorkerSpawnedAction) => ({
...state,
[payload.workerId]: { assignedNode: payload.nodeName, task: payload.task, currentPayload: null }
[payload.workerId]: { assignedNode: payload.nodeId, task: payload.task, currentPayload: null }
});
const handleNodeCallSucceeded: Reducer<State> = (
state: State,
{ payload: { callId } }: NodeCallSucceededAction
{ payload }: NodeCallSucceededAction
) => {
const workerIdToRemove = Object.entries(state).find(
const { nodeCall: { callId } } = payload;
const worker = Object.entries(state).find(
([_, { currentPayload }]) => (currentPayload ? currentPayload.callId === callId : false)
);
console.assert(
workerIdToRemove,
'WorkerID not found for successful payload, this should never happen'
);
if (!workerIdToRemove) {
throw Error();
if (!worker) {
throw Error(`Worker not found for a successful request, this should never happen`);
}
const stateCopy = { ...state };
Reflect.deleteProperty(stateCopy, workerIdToRemove[0]);
const [workerId, workerInst] = worker;
return stateCopy;
return { ...state, [workerId]: { ...workerInst, currentPayload: null } };
};
// This is almost the exact same as the above, abstract it
const handleNodeCallTimeout: Reducer<State> = (
state: State,
{ payload }: NodeCallTimeoutAction
) => {
const { callId } = payload;
const worker = Object.entries(state).find(
([_, { currentPayload }]) => (currentPayload ? currentPayload.callId === callId : false)
);
if (!worker) {
throw Error(`Worker not found for a successful request, this should never happen`);
}
const [workerId, workerInst] = worker;
return { ...state, [workerId]: { ...workerInst, currentPayload: null } };
};
export const workers: Reducer<State> = (
@ -78,6 +92,8 @@ export const workers: Reducer<State> = (
return handleWorkerProcessing(state, action);
case TypeKeys.NODE_CALL_SUCCEEDED:
return handleNodeCallSucceeded(state, action);
case TypeKeys.NODE_CALL_TIMEOUT:
return handleNodeCallTimeout(state, action);
default:
return state;
}

View File

@ -49,7 +49,6 @@ export function* pollOfflineStatus(): SagaIterator {
pingSucceeded: call(nodeConfig.lib.ping.bind(nodeConfig.lib)),
timeout: call(delay, 5000)
});
if (pingSucceeded && isOffline) {
// If we were able to ping but redux says we're offline, mark online
yield put(

View File

@ -15,7 +15,7 @@ import { getBityRatesSaga, getShapeShiftRatesSaga, swapProviderSaga } from './sw
import wallet from './wallet';
import { ens } from './ens';
import { transaction } from './transaction';
import { nodeBalancer } from './node/node';
export default {
ens,
liteSend,
@ -33,5 +33,6 @@ export default {
transaction,
deterministicWallets,
swapProviderSaga,
rates
rates,
nodeBalancer
};

View File

@ -1,4 +1,4 @@
import { delay, SagaIterator, buffers, channel, Task, Channel } from 'redux-saga';
import { delay, SagaIterator, buffers, channel, Task, Channel, takeEvery } from 'redux-saga';
import {
call,
cancel,
@ -9,158 +9,288 @@ import {
race,
apply,
spawn,
flush
flush,
all,
actionChannel
} from 'redux-saga/effects';
import { nodeCallRequested } from 'actions/nodeBalancer';
import {
nodeCallRequested,
NodeCall,
workerSpawned,
NodeCallRequestedAction,
nodeCallSucceeded,
workerProcessing,
TypeKeys,
NodeCallSucceededAction,
NodeCallFailedAction,
nodeOffline,
nodeCallFailed,
nodeCallTimeout,
NodeCallTimeoutAction,
NodeOfflineAction,
nodeOnline,
BalancerFlushAction
} from 'actions/nodeBalancer';
import {
getAvailableNodes,
AvailableNodes,
getNodeStatsById,
getAllMethodsAvailable,
getAvailableNodeId
} from 'selectors/nodeBalancer';
import { getOffline, getNodeById } from 'selectors/config';
import { toggleOffline } from 'actions/config';
import { StaticNodeConfig, CustomNodeConfig, NodeConfig } from '../../../shared/types/node';
import { INodeStats } from 'reducers/nodeBalancer/nodes';
// need to check this arbitary number
const MAX_NODE_CALL_TIMEOUTS = 3;
/**
* For now we're going to hard code the initial node configuration in,
* ideally on initialization, a ping call gets sent to every node in the current network
* to determine which nodes are offline on app start using 'NodeAdded'
* then spawn workers for each node from there using 'WorkerSpawned'
*
*/
/**
* Each channel id is a 1-1 mapping of a nodeId
*/
interface IChannels {
[key: string]: Channel<IPayload>;
[key: string]: Channel<NodeCall>;
}
function* handleRequest(node, chan) {
let currentPayload;
while (true) {
try {
const { payload } = yield take(chan);
currentPayload = payload;
const result = yield race({
result: apply(node, node[payload.methodName], payload.methodArgs),
timeout: call(delay, node.timeout)
});
if (result.timeout) {
throw Error();
}
yield put(result.result, payload.id);
} catch {
const beyondTimeout = yield select(timeoutCounter, node);
if (beyondTimeout) {
const payload = {
type: 'NODE_DOWN',
payload: { node, payload: currentPayload }
};
return yield put(payload);
} else {
const payload = {
type: 'NODE_TIMEOUT',
payload: { node, payload: currentPayload }
};
return yield put(payload);
}
}
}
}
// both nodes and requests may have independent retry counters
// on task cancellation, dispatch a fail action
// on channel flush, dispatch a fail action for all flushed actions
const channels: IChannels = {};
function* initAndChannelNodePool(): SagaIterator {
const availableNodes = yield select(getAvailableNodes);
const getChannelTypes = yield select(getChannelTypes);
const channels: IChannels = {};
const tasks: Task[] = [];
console.log('Initializing channel and node pool started');
const availableNodes: AvailableNodes = yield select(getAvailableNodes);
const availableNodesArr = Object.entries(availableNodes);
for (const node of availableNodes) {
const c: Channel<any> = yield call(channel, buffers.expanding(10));
const t: Task = yield spawn(handleRequest, node, c);
tasks.push(t);
channels[node] = c;
}
yield put('NODE_POOL_INIT', tasks);
return tasks;
}
function* handleNodeStatusChanges(chan) {
while (true) {
const { payload } = yield take('NODE_DOWN' | 'NODE_TIMEOUT' | 'NODE_ONLINE');
const { node } = payload;
const nodeStatus = yield select(getNodeStatus(node));
if (nodeStatus.timedOut) {
yield call(delay, 2000);
yield spawn(handleRequest, node, chan);
}
if (nodeStatus.online) {
const task = yield spawn(handleRequest, node, chan);
yield put('NODE_SPAWNED', task);
}
if (nodeStatus.offline) {
yield fork(pollOffline, node);
const availableNode = yield select(getAvailableNode);
if (availableNode) {
yield spawn(handleRequest, availableNode, chan);
}
const allNodesDown = yield select(getIsAllNodesDown);
if (allNodesDown) {
yield put('FLUSH_REQUESTS');
}
}
// make sure to keep the same ID
yield put('NETWORK_REQUEST');
}
}
function* flushHandler(chan): SagaIterator {
let tasks: Task[] = [];
while (true) {
const { node, shouldFlush } = yield race({
node: take('NODE_SPAWNED', 'NODE_POOL_INIT'),
shouldFlush: take('FLUSH_REQUESTS')
});
if (shouldFlush) {
yield cancel(...tasks);
yield flush(chan);
tasks = [];
} else {
tasks.push(node);
}
}
}
function* pollOffline(node) {
while (true) {
const isOffline = yield call(node, ping);
// if there are no available nodes during the initialization, put the app in an offline state
if (availableNodesArr.length === 0) {
const isOffline: boolean = yield select(getOffline);
if (!isOffline) {
yield put({ type: 'NODE_ONLINE', payload: node });
yield put(toggleOffline());
}
yield call(delay, 2000);
}
// make a channel per available node and init its workers up to the maxiumum allowed workers
for (const [nodeId, nodeConfig] of availableNodesArr) {
const nodeChannel: Channel<NodeCall> = yield call(channel, buffers.expanding(10));
channels[nodeId] = nodeChannel;
for (
let workerNumber = nodeConfig.currWorkersById.length;
workerNumber < nodeConfig.maxWorkers;
workerNumber++
) {
const workerId = `${nodeId}_worker_${workerNumber}`;
const workerTask: Task = yield spawn(spawnWorker, workerId, nodeId, nodeChannel);
console.log(`Worker ${workerId} spawned for ${nodeId}`);
yield put(workerSpawned({ nodeId, workerId, task: workerTask }));
}
}
console.log('Initializing channel and node pool finished');
}
function* handleNodeCallRequests(): SagaIterator {
const requestChan = yield actionChannel(TypeKeys.NODE_CALL_REQUESTED);
while (true) {
const { payload }: NodeCallRequestedAction = yield take(requestChan);
// check if the app is offline
// wait until its back online
// get an available nodeId to put the action to the channel
const nodeId: string = yield select(getAvailableNodeId, payload);
const nodeChannel = channels[nodeId];
yield put(nodeChannel, payload);
}
}
function* managerNodePool(): SagaIterator {
const chan = yield call(channel, buffers.expanding(15));
function* handleCallTimeouts({
payload: { error, nodeId, ...nodeCall }
}: NodeCallTimeoutAction): SagaIterator {
const nodeStats: Readonly<INodeStats> | undefined = yield select(getNodeStatsById, nodeId);
if (!nodeStats) {
throw Error('Could not find node stats');
}
// if the node has reached maximum failures, declare it as offline
if (nodeStats.requestFailures >= nodeStats.requestFailureThreshold) {
yield put(nodeOffline({ nodeId }));
const channels = yield initAndChannelNodePool(chan);
yield fork(function*() {
while (true) {
const { payload } = yield take('NETWORK_REQUEST');
yield put(chan, payload);
//check if all methods are still available after this node goes down
const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable);
if (!isAllMethodsAvailable) {
// if not, set app state offline and flush channels
const appIsOffline: boolean = yield select(getOffline);
if (!appIsOffline) {
yield put(toggleOffline());
}
}
});
yield fork(handleNodeStatusChanges, chan);
yield fork(flushHandler, chan);
}
// if the payload exceeds timeout limits, return a response failure
if (nodeCall.numOfTimeouts > MAX_NODE_CALL_TIMEOUTS) {
yield put(nodeCallFailed({ error: error.message, nodeCall }));
} else {
// else consider it a timeout on the request to be retried
// might want to make this a seperate action
// add nodeId to min priority to avoid it if possible
const nextNodeCall: NodeCall = {
...nodeCall,
minPriorityNodeList: [...nodeCall.minPriorityNodeList, nodeId],
numOfTimeouts: ++nodeCall.numOfTimeouts
};
yield put(nodeCallRequested(nextNodeCall));
}
}
export const requester = name =>
function*(args) {
const networkReq = makeNetworkRequest(name, args);
console.log(networkReq);
yield put(networkReq);
const { res } = yield take(
action => action.type === 'NETWORK_SUCCESS' && action.id === networkReq.payload.id
);
return res;
function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) {
const nodeConfig: NodeConfig = yield select(getNodeById, nodeId);
while (true) {
try {
console.log(`Polling ${nodeId} to see if its online...`);
const { lb } = yield race({
lb: apply(nodeConfig.pLib, nodeConfig.pLib.getCurrentBlock),
to: call(delay, 5000)
});
if (lb) {
console.log(`${nodeId} online!`);
yield put(nodeOnline({ nodeId }));
// check if all methods are available after this node is online
const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable);
// if they are, put app in online state
if (isAllMethodsAvailable) {
const appIsOffline: boolean = yield select(getOffline);
if (appIsOffline) {
yield put(toggleOffline());
}
}
}
} catch (error) {
yield call(delay, 5000);
console.info(error);
}
console.log(`${nodeId} still offline`);
}
}
function* spawnWorker(thisId: string, nodeId: string, chan: IChannels[string]) {
/**
* @description used to differentiate between errors from worker code vs a network call error
* @param message
*/
const createInternalError = (message: string) => {
const e = Error(message);
e.name = 'InternalError';
return e;
};
const node = [fork(managerNodePool)];
//select the node config on initialization to avoid re-selecting on every request handled
const nodeConfig: StaticNodeConfig | CustomNodeConfig | undefined = yield select(
getNodeById,
nodeId
);
if (!nodeConfig) {
throw Error(`Node ${nodeId} not found when selecting from state`);
}
let currentPayload: NodeCall;
while (true) {
try {
// take from the assigned action channel
const payload: NodeCall = yield take(chan);
currentPayload = payload;
// after taking a request, declare processing state
yield put(workerProcessing({ currentPayload: payload, workerId: thisId }));
const nodeStats: Readonly<INodeStats> | undefined = yield select(getNodeStatsById, nodeId);
if (!nodeStats) {
throw createInternalError(`Could not find stats for node ${nodeId}`);
}
const lib = nodeConfig.pLib;
// make the call in the allotted timeout time
// this will create an infinite loop
const { result, timeout } = yield race({
result: apply(lib, lib[payload.rpcMethod], payload.rpcArgs),
timeout: call(delay, nodeStats.timeoutThresholdMs)
});
//TODO: clean this up
if (timeout || !result) {
throw createInternalError(`Request timed out for ${nodeId}`);
}
yield put(nodeCallSucceeded({ result, nodeCall: payload }));
} catch (error) {
const e: Error = error;
if (!(e.name === 'InternalError')) {
e.name = `NetworkError_${e.name}`;
}
yield put(nodeCallTimeout({ ...currentPayload!, nodeId, error }));
}
}
}
export const nodeCallRequester = (() => {
let callId = 0;
return (rpcMethod: string) => {
return function*(...rpcArgs: string[]) {
// allow all nodes for now
const nodeCall: NodeCall = {
callId: ++callId,
numOfTimeouts: 0,
rpcArgs,
rpcMethod,
minPriorityNodeList: []
};
// make the request to the load balancer
const networkReq = nodeCallRequested(nodeCall);
console.log(networkReq);
yield put(networkReq);
//wait for either a success or error response
const response: NodeCallSucceededAction | NodeCallFailedAction = yield take(
(action: NodeCallSucceededAction | NodeCallFailedAction) =>
(action.type === TypeKeys.NODE_CALL_SUCCEEDED ||
action.type === TypeKeys.NODE_CALL_FAILED) &&
action.payload.nodeCall.callId === networkReq.payload.callId
);
// return the result as expected
if (response.type === TypeKeys.NODE_CALL_SUCCEEDED) {
return response.payload.result;
} else {
// or throw an error
throw Error(response.payload.error);
}
};
};
})();
function* flushHandler(_: BalancerFlushAction): SagaIterator {
const channelValues = Object.values(channels);
for (const chan of channelValues) {
yield flush(chan);
}
}
export function* nodeBalancer() {
yield all([
call(initAndChannelNodePool),
takeEvery(TypeKeys.NODE_OFFLINE, watchOfflineNode),
fork(handleNodeCallRequests),
takeEvery(TypeKeys.NODE_CALL_TIMEOUT, handleCallTimeouts),
takeEvery(TypeKeys.BALANCER_FLUSH, flushHandler)
]);
}

View File

@ -1,88 +0,0 @@
export function channel(buffer = buffers.expanding()) {
let closed = false;
let takers = [];
if (process.env.NODE_ENV === 'development') {
check(buffer, is.buffer, INVALID_BUFFER);
}
function checkForbiddenStates() {
if (closed && takers.length) {
throw internalErr('Cannot have a closed channel with pending takers');
}
if (takers.length && !buffer.isEmpty()) {
throw internalErr('Cannot have pending takers with non empty buffer');
}
}
function put(input) {
checkForbiddenStates();
if (process.env.NODE_ENV === 'development') {
check(input, is.notUndef, UNDEFINED_INPUT_ERROR);
}
if (closed) {
return;
}
if (!takers.length) {
return buffer.put(input);
}
const cb = takers[0];
takers.splice(0, 1);
cb(input);
}
function take(cb) {
checkForbiddenStates();
if (process.env.NODE_ENV === 'development') {
check(cb, is.func, "channel.take's callback must be a function");
}
if (closed && buffer.isEmpty()) {
cb(END);
} else if (!buffer.isEmpty()) {
cb(buffer.take());
} else {
takers.push(cb);
cb.cancel = () => remove(takers, cb);
}
}
function flush(cb) {
checkForbiddenStates(); // TODO: check if some new state should be forbidden now
if (process.env.NODE_ENV === 'development') {
check(cb, is.func, "channel.flush' callback must be a function");
}
if (closed && buffer.isEmpty()) {
cb(END);
return;
}
cb(buffer.flush());
}
function close() {
checkForbiddenStates();
if (!closed) {
closed = true;
if (takers.length) {
const arr = takers;
takers = [];
for (let i = 0, len = arr.length; i < len; i++) {
const taker = arr[i];
taker(END);
}
}
}
}
return {
take,
put,
flush,
close
};
}

View File

@ -44,10 +44,15 @@ export const getStaticAltNodeIdToWeb3 = (state: AppState) => {
export const getStaticNodeFromId = (state: AppState, nodeId: StaticNodeId) =>
getStaticNodeConfigs(state)[nodeId];
export const getNodeById = (state: AppState, nodeId: string) =>
isStaticNodeId(state, nodeId)
? getStaticNodeFromId(state, nodeId)
: getCustomNodeFromId(state, nodeId);
export const isStaticNodeId = (state: AppState, nodeId: string): nodeId is StaticNodeWithWeb3Id =>
Object.keys(getStaticNodeConfigs(state)).includes(nodeId);
const getStaticNodeConfigs = (state: AppState) => getNodes(state).staticNodes;
export const getStaticNodeConfigs = (state: AppState) => getNodes(state).staticNodes;
export const getStaticNodeConfig = (state: AppState) => {
const { staticNodes, selectedNode: { nodeId } } = getNodes(state);

View File

@ -0,0 +1,127 @@
import { AppState } from 'reducers';
import { State as NodeBalancerState, INodeStats } from 'reducers/nodeBalancer/nodes';
import { Omit } from 'react-redux';
import { NodeCall } from 'actions/nodeBalancer';
const allMethods = [
'client',
'requests',
'ping',
'sendCallRequest',
'getBalance',
'estimateGas',
'getTokenBalance',
'getTokenBalances',
'getTransactionCount',
'getCurrentBlock',
'sendRawTx'
];
export const getNodeBalancer = (state: AppState) => state.nodeBalancer;
export const getNodes = (state: AppState) => getNodeBalancer(state).nodes;
export type AvailableNodes = {
[nodeId in keyof NodeBalancerState]: Omit<NodeBalancerState[nodeId], 'isOffline'> & {
isOffline: false;
}
};
/**
* @description an available node === it being online
* @param state
*/
export const getAvailableNodes = (state: AppState): AvailableNodes => {
const nodes = getNodes(state);
const initialState: AvailableNodes = {};
const isAvailable = (node: NodeBalancerState[string]): node is AvailableNodes['string'] =>
!node.isOffline;
return Object.entries(nodes).reduce((accu, [curNodeId, curNode]) => {
if (isAvailable(curNode)) {
return { ...accu, [curNodeId]: curNode };
}
return accu;
}, initialState);
};
export const getAllMethodsAvailable = (state: AppState): boolean => {
const availableNodes = getAvailableNodes(state);
// goes through each available node and reduces all of their
// available methods into a mapping that contains all supported methods
const availableMethods = Object.values(availableNodes).reduce(
(methods, { supportedMethods }) => ({
...methods,
...Object.entries(supportedMethods).reduce(
// creates a mapping of all supported methods, excluding unsupported ones
(accu, [rpcMethod, isSupported]) => ({
...accu,
...(isSupported ? { [rpcMethod]: true } : {})
}),
{}
)
}),
{}
);
return allMethods.reduce(
(allAvailable, curMethod) => allAvailable && availableMethods[curMethod],
true
);
};
// TODO: handle cases when no node is selected
// available nodes -> nodes that support the method -> nodes that are whitelisted -> prioritized nodes -> workers not busy
// TODO: include response time in prioritization
export const getAvailableNodeId = (state: AppState, payload: NodeCall) => {
const { nodeWhiteList, rpcMethod, minPriorityNodeList } = payload;
const availableNodes = getAvailableNodes(state);
const availableNodesArr = Object.entries(availableNodes);
// filter by nodes that can support this method
const supportsMethod = availableNodesArr.filter(
([_, stats]) => stats.supportedMethods[rpcMethod]
);
// filter nodes that are in the whitelist
const isWhitelisted = nodeWhiteList
? supportsMethod.filter(([nodeId, _]) => nodeWhiteList.includes(nodeId))
: supportsMethod;
// grab the nodes that are not included in min priority
const prioritized1 = isWhitelisted.filter(([nodeId, _]) => !minPriorityNodeList.includes(nodeId));
// grab the nodes that are included
const prioritized2 = isWhitelisted.filter(([nodeId, _]) => minPriorityNodeList.includes(nodeId));
// prioritize the list by using nodes with most workers free
const listToPrioritizeByWorker = prioritized1.length > 0 ? prioritized1 : prioritized2;
let selectedNode: { nodeId: string; numOfRequestsCurrentProcessing: number };
for (const [nodeId, stats] of listToPrioritizeByWorker) {
const numOfRequestsCurrentProcessing = stats.currWorkersById.reduce((processing, wId) => {
const worker = getWorkersById(state, wId);
return worker.currentPayload ? processing + 1 : processing;
}, 0);
if (!selectedNode!) {
selectedNode = { nodeId, numOfRequestsCurrentProcessing };
} else {
if (selectedNode!.numOfRequestsCurrentProcessing > numOfRequestsCurrentProcessing) {
selectedNode = { nodeId, numOfRequestsCurrentProcessing };
}
}
}
return selectedNode.nodeId;
};
export const getWorkers = (state: AppState) => getNodeBalancer(state).workers;
export const getWorkersById = (state: AppState, workerId: string) => getWorkers(state)[workerId];
export const getNodeStatsById = (
state: AppState,
nodeId: string
): Readonly<INodeStats> | undefined => getNodes(state)[nodeId];

View File

@ -1,13 +1,15 @@
import { RPCNode, Web3Node } from 'libs/nodes';
import { StaticNetworkIds } from './network';
import { StaticNodesState, CustomNodesState } from 'reducers/config/nodes';
import CustomNode from 'libs/nodes/custom';
import RPCNode from 'libs/nodes/rpc';
import Web3Node from 'libs/nodes/web3';
interface CustomNodeConfig {
id: string;
isCustom: true;
name: string;
lib: CustomNode;
pLib: CustomNode;
service: 'your custom node';
url: string;
port: number;
@ -22,6 +24,8 @@ interface StaticNodeConfig {
isCustom: false;
network: StaticNetworkIds;
lib: RPCNode | Web3Node;
pLib: RPCNode | Web3Node;
service: string;
estimateGas?: boolean;
hidden?: boolean;