Progress commit - add redux/action/actioncreators/proxy

This commit is contained in:
HenryNguyen5 2018-01-17 12:39:47 -05:00
parent c2241dd9df
commit 6f4f9febe6
15 changed files with 785 additions and 24 deletions

View File

@ -0,0 +1,86 @@
import {
BalancerFlushAction,
NodeAddedAction,
WorkerKilledAction,
WorkerProcessingAction,
WorkerSpawnedAction,
NodeCallFailedAction,
NodeCallRequestedAction,
NodeCallSucceededAction,
NodeCallTimeoutAction,
NodeOfflineAction,
NodeOnlineAction,
NodeRemovedAction,
TypeKeys
} from 'actions/nodeBalancer';
import { Omit } from 'react-router';
export const balancerFlush = (): BalancerFlushAction => ({
type: TypeKeys.BALANCER_FLUSH
});
export const nodeOnline = (payload: NodeOnlineAction['payload']): NodeOnlineAction => ({
type: TypeKeys.NODE_ONLINE,
payload
});
export const nodeOffline = (payload: NodeOfflineAction['payload']): NodeOfflineAction => ({
type: TypeKeys.NODE_OFFLINE,
payload
});
export const nodeAdded = (payload: NodeAddedAction['payload']): NodeAddedAction => ({
type: TypeKeys.NODE_ADDED,
payload
});
export const nodeRemoved = (payload: NodeRemovedAction['payload']): NodeRemovedAction => ({
type: TypeKeys.NODE_REMOVED,
payload
});
export const workerSpawned = (payload: WorkerSpawnedAction['payload']): WorkerSpawnedAction => ({
type: TypeKeys.WORKER_SPAWNED,
payload
});
export const workerProcessing = (
payload: WorkerProcessingAction['payload']
): WorkerProcessingAction => ({
type: TypeKeys.WORKER_PROCESSING,
payload
});
export const workerKilled = (payload: WorkerKilledAction['payload']): WorkerKilledAction => ({
type: TypeKeys.WORKER_KILLED,
payload
});
export const nodeCallRequested = (() => {
let i = 0;
return (
payload: Omit<NodeCallRequestedAction['payload'], 'callId'>
): NodeCallRequestedAction => ({
type: TypeKeys.NODE_CALL_REQUESTED,
payload: { ...payload, callId: i++ }
});
})();
export const nodeCallTimeout = (
payload: NodeCallTimeoutAction['payload']
): NodeCallTimeoutAction => ({
type: TypeKeys.NODE_CALL_TIMEOUT,
payload
});
export const nodeCallFailed = (payload: NodeCallFailedAction['payload']): NodeCallFailedAction => ({
type: TypeKeys.NODE_CALL_FAILED,
payload
});
export const nodeCallSucceeded = (
payload: NodeCallSucceededAction['payload']
): NodeCallSucceededAction => ({
type: TypeKeys.NODE_CALL_SUCCEEDED,
payload
});

View File

