diff --git a/Gopkg.lock b/Gopkg.lock index ff5cbcfe..270e9c5c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -244,7 +244,7 @@ [[projects]] branch = "master" - digest = "1:dad2e5a2153ee7a6c9ab8fc13673a16ee4fb64434a7da980965a3741b0c981a3" + digest = "1:63b68062b8968092eb86bedc4e68894bd096ea6b24920faca8b9dcf451f54bb5" name = "github.com/prometheus/common" packages = [ "expfmt", @@ -377,14 +377,6 @@ revision = "a8328986c1608950fa5d3d1c0472cccc4f8fc02c" version = "v0.12.0-rc0" -[[projects]] - digest = "1:cefd237469d443fe56cbeadf68a1baf46cef293f071dc0e317f8b8062c3ffe72" - name = "github.com/tendermint/go-crypto" - packages = ["tmhash"] - pruneopts = "UT" - revision = "6a6b591a3d7592a04b46af95451cb5be3b114f76" - version = "v0.9.0" - [[projects]] branch = "master" digest = "1:c31a37cafc12315b8bd745c8ad6a006ac25350472488162a821e557b3e739d67" @@ -426,7 +418,7 @@ [[projects]] branch = "master" - digest = "1:70656e26ab4a96e683a21d677630edb5239a3d60b2d54bdc861c808ab5aa42c7" + digest = "1:bb0fe59917bdd5b89f49b9a8b26e5f465e325d9223b3a8e32254314bdf51e0f1" name = "golang.org/x/sys" packages = [ "cpu", @@ -547,7 +539,6 @@ "github.com/tendermint/ed25519", "github.com/tendermint/ed25519/extra25519", "github.com/tendermint/go-amino", - "github.com/tendermint/go-crypto/tmhash", "golang.org/x/crypto/bcrypt", "golang.org/x/crypto/chacha20poly1305", "golang.org/x/crypto/curve25519", diff --git a/docs/app-dev/subscribing-to-events-via-websocket.md b/docs/app-dev/subscribing-to-events-via-websocket.md index 9e7c642a..69ab59f5 100644 --- a/docs/app-dev/subscribing-to-events-via-websocket.md +++ b/docs/app-dev/subscribing-to-events-via-websocket.md @@ -26,3 +26,39 @@ more information on query syntax and other options. You can also use tags, given you had included them into DeliverTx response, to query transaction results. See [Indexing transactions](./indexing-transactions.md) for details. + +### ValidatorSetUpdates + +When validator set changes, ValidatorSetUpdates event is published. The +event carries a list of pubkey/power pairs. The list is the same +Tendermint receives from ABCI application (see [EndBlock +section](https://tendermint.com/docs/app-dev/abci-spec.html#endblock) in +the ABCI spec). + +Response: + +``` +{ + "jsonrpc": "2.0", + "id": "0#event", + "result": { + "query": "tm.event='ValidatorSetUpdates'", + "data": { + "type": "tendermint/event/ValidatorSetUpdates", + "value": { + "validator_updates": [ + { + "address": "09EAD022FD25DE3A02E64B0FE9610B1417183EE4", + "pub_key": { + "type": "tendermint/PubKeyEd25519", + "value": "ww0z4WaZ0Xg+YI10w43wTWbBmM3dpVza4mmSQYsd0ck=" + }, + "voting_power": "10", + "accum": "0" + } + ] + } + } + } +} +``` diff --git a/state/execution.go b/state/execution.go index 54e5c950..7e7bc37a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -380,6 +380,13 @@ func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *ty Result: *(abciResponses.DeliverTx[i]), }}) } + + if len(abciResponses.EndBlock.ValidatorUpdates) > 0 { + // if there were an error, we would've stopped in updateValidators + updates, _ := types.PB2TM.Validators(abciResponses.EndBlock.ValidatorUpdates) + eventBus.PublishEventValidatorSetUpdates( + types.EventDataValidatorSetUpdates{ValidatorUpdates: updates}) + } } //---------------------------------------------------------------------------------------------------- diff --git a/state/execution_test.go b/state/execution_test.go index d5b0dda0..161b96f2 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -1,6 +1,7 @@ package state import ( + "context" "fmt" "testing" "time" @@ -232,6 +233,62 @@ func TestUpdateValidators(t *testing.T) { } } +// TestEndBlockValidatorUpdates ensures we update validator set and send an event. +func TestEndBlockValidatorUpdates(t *testing.T) { + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc, nil) + err := proxyApp.Start() + require.Nil(t, err) + defer proxyApp.Stop() + + state, stateDB := state(1, 1) + + blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), + MockMempool{}, MockEvidencePool{}) + eventBus := types.NewEventBus() + err = eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + blockExec.SetEventBus(eventBus) + + updatesCh := make(chan interface{}, 1) + err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh) + require.NoError(t, err) + + block := makeBlock(state, 1) + blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()} + + pubkey := ed25519.GenPrivKey().PubKey() + app.ValidatorUpdates = []abci.Validator{ + {PubKey: types.TM2PB.PubKey(pubkey), Power: 10}, + } + + state, err = blockExec.ApplyBlock(state, blockID, block) + require.Nil(t, err) + + // test new validator was added to NextValidators + if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { + idx, _ := state.NextValidators.GetByAddress(pubkey.Address()) + if idx < 0 { + t.Fatalf("can't find address %v in the set %v", pubkey.Address(), state.NextValidators) + } + } + + // test we threw an event + select { + case e := <-updatesCh: + event, ok := e.(types.EventDataValidatorSetUpdates) + require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e) + if assert.NotEmpty(t, event.ValidatorUpdates) { + assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey) + assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower) + } + case <-time.After(1 * time.Second): + t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.") + } +} + //---------------------------------------------------------------------------- // make some bogus txs @@ -275,18 +332,15 @@ func makeBlock(state State, height int64) *types.Block { //---------------------------------------------------------------------------- -var _ abci.Application = (*testApp)(nil) - type testApp struct { abci.BaseApplication Validators []abci.SigningValidator ByzantineValidators []abci.Evidence + ValidatorUpdates []abci.Validator } -func NewKVStoreApplication() *testApp { - return &testApp{} -} +var _ abci.Application = (*testApp)(nil) func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { return abci.ResponseInfo{} @@ -298,6 +352,10 @@ func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlo return abci.ResponseBeginBlock{} } +func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { + return abci.ResponseEndBlock{ValidatorUpdates: app.ValidatorUpdates} +} + func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}} } diff --git a/types/event_bus.go b/types/event_bus.go index b4965fee..d11c6520 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -71,34 +71,32 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { return nil } -//--- block, tx, and vote events - -func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error { - return b.Publish(EventNewBlock, event) +func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { + return b.Publish(EventNewBlock, data) } -func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, event) +func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { + return b.Publish(EventNewBlockHeader, data) } -func (b *EventBus) PublishEventVote(event EventDataVote) error { - return b.Publish(EventVote, event) +func (b *EventBus) PublishEventVote(data EventDataVote) error { + return b.Publish(EventVote, data) } // PublishEventTx publishes tx event with tags from Result. Note it will add // predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names // will be overwritten. -func (b *EventBus) PublishEventTx(event EventDataTx) error { +func (b *EventBus) PublishEventTx(data EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() tags := make(map[string]string) // validate and fill tags from tx result - for _, tag := range event.Result.Tags { + for _, tag := range data.Result.Tags { // basic validation if len(tag.Key) == 0 { - b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx) + b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx) continue } tags[string(tag.Key)] = string(tag.Value) @@ -109,55 +107,57 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { tags[EventTypeKey] = EventTx logIfTagExists(TxHashKey, tags, b.Logger) - tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) + tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash()) logIfTagExists(TxHeightKey, tags, b.Logger) - tags[TxHeightKey] = fmt.Sprintf("%d", event.Height) + tags[TxHeightKey] = fmt.Sprintf("%d", data.Height) - b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags)) + b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) return nil } -func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error { - return b.Publish(EventProposalHeartbeat, event) +func (b *EventBus) PublishEventProposalHeartbeat(data EventDataProposalHeartbeat) error { + return b.Publish(EventProposalHeartbeat, data) } -//--- EventDataRoundState events - -func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error { - return b.Publish(EventNewRoundStep, event) +func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error { + return b.Publish(EventNewRoundStep, data) } -func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error { - return b.Publish(EventTimeoutPropose, event) +func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error { + return b.Publish(EventTimeoutPropose, data) } -func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error { - return b.Publish(EventTimeoutWait, event) +func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error { + return b.Publish(EventTimeoutWait, data) } -func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error { - return b.Publish(EventNewRound, event) +func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error { + return b.Publish(EventNewRound, data) } -func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error { - return b.Publish(EventCompleteProposal, event) +func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error { + return b.Publish(EventCompleteProposal, data) } -func (b *EventBus) PublishEventPolka(event EventDataRoundState) error { - return b.Publish(EventPolka, event) +func (b *EventBus) PublishEventPolka(data EventDataRoundState) error { + return b.Publish(EventPolka, data) } -func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error { - return b.Publish(EventUnlock, event) +func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error { + return b.Publish(EventUnlock, data) } -func (b *EventBus) PublishEventRelock(event EventDataRoundState) error { - return b.Publish(EventRelock, event) +func (b *EventBus) PublishEventRelock(data EventDataRoundState) error { + return b.Publish(EventRelock, data) } -func (b *EventBus) PublishEventLock(event EventDataRoundState) error { - return b.Publish(EventLock, event) +func (b *EventBus) PublishEventLock(data EventDataRoundState) error { + return b.Publish(EventLock, data) +} + +func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { + return b.Publish(EventValidatorSetUpdates, data) } func logIfTagExists(tag string, tags map[string]string, logger log.Logger) { diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 2ee9f886..f0e825d5 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -68,7 +68,7 @@ func TestEventBusPublish(t *testing.T) { err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh) require.NoError(t, err) - const numEventsExpected = 14 + const numEventsExpected = 15 done := make(chan struct{}) go func() { numEvents := 0 @@ -108,6 +108,8 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) err = eventBus.PublishEventLock(EventDataRoundState{}) require.NoError(t, err) + err = eventBus.PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates{}) + require.NoError(t, err) select { case <-done: diff --git a/types/events.go b/types/events.go index c26fecb7..09f7216e 100644 --- a/types/events.go +++ b/types/events.go @@ -8,42 +8,34 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ) -// Reserved event types +// Reserved event types (alphabetically sorted). const ( - EventCompleteProposal = "CompleteProposal" - EventLock = "Lock" - EventNewBlock = "NewBlock" - EventNewBlockHeader = "NewBlockHeader" - EventNewRound = "NewRound" - EventNewRoundStep = "NewRoundStep" - EventPolka = "Polka" - EventRelock = "Relock" - EventTimeoutPropose = "TimeoutPropose" - EventTimeoutWait = "TimeoutWait" - EventTx = "Tx" - EventUnlock = "Unlock" - EventVote = "Vote" - EventProposalHeartbeat = "ProposalHeartbeat" + EventCompleteProposal = "CompleteProposal" + EventLock = "Lock" + EventNewBlock = "NewBlock" + EventNewBlockHeader = "NewBlockHeader" + EventNewRound = "NewRound" + EventNewRoundStep = "NewRoundStep" + EventPolka = "Polka" + EventProposalHeartbeat = "ProposalHeartbeat" + EventRelock = "Relock" + EventTimeoutPropose = "TimeoutPropose" + EventTimeoutWait = "TimeoutWait" + EventTx = "Tx" + EventUnlock = "Unlock" + EventValidatorSetUpdates = "ValidatorSetUpdates" + EventVote = "Vote" ) /////////////////////////////////////////////////////////////////////////////// // ENCODING / DECODING /////////////////////////////////////////////////////////////////////////////// -// implements events.EventData +// TMEventData implements events.EventData. type TMEventData interface { - AssertIsTMEventData() // empty interface } -func (_ EventDataNewBlock) AssertIsTMEventData() {} -func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} -func (_ EventDataTx) AssertIsTMEventData() {} -func (_ EventDataRoundState) AssertIsTMEventData() {} -func (_ EventDataVote) AssertIsTMEventData() {} -func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} -func (_ EventDataString) AssertIsTMEventData() {} - func RegisterEventDatas(cdc *amino.Codec) { cdc.RegisterInterface((*TMEventData)(nil), nil) cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/event/NewBlock", nil) @@ -52,6 +44,7 @@ func RegisterEventDatas(cdc *amino.Codec) { cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil) cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil) cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil) + cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil) cdc.RegisterConcrete(EventDataString(""), "tendermint/event/ProposalString", nil) } @@ -92,6 +85,10 @@ type EventDataVote struct { type EventDataString string +type EventDataValidatorSetUpdates struct { + ValidatorUpdates []*Validator `json:"validator_updates"` +} + /////////////////////////////////////////////////////////////////////////////// // PUBSUB /////////////////////////////////////////////////////////////////////////////// @@ -108,20 +105,21 @@ const ( ) var ( - EventQueryNewBlock = QueryForEvent(EventNewBlock) - EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader) - EventQueryNewRound = QueryForEvent(EventNewRound) - EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep) - EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose) - EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal) - EventQueryPolka = QueryForEvent(EventPolka) - EventQueryUnlock = QueryForEvent(EventUnlock) - EventQueryLock = QueryForEvent(EventLock) - EventQueryRelock = QueryForEvent(EventRelock) - EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait) - EventQueryVote = QueryForEvent(EventVote) - EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat) - EventQueryTx = QueryForEvent(EventTx) + EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal) + EventQueryLock = QueryForEvent(EventLock) + EventQueryNewBlock = QueryForEvent(EventNewBlock) + EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader) + EventQueryNewRound = QueryForEvent(EventNewRound) + EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep) + EventQueryPolka = QueryForEvent(EventPolka) + EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat) + EventQueryRelock = QueryForEvent(EventRelock) + EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose) + EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait) + EventQueryTx = QueryForEvent(EventTx) + EventQueryUnlock = QueryForEvent(EventUnlock) + EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdates) + EventQueryVote = QueryForEvent(EventVote) ) func EventQueryTxFor(tx Tx) tmpubsub.Query { @@ -137,6 +135,7 @@ type BlockEventPublisher interface { PublishEventNewBlock(block EventDataNewBlock) error PublishEventNewBlockHeader(header EventDataNewBlockHeader) error PublishEventTx(EventDataTx) error + PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error } type TxEventPublisher interface { diff --git a/types/nop_event_bus.go b/types/nop_event_bus.go index cd1eab8c..93694da4 100644 --- a/types/nop_event_bus.go +++ b/types/nop_event_bus.go @@ -20,58 +20,58 @@ func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error return nil } -//--- block, tx, and vote events - -func (NopEventBus) PublishEventNewBlock(block EventDataNewBlock) error { +func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error { return nil } -func (NopEventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { +func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { return nil } -func (NopEventBus) PublishEventVote(vote EventDataVote) error { +func (NopEventBus) PublishEventVote(data EventDataVote) error { return nil } -func (NopEventBus) PublishEventTx(tx EventDataTx) error { +func (NopEventBus) PublishEventTx(data EventDataTx) error { return nil } -//--- EventDataRoundState events - -func (NopEventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { +func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { +func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { +func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventNewRound(rs EventDataRoundState) error { +func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { +func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventPolka(rs EventDataRoundState) error { +func (NopEventBus) PublishEventPolka(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventUnlock(rs EventDataRoundState) error { +func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventRelock(rs EventDataRoundState) error { +func (NopEventBus) PublishEventRelock(data EventDataRoundState) error { return nil } -func (NopEventBus) PublishEventLock(rs EventDataRoundState) error { +func (NopEventBus) PublishEventLock(data EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { return nil }