Implement BFT time

This commit is contained in:
Jae Kwon 2018-05-23 03:36:18 -07:00
parent 5161d0aa86
commit b4120e25ff
10 changed files with 89 additions and 17 deletions

View File

@ -132,7 +132,7 @@ vagrant_test:
### go tests
test:
@echo "--> Running go test"
@go test $(PACKAGES)
@GOCACHE=off go test $(PACKAGES)
test_race:
@echo "--> Running go test --race"

View File

@ -31,7 +31,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor {
state, blockStore := makeStateAndBlockStore(logger)
// Make the blockchainReactor itself
// Make the blockchainReactor itself.
fastSync := true
var nilApp proxy.AppConnConsensus
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp,
@ -40,10 +40,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in
// Next: we need to set a switch in order for peers to be added in.
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig())
// Lastly: let's add some blocks in
// Lastly: let's add some blocks in.
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
firstBlock := makeBlock(blockHeight, state)
secondBlock := makeBlock(blockHeight+1, state)
@ -155,7 +155,7 @@ func makeTxs(height int64) (txs []types.Tx) {
}
func makeBlock(height int64, state sm.State) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit))
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit), 0)
return block
}

View File

@ -394,6 +394,11 @@ type ConsensusConfig struct {
// Reactor sleep duration parameters are in milliseconds
PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"`
// Block time parameters in milliseconds
BlockTimeIota int `mapstructure:"blocktime_iota"`
BlockTimeWiggle int `mapstructure:"blocktime_wiggle"`
BlockTimeWiggleR float64 `mapstructure:"blocktime_wiggle_r"`
}
// DefaultConsensusConfig returns a default configuration for the consensus service
@ -414,6 +419,9 @@ func DefaultConsensusConfig() *ConsensusConfig {
CreateEmptyBlocksInterval: 0,
PeerGossipSleepDuration: 100,
PeerQueryMaj23SleepDuration: 2000,
BlockTimeIota: 10,
BlockTimeWiggle: 20000,
BlockTimeWiggleR: 0.05,
}
}
@ -430,6 +438,10 @@ func TestConsensusConfig() *ConsensusConfig {
cfg.SkipTimeoutCommit = true
cfg.PeerGossipSleepDuration = 5
cfg.PeerQueryMaj23SleepDuration = 250
// 1 second so we actually test the iota function.
// This is enforced logically in the propose function.
cfg.BlockTimeIota = 200
return cfg
}
@ -473,6 +485,33 @@ func (cfg *ConsensusConfig) PeerQueryMaj23Sleep() time.Duration {
return time.Duration(cfg.PeerQueryMaj23SleepDuration) * time.Millisecond
}
// BlockTimeMinValidTime returns the minimum acceptable block time, as part
// of "subjective time validity". See the BFT time spec.
func (cfg *ConsensusConfig) BlockTimeMinValidTime(lastBlockTime, now time.Time, round int) time.Time {
var minValidTime time.Time = lastBlockTime.Add(time.Duration(cfg.BlockTimeIota) * time.Millisecond)
if round == 0 {
wiggleAgo := now.Add(-1 * time.Duration(cfg.BlockTimeWiggle) * time.Millisecond)
if wiggleAgo.After(minValidTime) {
minValidTime = wiggleAgo
}
} else {
// For all subsequent rounds, we accept any block > last_block_time+iota.
}
return minValidTime
}
// BlockTimeMaxValidTime returns the maximum acceptable block time, as part
// of "subjective time validity". See the BFT time spec.
func (cfg *ConsensusConfig) BlockTimeMaxValidTime(lastBlockTime, now time.Time, round int) time.Time {
return now.
Add(time.Duration(cfg.BlockTimeWiggle) * time.Millisecond).
Add(
time.Duration(
float64(cfg.BlockTimeWiggle)*cfg.BlockTimeWiggleR*float64(round),
) * time.Millisecond,
)
}
// WalFile returns the full path to the write-ahead log file
func (cfg *ConsensusConfig) WalFile() string {
if cfg.walFile != "" {

View File

@ -120,7 +120,7 @@ func startTestRound(cs *ConsensusState, height int64, round int) {
cs.startRoutines(0)
}
// Create proposal block from cs1 but sign it with vs
// Create proposal block from cs1 but sign it with vs.
func decideProposal(cs1 *ConsensusState, vs *validatorStub, height int64, round int) (proposal *types.Proposal, block *types.Block) {
block, blockParts := cs1.createProposalBlock()
if block == nil { // on error

View File

@ -885,7 +885,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Mempool validated transactions
txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs)
block, parts := cs.state.MakeBlock(cs.Height, txs, commit)
block, parts := cs.state.MakeBlock(cs.Height, txs, commit, cs.config.BlockTimeIota)
evidence := cs.evpool.PendingEvidence()
block.AddEvidence(evidence)
return block, parts
@ -941,8 +941,26 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
return
}
// Validate proposal block timestamp.
// TODOerr := cs.validate
// Validate proposal block timestamp ("subjective time validity").
// See the BFT time spec.
lastBlockTime := cs.state.LastBlockTime
now := time.Now().Round(0).UTC()
minValidTime := cs.config.BlockTimeMinValidTime(lastBlockTime, now, round)
if cs.ProposalBlock.Time.Before(minValidTime) {
logger.Info("enterPrevote: ProposalBlock time too low",
"blockTime", cs.ProposalBlock.Time,
"minValidTime", minValidTime)
cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{})
return
}
maxValidTime := cs.config.BlockTimeMaxValidTime(lastBlockTime, now, round)
if maxValidTime.Before(cs.ProposalBlock.Time) {
logger.Info("enterPrevote: ProposalBlock time too high",
"blockTime", cs.ProposalBlock.Time,
"maxValidTime", maxValidTime)
cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{})
return
}
// Validate proposal block.
err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)