@ -0,0 +1,104 @@
import { TypeKeys } from './constants';
import { DefaultNodeNames } from 'config/data';
import { Task } from 'redux-saga';
import { INodeStats } from 'reducers/nodeBalancer/nodes';
export type AllNodeNames = DefaultNodeNames | string;
export interface NodeCall {
callId: number;
rpcMethod: string;
rpcArgs: string[];
numOfTimeouts: number;
nodeWhiteList?: AllNodeNames;
}
export interface BalancerFlushAction {
type: TypeKeys.BALANCER_FLUSH;
}
export interface NodeOnlineAction {
type: TypeKeys.NODE_ONLINE;
payload: {
nodeName: AllNodeNames;
};
}
export interface NodeOfflineAction {
type: TypeKeys.NODE_OFFLINE;
payload: {
nodeName: AllNodeNames;
};
}
// this is for when new nodes get added dynamically
export interface NodeAddedAction {
type: TypeKeys.NODE_ADDED;
payload: {
nodeName: AllNodeNames;
} & INodeStats;
}
export interface NodeRemovedAction {
type: TypeKeys.NODE_REMOVED;
payload: { nodeName: AllNodeNames };
}
export interface WorkerSpawnedAction {
type: TypeKeys.WORKER_SPAWNED;
payload: {
nodeName: AllNodeNames;
workerId: string;
task: Task;
};
}
export interface WorkerProcessingAction {
type: TypeKeys.WORKER_PROCESSING;
payload: {
workerId: string;
currentPayload: NodeCall;
};
}
export interface WorkerKilledAction {
type: TypeKeys.WORKER_KILLED;
payload: {
nodeName: AllNodeNames;
workerId: string;
};
}
export interface NodeCallRequestedAction {
type: TypeKeys.NODE_CALL_REQUESTED;
payload: NodeCall;
}
export interface NodeCallTimeoutAction {
type: TypeKeys.NODE_CALL_TIMEOUT;
payload: NodeCall & { nodeName: AllNodeNames };
}
export interface NodeCallFailedAction {
type: TypeKeys.NODE_CALL_FAILED;
payload: NodeCall;
}
export interface NodeCallSucceededAction {
type: TypeKeys.NODE_CALL_SUCCEEDED;
payload: NodeCall;
}
export type BalancerAction = BalancerFlushAction;
export type NodeAction = NodeOnlineAction | NodeOfflineAction | NodeAddedAction | NodeRemovedAction;
export type NodeCallAction =
| NodeCallRequestedAction
| NodeCallTimeoutAction
| NodeCallFailedAction
| NodeCallSucceededAction;
export type WorkerAction = WorkerSpawnedAction | WorkerProcessingAction | WorkerKilledAction;
export type NodeBalancerAction = NodeAction | NodeCallAction | WorkerAction | BalancerAction;

View File

@ -0,0 +1,17 @@
export enum TypeKeys {
NODE_ONLINE = 'NODE_ONLINE',
NODE_OFFLINE = 'NODE_OFFLINE',
NODE_ADDED = 'NODE_ADDED',
NODE_REMOVED = 'NODE_REMOVED',
WORKER_PROCESSING = 'WORKER_PROCESSING',
WORKER_SPAWNED = 'WORKER_SPAWNED',
WORKER_KILLED = 'WORKER_KILLED',
BALANCER_FLUSH = 'BALANCER_FLUSH',
NODE_CALL_REQUESTED = 'NODE_CALL_REQUESTED',
NODE_CALL_TIMEOUT = 'NODE_CALL_TIMEOUT',
NODE_CALL_SUCCEEDED = 'NODE_CALL_SUCCEEDED',
NODE_CALL_FAILED = 'NODE_CALL_FAILED'
}

View File

@ -0,0 +1,3 @@
export * from './actionTypes';
export * from './constants';
export * from './actionCreators';

View File

