tendermint/blockchain/pool.go

377 lines
7.9 KiB
Go
Raw Normal View History

package blockchain
2015-03-22 03:30:22 -07:00
import (
2015-03-24 11:02:30 -07:00
"sync"
2015-03-22 03:30:22 -07:00
"sync/atomic"
"time"
2015-04-01 17:30:16 -07:00
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
2015-03-22 03:30:22 -07:00
)
// Tuning parameters for the block pool.
const (
	// maxTries is the number of times requestRoutine sends a request to the
	// same peer before removing that peer and reporting a timeout.
	maxTries = 3

	// inputsChannelCapacity is not referenced in this file.
	// NOTE(review): presumably sizes a channel owned by the caller — confirm.
	inputsChannelCapacity = 200

	// requestIntervalMS is the pause, in milliseconds, used when the pool is
	// at capacity or when no peer is available; it is also the interval
	// handed to the pool's repeat timer.
	requestIntervalMS = 500

	// maxPendingRequests and maxTotalRequests are the thresholds at which
	// run() stops creating new requests and sleeps instead.
	maxPendingRequests = 200
	maxTotalRequests   = 300

	// maxRequestsPerPeer caps the in-flight requests assigned to one peer.
	maxRequestsPerPeer = 300
)
2015-03-22 19:20:54 -07:00
// requestTimeoutSeconds is a plain count of seconds (not a ready-made
// duration): requestRoutine multiplies it by time.Second to obtain how long
// to wait for a block after each request is sent.
var requestTimeoutSeconds = time.Duration(1)
2015-03-22 03:30:22 -07:00
type BlockPool struct {
2015-03-24 11:02:30 -07:00
// block requests
requestsMtx sync.Mutex
requests map[uint]*bpRequest
height uint // the lowest key in requests.
numPending int32
numTotal int32
// peers
peersMtx sync.Mutex
peers map[string]*bpPeer
requestsCh chan<- BlockRequest
timeoutsCh chan<- string
repeater *RepeatTimer
running int32 // atomic
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
2015-03-22 03:30:22 -07:00
return &BlockPool{
2015-03-24 11:02:30 -07:00
peers: make(map[string]*bpPeer),
requests: make(map[uint]*bpRequest),
2015-03-22 03:30:22 -07:00
height: start,
numPending: 0,
numTotal: 0,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
2015-03-24 11:02:30 -07:00
running: 0,
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
2015-03-22 03:30:22 -07:00
log.Info("Starting BlockPool")
go pool.run()
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
2015-03-22 03:30:22 -07:00
log.Info("Stopping BlockPool")
pool.repeater.Stop()
2015-03-22 03:30:22 -07:00
}
}
func (pool *BlockPool) IsRunning() bool {
return atomic.LoadInt32(&pool.running) == 1
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
// Run spawns requests as needed.
func (pool *BlockPool) run() {
2015-03-24 11:02:30 -07:00
RUN_LOOP:
2015-03-22 03:30:22 -07:00
for {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
break RUN_LOOP
}
_, numPending, numTotal := pool.GetStatus()
2015-03-24 11:02:30 -07:00
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else if numTotal >= maxTotalRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else {
// request for more blocks.
height := pool.nextHeight()
pool.makeRequest(height)
2015-03-22 03:30:22 -07:00
}
}
}
func (pool *BlockPool) GetStatus() (uint, int32, int32) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-22 16:23:24 -07:00
return pool.height, pool.numPending, pool.numTotal
2015-03-24 11:02:30 -07:00
}
2015-03-22 16:23:24 -07:00
2015-03-24 11:02:30 -07:00
// We need to see the second block's Validation to validate the first block.
// So we peek two blocks at a time.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
if r := pool.requests[pool.height]; r != nil {
2015-03-24 11:02:30 -07:00
first = r.block
}
if r := pool.requests[pool.height+1]; r != nil {
2015-03-24 11:02:30 -07:00
second = r.block
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
return
2015-03-22 03:30:22 -07:00
}
// Pop the first block at pool.height
2015-03-24 11:02:30 -07:00
// It must have been validated by 'second'.Validation from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
if r := pool.requests[pool.height]; r == nil || r.block == nil {
2015-03-24 11:02:30 -07:00
panic("PopRequest() requires a valid block")
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
delete(pool.requests, pool.height)
pool.height++
pool.numTotal--
2015-03-22 12:46:53 -07:00
}
// Invalidates the block at pool.height.
2015-03-24 11:02:30 -07:00
// Remove the peer and request from others.
func (pool *BlockPool) RedoRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
if request.block == nil {
panic("Expected block to be non-nil")
2015-03-22 03:30:22 -07:00
}
pool.RemovePeer(request.peerId) // Lock on peersMtx.
2015-03-24 11:02:30 -07:00
request.block = nil
request.peerId = ""
pool.numPending++
2015-03-24 11:02:30 -07:00
go requestRoutine(pool, height)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) hasBlock(height uint) bool {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
return request != nil && request.block != nil
}
func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[height]
2015-03-24 11:02:30 -07:00
if request == nil {
return
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
request.peerId = peerId
2015-03-22 12:46:53 -07:00
}
func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
request := pool.requests[block.Height]
2015-03-24 11:02:30 -07:00
if request == nil {
return
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
if request.peerId != peerId {
return
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
if request.block != nil {
return
}
request.block = block
pool.numPending--
2015-03-24 11:02:30 -07:00
}
func (pool *BlockPool) getPeer(peerId string) *bpPeer {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
return peer
2015-03-22 12:46:53 -07:00
}
2015-03-24 11:02:30 -07:00
// Sets the peer's blockchain height.
func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
if peer != nil {
peer.height = height
} else {
peer = &bpPeer{
height: height,
id: peerId,
numRequests: 0,
2015-03-22 12:46:53 -07:00
}
pool.peers[peerId] = peer
2015-03-22 12:46:53 -07:00
}
}
func (pool *BlockPool) RemovePeer(peerId string) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
2015-03-22 03:30:22 -07:00
delete(pool.peers, peerId)
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
2015-03-24 11:02:30 -07:00
for _, peer := range pool.peers {
2015-03-24 11:02:30 -07:00
if peer.numRequests >= maxRequestsPerPeer {
continue
}
if peer.height < minHeight {
continue
}
peer.numRequests++
return peer
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
return nil
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) decrPeer(peerId string) {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
2015-03-22 03:30:22 -07:00
peer := pool.peers[peerId]
2015-03-24 11:02:30 -07:00
if peer == nil {
return
}
peer.numRequests--
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) nextHeight() uint {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
return pool.height + uint(pool.numTotal)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) makeRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-22 03:30:22 -07:00
2015-03-24 11:02:30 -07:00
request := &bpRequest{
height: height,
peerId: "",
block: nil,
}
pool.requests[height] = request
2015-03-24 11:02:30 -07:00
nextHeight := pool.height + uint(pool.numTotal)
2015-03-24 11:02:30 -07:00
if nextHeight == height {
pool.numTotal++
pool.numPending++
2015-03-24 11:02:30 -07:00
}
go requestRoutine(pool, height)
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) sendRequest(height uint, peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
return
2015-03-22 03:30:22 -07:00
}
pool.requestsCh <- BlockRequest{height, peerId}
2015-03-22 03:30:22 -07:00
}
func (pool *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
2015-03-24 11:02:30 -07:00
return
}
pool.timeoutsCh <- peerId
2015-03-24 11:02:30 -07:00
}
func (pool *BlockPool) debug() string {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
2015-03-24 11:02:30 -07:00
str := ""
for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
if pool.requests[h] == nil {
2015-03-24 11:02:30 -07:00
str += Fmt("H(%v):X ", h)
} else {
str += Fmt("H(%v):", h)
str += Fmt("B?(%v) ", pool.requests[h].block != nil)
2015-03-24 11:02:30 -07:00
}
}
return str
2015-03-22 03:30:22 -07:00
}
//-------------------------------------
2015-03-24 11:02:30 -07:00
// bpPeer is the pool's record of a single peer.
type bpPeer struct {
	id          string // key of this peer in BlockPool.peers
	height      uint   // height last set for this peer via SetPeerHeight
	numRequests int32  // requests currently assigned to this peer
}
2015-03-24 11:02:30 -07:00
type bpRequest struct {
height uint
2015-03-22 03:30:22 -07:00
peerId string
2015-03-24 11:02:30 -07:00
block *types.Block
2015-03-22 03:30:22 -07:00
}
2015-03-24 11:02:30 -07:00
//-------------------------------------
// Responsible for making more requests as necessary
// Returns when a block is found (e.g. AddBlock() is called)
func requestRoutine(pool *BlockPool, height uint) {
2015-03-24 11:02:30 -07:00
for {
var peer *bpPeer = nil
PICK_LOOP:
for {
if !pool.IsRunning() {
log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
2015-03-24 11:02:30 -07:00
return
}
peer = pool.pickIncrAvailablePeer(height)
2015-03-24 11:02:30 -07:00
if peer == nil {
//log.Debug("No peers available", "height", height)
2015-03-24 11:02:30 -07:00
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_LOOP
}
break PICK_LOOP
}
pool.setPeerForRequest(height, peer.id)
2015-03-24 11:02:30 -07:00
for try := 0; try < maxTries; try++ {
pool.sendRequest(height, peer.id)
2015-03-24 11:02:30 -07:00
time.Sleep(requestTimeoutSeconds * time.Second)
if pool.hasBlock(height) {
pool.decrPeer(peer.id)
2015-03-24 11:02:30 -07:00
return
}
bpHeight, _, _ := pool.GetStatus()
2015-03-24 11:02:30 -07:00
if height < bpHeight {
pool.decrPeer(peer.id)
2015-03-24 11:02:30 -07:00
return
}
}
pool.RemovePeer(peer.id)
pool.sendTimeout(peer.id)
2015-03-24 11:02:30 -07:00
}
}
//-------------------------------------
// BlockRequest is the value published on the pool's requests channel: it
// asks for the block at Height from the peer identified by PeerId.
type BlockRequest struct {
	Height uint   // height of the requested block
	PeerId string // id of the peer the request is addressed to
}