From 782a836db0d97437db1b7a13e42c6114467b2719 Mon Sep 17 00:00:00 2001 From: Adrian Brink Date: Mon, 16 Oct 2017 16:01:32 +0200 Subject: [PATCH] Cleanup of code and code docs This cleans up some of the code in the state package --- mempool/mempool.go | 14 ++-- rpc/lib/client/http_client.go | 4 +- state/state.go | 117 ++++++++++++++++++++-------------- state/state_test.go | 39 ++++++++---- state/txindex/indexer.go | 19 +++--- state/txindex/kv/kv.go | 8 ++- state/txindex/null/null.go | 4 +- types/block.go | 6 +- 8 files changed, 127 insertions(+), 84 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 07b267c4..c1130d9d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -50,9 +50,10 @@ TODO: Better handle abci client errors. (make it automatically handle connection const cacheSize = 100000 -// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. -// Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. -// The Mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers. +// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus +// round. Transaction validity is checked using the CheckTx abci message before the transaction is +// added to the pool. The Mempool uses a concurrent list structure for storing transactions that +// can be efficiently accessed by multiple concurrent readers. type Mempool struct { config *cfg.MempoolConfig @@ -78,6 +79,7 @@ type Mempool struct { } // NewMempool returns a new Mempool with the given configuration and connection to an application. +// TODO: Extract logger into arguments. func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool { mempool := &Mempool{ config: config, @@ -162,7 +164,7 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement { // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. -// CONTRACT: Either cb will get called, or err returned. +// NOTE: Either cb will get called, or err returned. func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -287,9 +289,7 @@ func (mem *Mempool) notifyTxsAvailable() { if mem.Size() == 0 { panic("notified txs available but mempool is empty!") } - if mem.txsAvailable != nil && - !mem.notifiedTxsAvailable { - + if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { mem.notifiedTxsAvailable = true mem.txsAvailable <- mem.height + 1 } diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index 1fbaedfa..df0e175e 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/pkg/errors" + types "github.com/tendermint/tendermint/rpc/lib/types" ) @@ -60,12 +61,13 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) { //------------------------------------------------------------------------------------ -// JSON rpc takes params as a slice +// JSONRPCClient takes params as a slice type JSONRPCClient struct { address string client *http.Client } +// NewJSONRPCClient takes an address and returns a pointer to an instance of JSONRPCClient func NewJSONRPCClient(remote string) *JSONRPCClient { address, client := makeHTTPClient(remote) return &JSONRPCClient{ diff --git a/state/state.go b/state/state.go index d3646cf1..801101d7 100644 --- a/state/state.go +++ b/state/state.go @@ -8,11 +8,13 @@ import ( "time" abci "github.com/tendermint/abci/types" + cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" wire "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" @@ -50,18 +52,18 @@ type State struct { LastBlockTime time.Time Validators *types.ValidatorSet LastValidators *types.ValidatorSet - - // AppHash is updated after Commit - AppHash []byte - - TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer - // When a block returns a validator set change via EndBlock, // the change only applies to the next block. // So, if s.LastBlockHeight causes a valset change, // we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1 LastHeightValidatorsChanged int + // AppHash is updated after Commit + AppHash []byte + + // TxIndexer indexes transactions + TxIndexer txindex.TxIndexer `json:"-"` + logger log.Logger } @@ -88,19 +90,21 @@ func LoadState(db dbm.DB) *State { } func loadState(db dbm.DB, key []byte) *State { - s := &State{db: db, TxIndexer: &null.TxIndex{}} buf := db.Get(key) if len(buf) == 0 { return nil - } else { - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&s, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err)) - } - // TODO: ensure that buf is completely read. } + + s := &State{db: db, TxIndexer: &null.TxIndex{}} + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(&s, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed: + %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + return s } @@ -110,6 +114,8 @@ func (s *State) SetLogger(l log.Logger) { } // Copy makes a copy of the State for mutating. +// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same +// underlying TxIndexer. func (s *State) Copy() *State { return &State{ db: s.db, @@ -119,7 +125,7 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - TxIndexer: s.TxIndexer, // pointer here, not value + TxIndexer: s.TxIndexer, LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, logger: s.logger, ChainID: s.ChainID, @@ -131,6 +137,7 @@ func (s *State) Copy() *State { func (s *State) Save() { s.mtx.Lock() defer s.mtx.Unlock() + s.saveValidatorsInfo() s.db.SetSync(stateKey, s.Bytes()) } @@ -142,38 +149,43 @@ func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { } // LoadABCIResponses loads the ABCIResponses from the database. +// This is useful for recovering from crashes where we called app.Commit and before we called +// s.Save() func (s *State) LoadABCIResponses() *ABCIResponses { - abciResponses := new(ABCIResponses) - buf := s.db.Get(abciResponsesKey) - if len(buf) != 0 { - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(abciResponses, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err)) - } - // TODO: ensure that buf is completely read. + if len(buf) == 0 { + return nil } + + abciResponses := new(ABCIResponses) + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(abciResponses, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has + changed: %v\n`, *err)) + } + // TODO: ensure that buf is completely read. + return abciResponses } // LoadValidators loads the ValidatorSet for a given height. func (s *State) LoadValidators(height int) (*types.ValidatorSet, error) { - v := s.loadValidators(height) - if v == nil { + valInfo := s.loadValidators(height) + if valInfo == nil { return nil, ErrNoValSetForHeight{height} } - if v.ValidatorSet == nil { - v = s.loadValidators(v.LastHeightChanged) - if v == nil { - cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at - height %d as last changed from height %d`, v.LastHeightChanged, height)) + if valInfo.ValidatorSet == nil { + valInfo = s.loadValidators(valInfo.LastHeightChanged) + if valInfo == nil { + cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as + last changed from height %d`, valInfo.LastHeightChanged, height)) } } - return v.ValidatorSet, nil + return valInfo.ValidatorSet, nil } func (s *State) loadValidators(height int) *ValidatorsInfo { @@ -187,9 +199,11 @@ func (s *State) loadValidators(height int) *ValidatorsInfo { wire.ReadBinaryPtr(v, r, 0, n, err) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - cmn.Exit(cmn.Fmt("LoadValidators: Data has been corrupted or its spec has changed: %v\n", *err)) + cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed: + %v\n`, *err)) } // TODO: ensure that buf is completely read. + return v } @@ -200,13 +214,13 @@ func (s *State) loadValidators(height int) *ValidatorsInfo { func (s *State) saveValidatorsInfo() { changeHeight := s.LastHeightValidatorsChanged nextHeight := s.LastBlockHeight + 1 - vi := &ValidatorsInfo{ + valInfo := &ValidatorsInfo{ LastHeightChanged: changeHeight, } if changeHeight == nextHeight { - vi.ValidatorSet = s.Validators + valInfo.ValidatorSet = s.Validators } - s.db.SetSync(calcValidatorsKey(nextHeight), vi.Bytes()) + s.db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) } // Equals returns true if the States are identical. @@ -219,8 +233,10 @@ func (s *State) Bytes() []byte { return wire.BinaryBytes(s) } -// SetBlockAndValidators mutates State variables to update block and validators after running EndBlock. -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) { +// SetBlockAndValidators mutates State variables to update block and validators after running +// EndBlock. +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, + abciResponses *ABCIResponses) { // copy the valset so we can apply changes from EndBlock // and update s.LastValidators and s.Validators @@ -248,8 +264,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ } -func (s *State) setBlockAndValidators( - height int, blockID types.BlockID, blockTime time.Time, +func (s *State) setBlockAndValidators(height int, blockID types.BlockID, blockTime time.Time, prevValSet, nextValSet *types.ValidatorSet) { s.LastBlockHeight = height @@ -260,10 +275,17 @@ func (s *State) setBlockAndValidators( } // GetValidators returns the last and current validator sets. -func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) { +func (s *State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { return s.LastValidators, s.Validators } +// Params returns the consensus parameters used for validating blocks +func (s *State) Params() types.ConsensusParams { + // TODO: this should move into the State proper + // when we allow the app to change it + return *s.GenesisDoc.ConsensusParams +} + //------------------------------------------------------------------------ // ABCIResponses retains the responses of the various ABCI calls during block processing. @@ -293,15 +315,15 @@ func (a *ABCIResponses) Bytes() []byte { //----------------------------------------------------------------------------- -// ValidatorsInfo represents the latest validator set, or the last time it changed +// ValidatorsInfo represents the latest validator set, or the last height it changed type ValidatorsInfo struct { ValidatorSet *types.ValidatorSet LastHeightChanged int } // Bytes serializes the ValidatorsInfo using go-wire -func (vi *ValidatorsInfo) Bytes() []byte { - return wire.BinaryBytes(*vi) +func (valInfo *ValidatorsInfo) Bytes() []byte { + return wire.BinaryBytes(*valInfo) } //------------------------------------------------------------------------ @@ -353,6 +375,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { } } + // we do not need indexer during replay and in tests return &State{ db: db, @@ -365,7 +388,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), AppHash: genDoc.AppHash, - TxIndexer: &null.TxIndex{}, // we do not need indexer during replay and in tests + TxIndexer: &null.TxIndex{}, LastHeightValidatorsChanged: 1, }, nil } diff --git a/state/state_test.go b/state/state_test.go index 8ac2eada..7bb43afa 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -7,15 +7,16 @@ import ( "github.com/stretchr/testify/assert" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/types" - abci "github.com/tendermint/abci/types" + crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/types" ) // setupTestCase does setup common to all test cases @@ -31,22 +32,29 @@ func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, *State) { return tearDown, stateDB, state } +// TestStateCopy tests the correct copying behaviour of State. func TestStateCopy(t *testing.T) { tearDown, _, state := setupTestCase(t) defer tearDown(t) + // nolint: vetshadow assert := assert.New(t) stateCopy := state.Copy() assert.True(state.Equals(stateCopy), - cmn.Fmt("expected state and its copy to be identical. got %v\n expected %v\n", stateCopy, state)) + cmn.Fmt(`expected state and its copy to be identical. got %v\n expected %v\n`, + stateCopy, state)) + stateCopy.LastBlockHeight++ - assert.False(state.Equals(stateCopy), cmn.Fmt("expected states to be different. got same %v", state)) + assert.False(state.Equals(stateCopy), cmn.Fmt(`expected states to be different. got same + %v`, state)) } +// TestStateSaveLoad tests saving and loading State from a db. func TestStateSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) + // nolint: vetshadow assert := assert.New(t) state.LastBlockHeight++ @@ -54,12 +62,15 @@ func TestStateSaveLoad(t *testing.T) { loadedState := LoadState(stateDB) assert.True(state.Equals(loadedState), - cmn.Fmt("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state)) + cmn.Fmt(`expected state and its copy to be identical. got %v\n expected %v\n`, + loadedState, state)) } +// TestABCIResponsesSaveLoad tests saving and loading ABCIResponses. func TestABCIResponsesSaveLoad(t *testing.T) { tearDown, _, state := setupTestCase(t) defer tearDown(t) + // nolint: vetshadow assert := assert.New(t) state.LastBlockHeight++ @@ -78,17 +89,20 @@ func TestABCIResponsesSaveLoad(t *testing.T) { abciResponses.txs = nil state.SaveABCIResponses(abciResponses) - abciResponses2 := state.LoadABCIResponses() - assert.Equal(abciResponses, abciResponses2, - cmn.Fmt("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses)) + loadedAbciResponses := state.LoadABCIResponses() + assert.Equal(abciResponses, loadedAbciResponses, + cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses, + abciResponses)) } +// TestValidatorSimpleSaveLoad tests saving and loading validators. func TestValidatorSimpleSaveLoad(t *testing.T) { tearDown, _, state := setupTestCase(t) defer tearDown(t) + // nolint: vetshadow assert := assert.New(t) - // cant load anything for height 0 + // can't load anything for height 0 v, err := state.LoadValidators(0) assert.IsType(ErrNoValSetForHeight{}, err, "expected err at height 0") @@ -116,9 +130,11 @@ func TestValidatorSimpleSaveLoad(t *testing.T) { assert.IsType(ErrNoValSetForHeight{}, err, "expected err at unknown height") } +// TestValidatorChangesSaveLoad tests saving and loading a validator set with changes. func TestValidatorChangesSaveLoad(t *testing.T) { tearDown, _, state := setupTestCase(t) defer tearDown(t) + // nolint: vetshadow assert := assert.New(t) // change vals at these heights @@ -171,7 +187,8 @@ func TestValidatorChangesSaveLoad(t *testing.T) { assert.Equal(v.Size(), 1, "validator set size is greater than 1: %d", v.Size()) addr, _ := v.GetByIndex(0) - assert.Equal(addr, testCase.vals.Address(), fmt.Sprintf("unexpected pubkey at height %d", testCase.height)) + assert.Equal(addr, testCase.vals.Address(), fmt.Sprintf(`unexpected pubkey at + height %d`, testCase.height)) } } diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index 1c311830..20bbbf1d 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -6,17 +6,17 @@ import ( "github.com/tendermint/tendermint/types" ) -// Indexer interface defines methods to index and search transactions. +// TxIndexer interface defines methods to index and search transactions. +// TODO: Add the ability to create an independent TxIndexer. type TxIndexer interface { - // Batch analyzes, indexes or stores a batch of transactions. - // - // NOTE We do not specify Index method for analyzing a single transaction + // AddBatch analyzes, indexes or stores a batch of transactions. + // NOTE: We do not specify Index method for analyzing a single transaction // here because it bears heavy perfomance loses. Almost all advanced indexers // support batching. AddBatch(b *Batch) error - // Tx returns specified transaction or nil if the transaction is not indexed + // Get return the transaction specified by hash or nil if the transaction is not indexed // or stored. Get(hash []byte) (*types.TxResult, error) } @@ -24,10 +24,9 @@ type TxIndexer interface { //---------------------------------------------------- // Txs are written as a batch -// A Batch groups together multiple Index operations you would like performed -// at the same time. The Batch structure is NOT thread-safe. You should only -// perform operations on a batch from a single thread at a time. Once batch -// execution has started, you may not modify it. +// Batch groups together multiple Index operations you would like performed +// at the same time. +// NOTE: Bach is NOT thread-safe and should not be modified after starting its execution. type Batch struct { Ops []types.TxResult } @@ -39,7 +38,7 @@ func NewBatch(n int) *Batch { } } -// Index adds or updates entry for the given result.Index. +// Add or update an entry for the given result.Index. func (b *Batch) Add(result types.TxResult) error { b.Ops[result.Index] = result return nil diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 8f684c4a..db075e54 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -4,14 +4,16 @@ import ( "bytes" "fmt" - db "github.com/tendermint/tmlibs/db" "github.com/tendermint/go-wire" + + db "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" ) // TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). -// It could only index transaction by its identifier. +// It can only index transaction by its identifier. type TxIndex struct { store db.DB } @@ -44,7 +46,7 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { return txResult, nil } -// Batch writes a batch of transactions into the TxIndex storage. +// AddBatch writes a batch of transactions into the TxIndex storage. func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() for _, result := range b.Ops { diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 4999bbde..38027883 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -10,12 +10,12 @@ import ( // TxIndex acts as a /dev/null. type TxIndex struct{} -// Tx panics. +// Get with a hash panics. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`) } -// Batch returns nil. +// AddBatch returns nil. func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { return nil } diff --git a/types/block.go b/types/block.go index c8cdf81a..984aaf5d 100644 --- a/types/block.go +++ b/types/block.go @@ -14,15 +14,15 @@ import ( "github.com/tendermint/tmlibs/merkle" ) -// Block defines the atomic unit of a Tendermint blockchain +// Block defines the atomic unit of a Tendermint blockchain. type Block struct { *Header `json:"header"` *Data `json:"data"` LastCommit *Commit `json:"last_commit"` } -// MakeBlock returns a new block and corresponding part set from the given information -// TODO: version +// MakeBlock returns a new block and corresponding part set from the given information. +// TODO: Add version information to the Block struct. func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) { block := &Block{