tendermint/blockchain/pool.go

416 lines
9.2 KiB
Go
Raw Normal View History

package blockchain
2015-03-22 03:30:22 -07:00
import (
2015-03-24 11:02:30 -07:00
"sync"
2015-03-22 03:30:22 -07:00
"sync/atomic"
"time"
2015-04-01 17:30:16 -07:00
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
2015-03-22 03:30:22 -07:00
)
const (
maxTries = 3
inputsChannelCapacity = 200
requestIntervalMS = 500
maxPendingRequests = 200
maxTotalRequests = 300
maxRequestsPerPeer = 300
2015-03-22 03:30:22 -07:00
)
// numTotal = numPending + blocks in the pool we havnt synced yet
2015-03-22 19:20:54 -07:00
var (
2015-04-23 14:59:12 -07:00
requestTimeoutSeconds = time.Duration(3)
2015-03-22 19:20:54 -07:00
)
/*
Peers self report their heights when a new peer joins the block pool.
Starting from whatever we've got (pool.height), we request blocks
in sequence from peers that reported higher heights than ours.
Every so often we ask peers what height they're on so we can keep going.
Requests are continuously made for blocks of heigher heights until
the limits. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to consensus reactor
*/
2015-03-22 03:30:22 -07:00
type BlockPool struct {
2015-03-24 11:02:30 -07:00
// block requests
requestsMtx sync.Mutex
requests map[uint]*bpRequest
peerless int32 // number of requests without peers
height uint // the lowest key in requests.
2015-03-24 11:02:30 -07:00
numPending int32
numTotal int32
// peers
peersMtx sync.Mutex
peers map[string]*bpPeer
requestsCh chan<- BlockRequest
timeoutsCh chan<- string
repeater *RepeatTimer
running int32 // atomic
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
2015-03-22 03:30:22 -07:00
return &BlockPool{
2015-03-24 11:02:30 -07:00
peers: make(map[string]*bpPeer),
requests: make(map[uint]*bpRequest),
2015-03-22 03:30:22 -07:00
height: start,
numPending: 0,
numTotal: 0,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
2015-03-24 11:02:30 -07:00
running: 0,
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
2015-03-22 03:30:22 -07:00
log.Info("Starting BlockPool")
go pool.run()
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
2015-03-22 03:30:22 -07:00
log.Info("Stopping BlockPool")
pool.repeater.Stop()
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) IsRunning() bool {
return atomic.LoadInt32(&pool.running) == 1
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
// Run spawns requests as needed.
func (pool *BlockPool) run() {
2015-03-24 11:02:30 -07:00
RUN_LOOP:
2015-03-22 03:30:22 -07:00
for {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
break RUN_LOOP
}
_, numPending, numTotal := pool.GetStatus()
2015-03-24 11:02:30 -07:00
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else if numTotal >= maxTotalRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else {
// request for more blocks.
height := pool.nextHeight()
pool.makeRequest(height)
2015-03-22 03:30:22 -07:00
}
}
}
func (pool *BlockPool) GetStatus() (uint, int32, int32) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-22 16:23:24 -07:00
return pool.height, pool.numPending, pool.numTotal
2015-03-24 11:02:30 -07:00
}
2015-03-22 16:23:24 -07:00
2015-03-24 11:02:30 -07:00
// We need to see the second block's Validation to validate the first block.
// So we peek two blocks at a time.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
if r := pool.requests[pool.height]; r != nil {
2015-03-24 11:02:30 -07:00
first = r.block
}
if r := pool.requests[pool.height+1]; r != nil {
2015-03-24 11:02:30 -07:00
second = r.block
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
return
2015-03-22 03:30:22 -07:00
}
// Pop the first block at pool.height
2015-03-24 11:02:30 -07:00
// It must have been validated by 'second'.Validation from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
if r := pool.requests[pool.height]; r == nil || r.block == nil {
2015-03-24 11:02:30 -07:00
panic("PopRequest() requires a valid block")
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
delete(pool.requests, pool.height)
pool.height++
pool.numTotal--
2015-03-22 12:46:53 -07:00
}
// Invalidates the block at pool.height.
2015-03-24 11:02:30 -07:00
// Remove the peer and request from others.
func (pool *BlockPool) RedoRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
if request.block == nil {
panic("Expected block to be non-nil")
2015-03-22 03:30:22 -07:00
}
// TODO: record this malfeasance
// maybe punish peer on switch (an invalid block!)
pool.RemovePeer(request.peerId) // Lock on peersMtx.
2015-03-24 11:02:30 -07:00
request.block = nil
request.peerId = ""
pool.numPending++
pool.peerless++
2015-03-24 11:02:30 -07:00
go requestRoutine(pool, height)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) hasBlock(height uint) bool {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
return request != nil && request.block != nil
}
func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
if request == nil {
return
2015-03-22 12:46:53 -07:00
}
pool.peerless--
2015-03-24 11:02:30 -07:00
request.peerId = peerId
2015-03-22 12:46:53 -07:00
}
func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := pool.requests[height]
if request == nil {
return
}
pool.peerless++
request.peerId = ""
}
func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[block.Height]
2015-03-24 11:02:30 -07:00
if request == nil {
return
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
if request.peerId != peerId {
return
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
if request.block != nil {
return
}
request.block = block
pool.numPending--
2015-03-24 11:02:30 -07:00
}
func (pool *BlockPool) getPeer(peerId string) *bpPeer {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
return peer
2015-03-22 12:46:53 -07:00
}
// Sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
if peer != nil {
peer.height = height
} else {
peer = &bpPeer{
height: height,
id: peerId,
numRequests: 0,
2015-03-22 12:46:53 -07:00
}
pool.peers[peerId] = peer
2015-03-22 12:46:53 -07:00
}
}
func (pool *BlockPool) RemovePeer(peerId string) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-22 03:30:22 -07:00
delete(pool.peers, peerId)
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
for _, peer := range pool.peers {
2015-03-24 11:02:30 -07:00
if peer.numRequests >= maxRequestsPerPeer {
continue
}
if peer.height < minHeight {
continue
}
peer.numRequests++
return peer
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
return nil
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) decrPeer(peerId string) {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
2015-03-22 03:30:22 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
if peer == nil {
return
}
peer.numRequests--
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) nextHeight() uint {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
// we make one request per height.
return pool.height + uint(pool.numTotal)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) makeRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-22 03:30:22 -07:00
2015-03-24 11:02:30 -07:00
request := &bpRequest{
height: height,
peerId: "",
block: nil,
}
pool.requests[height] = request
2015-03-24 11:02:30 -07:00
pool.peerless++
nextHeight := pool.height + uint(pool.numTotal)
2015-03-24 11:02:30 -07:00
if nextHeight == height {
pool.numTotal++
pool.numPending++
2015-03-24 11:02:30 -07:00
}
go requestRoutine(pool, height)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) sendRequest(height uint, peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
return
2015-03-22 03:30:22 -07:00
}
pool.requestsCh <- BlockRequest{height, peerId}
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
return
}
pool.timeoutsCh <- peerId
2015-03-24 11:02:30 -07:00
}
func (pool *BlockPool) debug() string {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
str := ""
for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
if pool.requests[h] == nil {
2015-03-24 11:02:30 -07:00
str += Fmt("H(%v):X ", h)
} else {
str += Fmt("H(%v):", h)
str += Fmt("B?(%v) ", pool.requests[h].block != nil)
2015-03-24 11:02:30 -07:00
}
}
return str
2015-03-22 03:30:22 -07:00
}
//-------------------------------------
2015-03-24 11:02:30 -07:00
type bpPeer struct {
id string
height uint
numRequests int32
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
type bpRequest struct {
height uint
2015-03-22 03:30:22 -07:00
peerId string
2015-03-24 11:02:30 -07:00
block *types.Block
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
//-------------------------------------
// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func requestRoutine(pool *BlockPool, height uint) {
2015-03-24 11:02:30 -07:00
for {
var peer *bpPeer = nil
PICK_LOOP:
for {
if !pool.IsRunning() {
log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
2015-03-24 11:02:30 -07:00
return
}
peer = pool.pickIncrAvailablePeer(height)
2015-03-24 11:02:30 -07:00
if peer == nil {
//log.Debug("No peers available", "height", height)
2015-03-24 11:02:30 -07:00
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_LOOP
}
break PICK_LOOP
}
// set the peer, decrement peerless
pool.setPeerForRequest(height, peer.id)
2015-03-24 11:02:30 -07:00
for try := 0; try < maxTries; try++ {
pool.sendRequest(height, peer.id)
2015-03-24 11:02:30 -07:00
time.Sleep(requestTimeoutSeconds * time.Second)
// if successful the block is either in the pool,
if pool.hasBlock(height) {
pool.decrPeer(peer.id)
2015-03-24 11:02:30 -07:00
return
}
// or already processed and we've moved past it
bpHeight, _, _ := pool.GetStatus()
2015-03-24 11:02:30 -07:00
if height < bpHeight {
pool.decrPeer(peer.id)
2015-03-24 11:02:30 -07:00
return
}
}
// unset the peer, increment peerless
pool.removePeerForRequest(height, peer.id)
// this peer failed us, try again
pool.RemovePeer(peer.id)
pool.sendTimeout(peer.id)
2015-03-24 11:02:30 -07:00
}
}
//-------------------------------------
type BlockRequest struct {
Height uint
PeerId string
2015-03-22 03:30:22 -07:00
}