From 530626dab7fd2986d1949ec41a568e18aeff4367 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 20 Jul 2017 15:09:44 -0400 Subject: [PATCH 1/5] broadcast proposer heartbeat msg --- consensus/reactor.go | 30 ++++++++++++++++++++++++++++++ consensus/state.go | 10 ++++++++-- types/events.go | 22 +++++++++++++++++++++- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index ce25442f..cd0ebbd3 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -311,6 +311,21 @@ func (conR *ConsensusReactor) registerEventCallbacks() { edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) + + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposerHeartbeat(), func(data types.TMEventData) { + heartbeat := data.Unwrap().(types.EventDataProposerHeartbeat) + conR.broadcastProposerHeartbeatMessage(heartbeat) + }) +} + +func (conR *ConsensusReactor) broadcastProposerHeartbeatMessage(heartbeat types.EventDataProposerHeartbeat) { + msg := &ProposerHeartbeatMessage{ + Height: heartbeat.Height, + Round: heartbeat.Round, + Proposer: heartbeat.Proposer, + Sequence: heartbeat.Sequence, + } + conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) } func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { @@ -1305,3 +1320,18 @@ type VoteSetBitsMessage struct { func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } + +//------------------------------------- + +// ProposerHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions +type ProposerHeartbeatMessage struct { + Height int + Round int + Proposer []byte + Sequence int +} + +// String returns a string representation. +func (m *ProposerHeartbeatMessage) String() string { + return fmt.Sprintf("[HEARTBEAT %v/%02d %X %d]", m.Height, m.Round, m.Proposer, m.Sequence) +} diff --git a/consensus/state.go b/consensus/state.go index 4695a630..222e2eb1 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -808,11 +808,17 @@ func (cs *ConsensusState) needProofBlock(height int) bool { } func (cs *ConsensusState) proposalHeartbeat() { + counter := 0 + addr := cs.privValidator.GetAddress() for { select { default: - // TODO: broadcast heartbeat - + if cs.evsw != nil { + rs := cs.RoundStateEvent() + heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter} + types.FireEventProposerHeartbeat(cs.evsw, heartbeat) + counter += 1 + } time.Sleep(time.Second) } } diff --git a/types/events.go b/types/events.go index 8c29c444..625e0aa9 100644 --- a/types/events.go +++ b/types/events.go @@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" } func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringVote() string { return "Vote" } +func EventStringProposerHeartbeat() string { return "ProposerHeartbeat" } + //---------------------------------------- var ( @@ -39,6 +41,8 @@ var ( EventDataNameTx = "tx" EventDataNameRoundState = "round_state" EventDataNameVote = "vote" + + EventDataNameProposerHeartbeat = "proposer_heartbeat" ) //---------------------------------------- @@ -84,6 +88,8 @@ const ( EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) + + EventDataTypeProposerHeartbeat = byte(0x20) ) var tmEventDataMapper = data.NewMapper(TMEventData{}). @@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}). RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). - RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). + RegisterImplementation(EventDataProposerHeartbeat{}, EventDataNameProposerHeartbeat, EventDataTypeProposerHeartbeat) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -115,6 +122,13 @@ type EventDataTx struct { Error string `json:"error"` // this is redundant information for now } +type EventDataProposerHeartbeat struct { + EventDataRoundState + + Proposer []byte `json:"proposer"` + Sequence int `json:"sequence"` +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int `json:"height"` @@ -135,6 +149,8 @@ func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataProposerHeartbeat) AssertIsTMEventData() {} + //---------------------------------------- // Wrappers for type safety @@ -232,3 +248,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { fireEvent(fireable, EventStringLock(), TMEventData{rs}) } + +func FireEventProposerHeartbeat(fireable events.Fireable, rs EventDataProposerHeartbeat) { + fireEvent(fireable, EventStringProposerHeartbeat(), TMEventData{rs}) +} From 10f81013142d653a14cbc7c36dfa7b2f024c6c2c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 25 Jul 2017 11:55:34 -0400 Subject: [PATCH 2/5] fix race --- consensus/state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/state.go b/consensus/state.go index 222e2eb1..5d1e106d 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -814,7 +814,7 @@ func (cs *ConsensusState) proposalHeartbeat() { select { default: if cs.evsw != nil { - rs := cs.RoundStateEvent() + rs := cs.GetRoundState().RoundStateEvent() heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter} types.FireEventProposerHeartbeat(cs.evsw, heartbeat) counter += 1 From ab753abfa0ab519f3fc8ca68765a6a632ebc261b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 29 Jul 2017 14:15:10 -0400 Subject: [PATCH 3/5] Proposer->Proposal; sign heartbeats --- consensus/byzantine_test.go | 9 +++++++++ consensus/reactor.go | 28 ++++++++++------------------ consensus/state.go | 18 ++++++++++++++++-- types/canonical_json.go | 23 +++++++++++++++++++++++ types/events.go | 21 +++++++++------------ types/priv_validator.go | 7 +++++++ 6 files changed, 74 insertions(+), 32 deletions(-) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 3a68a2f5..94a03c7a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty return nil } +func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *ByzantinePrivValidator) String() string { return Fmt("PrivValidator{%X}", privVal.Address) } diff --git a/consensus/reactor.go b/consensus/reactor.go index cd0ebbd3..6c8a7544 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -312,19 +312,14 @@ func (conR *ConsensusReactor) registerEventCallbacks() { conR.broadcastHasVoteMessage(edv.Vote) }) - types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposerHeartbeat(), func(data types.TMEventData) { - heartbeat := data.Unwrap().(types.EventDataProposerHeartbeat) - conR.broadcastProposerHeartbeatMessage(heartbeat) + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) { + heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat) + conR.broadcastProposalHeartbeatMessage(heartbeat) }) } -func (conR *ConsensusReactor) broadcastProposerHeartbeatMessage(heartbeat types.EventDataProposerHeartbeat) { - msg := &ProposerHeartbeatMessage{ - Height: heartbeat.Height, - Round: heartbeat.Round, - Proposer: heartbeat.Proposer, - Sequence: heartbeat.Sequence, - } +func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { + msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat} conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) } @@ -1323,15 +1318,12 @@ func (m *VoteSetBitsMessage) String() string { //------------------------------------- -// ProposerHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions -type ProposerHeartbeatMessage struct { - Height int - Round int - Proposer []byte - Sequence int +// ProposalHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions +type ProposalHeartbeatMessage struct { + Heartbeat *types.Heartbeat } // String returns a string representation. -func (m *ProposerHeartbeatMessage) String() string { - return fmt.Sprintf("[HEARTBEAT %v/%02d %X %d]", m.Height, m.Round, m.Proposer, m.Sequence) +func (m *ProposalHeartbeatMessage) String() string { + return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat) } diff --git a/consensus/state.go b/consensus/state.go index 5d1e106d..2d0a9a21 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -181,6 +181,7 @@ type PrivValidator interface { GetAddress() []byte SignVote(chainID string, vote *types.Vote) error SignProposal(chainID string, proposal *types.Proposal) error + SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error } // ConsensusState handles execution of the consensus algorithm. @@ -810,13 +811,26 @@ func (cs *ConsensusState) needProofBlock(height int) bool { func (cs *ConsensusState) proposalHeartbeat() { counter := 0 addr := cs.privValidator.GetAddress() + valIndex, v := cs.Validators.GetByAddress(addr) + if v == nil { + // not a validator + valIndex = -1 + } for { select { default: if cs.evsw != nil { rs := cs.GetRoundState().RoundStateEvent() - heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter} - types.FireEventProposerHeartbeat(cs.evsw, heartbeat) + heartbeat := &types.Heartbeat{ + Height: rs.Height, + Round: rs.Round, + Sequence: counter, + ValidatorAddress: addr, + ValidatorIndex: valIndex, + } + cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat) + heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} + types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) counter += 1 } time.Sleep(time.Second) diff --git a/types/canonical_json.go b/types/canonical_json.go index 2e8583a4..5f1a0aca 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -31,6 +31,14 @@ type CanonicalJSONVote struct { Type byte `json:"type"` } +type CanonicalJSONHeartbeat struct { + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` +} + //------------------------------------ // Messages including a "chain id" can only be applied to one chain, hence "Once" @@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct { Vote CanonicalJSONVote `json:"vote"` } +type CanonicalJSONOnceHeartbeat struct { + ChainID string `json:"chain_id"` + Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"` +} + //----------------------------------- // Canonicalize the structs @@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote { vote.Type, } } + +func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat { + return CanonicalJSONHeartbeat{ + heartbeat.Height, + heartbeat.Round, + heartbeat.Sequence, + heartbeat.ValidatorAddress, + heartbeat.ValidatorIndex, + } +} diff --git a/types/events.go b/types/events.go index 625e0aa9..79e17fe0 100644 --- a/types/events.go +++ b/types/events.go @@ -31,7 +31,7 @@ func EventStringRelock() string { return "Relock" } func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringVote() string { return "Vote" } -func EventStringProposerHeartbeat() string { return "ProposerHeartbeat" } +func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" } //---------------------------------------- @@ -42,7 +42,7 @@ var ( EventDataNameRoundState = "round_state" EventDataNameVote = "vote" - EventDataNameProposerHeartbeat = "proposer_heartbeat" + EventDataNameProposalHeartbeat = "proposer_heartbeat" ) //---------------------------------------- @@ -89,7 +89,7 @@ const ( EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) - EventDataTypeProposerHeartbeat = byte(0x20) + EventDataTypeProposalHeartbeat = byte(0x20) ) var tmEventDataMapper = data.NewMapper(TMEventData{}). @@ -98,7 +98,7 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). - RegisterImplementation(EventDataProposerHeartbeat{}, EventDataNameProposerHeartbeat, EventDataTypeProposerHeartbeat) + RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -122,11 +122,8 @@ type EventDataTx struct { Error string `json:"error"` // this is redundant information for now } -type EventDataProposerHeartbeat struct { - EventDataRoundState - - Proposer []byte `json:"proposer"` - Sequence int `json:"sequence"` +type EventDataProposalHeartbeat struct { + Heartbeat *Heartbeat } // NOTE: This goes into the replay WAL @@ -149,7 +146,7 @@ func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} -func (_ EventDataProposerHeartbeat) AssertIsTMEventData() {} +func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} //---------------------------------------- // Wrappers for type safety @@ -249,6 +246,6 @@ func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { fireEvent(fireable, EventStringLock(), TMEventData{rs}) } -func FireEventProposerHeartbeat(fireable events.Fireable, rs EventDataProposerHeartbeat) { - fireEvent(fireable, EventStringProposerHeartbeat(), TMEventData{rs}) +func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) { + fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs}) } diff --git a/types/priv_validator.go b/types/priv_validator.go index 8c9a011c..69082493 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt } +func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat)) + return nil +} + func (privVal *PrivValidator) String() string { return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep) } From b8ac67e240148102dc4e1d4064ddb1fd7e6e2ab0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 3 Aug 2017 13:25:26 -0400 Subject: [PATCH 4/5] some fixes --- consensus/reactor.go | 2 +- consensus/state.go | 37 ++++++++++++++++++------------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 6c8a7544..3e12a314 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1318,7 +1318,7 @@ func (m *VoteSetBitsMessage) String() string { //------------------------------------- -// ProposalHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions +// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal. type ProposalHeartbeatMessage struct { Heartbeat *types.Heartbeat } diff --git a/consensus/state.go b/consensus/state.go index 2d0a9a21..e6fd665a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -788,7 +788,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { // we may need an empty "proof" block, and enterPropose immediately. waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height) if waitForTxs { - go cs.proposalHeartbeat() + go cs.proposalHeartbeat(height, round) } else { cs.enterPropose(height, round) } @@ -808,7 +808,7 @@ func (cs *ConsensusState) needProofBlock(height int) bool { return false } -func (cs *ConsensusState) proposalHeartbeat() { +func (cs *ConsensusState) proposalHeartbeat(height, round int) { counter := 0 addr := cs.privValidator.GetAddress() valIndex, v := cs.Validators.GetByAddress(addr) @@ -817,24 +817,23 @@ func (cs *ConsensusState) proposalHeartbeat() { valIndex = -1 } for { - select { - default: - if cs.evsw != nil { - rs := cs.GetRoundState().RoundStateEvent() - heartbeat := &types.Heartbeat{ - Height: rs.Height, - Round: rs.Round, - Sequence: counter, - ValidatorAddress: addr, - ValidatorIndex: valIndex, - } - cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat) - heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} - types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) - counter += 1 - } - time.Sleep(time.Second) + rs := cs.GetRoundState() + // if we've already moved on, no need to send more heartbeats + if rs.Step > RoundStepNewRound || rs.Round > round || rs.Height > height { + return } + heartbeat := &types.Heartbeat{ + Height: rs.Height, + Round: rs.Round, + Sequence: counter, + ValidatorAddress: addr, + ValidatorIndex: valIndex, + } + cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat) + heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} + types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) + counter += 1 + time.Sleep(time.Second) } } From d0965cca05981e10609ddf19865e4f8620acf5b6 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 3 Aug 2017 13:58:17 -0400 Subject: [PATCH 5/5] forgot heartbeat file --- types/heartbeat.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 types/heartbeat.go diff --git a/types/heartbeat.go b/types/heartbeat.go new file mode 100644 index 00000000..378f6202 --- /dev/null +++ b/types/heartbeat.go @@ -0,0 +1,42 @@ +package types + +import ( + "fmt" + "io" + + "github.com/tendermint/go-crypto" + "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire/data" + cmn "github.com/tendermint/tmlibs/common" +) + +type Heartbeat struct { + ValidatorAddress data.Bytes `json:"validator_address"` + ValidatorIndex int `json:"validator_index"` + Height int `json:"height"` + Round int `json:"round"` + Sequence int `json:"sequence"` + Signature crypto.Signature `json:"signature"` +} + +func (heartbeat *Heartbeat) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { + wire.WriteJSON(CanonicalJSONOnceHeartbeat{ + chainID, + CanonicalHeartbeat(heartbeat), + }, w, n, err) +} + +func (heartbeat *Heartbeat) Copy() *Heartbeat { + heartbeatCopy := *heartbeat + return &heartbeatCopy +} + +func (heartbeat *Heartbeat) String() string { + if heartbeat == nil { + return "nil-heartbeat" + } + + return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}", + heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress), + heartbeat.Height, heartbeat.Round, heartbeat.Sequence, heartbeat.Signature) +}