View File

@ -29,7 +29,7 @@ func MinValidTime(last_block_time, now time.Time, round int) time.Time {
}
// wiggle and wiggle_r are provided by consensus config.
func MaxValidTime(last_block_time, round int) time.Time {
func MaxValidTime(last_block_time, now time.Time, round int) time.Time {
return now.
Add(wiggle).
Add(wiggle*wiggle_r*round)

View File

@ -74,7 +74,7 @@ func TestBeginBlockAbsentValidators(t *testing.T) {
for _, tc := range testCases {
lastCommit := &types.Commit{BlockID: prevBlockID, Precommits: tc.lastCommitPrecommits}
block, _ := state.MakeBlock(2, makeTxs(2), lastCommit)
block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, 0)
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger())
require.Nil(t, err, tc.desc)
@ -118,7 +118,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
for _, tc := range testCases {
lastCommit := &types.Commit{BlockID: prevBlockID}
block, _ := state.MakeBlock(10, makeTxs(2), lastCommit)
block, _ := state.MakeBlock(10, makeTxs(2), lastCommit, 0)
block.Evidence.Evidence = tc.evidence
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger())
require.Nil(t, err, tc.desc)
@ -150,7 +150,7 @@ func state() State {
}
func makeBlock(state State, height int64) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(state.LastBlockHeight), new(types.Commit))
block, _ := state.MakeBlock(height, makeTxs(state.LastBlockHeight), new(types.Commit), 0)
return block
}

View File

@ -102,11 +102,12 @@ func (s State) GetValidators() (last *types.ValidatorSet, current *types.Validat
// Create a block from the latest state
// MakeBlock builds a block with the given txs and commit from the current state.
func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) {
// build base block
func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit, blockTimeIota int) (*types.Block, *types.PartSet) {
// Build base block.
block := types.MakeBlock(height, txs, commit)
// fill header with state data
// Fill header with state data.
block.ChainID = s.ChainID
block.TotalTxs = s.LastBlockTotalTx + block.NumTxs
block.LastBlockID = s.LastBlockID
@ -115,6 +116,12 @@ func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*t
block.ConsensusHash = s.ConsensusParams.Hash()
block.LastResultsHash = s.LastResultsHash
// Ensure valid time. See BFT time spec.
minValidTime := s.LastBlockTime.Add(time.Duration(blockTimeIota) * time.Millisecond)
if block.Time.Before(minValidTime) {
block.Time = minValidTime
}
return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes)
}

View File

@ -31,6 +31,7 @@ func MakeBlock(height int64, txs []Tx, commit *Commit) *Block {
Height: height,
Time: time.Now().Round(0), // Strip monotonic.
NumTxs: int64(len(txs)),
Nonce: cmn.RandStr(12),
},
LastCommit: commit,
Data: &Data{
@ -177,6 +178,7 @@ type Header struct {
Height int64 `json:"height"`
Time time.Time `json:"time"`
NumTxs int64 `json:"num_txs"`
Nonce string `json:"nonce"`
// prev block info
LastBlockID BlockID `json:"last_block_id"`
@ -209,6 +211,7 @@ func (h *Header) Hash() cmn.HexBytes {
"Height": aminoHasher(h.Height),
"Time": aminoHasher(h.Time),
"NumTxs": aminoHasher(h.NumTxs),
"Nonce": aminoHasher(h.Nonce),
"TotalTxs": aminoHasher(h.TotalTxs),
"LastBlockID": aminoHasher(h.LastBlockID),
"LastCommit": aminoHasher(h.LastCommitHash),
@ -231,6 +234,7 @@ func (h *Header) StringIndented(indent string) string {
%s Height: %v
%s Time: %v
%s NumTxs: %v
%s Nonce: %v
%s TotalTxs: %v
%s LastBlockID: %v
%s LastCommit: %v
@ -245,6 +249,7 @@ func (h *Header) StringIndented(indent string) string {
indent, h.Height,
indent, h.Time,
indent, h.NumTxs,
indent, h.Nonce,
indent, h.TotalTxs,
indent, h.LastBlockID,
indent, h.LastCommitHash,

View File

@ -21,7 +21,10 @@ type EventBusSubscriber interface {
// are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types.
type EventBus struct {
// JAE: This is supposed to be unnecessary as pubsub is already.
// TODO: Use BaseService as intended by overriding OnStart/Stop.
cmn.BaseService
pubsub *tmpubsub.Server
}
@ -32,7 +35,7 @@ func NewEventBus() *EventBus {
// NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity.
func NewEventBusWithBufferCapacity(cap int) *EventBus {
// capacity could be exposed later if needed
// Capacity could be exposed later if needed.
pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap))
b := &EventBus{pubsub: pubsub}
b.BaseService = *cmn.NewBaseService(nil, "EventBus", b)