diff --git a/consensus/state.go b/consensus/state.go index 9cfe6dce..ee566469 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -289,6 +289,12 @@ func (cs *ConsensusState) getRoundState() *RoundState { return &rs } +func (cs *ConsensusState) GetValidators() (int, []*types.Validator) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators +} + func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() diff --git a/node/node.go b/node/node.go index 8da813ba..5f9d73b4 100644 --- a/node/node.go +++ b/node/node.go @@ -206,10 +206,9 @@ func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) - rpccore.SetConsensusReactor(n.consensusReactor) - rpccore.SetMempoolReactor(n.mempoolReactor) + rpccore.SetMempool(n.mempoolReactor.Mempool) rpccore.SetSwitch(n.sw) - rpccore.SetPrivValidator(n.privValidator) + rpccore.SetPubKey(n.privValidator.PubKey) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetProxyAppQuery(n.proxyApp.Query()) @@ -260,6 +259,14 @@ func (n *Node) PrivValidator() *types.PrivValidator { return n.privValidator } +func (n *Node) GenesisDoc() *types.GenesisDoc { + return n.genesisDoc +} + +func (n *Node) ProxyApp() proxy.AppConns { + return n.proxyApp +} + func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { nodeInfo := &p2p.NodeInfo{ diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 2a0b4361..e2ccce44 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -8,18 +8,7 @@ import ( ) func Validators() (*ctypes.ResultValidators, error) { - var blockHeight int - var validators []*types.Validator - - // XXX: this is racy. - // Either use state.LoadState(db) or make state atomic (see #165) - state := consensusState.GetState() - blockHeight = state.LastBlockHeight - state.Validators.Iterate(func(index int, val *types.Validator) bool { - validators = append(validators, val) - return false - }) - + blockHeight, validators := consensusState.GetValidators() return &ctypes.ResultValidators{blockHeight, validators}, nil } diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 6ae2014b..43a98953 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -10,7 +10,7 @@ import ( ) func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) { - mempoolReactor.Mempool.Flush() + mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 8705f867..ad599d22 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -14,7 +14,7 @@ import ( // Returns right away, with no response func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := mempoolReactor.BroadcastTx(tx, nil) + err := mempool.CheckTx(tx, nil) if err != nil { return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } @@ -24,7 +24,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // Returns with the response from CheckTx func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *tmsp.Response, 1) - err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + err := mempool.CheckTx(tx, func(res *tmsp.Response) { resCh <- res }) if err != nil { @@ -58,7 +58,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // broadcast the tx and register checktx callback checkTxResCh := make(chan *tmsp.Response, 1) - err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + err := mempool.CheckTx(tx, func(res *tmsp.Response) { checkTxResCh <- res }) if err != nil { @@ -101,10 +101,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - txs := mempoolReactor.Mempool.Reap(-1) + txs := mempool.Reap(-1) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - return &ctypes.ResultUnconfirmedTxs{N: mempoolReactor.Mempool.Size()}, nil + return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 8839c4f7..ac776c7f 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,26 +2,62 @@ package core import ( cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" - bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) -var eventSwitch types.EventSwitch -var blockStore *bc.BlockStore -var consensusState *consensus.ConsensusState -var consensusReactor *consensus.ConsensusReactor -var mempoolReactor *mempl.MempoolReactor -var p2pSwitch *p2p.Switch -var privValidator *types.PrivValidator -var genDoc *types.GenesisDoc // cache the genesis structure -var proxyAppQuery proxy.AppConnQuery +//----------------------------------------------------- +// Interfaces for use by RPC +// NOTE: these methods must be thread safe! -var config cfg.Config = nil +type BlockStore interface { + Height() int + LoadBlockMeta(height int) *types.BlockMeta + LoadBlock(height int) *types.Block +} + +type Consensus interface { + GetValidators() (int, []*types.Validator) + GetRoundState() *consensus.RoundState +} + +type Mempool interface { + Size() int + CheckTx(types.Tx, func(*tmsp.Response)) error + Reap(int) []types.Tx + Flush() +} + +type P2P interface { + Listeners() []p2p.Listener + Peers() p2p.IPeerSet + NumPeers() (outbound, inbound, dialig int) + NodeInfo() *p2p.NodeInfo + IsListening() bool + DialSeeds([]string) +} + +var ( + // external, thread safe interfaces + eventSwitch types.EventSwitch + proxyAppQuery proxy.AppConnQuery + config cfg.Config + + // interfaces defined above + blockStore BlockStore + consensusState Consensus + mempool Mempool + p2pSwitch P2P + + // objects + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure +) func SetConfig(c cfg.Config) { config = c @@ -31,28 +67,24 @@ func SetEventSwitch(evsw types.EventSwitch) { eventSwitch = evsw } -func SetBlockStore(bs *bc.BlockStore) { +func SetBlockStore(bs BlockStore) { blockStore = bs } -func SetConsensusState(cs *consensus.ConsensusState) { +func SetConsensusState(cs Consensus) { consensusState = cs } -func SetConsensusReactor(cr *consensus.ConsensusReactor) { - consensusReactor = cr +func SetMempool(mem Mempool) { + mempool = mem } -func SetMempoolReactor(mr *mempl.MempoolReactor) { - mempoolReactor = mr -} - -func SetSwitch(sw *p2p.Switch) { +func SetSwitch(sw P2P) { p2pSwitch = sw } -func SetPrivValidator(pv *types.PrivValidator) { - privValidator = pv +func SetPubKey(pk crypto.PubKey) { + pubKey = pk } func SetGenesisDoc(doc *types.GenesisDoc) { diff --git a/rpc/core/status.go b/rpc/core/status.go index bf3d69ff..8edadf13 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -22,7 +22,7 @@ func Status() (*ctypes.ResultStatus, error) { return &ctypes.ResultStatus{ NodeInfo: p2pSwitch.NodeInfo(), - PubKey: privValidator.PubKey, + PubKey: pubKey, LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, LatestBlockHeight: latestHeight,