@ -94,7 +94,7 @@ export interface CustomNetworkConfig {
export interface NodeConfig {
network: string;
lib: RPCNode | Web3Node;
lib: typeof RPCNode | typeof Web3Node;
service: string;
estimateGas?: boolean;
hidden?: boolean;
@ -167,53 +167,65 @@ export const NETWORKS: { [key: string]: NetworkConfig } = {
}
};
export enum DefaultNodeNames {
eth_mew = 'eth_mew',
eth_ethscan = 'eth_ethscan',
eth_infura = 'eth_infura',
rop_mew = 'rop_mew',
rop_infura = 'rop_infura',
kov_ethscan = 'kov_ethscan',
rin_ethscan = 'rin_ethscan',
rin_infura = 'rin_infura',
web3 = 'web3'
}
export const NODES: { [key: string]: NodeConfig } = {
eth_mew: {
network: 'ETH',
lib: new RPCNode('https://api.myetherapi.com/eth'),
lib: RPCNode('https://api.myetherapi.com/eth'),
service: 'MyEtherWallet',
estimateGas: true
},
eth_ethscan: {
network: 'ETH',
service: 'Etherscan.io',
lib: new EtherscanNode('https://api.etherscan.io/api'),
lib: EtherscanNode('https://api.etherscan.io/api'),
estimateGas: false
},
eth_infura: {
network: 'ETH',
service: 'infura.io',
lib: new InfuraNode('https://mainnet.infura.io/mew'),
lib: InfuraNode('https://mainnet.infura.io/mew'),
estimateGas: false
},
rop_mew: {
network: 'Ropsten',
service: 'MyEtherWallet',
lib: new RPCNode('https://api.myetherapi.com/rop'),
lib: RPCNode('https://api.myetherapi.com/rop'),
estimateGas: false
},
rop_infura: {
network: 'Ropsten',
service: 'infura.io',
lib: new InfuraNode('https://ropsten.infura.io/mew'),
lib: InfuraNode('https://ropsten.infura.io/mew'),
estimateGas: false
},
kov_ethscan: {
network: 'Kovan',
service: 'Etherscan.io',
lib: new EtherscanNode('https://kovan.etherscan.io/api'),
lib: EtherscanNode('https://kovan.etherscan.io/api'),
estimateGas: false
},
rin_ethscan: {
network: 'Rinkeby',
service: 'Etherscan.io',
lib: new EtherscanNode('https://rinkeby.etherscan.io/api'),
lib: EtherscanNode('https://rinkeby.etherscan.io/api'),
estimateGas: false
},
rin_infura: {
network: 'Rinkeby',
service: 'infura.io',
lib: new InfuraNode('https://rinkeby.infura.io/mew'),
lib: InfuraNode('https://rinkeby.infura.io/mew'),
estimateGas: false
}
};
@ -232,7 +244,7 @@ export async function setupWeb3Node(): Promise<Web3NodeInfo> {
);
}
const lib = new Web3Node();
const lib = Web3Node();
const networkId = await lib.getNetVersion();
const accounts = await lib.getAccounts();

View File

