commit
ae5af4d44e
|
@ -1,7 +1,7 @@
|
|||
package blockchain
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -10,345 +10,367 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
maxOutstandingRequestsPerPeer = 10
|
||||
eventsChannelCapacity = 100
|
||||
requestTimeoutSeconds = 10
|
||||
maxTries = 3
|
||||
requestIntervalMS = 500
|
||||
requestBatchSize = 50
|
||||
maxPendingRequests = 50
|
||||
maxTotalRequests = 100
|
||||
maxPeersPerRequest = 1
|
||||
maxTries = 3
|
||||
inputsChannelCapacity = 200
|
||||
requestIntervalMS = 500
|
||||
maxPendingRequests = 200
|
||||
maxTotalRequests = 300
|
||||
maxRequestsPerPeer = 300
|
||||
)
|
||||
|
||||
type BlockRequest struct {
|
||||
Height uint
|
||||
PeerId string
|
||||
}
|
||||
var (
|
||||
requestTimeoutSeconds = time.Duration(1)
|
||||
)
|
||||
|
||||
type BlockPool struct {
|
||||
peers map[string]*bpPeer
|
||||
blockInfos map[uint]*bpBlockInfo
|
||||
height uint // the lowest key in blockInfos.
|
||||
started int32 // atomic
|
||||
stopped int32 // atomic
|
||||
numPending int32
|
||||
numTotal int32
|
||||
eventsCh chan interface{} // internal events.
|
||||
requestsCh chan<- BlockRequest // output of new requests to make.
|
||||
timeoutsCh chan<- string // output of peers that timed out.
|
||||
blocksCh chan<- *types.Block // output of ordered blocks.
|
||||
repeater *RepeatTimer // for requesting more bocks.
|
||||
quit chan struct{}
|
||||
// block requests
|
||||
requestsMtx sync.Mutex
|
||||
requests map[uint]*bpRequest
|
||||
height uint // the lowest key in requests.
|
||||
numPending int32
|
||||
numTotal int32
|
||||
|
||||
// peers
|
||||
peersMtx sync.Mutex
|
||||
peers map[string]*bpPeer
|
||||
|
||||
requestsCh chan<- BlockRequest
|
||||
timeoutsCh chan<- string
|
||||
repeater *RepeatTimer
|
||||
|
||||
running int32 // atomic
|
||||
}
|
||||
|
||||
func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *types.Block) *BlockPool {
|
||||
func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
|
||||
return &BlockPool{
|
||||
peers: make(map[string]*bpPeer),
|
||||
blockInfos: make(map[uint]*bpBlockInfo),
|
||||
peers: make(map[string]*bpPeer),
|
||||
|
||||
requests: make(map[uint]*bpRequest),
|
||||
height: start,
|
||||
started: 0,
|
||||
stopped: 0,
|
||||
numPending: 0,
|
||||
numTotal: 0,
|
||||
quit: make(chan struct{}),
|
||||
|
||||
eventsCh: make(chan interface{}, eventsChannelCapacity),
|
||||
requestsCh: requestsCh,
|
||||
timeoutsCh: timeoutsCh,
|
||||
blocksCh: blocksCh,
|
||||
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
|
||||
|
||||
running: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) Start() {
|
||||
if atomic.CompareAndSwapInt32(&bp.started, 0, 1) {
|
||||
func (pool *BlockPool) Start() {
|
||||
if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
|
||||
log.Info("Starting BlockPool")
|
||||
go bp.run()
|
||||
go pool.run()
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) Stop() {
|
||||
if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) {
|
||||
func (pool *BlockPool) Stop() {
|
||||
if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
|
||||
log.Info("Stopping BlockPool")
|
||||
close(bp.quit)
|
||||
close(bp.eventsCh)
|
||||
close(bp.requestsCh)
|
||||
close(bp.timeoutsCh)
|
||||
close(bp.blocksCh)
|
||||
bp.repeater.Stop()
|
||||
pool.repeater.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// AddBlock should be called when a block is received.
|
||||
func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
|
||||
bp.eventsCh <- bpBlockResponse{block, peerId}
|
||||
func (pool *BlockPool) IsRunning() bool {
|
||||
return atomic.LoadInt32(&pool.running) == 1
|
||||
}
|
||||
|
||||
func (bp *BlockPool) SetPeerStatus(peerId string, height uint) {
|
||||
bp.eventsCh <- bpPeerStatus{peerId, height}
|
||||
}
|
||||
|
||||
// Runs in a goroutine and processes messages.
|
||||
func (bp *BlockPool) run() {
|
||||
FOR_LOOP:
|
||||
// Run spawns requests as needed.
|
||||
func (pool *BlockPool) run() {
|
||||
RUN_LOOP:
|
||||
for {
|
||||
select {
|
||||
case msg := <-bp.eventsCh:
|
||||
bp.handleEvent(msg)
|
||||
case <-bp.repeater.Ch:
|
||||
bp.makeMoreBlockInfos()
|
||||
bp.requestBlocksFromRandomPeers(10)
|
||||
case <-bp.quit:
|
||||
break FOR_LOOP
|
||||
if atomic.LoadInt32(&pool.running) == 0 {
|
||||
break RUN_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) handleEvent(event_ interface{}) {
|
||||
switch event := event_.(type) {
|
||||
case bpBlockResponse:
|
||||
peer := bp.peers[event.peerId]
|
||||
blockInfo := bp.blockInfos[event.block.Height]
|
||||
if blockInfo == nil {
|
||||
// block was unwanted.
|
||||
if peer != nil {
|
||||
peer.bad++
|
||||
}
|
||||
_, numPending, numTotal := pool.GetStatus()
|
||||
if numPending >= maxPendingRequests {
|
||||
// sleep for a bit.
|
||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||
} else if numTotal >= maxTotalRequests {
|
||||
// sleep for a bit.
|
||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||
} else {
|
||||
// block was wanted.
|
||||
if peer != nil {
|
||||
peer.good++
|
||||
}
|
||||
delete(peer.requests, event.block.Height)
|
||||
if blockInfo.block == nil {
|
||||
// peer is the first to give it to us.
|
||||
blockInfo.block = event.block
|
||||
blockInfo.blockBy = peer.id
|
||||
bp.numPending--
|
||||
if event.block.Height == bp.height {
|
||||
go bp.pushBlocksFromStart()
|
||||
}
|
||||
}
|
||||
}
|
||||
case bpPeerStatus: // updated or new status from peer
|
||||
// request blocks if possible.
|
||||
peer := bp.peers[event.peerId]
|
||||
if peer == nil {
|
||||
peer = bpNewPeer(event.peerId, event.height)
|
||||
bp.peers[peer.id] = peer
|
||||
}
|
||||
bp.requestBlocksFromPeer(peer)
|
||||
case bpRequestTimeout: // unconditional timeout for each peer's request.
|
||||
peer := bp.peers[event.peerId]
|
||||
if peer == nil {
|
||||
// cleanup was already handled.
|
||||
return
|
||||
}
|
||||
height := event.height
|
||||
request := peer.requests[height]
|
||||
if request == nil || request.block != nil {
|
||||
// the request was fulfilled by some peer or this peer.
|
||||
return
|
||||
// request for more blocks.
|
||||
height := pool.nextHeight()
|
||||
pool.makeRequest(height)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A request for peer timed out.
|
||||
peer.bad++
|
||||
if request.tries < maxTries {
|
||||
log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id)
|
||||
// try again.
|
||||
select {
|
||||
case bp.requestsCh <- BlockRequest{height, peer.id}:
|
||||
request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
|
||||
default:
|
||||
// The request cannot be made because requestCh is full.
|
||||
// Just delete the request.
|
||||
delete(peer.requests, height)
|
||||
}
|
||||
func (pool *BlockPool) GetStatus() (uint, int32, int32) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
return pool.height, pool.numPending, pool.numTotal
|
||||
}
|
||||
|
||||
// We need to see the second block's Validation to validate the first block.
|
||||
// So we peek two blocks at a time.
|
||||
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
if r := pool.requests[pool.height]; r != nil {
|
||||
first = r.block
|
||||
}
|
||||
if r := pool.requests[pool.height+1]; r != nil {
|
||||
second = r.block
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Pop the first block at pool.height
|
||||
// It must have been validated by 'second'.Validation from PeekTwoBlocks().
|
||||
func (pool *BlockPool) PopRequest() {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
if r := pool.requests[pool.height]; r == nil || r.block == nil {
|
||||
panic("PopRequest() requires a valid block")
|
||||
}
|
||||
|
||||
delete(pool.requests, pool.height)
|
||||
pool.height++
|
||||
pool.numTotal--
|
||||
}
|
||||
|
||||
// Invalidates the block at pool.height.
|
||||
// Remove the peer and request from others.
|
||||
func (pool *BlockPool) RedoRequest(height uint) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
request := pool.requests[height]
|
||||
if request.block == nil {
|
||||
panic("Expected block to be non-nil")
|
||||
}
|
||||
pool.RemovePeer(request.peerId) // Lock on peersMtx.
|
||||
request.block = nil
|
||||
request.peerId = ""
|
||||
pool.numPending++
|
||||
|
||||
go requestRoutine(pool, height)
|
||||
}
|
||||
|
||||
func (pool *BlockPool) hasBlock(height uint) bool {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
request := pool.requests[height]
|
||||
return request != nil && request.block != nil
|
||||
}
|
||||
|
||||
func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
request := pool.requests[height]
|
||||
if request == nil {
|
||||
return
|
||||
}
|
||||
request.peerId = peerId
|
||||
}
|
||||
|
||||
func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
request := pool.requests[block.Height]
|
||||
if request == nil {
|
||||
return
|
||||
}
|
||||
if request.peerId != peerId {
|
||||
return
|
||||
}
|
||||
if request.block != nil {
|
||||
return
|
||||
}
|
||||
request.block = block
|
||||
pool.numPending--
|
||||
}
|
||||
|
||||
func (pool *BlockPool) getPeer(peerId string) *bpPeer {
|
||||
pool.peersMtx.Lock() // Lock
|
||||
defer pool.peersMtx.Unlock()
|
||||
|
||||
peer := pool.peers[peerId]
|
||||
return peer
|
||||
}
|
||||
|
||||
// Sets the peer's blockchain height.
|
||||
func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
|
||||
pool.peersMtx.Lock() // Lock
|
||||
defer pool.peersMtx.Unlock()
|
||||
|
||||
peer := pool.peers[peerId]
|
||||
if peer != nil {
|
||||
peer.height = height
|
||||
} else {
|
||||
peer = &bpPeer{
|
||||
height: height,
|
||||
id: peerId,
|
||||
numRequests: 0,
|
||||
}
|
||||
pool.peers[peerId] = peer
|
||||
}
|
||||
}
|
||||
|
||||
func (pool *BlockPool) RemovePeer(peerId string) {
|
||||
pool.peersMtx.Lock() // Lock
|
||||
defer pool.peersMtx.Unlock()
|
||||
|
||||
delete(pool.peers, peerId)
|
||||
}
|
||||
|
||||
// Pick an available peer with at least the given minHeight.
|
||||
// If no peers are available, returns nil.
|
||||
func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
|
||||
pool.peersMtx.Lock()
|
||||
defer pool.peersMtx.Unlock()
|
||||
|
||||
for _, peer := range pool.peers {
|
||||
if peer.numRequests >= maxRequestsPerPeer {
|
||||
continue
|
||||
}
|
||||
if peer.height < minHeight {
|
||||
continue
|
||||
}
|
||||
peer.numRequests++
|
||||
return peer
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pool *BlockPool) decrPeer(peerId string) {
|
||||
pool.peersMtx.Lock()
|
||||
defer pool.peersMtx.Unlock()
|
||||
|
||||
peer := pool.peers[peerId]
|
||||
if peer == nil {
|
||||
return
|
||||
}
|
||||
peer.numRequests--
|
||||
}
|
||||
|
||||
func (pool *BlockPool) nextHeight() uint {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
return pool.height + uint(pool.numTotal)
|
||||
}
|
||||
|
||||
func (pool *BlockPool) makeRequest(height uint) {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
request := &bpRequest{
|
||||
height: height,
|
||||
peerId: "",
|
||||
block: nil,
|
||||
}
|
||||
pool.requests[height] = request
|
||||
|
||||
nextHeight := pool.height + uint(pool.numTotal)
|
||||
if nextHeight == height {
|
||||
pool.numTotal++
|
||||
pool.numPending++
|
||||
}
|
||||
|
||||
go requestRoutine(pool, height)
|
||||
}
|
||||
|
||||
func (pool *BlockPool) sendRequest(height uint, peerId string) {
|
||||
if atomic.LoadInt32(&pool.running) == 0 {
|
||||
return
|
||||
}
|
||||
pool.requestsCh <- BlockRequest{height, peerId}
|
||||
}
|
||||
|
||||
func (pool *BlockPool) sendTimeout(peerId string) {
|
||||
if atomic.LoadInt32(&pool.running) == 0 {
|
||||
return
|
||||
}
|
||||
pool.timeoutsCh <- peerId
|
||||
}
|
||||
|
||||
func (pool *BlockPool) debug() string {
|
||||
pool.requestsMtx.Lock() // Lock
|
||||
defer pool.requestsMtx.Unlock()
|
||||
|
||||
str := ""
|
||||
for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
|
||||
if pool.requests[h] == nil {
|
||||
str += Fmt("H(%v):X ", h)
|
||||
} else {
|
||||
log.Warn("Timeout: Deleting request")
|
||||
// delete the request.
|
||||
delete(peer.requests, height)
|
||||
blockInfo := bp.blockInfos[height]
|
||||
if blockInfo != nil {
|
||||
delete(blockInfo.requests, peer.id)
|
||||
}
|
||||
select {
|
||||
case bp.timeoutsCh <- peer.id:
|
||||
default:
|
||||
}
|
||||
|
||||
str += Fmt("H(%v):", h)
|
||||
str += Fmt("B?(%v) ", pool.requests[h].block != nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: This function is sufficient, but we should find pending blocks
|
||||
// and sample the peers in one go rather than the current O(n^2) impl.
|
||||
func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) {
|
||||
chosen := bp.pickAvailablePeers(maxPeers)
|
||||
log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen))
|
||||
for _, peer := range chosen {
|
||||
bp.requestBlocksFromPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) {
|
||||
// If peer is available and can provide something...
|
||||
for height := bp.height; peer.available(); height++ {
|
||||
blockInfo := bp.blockInfos[height]
|
||||
if blockInfo == nil {
|
||||
// We're out of range.
|
||||
return
|
||||
}
|
||||
needsMorePeers := blockInfo.needsMorePeers()
|
||||
alreadyAskedPeer := blockInfo.requests[peer.id] != nil
|
||||
if needsMorePeers && !alreadyAskedPeer {
|
||||
select {
|
||||
case bp.requestsCh <- BlockRequest{height, peer.id}:
|
||||
// Create a new request and start the timer.
|
||||
request := &bpBlockRequest{
|
||||
height: height,
|
||||
peer: peer,
|
||||
}
|
||||
blockInfo.requests[peer.id] = request
|
||||
peer.requests[height] = request
|
||||
request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
|
||||
default:
|
||||
// The request cannot be made because requestCh is full.
|
||||
// Just stop.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) makeMoreBlockInfos() {
|
||||
// make more requests if necessary.
|
||||
for i := 0; i < requestBatchSize; i++ {
|
||||
//log.Debug("Confused?",
|
||||
// "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests)
|
||||
if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests {
|
||||
// Make a request for the next block height
|
||||
requestHeight := bp.height + uint(bp.numTotal)
|
||||
log.Debug("New blockInfo", "height", requestHeight)
|
||||
blockInfo := bpNewBlockInfo(requestHeight)
|
||||
bp.blockInfos[requestHeight] = blockInfo
|
||||
bp.numPending++
|
||||
bp.numTotal++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer {
|
||||
available := []*bpPeer{}
|
||||
for _, peer := range bp.peers {
|
||||
if peer.available() {
|
||||
available = append(available, peer)
|
||||
}
|
||||
}
|
||||
perm := rand.Perm(MinInt(choose, len(available)))
|
||||
chosen := make([]*bpPeer, len(perm))
|
||||
for i, idx := range perm {
|
||||
chosen[i] = available[idx]
|
||||
}
|
||||
return chosen
|
||||
}
|
||||
|
||||
// blocking
|
||||
func (bp *BlockPool) pushBlocksFromStart() {
|
||||
for height := bp.height; ; height++ {
|
||||
// push block to blocksCh.
|
||||
blockInfo := bp.blockInfos[height]
|
||||
if blockInfo == nil || blockInfo.block == nil {
|
||||
break
|
||||
}
|
||||
bp.numTotal--
|
||||
bp.height++
|
||||
delete(bp.blockInfos, height)
|
||||
bp.blocksCh <- blockInfo.block
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type bpBlockInfo struct {
|
||||
height uint
|
||||
requests map[string]*bpBlockRequest
|
||||
block *types.Block // first block received
|
||||
blockBy string // peerId of source
|
||||
}
|
||||
|
||||
func bpNewBlockInfo(height uint) *bpBlockInfo {
|
||||
return &bpBlockInfo{
|
||||
height: height,
|
||||
requests: make(map[string]*bpBlockRequest),
|
||||
}
|
||||
}
|
||||
|
||||
func (blockInfo *bpBlockInfo) needsMorePeers() bool {
|
||||
return len(blockInfo.requests) < maxPeersPerRequest
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type bpBlockRequest struct {
|
||||
peer *bpPeer
|
||||
height uint
|
||||
block *types.Block
|
||||
tries int
|
||||
}
|
||||
|
||||
// bump tries++ and set timeout.
|
||||
// NOTE: the timer is unconditional.
|
||||
func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) {
|
||||
request.tries++
|
||||
time.AfterFunc(requestTimeoutSeconds*time.Second, func() {
|
||||
eventsCh <- bpRequestTimeout{
|
||||
peerId: request.peer.id,
|
||||
height: request.height,
|
||||
}
|
||||
})
|
||||
return str
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type bpPeer struct {
|
||||
id string
|
||||
height uint
|
||||
requests map[uint]*bpBlockRequest
|
||||
// Count good/bad events from peer.
|
||||
good uint
|
||||
bad uint
|
||||
id string
|
||||
height uint
|
||||
numRequests int32
|
||||
}
|
||||
|
||||
func bpNewPeer(peerId string, height uint) *bpPeer {
|
||||
return &bpPeer{
|
||||
id: peerId,
|
||||
height: height,
|
||||
requests: make(map[uint]*bpBlockRequest),
|
||||
}
|
||||
}
|
||||
|
||||
func (peer *bpPeer) available() bool {
|
||||
return len(peer.requests) < maxOutstandingRequestsPerPeer
|
||||
type bpRequest struct {
|
||||
height uint
|
||||
peerId string
|
||||
block *types.Block
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
// bp.eventsCh messages
|
||||
|
||||
type bpBlockResponse struct {
|
||||
block *types.Block
|
||||
peerId string
|
||||
// Responsible for making more requests as necessary
|
||||
// Returns when a block is found (e.g. AddBlock() is called)
|
||||
func requestRoutine(pool *BlockPool, height uint) {
|
||||
for {
|
||||
var peer *bpPeer = nil
|
||||
PICK_LOOP:
|
||||
for {
|
||||
if !pool.IsRunning() {
|
||||
log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
|
||||
return
|
||||
}
|
||||
peer = pool.pickIncrAvailablePeer(height)
|
||||
if peer == nil {
|
||||
//log.Debug("No peers available", "height", height)
|
||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||
continue PICK_LOOP
|
||||
}
|
||||
break PICK_LOOP
|
||||
}
|
||||
|
||||
pool.setPeerForRequest(height, peer.id)
|
||||
|
||||
for try := 0; try < maxTries; try++ {
|
||||
pool.sendRequest(height, peer.id)
|
||||
time.Sleep(requestTimeoutSeconds * time.Second)
|
||||
if pool.hasBlock(height) {
|
||||
pool.decrPeer(peer.id)
|
||||
return
|
||||
}
|
||||
bpHeight, _, _ := pool.GetStatus()
|
||||
if height < bpHeight {
|
||||
pool.decrPeer(peer.id)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
pool.RemovePeer(peer.id)
|
||||
pool.sendTimeout(peer.id)
|
||||
}
|
||||
}
|
||||
|
||||
type bpPeerStatus struct {
|
||||
peerId string
|
||||
height uint // blockchain tip of peer
|
||||
}
|
||||
//-------------------------------------
|
||||
|
||||
type bpRequestTimeout struct {
|
||||
peerId string
|
||||
height uint
|
||||
type BlockRequest struct {
|
||||
Height uint
|
||||
PeerId string
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package blockchain
|
|||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
@ -24,26 +25,34 @@ func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer {
|
|||
}
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
// 100 peers anywhere at height 0 to 1000.
|
||||
peers := makePeers(100, 0, 1000)
|
||||
|
||||
peers := makePeers(10, 0, 1000)
|
||||
start := uint(42)
|
||||
maxHeight := uint(300)
|
||||
timeoutsCh := make(chan string, 100)
|
||||
requestsCh := make(chan BlockRequest, 100)
|
||||
blocksCh := make(chan *types.Block, 100)
|
||||
|
||||
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
|
||||
pool := NewBlockPool(start, requestsCh, timeoutsCh)
|
||||
pool.Start()
|
||||
|
||||
// Introduce each peer.
|
||||
go func() {
|
||||
for _, peer := range peers {
|
||||
pool.SetPeerStatus(peer.id, peer.height)
|
||||
pool.SetPeerHeight(peer.id, peer.height)
|
||||
}
|
||||
}()
|
||||
|
||||
lastSeenBlock := uint(41)
|
||||
// Start a goroutine to pull blocks
|
||||
go func() {
|
||||
for {
|
||||
if !pool.IsRunning() {
|
||||
return
|
||||
}
|
||||
first, second := pool.PeekTwoBlocks()
|
||||
if first != nil && second != nil {
|
||||
pool.PopRequest()
|
||||
} else {
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Pull from channels
|
||||
for {
|
||||
|
@ -52,21 +61,15 @@ func TestBasic(t *testing.T) {
|
|||
t.Errorf("timeout: %v", peerId)
|
||||
case request := <-requestsCh:
|
||||
log.Debug("TEST: Pulled new BlockRequest", "request", request)
|
||||
// After a while, pretend like we got a block from the peer.
|
||||
if request.Height == 300 {
|
||||
return // Done!
|
||||
}
|
||||
// Request desired, pretend like we got the block immediately.
|
||||
go func() {
|
||||
block := &types.Block{Header: &types.Header{Height: request.Height}}
|
||||
pool.AddBlock(block, request.PeerId)
|
||||
log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId)
|
||||
}()
|
||||
case block := <-blocksCh:
|
||||
log.Debug("TEST: Pulled new Block", "height", block.Height)
|
||||
if block.Height != lastSeenBlock+1 {
|
||||
t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height)
|
||||
}
|
||||
lastSeenBlock++
|
||||
if block.Height == maxHeight {
|
||||
return // Done!
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,39 +77,52 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
peers := makePeers(100, 0, 1000)
|
||||
peers := makePeers(10, 0, 1000)
|
||||
start := uint(42)
|
||||
timeoutsCh := make(chan string, 10)
|
||||
requestsCh := make(chan BlockRequest, 10)
|
||||
blocksCh := make(chan *types.Block, 100)
|
||||
|
||||
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
|
||||
timeoutsCh := make(chan string, 100)
|
||||
requestsCh := make(chan BlockRequest, 100)
|
||||
pool := NewBlockPool(start, requestsCh, timeoutsCh)
|
||||
pool.Start()
|
||||
|
||||
// Introduce each peer.
|
||||
go func() {
|
||||
for _, peer := range peers {
|
||||
pool.SetPeerStatus(peer.id, peer.height)
|
||||
pool.SetPeerHeight(peer.id, peer.height)
|
||||
}
|
||||
}()
|
||||
|
||||
// Start a goroutine to pull blocks
|
||||
go func() {
|
||||
for {
|
||||
if !pool.IsRunning() {
|
||||
return
|
||||
}
|
||||
first, second := pool.PeekTwoBlocks()
|
||||
if first != nil && second != nil {
|
||||
pool.PopRequest()
|
||||
} else {
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Pull from channels
|
||||
counter := 0
|
||||
timedOut := map[string]struct{}{}
|
||||
for {
|
||||
select {
|
||||
case peerId := <-timeoutsCh:
|
||||
// Timed out. Done!
|
||||
if peers[peerId].id != peerId {
|
||||
t.Errorf("Unexpected peer from timeoutsCh")
|
||||
log.Debug("Timeout", "peerId", peerId)
|
||||
if _, ok := timedOut[peerId]; !ok {
|
||||
counter++
|
||||
if counter == len(peers) {
|
||||
return // Done!
|
||||
}
|
||||
}
|
||||
return
|
||||
case _ = <-requestsCh:
|
||||
// Don't do anything, let it time out.
|
||||
case _ = <-blocksCh:
|
||||
t.Errorf("Got block when none expected")
|
||||
return
|
||||
case request := <-requestsCh:
|
||||
log.Debug("TEST: Pulled new BlockRequest", "request", request)
|
||||
}
|
||||
}
|
||||
|
||||
pool.Stop()
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,304 @@
|
|||
package blockchain
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
BlockchainChannel = byte(0x40)
|
||||
defaultChannelCapacity = 100
|
||||
defaultSleepIntervalMS = 500
|
||||
trySyncIntervalMS = 100
|
||||
|
||||
// stop syncing when last block's time is
|
||||
// within this much of the system time.
|
||||
stopSyncingDurationMinutes = 10
|
||||
)
|
||||
|
||||
type stateResetter interface {
|
||||
ResetToState(*sm.State)
|
||||
}
|
||||
|
||||
// BlockchainReactor handles long-term catchup syncing.
|
||||
type BlockchainReactor struct {
|
||||
sw *p2p.Switch
|
||||
state *sm.State
|
||||
store *BlockStore
|
||||
pool *BlockPool
|
||||
sync bool
|
||||
requestsCh chan BlockRequest
|
||||
timeoutsCh chan string
|
||||
lastBlock *types.Block
|
||||
quit chan struct{}
|
||||
running uint32
|
||||
}
|
||||
|
||||
func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
|
||||
if state.LastBlockHeight != store.Height() {
|
||||
panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
|
||||
}
|
||||
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
|
||||
timeoutsCh := make(chan string, defaultChannelCapacity)
|
||||
pool := NewBlockPool(
|
||||
store.Height()+1,
|
||||
requestsCh,
|
||||
timeoutsCh,
|
||||
)
|
||||
bcR := &BlockchainReactor{
|
||||
state: state,
|
||||
store: store,
|
||||
pool: pool,
|
||||
sync: sync,
|
||||
requestsCh: requestsCh,
|
||||
timeoutsCh: timeoutsCh,
|
||||
quit: make(chan struct{}),
|
||||
running: uint32(0),
|
||||
}
|
||||
return bcR
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
|
||||
if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
|
||||
log.Info("Starting BlockchainReactor")
|
||||
bcR.sw = sw
|
||||
bcR.pool.Start()
|
||||
if bcR.sync {
|
||||
go bcR.poolRoutine()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
|
||||
log.Info("Stopping BlockchainReactor")
|
||||
close(bcR.quit)
|
||||
bcR.pool.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
return []*p2p.ChannelDescriptor{
|
||||
&p2p.ChannelDescriptor{
|
||||
Id: BlockchainChannel,
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 100,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
|
||||
// Send peer our state.
|
||||
peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
|
||||
// Remove peer from the pool.
|
||||
bcR.pool.RemovePeer(peer.Key)
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
|
||||
_, msg_, err := DecodeMessage(msgBytes)
|
||||
if err != nil {
|
||||
log.Warn("Error decoding message", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("Received message", "msg", msg_)
|
||||
|
||||
switch msg := msg_.(type) {
|
||||
case bcBlockRequestMessage:
|
||||
// Got a request for a block. Respond with block if we have it.
|
||||
block := bcR.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
msg := bcBlockResponseMessage{Block: block}
|
||||
queued := src.TrySend(BlockchainChannel, msg)
|
||||
if !queued {
|
||||
// queue is full, just ignore.
|
||||
}
|
||||
} else {
|
||||
// TODO peer is asking for things we don't have.
|
||||
}
|
||||
case bcBlockResponseMessage:
|
||||
// Got a block.
|
||||
bcR.pool.AddBlock(msg.Block, src.Key)
|
||||
case bcPeerStatusMessage:
|
||||
// Got a peer status.
|
||||
bcR.pool.SetPeerHeight(src.Key, msg.Height)
|
||||
default:
|
||||
// Ignore unknown message
|
||||
}
|
||||
}
|
||||
|
||||
// Handle messages from the poolReactor telling the reactor what to do.
|
||||
func (bcR *BlockchainReactor) poolRoutine() {
|
||||
|
||||
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
|
||||
|
||||
FOR_LOOP:
|
||||
for {
|
||||
select {
|
||||
case request := <-bcR.requestsCh: // chan BlockRequest
|
||||
peer := bcR.sw.Peers().Get(request.PeerId)
|
||||
if peer == nil {
|
||||
// We can't fulfill the request.
|
||||
continue FOR_LOOP
|
||||
}
|
||||
msg := bcBlockRequestMessage{request.Height}
|
||||
queued := peer.TrySend(BlockchainChannel, msg)
|
||||
if !queued {
|
||||
// We couldn't queue the request.
|
||||
time.Sleep(defaultSleepIntervalMS * time.Millisecond)
|
||||
continue FOR_LOOP
|
||||
}
|
||||
case peerId := <-bcR.timeoutsCh: // chan string
|
||||
// Peer timed out.
|
||||
peer := bcR.sw.Peers().Get(peerId)
|
||||
if peer != nil {
|
||||
bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
|
||||
}
|
||||
case _ = <-trySyncTicker.C: // chan time
|
||||
//var lastValidatedBlock *types.Block
|
||||
SYNC_LOOP:
|
||||
for i := 0; i < 10; i++ {
|
||||
// See if there are any blocks to sync.
|
||||
first, second := bcR.pool.PeekTwoBlocks()
|
||||
//log.Debug("TrySync peeked", "first", first, "second", second)
|
||||
if first == nil || second == nil {
|
||||
// We need both to sync the first block.
|
||||
break SYNC_LOOP
|
||||
}
|
||||
firstParts := first.MakePartSet()
|
||||
firstPartsHeader := firstParts.Header()
|
||||
// Finally, verify the first block using the second's validation.
|
||||
err := bcR.state.BondedValidators.VerifyValidation(
|
||||
first.Hash(), firstPartsHeader, first.Height, second.Validation)
|
||||
if err != nil {
|
||||
log.Debug("error in validation", "error", err)
|
||||
bcR.pool.RedoRequest(first.Height)
|
||||
break SYNC_LOOP
|
||||
} else {
|
||||
bcR.pool.PopRequest()
|
||||
err := bcR.state.AppendBlock(first, firstPartsHeader)
|
||||
if err != nil {
|
||||
// TODO This is bad, are we zombie?
|
||||
panic(Fmt("Failed to process committed block: %v", err))
|
||||
}
|
||||
bcR.store.SaveBlock(first, firstParts, second.Validation)
|
||||
bcR.state.Save()
|
||||
//lastValidatedBlock = first
|
||||
}
|
||||
}
|
||||
/*
|
||||
// We're done syncing for now (will do again shortly)
|
||||
// See if we want to stop syncing and turn on the
|
||||
// consensus reactor.
|
||||
// TODO: use other heuristics too besides blocktime.
|
||||
// It's not a security concern, as it only needs to happen
|
||||
// upon node sync, and there's also a second (slower)
|
||||
// method of syncing in the consensus reactor.
|
||||
|
||||
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
|
||||
go func() {
|
||||
log.Info("Stopping blockpool syncing, turning on consensus...")
|
||||
trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
|
||||
conR := bcR.sw.Reactor("CONSENSUS")
|
||||
conR.(stateResetter).ResetToState(bcR.state)
|
||||
conR.Start(bcR.sw)
|
||||
for _, peer := range bcR.sw.Peers().List() {
|
||||
conR.AddPeer(peer)
|
||||
}
|
||||
}()
|
||||
break FOR_LOOP
|
||||
}
|
||||
*/
|
||||
continue FOR_LOOP
|
||||
case <-bcR.quit:
|
||||
break FOR_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bcR *BlockchainReactor) BroadcastStatus() error {
|
||||
bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
|
||||
return nil
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
const (
|
||||
msgTypeUnknown = byte(0x00)
|
||||
msgTypeBlockRequest = byte(0x10)
|
||||
msgTypeBlockResponse = byte(0x11)
|
||||
msgTypePeerStatus = byte(0x20)
|
||||
)
|
||||
|
||||
// TODO: check for unnecessary extra bytes at the end.
|
||||
func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
|
||||
n := new(int64)
|
||||
msgType = bz[0]
|
||||
r := bytes.NewReader(bz)
|
||||
switch msgType {
|
||||
case msgTypeBlockRequest:
|
||||
msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err)
|
||||
case msgTypeBlockResponse:
|
||||
msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err)
|
||||
case msgTypePeerStatus:
|
||||
msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err)
|
||||
default:
|
||||
msg = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type bcBlockRequestMessage struct {
|
||||
Height uint
|
||||
}
|
||||
|
||||
func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
|
||||
|
||||
func (m bcBlockRequestMessage) String() string {
|
||||
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type bcBlockResponseMessage struct {
|
||||
Block *types.Block
|
||||
}
|
||||
|
||||
func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
|
||||
|
||||
func (m bcBlockResponseMessage) String() string {
|
||||
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type bcPeerStatusMessage struct {
|
||||
Height uint
|
||||
}
|
||||
|
||||
func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
|
||||
|
||||
func (m bcPeerStatusMessage) String() string {
|
||||
return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
|
||||
}
|
|
@ -57,7 +57,7 @@ func (bs *BlockStore) LoadBlock(height uint) *types.Block {
|
|||
if r == nil {
|
||||
panic(Fmt("Block does not exist at height %v", height))
|
||||
}
|
||||
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
|
||||
meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block meta: %v", err))
|
||||
}
|
||||
|
@ -87,14 +87,14 @@ func (bs *BlockStore) LoadBlockPart(height uint, index uint) *types.Part {
|
|||
return part
|
||||
}
|
||||
|
||||
func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta {
|
||||
func (bs *BlockStore) LoadBlockMeta(height uint) *types.BlockMeta {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcBlockMetaKey(height))
|
||||
if r == nil {
|
||||
panic(Fmt("BlockMeta does not exist for height %v", height))
|
||||
}
|
||||
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
|
||||
meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block meta: %v", err))
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
|
|||
}
|
||||
|
||||
// Save block meta
|
||||
meta := makeBlockMeta(block, blockParts)
|
||||
meta := types.NewBlockMeta(block, blockParts)
|
||||
metaBytes := binary.BinaryBytes(meta)
|
||||
bs.db.Set(calcBlockMetaKey(height), metaBytes)
|
||||
|
||||
|
@ -184,22 +184,6 @@ func (bs *BlockStore) saveBlockPart(height uint, index uint, part *types.Part) {
|
|||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type BlockMeta struct {
|
||||
Hash []byte // The block hash
|
||||
Header *types.Header // The block's Header
|
||||
Parts types.PartSetHeader // The PartSetHeader, for transfer
|
||||
}
|
||||
|
||||
func makeBlockMeta(block *types.Block, blockParts *types.PartSet) *BlockMeta {
|
||||
return &BlockMeta{
|
||||
Hash: block.Hash(),
|
||||
Header: block.Header,
|
||||
Parts: blockParts.Header(),
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
func calcBlockMetaKey(height uint) []byte {
|
||||
return []byte(fmt.Sprintf("H:%v", height))
|
||||
}
|
||||
|
|
|
@ -1,44 +1,65 @@
|
|||
package common
|
||||
|
||||
import "time"
|
||||
import "sync"
|
||||
|
||||
/*
|
||||
RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period.
|
||||
It's good for keeping connections alive.
|
||||
*/
|
||||
type RepeatTimer struct {
|
||||
Name string
|
||||
Ch chan struct{}
|
||||
quit chan struct{}
|
||||
dur time.Duration
|
||||
timer *time.Timer
|
||||
Ch chan time.Time
|
||||
|
||||
mtx sync.Mutex
|
||||
name string
|
||||
ticker *time.Ticker
|
||||
quit chan struct{}
|
||||
dur time.Duration
|
||||
}
|
||||
|
||||
func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer {
|
||||
var ch = make(chan struct{})
|
||||
var quit = make(chan struct{})
|
||||
var t = &RepeatTimer{Name: name, Ch: ch, dur: dur, quit: quit}
|
||||
t.timer = time.AfterFunc(dur, t.fireRoutine)
|
||||
var t = &RepeatTimer{
|
||||
Ch: make(chan time.Time),
|
||||
ticker: time.NewTicker(dur),
|
||||
quit: make(chan struct{}),
|
||||
name: name,
|
||||
dur: dur,
|
||||
}
|
||||
go t.fireRoutine(t.ticker)
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *RepeatTimer) fireRoutine() {
|
||||
select {
|
||||
case t.Ch <- struct{}{}:
|
||||
t.timer.Reset(t.dur)
|
||||
case <-t.quit:
|
||||
// do nothing
|
||||
default:
|
||||
t.timer.Reset(t.dur)
|
||||
func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) {
|
||||
for {
|
||||
select {
|
||||
case t_ := <-ticker.C:
|
||||
t.Ch <- t_
|
||||
case <-t.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait the duration again before firing.
|
||||
func (t *RepeatTimer) Reset() {
|
||||
t.timer.Reset(t.dur)
|
||||
t.mtx.Lock() // Lock
|
||||
defer t.mtx.Unlock()
|
||||
|
||||
if t.ticker != nil {
|
||||
t.ticker.Stop()
|
||||
}
|
||||
t.ticker = time.NewTicker(t.dur)
|
||||
go t.fireRoutine(t.ticker)
|
||||
}
|
||||
|
||||
func (t *RepeatTimer) Stop() bool {
|
||||
close(t.quit)
|
||||
return t.timer.Stop()
|
||||
t.mtx.Lock() // Lock
|
||||
defer t.mtx.Unlock()
|
||||
|
||||
exists := t.ticker != nil
|
||||
if exists {
|
||||
t.ticker.Stop()
|
||||
t.ticker = nil
|
||||
}
|
||||
return exists
|
||||
}
|
||||
|
|
|
@ -104,6 +104,8 @@ func initDefaults(rootDir string) {
|
|||
app.SetDefault("GenesisFile", rootDir+"/genesis.json")
|
||||
app.SetDefault("AddrBookFile", rootDir+"/addrbook.json")
|
||||
app.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json")
|
||||
|
||||
app.SetDefault("FastSync", false)
|
||||
}
|
||||
|
||||
func Init(rootDir string) {
|
||||
|
@ -161,6 +163,7 @@ func ParseFlags(args []string) {
|
|||
flags.BoolVar(&printHelp, "help", false, "Print this help message.")
|
||||
flags.String("listen_addr", app.GetString("ListenAddr"), "Listen address. (0.0.0.0:0 means any interface, any port)")
|
||||
flags.String("seed_node", app.GetString("SeedNode"), "Address of seed node")
|
||||
flags.Bool("fast_sync", app.GetBool("FastSync"), "Fast blockchain syncing")
|
||||
flags.String("rpc_http_listen_addr", app.GetString("RPC.HTTP.ListenAddr"), "RPC listen address. Port required")
|
||||
flags.Parse(args)
|
||||
if printHelp {
|
||||
|
@ -171,6 +174,7 @@ func ParseFlags(args []string) {
|
|||
// Merge parsed flag values onto app.
|
||||
app.BindPFlag("ListenAddr", flags.Lookup("listen_addr"))
|
||||
app.BindPFlag("SeedNode", flags.Lookup("seed_node"))
|
||||
app.BindPFlag("FastSync", flags.Lookup("fast_sync"))
|
||||
app.BindPFlag("RPC.HTTP.ListenAddr", flags.Lookup("rpc_http_listen_addr"))
|
||||
|
||||
// Confused?
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/tendermint/tendermint/account"
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
@ -94,3 +95,7 @@ func (pol *POL) StringShort() string {
|
|||
Fingerprint(pol.BlockHash), pol.BlockParts)
|
||||
}
|
||||
}
|
||||
|
||||
func (pol *POL) MakePartSet() *types.PartSet {
|
||||
return types.NewPartSetFromData(binary.BinaryBytes(pol))
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
. "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
|
@ -17,9 +18,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
StateCh = byte(0x20)
|
||||
DataCh = byte(0x21)
|
||||
VoteCh = byte(0x22)
|
||||
StateChannel = byte(0x20)
|
||||
DataChannel = byte(0x21)
|
||||
VoteChannel = byte(0x22)
|
||||
|
||||
peerStateKey = "ConsensusReactor.peerState"
|
||||
|
||||
|
@ -28,17 +29,18 @@ const (
|
|||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// The reactor's underlying ConsensusState may change state at any time.
|
||||
// We atomically copy the RoundState struct before using it.
|
||||
type ConsensusReactor struct {
|
||||
sw *p2p.Switch
|
||||
started uint32
|
||||
stopped uint32
|
||||
running uint32
|
||||
quit chan struct{}
|
||||
|
||||
blockStore *types.BlockStore
|
||||
blockStore *bc.BlockStore
|
||||
conS *ConsensusState
|
||||
}
|
||||
|
||||
func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.BlockStore) *ConsensusReactor {
|
||||
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
|
||||
conR := &ConsensusReactor{
|
||||
blockStore: blockStore,
|
||||
quit: make(chan struct{}),
|
||||
|
@ -49,7 +51,7 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.Block
|
|||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
|
||||
if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
|
||||
if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
|
||||
log.Info("Starting ConsensusReactor")
|
||||
conR.sw = sw
|
||||
conR.conS.Start()
|
||||
|
@ -59,15 +61,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
|
|||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
|
||||
if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
|
||||
log.Info("Stopping ConsensusReactor")
|
||||
conR.conS.Stop()
|
||||
close(conR.quit)
|
||||
}
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) IsStopped() bool {
|
||||
return atomic.LoadUint32(&conR.stopped) == 1
|
||||
func (conR *ConsensusReactor) IsRunning() bool {
|
||||
return atomic.LoadUint32(&conR.running) == 1
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
|
@ -75,15 +77,15 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
|
|||
// TODO optimize
|
||||
return []*p2p.ChannelDescriptor{
|
||||
&p2p.ChannelDescriptor{
|
||||
Id: StateCh,
|
||||
Id: StateChannel,
|
||||
Priority: 5,
|
||||
},
|
||||
&p2p.ChannelDescriptor{
|
||||
Id: DataCh,
|
||||
Id: DataChannel,
|
||||
Priority: 5,
|
||||
},
|
||||
&p2p.ChannelDescriptor{
|
||||
Id: VoteCh,
|
||||
Id: VoteChannel,
|
||||
Priority: 5,
|
||||
},
|
||||
}
|
||||
|
@ -91,6 +93,10 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
|
|||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
|
||||
if !conR.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// Create peerState for peer
|
||||
peerState := NewPeerState(peer)
|
||||
peer.Data.Set(peerStateKey, peerState)
|
||||
|
@ -105,11 +111,18 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
|
|||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
|
||||
if !conR.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
//peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
|
||||
}
|
||||
|
||||
// Implements Reactor
|
||||
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
|
||||
if !conR.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// Get round state
|
||||
rs := conR.conS.GetRoundState()
|
||||
|
@ -122,7 +135,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
|
|||
log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
|
||||
|
||||
switch chId {
|
||||
case StateCh:
|
||||
case StateChannel:
|
||||
switch msg := msg_.(type) {
|
||||
case *NewRoundStepMessage:
|
||||
ps.ApplyNewRoundStepMessage(msg, rs)
|
||||
|
@ -134,7 +147,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
|
|||
// Ignore unknown message
|
||||
}
|
||||
|
||||
case DataCh:
|
||||
case DataChannel:
|
||||
switch msg := msg_.(type) {
|
||||
case *Proposal:
|
||||
ps.SetHasProposal(msg)
|
||||
|
@ -155,7 +168,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
|
|||
// Ignore unknown message
|
||||
}
|
||||
|
||||
case VoteCh:
|
||||
case VoteChannel:
|
||||
switch msg := msg_.(type) {
|
||||
case *VoteMessage:
|
||||
vote := msg.Vote
|
||||
|
@ -192,7 +205,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
|
|||
Type: vote.Type,
|
||||
Index: index,
|
||||
}
|
||||
conR.sw.Broadcast(StateCh, msg)
|
||||
conR.sw.Broadcast(StateChannel, msg)
|
||||
}
|
||||
|
||||
default:
|
||||
|
@ -212,6 +225,11 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
|
|||
conR.conS.SetPrivValidator(priv)
|
||||
}
|
||||
|
||||
// Reset to some state.
|
||||
func (conR *ConsensusReactor) ResetToState(state *sm.State) {
|
||||
conR.conS.updateToState(state, false)
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
|
||||
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
|
||||
|
@ -252,10 +270,10 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
|
|||
|
||||
nrsMsg, csMsg := makeRoundStepMessages(rs)
|
||||
if nrsMsg != nil {
|
||||
conR.sw.Broadcast(StateCh, nrsMsg)
|
||||
conR.sw.Broadcast(StateChannel, nrsMsg)
|
||||
}
|
||||
if csMsg != nil {
|
||||
conR.sw.Broadcast(StateCh, csMsg)
|
||||
conR.sw.Broadcast(StateChannel, csMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,10 +282,10 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
|
|||
rs := conR.conS.GetRoundState()
|
||||
nrsMsg, csMsg := makeRoundStepMessages(rs)
|
||||
if nrsMsg != nil {
|
||||
peer.Send(StateCh, nrsMsg)
|
||||
peer.Send(StateChannel, nrsMsg)
|
||||
}
|
||||
if csMsg != nil {
|
||||
peer.Send(StateCh, nrsMsg)
|
||||
peer.Send(StateChannel, nrsMsg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,7 +294,7 @@ func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
|
|||
OUTER_LOOP:
|
||||
for {
|
||||
// Manage disconnects from self or peer.
|
||||
if peer.IsStopped() || conR.IsStopped() {
|
||||
if !peer.IsRunning() || !conR.IsRunning() {
|
||||
log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
|
||||
return
|
||||
}
|
||||
|
@ -296,7 +314,7 @@ OUTER_LOOP:
|
|||
Type: partTypeProposalBlock,
|
||||
Part: part,
|
||||
}
|
||||
peer.Send(DataCh, msg)
|
||||
peer.Send(DataChannel, msg)
|
||||
ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
@ -306,7 +324,7 @@ OUTER_LOOP:
|
|||
if 0 < prs.Height && prs.Height < rs.Height {
|
||||
//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
|
||||
if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
|
||||
// Ensure that the peer's PartSetHeaeder is correct
|
||||
// Ensure that the peer's PartSetHeader is correct
|
||||
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
|
||||
if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
|
||||
log.Debug("Peer ProposalBlockParts mismatch, sleeping",
|
||||
|
@ -329,7 +347,7 @@ OUTER_LOOP:
|
|||
Type: partTypeProposalBlock,
|
||||
Part: part,
|
||||
}
|
||||
peer.Send(DataCh, msg)
|
||||
peer.Send(DataChannel, msg)
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
continue OUTER_LOOP
|
||||
} else {
|
||||
|
@ -349,7 +367,7 @@ OUTER_LOOP:
|
|||
// Send proposal?
|
||||
if rs.Proposal != nil && !prs.Proposal {
|
||||
msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
|
||||
peer.Send(DataCh, msg)
|
||||
peer.Send(DataChannel, msg)
|
||||
ps.SetHasProposal(rs.Proposal)
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
@ -363,7 +381,7 @@ OUTER_LOOP:
|
|||
Type: partTypeProposalPOL,
|
||||
Part: rs.ProposalPOLParts.GetPart(index),
|
||||
}
|
||||
peer.Send(DataCh, msg)
|
||||
peer.Send(DataChannel, msg)
|
||||
ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
@ -379,7 +397,7 @@ func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState)
|
|||
OUTER_LOOP:
|
||||
for {
|
||||
// Manage disconnects from self or peer.
|
||||
if peer.IsStopped() || conR.IsStopped() {
|
||||
if !peer.IsRunning() || !conR.IsRunning() {
|
||||
log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
|
||||
return
|
||||
}
|
||||
|
@ -397,7 +415,7 @@ OUTER_LOOP:
|
|||
vote := voteSet.GetByIndex(index)
|
||||
// NOTE: vote may be a commit.
|
||||
msg := &VoteMessage{index, vote}
|
||||
peer.Send(VoteCh, msg)
|
||||
peer.Send(VoteChannel, msg)
|
||||
ps.SetHasVote(vote, index)
|
||||
return true
|
||||
}
|
||||
|
@ -421,7 +439,7 @@ OUTER_LOOP:
|
|||
Signature: commit.Signature,
|
||||
}
|
||||
msg := &VoteMessage{index, vote}
|
||||
peer.Send(VoteCh, msg)
|
||||
peer.Send(VoteChannel, msg)
|
||||
ps.SetHasVote(vote, index)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import (
|
|||
|
||||
"github.com/tendermint/tendermint/account"
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
. "github.com/tendermint/tendermint/consensus/types"
|
||||
|
@ -234,7 +235,7 @@ type ConsensusState struct {
|
|||
stopped uint32
|
||||
quit chan struct{}
|
||||
|
||||
blockStore *types.BlockStore
|
||||
blockStore *bc.BlockStore
|
||||
mempoolReactor *mempl.MempoolReactor
|
||||
runActionCh chan RoundAction
|
||||
newStepCh chan *RoundState
|
||||
|
@ -247,7 +248,7 @@ type ConsensusState struct {
|
|||
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.
|
||||
}
|
||||
|
||||
func NewConsensusState(state *sm.State, blockStore *types.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
|
||||
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
|
||||
cs := &ConsensusState{
|
||||
quit: make(chan struct{}),
|
||||
blockStore: blockStore,
|
||||
|
@ -255,7 +256,7 @@ func NewConsensusState(state *sm.State, blockStore *types.BlockStore, mempoolRea
|
|||
runActionCh: make(chan RoundAction, 1),
|
||||
newStepCh: make(chan *RoundState, 1),
|
||||
}
|
||||
cs.updateToState(state)
|
||||
cs.updateToState(state, true)
|
||||
return cs
|
||||
}
|
||||
|
||||
|
@ -456,9 +457,9 @@ ACTION_LOOP:
|
|||
// If calculated round is greater than 0 (based on BlockTime or calculated StartTime)
|
||||
// then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound.
|
||||
// Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight.
|
||||
func (cs *ConsensusState) updateToState(state *sm.State) {
|
||||
func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) {
|
||||
// Sanity check state.
|
||||
if cs.Height > 0 && cs.Height != state.LastBlockHeight {
|
||||
if contiguous && cs.Height > 0 && cs.Height != state.LastBlockHeight {
|
||||
panic(Fmt("updateToState() expected state height of %v but found %v",
|
||||
cs.Height, state.LastBlockHeight))
|
||||
}
|
||||
|
@ -466,6 +467,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
|
|||
// Reset fields based on state.
|
||||
validators := state.BondedValidators
|
||||
height := state.LastBlockHeight + 1 // next desired block height
|
||||
|
||||
// RoundState fields
|
||||
cs.Height = height
|
||||
cs.Round = 0
|
||||
cs.Step = RoundStepNewHeight
|
||||
|
@ -641,12 +644,12 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) {
|
|||
return
|
||||
}
|
||||
|
||||
blockParts = types.NewPartSetFromData(binary.BinaryBytes(block))
|
||||
blockParts = block.MakePartSet()
|
||||
pol = cs.LockedPOL // If exists, is a PoUnlock.
|
||||
}
|
||||
|
||||
if pol != nil {
|
||||
polParts = types.NewPartSetFromData(binary.BinaryBytes(pol))
|
||||
polParts = pol.MakePartSet()
|
||||
}
|
||||
|
||||
// Make proposal
|
||||
|
@ -856,7 +859,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool {
|
|||
// We have the block, so save/stage/sign-commit-vote.
|
||||
cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits)
|
||||
// Increment height.
|
||||
cs.updateToState(cs.stagedState)
|
||||
cs.updateToState(cs.stagedState, true)
|
||||
// cs.Step is now RoundStepNewHeight or RoundStepNewRound
|
||||
cs.newStepCh <- cs.getRoundState()
|
||||
return true
|
||||
|
|
|
@ -3,15 +3,15 @@ package consensus
|
|||
import (
|
||||
"sort"
|
||||
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
dbm "github.com/tendermint/tendermint/db"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func randConsensusState() (*ConsensusState, []*sm.PrivValidator) {
|
||||
state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000)
|
||||
blockStore := types.NewBlockStore(dbm.NewMemDB())
|
||||
blockStore := bc.NewBlockStore(dbm.NewMemDB())
|
||||
mempool := mempl.NewMempool(state)
|
||||
mempoolReactor := mempl.NewMempoolReactor(mempool)
|
||||
cs := NewConsensusState(state, blockStore, mempoolReactor)
|
||||
|
|
|
@ -34,7 +34,7 @@ type VoteSet struct {
|
|||
maj23Exists bool
|
||||
}
|
||||
|
||||
// Constructs a new VoteSet struct used to accumulate votes for each round.
|
||||
// Constructs a new VoteSet struct used to accumulate votes for given height/round.
|
||||
func NewVoteSet(height uint, round uint, type_ byte, valSet *sm.ValidatorSet) *VoteSet {
|
||||
if height == 0 {
|
||||
panic("Cannot make VoteSet for height == 0, doesn't make sense.")
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
|
@ -12,15 +13,15 @@ import (
|
|||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/rpc"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
lz []p2p.Listener
|
||||
sw *p2p.Switch
|
||||
book *p2p.AddrBook
|
||||
blockStore *bc.BlockStore
|
||||
pexReactor *p2p.PEXReactor
|
||||
blockStore *types.BlockStore
|
||||
bcReactor *bc.BlockchainReactor
|
||||
mempoolReactor *mempl.MempoolReactor
|
||||
consensusState *consensus.ConsensusState
|
||||
consensusReactor *consensus.ConsensusReactor
|
||||
|
@ -30,7 +31,7 @@ type Node struct {
|
|||
func NewNode() *Node {
|
||||
// Get BlockStore
|
||||
blockStoreDB := dbm.GetDB("blockstore")
|
||||
blockStore := types.NewBlockStore(blockStoreDB)
|
||||
blockStore := bc.NewBlockStore(blockStoreDB)
|
||||
|
||||
// Get State
|
||||
stateDB := dbm.GetDB("state")
|
||||
|
@ -53,6 +54,9 @@ func NewNode() *Node {
|
|||
book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
|
||||
pexReactor := p2p.NewPEXReactor(book)
|
||||
|
||||
// Get BlockchainReactor
|
||||
bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync"))
|
||||
|
||||
// Get MempoolReactor
|
||||
mempool := mempl.NewMempool(state.Copy())
|
||||
mempoolReactor := mempl.NewMempoolReactor(mempool)
|
||||
|
@ -64,14 +68,23 @@ func NewNode() *Node {
|
|||
consensusReactor.SetPrivValidator(privValidator)
|
||||
}
|
||||
|
||||
sw := p2p.NewSwitch([]p2p.Reactor{pexReactor, mempoolReactor, consensusReactor})
|
||||
sw.SetChainId(state.Hash(), config.App().GetString("Network"))
|
||||
sw := p2p.NewSwitch()
|
||||
sw.SetNetwork(config.App().GetString("Network"))
|
||||
sw.AddReactor("PEX", pexReactor).Start(sw)
|
||||
sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw)
|
||||
sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw)
|
||||
if !config.App().GetBool("FastSync") {
|
||||
sw.AddReactor("CONSENSUS", consensusReactor).Start(sw)
|
||||
} else {
|
||||
sw.AddReactor("CONSENSUS", consensusReactor)
|
||||
}
|
||||
|
||||
return &Node{
|
||||
sw: sw,
|
||||
book: book,
|
||||
pexReactor: pexReactor,
|
||||
blockStore: blockStore,
|
||||
pexReactor: pexReactor,
|
||||
bcReactor: bcReactor,
|
||||
mempoolReactor: mempoolReactor,
|
||||
consensusState: consensusState,
|
||||
consensusReactor: consensusReactor,
|
||||
|
@ -85,7 +98,7 @@ func (n *Node) Start() {
|
|||
go n.inboundConnectionRoutine(l)
|
||||
}
|
||||
n.book.Start()
|
||||
n.sw.Start()
|
||||
//n.sw.StartReactors()
|
||||
}
|
||||
|
||||
func (n *Node) Stop() {
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
MempoolCh = byte(0x30)
|
||||
MempoolChannel = byte(0x30)
|
||||
)
|
||||
|
||||
// MempoolReactor handles mempool tx broadcasting amongst peers.
|
||||
|
@ -52,7 +52,7 @@ func (memR *MempoolReactor) Stop() {
|
|||
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
return []*p2p.ChannelDescriptor{
|
||||
&p2p.ChannelDescriptor{
|
||||
Id: MempoolCh,
|
||||
Id: MempoolChannel,
|
||||
Priority: 5,
|
||||
},
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
|
|||
if peer.Key == src.Key {
|
||||
continue
|
||||
}
|
||||
peer.TrySend(MempoolCh, msg)
|
||||
peer.TrySend(MempoolChannel, msg)
|
||||
}
|
||||
|
||||
default:
|
||||
|
@ -106,7 +106,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
|
|||
return err
|
||||
}
|
||||
msg := &TxMessage{Tx: tx}
|
||||
memR.sw.Broadcast(MempoolCh, msg)
|
||||
memR.sw.Broadcast(MempoolChannel, msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -381,7 +381,7 @@ out:
|
|||
for {
|
||||
select {
|
||||
case <-dumpAddressTicker.C:
|
||||
log.Debug("Saving book to file", "size", a.Size())
|
||||
log.Debug("Saving AddrBook to file", "size", a.Size())
|
||||
a.saveToFile(a.filePath)
|
||||
case <-a.quit:
|
||||
break out
|
||||
|
|
|
@ -50,8 +50,9 @@ There are two methods for sending messages:
|
|||
func (m MConnection) TrySend(chId byte, msg interface{}) bool {}
|
||||
|
||||
`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
|
||||
for the channel with the given id byte `chId`. The message `msg` is serialized
|
||||
using the `tendermint/binary` submodule's `WriteBinary()` reflection routine.
|
||||
for the channel with the given id byte `chId`, or until the request times out.
|
||||
The message `msg` is serialized using the `tendermint/binary` submodule's
|
||||
`WriteBinary()` reflection routine.
|
||||
|
||||
`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
|
||||
queue is full.
|
||||
|
@ -416,6 +417,7 @@ FOR_LOOP:
|
|||
}
|
||||
msgBytes := channel.recvMsgPacket(pkt)
|
||||
if msgBytes != nil {
|
||||
log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
|
||||
c.onReceive(pkt.ChannelId, msgBytes)
|
||||
}
|
||||
default:
|
||||
|
@ -437,8 +439,19 @@ FOR_LOOP:
|
|||
//-----------------------------------------------------------------------------
|
||||
|
||||
type ChannelDescriptor struct {
|
||||
Id byte
|
||||
Priority uint
|
||||
Id byte
|
||||
Priority uint
|
||||
SendQueueCapacity uint
|
||||
RecvBufferCapacity uint
|
||||
}
|
||||
|
||||
func (chDesc *ChannelDescriptor) FillDefaults() {
|
||||
if chDesc.SendQueueCapacity == 0 {
|
||||
chDesc.SendQueueCapacity = defaultSendQueueCapacity
|
||||
}
|
||||
if chDesc.RecvBufferCapacity == 0 {
|
||||
chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: lowercase.
|
||||
|
@ -448,7 +461,7 @@ type Channel struct {
|
|||
desc *ChannelDescriptor
|
||||
id byte
|
||||
sendQueue chan []byte
|
||||
sendQueueSize uint32
|
||||
sendQueueSize uint32 // atomic.
|
||||
recving []byte
|
||||
sending []byte
|
||||
priority uint
|
||||
|
@ -456,6 +469,7 @@ type Channel struct {
|
|||
}
|
||||
|
||||
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
||||
desc.FillDefaults()
|
||||
if desc.Priority <= 0 {
|
||||
panic("Channel default priority must be a postive integer")
|
||||
}
|
||||
|
@ -463,8 +477,8 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
|||
conn: conn,
|
||||
desc: desc,
|
||||
id: desc.Id,
|
||||
sendQueue: make(chan []byte, defaultSendQueueCapacity),
|
||||
recving: make([]byte, 0, defaultRecvBufferCapacity),
|
||||
sendQueue: make(chan []byte, desc.SendQueueCapacity),
|
||||
recving: make([]byte, 0, desc.RecvBufferCapacity),
|
||||
priority: desc.Priority,
|
||||
}
|
||||
}
|
||||
|
|
19
p2p/peer.go
19
p2p/peer.go
|
@ -13,8 +13,7 @@ import (
|
|||
type Peer struct {
|
||||
outbound bool
|
||||
mconn *MConnection
|
||||
started uint32
|
||||
stopped uint32
|
||||
running uint32
|
||||
|
||||
Key string
|
||||
Data *CMap // User data.
|
||||
|
@ -37,7 +36,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc
|
|||
p = &Peer{
|
||||
outbound: outbound,
|
||||
mconn: mconn,
|
||||
stopped: 0,
|
||||
running: 0,
|
||||
Key: mconn.RemoteAddress.String(),
|
||||
Data: NewCMap(),
|
||||
}
|
||||
|
@ -45,21 +44,21 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc
|
|||
}
|
||||
|
||||
func (p *Peer) start() {
|
||||
if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
|
||||
if atomic.CompareAndSwapUint32(&p.running, 0, 1) {
|
||||
log.Debug("Starting Peer", "peer", p)
|
||||
p.mconn.Start()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) stop() {
|
||||
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
|
||||
if atomic.CompareAndSwapUint32(&p.running, 1, 0) {
|
||||
log.Debug("Stopping Peer", "peer", p)
|
||||
p.mconn.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) IsStopped() bool {
|
||||
return atomic.LoadUint32(&p.stopped) == 1
|
||||
func (p *Peer) IsRunning() bool {
|
||||
return atomic.LoadUint32(&p.running) == 1
|
||||
}
|
||||
|
||||
func (p *Peer) Connection() *MConnection {
|
||||
|
@ -71,21 +70,21 @@ func (p *Peer) IsOutbound() bool {
|
|||
}
|
||||
|
||||
func (p *Peer) Send(chId byte, msg interface{}) bool {
|
||||
if atomic.LoadUint32(&p.stopped) == 1 {
|
||||
if atomic.LoadUint32(&p.running) == 0 {
|
||||
return false
|
||||
}
|
||||
return p.mconn.Send(chId, msg)
|
||||
}
|
||||
|
||||
func (p *Peer) TrySend(chId byte, msg interface{}) bool {
|
||||
if atomic.LoadUint32(&p.stopped) == 1 {
|
||||
if atomic.LoadUint32(&p.running) == 0 {
|
||||
return false
|
||||
}
|
||||
return p.mconn.TrySend(chId, msg)
|
||||
}
|
||||
|
||||
func (p *Peer) CanSend(chId byte) bool {
|
||||
if atomic.LoadUint32(&p.stopped) == 1 {
|
||||
if atomic.LoadUint32(&p.running) == 0 {
|
||||
return false
|
||||
}
|
||||
return p.mconn.CanSend(chId)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
// IPeerSet has a (immutable) subset of the methods of PeerSet.
|
||||
type IPeerSet interface {
|
||||
Has(key string) bool
|
||||
Get(key string) *Peer
|
||||
List() []*Peer
|
||||
Size() int
|
||||
}
|
||||
|
@ -55,6 +56,17 @@ func (ps *PeerSet) Has(peerKey string) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
func (ps *PeerSet) Get(peerKey string) *Peer {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
item, ok := ps.lookup[peerKey]
|
||||
if ok {
|
||||
return item.peer
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PeerSet) Remove(peer *Peer) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
var pexErrInvalidMessage = errors.New("Invalid PEX message")
|
||||
|
||||
const (
|
||||
PexCh = byte(0x00)
|
||||
PexChannel = byte(0x00)
|
||||
ensurePeersPeriodSeconds = 30
|
||||
minNumOutboundPeers = 10
|
||||
maxNumPeers = 50
|
||||
|
@ -62,8 +62,9 @@ func (pexR *PEXReactor) Stop() {
|
|||
func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
|
||||
return []*ChannelDescriptor{
|
||||
&ChannelDescriptor{
|
||||
Id: PexCh,
|
||||
Priority: 1,
|
||||
Id: PexChannel,
|
||||
Priority: 1,
|
||||
SendQueueCapacity: 10,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -97,9 +98,9 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
|
|||
|
||||
switch msg.(type) {
|
||||
case *pexHandshakeMessage:
|
||||
chainId := msg.(*pexHandshakeMessage).ChainId
|
||||
if chainId != pexR.sw.chainId {
|
||||
err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", chainId, pexR.sw.chainId)
|
||||
network := msg.(*pexHandshakeMessage).Network
|
||||
if network != pexR.sw.network {
|
||||
err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network)
|
||||
pexR.sw.StopPeerForError(src, err)
|
||||
}
|
||||
case *pexRequestMessage:
|
||||
|
@ -122,11 +123,11 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
|
|||
|
||||
// Asks peer for more addresses.
|
||||
func (pexR *PEXReactor) RequestPEX(peer *Peer) {
|
||||
peer.Send(PexCh, &pexRequestMessage{})
|
||||
peer.Send(PexChannel, &pexRequestMessage{})
|
||||
}
|
||||
|
||||
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
|
||||
peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs})
|
||||
peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs})
|
||||
}
|
||||
|
||||
// Ensures that sufficient peers are connected. (continuous)
|
||||
|
@ -175,10 +176,12 @@ func (pexR *PEXReactor) ensurePeers() {
|
|||
alreadyDialing := pexR.sw.IsDialing(try)
|
||||
alreadyConnected := pexR.sw.Peers().Has(try.String())
|
||||
if alreadySelected || alreadyDialing || alreadyConnected {
|
||||
log.Debug("Cannot dial address", "addr", try,
|
||||
"alreadySelected", alreadySelected,
|
||||
"alreadyDialing", alreadyDialing,
|
||||
"alreadyConnected", alreadyConnected)
|
||||
/*
|
||||
log.Debug("Cannot dial address", "addr", try,
|
||||
"alreadySelected", alreadySelected,
|
||||
"alreadyDialing", alreadyDialing,
|
||||
"alreadyConnected", alreadyConnected)
|
||||
*/
|
||||
continue
|
||||
} else {
|
||||
log.Debug("Will dial address", "addr", try)
|
||||
|
@ -237,7 +240,7 @@ func DecodeMessage(bz []byte) (msg interface{}, err error) {
|
|||
A pexHandshakeMessage contains the peer's chainId
|
||||
*/
|
||||
type pexHandshakeMessage struct {
|
||||
ChainId string
|
||||
Network string
|
||||
}
|
||||
|
||||
func (m *pexHandshakeMessage) TypeByte() byte { return msgTypeHandshake }
|
||||
|
|
129
p2p/switch.go
129
p2p/switch.go
|
@ -1,11 +1,9 @@
|
|||
package p2p
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
|
@ -29,20 +27,16 @@ or more `Channels`. So while sending outgoing messages is typically performed o
|
|||
incoming messages are received on the reactor.
|
||||
*/
|
||||
type Switch struct {
|
||||
reactors []Reactor
|
||||
network string
|
||||
reactors map[string]Reactor
|
||||
chDescs []*ChannelDescriptor
|
||||
reactorsByCh map[byte]Reactor
|
||||
peers *PeerSet
|
||||
dialing *CMap
|
||||
listeners *CMap // listenerName -> chan interface{}
|
||||
quit chan struct{}
|
||||
started uint32
|
||||
stopped uint32
|
||||
chainId string
|
||||
}
|
||||
|
||||
var (
|
||||
ErrSwitchStopped = errors.New("Switch already stopped")
|
||||
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
|
||||
)
|
||||
|
||||
|
@ -50,71 +44,83 @@ const (
|
|||
peerDialTimeoutSeconds = 3
|
||||
)
|
||||
|
||||
func NewSwitch(reactors []Reactor) *Switch {
|
||||
|
||||
// Validate the reactors. no two reactors can share the same channel.
|
||||
chDescs := []*ChannelDescriptor{}
|
||||
reactorsByCh := make(map[byte]Reactor)
|
||||
for _, reactor := range reactors {
|
||||
reactorChannels := reactor.GetChannels()
|
||||
for _, chDesc := range reactorChannels {
|
||||
chId := chDesc.Id
|
||||
if reactorsByCh[chId] != nil {
|
||||
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor))
|
||||
}
|
||||
chDescs = append(chDescs, chDesc)
|
||||
reactorsByCh[chId] = reactor
|
||||
}
|
||||
}
|
||||
func NewSwitch() *Switch {
|
||||
|
||||
sw := &Switch{
|
||||
reactors: reactors,
|
||||
chDescs: chDescs,
|
||||
reactorsByCh: reactorsByCh,
|
||||
network: "",
|
||||
reactors: make(map[string]Reactor),
|
||||
chDescs: make([]*ChannelDescriptor, 0),
|
||||
reactorsByCh: make(map[byte]Reactor),
|
||||
peers: NewPeerSet(),
|
||||
dialing: NewCMap(),
|
||||
listeners: NewCMap(),
|
||||
quit: make(chan struct{}),
|
||||
stopped: 0,
|
||||
}
|
||||
|
||||
return sw
|
||||
}
|
||||
|
||||
func (sw *Switch) Start() {
|
||||
if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
|
||||
log.Info("Starting Switch")
|
||||
for _, reactor := range sw.reactors {
|
||||
reactor.Start(sw)
|
||||
// Not goroutine safe.
|
||||
func (sw *Switch) SetNetwork(network string) {
|
||||
sw.network = network
|
||||
}
|
||||
|
||||
// Not goroutine safe.
|
||||
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
|
||||
// Validate the reactor.
|
||||
// No two reactors can share the same channel.
|
||||
reactorChannels := reactor.GetChannels()
|
||||
for _, chDesc := range reactorChannels {
|
||||
chId := chDesc.Id
|
||||
if sw.reactorsByCh[chId] != nil {
|
||||
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
|
||||
}
|
||||
sw.chDescs = append(sw.chDescs, chDesc)
|
||||
sw.reactorsByCh[chId] = reactor
|
||||
}
|
||||
sw.reactors[name] = reactor
|
||||
return reactor
|
||||
}
|
||||
|
||||
func (sw *Switch) Reactor(name string) Reactor {
|
||||
return sw.reactors[name]
|
||||
}
|
||||
|
||||
// Convenience function
|
||||
func (sw *Switch) StartReactors() {
|
||||
for _, reactor := range sw.reactors {
|
||||
reactor.Start(sw)
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience function
|
||||
func (sw *Switch) StopReactors() {
|
||||
// Stop all reactors.
|
||||
for _, reactor := range sw.reactors {
|
||||
reactor.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience function
|
||||
func (sw *Switch) StopPeers() {
|
||||
// Stop each peer.
|
||||
for _, peer := range sw.peers.List() {
|
||||
peer.stop()
|
||||
}
|
||||
sw.peers = NewPeerSet()
|
||||
}
|
||||
|
||||
// Convenience function
|
||||
func (sw *Switch) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
|
||||
log.Info("Stopping Switch")
|
||||
close(sw.quit)
|
||||
// Stop each peer.
|
||||
for _, peer := range sw.peers.List() {
|
||||
peer.stop()
|
||||
}
|
||||
sw.peers = NewPeerSet()
|
||||
// Stop all reactors.
|
||||
for _, reactor := range sw.reactors {
|
||||
reactor.Stop()
|
||||
}
|
||||
}
|
||||
sw.StopPeers()
|
||||
sw.StopReactors()
|
||||
}
|
||||
|
||||
func (sw *Switch) Reactors() []Reactor {
|
||||
// Not goroutine safe to modify.
|
||||
func (sw *Switch) Reactors() map[string]Reactor {
|
||||
return sw.reactors
|
||||
}
|
||||
|
||||
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
|
||||
if atomic.LoadUint32(&sw.stopped) == 1 {
|
||||
return nil, ErrSwitchStopped
|
||||
}
|
||||
|
||||
peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
|
||||
|
||||
// Add the peer to .peers
|
||||
|
@ -126,23 +132,19 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
|
|||
}
|
||||
|
||||
// Start the peer
|
||||
go peer.start()
|
||||
peer.start()
|
||||
|
||||
// Notify listeners.
|
||||
sw.doAddPeer(peer)
|
||||
|
||||
// Send handshake
|
||||
msg := &pexHandshakeMessage{ChainId: sw.chainId}
|
||||
peer.Send(PexCh, msg)
|
||||
msg := &pexHandshakeMessage{Network: sw.network}
|
||||
peer.Send(PexChannel, msg)
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
|
||||
if atomic.LoadUint32(&sw.stopped) == 1 {
|
||||
return nil, ErrSwitchStopped
|
||||
}
|
||||
|
||||
log.Debug("Dialing address", "address", addr)
|
||||
sw.dialing.Set(addr.String(), addr)
|
||||
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
|
||||
|
@ -164,13 +166,10 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
|
|||
return sw.dialing.Has(addr.String())
|
||||
}
|
||||
|
||||
// Broadcast runs a go routine for each attemptted send, which will block
|
||||
// Broadcast runs a go routine for each attempted send, which will block
|
||||
// trying to send for defaultSendTimeoutSeconds. Returns a channel
|
||||
// which receives success values for each attempted send (false if times out)
|
||||
func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
|
||||
if atomic.LoadUint32(&sw.stopped) == 1 {
|
||||
return nil
|
||||
}
|
||||
successChan := make(chan bool, len(sw.peers.List()))
|
||||
log.Debug("Broadcast", "channel", chId, "msg", msg)
|
||||
for _, peer := range sw.peers.List() {
|
||||
|
@ -223,10 +222,6 @@ func (sw *Switch) StopPeerGracefully(peer *Peer) {
|
|||
sw.doRemovePeer(peer, nil)
|
||||
}
|
||||
|
||||
func (sw *Switch) SetChainId(hash []byte, network string) {
|
||||
sw.chainId = hex.EncodeToString(hash) + "-" + network
|
||||
}
|
||||
|
||||
func (sw *Switch) IsListening() bool {
|
||||
return sw.listeners.Size() > 0
|
||||
}
|
||||
|
|
|
@ -69,11 +69,11 @@ func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {
|
|||
//-----------------------------------------------------------------------------
|
||||
|
||||
// convenience method for creating two switches connected to each other.
|
||||
func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) {
|
||||
func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) {
|
||||
|
||||
// Create two switches that will be interconnected.
|
||||
s1 := NewSwitch(reactorsGenerator())
|
||||
s2 := NewSwitch(reactorsGenerator())
|
||||
s1 := initSwitch(NewSwitch())
|
||||
s2 := initSwitch(NewSwitch())
|
||||
|
||||
// Create a listener for s1
|
||||
l := NewDefaultListener("tcp", ":8001", true)
|
||||
|
@ -104,18 +104,17 @@ func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch,
|
|||
}
|
||||
|
||||
func TestSwitches(t *testing.T) {
|
||||
s1, s2 := makeSwitchPair(t, func() []Reactor {
|
||||
s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch {
|
||||
// Make two reactors of two channels each
|
||||
reactors := make([]Reactor, 2)
|
||||
reactors[0] = NewTestReactor([]*ChannelDescriptor{
|
||||
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
|
||||
&ChannelDescriptor{Id: byte(0x00), Priority: 10},
|
||||
&ChannelDescriptor{Id: byte(0x01), Priority: 10},
|
||||
}, true)
|
||||
reactors[1] = NewTestReactor([]*ChannelDescriptor{
|
||||
}, true)).Start(sw) // Start the reactor
|
||||
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
|
||||
&ChannelDescriptor{Id: byte(0x02), Priority: 10},
|
||||
&ChannelDescriptor{Id: byte(0x03), Priority: 10},
|
||||
}, true)
|
||||
return reactors
|
||||
}, true)).Start(sw) // Start the reactor
|
||||
return sw
|
||||
})
|
||||
defer s1.Stop()
|
||||
defer s2.Stop()
|
||||
|
@ -129,8 +128,8 @@ func TestSwitches(t *testing.T) {
|
|||
}
|
||||
|
||||
ch0Msg := "channel zero"
|
||||
ch1Msg := "channel one"
|
||||
ch2Msg := "channel two"
|
||||
ch1Msg := "channel foo"
|
||||
ch2Msg := "channel bar"
|
||||
|
||||
s1.Broadcast(byte(0x00), ch0Msg)
|
||||
s1.Broadcast(byte(0x01), ch1Msg)
|
||||
|
@ -140,7 +139,7 @@ func TestSwitches(t *testing.T) {
|
|||
time.Sleep(5000 * time.Millisecond)
|
||||
|
||||
// Check message on ch0
|
||||
ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)]
|
||||
ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)]
|
||||
if len(ch0Msgs) != 2 {
|
||||
t.Errorf("Expected to have received 1 message in ch0")
|
||||
}
|
||||
|
@ -149,7 +148,7 @@ func TestSwitches(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check message on ch1
|
||||
ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)]
|
||||
ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)]
|
||||
if len(ch1Msgs) != 1 {
|
||||
t.Errorf("Expected to have received 1 message in ch1")
|
||||
}
|
||||
|
@ -158,7 +157,7 @@ func TestSwitches(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check message on ch2
|
||||
ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)]
|
||||
ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)]
|
||||
if len(ch2Msgs) != 1 {
|
||||
t.Errorf("Expected to have received 1 message in ch2")
|
||||
}
|
||||
|
@ -172,18 +171,17 @@ func BenchmarkSwitches(b *testing.B) {
|
|||
|
||||
b.StopTimer()
|
||||
|
||||
s1, s2 := makeSwitchPair(b, func() []Reactor {
|
||||
// Make two reactors of two channels each
|
||||
reactors := make([]Reactor, 2)
|
||||
reactors[0] = NewTestReactor([]*ChannelDescriptor{
|
||||
s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch {
|
||||
// Make bar reactors of bar channels each
|
||||
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
|
||||
&ChannelDescriptor{Id: byte(0x00), Priority: 10},
|
||||
&ChannelDescriptor{Id: byte(0x01), Priority: 10},
|
||||
}, false)
|
||||
reactors[1] = NewTestReactor([]*ChannelDescriptor{
|
||||
}, false))
|
||||
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
|
||||
&ChannelDescriptor{Id: byte(0x02), Priority: 10},
|
||||
&ChannelDescriptor{Id: byte(0x03), Priority: 10},
|
||||
}, false)
|
||||
return reactors
|
||||
}, false))
|
||||
return sw
|
||||
})
|
||||
defer s1.Stop()
|
||||
defer s2.Stop()
|
||||
|
@ -194,7 +192,7 @@ func BenchmarkSwitches(b *testing.B) {
|
|||
|
||||
numSuccess, numFailure := 0, 0
|
||||
|
||||
// Send random message from one channel to another
|
||||
// Send random message from foo channel to another
|
||||
for i := 0; i < b.N; i++ {
|
||||
chId := byte(i % 4)
|
||||
successChan := s1.Broadcast(chId, "test data")
|
||||
|
|
|
@ -1,18 +1,18 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var blockStore *types.BlockStore
|
||||
var blockStore *bc.BlockStore
|
||||
var consensusState *consensus.ConsensusState
|
||||
var mempoolReactor *mempl.MempoolReactor
|
||||
var p2pSwitch *p2p.Switch
|
||||
|
||||
func SetRPCBlockStore(bs *types.BlockStore) {
|
||||
func SetRPCBlockStore(bs *bc.BlockStore) {
|
||||
blockStore = bs
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package state
|
|||
|
||||
import (
|
||||
"github.com/tendermint/tendermint/account"
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
|
@ -91,7 +90,7 @@ func TestGenesisSaveLoad(t *testing.T) {
|
|||
|
||||
// Make complete block and blockParts
|
||||
block := makeBlock(t, s0, nil, nil)
|
||||
blockParts := types.NewPartSetFromData(binary.BinaryBytes(block))
|
||||
blockParts := block.MakePartSet()
|
||||
|
||||
// Now append the block to s0.
|
||||
err := s0.AppendBlock(block, blockParts.Header())
|
||||
|
@ -338,7 +337,7 @@ func TestAddValidator(t *testing.T) {
|
|||
|
||||
// Make complete block and blockParts
|
||||
block0 := makeBlock(t, s0, nil, []types.Tx{bondTx})
|
||||
block0Parts := types.NewPartSetFromData(binary.BinaryBytes(block0))
|
||||
block0Parts := block0.MakePartSet()
|
||||
|
||||
// Sanity check
|
||||
if s0.BondedValidators.Size() != 1 {
|
||||
|
@ -379,7 +378,7 @@ func TestAddValidator(t *testing.T) {
|
|||
},
|
||||
}, nil,
|
||||
)
|
||||
block1Parts := types.NewPartSetFromData(binary.BinaryBytes(block1))
|
||||
block1Parts := block1.MakePartSet()
|
||||
err = s0.AppendBlock(block1, block1Parts.Header())
|
||||
if err != nil {
|
||||
t.Error("Error appending secondary block:", err)
|
||||
|
|
|
@ -2,12 +2,15 @@ package state
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/tendermint/tendermint/account"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/merkle"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// ValidatorSet represent a set of *Validator at a given height.
|
||||
|
@ -198,6 +201,50 @@ func (valSet *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// Verify that +2/3 of the set had signed the given signBytes
|
||||
func (valSet *ValidatorSet) VerifyValidation(hash []byte, parts types.PartSetHeader, height uint, v *types.Validation) error {
|
||||
if valSet.Size() != uint(len(v.Commits)) {
|
||||
return errors.New(Fmt("Invalid validation -- wrong set size: %v vs %v",
|
||||
valSet.Size(), len(v.Commits)))
|
||||
}
|
||||
|
||||
talliedVotingPower := uint64(0)
|
||||
seenValidators := map[string]struct{}{}
|
||||
|
||||
for idx, commit := range v.Commits {
|
||||
// may be zero, in which case skip.
|
||||
if commit.Signature.IsZero() {
|
||||
continue
|
||||
}
|
||||
_, val := valSet.GetByIndex(uint(idx))
|
||||
commitSignBytes := account.SignBytes(&types.Vote{
|
||||
Height: height, Round: commit.Round, Type: types.VoteTypeCommit,
|
||||
BlockHash: hash,
|
||||
BlockParts: parts,
|
||||
})
|
||||
|
||||
// Validate
|
||||
if _, seen := seenValidators[string(val.Address)]; seen {
|
||||
return Errorf("Duplicate validator for commit %v for Validation %v", commit, v)
|
||||
}
|
||||
|
||||
if !val.PubKey.VerifyBytes(commitSignBytes, commit.Signature) {
|
||||
return Errorf("Invalid signature for commit %v for Validation %v", commit, v)
|
||||
}
|
||||
|
||||
// Tally
|
||||
seenValidators[string(val.Address)] = struct{}{}
|
||||
talliedVotingPower += val.VotingPower
|
||||
}
|
||||
|
||||
if talliedVotingPower > valSet.TotalVotingPower()*2/3 {
|
||||
return nil
|
||||
} else {
|
||||
return Errorf("insufficient voting power %v, needed %v",
|
||||
talliedVotingPower, (valSet.TotalVotingPower()*2/3 + 1))
|
||||
}
|
||||
}
|
||||
|
||||
func (valSet *ValidatorSet) String() string {
|
||||
return valSet.StringIndented("")
|
||||
}
|
||||
|
|
|
@ -39,7 +39,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte,
|
|||
if !b.LastBlockParts.Equals(lastBlockParts) {
|
||||
return errors.New("Wrong Block.Header.LastBlockParts")
|
||||
}
|
||||
/* TODO: Determine bounds.
|
||||
/* TODO: Determine bounds
|
||||
See blockchain/reactor "stopSyncingDurationMinutes"
|
||||
|
||||
if !b.Time.After(lastBlockTime) {
|
||||
return errors.New("Invalid Block.Header.Time")
|
||||
}
|
||||
|
@ -66,6 +68,10 @@ func (b *Block) Hash() []byte {
|
|||
return merkle.HashFromHashes(hashes)
|
||||
}
|
||||
|
||||
func (b *Block) MakePartSet() *PartSet {
|
||||
return NewPartSetFromData(binary.BinaryBytes(b))
|
||||
}
|
||||
|
||||
// Convenience.
|
||||
// A nil block never hashes to anything.
|
||||
// Nothing hashes to a nil hash.
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package types
|
||||
|
||||
type BlockMeta struct {
|
||||
Hash []byte // The block hash
|
||||
Header *Header // The block's Header
|
||||
Parts PartSetHeader // The PartSetHeader, for transfer
|
||||
}
|
||||
|
||||
func NewBlockMeta(block *Block, blockParts *PartSet) *BlockMeta {
|
||||
return &BlockMeta{
|
||||
Hash: block.Hash(),
|
||||
Header: block.Header,
|
||||
Parts: blockParts.Header(),
|
||||
}
|
||||
}
|
247
types/store.go
247
types/store.go
|
@ -1,247 +0,0 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
dbm "github.com/tendermint/tendermint/db"
|
||||
)
|
||||
|
||||
/*
|
||||
Simple low level store for blocks.
|
||||
|
||||
There are three types of information stored:
|
||||
- BlockMeta: Meta information about each block
|
||||
- Block part: Parts of each block, aggregated w/ PartSet
|
||||
- Validation: The Validation part of each block, for gossiping commit votes
|
||||
|
||||
Currently the commit signatures are duplicated in the Block parts as
|
||||
well as the Validation. In the future this may change, perhaps by moving
|
||||
the Validation data outside the Block.
|
||||
*/
|
||||
type BlockStore struct {
|
||||
height uint
|
||||
db dbm.DB
|
||||
}
|
||||
|
||||
func NewBlockStore(db dbm.DB) *BlockStore {
|
||||
bsjson := LoadBlockStoreStateJSON(db)
|
||||
return &BlockStore{
|
||||
height: bsjson.Height,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// Height() returns the last known contiguous block height.
|
||||
func (bs *BlockStore) Height() uint {
|
||||
return bs.height
|
||||
}
|
||||
|
||||
func (bs *BlockStore) GetReader(key []byte) io.Reader {
|
||||
bytez := bs.db.Get(key)
|
||||
if bytez == nil {
|
||||
return nil
|
||||
}
|
||||
return bytes.NewReader(bytez)
|
||||
}
|
||||
|
||||
func (bs *BlockStore) LoadBlock(height uint) *Block {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcBlockMetaKey(height))
|
||||
if r == nil {
|
||||
panic(Fmt("Block does not exist at height %v", height))
|
||||
}
|
||||
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block meta: %v", err))
|
||||
}
|
||||
bytez := []byte{}
|
||||
for i := uint(0); i < meta.Parts.Total; i++ {
|
||||
part := bs.LoadBlockPart(height, i)
|
||||
bytez = append(bytez, part.Bytes...)
|
||||
}
|
||||
block := binary.ReadBinary(&Block{}, bytes.NewReader(bytez), &n, &err).(*Block)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block: %v", err))
|
||||
}
|
||||
return block
|
||||
}
|
||||
|
||||
func (bs *BlockStore) LoadBlockPart(height uint, index uint) *Part {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcBlockPartKey(height, index))
|
||||
if r == nil {
|
||||
panic(Fmt("BlockPart does not exist for height %v index %v", height, index))
|
||||
}
|
||||
part := binary.ReadBinary(&Part{}, r, &n, &err).(*Part)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block part: %v", err))
|
||||
}
|
||||
return part
|
||||
}
|
||||
|
||||
func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcBlockMetaKey(height))
|
||||
if r == nil {
|
||||
panic(Fmt("BlockMeta does not exist for height %v", height))
|
||||
}
|
||||
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading block meta: %v", err))
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// NOTE: the Commit-vote heights are for the block at `height-1`
|
||||
// Since these are included in the subsequent block, the height
|
||||
// is off by 1.
|
||||
func (bs *BlockStore) LoadBlockValidation(height uint) *Validation {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcBlockValidationKey(height))
|
||||
if r == nil {
|
||||
panic(Fmt("BlockValidation does not exist for height %v", height))
|
||||
}
|
||||
validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading validation: %v", err))
|
||||
}
|
||||
return validation
|
||||
}
|
||||
|
||||
// NOTE: the Commit-vote heights are for the block at `height`
|
||||
func (bs *BlockStore) LoadSeenValidation(height uint) *Validation {
|
||||
var n int64
|
||||
var err error
|
||||
r := bs.GetReader(calcSeenValidationKey(height))
|
||||
if r == nil {
|
||||
panic(Fmt("SeenValidation does not exist for height %v", height))
|
||||
}
|
||||
validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation)
|
||||
if err != nil {
|
||||
panic(Fmt("Error reading validation: %v", err))
|
||||
}
|
||||
return validation
|
||||
}
|
||||
|
||||
// blockParts: Must be parts of the block
|
||||
// seenValidation: The +2/3 commits that were seen which finalized the height.
|
||||
// If all the nodes restart after committing a block,
|
||||
// we need this to reload the commits to catch-up nodes to the
|
||||
// most recent height. Otherwise they'd stall at H-1.
|
||||
// Also good to have to debug consensus issues & punish wrong-signers
|
||||
// whose commits weren't included in the block.
|
||||
func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet, seenValidation *Validation) {
|
||||
height := block.Height
|
||||
if height != bs.height+1 {
|
||||
panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height))
|
||||
}
|
||||
if !blockParts.IsComplete() {
|
||||
panic(Fmt("BlockStore can only save complete block part sets"))
|
||||
}
|
||||
|
||||
// Save block meta
|
||||
meta := makeBlockMeta(block, blockParts)
|
||||
metaBytes := binary.BinaryBytes(meta)
|
||||
bs.db.Set(calcBlockMetaKey(height), metaBytes)
|
||||
|
||||
// Save block parts
|
||||
for i := uint(0); i < blockParts.Total(); i++ {
|
||||
bs.saveBlockPart(height, i, blockParts.GetPart(i))
|
||||
}
|
||||
|
||||
// Save block validation (duplicate and separate from the Block)
|
||||
blockValidationBytes := binary.BinaryBytes(block.Validation)
|
||||
bs.db.Set(calcBlockValidationKey(height), blockValidationBytes)
|
||||
|
||||
// Save seen validation (seen +2/3 commits)
|
||||
seenValidationBytes := binary.BinaryBytes(seenValidation)
|
||||
bs.db.Set(calcSeenValidationKey(height), seenValidationBytes)
|
||||
|
||||
// Save new BlockStoreStateJSON descriptor
|
||||
BlockStoreStateJSON{Height: height}.Save(bs.db)
|
||||
|
||||
// Done!
|
||||
bs.height = height
|
||||
}
|
||||
|
||||
func (bs *BlockStore) saveBlockPart(height uint, index uint, part *Part) {
|
||||
if height != bs.height+1 {
|
||||
panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height))
|
||||
}
|
||||
partBytes := binary.BinaryBytes(part)
|
||||
bs.db.Set(calcBlockPartKey(height, index), partBytes)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type BlockMeta struct {
|
||||
Hash []byte // The block hash
|
||||
Header *Header // The block's Header
|
||||
Parts PartSetHeader // The PartSetHeader, for transfer
|
||||
}
|
||||
|
||||
func makeBlockMeta(block *Block, blockParts *PartSet) *BlockMeta {
|
||||
return &BlockMeta{
|
||||
Hash: block.Hash(),
|
||||
Header: block.Header,
|
||||
Parts: blockParts.Header(),
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
func calcBlockMetaKey(height uint) []byte {
|
||||
return []byte(fmt.Sprintf("H:%v", height))
|
||||
}
|
||||
|
||||
func calcBlockPartKey(height uint, partIndex uint) []byte {
|
||||
return []byte(fmt.Sprintf("P:%v:%v", height, partIndex))
|
||||
}
|
||||
|
||||
func calcBlockValidationKey(height uint) []byte {
|
||||
return []byte(fmt.Sprintf("V:%v", height))
|
||||
}
|
||||
|
||||
func calcSeenValidationKey(height uint) []byte {
|
||||
return []byte(fmt.Sprintf("SV:%v", height))
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
var blockStoreKey = []byte("blockStore")
|
||||
|
||||
type BlockStoreStateJSON struct {
|
||||
Height uint
|
||||
}
|
||||
|
||||
func (bsj BlockStoreStateJSON) Save(db dbm.DB) {
|
||||
bytes, err := json.Marshal(bsj)
|
||||
if err != nil {
|
||||
panic(Fmt("Could not marshal state bytes: %v", err))
|
||||
}
|
||||
db.Set(blockStoreKey, bytes)
|
||||
}
|
||||
|
||||
func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
|
||||
bytes := db.Get(blockStoreKey)
|
||||
if bytes == nil {
|
||||
return BlockStoreStateJSON{
|
||||
Height: 0,
|
||||
}
|
||||
}
|
||||
bsj := BlockStoreStateJSON{}
|
||||
err := json.Unmarshal(bytes, &bsj)
|
||||
if err != nil {
|
||||
panic(Fmt("Could not unmarshal bytes: %X", bytes))
|
||||
}
|
||||
return bsj
|
||||
}
|
Loading…
Reference in New Issue