From 2c1474cb340440118c87dec3efc1a4d6084af5f7 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 Date: Sat, 10 Feb 2018 21:09:40 -0500 Subject: [PATCH] POC working commit --- common/actions/nodeBalancer/actionCreators.ts | 16 +- common/actions/nodeBalancer/actionTypes.ts | 27 +- .../components/InteractExplorer/index.tsx | 2 +- common/libs/nodes/index.ts | 7 +- common/reducers/config/nodes/staticNodes.ts | 19 +- common/reducers/index.ts | 6 +- common/reducers/nodeBalancer/nodes.ts | 118 +++-- common/reducers/nodeBalancer/workers.ts | 52 ++- common/sagas/config/node.ts | 1 - common/sagas/index.ts | 5 +- common/sagas/node/node.ts | 404 ++++++++++++------ common/sagas/node/test.ts | 88 ---- common/selectors/config/nodes.ts | 7 +- common/selectors/nodeBalancer/index.ts | 127 ++++++ shared/types/node.d.ts | 6 +- 15 files changed, 576 insertions(+), 309 deletions(-) delete mode 100644 common/sagas/node/test.ts create mode 100644 common/selectors/nodeBalancer/index.ts diff --git a/common/actions/nodeBalancer/actionCreators.ts b/common/actions/nodeBalancer/actionCreators.ts index c686ce74..0000d700 100644 --- a/common/actions/nodeBalancer/actionCreators.ts +++ b/common/actions/nodeBalancer/actionCreators.ts @@ -13,7 +13,6 @@ import { NodeRemovedAction, TypeKeys } from 'actions/nodeBalancer'; -import { Omit } from 'react-router'; export const balancerFlush = (): BalancerFlushAction => ({ type: TypeKeys.BALANCER_FLUSH @@ -56,15 +55,12 @@ export const workerKilled = (payload: WorkerKilledAction['payload']): WorkerKill payload }); -export const nodeCallRequested = (() => { - let i = 0; - return ( - payload: Omit - ): NodeCallRequestedAction => ({ - type: TypeKeys.NODE_CALL_REQUESTED, - payload: { ...payload, callId: i++ } - }); -})(); +export const nodeCallRequested = ( + payload: NodeCallRequestedAction['payload'] +): NodeCallRequestedAction => ({ + type: TypeKeys.NODE_CALL_REQUESTED, + payload: { ...payload } +}); export const nodeCallTimeout = ( payload: NodeCallTimeoutAction['payload'] diff --git a/common/actions/nodeBalancer/actionTypes.ts b/common/actions/nodeBalancer/actionTypes.ts index 8720783e..821bb793 100644 --- a/common/actions/nodeBalancer/actionTypes.ts +++ b/common/actions/nodeBalancer/actionTypes.ts @@ -1,16 +1,17 @@ import { TypeKeys } from './constants'; -import { DefaultNodeNames } from 'config/data'; import { Task } from 'redux-saga'; import { INodeStats } from 'reducers/nodeBalancer/nodes'; +import { StaticNodeId } from 'types/node'; -export type AllNodeNames = DefaultNodeNames | string; +export type AllNodeIds = StaticNodeId | string; export interface NodeCall { callId: number; rpcMethod: string; rpcArgs: string[]; numOfTimeouts: number; - nodeWhiteList?: AllNodeNames; + minPriorityNodeList: AllNodeIds[]; + nodeWhiteList?: AllNodeIds[]; } export interface BalancerFlushAction { @@ -20,34 +21,33 @@ export interface BalancerFlushAction { export interface NodeOnlineAction { type: TypeKeys.NODE_ONLINE; payload: { - nodeName: AllNodeNames; + nodeId: AllNodeIds; }; } export interface NodeOfflineAction { type: TypeKeys.NODE_OFFLINE; payload: { - nodeName: AllNodeNames; + nodeId: AllNodeIds; }; } -// this is for when new nodes get added dynamically export interface NodeAddedAction { type: TypeKeys.NODE_ADDED; payload: { - nodeName: AllNodeNames; + nodeId: AllNodeIds; } & INodeStats; } export interface NodeRemovedAction { type: TypeKeys.NODE_REMOVED; - payload: { nodeName: AllNodeNames }; + payload: { nodeId: AllNodeIds }; } export interface WorkerSpawnedAction { type: TypeKeys.WORKER_SPAWNED; payload: { - nodeName: AllNodeNames; + nodeId: AllNodeIds; workerId: string; task: Task; }; @@ -64,8 +64,9 @@ export interface WorkerProcessingAction { export interface WorkerKilledAction { type: TypeKeys.WORKER_KILLED; payload: { - nodeName: AllNodeNames; + nodeId: AllNodeIds; workerId: string; + error: Error; }; } @@ -76,17 +77,17 @@ export interface NodeCallRequestedAction { export interface NodeCallTimeoutAction { type: TypeKeys.NODE_CALL_TIMEOUT; - payload: NodeCall & { nodeName: AllNodeNames }; + payload: NodeCall & { nodeId: AllNodeIds; error: Error }; } export interface NodeCallFailedAction { type: TypeKeys.NODE_CALL_FAILED; - payload: NodeCall; + payload: { error: string; nodeCall: NodeCall }; } export interface NodeCallSucceededAction { type: TypeKeys.NODE_CALL_SUCCEEDED; - payload: NodeCall; + payload: { result: string; nodeCall: NodeCall }; } export type BalancerAction = BalancerFlushAction; diff --git a/common/containers/Tabs/Contracts/components/Interact/components/InteractExplorer/index.tsx b/common/containers/Tabs/Contracts/components/Interact/components/InteractExplorer/index.tsx index 8e6f05c6..ee76303b 100644 --- a/common/containers/Tabs/Contracts/components/Interact/components/InteractExplorer/index.tsx +++ b/common/containers/Tabs/Contracts/components/Interact/components/InteractExplorer/index.tsx @@ -11,7 +11,7 @@ import { Fields } from './components'; import { setDataField, TSetDataField } from 'actions/transaction'; import { Data } from 'libs/units'; import Select from 'react-select'; -import { Web3Node } from 'libs/nodes'; +import Web3Node from 'libs/nodes/web3'; import RpcNode from 'libs/nodes/rpc'; interface StateProps { diff --git a/common/libs/nodes/index.ts b/common/libs/nodes/index.ts index 0b8894bd..bc240fe1 100644 --- a/common/libs/nodes/index.ts +++ b/common/libs/nodes/index.ts @@ -1,17 +1,16 @@ -import { INodeConstructor, INode } from 'libs/nodes/INode'; +import { INode } from 'libs/nodes/INode'; import PRPCNode from './rpc'; import PInfuraNode from './infura'; import PEtherscanNode from './etherscan'; import PCustomNode from './custom'; import PWeb3Node from './web3'; -import { requester } from 'sagas/node/node'; -type methods = keyof RPCNode; +import { nodeCallRequester } from 'sagas/node/node'; const handler: ProxyHandler = { get: (target, methodName: string) => { const nodeMethods = Object.getOwnPropertyNames(Object.getPrototypeOf(target)); if (nodeMethods.includes(methodName)) { - return requester(methodName); + return nodeCallRequester(methodName); } } }; diff --git a/common/reducers/config/nodes/staticNodes.ts b/common/reducers/config/nodes/staticNodes.ts index b5b9e029..c6c86aaf 100644 --- a/common/reducers/config/nodes/staticNodes.ts +++ b/common/reducers/config/nodes/staticNodes.ts @@ -1,4 +1,7 @@ import { EtherscanNode, InfuraNode, RPCNode } from 'libs/nodes'; +import PRPCNode from 'libs/nodes/rpc'; +import PEtherscanNode from 'libs/nodes/etherscan'; +import PInfuraNode from 'libs/nodes/infura'; import { TypeKeys, NodeAction } from 'actions/config'; import { NonWeb3NodeConfigs, Web3NodeConfigs } from 'types/node'; @@ -8,7 +11,8 @@ export const INITIAL_STATE: State = { eth_mycrypto: { network: 'ETH', isCustom: false, - lib: new RPCNode('https://api.mycryptoapi.com/eth'), + lib: RPCNode('https://api.mycryptoapi.com/eth'), + pLib: new PRPCNode('https://api.mycryptoapi.com/eth'), service: 'MyCrypto', estimateGas: true }, @@ -16,16 +20,21 @@ export const INITIAL_STATE: State = { network: 'ETH', isCustom: false, service: 'Etherscan.io', - lib: new EtherscanNode('https://api.etherscan.io/api'), + lib: EtherscanNode('https://api.etherscan.io/api'), + pLib: new PEtherscanNode('https://api.etherscan.io/api'), + estimateGas: false }, eth_infura: { network: 'ETH', isCustom: false, service: 'infura.io', - lib: new InfuraNode('https://mainnet.infura.io/mew'), + lib: InfuraNode('https://mainnet.infura.io/mew'), + pLib: new PInfuraNode('https://mainnet.infura.io/mew'), + estimateGas: false - }, + } + /* rop_infura: { network: 'Ropsten', isCustom: false, @@ -74,7 +83,7 @@ export const INITIAL_STATE: State = { service: 'Expanse.tech', lib: new RPCNode('https://node.expanse.tech/'), estimateGas: true - } + }*/ }; export const staticNodes = (state: State = INITIAL_STATE, action: NodeAction) => { diff --git a/common/reducers/index.ts b/common/reducers/index.ts index e29992df..767481bf 100644 --- a/common/reducers/index.ts +++ b/common/reducers/index.ts @@ -10,7 +10,7 @@ import { State as SwapState, swap } from './swap'; import { State as WalletState, wallet } from './wallet'; import { State as TransactionState, transaction } from './transaction'; import { onboardStatus, State as OnboardStatusState } from './onboardStatus'; - +import { nodeBalancer, State as NodeBalancerState } from './nodeBalancer'; export interface AppState { // Custom reducers config: ConfigState; @@ -26,6 +26,7 @@ export interface AppState { routing: any; swap: SwapState; transaction: TransactionState; + nodeBalancer: NodeBalancerState; } export default combineReducers({ @@ -39,5 +40,6 @@ export default combineReducers({ rates, deterministicWallets, routing: routerReducer, - transaction + transaction, + nodeBalancer }); diff --git a/common/reducers/nodeBalancer/nodes.ts b/common/reducers/nodeBalancer/nodes.ts index e8d24f23..b1d48053 100644 --- a/common/reducers/nodeBalancer/nodes.ts +++ b/common/reducers/nodeBalancer/nodes.ts @@ -19,64 +19,130 @@ import { TypeKeys } from 'actions/nodeBalancer/constants'; export interface INodeStats { maxWorkers: number; currWorkersById: string[]; - timeoutThreshold: number; + timeoutThresholdMs: number; isOffline: boolean; requestFailures: number; requestFailureThreshold: number; avgResponseTime: number; - supportedMethods: (keyof RpcNode)[]; + supportedMethods: { [rpcMethod in keyof RpcNode]: boolean }; } export interface State { - [key: string]: Readonly; + [nodeId: string]: Readonly; } -// handle custom node removal - -const INITIAL_STATE = {}; +// hard code in the nodes for now +const INITIAL_STATE: State = { + eth_mycrypto: { + avgResponseTime: 1, + currWorkersById: [], + timeoutThresholdMs: 1000, + isOffline: false, + maxWorkers: 5, + requestFailures: 0, + requestFailureThreshold: 2, + supportedMethods: { + client: true, + requests: true, + ping: true, + sendCallRequest: true, + getBalance: true, + estimateGas: true, + getTokenBalance: true, + getTokenBalances: true, + getTransactionCount: true, + getCurrentBlock: true, + sendRawTx: true + } + }, + eth_ethscan: { + avgResponseTime: 1, + currWorkersById: [], + timeoutThresholdMs: 1000, + isOffline: false, + maxWorkers: 5, + requestFailures: 0, + requestFailureThreshold: 2, + supportedMethods: { + client: true, + requests: true, + ping: true, + sendCallRequest: true, + getBalance: true, + estimateGas: true, + getTokenBalance: true, + getTokenBalances: true, + getTransactionCount: true, + getCurrentBlock: true, + sendRawTx: true + } + }, + eth_infura: { + avgResponseTime: 1, + currWorkersById: [], + timeoutThresholdMs: 1000, + isOffline: false, + maxWorkers: 5, + requestFailures: 0, + requestFailureThreshold: 2, + supportedMethods: { + client: true, + requests: true, + ping: true, + sendCallRequest: true, + getBalance: true, + estimateGas: true, + getTokenBalance: true, + getTokenBalances: true, + getTransactionCount: true, + getCurrentBlock: true, + sendRawTx: true + } + } +}; const handleWorkerKilled: Reducer = ( state: State, - { payload: { nodeName, workerId } }: WorkerKilledAction + { payload: { nodeId, workerId } }: WorkerKilledAction ) => { - const nodeToChange = state[nodeName]; + const nodeToChange = state[nodeId]; const nextNodeState = { ...nodeToChange, currWorkersById: nodeToChange.currWorkersById.filter(id => id !== workerId) }; - return { ...state, [nodeName]: nextNodeState }; + return { ...state, [nodeId]: nextNodeState }; }; const handleWorkerSpawned: Reducer = ( state: State, - { payload: { nodeName, workerId } }: WorkerSpawnedAction + { payload: { nodeId, workerId } }: WorkerSpawnedAction ) => { - const nodeToChange = state[nodeName]; + const nodeToChange = state[nodeId]; const nextNodeState = { ...nodeToChange, currWorkersById: [...nodeToChange.currWorkersById, workerId] }; - return { ...state, [nodeName]: nextNodeState }; + return { ...state, [nodeId]: nextNodeState }; }; const handleNodeOnline: Reducer = ( state: State, - { payload: { nodeName } }: NodeOnlineAction + { payload: { nodeId } }: NodeOnlineAction ) => ({ ...state, - [nodeName]: { - ...state[nodeName], + [nodeId]: { + ...state[nodeId], isOffline: false } }); const handleNodeOffline: Reducer = ( state: State, - { payload: { nodeName } }: NodeOfflineAction + { payload: { nodeId } }: NodeOfflineAction ) => ({ ...state, - [nodeName]: { - ...state[nodeName], + [nodeId]: { + ...state[nodeId], isOffline: true, requestFailures: 0 } @@ -84,29 +150,29 @@ const handleNodeOffline: Reducer = ( const handleNodeAdded: Reducer = ( state: State, - { payload: { nodeName, ...nodeStats } }: NodeAddedAction -) => ({ ...state, [nodeName]: { ...nodeStats } }); + { payload: { nodeId, ...nodeStats } }: NodeAddedAction +) => ({ ...state, [nodeId]: { ...nodeStats } }); const handleNodeRemoved: Reducer = (state: State, { payload }: NodeRemovedAction) => { const stateCopy = { ...state }; - Reflect.deleteProperty(state, payload.nodeName); + Reflect.deleteProperty(state, payload.nodeId); return stateCopy; }; const handleNodeCallTimeout: Reducer = ( state: State, - { payload: { nodeName } }: NodeCallTimeoutAction + { payload: { nodeId } }: NodeCallTimeoutAction ) => ({ ...state, - [nodeName]: { - ...state[nodeName], - requestFailures: state[nodeName].requestFailures + 1 + [nodeId]: { + ...state[nodeId], + requestFailures: state[nodeId].requestFailures + 1 } }); const handleBalancerFlush: Reducer = (state: State, _: BalancerFlushAction) => Object.entries(state).reduce( - (obj, [nodeName, nodeStats]) => ({ ...obj, [nodeName]: { ...nodeStats, requestFailures: 0 } }), + (obj, [nodeId, nodeStats]) => ({ ...obj, [nodeId]: { ...nodeStats, requestFailures: 0 } }), {} as State ); diff --git a/common/reducers/nodeBalancer/workers.ts b/common/reducers/nodeBalancer/workers.ts index 90443245..753db39e 100644 --- a/common/reducers/nodeBalancer/workers.ts +++ b/common/reducers/nodeBalancer/workers.ts @@ -1,28 +1,29 @@ import { Task } from 'redux-saga'; import { - AllNodeNames, + AllNodeIds, NodeCall, WorkerKilledAction, WorkerProcessingAction, WorkerSpawnedAction, NodeCallSucceededAction, WorkerAction, - NodeCallAction + NodeCallAction, + NodeCallTimeoutAction } from 'actions/nodeBalancer'; import { Reducer } from 'redux'; import { TypeKeys } from 'actions/nodeBalancer/constants'; interface IWorker { task: Task; - assignedNode: AllNodeNames; + assignedNode: AllNodeIds; currentPayload: NodeCall | null; } export interface State { - [key: string]: Readonly; + [workerId: string]: Readonly; } -const INITIAL_STATE = {}; +const INITIAL_STATE: State = {}; const handleWorkerKilled: Reducer = (state: State, { payload }: WorkerKilledAction) => { const stateCopy = { ...state }; @@ -40,29 +41,42 @@ const handleWorkerProcessing: Reducer = ( const handleWorkerSpawned: Reducer = (state: State, { payload }: WorkerSpawnedAction) => ({ ...state, - [payload.workerId]: { assignedNode: payload.nodeName, task: payload.task, currentPayload: null } + [payload.workerId]: { assignedNode: payload.nodeId, task: payload.task, currentPayload: null } }); const handleNodeCallSucceeded: Reducer = ( state: State, - { payload: { callId } }: NodeCallSucceededAction + { payload }: NodeCallSucceededAction ) => { - const workerIdToRemove = Object.entries(state).find( + const { nodeCall: { callId } } = payload; + const worker = Object.entries(state).find( ([_, { currentPayload }]) => (currentPayload ? currentPayload.callId === callId : false) ); - - console.assert( - workerIdToRemove, - 'WorkerID not found for successful payload, this should never happen' - ); - if (!workerIdToRemove) { - throw Error(); + if (!worker) { + throw Error(`Worker not found for a successful request, this should never happen`); } - const stateCopy = { ...state }; - Reflect.deleteProperty(stateCopy, workerIdToRemove[0]); + const [workerId, workerInst] = worker; - return stateCopy; + return { ...state, [workerId]: { ...workerInst, currentPayload: null } }; +}; + +// This is almost the exact same as the above, abstract it +const handleNodeCallTimeout: Reducer = ( + state: State, + { payload }: NodeCallTimeoutAction +) => { + const { callId } = payload; + const worker = Object.entries(state).find( + ([_, { currentPayload }]) => (currentPayload ? currentPayload.callId === callId : false) + ); + if (!worker) { + throw Error(`Worker not found for a successful request, this should never happen`); + } + + const [workerId, workerInst] = worker; + + return { ...state, [workerId]: { ...workerInst, currentPayload: null } }; }; export const workers: Reducer = ( @@ -78,6 +92,8 @@ export const workers: Reducer = ( return handleWorkerProcessing(state, action); case TypeKeys.NODE_CALL_SUCCEEDED: return handleNodeCallSucceeded(state, action); + case TypeKeys.NODE_CALL_TIMEOUT: + return handleNodeCallTimeout(state, action); default: return state; } diff --git a/common/sagas/config/node.ts b/common/sagas/config/node.ts index 23029509..a9a329bf 100644 --- a/common/sagas/config/node.ts +++ b/common/sagas/config/node.ts @@ -49,7 +49,6 @@ export function* pollOfflineStatus(): SagaIterator { pingSucceeded: call(nodeConfig.lib.ping.bind(nodeConfig.lib)), timeout: call(delay, 5000) }); - if (pingSucceeded && isOffline) { // If we were able to ping but redux says we're offline, mark online yield put( diff --git a/common/sagas/index.ts b/common/sagas/index.ts index ac57d8ac..3db43b53 100644 --- a/common/sagas/index.ts +++ b/common/sagas/index.ts @@ -15,7 +15,7 @@ import { getBityRatesSaga, getShapeShiftRatesSaga, swapProviderSaga } from './sw import wallet from './wallet'; import { ens } from './ens'; import { transaction } from './transaction'; - +import { nodeBalancer } from './node/node'; export default { ens, liteSend, @@ -33,5 +33,6 @@ export default { transaction, deterministicWallets, swapProviderSaga, - rates + rates, + nodeBalancer }; diff --git a/common/sagas/node/node.ts b/common/sagas/node/node.ts index 7ae53a12..a2cc1c2a 100644 --- a/common/sagas/node/node.ts +++ b/common/sagas/node/node.ts @@ -1,4 +1,4 @@ -import { delay, SagaIterator, buffers, channel, Task, Channel } from 'redux-saga'; +import { delay, SagaIterator, buffers, channel, Task, Channel, takeEvery } from 'redux-saga'; import { call, cancel, @@ -9,158 +9,288 @@ import { race, apply, spawn, - flush + flush, + all, + actionChannel } from 'redux-saga/effects'; -import { nodeCallRequested } from 'actions/nodeBalancer'; +import { + nodeCallRequested, + NodeCall, + workerSpawned, + NodeCallRequestedAction, + nodeCallSucceeded, + workerProcessing, + TypeKeys, + NodeCallSucceededAction, + NodeCallFailedAction, + nodeOffline, + nodeCallFailed, + nodeCallTimeout, + NodeCallTimeoutAction, + NodeOfflineAction, + nodeOnline, + BalancerFlushAction +} from 'actions/nodeBalancer'; +import { + getAvailableNodes, + AvailableNodes, + getNodeStatsById, + getAllMethodsAvailable, + getAvailableNodeId +} from 'selectors/nodeBalancer'; +import { getOffline, getNodeById } from 'selectors/config'; +import { toggleOffline } from 'actions/config'; +import { StaticNodeConfig, CustomNodeConfig, NodeConfig } from '../../../shared/types/node'; +import { INodeStats } from 'reducers/nodeBalancer/nodes'; +// need to check this arbitary number +const MAX_NODE_CALL_TIMEOUTS = 3; + +/** + * For now we're going to hard code the initial node configuration in, + * ideally on initialization, a ping call gets sent to every node in the current network + * to determine which nodes are offline on app start using 'NodeAdded' + * then spawn workers for each node from there using 'WorkerSpawned' + * + */ + +/** + * Each channel id is a 1-1 mapping of a nodeId + */ interface IChannels { - [key: string]: Channel; + [key: string]: Channel; } -function* handleRequest(node, chan) { - let currentPayload; - while (true) { - try { - const { payload } = yield take(chan); - currentPayload = payload; - const result = yield race({ - result: apply(node, node[payload.methodName], payload.methodArgs), - timeout: call(delay, node.timeout) - }); - - if (result.timeout) { - throw Error(); - } - - yield put(result.result, payload.id); - } catch { - const beyondTimeout = yield select(timeoutCounter, node); - if (beyondTimeout) { - const payload = { - type: 'NODE_DOWN', - payload: { node, payload: currentPayload } - }; - return yield put(payload); - } else { - const payload = { - type: 'NODE_TIMEOUT', - payload: { node, payload: currentPayload } - }; - return yield put(payload); - } - } - } -} - -// both nodes and requests may have independent retry counters -// on task cancellation, dispatch a fail action -// on channel flush, dispatch a fail action for all flushed actions +const channels: IChannels = {}; function* initAndChannelNodePool(): SagaIterator { - const availableNodes = yield select(getAvailableNodes); - const getChannelTypes = yield select(getChannelTypes); - const channels: IChannels = {}; - const tasks: Task[] = []; + console.log('Initializing channel and node pool started'); + const availableNodes: AvailableNodes = yield select(getAvailableNodes); + const availableNodesArr = Object.entries(availableNodes); - for (const node of availableNodes) { - const c: Channel = yield call(channel, buffers.expanding(10)); - const t: Task = yield spawn(handleRequest, node, c); - tasks.push(t); - channels[node] = c; - } - - yield put('NODE_POOL_INIT', tasks); - return tasks; -} - -function* handleNodeStatusChanges(chan) { - while (true) { - const { payload } = yield take('NODE_DOWN' | 'NODE_TIMEOUT' | 'NODE_ONLINE'); - const { node } = payload; - const nodeStatus = yield select(getNodeStatus(node)); - - if (nodeStatus.timedOut) { - yield call(delay, 2000); - yield spawn(handleRequest, node, chan); - } - - if (nodeStatus.online) { - const task = yield spawn(handleRequest, node, chan); - yield put('NODE_SPAWNED', task); - } - - if (nodeStatus.offline) { - yield fork(pollOffline, node); - - const availableNode = yield select(getAvailableNode); - - if (availableNode) { - yield spawn(handleRequest, availableNode, chan); - } - - const allNodesDown = yield select(getIsAllNodesDown); - - if (allNodesDown) { - yield put('FLUSH_REQUESTS'); - } - } - // make sure to keep the same ID - yield put('NETWORK_REQUEST'); - } -} - -function* flushHandler(chan): SagaIterator { - let tasks: Task[] = []; - while (true) { - const { node, shouldFlush } = yield race({ - node: take('NODE_SPAWNED', 'NODE_POOL_INIT'), - shouldFlush: take('FLUSH_REQUESTS') - }); - - if (shouldFlush) { - yield cancel(...tasks); - yield flush(chan); - tasks = []; - } else { - tasks.push(node); - } - } -} - -function* pollOffline(node) { - while (true) { - const isOffline = yield call(node, ping); + // if there are no available nodes during the initialization, put the app in an offline state + if (availableNodesArr.length === 0) { + const isOffline: boolean = yield select(getOffline); if (!isOffline) { - yield put({ type: 'NODE_ONLINE', payload: node }); + yield put(toggleOffline()); } - yield call(delay, 2000); + } + + // make a channel per available node and init its workers up to the maxiumum allowed workers + for (const [nodeId, nodeConfig] of availableNodesArr) { + const nodeChannel: Channel = yield call(channel, buffers.expanding(10)); + channels[nodeId] = nodeChannel; + + for ( + let workerNumber = nodeConfig.currWorkersById.length; + workerNumber < nodeConfig.maxWorkers; + workerNumber++ + ) { + const workerId = `${nodeId}_worker_${workerNumber}`; + const workerTask: Task = yield spawn(spawnWorker, workerId, nodeId, nodeChannel); + console.log(`Worker ${workerId} spawned for ${nodeId}`); + yield put(workerSpawned({ nodeId, workerId, task: workerTask })); + } + } + console.log('Initializing channel and node pool finished'); +} + +function* handleNodeCallRequests(): SagaIterator { + const requestChan = yield actionChannel(TypeKeys.NODE_CALL_REQUESTED); + while (true) { + const { payload }: NodeCallRequestedAction = yield take(requestChan); + // check if the app is offline + + // wait until its back online + + // get an available nodeId to put the action to the channel + const nodeId: string = yield select(getAvailableNodeId, payload); + const nodeChannel = channels[nodeId]; + yield put(nodeChannel, payload); } } -function* managerNodePool(): SagaIterator { - const chan = yield call(channel, buffers.expanding(15)); +function* handleCallTimeouts({ + payload: { error, nodeId, ...nodeCall } +}: NodeCallTimeoutAction): SagaIterator { + const nodeStats: Readonly | undefined = yield select(getNodeStatsById, nodeId); + if (!nodeStats) { + throw Error('Could not find node stats'); + } + // if the node has reached maximum failures, declare it as offline + if (nodeStats.requestFailures >= nodeStats.requestFailureThreshold) { + yield put(nodeOffline({ nodeId })); - const channels = yield initAndChannelNodePool(chan); - - yield fork(function*() { - while (true) { - const { payload } = yield take('NETWORK_REQUEST'); - yield put(chan, payload); + //check if all methods are still available after this node goes down + const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable); + if (!isAllMethodsAvailable) { + // if not, set app state offline and flush channels + const appIsOffline: boolean = yield select(getOffline); + if (!appIsOffline) { + yield put(toggleOffline()); + } } - }); - yield fork(handleNodeStatusChanges, chan); - yield fork(flushHandler, chan); + } + + // if the payload exceeds timeout limits, return a response failure + if (nodeCall.numOfTimeouts > MAX_NODE_CALL_TIMEOUTS) { + yield put(nodeCallFailed({ error: error.message, nodeCall })); + } else { + // else consider it a timeout on the request to be retried + // might want to make this a seperate action + // add nodeId to min priority to avoid it if possible + const nextNodeCall: NodeCall = { + ...nodeCall, + minPriorityNodeList: [...nodeCall.minPriorityNodeList, nodeId], + numOfTimeouts: ++nodeCall.numOfTimeouts + }; + yield put(nodeCallRequested(nextNodeCall)); + } } -export const requester = name => - function*(args) { - const networkReq = makeNetworkRequest(name, args); - console.log(networkReq); - yield put(networkReq); - const { res } = yield take( - action => action.type === 'NETWORK_SUCCESS' && action.id === networkReq.payload.id - ); - return res; +function* watchOfflineNode({ payload: { nodeId } }: NodeOfflineAction) { + const nodeConfig: NodeConfig = yield select(getNodeById, nodeId); + while (true) { + try { + console.log(`Polling ${nodeId} to see if its online...`); + const { lb } = yield race({ + lb: apply(nodeConfig.pLib, nodeConfig.pLib.getCurrentBlock), + to: call(delay, 5000) + }); + if (lb) { + console.log(`${nodeId} online!`); + yield put(nodeOnline({ nodeId })); + + // check if all methods are available after this node is online + const isAllMethodsAvailable: boolean = yield select(getAllMethodsAvailable); + + // if they are, put app in online state + if (isAllMethodsAvailable) { + const appIsOffline: boolean = yield select(getOffline); + if (appIsOffline) { + yield put(toggleOffline()); + } + } + } + } catch (error) { + yield call(delay, 5000); + console.info(error); + } + + console.log(`${nodeId} still offline`); + } +} + +function* spawnWorker(thisId: string, nodeId: string, chan: IChannels[string]) { + /** + * @description used to differentiate between errors from worker code vs a network call error + * @param message + */ + const createInternalError = (message: string) => { + const e = Error(message); + e.name = 'InternalError'; + return e; }; -const node = [fork(managerNodePool)]; + //select the node config on initialization to avoid re-selecting on every request handled + const nodeConfig: StaticNodeConfig | CustomNodeConfig | undefined = yield select( + getNodeById, + nodeId + ); + if (!nodeConfig) { + throw Error(`Node ${nodeId} not found when selecting from state`); + } + + let currentPayload: NodeCall; + while (true) { + try { + // take from the assigned action channel + const payload: NodeCall = yield take(chan); + currentPayload = payload; + // after taking a request, declare processing state + yield put(workerProcessing({ currentPayload: payload, workerId: thisId })); + + const nodeStats: Readonly | undefined = yield select(getNodeStatsById, nodeId); + + if (!nodeStats) { + throw createInternalError(`Could not find stats for node ${nodeId}`); + } + + const lib = nodeConfig.pLib; + + // make the call in the allotted timeout time + // this will create an infinite loop + const { result, timeout } = yield race({ + result: apply(lib, lib[payload.rpcMethod], payload.rpcArgs), + timeout: call(delay, nodeStats.timeoutThresholdMs) + }); + + //TODO: clean this up + if (timeout || !result) { + throw createInternalError(`Request timed out for ${nodeId}`); + } + + yield put(nodeCallSucceeded({ result, nodeCall: payload })); + } catch (error) { + const e: Error = error; + if (!(e.name === 'InternalError')) { + e.name = `NetworkError_${e.name}`; + } + yield put(nodeCallTimeout({ ...currentPayload!, nodeId, error })); + } + } +} + +export const nodeCallRequester = (() => { + let callId = 0; + return (rpcMethod: string) => { + return function*(...rpcArgs: string[]) { + // allow all nodes for now + const nodeCall: NodeCall = { + callId: ++callId, + numOfTimeouts: 0, + rpcArgs, + rpcMethod, + minPriorityNodeList: [] + }; + + // make the request to the load balancer + const networkReq = nodeCallRequested(nodeCall); + console.log(networkReq); + yield put(networkReq); + + //wait for either a success or error response + const response: NodeCallSucceededAction | NodeCallFailedAction = yield take( + (action: NodeCallSucceededAction | NodeCallFailedAction) => + (action.type === TypeKeys.NODE_CALL_SUCCEEDED || + action.type === TypeKeys.NODE_CALL_FAILED) && + action.payload.nodeCall.callId === networkReq.payload.callId + ); + + // return the result as expected + if (response.type === TypeKeys.NODE_CALL_SUCCEEDED) { + return response.payload.result; + } else { + // or throw an error + throw Error(response.payload.error); + } + }; + }; +})(); + +function* flushHandler(_: BalancerFlushAction): SagaIterator { + const channelValues = Object.values(channels); + for (const chan of channelValues) { + yield flush(chan); + } +} + +export function* nodeBalancer() { + yield all([ + call(initAndChannelNodePool), + takeEvery(TypeKeys.NODE_OFFLINE, watchOfflineNode), + fork(handleNodeCallRequests), + takeEvery(TypeKeys.NODE_CALL_TIMEOUT, handleCallTimeouts), + takeEvery(TypeKeys.BALANCER_FLUSH, flushHandler) + ]); +} diff --git a/common/sagas/node/test.ts b/common/sagas/node/test.ts deleted file mode 100644 index 563d6c8c..00000000 --- a/common/sagas/node/test.ts +++ /dev/null @@ -1,88 +0,0 @@ -export function channel(buffer = buffers.expanding()) { - let closed = false; - let takers = []; - - if (process.env.NODE_ENV === 'development') { - check(buffer, is.buffer, INVALID_BUFFER); - } - - function checkForbiddenStates() { - if (closed && takers.length) { - throw internalErr('Cannot have a closed channel with pending takers'); - } - if (takers.length && !buffer.isEmpty()) { - throw internalErr('Cannot have pending takers with non empty buffer'); - } - } - - function put(input) { - checkForbiddenStates(); - - if (process.env.NODE_ENV === 'development') { - check(input, is.notUndef, UNDEFINED_INPUT_ERROR); - } - - if (closed) { - return; - } - if (!takers.length) { - return buffer.put(input); - } - const cb = takers[0]; - takers.splice(0, 1); - cb(input); - } - - function take(cb) { - checkForbiddenStates(); - - if (process.env.NODE_ENV === 'development') { - check(cb, is.func, "channel.take's callback must be a function"); - } - - if (closed && buffer.isEmpty()) { - cb(END); - } else if (!buffer.isEmpty()) { - cb(buffer.take()); - } else { - takers.push(cb); - cb.cancel = () => remove(takers, cb); - } - } - - function flush(cb) { - checkForbiddenStates(); // TODO: check if some new state should be forbidden now - - if (process.env.NODE_ENV === 'development') { - check(cb, is.func, "channel.flush' callback must be a function"); - } - - if (closed && buffer.isEmpty()) { - cb(END); - return; - } - cb(buffer.flush()); - } - - function close() { - checkForbiddenStates(); - if (!closed) { - closed = true; - if (takers.length) { - const arr = takers; - takers = []; - for (let i = 0, len = arr.length; i < len; i++) { - const taker = arr[i]; - taker(END); - } - } - } - } - - return { - take, - put, - flush, - close - }; -} diff --git a/common/selectors/config/nodes.ts b/common/selectors/config/nodes.ts index f6e6366b..1ef91c93 100644 --- a/common/selectors/config/nodes.ts +++ b/common/selectors/config/nodes.ts @@ -44,10 +44,15 @@ export const getStaticAltNodeIdToWeb3 = (state: AppState) => { export const getStaticNodeFromId = (state: AppState, nodeId: StaticNodeId) => getStaticNodeConfigs(state)[nodeId]; +export const getNodeById = (state: AppState, nodeId: string) => + isStaticNodeId(state, nodeId) + ? getStaticNodeFromId(state, nodeId) + : getCustomNodeFromId(state, nodeId); + export const isStaticNodeId = (state: AppState, nodeId: string): nodeId is StaticNodeWithWeb3Id => Object.keys(getStaticNodeConfigs(state)).includes(nodeId); -const getStaticNodeConfigs = (state: AppState) => getNodes(state).staticNodes; +export const getStaticNodeConfigs = (state: AppState) => getNodes(state).staticNodes; export const getStaticNodeConfig = (state: AppState) => { const { staticNodes, selectedNode: { nodeId } } = getNodes(state); diff --git a/common/selectors/nodeBalancer/index.ts b/common/selectors/nodeBalancer/index.ts new file mode 100644 index 00000000..36622e96 --- /dev/null +++ b/common/selectors/nodeBalancer/index.ts @@ -0,0 +1,127 @@ +import { AppState } from 'reducers'; +import { State as NodeBalancerState, INodeStats } from 'reducers/nodeBalancer/nodes'; +import { Omit } from 'react-redux'; +import { NodeCall } from 'actions/nodeBalancer'; + +const allMethods = [ + 'client', + 'requests', + 'ping', + 'sendCallRequest', + 'getBalance', + 'estimateGas', + 'getTokenBalance', + 'getTokenBalances', + 'getTransactionCount', + 'getCurrentBlock', + 'sendRawTx' +]; + +export const getNodeBalancer = (state: AppState) => state.nodeBalancer; +export const getNodes = (state: AppState) => getNodeBalancer(state).nodes; + +export type AvailableNodes = { + [nodeId in keyof NodeBalancerState]: Omit & { + isOffline: false; + } +}; + +/** + * @description an available node === it being online + * @param state + */ +export const getAvailableNodes = (state: AppState): AvailableNodes => { + const nodes = getNodes(state); + const initialState: AvailableNodes = {}; + + const isAvailable = (node: NodeBalancerState[string]): node is AvailableNodes['string'] => + !node.isOffline; + + return Object.entries(nodes).reduce((accu, [curNodeId, curNode]) => { + if (isAvailable(curNode)) { + return { ...accu, [curNodeId]: curNode }; + } + return accu; + }, initialState); +}; + +export const getAllMethodsAvailable = (state: AppState): boolean => { + const availableNodes = getAvailableNodes(state); + + // goes through each available node and reduces all of their + // available methods into a mapping that contains all supported methods + const availableMethods = Object.values(availableNodes).reduce( + (methods, { supportedMethods }) => ({ + ...methods, + ...Object.entries(supportedMethods).reduce( + // creates a mapping of all supported methods, excluding unsupported ones + (accu, [rpcMethod, isSupported]) => ({ + ...accu, + ...(isSupported ? { [rpcMethod]: true } : {}) + }), + {} + ) + }), + {} + ); + + return allMethods.reduce( + (allAvailable, curMethod) => allAvailable && availableMethods[curMethod], + true + ); +}; + +// TODO: handle cases when no node is selected +// available nodes -> nodes that support the method -> nodes that are whitelisted -> prioritized nodes -> workers not busy +// TODO: include response time in prioritization +export const getAvailableNodeId = (state: AppState, payload: NodeCall) => { + const { nodeWhiteList, rpcMethod, minPriorityNodeList } = payload; + const availableNodes = getAvailableNodes(state); + + const availableNodesArr = Object.entries(availableNodes); + // filter by nodes that can support this method + const supportsMethod = availableNodesArr.filter( + ([_, stats]) => stats.supportedMethods[rpcMethod] + ); + + // filter nodes that are in the whitelist + const isWhitelisted = nodeWhiteList + ? supportsMethod.filter(([nodeId, _]) => nodeWhiteList.includes(nodeId)) + : supportsMethod; + + // grab the nodes that are not included in min priority + const prioritized1 = isWhitelisted.filter(([nodeId, _]) => !minPriorityNodeList.includes(nodeId)); + + // grab the nodes that are included + const prioritized2 = isWhitelisted.filter(([nodeId, _]) => minPriorityNodeList.includes(nodeId)); + + // prioritize the list by using nodes with most workers free + const listToPrioritizeByWorker = prioritized1.length > 0 ? prioritized1 : prioritized2; + + let selectedNode: { nodeId: string; numOfRequestsCurrentProcessing: number }; + + for (const [nodeId, stats] of listToPrioritizeByWorker) { + const numOfRequestsCurrentProcessing = stats.currWorkersById.reduce((processing, wId) => { + const worker = getWorkersById(state, wId); + return worker.currentPayload ? processing + 1 : processing; + }, 0); + + if (!selectedNode!) { + selectedNode = { nodeId, numOfRequestsCurrentProcessing }; + } else { + if (selectedNode!.numOfRequestsCurrentProcessing > numOfRequestsCurrentProcessing) { + selectedNode = { nodeId, numOfRequestsCurrentProcessing }; + } + } + } + + return selectedNode.nodeId; +}; + +export const getWorkers = (state: AppState) => getNodeBalancer(state).workers; +export const getWorkersById = (state: AppState, workerId: string) => getWorkers(state)[workerId]; + +export const getNodeStatsById = ( + state: AppState, + nodeId: string +): Readonly | undefined => getNodes(state)[nodeId]; diff --git a/shared/types/node.d.ts b/shared/types/node.d.ts index dc30d0bd..2373e0bf 100644 --- a/shared/types/node.d.ts +++ b/shared/types/node.d.ts @@ -1,13 +1,15 @@ -import { RPCNode, Web3Node } from 'libs/nodes'; import { StaticNetworkIds } from './network'; import { StaticNodesState, CustomNodesState } from 'reducers/config/nodes'; import CustomNode from 'libs/nodes/custom'; +import RPCNode from 'libs/nodes/rpc'; +import Web3Node from 'libs/nodes/web3'; interface CustomNodeConfig { id: string; isCustom: true; name: string; lib: CustomNode; + pLib: CustomNode; service: 'your custom node'; url: string; port: number; @@ -22,6 +24,8 @@ interface StaticNodeConfig { isCustom: false; network: StaticNetworkIds; lib: RPCNode | Web3Node; + pLib: RPCNode | Web3Node; + service: string; estimateGas?: boolean; hidden?: boolean;