@ -1,15 +1,20 @@
import { Token } from 'config/data';
import { Wei, TokenValue } from 'libs/units';
import { IHexStrTransaction } from 'libs/transaction';
import {} from './';
export interface TxObj {
to: string;
data: string;
}
interface TokenBalanceResult {
balance: TokenValue;
error: string | null;
}
export interface INodeConstructor {
new (endpoint: string): INode;
}
export interface INode {
ping(): Promise<boolean>;
getBalance(address: string): Promise<Wei>;

View File

@ -1,5 +1,53 @@
export { default as RPCNode } from './rpc';
export { default as InfuraNode } from './infura';
export { default as EtherscanNode } from './etherscan';
export { default as CustomNode } from './custom';
export { default as Web3Node } from './web3';
import { INodeConstructor, INode } from 'libs/nodes/INode';
import PRPCNode from './rpc';
import PInfuraNode from './infura';
import PEtherscanNode from './etherscan';
import PCustomNode from './custom';
import PWeb3Node from './web3';
import { requester } from 'sagas/node/node';
type methods = keyof RPCNode;
const handler: ProxyHandler<INode> = {
get: (target, methodName: string) => {
const nodeMethods = Object.getOwnPropertyNames(Object.getPrototypeOf(target));
if (nodeMethods.includes(methodName)) {
return requester(methodName);
}
}
};
const createNode = (ctor: any, args: any) => {
const instance = new ctor(...args);
return new Proxy(instance, handler);
};
const obj = {
RPCNode: PRPCNode,
InfuraNode: PInfuraNode,
EtherscanNode: PEtherscanNode,
CustomNode: PCustomNode,
Web3Node: PWeb3Node
};
interface INodeInterfaces {
RPCNode: typeof PRPCNode;
InfuraNode: typeof PInfuraNode;
EtherscanNode: typeof PEtherscanNode;
CustomNode: typeof PCustomNode;
Web3Node: typeof PWeb3Node;
}
const x = Object.entries(obj).reduce(
(acc, [key, value]) => {
return {
...acc,
[key](...args) {
return createNode(value, args);
}
};
},
{} as INodeInterfaces
);
const { CustomNode, EtherscanNode, InfuraNode, RPCNode, Web3Node } = x;
export { CustomNode, EtherscanNode, InfuraNode, RPCNode, Web3Node };

View File

@ -46,8 +46,6 @@ export default class RpcNode implements INode {
}
public estimateGas(transaction: Partial<IHexStrTransaction>): Promise<Wei> {
// Timeout after 10 seconds
return this.client
.call(this.requests.estimateGas(transaction))
.then(isValidEstimateGas)

View File

@ -0,0 +1,10 @@
import { nodes, State as NodeState } from './nodes';
import { State as WorkerState, workers } from './workers';
import { combineReducers } from 'redux';
export interface State {
nodes: NodeState;
workers: WorkerState;
}
export const nodeBalancer = combineReducers({ nodes, workers });

View File

@ -0,0 +1,137 @@
import RpcNode from 'libs/nodes/rpc';
import { Reducer } from 'redux';
import {
NodeOnlineAction,
NodeOfflineAction,
NodeAddedAction,
NodeCallTimeoutAction,
WorkerKilledAction,
WorkerSpawnedAction,
NodeAction,
WorkerAction,
NodeCallAction,
BalancerFlushAction,
BalancerAction,
NodeRemovedAction
} from 'actions/nodeBalancer';
import { TypeKeys } from 'actions/nodeBalancer/constants';
export interface INodeStats {
maxWorkers: number;
currWorkersById: string[];
timeoutThreshold: number;
isOffline: boolean;
requestFailures: number;
requestFailureThreshold: number;
avgResponseTime: number;
supportedMethods: (keyof RpcNode)[];
}
export interface State {
[key: string]: Readonly<INodeStats>;
}
// handle custom node removal
const INITIAL_STATE = {};
const handleWorkerKilled: Reducer<State> = (
state: State,
{ payload: { nodeName, workerId } }: WorkerKilledAction
) => {
const nodeToChange = state[nodeName];
const nextNodeState = {
...nodeToChange,
currWorkersById: nodeToChange.currWorkersById.filter(id => id !== workerId)
};
return { ...state, [nodeName]: nextNodeState };
};
const handleWorkerSpawned: Reducer<State> = (
state: State,
{ payload: { nodeName, workerId } }: WorkerSpawnedAction
) => {
const nodeToChange = state[nodeName];
const nextNodeState = {
...nodeToChange,
currWorkersById: [...nodeToChange.currWorkersById, workerId]
};
return { ...state, [nodeName]: nextNodeState };
};
const handleNodeOnline: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeOnlineAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
isOffline: false
}
});
const handleNodeOffline: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeOfflineAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
isOffline: true,
requestFailures: 0
}
});
const handleNodeAdded: Reducer<State> = (
state: State,
{ payload: { nodeName, ...nodeStats } }: NodeAddedAction
) => ({ ...state, [nodeName]: { ...nodeStats } });
const handleNodeRemoved: Reducer<State> = (state: State, { payload }: NodeRemovedAction) => {
const stateCopy = { ...state };
Reflect.deleteProperty(state, payload.nodeName);
return stateCopy;
};
const handleNodeCallTimeout: Reducer<State> = (
state: State,
{ payload: { nodeName } }: NodeCallTimeoutAction
) => ({
...state,
[nodeName]: {
...state[nodeName],
requestFailures: state[nodeName].requestFailures + 1
}
});
const handleBalancerFlush: Reducer<State> = (state: State, _: BalancerFlushAction) =>
Object.entries(state).reduce(
(obj, [nodeName, nodeStats]) => ({ ...obj, [nodeName]: { ...nodeStats, requestFailures: 0 } }),
{} as State
);
export const nodes: Reducer<State> = (
state: State = INITIAL_STATE,
action: NodeAction | WorkerAction | NodeCallAction | BalancerAction
): State => {
switch (action.type) {
case TypeKeys.WORKER_KILLED:
return handleWorkerKilled(state, action);
case TypeKeys.WORKER_SPAWNED:
return handleWorkerSpawned(state, action);
case TypeKeys.NODE_ONLINE:
return handleNodeOnline(state, action);
case TypeKeys.NODE_OFFLINE:
return handleNodeOffline(state, action);
case TypeKeys.NODE_ADDED:
return handleNodeAdded(state, action);
case TypeKeys.NODE_REMOVED:
return handleNodeRemoved(state, action);
case TypeKeys.NODE_CALL_TIMEOUT:
return handleNodeCallTimeout(state, action);
case TypeKeys.BALANCER_FLUSH:
return handleBalancerFlush(state, action);
default:
return state;
}
};

