Blockpool tests

This commit is contained in:
Jae Kwon 2015-03-22 16:23:24 -07:00
parent 6c7d85c64c
commit 14161ea39c
2 changed files with 99 additions and 40 deletions

View File

@ -130,8 +130,7 @@ func (bp *BlockPool) handleEvent(event_ interface{}) {
} }
} }
} }
case bpPeerStatus: case bpPeerStatus: // updated or new status from peer
// we have updated (or new) status from peer,
// request blocks if possible. // request blocks if possible.
peer := bp.peers[event.peerId] peer := bp.peers[event.peerId]
if peer == nil { if peer == nil {
@ -139,30 +138,45 @@ func (bp *BlockPool) handleEvent(event_ interface{}) {
bp.peers[peer.id] = peer bp.peers[peer.id] = peer
} }
bp.requestBlocksFromPeer(peer) bp.requestBlocksFromPeer(peer)
case bpRequestTimeout: case bpRequestTimeout: // unconditional timeout for each peer's request.
peer := bp.peers[event.peerId] peer := bp.peers[event.peerId]
request := peer.requests[event.height] if peer == nil {
// cleanup was already handled.
return
}
height := event.height
request := peer.requests[height]
if request == nil || request.block != nil { if request == nil || request.block != nil {
// a request for event.height might have timed out for peer. // the request was fulfilled by some peer or this peer.
// but not necessarily, the timeout is unconditional. return
} else { }
peer.bad++
if request.tries < maxTries { // A request for peer timed out.
// try again, start timer again. peer.bad++
request.start(bp.eventsCh) if request.tries < maxTries {
msg := BlockRequest{event.height, peer.id} log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id)
go func() { bp.requestsCh <- msg }() // try again.
} else { select {
// delete the request. case bp.requestsCh <- BlockRequest{height, peer.id}:
if peer != nil { request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
delete(peer.requests, event.height) default:
} // The request cannot be made because requestCh is full.
blockInfo := bp.blockInfos[event.height] // Just delete the request.
if blockInfo != nil { delete(peer.requests, height)
delete(blockInfo.requests, peer.id)
}
go func() { bp.timeoutsCh <- peer.id }()
} }
} else {
log.Warn("Timeout: Deleting request")
// delete the request.
delete(peer.requests, height)
blockInfo := bp.blockInfos[height]
if blockInfo != nil {
delete(blockInfo.requests, peer.id)
}
select {
case bp.timeoutsCh <- peer.id:
default:
}
} }
} }
} }
@ -186,16 +200,21 @@ func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) {
needsMorePeers := blockInfo.needsMorePeers() needsMorePeers := blockInfo.needsMorePeers()
alreadyAskedPeer := blockInfo.requests[peer.id] != nil alreadyAskedPeer := blockInfo.requests[peer.id] != nil
if needsMorePeers && !alreadyAskedPeer { if needsMorePeers && !alreadyAskedPeer {
// Create a new request and start the timer. select {
request := &bpBlockRequest{ case bp.requestsCh <- BlockRequest{height, peer.id}:
height: height, // Create a new request and start the timer.
peer: peer, request := &bpBlockRequest{
height: height,
peer: peer,
}
blockInfo.requests[peer.id] = request
peer.requests[height] = request
request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
default:
// The request cannot be made because requestCh is full.
// Just stop.
return
} }
blockInfo.requests[peer.id] = request
peer.requests[height] = request
request.start(bp.eventsCh)
msg := BlockRequest{height, peer.id}
go func() { bp.requestsCh <- msg }()
} }
} }
} }
@ -203,6 +222,8 @@ func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) {
func (bp *BlockPool) makeMoreBlockInfos() { func (bp *BlockPool) makeMoreBlockInfos() {
// make more requests if necessary. // make more requests if necessary.
for i := 0; i < requestBatchSize; i++ { for i := 0; i < requestBatchSize; i++ {
//log.Debug("Confused?",
// "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests)
if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests {
// Make a request for the next block height // Make a request for the next block height
requestHeight := bp.height + uint(bp.numTotal) requestHeight := bp.height + uint(bp.numTotal)
@ -232,6 +253,7 @@ func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer {
return chosen return chosen
} }
// blocking
func (bp *BlockPool) pushBlocksFromStart() { func (bp *BlockPool) pushBlocksFromStart() {
for height := bp.height; ; height++ { for height := bp.height; ; height++ {
// push block to blocksCh. // push block to blocksCh.
@ -242,7 +264,7 @@ func (bp *BlockPool) pushBlocksFromStart() {
bp.numTotal-- bp.numTotal--
bp.height++ bp.height++
delete(bp.blockInfos, height) delete(bp.blockInfos, height)
go func() { bp.blocksCh <- blockInfo.block }() bp.blocksCh <- blockInfo.block
} }
} }
@ -277,7 +299,7 @@ type bpBlockRequest struct {
// bump tries++ and set timeout. // bump tries++ and set timeout.
// NOTE: the timer is unconditional. // NOTE: the timer is unconditional.
func (request bpBlockRequest) start(eventsCh chan<- interface{}) { func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) {
request.tries++ request.tries++
time.AfterFunc(requestTimeoutSeconds*time.Second, func() { time.AfterFunc(requestTimeoutSeconds*time.Second, func() {
eventsCh <- bpRequestTimeout{ eventsCh <- bpRequestTimeout{

View File

@ -28,9 +28,9 @@ func TestBasic(t *testing.T) {
start := uint(42) start := uint(42)
maxHeight := uint(300) maxHeight := uint(300)
timeoutsCh := make(chan string) timeoutsCh := make(chan string, 100)
requestsCh := make(chan BlockRequest) requestsCh := make(chan BlockRequest, 100)
blocksCh := make(chan *Block) blocksCh := make(chan *Block, 100)
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
pool.Start() pool.Start()
@ -50,15 +50,15 @@ func TestBasic(t *testing.T) {
case peerId := <-timeoutsCh: case peerId := <-timeoutsCh:
t.Errorf("timeout: %v", peerId) t.Errorf("timeout: %v", peerId)
case request := <-requestsCh: case request := <-requestsCh:
log.Debug("Pulled new BlockRequest", "request", request) log.Debug("TEST: Pulled new BlockRequest", "request", request)
// After a while, pretend like we got a block from the peer. // After a while, pretend like we got a block from the peer.
go func() { go func() {
block := &Block{Header: &Header{Height: request.Height}} block := &Block{Header: &Header{Height: request.Height}}
pool.AddBlock(block, request.PeerId) pool.AddBlock(block, request.PeerId)
log.Debug("Added block", "block", request.Height, "peer", request.PeerId) log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId)
}() }()
case block := <-blocksCh: case block := <-blocksCh:
log.Debug("Pulled new Block", "height", block.Height) log.Debug("TEST: Pulled new Block", "height", block.Height)
if block.Height != lastSeenBlock+1 { if block.Height != lastSeenBlock+1 {
t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height)
} }
@ -69,6 +69,43 @@ func TestBasic(t *testing.T) {
} }
} }
pool.Stop()
}
func TestTimeout(t *testing.T) {
peers := makePeers(100, 0, 1000)
start := uint(42)
timeoutsCh := make(chan string, 10)
requestsCh := make(chan BlockRequest, 10)
blocksCh := make(chan *Block, 100)
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
pool.Start()
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerStatus(peer.id, peer.height)
}
}()
// Pull from channels
for {
select {
case peerId := <-timeoutsCh:
// Timed out. Done!
if peers[peerId].id != peerId {
t.Errorf("Unexpected peer from timeoutsCh")
}
//return
case _ = <-requestsCh:
// Don't do anything, let it time out.
case _ = <-blocksCh:
t.Errorf("Got block when none expected")
return
}
}
pool.Stop() pool.Stop()
} }