diff --git a/blockchain/pool.go b/blockchain/pool.go
index 099594c1..192165bf 100644
--- a/blockchain/pool.go
+++ b/blockchain/pool.go
@@ -60,32 +60,32 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<-
 	}
 }
 
-func (bp *BlockPool) Start() {
-	if atomic.CompareAndSwapInt32(&bp.running, 0, 1) {
+func (pool *BlockPool) Start() {
+	if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
 		log.Info("Starting BlockPool")
-		go bp.run()
+		go pool.run()
 	}
 }
 
-func (bp *BlockPool) Stop() {
-	if atomic.CompareAndSwapInt32(&bp.running, 1, 0) {
+func (pool *BlockPool) Stop() {
+	if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
 		log.Info("Stopping BlockPool")
-		bp.repeater.Stop()
+		pool.repeater.Stop()
 	}
 }
 
-func (bp *BlockPool) IsRunning() bool {
-	return atomic.LoadInt32(&bp.running) == 1
+func (pool *BlockPool) IsRunning() bool {
+	return atomic.LoadInt32(&pool.running) == 1
 }
 
 // Run spawns requests as needed.
-func (bp *BlockPool) run() {
+func (pool *BlockPool) run() {
 RUN_LOOP:
 	for {
-		if atomic.LoadInt32(&bp.running) == 0 {
+		if atomic.LoadInt32(&pool.running) == 0 {
 			break RUN_LOOP
 		}
-		height, numPending, numTotal := bp.GetStatus()
+		height, numPending, numTotal := pool.GetStatus()
 		log.Debug("BlockPool.run", "height", height, "numPending", numPending,
 			"numTotal", numTotal)
 		if numPending >= maxPendingRequests {
@@ -96,91 +96,91 @@ RUN_LOOP:
 			time.Sleep(requestIntervalMS * time.Millisecond)
 		} else {
 			// request for more blocks.
-			height := bp.nextHeight()
-			bp.makeRequest(height)
+			height := pool.nextHeight()
+			pool.makeRequest(height)
 		}
 	}
 }
 
-func (bp *BlockPool) GetStatus() (uint, int32, int32) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) GetStatus() (uint, int32, int32) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	return bp.height, bp.numPending, bp.numTotal
+	return pool.height, pool.numPending, pool.numTotal
 }
 
 // We need to see the second block's Validation to validate the first block.
 // So we peek two blocks at a time.
-func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	if r := bp.requests[bp.height]; r != nil {
+	if r := pool.requests[pool.height]; r != nil {
 		first = r.block
 	}
-	if r := bp.requests[bp.height+1]; r != nil {
+	if r := pool.requests[pool.height+1]; r != nil {
 		second = r.block
 	}
 	return
 }
 
-// Pop the first block at bp.height
+// Pop the first block at pool.height
 // It must have been validated by 'second'.Validation from PeekTwoBlocks().
-func (bp *BlockPool) PopRequest() {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) PopRequest() {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	if r := bp.requests[bp.height]; r == nil || r.block == nil {
+	if r := pool.requests[pool.height]; r == nil || r.block == nil {
 		panic("PopRequest() requires a valid block")
 	}
 
-	delete(bp.requests, bp.height)
-	bp.height++
-	bp.numTotal--
+	delete(pool.requests, pool.height)
+	pool.height++
+	pool.numTotal--
 }
 
-// Invalidates the block at bp.height.
+// Invalidates the block at pool.height.
 // Remove the peer and request from others.
-func (bp *BlockPool) RedoRequest(height uint) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) RedoRequest(height uint) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	request := bp.requests[height]
+	request := pool.requests[height]
 	if request.block == nil {
 		panic("Expected block to be non-nil")
 	}
-	bp.RemovePeer(request.peerId) // Lock on peersMtx.
+	pool.RemovePeer(request.peerId) // Lock on peersMtx.
 	request.block = nil
 	request.peerId = ""
-	bp.numPending++
+	pool.numPending++
 
-	go requestRoutine(bp, height)
+	go requestRoutine(pool, height)
 }
 
-func (bp *BlockPool) hasBlock(height uint) bool {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) hasBlock(height uint) bool {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	request := bp.requests[height]
+	request := pool.requests[height]
 	return request != nil && request.block != nil
 }
 
-func (bp *BlockPool) setPeerForRequest(height uint, peerId string) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	request := bp.requests[height]
+	request := pool.requests[height]
 	if request == nil {
 		return
 	}
 	request.peerId = peerId
 }
 
-func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	request := bp.requests[block.Height]
+	request := pool.requests[block.Height]
 	if request == nil {
 		return
 	}
@@ -191,23 +191,23 @@ func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
 		return
 	}
 	request.block = block
-	bp.numPending--
+	pool.numPending--
 }
 
-func (bp *BlockPool) getPeer(peerId string) *bpPeer {
-	bp.peersMtx.Lock() // Lock
-	defer bp.peersMtx.Unlock()
+func (pool *BlockPool) getPeer(peerId string) *bpPeer {
+	pool.peersMtx.Lock() // Lock
+	defer pool.peersMtx.Unlock()
 
-	peer := bp.peers[peerId]
+	peer := pool.peers[peerId]
 	return peer
 }
 
 // Sets the peer's blockchain height.
-func (bp *BlockPool) SetPeerHeight(peerId string, height uint) {
-	bp.peersMtx.Lock() // Lock
-	defer bp.peersMtx.Unlock()
+func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
+	pool.peersMtx.Lock() // Lock
+	defer pool.peersMtx.Unlock()
 
-	peer := bp.peers[peerId]
+	peer := pool.peers[peerId]
 	if peer != nil {
 		peer.height = height
 	} else {
@@ -216,24 +216,24 @@ func (bp *BlockPool) SetPeerHeight(peerId string, height uint) {
 			id:          peerId,
 			numRequests: 0,
 		}
-		bp.peers[peerId] = peer
+		pool.peers[peerId] = peer
 	}
 }
 
-func (bp *BlockPool) RemovePeer(peerId string) {
-	bp.peersMtx.Lock() // Lock
-	defer bp.peersMtx.Unlock()
+func (pool *BlockPool) RemovePeer(peerId string) {
+	pool.peersMtx.Lock() // Lock
+	defer pool.peersMtx.Unlock()
 
-	delete(bp.peers, peerId)
+	delete(pool.peers, peerId)
 }
 
 // Pick an available peer with at least the given minHeight.
 // If no peers are available, returns nil.
-func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
-	bp.peersMtx.Lock()
-	defer bp.peersMtx.Unlock()
+func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
+	pool.peersMtx.Lock()
+	defer pool.peersMtx.Unlock()
 
-	for _, peer := range bp.peers {
+	for _, peer := range pool.peers {
 		if peer.numRequests >= maxRequestsPerPeer {
 			continue
 		}
@@ -247,69 +247,69 @@ func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
 	return nil
 }
 
-func (bp *BlockPool) decrPeer(peerId string) {
-	bp.peersMtx.Lock()
-	defer bp.peersMtx.Unlock()
+func (pool *BlockPool) decrPeer(peerId string) {
+	pool.peersMtx.Lock()
+	defer pool.peersMtx.Unlock()
 
-	peer := bp.peers[peerId]
+	peer := pool.peers[peerId]
 	if peer == nil {
 		return
 	}
 	peer.numRequests--
 }
 
-func (bp *BlockPool) nextHeight() uint {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) nextHeight() uint {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
-	return bp.height + uint(bp.numTotal)
+	return pool.height + uint(pool.numTotal)
 }
 
-func (bp *BlockPool) makeRequest(height uint) {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) makeRequest(height uint) {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
 	request := &bpRequest{
 		height: height,
 		peerId: "",
 		block:  nil,
 	}
-	bp.requests[height] = request
+	pool.requests[height] = request
 
-	nextHeight := bp.height + uint(bp.numTotal)
+	nextHeight := pool.height + uint(pool.numTotal)
 	if nextHeight == height {
-		bp.numTotal++
-		bp.numPending++
+		pool.numTotal++
+		pool.numPending++
 	}
 
-	go requestRoutine(bp, height)
+	go requestRoutine(pool, height)
 }
 
-func (bp *BlockPool) sendRequest(height uint, peerId string) {
-	if atomic.LoadInt32(&bp.running) == 0 {
+func (pool *BlockPool) sendRequest(height uint, peerId string) {
+	if atomic.LoadInt32(&pool.running) == 0 {
 		return
 	}
-	bp.requestsCh <- BlockRequest{height, peerId}
+	pool.requestsCh <- BlockRequest{height, peerId}
 }
 
-func (bp *BlockPool) sendTimeout(peerId string) {
-	if atomic.LoadInt32(&bp.running) == 0 {
+func (pool *BlockPool) sendTimeout(peerId string) {
+	if atomic.LoadInt32(&pool.running) == 0 {
 		return
 	}
-	bp.timeoutsCh <- peerId
+	pool.timeoutsCh <- peerId
 }
 
-func (bp *BlockPool) debug() string {
-	bp.requestsMtx.Lock() // Lock
-	defer bp.requestsMtx.Unlock()
+func (pool *BlockPool) debug() string {
+	pool.requestsMtx.Lock() // Lock
+	defer pool.requestsMtx.Unlock()
 
 	str := ""
-	for h := bp.height; h < bp.height+uint(bp.numTotal); h++ {
-		if bp.requests[h] == nil {
+	for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
+		if pool.requests[h] == nil {
 			str += Fmt("H(%v):X ", h)
 		} else {
 			str += Fmt("H(%v):", h)
-			str += Fmt("B?(%v) ", bp.requests[h].block != nil)
+			str += Fmt("B?(%v) ", pool.requests[h].block != nil)
 		}
 	}
 	return str
@@ -333,15 +333,15 @@ type bpRequest struct {
 
 // Responsible for making more requests as necessary
 // Returns when a block is found (e.g. AddBlock() is called)
-func requestRoutine(bp *BlockPool, height uint) {
+func requestRoutine(pool *BlockPool, height uint) {
 	for {
 		var peer *bpPeer = nil
 	PICK_LOOP:
 		for {
-			if !bp.IsRunning() {
+			if !pool.IsRunning() {
 				return
 			}
-			peer = bp.pickIncrAvailablePeer(height)
+			peer = pool.pickIncrAvailablePeer(height)
 			if peer == nil {
 				time.Sleep(requestIntervalMS * time.Millisecond)
 				continue PICK_LOOP
@@ -349,24 +349,24 @@ func requestRoutine(bp *BlockPool, height uint) {
 			break PICK_LOOP
 		}
 
-		bp.setPeerForRequest(height, peer.id)
+		pool.setPeerForRequest(height, peer.id)
 
 		for try := 0; try < maxTries; try++ {
-			bp.sendRequest(height, peer.id)
+			pool.sendRequest(height, peer.id)
 			time.Sleep(requestTimeoutSeconds * time.Second)
 
-			if bp.hasBlock(height) {
-				bp.decrPeer(peer.id)
+			if pool.hasBlock(height) {
+				pool.decrPeer(peer.id)
 				return
 			}
-			bpHeight, _, _ := bp.GetStatus()
+			bpHeight, _, _ := pool.GetStatus()
 			if height < bpHeight {
-				bp.decrPeer(peer.id)
+				pool.decrPeer(peer.id)
 				return
 			}
 		}
 
-		bp.RemovePeer(peer.id)
-		bp.sendTimeout(peer.id)
+		pool.RemovePeer(peer.id)
+		pool.sendTimeout(peer.id)
 	}
 }
diff --git a/blockchain/reactor.go b/blockchain/reactor.go
index 1aba782e..e5976877 100644
--- a/blockchain/reactor.go
+++ b/blockchain/reactor.go
@@ -8,7 +8,9 @@ import (
 	"time"
 
 	"github.com/tendermint/tendermint/binary"
+	. "github.com/tendermint/tendermint/common"
 	"github.com/tendermint/tendermint/p2p"
+	sm "github.com/tendermint/tendermint/state"
 	"github.com/tendermint/tendermint/types"
 )
 
@@ -16,11 +18,17 @@ const (
 	BlockchainChannel      = byte(0x40)
 	defaultChannelCapacity = 100
 	defaultSleepIntervalMS = 500
+	trySyncIntervalMS      = 100
+
+	// stop syncing when last block's time is
+	// within this much of the system time.
+	stopSyncingDurationMinutes = 10
 )
 
 // BlockchainReactor handles long-term catchup syncing.
 type BlockchainReactor struct {
 	sw         *p2p.Switch
+	state      *sm.State
 	store      *BlockStore
 	pool       *BlockPool
 	requestsCh chan BlockRequest
@@ -31,7 +39,10 @@ type BlockchainReactor struct {
 	stopped    uint32
 }
 
-func NewBlockchainReactor(store *BlockStore) *BlockchainReactor {
+func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor {
+	if state.LastBlockHeight != store.Height() {
+		panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
+	}
 	requestsCh := make(chan BlockRequest, defaultChannelCapacity)
 	timeoutsCh := make(chan string, defaultChannelCapacity)
 	pool := NewBlockPool(
@@ -40,6 +51,7 @@ func NewBlockchainReactor(store *BlockStore) *BlockchainReactor {
 		timeoutsCh,
 	)
 	bcR := &BlockchainReactor{
+		state:      state,
 		store:      store,
 		pool:       pool,
 		requestsCh: requestsCh,
@@ -129,7 +141,11 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
 	}
 }
 
+// Handle messages from the poolReactor telling the reactor what to do.
 func (bcR *BlockchainReactor) poolRoutine() {
+
+	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
+
 FOR_LOOP:
 	for {
 		select {
@@ -150,6 +166,48 @@ FOR_LOOP:
 			// Peer timed out.
 			peer := bcR.sw.Peers().Get(peerId)
 			bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
+		case _ = <-trySyncTicker.C: // chan time
+			var lastValidatedBlock *types.Block
+		SYNC_LOOP:
+			for i := 0; i < 10; i++ {
+				// See if there are any blocks to sync.
+				first, second := bcR.pool.PeekTwoBlocks()
+				if first == nil || second == nil {
+					// We need both to sync the first block.
+					break SYNC_LOOP
+				}
+				firstParts := first.MakePartSet().Header()
+				// Finally, verify the first block using the second's validation.
+				err := bcR.state.BondedValidators.VerifyValidation(
+					first.Hash(), firstParts, first.Height, second.Validation)
+				if err != nil {
+					bcR.pool.RedoRequest(first.Height)
+					break SYNC_LOOP
+				} else {
+					bcR.pool.PopRequest()
+					err := bcR.state.AppendBlock(first, firstParts)
+					if err != nil {
+						// TODO This is bad, are we zombie?
+						panic(Fmt("Failed to process committed block: %v", err))
+					}
+					lastValidatedBlock = first
+				}
+			}
+			// We're done syncing for now (will do again shortly)
+			// See if we want to stop syncing and turn on the
+			// consensus reactor.
+			// TODO: use other heuristics too besides blocktime.
+			// It's not a security concern, as it only needs to happen
+			// upon node sync, and there's also a second (slower)
+			// method of syncing in the consensus reactor.
+			if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
+				go func() {
+					bcR.sw.Reactor("BLOCKCHAIN").Stop()
+					bcR.sw.Reactor("CONSENSUS").Start(bcR.sw)
+				}()
+				break FOR_LOOP
+			}
+			continue FOR_LOOP
 		case <-bcR.quit:
 			break FOR_LOOP
 		}
diff --git a/consensus/reactor.go b/consensus/reactor.go
index 7abd75ec..bbc39c89 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -29,6 +29,8 @@ const (
 //-----------------------------------------------------------------------------
 
+// The reactor's underlying ConsensusState may change state at any time.
+// We atomically copy the RoundState struct before using it.
 type ConsensusReactor struct {
 	sw      *p2p.Switch
 	started uint32
diff --git a/consensus/state.go b/consensus/state.go
index ad8b79cf..e2738ca9 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -467,6 +467,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
 	// Reset fields based on state.
 	validators := state.BondedValidators
 	height := state.LastBlockHeight + 1 // next desired block height
+
+	// RoundState fields
 	cs.Height = height
 	cs.Round = 0
 	cs.Step = RoundStepNewHeight
diff --git a/consensus/vote_set.go b/consensus/vote_set.go
index c640fc51..973d4b7a 100644
--- a/consensus/vote_set.go
+++ b/consensus/vote_set.go
@@ -34,7 +34,7 @@ type VoteSet struct {
 	maj23Exists bool
 }
 
-// Constructs a new VoteSet struct used to accumulate votes for each round.
+// Constructs a new VoteSet struct used to accumulate votes for given height/round.
 func NewVoteSet(height uint, round uint, type_ byte, valSet *sm.ValidatorSet) *VoteSet {
 	if height == 0 {
 		panic("Cannot make VoteSet for height == 0, doesn't make sense.")
 	}
diff --git a/daemon/daemon.go b/daemon/daemon.go
index a4b573be..fae9c979 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -55,7 +55,7 @@ func NewNode() *Node {
 	pexReactor := p2p.NewPEXReactor(book)
 
 	// Get BlockchainReactor
-	bcReactor := bc.NewBlockchainReactor(blockStore)
+	bcReactor := bc.NewBlockchainReactor(state, blockStore)
 
 	// Get MempoolReactor
 	mempool := mempl.NewMempool(state.Copy())
@@ -70,10 +70,10 @@ func NewNode() *Node {
 	sw := p2p.NewSwitch()
 	sw.SetChainId(state.Hash(), config.App().GetString("Network"))
 
-	sw.AddReactor("PEX", pexReactor)
-	//sw.AddReactor("BLOCKCHAIN", bcReactor)
-	sw.AddReactor("MEMPOOL", mempoolReactor)
-	sw.AddReactor("CONSENSUS", consensusReactor)
+	sw.AddReactor("PEX", pexReactor).Start(sw)
+	sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw)
+	sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw)
+	sw.AddReactor("CONSENSUS", consensusReactor) // Do not start yet.
 
 	return &Node{
 		sw: sw,
diff --git a/p2p/peer_set.go b/p2p/peer_set.go
index 23f49c51..effad6dc 100644
--- a/p2p/peer_set.go
+++ b/p2p/peer_set.go
@@ -59,7 +59,12 @@ func (ps *PeerSet) Has(peerKey string) bool {
 func (ps *PeerSet) Get(peerKey string) *Peer {
 	ps.mtx.Lock()
 	defer ps.mtx.Unlock()
-	return ps.lookup[peerKey].peer
+	item, ok := ps.lookup[peerKey]
+	if ok {
+		return item.peer
+	} else {
+		return nil
+	}
 }
 
 func (ps *PeerSet) Remove(peer *Peer) {
diff --git a/state/validator_set.go b/state/validator_set.go
index 50f76f42..09589a9d 100644
--- a/state/validator_set.go
+++ b/state/validator_set.go
@@ -2,12 +2,15 @@ package state
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"sort"
 	"strings"
 
+	"github.com/tendermint/tendermint/account"
 	. "github.com/tendermint/tendermint/common"
 	"github.com/tendermint/tendermint/merkle"
+	"github.com/tendermint/tendermint/types"
 )
 
 // ValidatorSet represent a set of *Validator at a given height.
@@ -198,6 +201,50 @@ func (valSet *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) {
 	}
 }
 
+// Verify that +2/3 of the set had signed the given signBytes
+func (valSet *ValidatorSet) VerifyValidation(hash []byte, parts types.PartSetHeader, height uint, v *types.Validation) error {
+	if valSet.Size() != uint(len(v.Commits)) {
+		return errors.New(Fmt("Invalid validation -- wrong set size: %v vs %v",
+			valSet.Size(), len(v.Commits)))
+	}
+
+	talliedVotingPower := uint64(0)
+	seenValidators := map[string]struct{}{}
+
+	for idx, commit := range v.Commits {
+		// may be zero, in which case skip.
+		if commit.Signature.IsZero() {
+			continue
+		}
+		_, val := valSet.GetByIndex(uint(idx))
+		commitSignBytes := account.SignBytes(&types.Vote{
+			Height: height, Round: commit.Round, Type: types.VoteTypeCommit,
+			BlockHash:  hash,
+			BlockParts: parts,
+		})
+
+		// Validate
+		if _, seen := seenValidators[string(val.Address)]; seen {
+			return Errorf("Duplicate validator for commit %v for Validation %v", commit, v)
+		}
+
+		if !val.PubKey.VerifyBytes(commitSignBytes, commit.Signature) {
+			return Errorf("Invalid signature for commit %v for Validation %v", commit, v)
+		}
+
+		// Tally
+		seenValidators[string(val.Address)] = struct{}{}
+		talliedVotingPower += val.VotingPower
+	}
+
+	if talliedVotingPower > valSet.TotalVotingPower()*2/3 {
+		return nil
+	} else {
+		return Errorf("insufficient voting power %v, needed %v",
+			talliedVotingPower, (valSet.TotalVotingPower()*2/3 + 1))
+	}
+}
+
 func (valSet *ValidatorSet) String() string {
 	return valSet.StringIndented("")
 }
diff --git a/types/block.go b/types/block.go
index 11dfb3b9..176ce438 100644
--- a/types/block.go
+++ b/types/block.go
@@ -39,7 +39,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte,
 	if !b.LastBlockParts.Equals(lastBlockParts) {
 		return errors.New("Wrong Block.Header.LastBlockParts")
 	}
-	/* TODO: Determine bounds.
+	/* TODO: Determine bounds
+		See blockchain/reactor "stopSyncingDurationMinutes"
+
 	if !b.Time.After(lastBlockTime) {
 		return errors.New("Invalid Block.Header.Time")
 	}
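
Reviewer note (not part of the patch): the core rule added in state/validator_set.go is that a block's Validation is accepted only when the voting power behind valid, non-duplicate commit signatures strictly exceeds two thirds of the set's total power. Below is a minimal, self-contained Go sketch of just that tally rule; the commit struct, validator names, and power figures are made up for illustration and are not the Tendermint types.

```go
package main

import "fmt"

// commit is a hypothetical, simplified stand-in for a commit entry.
type commit struct {
	validator string // address of the signer
	power     uint64 // that validator's voting power
	valid     bool   // stands in for a successful signature check
}

// verifyTwoThirds mirrors the tally in ValidatorSet.VerifyValidation:
// skip missing signatures, reject duplicate validators, count each
// validator's power once, and require tallied > total*2/3 (strictly).
func verifyTwoThirds(commits []commit, totalPower uint64) error {
	tallied := uint64(0)
	seen := map[string]struct{}{}
	for _, c := range commits {
		if !c.valid {
			continue // like a zero signature: just skip it
		}
		if _, dup := seen[c.validator]; dup {
			return fmt.Errorf("duplicate validator %v", c.validator)
		}
		seen[c.validator] = struct{}{}
		tallied += c.power
	}
	if tallied > totalPower*2/3 {
		return nil
	}
	return fmt.Errorf("insufficient voting power %v, needed %v", tallied, totalPower*2/3+1)
}

func main() {
	commits := []commit{
		{"val1", 10, true},
		{"val2", 10, true},
		{"val3", 10, false}, // missing signature
	}
	// 20 of 30 is not strictly greater than 20, so this fails; 21 is needed.
	fmt.Println(verifyTwoThirds(commits, 30))
}
```

With integer arithmetic, total power 30 gives a threshold of 30*2/3 = 20, so at least 21 is required; the strict inequality is what makes exactly two thirds insufficient.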
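Reviewer note (not part of the patch): the hand-off in poolRoutine treats the node as caught up once the most recently committed block's timestamp is within stopSyncingDurationMinutes of local wall-clock time, and only then stops the BLOCKCHAIN reactor and starts CONSENSUS. A standalone sketch of just that predicate; the window and timestamps here are illustrative, not taken from the patch beyond the ten-minute constant.

```go
package main

import (
	"fmt"
	"time"
)

// caughtUp mirrors the poolRoutine check: the last committed block's
// time must be within `window` of the local clock before fast-sync
// hands off to the consensus reactor.
func caughtUp(lastBlockTime, now time.Time, window time.Duration) bool {
	return now.Sub(lastBlockTime) < window
}

func main() {
	now := time.Now()
	window := 10 * time.Minute // stopSyncingDurationMinutes in the patch

	// Recent block: stop fast-syncing and start consensus.
	fmt.Println(caughtUp(now.Add(-2*time.Minute), now, window)) // true
	// Still far behind: keep fast-syncing.
	fmt.Println(caughtUp(now.Add(-1*time.Hour), now, window)) // false
}
```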
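Reviewer note (not part of the patch): the p2p/peer_set.go change replaces a direct map index followed by a field access, which blows up for unknown keys when the map stores pointers (the apparent motivation for the fix), with a comma-ok lookup that returns nil. A self-contained sketch of the same pattern using simplified stand-in types rather than the real Peer/PeerSet:

```go
package main

import "fmt"

// Hypothetical stand-ins; they only loosely mirror Peer / peerSetItem.
type peer struct{ key string }
type item struct{ peer *peer }

type peerSet struct {
	lookup map[string]*item
}

// Get returns the peer for peerKey, or nil if it is not in the set.
// The comma-ok form avoids dereferencing a nil *item for unknown keys.
func (ps *peerSet) Get(peerKey string) *peer {
	if it, ok := ps.lookup[peerKey]; ok {
		return it.peer
	}
	return nil
}

func main() {
	ps := &peerSet{lookup: map[string]*item{
		"known": {peer: &peer{key: "known"}},
	}}
	fmt.Println(ps.Get("known") != nil) // true
	fmt.Println(ps.Get("missing"))      // <nil>, instead of a panic
}
```

This matches how the reactor uses it: bcR.sw.Peers().Get(peerId) can now yield nil for a peer that already disconnected, and callers are expected to handle that.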