View File

@ -0,0 +1,84 @@
import { Task } from 'redux-saga';
import {
AllNodeNames,
NodeCall,
WorkerKilledAction,
WorkerProcessingAction,
WorkerSpawnedAction,
NodeCallSucceededAction,
WorkerAction,
NodeCallAction
} from 'actions/nodeBalancer';
import { Reducer } from 'redux';
import { TypeKeys } from 'actions/nodeBalancer/constants';
interface IWorker {
task: Task;
assignedNode: AllNodeNames;
currentPayload: NodeCall | null;
}
export interface State {
[key: string]: Readonly<IWorker>;
}
const INITIAL_STATE = {};
const handleWorkerKilled: Reducer<State> = (state: State, { payload }: WorkerKilledAction) => {
const stateCopy = { ...state };
Reflect.deleteProperty(stateCopy, payload.workerId);
return stateCopy;
};
const handleWorkerProcessing: Reducer<State> = (
state: State,
{ payload: { currentPayload, workerId } }: WorkerProcessingAction
) => ({
...state,
[workerId]: { ...state[workerId], currentPayload }
});
const handleWorkerSpawned: Reducer<State> = (state: State, { payload }: WorkerSpawnedAction) => ({
...state,
[payload.workerId]: { assignedNode: payload.nodeName, task: payload.task, currentPayload: null }
});
const handleNodeCallSucceeded: Reducer<State> = (
state: State,
{ payload: { callId } }: NodeCallSucceededAction
) => {
const workerIdToRemove = Object.entries(state).find(
([_, { currentPayload }]) => (currentPayload ? currentPayload.callId === callId : false)
);
console.assert(
workerIdToRemove,
'WorkerID not found for successful payload, this should never happen'
);
if (!workerIdToRemove) {
throw Error();
}
const stateCopy = { ...state };
Reflect.deleteProperty(stateCopy, workerIdToRemove[0]);
return stateCopy;
};
export const workers: Reducer<State> = (
state: State = INITIAL_STATE,
action: WorkerAction | NodeCallAction
): State => {
switch (action.type) {
case TypeKeys.WORKER_SPAWNED:
return handleWorkerSpawned(state, action);
case TypeKeys.WORKER_KILLED:
return handleWorkerKilled(state, action);
case TypeKeys.WORKER_PROCESSING:
return handleWorkerProcessing(state, action);
case TypeKeys.NODE_CALL_SUCCEEDED:
return handleNodeCallSucceeded(state, action);
default:
return state;
}
};

View File

