From 0d1fa8e8842250b7e4e850c553118e95c5a10978 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 28 Jun 2017 11:12:45 -0400 Subject: [PATCH] fixes from review --- blockchain/pool.go | 2 +- consensus/common.go | 6 ++++++ consensus/state.go | 2 +- consensus/state_test.go | 15 ++++----------- p2p/switch.go | 2 +- rpc/lib/server/handlers.go | 15 ++++++++++----- 6 files changed, 23 insertions(+), 19 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 48a258c7..e1288c9f 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -28,7 +28,7 @@ var peerTimeoutSeconds = time.Duration(15) // not const so we can override with Every so often we ask peers what height they're on so we can keep going. Requests are continuously made for blocks of higher heights until - we reach the limits. If most of the requests have no available peers, and we + the limit is reached. If most of the requests have no available peers, and we are not at peer limits, we can probably switch to consensus reactor */ diff --git a/consensus/common.go b/consensus/common.go index 6f76d188..1e16c4da 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -27,3 +27,9 @@ func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) c }) return ch } + +func discardFromChan(ch chan interface{}, n int) { + for i := 0; i < n; i++ { + <-ch + } +} diff --git a/consensus/state.go b/consensus/state.go index 5a473773..cc9cd51e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1357,7 +1357,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, err error) { - cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) + cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "valIndex", vote.ValidatorIndex, 
"csHeight", cs.Height) // A precommit for the previous height? // These come in while we wait timeoutCommit diff --git a/consensus/state_test.go b/consensus/state_test.go index ee88ac25..81ef016b 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -524,9 +524,7 @@ func TestLockPOLRelock(t *testing.T) { signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2, vs3, vs4) // prevotes - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) <-voteCh // our precommit // the proposed block should now be locked and our precommit added @@ -536,9 +534,7 @@ func TestLockPOLRelock(t *testing.T) { signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4) signAddVotes(cs1, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs3) // precommites - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1) @@ -577,9 +573,7 @@ func TestLockPOLRelock(t *testing.T) { // now lets add prevotes from everyone else for the new block signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propBlockParts.Header(), vs2, vs3, vs4) // prevotes - <-voteCh - <-voteCh - <-voteCh + discardFromChan(voteCh, 3) // now either we go to PrevoteWait or Precommit select { @@ -594,8 +588,7 @@ func TestLockPOLRelock(t *testing.T) { validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) signAddVotes(cs1, types.VoteTypePrecommit, propBlockHash, propBlockParts.Header(), vs2, vs3) - <-voteCh - <-voteCh + discardFromChan(voteCh, 2) be := <-newBlockCh b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader) diff --git a/p2p/switch.go b/p2p/switch.go index 2e9d213d..2d8d3435 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -381,7 +381,7 @@ func (sw *Switch) Peers() IPeerSet { // TODO: make record depending on reason. 
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) - sw.Logger.Info("Stopping peer for error", "peer", peer, "err", reason) + sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 1538f08a..b6431a1e 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -338,7 +338,7 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { const ( writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this + wsWriteTimeoutSeconds = 30 // each write times out after this. wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. ) @@ -535,8 +535,7 @@ func (wsc *wsConnection) writeRoutine() { case <-wsc.Quit: return case <-wsc.pingTicker.C: - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) + err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) if err != nil { wsc.Logger.Error("Failed to write ping message on websocket", "err", err) wsc.Stop() @@ -547,8 +546,7 @@ func (wsc *wsConnection) writeRoutine() { if err != nil { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil { + if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { wsc.Logger.Error("Failed to write response on websocket", "err", err) wsc.Stop() return @@ -558,6 +556,13 @@ func (wsc *wsConnection) writeRoutine() { } } +// All writes to the websocket must (re)set the write deadline. 
+// If some writes don't set it while others do, they may time out incorrectly (https://github.com/tendermint/tendermint/issues/553) +func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { + wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) + return wsc.baseConn.WriteMessage(msgType, msg) +} + //---------------------------------------- // Main manager for all websocket connections