// tendermint/block/pool.go
//
// 330 lines
// 7.9 KiB
// Go
// Raw Normal View History
//
// 2015-03-22 03:30:22 -07:00

package block
import (
// 2015-03-22 12:46:53 -07:00
"math/rand"
// 2015-03-22 03:30:22 -07:00
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/common"
)
// Tuning knobs for the block pool.
const (
	// Internal plumbing.
	eventsChannelCapacity = 100 // buffer size of BlockPool.eventsCh
	requestIntervalMS     = 500 // interval handed to the repeat timer, in milliseconds

	// Limits on how many block heights are tracked at once
	// (checked in makeMoreBlockInfos).
	requestBatchSize   = 50  // most blockInfos created per timer tick
	maxPendingRequests = 50  // ceiling for BlockPool.numPending
	maxTotalRequests   = 100 // ceiling for BlockPool.numTotal

	// Per-peer and per-request limits.
	maxOutstandingRequestsPerPeer = 10 // a peer at this many open requests is not "available"
	maxPeersPerRequest            = 1  // peers asked concurrently for the same height
	requestTimeoutSeconds         = 10 // delay before a bpRequestTimeout is emitted
	maxTries                      = 3  // compared against bpBlockRequest.tries on timeout
)
// BlockRequest is the message the pool emits on its requests channel: it asks
// for the block at Height from the peer identified by PeerId.
//
// The field order is significant; the pool builds values positionally.
type BlockRequest struct {
	Height uint   // height of the wanted block
	PeerId string // id of the peer the request is addressed to
}
type BlockPool struct {
peers map[string]*bpPeer
blockInfos map[uint]*bpBlockInfo
2015-03-22 12:46:53 -07:00
height uint // the lowest key in blockInfos.
started int32 // atomic
stopped int32 // atomic
2015-03-22 03:30:22 -07:00
numPending int32
numTotal int32
2015-03-22 12:46:53 -07:00
eventsCh chan interface{} // internal events.
2015-03-22 03:30:22 -07:00
requestsCh chan<- BlockRequest // output of new requests to make.
timeoutsCh chan<- string // output of peers that timed out.
blocksCh chan<- *Block // output of ordered blocks.
repeater *RepeatTimer // for requesting more bocks.
2015-03-22 12:46:53 -07:00
quit chan struct{}
2015-03-22 03:30:22 -07:00
}
func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *Block) *BlockPool {
return &BlockPool{
peers: make(map[string]*bpPeer),
blockInfos: make(map[uint]*bpBlockInfo),
height: start,
started: 0,
stopped: 0,
numPending: 0,
numTotal: 0,
quit: make(chan struct{}),
eventsCh: make(chan interface{}, eventsChannelCapacity),
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
2015-03-22 12:46:53 -07:00
blocksCh: blocksCh,
2015-03-22 03:30:22 -07:00
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
}
}
// Start launches the pool's run loop in its own goroutine. Only the first
// call does anything; later calls return immediately.
func (bp *BlockPool) Start() {
	if !atomic.CompareAndSwapInt32(&bp.started, 0, 1) {
		return // already started
	}
	log.Info("Starting BlockPool")
	go bp.run()
}
// Stop shuts the pool down. Only the first call has any effect.
//
// It closes quit (which ends the run loop), closes the three output channels
// that were handed to NewBlockPool, and stops the repeat timer.
//
// eventsCh is deliberately left open: AddBlock, SetPeerStatus and the request
// timeout timers all send on it from other goroutines, and a send on a closed
// channel panics. Nothing needs the close signal because run exits on quit.
func (bp *BlockPool) Stop() {
	if !atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) {
		return // already stopped
	}
	log.Info("Stopping BlockPool")
	close(bp.quit)
	// NOTE(review): the pool also sends on these channels from short-lived
	// goroutines; a send still in flight when they are closed would panic.
	// Confirm whether consumers rely on the close before removing it.
	close(bp.requestsCh)
	close(bp.timeoutsCh)
	close(bp.blocksCh)
	bp.repeater.Stop()
}
// AddBlock should be called when a block is received. It queues the block,
// together with the id of the peer that sent it, for the run loop.
//
// Once the pool has been stopped the block is dropped instead of being
// queued, so a late delivery neither blocks the caller forever nor sends on
// an events channel that nobody reads any more.
func (bp *BlockPool) AddBlock(block *Block, peerId string) {
	// Cheap early exit: quit is closed first thing in Stop.
	select {
	case <-bp.quit:
		return
	default:
	}
	select {
	case bp.eventsCh <- bpBlockResponse{block, peerId}:
	case <-bp.quit:
	}
}
// SetPeerStatus reports that the peer identified by peerId announced height
// as its blockchain tip. The report is queued for the run loop.
//
// Once the pool has been stopped the report is dropped instead of being
// queued, so a late status neither blocks the caller forever nor sends on an
// events channel that nobody reads any more.
func (bp *BlockPool) SetPeerStatus(peerId string, height uint) {
	// Cheap early exit: quit is closed first thing in Stop.
	select {
	case <-bp.quit:
		return
	default:
	}
	select {
	case bp.eventsCh <- bpPeerStatus{peerId, height}:
	case <-bp.quit:
	}
}
// Runs in a goroutine and processes messages.
func (bp *BlockPool) run() {
FOR_LOOP:
for {
select {
case msg := <-bp.eventsCh:
bp.handleEvent(msg)
case <-bp.repeater.Ch:
2015-03-22 12:46:53 -07:00
bp.makeMoreBlockInfos()
bp.requestBlocksFromRandomPeers(10)
2015-03-22 03:30:22 -07:00
case <-bp.quit:
break FOR_LOOP
}
}
}
func (bp *BlockPool) handleEvent(event_ interface{}) {
switch event := event_.(type) {
case bpBlockResponse:
peer := bp.peers[event.peerId]
blockInfo := bp.blockInfos[event.block.Height]
if blockInfo == nil {
// block was unwanted.
if peer != nil {
peer.bad++
}
} else {
// block was wanted.
if peer != nil {
peer.good++
}
2015-03-22 12:46:53 -07:00
delete(peer.requests, event.block.Height)
2015-03-22 03:30:22 -07:00
if blockInfo.block == nil {
// peer is the first to give it to us.
blockInfo.block = event.block
blockInfo.blockBy = peer.id
2015-03-22 12:46:53 -07:00
bp.numPending--
2015-03-22 03:30:22 -07:00
if event.block.Height == bp.height {
2015-03-22 12:46:53 -07:00
go bp.pushBlocksFromStart()
2015-03-22 03:30:22 -07:00
}
}
}
case bpPeerStatus:
// we have updated (or new) status from peer,
// request blocks if possible.
2015-03-22 12:46:53 -07:00
peer := bp.peers[event.peerId]
if peer == nil {
peer = bpNewPeer(event.peerId, event.height)
bp.peers[peer.id] = peer
}
bp.requestBlocksFromPeer(peer)
2015-03-22 03:30:22 -07:00
case bpRequestTimeout:
peer := bp.peers[event.peerId]
request := peer.requests[event.height]
if request == nil || request.block != nil {
// a request for event.height might have timed out for peer.
// but not necessarily, the timeout is unconditional.
} else {
peer.bad++
if request.tries < maxTries {
// try again, start timer again.
request.start(bp.eventsCh)
2015-03-22 12:46:53 -07:00
msg := BlockRequest{event.height, peer.id}
go func() { bp.requestsCh <- msg }()
2015-03-22 03:30:22 -07:00
} else {
// delete the request.
if peer != nil {
delete(peer.requests, event.height)
}
blockInfo := bp.blockInfos[event.height]
if blockInfo != nil {
delete(blockInfo.requests, peer.id)
}
go func() { bp.timeoutsCh <- peer.id }()
}
}
}
}
2015-03-22 12:46:53 -07:00
func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) {
chosen := bp.pickAvailablePeers(maxPeers)
log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen))
for _, peer := range chosen {
bp.requestBlocksFromPeer(peer)
2015-03-22 03:30:22 -07:00
}
2015-03-22 12:46:53 -07:00
}
func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) {
2015-03-22 03:30:22 -07:00
// If peer is available and can provide something...
for height := bp.height; peer.available(); height++ {
blockInfo := bp.blockInfos[height]
if blockInfo == nil {
// We're out of range.
return
}
needsMorePeers := blockInfo.needsMorePeers()
alreadyAskedPeer := blockInfo.requests[peer.id] != nil
if needsMorePeers && !alreadyAskedPeer {
// Create a new request and start the timer.
request := &bpBlockRequest{
height: height,
peer: peer,
}
blockInfo.requests[peer.id] = request
peer.requests[height] = request
request.start(bp.eventsCh)
msg := BlockRequest{height, peer.id}
go func() { bp.requestsCh <- msg }()
}
}
}
2015-03-22 12:46:53 -07:00
// makeMoreBlockInfos extends the window of tracked heights. It creates up to
// requestBatchSize new block infos, stopping early as soon as either the
// pending or the total limit is reached. New heights are appended directly
// above the ones already tracked (bp.height + numTotal).
func (bp *BlockPool) makeMoreBlockInfos() {
	for made := 0; made < requestBatchSize; made++ {
		if bp.numPending >= maxPendingRequests || bp.numTotal >= maxTotalRequests {
			return
		}
		next := bp.height + uint(bp.numTotal)
		log.Debug("New blockInfo", "height", next)
		bp.blockInfos[next] = bpNewBlockInfo(next)
		bp.numPending++
		bp.numTotal++
	}
}
// pickAvailablePeers returns up to choose peers, drawn at random and without
// repetition from all peers that are currently available.
//
// The result is never nil; it is empty when choose is zero or negative or
// when no peer is available.
func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer {
	available := make([]*bpPeer, 0, len(bp.peers))
	for _, peer := range bp.peers {
		if peer.available() {
			available = append(available, peer)
		}
	}

	n := choose
	if n > len(available) {
		n = len(available)
	}
	if n <= 0 {
		// Also covers a negative choose, which rand.Perm would reject.
		return []*bpPeer{}
	}

	// Permute every candidate index and keep the first n. Permuting only
	// [0, n) would always pick the first n entries of available and leave
	// the actual selection to map iteration order.
	perm := rand.Perm(len(available))[:n]
	chosen := make([]*bpPeer, n)
	for i, idx := range perm {
		chosen[i] = available[idx]
	}
	return chosen
}
func (bp *BlockPool) pushBlocksFromStart() {
for height := bp.height; ; height++ {
// push block to blocksCh.
blockInfo := bp.blockInfos[height]
if blockInfo == nil || blockInfo.block == nil {
break
}
bp.numTotal--
bp.height++
delete(bp.blockInfos, height)
go func() { bp.blocksCh <- blockInfo.block }()
}
}
2015-03-22 03:30:22 -07:00
//-----------------------------------------------------------------------------
// bpBlockInfo is the pool's record for one wanted block height: the requests
// currently open for it and, once delivered, the block and who sent it.
type bpBlockInfo struct {
	height   uint                       // height this record is about
	requests map[string]*bpBlockRequest // open requests, keyed by peer id
	block    *Block                     // first block received
	blockBy  string                     // peerId of source
}
// bpNewBlockInfo creates the record for height with an empty request set and
// no block yet.
func bpNewBlockInfo(height uint) *bpBlockInfo {
	info := new(bpBlockInfo)
	info.height = height
	info.requests = map[string]*bpBlockRequest{}
	return info
}
// needsMorePeers reports whether fewer than maxPeersPerRequest peers have an
// open request for this height.
func (blockInfo *bpBlockInfo) needsMorePeers() bool {
	open := len(blockInfo.requests)
	return maxPeersPerRequest > open
}
//-------------------------------------
// bpBlockRequest is one outstanding request for a block, addressed to a
// single peer. The same value is referenced from the peer and from the block
// info of its height.
type bpBlockRequest struct {
	peer   *bpPeer // peer the request was sent to
	height uint    // height being requested
	block  *Block  // checked for nil when the timeout fires
	tries  int     // compared against maxTries when the timeout fires
}
// start counts one more attempt for the request and arms its timeout: after
// requestTimeoutSeconds a bpRequestTimeout for this peer and height is sent
// on eventsCh.
//
// The receiver must be a pointer. With a value receiver the increment would
// land on a copy, tries would stay at zero, and the maxTries limit checked
// by the timeout handler would never be reached.
//
// NOTE: the timer is unconditional; it fires even if the block has arrived
// in the meantime, and the receiver of the event has to cope with that.
func (request *bpBlockRequest) start(eventsCh chan<- interface{}) {
	request.tries++
	// Snapshot the identifying fields now so the timer callback does not
	// touch the request again when it fires.
	timeout := bpRequestTimeout{
		peerId: request.peer.id,
		height: request.height,
	}
	time.AfterFunc(requestTimeoutSeconds*time.Second, func() {
		eventsCh <- timeout
	})
}
//-------------------------------------
// bpPeer is the pool's view of one remote peer.
type bpPeer struct {
	id       string                   // peer id; also its key in BlockPool.peers
	height   uint                     // height the peer reported
	requests map[uint]*bpBlockRequest // open requests to this peer, keyed by height

	// Count good/bad events from peer.
	good uint
	bad  uint
}
// bpNewPeer creates the record for a peer with the given id and reported
// height, with no open requests and zeroed good/bad counters.
func bpNewPeer(peerId string, height uint) *bpPeer {
	p := new(bpPeer)
	p.id = peerId
	p.height = height
	p.requests = map[uint]*bpBlockRequest{}
	return p
}
// available reports whether the peer has room for another request, i.e. it
// has fewer than maxOutstandingRequestsPerPeer open.
func (peer *bpPeer) available() bool {
	open := len(peer.requests)
	return maxOutstandingRequestsPerPeer > open
}
//-------------------------------------
// bp.eventsCh messages
// bpBlockResponse is the event queued by AddBlock: a block together with the
// id of the peer that delivered it.
type bpBlockResponse struct {
	block  *Block // the delivered block
	peerId string // id of the delivering peer
}
// bpPeerStatus is the event queued by SetPeerStatus: a peer announcing the
// height it has reached.
type bpPeerStatus struct {
	peerId string // id of the reporting peer
	height uint   // blockchain tip of peer
}
// bpRequestTimeout is the event a request's timer sends when it fires. It
// names the peer the request went to and the height that was asked for.
type bpRequestTimeout struct {
	peerId string // id of the peer that was asked
	height uint   // height that was requested
}