@ -8,7 +8,8 @@ import {
takeLatest,
takeEvery,
select,
race
race,
apply
} from 'redux-saga/effects';
import { NODES, NETWORKS, NodeConfig, CustomNodeConfig, CustomNetworkConfig } from 'config/data';
import {
@ -22,7 +23,8 @@ import {
getNodeConfig,
getCustomNodeConfigs,
getCustomNetworkConfigs,
getOffline
getOffline,
getNodeLib
} from 'selectors/config';
import { AppState } from 'reducers';
import { TypeKeys } from 'actions/config/constants';
@ -49,12 +51,13 @@ export const getConfig = (state: AppState): ConfigState => state.config;
let hasCheckedOnline = false;
export function* pollOfflineStatus(): SagaIterator {
while (true) {
const node: NodeConfig = yield select(getNodeConfig);
const node = yield select(getNodeLib);
const isOffline: boolean = yield select(getOffline);
// If our offline state disagrees with the browser, run a check
// Don't check if the user is in another tab or window
const shouldPing = !hasCheckedOnline || navigator.onLine === isOffline;
const x = yield apply(node, node.ping);
if (shouldPing && !document.hidden) {
const { pingSucceeded } = yield race({
pingSucceeded: call(node.lib.ping.bind(node.lib)),

166
common/sagas/node/node.ts Normal file
View File

@ -0,0 +1,166 @@
import { delay, SagaIterator, buffers, channel, Task, Channel } from 'redux-saga';
import {
call,
cancel,
fork,
put,
take,
select,
race,
apply,
spawn,
flush
} from 'redux-saga/effects';
import { nodeCallRequested } from 'actions/nodeBalancer';
interface IChannels {
[key: string]: Channel<IPayload>;
}
function* handleRequest(node, chan) {
let currentPayload;
while (true) {
try {
const { payload } = yield take(chan);
currentPayload = payload;
const result = yield race({
result: apply(node, node[payload.methodName], payload.methodArgs),
timeout: call(delay, node.timeout)
});
if (result.timeout) {
throw Error();
}
yield put(result.result, payload.id);
} catch {
const beyondTimeout = yield select(timeoutCounter, node);
if (beyondTimeout) {
const payload = {
type: 'NODE_DOWN',
payload: { node, payload: currentPayload }
};
return yield put(payload);
} else {
const payload = {
type: 'NODE_TIMEOUT',
payload: { node, payload: currentPayload }
};
return yield put(payload);
}
}
}
}
// both nodes and requests may have independent retry counters
// on task cancellation, dispatch a fail action
// on channel flush, dispatch a fail action for all flushed actions
function* initAndChannelNodePool(): SagaIterator {
const availableNodes = yield select(getAvailableNodes);
const getChannelTypes = yield select(getChannelTypes);
const channels: IChannels = {};
const tasks: Task[] = [];
for (const node of availableNodes) {
const c: Channel<any> = yield call(channel, buffers.expanding(10));
const t: Task = yield spawn(handleRequest, node, c);
tasks.push(t);
channels[node] = c;
}
yield put('NODE_POOL_INIT', tasks);
return tasks;
}
function* handleNodeStatusChanges(chan) {
while (true) {
const { payload } = yield take('NODE_DOWN' | 'NODE_TIMEOUT' | 'NODE_ONLINE');
const { node } = payload;
const nodeStatus = yield select(getNodeStatus(node));
if (nodeStatus.timedOut) {
yield call(delay, 2000);
yield spawn(handleRequest, node, chan);
}
if (nodeStatus.online) {
const task = yield spawn(handleRequest, node, chan);
yield put('NODE_SPAWNED', task);
}
if (nodeStatus.offline) {
yield fork(pollOffline, node);
const availableNode = yield select(getAvailableNode);
if (availableNode) {
yield spawn(handleRequest, availableNode, chan);
}
const allNodesDown = yield select(getIsAllNodesDown);
if (allNodesDown) {
yield put('FLUSH_REQUESTS');
}
}
// make sure to keep the same ID
yield put('NETWORK_REQUEST');
}
}
function* flushHandler(chan): SagaIterator {
let tasks: Task[] = [];
while (true) {
const { node, shouldFlush } = yield race({
node: take('NODE_SPAWNED', 'NODE_POOL_INIT'),
shouldFlush: take('FLUSH_REQUESTS')
});
if (shouldFlush) {
yield cancel(...tasks);
yield flush(chan);
tasks = [];
} else {
tasks.push(node);
}
}
}
function* pollOffline(node) {
while (true) {
const isOffline = yield call(node, ping);
if (!isOffline) {
yield put({ type: 'NODE_ONLINE', payload: node });
}
yield call(delay, 2000);
}
}
function* managerNodePool(): SagaIterator {
const chan = yield call(channel, buffers.expanding(15));
const channels = yield initAndChannelNodePool(chan);
yield fork(function*() {
while (true) {
const { payload } = yield take('NETWORK_REQUEST');
yield put(chan, payload);
}
});
yield fork(handleNodeStatusChanges, chan);
yield fork(flushHandler, chan);
}
export const requester = name =>
function*(args) {
const networkReq = makeNetworkRequest(name, args);
console.log(networkReq);
yield put(networkReq);
const { res } = yield take(
action => action.type === 'NETWORK_SUCCESS' && action.id === networkReq.payload.id
);
return res;
};
const node = [fork(managerNodePool)];

88
common/sagas/node/test.ts Normal file
View File

@ -0,0 +1,88 @@
export function channel(buffer = buffers.expanding()) {
let closed = false;
let takers = [];
if (process.env.NODE_ENV === 'development') {
check(buffer, is.buffer, INVALID_BUFFER);
}
function checkForbiddenStates() {
if (closed && takers.length) {
throw internalErr('Cannot have a closed channel with pending takers');
}
if (takers.length && !buffer.isEmpty()) {
throw internalErr('Cannot have pending takers with non empty buffer');
}
}
function put(input) {
checkForbiddenStates();
if (process.env.NODE_ENV === 'development') {
check(input, is.notUndef, UNDEFINED_INPUT_ERROR);
}
if (closed) {
return;
}
if (!takers.length) {
return buffer.put(input);
}
const cb = takers[0];
takers.splice(0, 1);
cb(input);
}
function take(cb) {
checkForbiddenStates();
if (process.env.NODE_ENV === 'development') {
check(cb, is.func, "channel.take's callback must be a function");
}
if (closed && buffer.isEmpty()) {
cb(END);
} else if (!buffer.isEmpty()) {
cb(buffer.take());
} else {
takers.push(cb);
cb.cancel = () => remove(takers, cb);
}
}
function flush(cb) {
checkForbiddenStates(); // TODO: check if some new state should be forbidden now
if (process.env.NODE_ENV === 'development') {
check(cb, is.func, "channel.flush' callback must be a function");
}
if (closed && buffer.isEmpty()) {
cb(END);
return;
}
cb(buffer.flush());
}
function close() {
checkForbiddenStates();
if (!closed) {
closed = true;
if (takers.length) {
const arr = takers;
takers = [];
for (let i = 0, len = arr.length; i < len; i++) {
const taker = arr[i];
taker(END);
}
}
}
}
return {
take,
put,
flush,
close
};
}

View File

@ -10,12 +10,12 @@ import {
State as TransactionState
} from 'reducers/transaction';
import { State as SwapState, INITIAL_STATE as swapInitialState } from 'reducers/swap';
import { applyMiddleware, createStore } from 'redux';
import { applyMiddleware, createStore, Store } from 'redux';
import { composeWithDevTools } from 'redux-devtools-extension';
import { createLogger } from 'redux-logger';
import createSagaMiddleware from 'redux-saga';
import { loadStatePropertyOrEmptyObject, saveState } from 'utils/localStorage';
import RootReducer from './reducers';
import RootReducer, { AppState } from './reducers';
import promiseMiddleware from 'redux-promise-middleware';
import { getNodeConfigFromId } from 'utils/node';
import { getNetworkConfigFromId } from 'utils/network';
@ -164,4 +164,4 @@ const configureStore = () => {
return store;
};
export const configuredStore = configureStore();
export const configuredStore: Store<AppState> = configureStore();