Merge pull request #714 from tendermint/feature/no-block-response
Feature/no block response
This commit is contained in:
commit
765c325441
|
@ -121,6 +121,24 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
|||
bcR.pool.RemovePeer(peer.Key())
|
||||
}
|
||||
|
||||
// respondToPeer loads a block and sends it to the requesting peer,
|
||||
// if we have it. Otherwise, we'll respond saying we don't have it.
|
||||
// According to the Tendermint spec, if all nodes are honest,
|
||||
// no node should be requesting for a block that's non-existent.
|
||||
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) {
|
||||
block := bcR.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
msg := &bcBlockResponseMessage{Block: block}
|
||||
return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
|
||||
return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{
|
||||
&bcNoBlockResponseMessage{Height: msg.Height},
|
||||
})
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
_, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize())
|
||||
|
@ -134,16 +152,8 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
|||
// TODO: improve logic to satisfy megacheck
|
||||
switch msg := msg.(type) {
|
||||
case *bcBlockRequestMessage:
|
||||
// Got a request for a block. Respond with block if we have it.
|
||||
block := bcR.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
msg := &bcBlockResponseMessage{Block: block}
|
||||
queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
|
||||
if !queued {
|
||||
// queue is full, just ignore.
|
||||
}
|
||||
} else {
|
||||
// TODO peer is asking for things we don't have.
|
||||
if queued := bcR.respondToPeer(msg, src); !queued {
|
||||
// Unfortunately not queued since the queue is full.
|
||||
}
|
||||
case *bcBlockResponseMessage:
|
||||
// Got a block.
|
||||
|
@ -276,10 +286,11 @@ func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
|
|||
// Messages
|
||||
|
||||
const (
|
||||
msgTypeBlockRequest = byte(0x10)
|
||||
msgTypeBlockResponse = byte(0x11)
|
||||
msgTypeStatusResponse = byte(0x20)
|
||||
msgTypeStatusRequest = byte(0x21)
|
||||
msgTypeBlockRequest = byte(0x10)
|
||||
msgTypeBlockResponse = byte(0x11)
|
||||
msgTypeNoBlockResponse = byte(0x12)
|
||||
msgTypeStatusResponse = byte(0x20)
|
||||
msgTypeStatusRequest = byte(0x21)
|
||||
)
|
||||
|
||||
// BlockchainMessage is a generic message for this reactor.
|
||||
|
@ -289,6 +300,7 @@ var _ = wire.RegisterInterface(
|
|||
struct{ BlockchainMessage }{},
|
||||
wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
|
||||
wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
|
||||
wire.ConcreteType{&bcNoBlockResponseMessage{}, msgTypeNoBlockResponse},
|
||||
wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
|
||||
wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
|
||||
)
|
||||
|
@ -316,6 +328,14 @@ func (m *bcBlockRequestMessage) String() string {
|
|||
return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
|
||||
}
|
||||
|
||||
type bcNoBlockResponseMessage struct {
|
||||
Height int
|
||||
}
|
||||
|
||||
func (brm *bcNoBlockResponseMessage) String() string {
|
||||
return cmn.Fmt("[bcNoBlockResponseMessage %d]", brm.Height)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// NOTE: keep up-to-date with maxBlockchainResponseSize
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
package blockchain
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
wire "github.com/tendermint/go-wire"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/db"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReactor {
|
||||
config := cfg.ResetTestRoot("node_node_test")
|
||||
|
||||
blockStoreDB := db.NewDB("blockstore", config.DBBackend, config.DBDir())
|
||||
blockStore := NewBlockStore(blockStoreDB)
|
||||
|
||||
stateLogger := logger.With("module", "state")
|
||||
|
||||
// Get State
|
||||
stateDB := db.NewDB("state", config.DBBackend, config.DBDir())
|
||||
state, _ := sm.GetState(stateDB, config.GenesisFile())
|
||||
|
||||
state.SetLogger(stateLogger)
|
||||
state.Save()
|
||||
|
||||
// Make the blockchainReactor itself
|
||||
fastSync := true
|
||||
bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync)
|
||||
|
||||
// Next: we need to set a switch in order for peers to be added in
|
||||
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig())
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
|
||||
// Lastly: let's add some blocks in
|
||||
for blockHeight := 1; blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
firstBlock := makeBlock(blockHeight, state)
|
||||
secondBlock := makeBlock(blockHeight+1, state)
|
||||
firstParts := firstBlock.MakePartSet(state.Params().BlockGossipParams.BlockPartSizeBytes)
|
||||
blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit)
|
||||
}
|
||||
|
||||
return bcReactor
|
||||
}
|
||||
|
||||
func TestNoBlockMessageResponse(t *testing.T) {
|
||||
logBuf := new(bytes.Buffer)
|
||||
logger := log.NewTMLogger(logBuf)
|
||||
maxBlockHeight := 20
|
||||
|
||||
bcr := newBlockchainReactor(logger, maxBlockHeight)
|
||||
go bcr.OnStart()
|
||||
defer bcr.Stop()
|
||||
|
||||
// Add some peers in
|
||||
peer := newbcrTestPeer(cmn.RandStr(12))
|
||||
bcr.AddPeer(peer)
|
||||
|
||||
chID := byte(0x01)
|
||||
|
||||
tests := []struct {
|
||||
height int
|
||||
existent bool
|
||||
}{
|
||||
{maxBlockHeight + 2, false},
|
||||
{10, true},
|
||||
{1, true},
|
||||
{100, false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
reqBlockMsg := &bcBlockRequestMessage{tt.height}
|
||||
reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg})
|
||||
bcr.Receive(chID, peer, reqBlockBytes)
|
||||
value := peer.lastValue()
|
||||
msg := value.(struct{ BlockchainMessage }).BlockchainMessage
|
||||
|
||||
if tt.existent {
|
||||
if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok {
|
||||
t.Fatalf("Expected to receive a block response for height %d", tt.height)
|
||||
} else if blockMsg.Block.Height != tt.height {
|
||||
t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height)
|
||||
}
|
||||
} else {
|
||||
if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok {
|
||||
t.Fatalf("Expected to receive a no block response for height %d", tt.height)
|
||||
} else if noBlockMsg.Height != tt.height {
|
||||
t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// utility funcs
|
||||
|
||||
func makeTxs(blockNumber int) (txs []types.Tx) {
|
||||
for i := 0; i < 10; i++ {
|
||||
txs = append(txs, types.Tx([]byte{byte(blockNumber), byte(i)}))
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
func makeBlock(blockNumber int, state *sm.State) *types.Block {
|
||||
prevHash := state.LastBlockID.Hash
|
||||
prevParts := types.PartSetHeader{}
|
||||
valHash := state.Validators.Hash()
|
||||
prevBlockID := types.BlockID{prevHash, prevParts}
|
||||
block, _ := types.MakeBlock(blockNumber, "test_chain", makeTxs(blockNumber),
|
||||
new(types.Commit), prevBlockID, valHash, state.AppHash, state.Params().BlockGossipParams.BlockPartSizeBytes)
|
||||
return block
|
||||
}
|
||||
|
||||
// The Test peer
|
||||
type bcrTestPeer struct {
|
||||
cmn.Service
|
||||
key string
|
||||
ch chan interface{}
|
||||
}
|
||||
|
||||
var _ p2p.Peer = (*bcrTestPeer)(nil)
|
||||
|
||||
func newbcrTestPeer(key string) *bcrTestPeer {
|
||||
return &bcrTestPeer{
|
||||
Service: cmn.NewBaseService(nil, "bcrTestPeer", nil),
|
||||
key: key,
|
||||
ch: make(chan interface{}, 2),
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch }
|
||||
|
||||
func (tp *bcrTestPeer) TrySend(chID byte, value interface{}) bool {
|
||||
if _, ok := value.(struct{ BlockchainMessage }).BlockchainMessage.(*bcStatusResponseMessage); ok {
|
||||
// Discard status response messages since they skew our results
|
||||
// We only want to deal with:
|
||||
// + bcBlockResponseMessage
|
||||
// + bcNoBlockResponseMessage
|
||||
} else {
|
||||
tp.ch <- value
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (tp *bcrTestPeer) Send(chID byte, data interface{}) bool { return tp.TrySend(chID, data) }
|
||||
func (tp *bcrTestPeer) NodeInfo() *p2p.NodeInfo { return nil }
|
||||
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
|
||||
func (tp *bcrTestPeer) Key() string { return tp.key }
|
||||
func (tp *bcrTestPeer) IsOutbound() bool { return false }
|
||||
func (tp *bcrTestPeer) IsPersistent() bool { return true }
|
||||
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
|
||||
func (tp *bcrTestPeer) Set(string, interface{}) {}
|
Loading…
Reference in New Issue