From cb0176d4c7ee4925251775133c8ee7b6aa3b1989 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 22 Mar 2015 03:30:22 -0700 Subject: [PATCH] first draft of blockpool --- block/block_pool.go | 288 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 block/block_pool.go diff --git a/block/block_pool.go b/block/block_pool.go new file mode 100644 index 00000000..1326c6e2 --- /dev/null +++ b/block/block_pool.go @@ -0,0 +1,288 @@ +package block + +import ( + "sync/atomic" + "time" + + . "github.com/tendermint/tendermint/common" +) + +const ( + maxOutstandingRequestsPerPeer = 10 + eventsChannelCapacity = 100 + requestTimeoutSeconds = 10 + maxTries = 3 + requestIntervalMS = 500 + requestBatchSize = 50 + maxPendingRequests = 50 + maxTotalRequests = 100 + maxPeersPerRequest = 1 +) + +type BlockRequest struct { + Height uint + PeerId string +} + +type BlockPool struct { + peers map[string]*bpPeer + blockInfos map[uint]*bpBlockInfo + height uint // the lowest key in blockInfos. + started int32 + stopped int32 + numPending int32 + numTotal int32 + quit chan struct{} + + eventsCh chan interface{} + requestsCh chan<- BlockRequest // output of new requests to make. + timeoutsCh chan<- string // output of peers that timed out. + blocksCh chan<- *Block // output of ordered blocks. + repeater *RepeatTimer // for requesting more bocks. +} + +func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *Block) *BlockPool { + return &BlockPool{ + peers: make(map[string]*bpPeer), + blockInfos: make(map[uint]*bpBlockInfo), + height: start, + started: 0, + stopped: 0, + numPending: 0, + numTotal: 0, + quit: make(chan struct{}), + + eventsCh: make(chan interface{}, eventsChannelCapacity), + requestsCh: requestsCh, + timeoutsCh: timeoutsCh, + repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), + } +} + +func (bp *BlockPool) Start() { + if atomic.CompareAndSwapInt32(&bp.started, 0, 1) { + log.Info("Starting BlockPool") + go bp.run() + } +} + +func (bp *BlockPool) Stop() { + if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) { + log.Info("Stopping BlockPool") + close(bp.quit) + close(bp.eventsCh) + close(bp.requestsCh) + close(bp.timeoutsCh) + close(bp.blocksCh) + bp.repeater.Stop() + } +} + +// AddBlock should be called when a block is received. +func (bp *BlockPool) AddBlock(block *Block, peerId string) { + bp.eventsCh <- bpBlockResponse{block, peerId} +} + +func (bp *BlockPool) SetPeerStatus(peerId string, height uint) { + bp.eventsCh <- bpPeerStatus{peerId, height} +} + +// Runs in a goroutine and processes messages. +func (bp *BlockPool) run() { +FOR_LOOP: + for { + select { + case msg := <-bp.eventsCh: + bp.handleEvent(msg) + case <-bp.repeater.Ch: + // make more requests if necessary. + for i := 0; i < requestBatchSize; i++ { + if atomic.LoadInt32(&bp.numPending) < maxPendingRequests && + atomic.LoadInt32(&bp.numTotal) < maxTotalRequests { + atomic.AddInt32(&bp.numPending, 1) + atomic.AddInt32(&bp.numTotal, 1) + requestHeight := bp.height + uint(bp.numTotal) + blockInfo := bpNewBlockInfo(requestHeight) + bp.blockInfos[requestHeight] = blockInfo + } else { + break + } + } + case <-bp.quit: + break FOR_LOOP + } + } +} + +func (bp *BlockPool) handleEvent(event_ interface{}) { + switch event := event_.(type) { + case bpBlockResponse: + peer := bp.peers[event.peerId] + blockInfo := bp.blockInfos[event.block.Height] + if blockInfo == nil { + // block was unwanted. + if peer != nil { + peer.bad++ + } + } else { + // block was wanted. + if peer != nil { + peer.good++ + } + if blockInfo.block == nil { + // peer is the first to give it to us. + blockInfo.block = event.block + blockInfo.blockBy = peer.id + atomic.AddInt32(&bp.numPending, -1) + if event.block.Height == bp.height { + // push block to blocksCh. + atomic.AddInt32(&bp.numTotal, -1) + delete(peer.requests, bp.height) + delete(blockInfo.requests, peer.id) + go func() { bp.blocksCh <- event.block }() + } + } + } + case bpPeerStatus: + // we have updated (or new) status from peer, + // request blocks if possible. + bp.requestBlocksFromPeer(event.peerId, event.height) + case bpRequestTimeout: + peer := bp.peers[event.peerId] + request := peer.requests[event.height] + if request == nil || request.block != nil { + // a request for event.height might have timed out for peer. + // but not necessarily, the timeout is unconditional. + } else { + peer.bad++ + if request.tries < maxTries { + // try again, start timer again. + request.start(bp.eventsCh) + event := BlockRequest{event.height, peer.id} + go func() { bp.requestsCh <- event }() + } else { + // delete the request. + if peer != nil { + delete(peer.requests, event.height) + } + blockInfo := bp.blockInfos[event.height] + if blockInfo != nil { + delete(blockInfo.requests, peer.id) + } + go func() { bp.timeoutsCh <- peer.id }() + } + } + } +} + +func (bp *BlockPool) requestBlocksFromPeer(peerId string, height uint) { + peer := bp.peers[peerId] + if peer == nil { + bp.peers[peerId] = bpNewPeer(peerId, height) + } + // If peer is available and can provide something... + for height := bp.height; peer.available(); height++ { + blockInfo := bp.blockInfos[height] + if blockInfo == nil { + // We're out of range. + return + } + + needsMorePeers := blockInfo.needsMorePeers() + alreadyAskedPeer := blockInfo.requests[peer.id] != nil + if needsMorePeers && !alreadyAskedPeer { + // Create a new request and start the timer. + request := &bpBlockRequest{ + height: height, + peer: peer, + } + blockInfo.requests[peer.id] = request + peer.requests[height] = request + request.start(bp.eventsCh) + msg := BlockRequest{height, peer.id} + go func() { bp.requestsCh <- msg }() + } + } +} + +//----------------------------------------------------------------------------- + +type bpBlockInfo struct { + height uint + requests map[string]*bpBlockRequest + block *Block // first block received + blockBy string // peerId of source +} + +func bpNewBlockInfo(height uint) *bpBlockInfo { + return &bpBlockInfo{ + height: height, + requests: make(map[string]*bpBlockRequest), + } +} + +func (blockInfo *bpBlockInfo) needsMorePeers() bool { + return len(blockInfo.requests) < maxPeersPerRequest +} + +//------------------------------------- + +type bpBlockRequest struct { + peer *bpPeer + height uint + block *Block + tries int +} + +// bump tries++ and set timeout. +// NOTE: the timer is unconditional. +func (request bpBlockRequest) start(eventsCh chan<- interface{}) { + request.tries++ + time.AfterFunc(requestTimeoutSeconds*time.Second, func() { + eventsCh <- bpRequestTimeout{ + peerId: request.peer.id, + height: request.height, + } + }) +} + +//------------------------------------- + +type bpPeer struct { + id string + height uint + requests map[uint]*bpBlockRequest + // Count good/bad events from peer. + good uint + bad uint +} + +func bpNewPeer(peerId string, height uint) *bpPeer { + return &bpPeer{ + id: peerId, + height: height, + requests: make(map[uint]*bpBlockRequest), + } +} + +func (peer *bpPeer) available() bool { + return len(peer.requests) < maxOutstandingRequestsPerPeer +} + +//------------------------------------- +// bp.eventsCh messages + +type bpBlockResponse struct { + block *Block + peerId string +} + +type bpPeerStatus struct { + peerId string + height uint // blockchain tip of peer +} + +type bpRequestTimeout struct { + peerId string + height uint +}