diff --git a/binary/bit_array.go b/binary/bit_array.go index 563196c0..e3e4f7a2 100644 --- a/binary/bit_array.go +++ b/binary/bit_array.go @@ -4,18 +4,22 @@ import ( "fmt" "math/rand" "strings" + "sync" . "github.com/tendermint/tendermint/common" ) -// Not goroutine safe type BitArray struct { + mtx sync.Mutex Bits uint // NOTE: persisted via reflect, must be exported Elems []uint64 // NOTE: persisted via reflect, must be exported } func NewBitArray(bits uint) BitArray { - return BitArray{bits, make([]uint64, (bits+63)/64)} + return BitArray{ + Bits: bits, + Elems: make([]uint64, (bits+63)/64), + } } func (bA BitArray) Size() uint { @@ -28,39 +32,69 @@ func (bA BitArray) IsZero() bool { // NOTE: behavior is undefined if i >= bA.Bits func (bA BitArray) GetIndex(i uint) bool { + bA.mtx.Lock() + defer bA.mtx.Unlock() + + return bA.getIndex(i) +} + +func (bA BitArray) getIndex(i uint) bool { if i >= bA.Bits { return false } - return bA.Elems[i/64]&uint64(1<<(i%64)) > 0 + return bA.Elems[i/64]&(uint64(1)<<(i%64)) > 0 } // NOTE: behavior is undefined if i >= bA.Bits func (bA BitArray) SetIndex(i uint, v bool) bool { + bA.mtx.Lock() + defer bA.mtx.Unlock() + + return bA.setIndex(i, v) +} + +func (bA BitArray) setIndex(i uint, v bool) bool { + if i >= bA.Bits { return false } if v { - bA.Elems[i/64] |= uint64(1 << (i % 64)) + bA.Elems[i/64] |= (uint64(1) << (i % 64)) } else { - bA.Elems[i/64] &= ^uint64(1 << (i % 64)) + bA.Elems[i/64] &= ^(uint64(1) << (i % 64)) } return true } func (bA BitArray) Copy() BitArray { + bA.mtx.Lock() + defer bA.mtx.Unlock() + return bA.copy() +} + +func (bA BitArray) copy() BitArray { c := make([]uint64, len(bA.Elems)) copy(c, bA.Elems) - return BitArray{bA.Bits, c} + return BitArray{ + Bits: bA.Bits, + Elems: c, + } } func (bA BitArray) copyBits(bits uint) BitArray { c := make([]uint64, (bits+63)/64) copy(c, bA.Elems) - return BitArray{bits, c} + return BitArray{ + Bits: bits, + Elems: c, + } } // Returns a BitArray of larger bits size. 
func (bA BitArray) Or(o BitArray) BitArray { + bA.mtx.Lock() + defer bA.mtx.Unlock() + c := bA.copyBits(MaxUint(bA.Bits, o.Bits)) - for i := 0; i < len(c.Elems); i++ { + for i := 0; i < len(o.Elems); i++ { c.Elems[i] |= o.Elems[i] @@ -70,6 +104,13 @@ func (bA BitArray) Or(o BitArray) BitArray { // Returns a BitArray of smaller bit size. func (bA BitArray) And(o BitArray) BitArray { + bA.mtx.Lock() + defer bA.mtx.Unlock() + + return bA.and(o) +} + +func (bA BitArray) and(o BitArray) BitArray { c := bA.copyBits(MinUint(bA.Bits, o.Bits)) for i := 0; i < len(c.Elems); i++ { c.Elems[i] &= o.Elems[i] @@ -78,7 +119,10 @@ func (bA BitArray) And(o BitArray) BitArray { } func (bA BitArray) Not() BitArray { - c := bA.Copy() + bA.mtx.Lock() + defer bA.mtx.Unlock() + + c := bA.copy() for i := 0; i < len(c.Elems); i++ { c.Elems[i] = ^c.Elems[i] } @@ -86,24 +130,51 @@ func (bA BitArray) Not() BitArray { } func (bA BitArray) Sub(o BitArray) BitArray { + bA.mtx.Lock() + defer bA.mtx.Unlock() + if bA.Bits > o.Bits { - c := bA.Copy() + c := bA.copy() for i := 0; i < len(o.Elems)-1; i++ { - c.Elems[i] &= ^c.Elems[i] + c.Elems[i] &= ^o.Elems[i] } i := uint(len(o.Elems) - 1) if i >= 0 { for idx := i * 64; idx < o.Bits; idx++ { - c.SetIndex(idx, c.GetIndex(idx) && !o.GetIndex(idx)) + c.setIndex(idx, c.getIndex(idx) && !o.GetIndex(idx)) } } return c } else { - return bA.And(o.Not()) + return bA.and(o.Not()) } } +func (bA BitArray) IsFull() bool { + bA.mtx.Lock() + defer bA.mtx.Unlock() + + if bA.Bits == 0 { + return false + } + + // Check all elements except the last + for _, elem := range bA.Elems[:len(bA.Elems)-1] { + if (^elem) != 0 { + return false + } + } + + // Check that the last element has (lastElemBits) 1's + lastElemBits := (bA.Bits+63)%64 + 1 + lastElem := bA.Elems[len(bA.Elems)-1] + return (lastElem+1)&((uint64(1)< 0 { + if (bA.Elems[elemIdx] & (uint64(1) << uint(bitIdx))) > 0 { return 64*uint(elemIdx) + uint(bitIdx), true } } @@ -131,7 +202,7 @@ func (bA BitArray) PickRandom() (uint, bool) { randBitStart := rand.Intn(elemBits) for j := 0; j < 
elemBits; j++ { bitIdx := ((j + randBitStart) % elemBits) - if (bA.Elems[elemIdx] & (1 << uint(bitIdx))) > 0 { + if (bA.Elems[elemIdx] & (uint64(1) << uint(bitIdx))) > 0 { return 64*uint(elemIdx) + uint(bitIdx), true } } @@ -141,14 +212,24 @@ func (bA BitArray) PickRandom() (uint, bool) { } func (bA BitArray) String() string { - return bA.StringIndented("") + bA.mtx.Lock() + defer bA.mtx.Unlock() + + return bA.stringIndented("") } func (bA BitArray) StringIndented(indent string) string { + bA.mtx.Lock() + defer bA.mtx.Unlock() + return bA.stringIndented(indent) +} + +func (bA BitArray) stringIndented(indent string) string { + lines := []string{} bits := "" for i := uint(0); i < bA.Bits; i++ { - if bA.GetIndex(i) { + if bA.getIndex(i) { bits += "X" } else { bits += "_" diff --git a/consensus/reactor.go b/consensus/reactor.go index b286ad8d..fc459b2d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -35,14 +35,16 @@ type ConsensusReactor struct { stopped uint32 quit chan struct{} - conS *ConsensusState + blockStore *BlockStore + conS *ConsensusState } func NewConsensusReactor(blockStore *BlockStore, mempoolReactor *mempool.MempoolReactor, state *state.State) *ConsensusReactor { conS := NewConsensusState(state, blockStore, mempoolReactor) conR := &ConsensusReactor{ - quit: make(chan struct{}), - conS: conS, + blockStore: blockStore, + quit: make(chan struct{}), + conS: conS, } return conR } @@ -263,13 +265,13 @@ OUTER_LOOP: // won't necessarily match, but that's OK. 
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) { log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) - if index, ok := rs.ProposalBlockParts.BitArray().Sub( - prs.ProposalBlockBitArray).PickRandom(); ok { + if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok { + part := rs.ProposalBlockParts.GetPart(index) msg := &PartMessage{ Height: rs.Height, Round: rs.Round, Type: partTypeProposalBlock, - Part: rs.ProposalBlockParts.GetPart(index), + Part: part, } peer.Send(DataCh, msg) ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) @@ -277,9 +279,46 @@ OUTER_LOOP: } } + // If the peer is on a previous height, help catch up. + if rs.Height > prs.Height { + log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height) + if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { + // Ensure that the peer's PartSetHeader is correct + blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) + if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { + log.Debug("Peer ProposalBlockParts mismatch, sleeping", + "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts) + time.Sleep(peerGossipSleepDuration) + continue OUTER_LOOP + } + // Load the part + part := conR.blockStore.LoadBlockPart(prs.Height, index) + if part == nil { + log.Warn("Could not load part", "index", index, + "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts) + time.Sleep(peerGossipSleepDuration) + continue OUTER_LOOP + } + // Send the part + msg := &PartMessage{ + Height: prs.Height, + Round: prs.Round, + Type: partTypeProposalBlock, + Part: part, + } + peer.Send(DataCh, msg) + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + continue OUTER_LOOP + } else { + log.Debug("No parts to send in catch-up, sleeping") + time.Sleep(peerGossipSleepDuration) + continue OUTER_LOOP + } + } + // If height and round doesn't 
match, sleep. if rs.Height != prs.Height || rs.Round != prs.Round { - log.Debug("Height or Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) + log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) time.Sleep(peerGossipSleepDuration) continue OUTER_LOOP } @@ -294,8 +333,7 @@ OUTER_LOOP: // Send proposal POL parts? if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) { - if index, ok := rs.ProposalPOLParts.BitArray().Sub( - prs.ProposalPOLBitArray).PickRandom(); ok { + if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok { msg := &PartMessage{ Height: rs.Height, Round: rs.Round, @@ -327,12 +365,7 @@ OUTER_LOOP: trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) { // TODO: give priority to our vote. - // peerVoteSet BitArray is being accessed concurrently with - // writes from Receive() routines. We must lock like so here: - ps.mtx.Lock() - index, ok := voteSet.BitArray().Sub(peerVoteSet).PickRandom() - ps.mtx.Unlock() - if ok { + if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok { vote := voteSet.GetByIndex(index) // NOTE: vote may be a commit. msg := &VoteMessage{index, vote} @@ -392,8 +425,7 @@ OUTER_LOOP: } // If peer is lagging by more than 1, load and send Validation and send Commits. - if prs.Height != 0 && !prs.HasAllValidationCommits && - rs.Height >= prs.Height+2 { + if prs.Height != 0 && !prs.HasAllCatchupCommits && rs.Height >= prs.Height+2 { // Load the block header and validation for prs.Height+1, // which contains commit signatures for prs.Height. 
@@ -404,8 +436,7 @@ OUTER_LOOP: // Initialize Commits if needed ps.EnsureVoteBitArrays(prs.Height, size) - index, ok := validation.BitArray().Sub(prs.Commits).PickRandom() - if ok { + if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok { commit := validation.Commits[index] log.Debug("Picked commit to send", "index", index, "commit", commit) // Reconstruct vote. @@ -423,7 +454,7 @@ OUTER_LOOP: continue OUTER_LOOP } else { log.Debug("No commits to send", "ours", validation.BitArray(), "theirs", prs.Commits) - ps.SetHasAllValidationCommits(prs.Height) + ps.SetHasAllCatchupCommits(prs.Height) } } @@ -437,20 +468,20 @@ OUTER_LOOP: // Read only when returned by PeerState.GetRoundState(). type PeerRoundState struct { - Height uint // Height peer is at - Round uint // Round peer is at - Step RoundStep // Step peer is at - StartTime time.Time // Estimated start of round 0 at this height - Proposal bool // True if peer has proposal for this round - ProposalBlockParts PartSetHeader // - ProposalBlockBitArray BitArray // True bit -> has part - ProposalPOLParts PartSetHeader // - ProposalPOLBitArray BitArray // True bit -> has part - Prevotes BitArray // All votes peer has for this round - Precommits BitArray // All precommits peer has for this round - Commits BitArray // All commits peer has for this height - HasAllValidationCommits bool // Used for catch-up - LastCommits BitArray // All commits peer has for last height + Height uint // Height peer is at + Round uint // Round peer is at + Step RoundStep // Step peer is at + StartTime time.Time // Estimated start of round 0 at this height + Proposal bool // True if peer has proposal for this round + ProposalBlockParts PartSetHeader // + ProposalBlockBitArray BitArray // True bit -> has part + ProposalPOLParts PartSetHeader // + ProposalPOLBitArray BitArray // True bit -> has part + Prevotes BitArray // All votes peer has for this round + Precommits BitArray // All precommits peer has for this round + 
Commits BitArray // All commits peer has for this height + LastCommits BitArray // All commits peer has for last height + HasAllCatchupCommits bool // Used for catch-up } //----------------------------------------------------------------------------- @@ -474,6 +505,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState { func (ps *PeerState) GetRoundState() *PeerRoundState { ps.mtx.Lock() defer ps.mtx.Unlock() + prs := ps.PeerRoundState // copy return &prs } @@ -540,6 +572,7 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) { func (ps *PeerState) SetHasVote(vote *Vote, index uint) { ps.mtx.Lock() defer ps.mtx.Unlock() + ps.setHasVote(vote.Height, vote.Round, vote.Type, index) } @@ -571,11 +604,12 @@ func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) // When catching up, this helps keep track of whether // we should send more commit votes from the block (validation) store -func (ps *PeerState) SetHasAllValidationCommits(height uint) { +func (ps *PeerState) SetHasAllCatchupCommits(height uint) { ps.mtx.Lock() defer ps.mtx.Unlock() + if ps.Height == height { - ps.HasAllValidationCommits = true + ps.HasAllCatchupCommits = true } } @@ -612,7 +646,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun } // We'll update the BitArray capacity later. ps.Commits = BitArray{} - ps.HasAllValidationCommits = false + ps.HasAllCatchupCommits = false } } diff --git a/consensus/state.go b/consensus/state.go index 14654e16..a4889487 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -304,7 +304,7 @@ func (cs *ConsensusState) stepTransitionRoutine() { // we're running in a separate goroutine, which avoids deadlocks. 
rs := cs.getRoundState() round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) - log.Debug("Scheduling next action", "round", round, "roundStartTime", roundStartTime, "elapsedRatio", elapsedRatio) + log.Info("Scheduling next action", "height", rs.Height, "round", round, "step", rs.Step, "roundStartTime", roundStartTime, "elapsedRatio", elapsedRatio) switch rs.Step { case RoundStepNewHeight: // We should run RoundActionPropose when rs.StartTime passes. @@ -357,7 +357,7 @@ ACTION_LOOP: height, round, action := roundAction.Height, roundAction.Round, roundAction.Action rs := cs.GetRoundState() - log.Info("Running round action", "action", action, "height", rs.Height, "round", rs.Round, "step", rs.Step, "startTime", rs.StartTime) + log.Info("Running round action", "height", rs.Height, "round", rs.Round, "step", rs.Step, "action", action, "startTime", rs.StartTime) // Continue if action is not relevant if height != rs.Height {