dont catchupReplay on wal if we fast synced

Ethan Buchman 2017-10-26 18:29:23 -04:00
parent bb6c15b00a
commit 591dd9e662
6 changed files with 58 additions and 29 deletions

blockchain/pool.go

@@ -26,10 +26,10 @@ eg, L = latency = 0.1s
 */
 const (
-	requestIntervalMS         = 250
-	maxTotalRequesters        = 300
+	requestIntervalMS         = 100
+	maxTotalRequesters        = 1000
 	maxPendingRequests        = maxTotalRequesters
-	maxPendingRequestsPerPeer = 10
+	maxPendingRequestsPerPeer = 50
 	minRecvRate               = 10240 // 10Kb/s
 )
@@ -56,7 +56,8 @@ type BlockPool struct {
 	height     int   // the lowest key in requesters.
 	numPending int32 // number of requests pending assignment or block response
 	// peers
 	peers map[string]*bpPeer
+	maxPeerHeight int

 	requestsCh chan<- BlockRequest
 	timeoutsCh chan<- string
@@ -87,10 +88,12 @@ func (pool *BlockPool) OnStop() {}
 // Run spawns requesters as needed.
 func (pool *BlockPool) makeRequestersRoutine() {
 	for {
 		if !pool.IsRunning() {
 			break
 		}

 		_, numPending, lenRequesters := pool.GetStatus()
 		if numPending >= maxPendingRequests {
 			// sleep for a bit.
@@ -147,16 +150,10 @@ func (pool *BlockPool) IsCaughtUp() bool {
 		return false
 	}
-	maxPeerHeight := 0
-	for _, peer := range pool.peers {
-		maxPeerHeight = cmn.MaxInt(maxPeerHeight, peer.height)
-	}
 	// some conditions to determine if we're caught up
 	receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second)
-	ourChainIsLongestAmongPeers := maxPeerHeight == 0 || pool.height >= maxPeerHeight
+	ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
 	isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
-	pool.Logger.Info(cmn.Fmt("IsCaughtUp: %v", isCaughtUp), "height", pool.height, "maxPeerHeight", maxPeerHeight)
 	return isCaughtUp
 }
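The predicate above reads most easily as a pure function. A minimal standalone sketch, with illustrative names that are not part of the commit: a node counts as caught up once it has either received a block or waited five seconds since starting, and no peer reports a height above its own (maxPeerHeight == 0 covers the case where no peer has reported yet).

    package main

    import (
        "fmt"
        "time"
    )

    // isCaughtUp restates the predicate from IsCaughtUp above;
    // the function and argument names are illustrative only.
    func isCaughtUp(height, maxPeerHeight int, sinceStart time.Duration) bool {
        receivedBlockOrTimedOut := height > 0 || sinceStart > 5*time.Second
        ourChainIsLongestAmongPeers := maxPeerHeight == 0 || height >= maxPeerHeight
        return receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
    }

    func main() {
        fmt.Println(isCaughtUp(0, 0, time.Second))     // false: no blocks yet, no timeout
        fmt.Println(isCaughtUp(0, 0, 6*time.Second))   // true: timed out with no peer reports
        fmt.Println(isCaughtUp(100, 250, time.Minute)) // false: a peer is ahead of us
        fmt.Println(isCaughtUp(250, 250, time.Minute)) // true: as high as the best peer
    }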
@@ -235,6 +232,13 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
 	}
 }

+// MaxPeerHeight returns the heighest height reported by a peer
+func (pool *BlockPool) MaxPeerHeight() int {
+	pool.mtx.Lock()
+	defer pool.mtx.Unlock()
+	return pool.maxPeerHeight
+}
+
 // Sets the peer's alleged blockchain height.
 func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
 	pool.mtx.Lock()
@@ -248,6 +252,10 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
 		peer.setLogger(pool.Logger.With("peer", peerID))
 		pool.peers[peerID] = peer
 	}
+
+	if height > pool.maxPeerHeight {
+		pool.maxPeerHeight = height
+	}
 }

 func (pool *BlockPool) RemovePeer(peerID string) {
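Taken together, these hunks swap the O(peers) scan inside IsCaughtUp for a cached maximum that is raised on every SetPeerHeight. A minimal standalone sketch of the pattern, using a hypothetical peerSet type rather than the real BlockPool; note it shares the real code's trade-off that the cached maximum is only ever raised, so it can go stale if the best peer disconnects.

    package main

    import (
        "fmt"
        "sync"
    )

    // peerSet caches the maximum reported height so readers don't
    // have to scan every peer under the lock on each query.
    type peerSet struct {
        mtx     sync.Mutex
        heights map[string]int
        maxH    int
    }

    func (ps *peerSet) SetPeerHeight(id string, height int) {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
        ps.heights[id] = height
        if height > ps.maxH {
            ps.maxH = height // raise the cached maximum on write
        }
    }

    func (ps *peerSet) MaxPeerHeight() int {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
        return ps.maxH
    }

    func main() {
        ps := &peerSet{heights: make(map[string]int)}
        ps.SetPeerHeight("peer1", 10)
        ps.SetPeerHeight("peer2", 42)
        fmt.Println(ps.MaxPeerHeight()) // 42, without scanning the map
    }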
@@ -298,7 +306,7 @@ func (pool *BlockPool) makeNextRequester() {
 	nextHeight := pool.height + len(pool.requesters)
 	request := newBPRequester(pool, nextHeight)
-	request.SetLogger(pool.Logger.With("height", nextHeight))
+	// request.SetLogger(pool.Logger.With("height", nextHeight))

 	pool.requesters[nextHeight] = request
 	pool.numPending++

blockchain/reactor.go

@@ -19,8 +19,8 @@ const (
 	// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
 	BlockchainChannel = byte(0x40)

-	defaultChannelCapacity = 100
-	trySyncIntervalMS      = 100
+	defaultChannelCapacity = 1000
+	trySyncIntervalMS      = 50
 	// stop syncing when last block's time is
 	// within this much of the system time.
 	// stopSyncingDurationMinutes = 10
@@ -34,7 +34,7 @@ const (
 type consensusReactor interface {
 	// for when we switch from blockchain reactor and fast sync to
 	// the consensus machine
-	SwitchToConsensus(*sm.State)
+	SwitchToConsensus(*sm.State, int)
 }

 // BlockchainReactor handles long-term catchup syncing.
@@ -110,8 +110,8 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
 	return []*p2p.ChannelDescriptor{
 		&p2p.ChannelDescriptor{
 			ID:                BlockchainChannel,
-			Priority:          5,
-			SendQueueCapacity: 100,
+			Priority:          10,
+			SendQueueCapacity: 1000,
 		},
 	}
 }
@@ -194,8 +194,13 @@ func (bcR *BlockchainReactor) poolRoutine() {
 	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
 	switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)

+	blocksSynced := 0
+
 	chainID := bcR.state.ChainID

+	lastHundred := time.Now()
+	lastRate := 0.0
+
 FOR_LOOP:
 	for {
 		select {
@@ -223,14 +228,14 @@ FOR_LOOP:
 		case <-switchToConsensusTicker.C:
 			height, numPending, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
-			bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", lenRequesters,
+			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
 				"outbound", outbound, "inbound", inbound)
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
 				bcR.pool.Stop()

 				conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
-				conR.SwitchToConsensus(bcR.state)
+				conR.SwitchToConsensus(bcR.state, blocksSynced)

 				break FOR_LOOP
 			}
@@ -271,6 +276,14 @@ FOR_LOOP:
 					// TODO This is bad, are we zombie?
 					cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
 				}
+				blocksSynced += 1
+
+				if blocksSynced%100 == 0 {
+					lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
+					bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
+						"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
+					lastHundred = time.Now()
+				}
 			}
 		}
 		continue FOR_LOOP
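The new rate logger is an exponentially weighted moving average over 100-block windows: each window's observed rate (100 blocks divided by the elapsed seconds) contributes 10%, and the running average keeps 90%. A worked standalone sketch with illustrative names, not taken from the codebase:

    package main

    import (
        "fmt"
        "time"
    )

    // ewma applies the same 0.9/0.1 smoothing as poolRoutine:
    // lastRate = 0.9*lastRate + 0.1*(window / elapsed seconds).
    func ewma(lastRate float64, window int, elapsed time.Duration) float64 {
        observed := float64(window) / elapsed.Seconds()
        return 0.9*lastRate + 0.1*observed
    }

    func main() {
        rate := 0.0 // starts at zero, so the average warms up over the first windows
        for _, elapsed := range []time.Duration{2 * time.Second, time.Second, time.Second} {
            rate = ewma(rate, 100, elapsed)
            fmt.Printf("blocks/s: %.2f\n", rate) // 5.00, then 14.50, then 23.05
        }
    }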

consensus/byzantine_test.go

@@ -101,10 +101,10 @@ func TestByzantine(t *testing.T) {
 	// start the state machines
 	byzR := reactors[0].(*ByzantineReactor)
 	s := byzR.reactor.conS.GetState()
-	byzR.reactor.SwitchToConsensus(s)
+	byzR.reactor.SwitchToConsensus(s, 0)
 	for i := 1; i < N; i++ {
 		cr := reactors[i].(*ConsensusReactor)
-		cr.SwitchToConsensus(cr.conS.GetState())
+		cr.SwitchToConsensus(cr.conS.GetState(), 0)
 	}

 	// byz proposer sends one block to peers[0]

consensus/reactor.go

@@ -76,7 +76,7 @@ func (conR *ConsensusReactor) OnStop() {
 // SwitchToConsensus switches from fast_sync mode to consensus mode.
 // It resets the state, turns off fast_sync, and starts the consensus state-machine
-func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
+func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
 	conR.Logger.Info("SwitchToConsensus")
 	conR.conS.reconstructLastCommit(state)
 	// NOTE: The line below causes broadcastNewRoundStepRoutine() to
@@ -87,6 +87,10 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
 	conR.fastSync = false
 	conR.mtx.Unlock()

+	if blocksSynced > 0 {
+		// dont bother with the WAL if we fast synced
+		conR.conS.doWALCatchup = false
+	}
 	conR.conS.Start()
 }

consensus/reactor_test.go

@@ -54,7 +54,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEven
 	// we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
 	for i := 0; i < N; i++ {
 		s := reactors[i].conS.GetState()
-		reactors[i].SwitchToConsensus(s)
+		reactors[i].SwitchToConsensus(s, 0)
 	}
 	return reactors, eventChans
 }

consensus/state.go

@@ -97,8 +97,9 @@ type ConsensusState struct {
 	// a Write-Ahead Log ensures we can recover from any kind of crash
 	// and helps us avoid signing conflicting votes
 	wal        *WAL
 	replayMode bool // so we don't log signing errors during replay
+	doWALCatchup bool // determines if we even try to do the catchup

 	// for tests where we want to limit the number of transitions the state makes
 	nSteps int
@@ -123,6 +124,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon
 		internalMsgQueue: make(chan msgInfo, msgQueueSize),
 		timeoutTicker:    NewTimeoutTicker(),
 		done:             make(chan struct{}),
+		doWALCatchup:     true,
 	}
 	// set function defaults (may be overwritten before calling Start)
 	cs.decideProposal = cs.defaultDecideProposal
@@ -226,10 +228,12 @@ func (cs *ConsensusState) OnStart() error {
 	// we may have lost some votes if the process crashed
 	// reload from consensus log to catchup
-	if err := cs.catchupReplay(cs.Height); err != nil {
-		cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
-		// NOTE: if we ever do return an error here,
-		// make sure to stop the timeoutTicker
+	if cs.doWALCatchup {
+		if err := cs.catchupReplay(cs.Height); err != nil {
+			cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
+			// NOTE: if we ever do return an error here,
+			// make sure to stop the timeoutTicker
+		}
 	}

 	// now start the receiveRoutine
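End to end, the commit threads one fact through the stack: the number of blocks fast synced. If that count is nonzero, the on-disk WAL predates the chain tip, so replaying it could only revisit stale heights and is skipped. A minimal standalone sketch of the control flow, using stand-in types rather than the real ConsensusState:

    package main

    import "fmt"

    // consensusState is a stand-in for the two fields this commit touches.
    type consensusState struct {
        doWALCatchup bool // defaults to true, as in NewConsensusState
    }

    func (cs *consensusState) OnStart() {
        if cs.doWALCatchup {
            fmt.Println("replaying WAL to catch up")
        } else {
            fmt.Println("skipping WAL catchup")
        }
    }

    // switchToConsensus mirrors the gating added in SwitchToConsensus:
    // any fast-synced blocks make the on-disk WAL stale, so skip replay.
    func switchToConsensus(cs *consensusState, blocksSynced int) {
        if blocksSynced > 0 {
            cs.doWALCatchup = false
        }
        cs.OnStart()
    }

    func main() {
        switchToConsensus(&consensusState{doWALCatchup: true}, 0)   // no fast sync: replay
        switchToConsensus(&consensusState{doWALCatchup: true}, 250) // fast synced: skip
    }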