From e86073ec960fce129c78d6268628437a3d77281c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 4 Aug 2015 18:44:15 -0700 Subject: [PATCH] Use rpc/client/ws_client; OnStart() returns error --- blockchain/pool.go | 3 +- blockchain/reactor.go | 5 +-- cmd/sim_txs/main.go | 4 +-- common/service.go | 23 +++++++------ consensus/reactor.go | 10 ++++-- consensus/state.go | 5 +-- crawler/crawl.go | 76 ++++++++++++++++------------------------- crawler/log.go | 7 ++++ events/events.go | 3 +- mempool/reactor.go | 6 +--- node/node.go | 5 +-- p2p/addrbook.go | 3 +- p2p/connection.go | 5 +-- p2p/listener.go | 3 +- p2p/peer.go | 7 ++-- p2p/pex_reactor.go | 5 +-- p2p/switch.go | 3 +- rpc/client/ws_client.go | 33 ++++++++++++------ rpc/server/handlers.go | 3 +- 19 files changed, 112 insertions(+), 97 deletions(-) create mode 100644 crawler/log.go diff --git a/blockchain/pool.go b/blockchain/pool.go index 51813033..9f2d042d 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -68,10 +68,11 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s return bp } -func (pool *BlockPool) OnStart() { +func (pool *BlockPool) OnStart() error { pool.BaseService.OnStart() pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond) go pool.run() + return nil } func (pool *BlockPool) OnStop() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index fba51de2..fb7ff205 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -7,12 +7,12 @@ import ( "reflect" "time" - "github.com/tendermint/tendermint/wire" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) const ( @@ -76,12 +76,13 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *Blockc return bcR } -func (bcR *BlockchainReactor) OnStart() { +func (bcR *BlockchainReactor) OnStart() error { bcR.BaseReactor.OnStart() if bcR.sync { bcR.pool.Start() go bcR.poolRoutine() } + return nil } func (bcR *BlockchainReactor) OnStop() { diff --git a/cmd/sim_txs/main.go b/cmd/sim_txs/main.go index 1e6de751..a9b0fc01 100644 --- a/cmd/sim_txs/main.go +++ b/cmd/sim_txs/main.go @@ -75,12 +75,12 @@ func main() { sendTx := makeRandomTransaction(10, rootAccount.Sequence+1, root, 2, accounts) fmt.Println(sendTx) - wsClient, err := rpcclient.NewWSClient("ws://" + remote + "/websocket") + wsClient := rpcclient.NewWSClient("ws://" + remote + "/websocket") + _, err = wsClient.Start() if err != nil { Exit(Fmt("Failed to establish websocket connection: %v", err)) } wsClient.Subscribe(types.EventStringAccInput(sendTx.Inputs[0].Address)) - wsClient.Start() go func() { for { diff --git a/common/service.go b/common/service.go index c01391dd..a992cf2d 100644 --- a/common/service.go +++ b/common/service.go @@ -23,13 +23,13 @@ func NewFooService() *FooService { return fs } -func (fs *FooService) OnStart() { +func (fs *FooService) OnStart() error { fs.BaseService.OnStart() // Always call the overridden method. // initialize private fields // start subroutines, etc. } -func (fs *FooService) OnStop() { +func (fs *FooService) OnStop() error { fs.BaseService.OnStop() // Always call the overridden method. // close/destroy private fields // stop subroutines, etc. @@ -42,8 +42,8 @@ import "sync/atomic" import "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15" type Service interface { - Start() bool - OnStart() + Start() (bool, error) + OnStart() error Stop() bool OnStop() @@ -72,24 +72,24 @@ func NewBaseService(log log15.Logger, name string, impl Service) *BaseService { } // Implements Servce -func (bs *BaseService) Start() bool { +func (bs *BaseService) Start() (bool, error) { if atomic.CompareAndSwapUint32(&bs.started, 0, 1) { if atomic.LoadUint32(&bs.stopped) == 1 { bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl) - return false + return false, nil } else { bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl) } - bs.impl.OnStart() - return true + err := bs.impl.OnStart() + return true, err } else { bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl) - return false + return false, nil } } // Implements Service -func (bs *BaseService) OnStart() {} +func (bs *BaseService) OnStart() error { return nil } // Implements Service func (bs *BaseService) Stop() bool { @@ -131,8 +131,9 @@ func NewQuitService(log log15.Logger, name string, impl Service) *QuitService { } // NOTE: when overriding OnStart, must call .QuitService.OnStart(). -func (qs *QuitService) OnStart() { +func (qs *QuitService) OnStart() error { qs.Quit = make(chan struct{}) + return nil } // NOTE: when overriding OnStop, must call .QuitService.OnStop(). diff --git a/consensus/reactor.go b/consensus/reactor.go index 1778f7d5..bc400985 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/tendermint/tendermint/wire" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/consensus/types" @@ -16,6 +15,7 @@ import ( "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) const ( @@ -49,13 +49,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto return conR } -func (conR *ConsensusReactor) OnStart() { +func (conR *ConsensusReactor) OnStart() error { log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() if !conR.fastSync { - conR.conS.Start() + _, err := conR.conS.Start() + if err != nil { + return err + } } go conR.broadcastNewRoundStepRoutine() + return nil } func (conR *ConsensusReactor) OnStop() { diff --git a/consensus/state.go b/consensus/state.go index abea5b2d..5db6b41b 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -158,7 +158,6 @@ import ( "time" acm "github.com/tendermint/tendermint/account" - "github.com/tendermint/tendermint/wire" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/consensus/types" @@ -166,6 +165,7 @@ import ( mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) var ( @@ -360,9 +360,10 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { return cs.newStepCh } -func (cs *ConsensusState) OnStart() { +func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() cs.scheduleRound0(cs.Height) + return nil } func (cs *ConsensusState) OnStop() { diff --git a/crawler/crawl.go b/crawler/crawl.go index 0a405815..ac4665b2 100644 --- a/crawler/crawl.go +++ b/crawler/crawl.go @@ -2,13 +2,15 @@ package crawler import ( "fmt" - "github.com/tendermint/tendermint/wire" - ctypes "github.com/tendermint/tendermint/rpc/core/types" - rpcclient "github.com/tendermint/tendermint/rpc/core_client" - "github.com/tendermint/tendermint/types" "time" - "io/ioutil" + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + core "github.com/tendermint/tendermint/rpc/core_client" + "github.com/tendermint/tendermint/rpc/types" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) const ( @@ -58,14 +60,14 @@ func (n *Node) SetInfo(status *ctypes.ResponseStatus, netinfo *ctypes.ResponseNe // A node client is used to talk to a node over rpc and websockets type NodeClient struct { - rpc rpcclient.Client + rpc core.Client ws *rpcclient.WSClient } // Create a new client for the node at the given addr func NewNodeClient(addr string) *NodeClient { return &NodeClient{ - rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"), + rpc: core.NewClient("http://"+addr, "JSONRPC"), ws: rpcclient.NewWSClient("ws://" + addr + "/events"), } } @@ -90,6 +92,8 @@ func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) { // A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes. // Maps are only accessed by one go-routine, mediated by the checkQueue type Crawler struct { + QuitService + self *Node client *NodeClient @@ -98,22 +102,22 @@ type Crawler struct { nodes map[string]*Node nodeQueue chan *Node - quit chan struct{} } // Create a new Crawler using the local RPC server at addr func NewCrawler(host string, port uint16) *Crawler { - return &Crawler{ + crawler := &Crawler{ self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))}, checkQueue: make(chan nodeInfo, CheckQueueBufferSize), nodePool: make(map[string]*Node), nodes: make(map[string]*Node), nodeQueue: make(chan *Node, NodeQueueBufferSize), - quit: make(chan struct{}), } + crawler.QuitService = *NewQuitService(log, "Crawler", crawler) + return crawler } -func (c *Crawler) Start() error { +func (c *Crawler) OnStart() error { // connect to local node first, set info, // and fire peers onto the checkQueue if err := c.pollNode(c.self); err != nil { @@ -122,10 +126,8 @@ func (c *Crawler) Start() error { // connect to weboscket, subscribe to local events // and run the read loop to listen for new blocks - if r, err := c.self.client.ws.Dial(); err != nil { - fmt.Println(r) - b, _ := ioutil.ReadAll(r.Body) - fmt.Println(string(b)) + _, err := c.self.client.ws.Start() + if err != nil { return err } if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil { @@ -147,20 +149,16 @@ func (c *Crawler) Start() error { return nil } -func (c *Crawler) Stop() { - close(c.quit) -} - // listen for events from the node and ping it for peers on a ticker func (c *Crawler) readLoop(node *Node) { - wsChan := node.client.ws.Read() + eventsCh := node.client.ws.EventsCh getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds) for { select { - case wsMsg := <-wsChan: + case eventMsg := <-eventsCh: // update the node with his new info - if err := c.consumeMessage(wsMsg, node); err != nil { + if err := c.consumeMessage(eventMsg, node); err != nil { // lost the node, put him back on the checkQueu c.checkNode(nodeInfo{ host: node.Host, @@ -178,37 +176,21 @@ func (c *Crawler) readLoop(node *Node) { disconnected: true, }) } - case <-c.quit: + case <-c.Quit: return } } } -func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error { - if wsMsg.Error != nil { - return wsMsg.Error - } - // unmarshal block event - var response struct { - Event string - Data *types.Block - Error string - } +func (c *Crawler) consumeMessage(eventMsg rpctypes.RPCEventResult, node *Node) error { + var block *types.Block var err error - wire.ReadJSON(&response, wsMsg.Data, &err) - if err != nil { - return err - } - if response.Error != "" { - return fmt.Errorf(response.Error) - } - block := response.Data + wire.ReadJSONObject(block, eventMsg.Data, &err) node.LastSeen = time.Now() node.BlockHeight = block.Height node.BlockHistory[block.Height] = node.LastSeen - return nil } @@ -251,7 +233,7 @@ func (c *Crawler) checkLoop() { // queue it for connecting to c.nodeQueue <- n - case <-c.quit: + case <-c.Quit: return } } @@ -263,7 +245,7 @@ func (c *Crawler) connectLoop() { select { case node := <-c.nodeQueue: go c.connectToNode(node) - case <-c.quit: + case <-c.Quit: // close all connections for addr, node := range c.nodes { _, _ = addr, node @@ -278,9 +260,9 @@ func (c *Crawler) connectToNode(node *Node) { addr := node.Address() node.client = NewNodeClient(addr) - - if b, err := node.client.ws.Dial(); err != nil { - fmt.Println("err on ws dial:", b, err) + _, err := node.client.ws.Start() + if err != nil { + fmt.Println("err on ws start:", err) // set failed, return } diff --git a/crawler/log.go b/crawler/log.go new file mode 100644 index 00000000..77b9bdfa --- /dev/null +++ b/crawler/log.go @@ -0,0 +1,7 @@ +package crawler + +import ( + "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15" +) + +var log = log15.New("module", "crawler") diff --git a/events/events.go b/events/events.go index 4db65823..25fc7b33 100644 --- a/events/events.go +++ b/events/events.go @@ -31,10 +31,11 @@ func NewEventSwitch() *EventSwitch { return evsw } -func (evsw *EventSwitch) OnStart() { +func (evsw *EventSwitch) OnStart() error { evsw.BaseService.OnStart() evsw.eventCells = make(map[string]*eventCell) evsw.listeners = make(map[string]*eventListener) + return nil } func (evsw *EventSwitch) OnStop() { diff --git a/mempool/reactor.go b/mempool/reactor.go index 2633648c..4f99c449 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -5,11 +5,11 @@ import ( "fmt" "reflect" - "github.com/tendermint/tendermint/wire" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) var ( @@ -34,10 +34,6 @@ func NewMempoolReactor(mempool *Mempool) *MempoolReactor { return memR } -// func (memR *MempoolReactor) OnStart() { memR.BaseReactor.OnStart() } - -// func (memR *MempoolReactor) OnStop() { memR.BaseReactor.OnStop() } - // Implements Reactor func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ diff --git a/node/node.go b/node/node.go index 39b6e378..5c7358b6 100644 --- a/node/node.go +++ b/node/node.go @@ -11,7 +11,6 @@ import ( "time" acm "github.com/tendermint/tendermint/account" - "github.com/tendermint/tendermint/wire" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/consensus" @@ -23,6 +22,7 @@ import ( "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) import _ "net/http/pprof" @@ -141,11 +141,12 @@ func NewNode() *Node { } // Call Start() after adding the listeners. -func (n *Node) Start() { +func (n *Node) Start() error { n.book.Start() n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey)) n.sw.SetNodePrivKey(n.privKey) n.sw.Start() + return nil } func (n *Node) Stop() { diff --git a/p2p/addrbook.go b/p2p/addrbook.go index bce9c9c4..9e244751 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -121,11 +121,12 @@ func (a *AddrBook) init() { } } -func (a *AddrBook) OnStart() { +func (a *AddrBook) OnStart() error { a.QuitService.OnStart() a.loadFromFile(a.filePath) a.wg.Add(1) go a.saveRoutine() + return nil } func (a *AddrBook) OnStop() { diff --git a/p2p/connection.go b/p2p/connection.go index 6d3591de..83bafa97 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -11,8 +11,8 @@ import ( "time" flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol" - "github.com/tendermint/tendermint/wire" //"github.com/tendermint/log15" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/wire" //"github.com/tendermint/log15" ) const ( @@ -127,7 +127,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei return mconn } -func (c *MConnection) OnStart() { +func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) @@ -135,6 +135,7 @@ func (c *MConnection) OnStart() { c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) go c.sendRoutine() go c.recvRoutine() + return nil } func (c *MConnection) OnStop() { diff --git a/p2p/listener.go b/p2p/listener.go index dcbaddac..6a62f9ba 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -108,9 +108,10 @@ SKIP_UPNP: return dl } -func (l *DefaultListener) OnStart() { +func (l *DefaultListener) OnStart() error { l.BaseService.OnStart() go l.listenRoutine() + return nil } func (l *DefaultListener) OnStop() { diff --git a/p2p/peer.go b/p2p/peer.go index c1f12db5..711a4fe1 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -5,9 +5,9 @@ import ( "io" "net" - "github.com/tendermint/tendermint/wire" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/wire" ) type Peer struct { @@ -72,9 +72,10 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor return p } -func (p *Peer) OnStart() { +func (p *Peer) OnStart() error { p.BaseService.OnStart() - p.mconn.Start() + _, err := p.mconn.Start() + return err } func (p *Peer) OnStop() { diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 0f298d9b..cce7cc1d 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -8,9 +8,9 @@ import ( "reflect" "time" - "github.com/tendermint/tendermint/wire" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/wire" ) var pexErrInvalidMessage = errors.New("Invalid PEX message") @@ -41,9 +41,10 @@ func NewPEXReactor(book *AddrBook) *PEXReactor { return pexR } -func (pexR *PEXReactor) OnStart() { +func (pexR *PEXReactor) OnStart() error { pexR.BaseReactor.OnStart() go pexR.ensurePeersRoutine() + return nil } func (pexR *PEXReactor) OnStop() { diff --git a/p2p/switch.go b/p2p/switch.go index ca2f51a1..11106036 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -153,7 +153,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) { } // Switch.Start() starts all the reactors, peers, and listeners. -func (sw *Switch) OnStart() { +func (sw *Switch) OnStart() error { sw.BaseService.OnStart() // Start reactors for _, reactor := range sw.reactors { @@ -167,6 +167,7 @@ func (sw *Switch) OnStart() { for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } + return nil } func (sw *Switch) OnStop() { diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go index 6480ac96..85f35df2 100644 --- a/rpc/client/ws_client.go +++ b/rpc/client/ws_client.go @@ -16,31 +16,44 @@ const wsResponsesChannelCapacity = 10 type WSClient struct { QuitService + Address string *websocket.Conn EventsCh chan rpctypes.RPCEventResult ResponsesCh chan rpctypes.RPCResponse } // create a new connection -func NewWSClient(addr string) (*WSClient, error) { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(addr, rHeader) - if err != nil { - return nil, err - } +func NewWSClient(addr string) *WSClient { wsClient := &WSClient{ - Conn: con, + Address: addr, + Conn: nil, EventsCh: make(chan rpctypes.RPCEventResult, wsEventsChannelCapacity), ResponsesCh: make(chan rpctypes.RPCResponse, wsResponsesChannelCapacity), } wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) - return wsClient, nil + return wsClient } -func (wsc *WSClient) OnStart() { +func (wsc *WSClient) OnStart() error { wsc.QuitService.OnStart() + err := wsc.dial() + if err != nil { + return err + } go wsc.receiveEventsRoutine() + return nil +} + +func (wsc *WSClient) dial() error { + // Dial + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, _, err := dialer.Dial(wsc.Address, rHeader) + if err != nil { + return err + } + wsc.Conn = con + return nil } func (wsc *WSClient) OnStop() { diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index 153b778f..16942fac 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -240,7 +240,7 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw } // wsc.Start() blocks until the connection closes. -func (wsc *WSConnection) OnStart() { +func (wsc *WSConnection) OnStart() error { wsc.QuitService.OnStart() // Read subscriptions/unsubscriptions to events @@ -262,6 +262,7 @@ func (wsc *WSConnection) OnStart() { // Write responses, BLOCKING. wsc.writeRoutine() + return nil } func (wsc *WSConnection) OnStop() {