diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 3a68a2f5..94a03c7a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty return nil } +func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *ByzantinePrivValidator) String() string { return Fmt("PrivValidator{%X}", privVal.Address) } diff --git a/consensus/reactor.go b/consensus/reactor.go index ce25442f..3e12a314 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -311,6 +311,16 @@ func (conR *ConsensusReactor) registerEventCallbacks() { edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) + + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) { + heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat) + conR.broadcastProposalHeartbeatMessage(heartbeat) + }) +} + +func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { + msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat} + conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) } func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { @@ -1305,3 +1315,15 @@ type VoteSetBitsMessage struct { func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } + +//------------------------------------- + +// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal. +type ProposalHeartbeatMessage struct { + Heartbeat *types.Heartbeat +} + +// String returns a string representation. +func (m *ProposalHeartbeatMessage) String() string { + return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat) +} diff --git a/consensus/state.go b/consensus/state.go index 4695a630..e6fd665a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -181,6 +181,7 @@ type PrivValidator interface { GetAddress() []byte SignVote(chainID string, vote *types.Vote) error SignProposal(chainID string, proposal *types.Proposal) error + SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error } // ConsensusState handles execution of the consensus algorithm. @@ -787,7 +788,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { // we may need an empty "proof" block, and enterPropose immediately. waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height) if waitForTxs { - go cs.proposalHeartbeat() + go cs.proposalHeartbeat(height, round) } else { cs.enterPropose(height, round) } @@ -807,14 +808,32 @@ func (cs *ConsensusState) needProofBlock(height int) bool { return false } -func (cs *ConsensusState) proposalHeartbeat() { +func (cs *ConsensusState) proposalHeartbeat(height, round int) { + counter := 0 + addr := cs.privValidator.GetAddress() + valIndex, v := cs.Validators.GetByAddress(addr) + if v == nil { + // not a validator + valIndex = -1 + } for { - select { - default: - // TODO: broadcast heartbeat - - time.Sleep(time.Second) + rs := cs.GetRoundState() + // if we've already moved on, no need to send more heartbeats + if rs.Step > RoundStepNewRound || rs.Round > round || rs.Height > height { + return } + heartbeat := &types.Heartbeat{ + Height: rs.Height, + Round: rs.Round, + Sequence: counter, + ValidatorAddress: addr, + ValidatorIndex: valIndex, + } + cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat) + heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} + types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) + counter += 1 + time.Sleep(time.Second) } } diff --git a/types/canonical_json.go b/types/canonical_json.go index 2e8583a4..5f1a0aca 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -31,6 +31,14 @@ type CanonicalJSONVote struct { Type byte `json:"type"` } +type CanonicalJSONHeartbeat struct { + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` +} + //------------------------------------ // Messages including a "chain id" can only be applied to one chain, hence "Once" @@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct { Vote CanonicalJSONVote `json:"vote"` } +type CanonicalJSONOnceHeartbeat struct { + ChainID string `json:"chain_id"` + Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"` +} + //----------------------------------- // Canonicalize the structs @@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote { vote.Type, } } + +func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat { + return CanonicalJSONHeartbeat{ + heartbeat.Height, + heartbeat.Round, + heartbeat.Sequence, + heartbeat.ValidatorAddress, + heartbeat.ValidatorIndex, + } +} diff --git a/types/events.go b/types/events.go index 8c29c444..79e17fe0 100644 --- a/types/events.go +++ b/types/events.go @@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" } func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringVote() string { return "Vote" } +func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" } + //---------------------------------------- var ( @@ -39,6 +41,8 @@ var ( EventDataNameTx = "tx" EventDataNameRoundState = "round_state" EventDataNameVote = "vote" + + EventDataNameProposalHeartbeat = "proposer_heartbeat" ) //---------------------------------------- @@ -84,6 +88,8 @@ const ( EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) + + EventDataTypeProposalHeartbeat = byte(0x20) ) var tmEventDataMapper = data.NewMapper(TMEventData{}). @@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}). RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). - RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). + RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -115,6 +122,10 @@ type EventDataTx struct { Error string `json:"error"` // this is redundant information for now } +type EventDataProposalHeartbeat struct { + Heartbeat *Heartbeat +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int `json:"height"` @@ -135,6 +146,8 @@ func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} + //---------------------------------------- // Wrappers for type safety @@ -232,3 +245,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { fireEvent(fireable, EventStringLock(), TMEventData{rs}) } + +func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) { + fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs}) +} diff --git a/types/heartbeat.go b/types/heartbeat.go new file mode 100644 index 00000000..378f6202 --- /dev/null +++ b/types/heartbeat.go @@ -0,0 +1,42 @@ +package types + +import ( + "fmt" + "io" + + "github.com/tendermint/go-crypto" + "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire/data" + cmn "github.com/tendermint/tmlibs/common" +) + +type Heartbeat struct { + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + Signature crypto.Signature `json:"signature"` +} + +func (heartbeat *Heartbeat) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { + wire.WriteJSON(CanonicalJSONOnceHeartbeat{ + chainID, + CanonicalHeartbeat(heartbeat), + }, w, n, err) +} + +func (heartbeat *Heartbeat) Copy() *Heartbeat { + heartbeatCopy := *heartbeat + return &heartbeatCopy +} + +func (heartbeat *Heartbeat) String() string { + if heartbeat == nil { + return "nil-heartbeat" + } + + return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}", + heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress), + heartbeat.Height, heartbeat.Round, heartbeat.Sequence, heartbeat.Signature) +} diff --git a/types/priv_validator.go b/types/priv_validator.go index 8c9a011c..69082493 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt } +func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *PrivValidator) String() string { return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep) }