diff --git a/blockchain/pool.go b/blockchain/pool.go index 348ba09b..47e59711 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -26,10 +26,10 @@ eg, L = latency = 0.1s */ const ( - requestIntervalMS = 250 - maxTotalRequesters = 300 + requestIntervalMS = 100 + maxTotalRequesters = 1000 maxPendingRequests = maxTotalRequesters - maxPendingRequestsPerPeer = 10 + maxPendingRequestsPerPeer = 50 minRecvRate = 10240 // 10Kb/s ) @@ -56,7 +56,8 @@ type BlockPool struct { height int // the lowest key in requesters. numPending int32 // number of requests pending assignment or block response // peers - peers map[string]*bpPeer + peers map[string]*bpPeer + maxPeerHeight int requestsCh chan<- BlockRequest timeoutsCh chan<- string @@ -87,10 +88,12 @@ func (pool *BlockPool) OnStop() {} // Run spawns requesters as needed. func (pool *BlockPool) makeRequestersRoutine() { + for { if !pool.IsRunning() { break } + _, numPending, lenRequesters := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. @@ -147,16 +150,10 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - maxPeerHeight := 0 - for _, peer := range pool.peers { - maxPeerHeight = cmn.MaxInt(maxPeerHeight, peer.height) - } - // some conditions to determine if we're caught up receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second) - ourChainIsLongestAmongPeers := maxPeerHeight == 0 || pool.height >= maxPeerHeight + ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers - pool.Logger.Info(cmn.Fmt("IsCaughtUp: %v", isCaughtUp), "height", pool.height, "maxPeerHeight", maxPeerHeight) return isCaughtUp } @@ -235,6 +232,13 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int } } +// MaxPeerHeight returns the heighest height reported by a peer +func (pool *BlockPool) MaxPeerHeight() int { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.maxPeerHeight +} + // Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID string, height int) { pool.mtx.Lock() @@ -248,6 +252,10 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) { peer.setLogger(pool.Logger.With("peer", peerID)) pool.peers[peerID] = peer } + + if height > pool.maxPeerHeight { + pool.maxPeerHeight = height + } } func (pool *BlockPool) RemovePeer(peerID string) { @@ -298,7 +306,7 @@ func (pool *BlockPool) makeNextRequester() { nextHeight := pool.height + len(pool.requesters) request := newBPRequester(pool, nextHeight) - request.SetLogger(pool.Logger.With("height", nextHeight)) + // request.SetLogger(pool.Logger.With("height", nextHeight)) pool.requesters[nextHeight] = request pool.numPending++ diff --git a/blockchain/reactor.go b/blockchain/reactor.go index b46ad40f..9ac58031 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -19,8 +19,8 @@ const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) - defaultChannelCapacity = 100 - trySyncIntervalMS = 100 + defaultChannelCapacity = 1000 + trySyncIntervalMS = 50 // stop syncing when last block's time is // within this much of the system time. // stopSyncingDurationMinutes = 10 @@ -34,7 +34,7 @@ const ( type consensusReactor interface { // for when we switch from blockchain reactor and fast sync to // the consensus machine - SwitchToConsensus(*sm.State) + SwitchToConsensus(*sm.State, int) } // BlockchainReactor handles long-term catchup syncing. @@ -110,8 +110,8 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ ID: BlockchainChannel, - Priority: 5, - SendQueueCapacity: 100, + Priority: 10, + SendQueueCapacity: 1000, }, } } @@ -194,8 +194,13 @@ func (bcR *BlockchainReactor) poolRoutine() { statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) + blocksSynced := 0 + chainID := bcR.state.ChainID + lastHundred := time.Now() + lastRate := 0.0 + FOR_LOOP: for { select { @@ -223,14 +228,14 @@ FOR_LOOP: case <-switchToConsensusTicker.C: height, numPending, lenRequesters := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() - bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", lenRequesters, + bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters, "outbound", outbound, "inbound", inbound) if bcR.pool.IsCaughtUp() { bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - conR.SwitchToConsensus(bcR.state) + conR.SwitchToConsensus(bcR.state, blocksSynced) break FOR_LOOP } @@ -271,6 +276,14 @@ FOR_LOOP: // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } + blocksSynced += 1 + + if blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, + "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + lastHundred = time.Now() + } } } continue FOR_LOOP diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 38dff406..c96ccf97 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -101,10 +101,10 @@ func TestByzantine(t *testing.T) { // start the state machines byzR := reactors[0].(*ByzantineReactor) s := byzR.reactor.conS.GetState() - byzR.reactor.SwitchToConsensus(s) + byzR.reactor.SwitchToConsensus(s, 0) for i := 1; i < N; i++ { cr := reactors[i].(*ConsensusReactor) - cr.SwitchToConsensus(cr.conS.GetState()) + cr.SwitchToConsensus(cr.conS.GetState(), 0) } // byz proposer sends one block to peers[0] diff --git a/consensus/reactor.go b/consensus/reactor.go index 48041e2f..e6849992 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -76,7 +76,7 @@ func (conR *ConsensusReactor) OnStop() { // SwitchToConsensus switches from fast_sync mode to consensus mode. // It resets the state, turns off fast_sync, and starts the consensus state-machine -func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { +func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) { conR.Logger.Info("SwitchToConsensus") conR.conS.reconstructLastCommit(state) // NOTE: The line below causes broadcastNewRoundStepRoutine() to @@ -87,6 +87,10 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.fastSync = false conR.mtx.Unlock() + if blocksSynced > 0 { + // dont bother with the WAL if we fast synced + conR.conS.doWALCatchup = false + } conR.conS.Start() } diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 623a6541..ed8fa87b 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -54,7 +54,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEven // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors for i := 0; i < N; i++ { s := reactors[i].conS.GetState() - reactors[i].SwitchToConsensus(s) + reactors[i].SwitchToConsensus(s, 0) } return reactors, eventChans } diff --git a/consensus/state.go b/consensus/state.go index e01c2ab2..e5b7641f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -97,8 +97,9 @@ type ConsensusState struct { // a Write-Ahead Log ensures we can recover from any kind of crash // and helps us avoid signing conflicting votes - wal *WAL - replayMode bool // so we don't log signing errors during replay + wal *WAL + replayMode bool // so we don't log signing errors during replay + doWALCatchup bool // determines if we even try to do the catchup // for tests where we want to limit the number of transitions the state makes nSteps int @@ -123,6 +124,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), done: make(chan struct{}), + doWALCatchup: true, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -226,10 +228,12 @@ func (cs *ConsensusState) OnStart() error { // we may have lost some votes if the process crashed // reload from consensus log to catchup - if err := cs.catchupReplay(cs.Height); err != nil { - cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error()) - // NOTE: if we ever do return an error here, - // make sure to stop the timeoutTicker + if cs.doWALCatchup { + if err := cs.catchupReplay(cs.Height); err != nil { + cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error()) + // NOTE: if we ever do return an error here, + // make sure to stop the timeoutTicker + } } // now start the receiveRoutine