Proposer->Proposal; sign heartbeats

This commit is contained in:
Ethan Buchman 2017-07-29 14:15:10 -04:00
parent 10f8101314
commit ab753abfa0
6 changed files with 74 additions and 32 deletions

View File

@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty
return nil
}
func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
// Sign
heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat))
return nil
}
func (privVal *ByzantinePrivValidator) String() string {
return Fmt("PrivValidator{%X}", privVal.Address)
}

View File

@ -312,19 +312,14 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
conR.broadcastHasVoteMessage(edv.Vote)
})
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposerHeartbeat(), func(data types.TMEventData) {
heartbeat := data.Unwrap().(types.EventDataProposerHeartbeat)
conR.broadcastProposerHeartbeatMessage(heartbeat)
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) {
heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(heartbeat)
})
}
func (conR *ConsensusReactor) broadcastProposerHeartbeatMessage(heartbeat types.EventDataProposerHeartbeat) {
msg := &ProposerHeartbeatMessage{
Height: heartbeat.Height,
Round: heartbeat.Round,
Proposer: heartbeat.Proposer,
Sequence: heartbeat.Sequence,
}
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat}
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
}
@ -1323,15 +1318,12 @@ func (m *VoteSetBitsMessage) String() string {
//-------------------------------------
// ProposerHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions
type ProposerHeartbeatMessage struct {
Height int
Round int
Proposer []byte
Sequence int
// ProposalHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions
type ProposalHeartbeatMessage struct {
Heartbeat *types.Heartbeat
}
// String returns a string representation.
func (m *ProposerHeartbeatMessage) String() string {
return fmt.Sprintf("[HEARTBEAT %v/%02d %X %d]", m.Height, m.Round, m.Proposer, m.Sequence)
func (m *ProposalHeartbeatMessage) String() string {
return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
}

View File

@ -181,6 +181,7 @@ type PrivValidator interface {
GetAddress() []byte
SignVote(chainID string, vote *types.Vote) error
SignProposal(chainID string, proposal *types.Proposal) error
SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error
}
// ConsensusState handles execution of the consensus algorithm.
@ -810,13 +811,26 @@ func (cs *ConsensusState) needProofBlock(height int) bool {
func (cs *ConsensusState) proposalHeartbeat() {
counter := 0
addr := cs.privValidator.GetAddress()
valIndex, v := cs.Validators.GetByAddress(addr)
if v == nil {
// not a validator
valIndex = -1
}
for {
select {
default:
if cs.evsw != nil {
rs := cs.GetRoundState().RoundStateEvent()
heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter}
types.FireEventProposerHeartbeat(cs.evsw, heartbeat)
heartbeat := &types.Heartbeat{
Height: rs.Height,
Round: rs.Round,
Sequence: counter,
ValidatorAddress: addr,
ValidatorIndex: valIndex,
}
cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat)
heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat}
types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent)
counter += 1
}
time.Sleep(time.Second)

View File

@ -31,6 +31,14 @@ type CanonicalJSONVote struct {
Type byte `json:"type"`
}
type CanonicalJSONHeartbeat struct {
Height int `json:"height"`
Round int `json:"round"`
Sequence int `json:"sequence"`
ValidatorAddress data.Bytes `json:"validator_address"`
ValidatorIndex int `json:"validator_index"`
}
//------------------------------------
// Messages including a "chain id" can only be applied to one chain, hence "Once"
@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct {
Vote CanonicalJSONVote `json:"vote"`
}
type CanonicalJSONOnceHeartbeat struct {
ChainID string `json:"chain_id"`
Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"`
}
//-----------------------------------
// Canonicalize the structs
@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote {
vote.Type,
}
}
func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat {
return CanonicalJSONHeartbeat{
heartbeat.Height,
heartbeat.Round,
heartbeat.Sequence,
heartbeat.ValidatorAddress,
heartbeat.ValidatorIndex,
}
}

View File

@ -31,7 +31,7 @@ func EventStringRelock() string { return "Relock" }
func EventStringTimeoutWait() string { return "TimeoutWait" }
func EventStringVote() string { return "Vote" }
func EventStringProposerHeartbeat() string { return "ProposerHeartbeat" }
func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" }
//----------------------------------------
@ -42,7 +42,7 @@ var (
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
EventDataNameProposerHeartbeat = "proposer_heartbeat"
EventDataNameProposalHeartbeat = "proposer_heartbeat"
)
//----------------------------------------
@ -89,7 +89,7 @@ const (
EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12)
EventDataTypeProposerHeartbeat = byte(0x20)
EventDataTypeProposalHeartbeat = byte(0x20)
)
var tmEventDataMapper = data.NewMapper(TMEventData{}).
@ -98,7 +98,7 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}).
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote).
RegisterImplementation(EventDataProposerHeartbeat{}, EventDataNameProposerHeartbeat, EventDataTypeProposerHeartbeat)
RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat)
// Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic
@ -122,11 +122,8 @@ type EventDataTx struct {
Error string `json:"error"` // this is redundant information for now
}
type EventDataProposerHeartbeat struct {
EventDataRoundState
Proposer []byte `json:"proposer"`
Sequence int `json:"sequence"`
type EventDataProposalHeartbeat struct {
Heartbeat *Heartbeat
}
// NOTE: This goes into the replay WAL
@ -149,7 +146,7 @@ func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataProposerHeartbeat) AssertIsTMEventData() {}
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
//----------------------------------------
// Wrappers for type safety
@ -249,6 +246,6 @@ func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringLock(), TMEventData{rs})
}
func FireEventProposerHeartbeat(fireable events.Fireable, rs EventDataProposerHeartbeat) {
fireEvent(fireable, EventStringProposerHeartbeat(), TMEventData{rs})
func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) {
fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs})
}

View File

@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt
}
func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat))
return nil
}
func (privVal *PrivValidator) String() string {
return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep)
}