Merge pull request #205 from tendermint/develop

Variety of optimization and some dev features
This commit is contained in:
Jae Kwon 2016-04-28 21:24:13 -07:00
commit 46d2a13288
25 changed files with 279 additions and 214 deletions

View File

@ -1,117 +0,0 @@
package config
import (
"github.com/naoina/toml"
"sync"
"time"
. "github.com/tendermint/go-common"
)
type Config interface {
Get(key string) interface{}
GetBool(key string) bool
GetFloat64(key string) float64
GetInt(key string) int
GetString(key string) string
GetStringMap(key string) map[string]interface{}
GetStringMapString(key string) map[string]string
GetStringSlice(key string) []string
GetTime(key string) time.Time
IsSet(key string) bool
Set(key string, value interface{})
}
type MapConfig struct {
required map[string]struct{} // blows up if trying to use before setting.
data map[string]interface{}
}
func ReadMapConfigFromFile(filePath string) (MapConfig, error) {
var configData = make(map[string]interface{})
fileBytes := MustReadFile(filePath)
err := toml.Unmarshal(fileBytes, configData)
if err != nil {
return MapConfig{}, err
}
return NewMapConfig(configData), nil
}
func NewMapConfig(data map[string]interface{}) MapConfig {
if data == nil {
data = make(map[string]interface{})
}
return MapConfig{
required: make(map[string]struct{}),
data: data,
}
}
func (cfg MapConfig) Get(key string) interface{} {
if _, ok := cfg.required[key]; ok {
PanicSanity(Fmt("config key %v is required but was not set.", key))
}
return cfg.data[key]
}
func (cfg MapConfig) GetBool(key string) bool { return cfg.Get(key).(bool) }
func (cfg MapConfig) GetFloat64(key string) float64 { return cfg.Get(key).(float64) }
func (cfg MapConfig) GetInt(key string) int { return cfg.Get(key).(int) }
func (cfg MapConfig) GetString(key string) string { return cfg.Get(key).(string) }
func (cfg MapConfig) GetStringMap(key string) map[string]interface{} {
return cfg.Get(key).(map[string]interface{})
}
func (cfg MapConfig) GetStringMapString(key string) map[string]string {
return cfg.Get(key).(map[string]string)
}
func (cfg MapConfig) GetStringSlice(key string) []string { return cfg.Get(key).([]string) }
func (cfg MapConfig) GetTime(key string) time.Time { return cfg.Get(key).(time.Time) }
func (cfg MapConfig) IsSet(key string) bool { _, ok := cfg.data[key]; return ok }
func (cfg MapConfig) Set(key string, value interface{}) {
delete(cfg.required, key)
cfg.data[key] = value
}
func (cfg MapConfig) SetDefault(key string, value interface{}) {
delete(cfg.required, key)
if cfg.IsSet(key) {
return
}
cfg.data[key] = value
}
func (cfg MapConfig) SetRequired(key string) {
if cfg.IsSet(key) {
return
}
cfg.required[key] = struct{}{}
}
//--------------------------------------------------------------------------------
// A little convenient hack to notify listeners upon config changes.
type Configurable func(Config)
var mtx sync.Mutex
var globalConfig Config
var confs []Configurable
func OnConfig(conf func(Config)) {
mtx.Lock()
defer mtx.Unlock()
confs = append(confs, conf)
if globalConfig != nil {
conf(globalConfig)
}
}
func ApplyConfig(config Config) {
mtx.Lock()
globalConfig = config
confsCopy := make([]Configurable, len(confs))
copy(confsCopy, confs)
mtx.Unlock()
for _, conf := range confsCopy {
conf(config)
}
}

View File

@ -70,6 +70,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)
mapConfig.SetDefault("timeout_propose", 3000)
mapConfig.SetDefault("timeout_propose_delta", 500)
mapConfig.SetDefault("timeout_prevote", 1000)

View File

@ -88,6 +88,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)
mapConfig.SetDefault("timeout_propose", 100)
mapConfig.SetDefault("timeout_propose_delta", 1)
mapConfig.SetDefault("timeout_prevote", 1)

View File

@ -91,9 +91,9 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
},
&p2p.ChannelDescriptor{
ID: DataChannel,
Priority: 2,
SendQueueCapacity: 50,
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
},
&p2p.ChannelDescriptor{

View File

@ -86,13 +86,12 @@ func TestReplayCatchup(t *testing.T) {
t.Fatalf("Error on catchup replay %v", err)
}
after := time.After(time.Second * 2)
after := time.After(time.Second * 15)
select {
case <-newBlockCh:
case <-after:
t.Fatal("Timed out waiting for new block")
}
}
func openWAL(t *testing.T, cs *ConsensusState, file string) {

View File

@ -655,7 +655,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
err = cs.setProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
_, err = cs.addProposalBlockPart(msg.Height, msg.Part)
_, err = cs.addProposalBlockPart(msg.Height, msg.Part, peerKey != "")
if err != nil && msg.Round != cs.Round {
err = nil
}
@ -835,8 +835,8 @@ func (cs *ConsensusState) decideProposal(height, round int) {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
}
log.Info("Signed and sent proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed and sent proposal block: %v", block))
log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed proposal block: %v", block))
} else {
log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
@ -1206,6 +1206,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// Fire off event for new block.
// TODO: Handle app failure. See #177
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header})
// Create a copy of the state for staging
stateCopy := cs.state.Copy()
@ -1291,7 +1292,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, once we have the full block.
func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) {
func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part, verify bool) (added bool, err error) {
// Blocks might be reused, so round mismatch is OK
if cs.Height != height {
return false, nil
@ -1302,7 +1303,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad
return false, nil // TODO: bad peer? Return error?
}
added, err = cs.ProposalBlockParts.AddPart(part)
added, err = cs.ProposalBlockParts.AddPart(part, verify)
if err != nil {
return added, err
}

View File

@ -491,7 +491,7 @@ func TestLockPOLRelock(t *testing.T) {
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1)
newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlockHeader(), 1)
log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound)
@ -577,14 +577,14 @@ func TestLockPOLRelock(t *testing.T) {
_, _ = <-voteCh, <-voteCh
be := <-newBlockCh
b := be.(types.EventDataNewBlock)
b := be.(types.EventDataNewBlockHeader)
re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
if rs.Height != 2 {
t.Fatal("Expected height to increment")
}
if !bytes.Equal(b.Block.Hash(), propBlockHash) {
if !bytes.Equal(b.Header.Hash(), propBlockHash) {
t.Fatal("Expected new block to be proposal block")
}
}

13
consensus/version.go Normal file
View File

@ -0,0 +1,13 @@
package consensus
import (
. "github.com/tendermint/go-common"
)
// kind of arbitrary
var Spec = "1" // async
var Major = "0" //
var Minor = "2" // replay refactor
var Revision = "1" // round state fix
var Version = Fmt("v%s/%s.%s.%s", Spec, Major, Minor, Revision)

View File

@ -61,21 +61,23 @@ func NewWAL(file string, light bool) (*WAL, error) {
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(msg ConsensusLogMessageInterface) {
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
if wal != nil {
if wal.light {
if m, ok := msg.(msgInfo); ok {
if _, ok := m.Msg.(*BlockPartMessage); ok {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := clm.(msgInfo); ok {
_ = mi
if mi.PeerKey != "" {
return
}
}
}
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err)
wire.WriteJSON(ConsensusLogMessage{time.Now(), clm}, wal.fp, &n, &err)
wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg))
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
}
}
}

34
glide.lock generated
View File

@ -1,18 +1,18 @@
hash: f3eab3f91c9d2c07574e8ec6f2f5d56bd946af1b061533a0baf9db8765f97a51
updated: 2016-03-05T17:20:40.721925401-05:00
updated: 2016-03-24T16:39:27.330201414-04:00
imports:
- name: github.com/gogo/protobuf
version: f4cc07910fc38f5b6b8d6e75d7457cf504157b6c
version: 4168943e65a2802828518e95310aeeed6d84c4e5
subpackages:
- proto
- name: github.com/golang/protobuf
version: c75fbf01dc6cb73649c4cd4326182c3e44aa9dbb
version: 8d92cf5fc15a4382f8964b08e1f42a75c0591aa3
subpackages:
- proto
- name: github.com/golang/snappy
version: 5f1c01d9f64b941dd9582c638279d046eda6ca31
- name: github.com/gorilla/websocket
version: c45a635370221f34fea2d5163fd156fcb4e38e8a
version: e2e3d8414d0fbae04004f151979f4e27c6747fe7
- name: github.com/inconshreveable/log15
version: 210d6fdc4d979ef6579778f1b6ed84571454abb4
subpackages:
@ -33,7 +33,7 @@ imports:
- name: github.com/spf13/pflag
version: 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7
- name: github.com/syndtr/goleveldb
version: ad0d8b2ab58a55ed5c58073aa46451d5e1ca1280
version: 917f41c560270110ceb73c5b38be2a9127387071
subpackages:
- leveldb
- leveldb/errors
@ -59,23 +59,23 @@ imports:
- name: github.com/tendermint/go-clist
version: 634527f5b60fd7c71ca811262493df2ad65ee0ca
- name: github.com/tendermint/go-common
version: 1559ae1ac90c88b1373ff114c409399c5a1cedac
version: dcfa46af1341d03b80d32e4901019d1668b978b9
- name: github.com/tendermint/go-config
version: c077af2c1ecf584fb797fd1956758545b25d952b
version: c47b67203b070d8bea835a928d50cb739972c48a
- name: github.com/tendermint/go-crypto
version: 76ba23e4c0c627b8c66d1f97b6a18dc77f4f0297
version: 3f0d9b3f29f30e5d0cbc2cef04fa45e5a606c622
- name: github.com/tendermint/go-db
version: a7878f1d0d8eaebf15f87bc2df15f7a1088cce7f
- name: github.com/tendermint/go-events
version: 7b75ca7bb55aa25e9ef765eb8c0b69486b227357
- name: github.com/tendermint/go-logger
version: 4901b71ade2b834ca0f4c2ca69edb96792dca05b
version: 84391b36d3f5960e691c688d06b768708f0fa2f3
- name: github.com/tendermint/go-logio
version: 04f3aa0a3b38d06dcadefbafd988c8b85e499225
- name: github.com/tendermint/go-merkle
version: 67b535ce9633be7df575dc3a7833fa2301020c25
version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8
- name: github.com/tendermint/go-p2p
version: 7f6aad20fbad6ef1a132d5a8bebd18f3521fff1a
version: 10619248c665dee6b8f81455f7f27ab93d5ec366
subpackages:
- upnp
- name: github.com/tendermint/go-rpc
@ -85,27 +85,29 @@ imports:
- types
- server
- name: github.com/tendermint/go-wire
version: 9acb294893c790427e2b9abf2877e69690cd5b6c
version: 7a15dd53dfdecc0f967676edcd6b335c59344c83
- name: github.com/tendermint/log15
version: 6e460758f10ef42a4724b8e4a82fee59aaa0e41d
- name: github.com/tendermint/tmsp
version: 72540f9cac4840989cb05b147cc89be8cd91f043
version: 1dfc6950dddf47ff397e670a67d405d25da138ea
subpackages:
- types
- client
- example/dummy
- example/nil
- client
- name: golang.org/x/crypto
version: 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
version: c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6
subpackages:
- ripemd160
- nacl/box
- nacl/secretbox
- openpgp/armor
- curve25519
- salsa20/salsa
- poly1305
- openpgp/errors
- name: golang.org/x/sys
version: 7a56174f0086b32866ebd746a794417edbc678a1
version: afce3de5756ca82699128ebae46ac95ad59d6297
subpackages:
- unix
devImports: []

View File

@ -77,6 +77,10 @@ func NewMempool(proxyAppConn proxy.AppConn) *Mempool {
return mempool
}
func (mem *Mempool) Size() int {
return mem.txs.Len()
}
// Return the first element of mem.txs for peer goroutines to call .NextWait() on.
// Blocks until txs has elements.
func (mem *Mempool) TxsFrontWait() *clist.CElement {
@ -197,9 +201,11 @@ func (mem *Mempool) Reap(maxTxs int) []types.Tx {
return txs
}
// maxTxs: 0 means uncapped
// maxTxs: -1 means uncapped, 0 means none
func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
if maxTxs == 0 {
return []types.Tx{}
} else if maxTxs < 0 {
maxTxs = mem.txs.Len()
}
txs := make([]types.Tx, 0, MinInt(mem.txs.Len(), maxTxs))

View File

@ -48,7 +48,7 @@ func TestSerialReap(t *testing.T) {
}
reapCheck := func(exp int) {
txs := mempool.Reap(0)
txs := mempool.Reap(-1)
if len(txs) != exp {
t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs))
}

View File

@ -224,6 +224,7 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
Other: []string{
Fmt("wire_version=%v", wire.Version),
Fmt("p2p_version=%v", p2p.Version),
Fmt("consensus_version=%v", consensus.Version),
Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
},
}

67
rpc/core/dev.go Normal file
View File

@ -0,0 +1,67 @@
package core
import (
"fmt"
"os"
"runtime/pprof"
"strconv"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) {
switch typ {
case "string":
config.Set(key, value)
case "int":
val, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("non-integer value found. key:%s; value:%s; err:%v", key, value, err)
}
config.Set(key, val)
case "bool":
switch value {
case "true":
config.Set(key, true)
case "false":
config.Set(key, false)
default:
return nil, fmt.Errorf("bool value must be true or false. got %s", value)
}
default:
return nil, fmt.Errorf("Unknown type %s", typ)
}
return &ctypes.ResultUnsafeSetConfig{}, nil
}
var profFile *os.File
func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error) {
var err error
profFile, err = os.Create(filename)
if err != nil {
return nil, err
}
err = pprof.StartCPUProfile(profFile)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsafeProfile{}, nil
}
func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) {
pprof.StopCPUProfile()
profFile.Close()
return &ctypes.ResultUnsafeProfile{}, nil
}
func UnsafeWriteHeapProfile(filename string) (*ctypes.ResultUnsafeProfile, error) {
memProfFile, err := os.Create(filename)
if err != nil {
return nil, err
}
pprof.WriteHeapProfile(memProfFile)
memProfFile.Close()
return &ctypes.ResultUnsafeProfile{}, nil
}

View File

@ -39,3 +39,7 @@ func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
txs := mempoolReactor.Mempool.Reap(0)
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
}
func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
return &ctypes.ResultUnconfirmedTxs{N: mempoolReactor.Mempool.Size()}, nil
}

View File

@ -22,8 +22,12 @@ var Routes = map[string]*rpc.RPCFunc{
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
"unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"),
"unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"),
"unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"),
"unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfilerResult, ""),
"unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfileResult, "filename"),
}
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
@ -114,6 +118,14 @@ func UnconfirmedTxsResult() (ctypes.TMResult, error) {
}
}
func NumUnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := NumUnconfirmedTxs(); err != nil {
return nil, err
} else {
return r, nil
}
}
func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxSync(tx); err != nil {
return nil, err
@ -137,3 +149,27 @@ func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) {
return r, nil
}
}
func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeStartCPUProfiler(filename); err != nil {
return nil, err
} else {
return r, nil
}
}
func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) {
if r, err := UnsafeStopCPUProfiler(); err != nil {
return nil, err
} else {
return r, nil
}
}
func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeWriteHeapProfile(filename); err != nil {
return nil, err
} else {
return r, nil
}
}

View File

@ -1,9 +1,6 @@
package core
import (
"fmt"
"strconv"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
@ -31,28 +28,3 @@ func Status() (*ctypes.ResultStatus, error) {
LatestBlockHeight: latestHeight,
LatestBlockTime: latestBlockTime}, nil
}
func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) {
switch typ {
case "string":
config.Set(key, value)
case "int":
val, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("non-integer value found. key:%s; value:%s; err:%v", key, value, err)
}
config.Set(key, val)
case "bool":
switch value {
case "true":
config.Set(key, true)
case "false":
config.Set(key, false)
default:
return nil, fmt.Errorf("bool value must be true or false. got %s", value)
}
default:
return nil, fmt.Errorf("Unknown type %s", typ)
}
return &ctypes.ResultUnsafeSetConfig{}, nil
}

View File

@ -70,6 +70,8 @@ type ResultUnconfirmedTxs struct {
type ResultUnsafeSetConfig struct{}
type ResultUnsafeProfile struct{}
type ResultSubscribe struct {
}
@ -109,7 +111,10 @@ const (
ResultTypeEvent = byte(0x82)
// 0xa bytes for testing
ResultTypeUnsafeSetConfig = byte(0xa0)
ResultTypeUnsafeSetConfig = byte(0xa0)
ResultTypeUnsafeStartCPUProfiler = byte(0xa1)
ResultTypeUnsafeStopCPUProfiler = byte(0xa2)
ResultTypeUnsafeWriteHeapProfile = byte(0xa3)
)
type TMResult interface {
@ -133,4 +138,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},
wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},
wire.ConcreteType{&ResultUnsafeSetConfig{}, ResultTypeUnsafeSetConfig},
wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStartCPUProfiler},
wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler},
wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile},
)

View File

@ -2,4 +2,4 @@ package core
// a single integer is sufficient here
const Version = "2" // add DialSeeds; re-organize type bytes
const Version = "3" // rpc routes for profiling, setting config

View File

@ -329,19 +329,20 @@ type Data struct {
// Txs that will be applied by state @ block.Height+1.
// NOTE: not all txs here are valid. We're just agreeing on the order first.
// This means that block.AppHash does not include these txs.
Txs []Tx `json:"txs"`
Txs Txs `json:"txs"`
// Volatile
hash []byte
}
func (data *Data) Hash() []byte {
if config.GetBool("disable_data_hash") {
// we could use the part_set hash instead
data.hash = []byte{}
return data.hash
}
if data.hash == nil {
txs := make([]interface{}, len(data.Txs))
for i, tx := range data.Txs {
txs[i] = tx
}
data.hash = merkle.SimpleHashFromBinaries(txs) // NOTE: leaves are TxIDs.
data.hash = data.Txs.Hash() // NOTE: leaves of merkle tree are TxIDs
}
return data.hash
}

View File

@ -16,6 +16,7 @@ func EventStringDupeout() string { return "Dupeout" }
func EventStringFork() string { return "Fork" }
func EventStringNewBlock() string { return "NewBlock" }
func EventStringNewBlockHeader() string { return "NewBlockHeader" }
func EventStringNewRound() string { return "NewRound" }
func EventStringNewRoundStep() string { return "NewRoundStep" }
func EventStringTimeoutPropose() string { return "TimeoutPropose" }
@ -36,9 +37,10 @@ type TMEventData interface {
}
const (
EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02)
EventDataTypeTx = byte(0x03)
EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02)
EventDataTypeTx = byte(0x03)
EventDataTypeNewBlockHeader = byte(0x04)
EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12)
@ -47,6 +49,7 @@ const (
var _ = wire.RegisterInterface(
struct{ TMEventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader},
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState},
@ -60,6 +63,11 @@ type EventDataNewBlock struct {
Block *Block `json:"block"`
}
// light weight event for benchmarking
type EventDataNewBlockHeader struct {
Header *Header `json:"header"`
}
// All txs fire EventDataTx
type EventDataTx struct {
Tx Tx `json:"tx"`
@ -84,7 +92,8 @@ type EventDataVote struct {
Vote *Vote
}
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}

View File

@ -15,7 +15,7 @@ import (
)
const (
partSize = 4096 // 4KB
partSize = 65536 // 64KB ... 4096 // 4KB
)
var (
@ -188,7 +188,7 @@ func (ps *PartSet) Total() int {
return ps.total
}
func (ps *PartSet) AddPart(part *Part) (bool, error) {
func (ps *PartSet) AddPart(part *Part, verify bool) (bool, error) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@ -203,8 +203,10 @@ func (ps *PartSet) AddPart(part *Part) (bool, error) {
}
// Check hash proof
if !part.Proof.Verify(part.Index, ps.total, part.Hash(), ps.Hash()) {
return false, ErrPartSetInvalidProof
if verify {
if !part.Proof.Verify(part.Index, ps.total, part.Hash(), ps.Hash()) {
return false, ErrPartSetInvalidProof
}
}
// Add part
@ -228,11 +230,42 @@ func (ps *PartSet) GetReader() io.Reader {
if !ps.IsComplete() {
PanicSanity("Cannot GetReader() on incomplete PartSet")
}
buf := []byte{}
for _, part := range ps.parts {
buf = append(buf, part.Bytes...)
return NewPartSetReader(ps.parts)
}
type PartSetReader struct {
i int
parts []*Part
reader *bytes.Reader
}
func NewPartSetReader(parts []*Part) *PartSetReader {
return &PartSetReader{
i: 0,
parts: parts,
reader: bytes.NewReader(parts[0].Bytes),
}
return bytes.NewReader(buf)
}
func (psr *PartSetReader) Read(p []byte) (n int, err error) {
readerLen := psr.reader.Len()
if readerLen >= len(p) {
return psr.reader.Read(p)
} else if readerLen > 0 {
n1, err := psr.Read(p[:readerLen])
if err != nil {
return n1, err
}
n2, err := psr.Read(p[readerLen:])
return n1 + n2, err
}
psr.i += 1
if psr.i >= len(psr.parts) {
return 0, io.EOF
}
psr.reader = bytes.NewReader(psr.parts[psr.i].Bytes)
return psr.Read(p)
}
func (ps *PartSet) StringShort() string {

View File

@ -30,7 +30,7 @@ func TestBasicPartSet(t *testing.T) {
for i := 0; i < partSet.Total(); i++ {
part := partSet.GetPart(i)
//t.Logf("\n%v", part)
added, err := partSet2.AddPart(part)
added, err := partSet2.AddPart(part, true)
if !added || err != nil {
t.Errorf("Failed to add part %v, error: %v", i, err)
}
@ -70,7 +70,7 @@ func TestWrongProof(t *testing.T) {
// Test adding a part with wrong trail.
part := partSet.GetPart(0)
part.Proof.Aunts[0][0] += byte(0x01)
added, err := partSet2.AddPart(part)
added, err := partSet2.AddPart(part, true)
if added || err == nil {
t.Errorf("Expected to fail adding a part with bad trail.")
}
@ -78,7 +78,7 @@ func TestWrongProof(t *testing.T) {
// Test adding a part with wrong bytes.
part = partSet.GetPart(1)
part.Bytes[0] += byte(0x01)
added, err = partSet2.AddPart(part)
added, err = partSet2.AddPart(part, true)
if added || err == nil {
t.Errorf("Expected to fail adding a part with bad bytes.")
}

View File

@ -1,3 +1,24 @@
package types
import (
"github.com/tendermint/go-merkle"
)
type Tx []byte
type Txs []Tx
func (txs Txs) Hash() []byte {
// Recursive impl.
// Copied from go-merkle to avoid allocations
switch len(txs) {
case 0:
return nil
case 1:
return merkle.SimpleHashFromBinary(txs[0])
default:
left := Txs(txs[:(len(txs)+1)/2]).Hash()
right := Txs(txs[(len(txs)+1)/2:]).Hash()
return merkle.SimpleHashFromTwoHashes(left, right)
}
}

View File

@ -128,17 +128,16 @@ func (voteSet *VoteSet) addVote(val *Validator, valIndex int, vote *Vote) (bool,
return false, 0, ErrVoteUnexpectedStep
}
// Check signature.
if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) {
// Bad signature.
return false, 0, ErrVoteInvalidSignature
}
// If vote already exists, return false.
if existingVote := voteSet.votes[valIndex]; existingVote != nil {
if bytes.Equal(existingVote.BlockHash, vote.BlockHash) {
return false, valIndex, nil
} else {
// Check signature.
if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) {
// Bad signature.
return false, 0, ErrVoteInvalidSignature
}
return false, valIndex, &ErrVoteConflictingSignature{
VoteA: existingVote,
VoteB: vote,
@ -146,6 +145,12 @@ func (voteSet *VoteSet) addVote(val *Validator, valIndex int, vote *Vote) (bool,
}
}
// Check signature.
if !val.PubKey.VerifyBytes(SignBytes(config.GetString("chain_id"), vote), vote.Signature) {
// Bad signature.
return false, 0, ErrVoteInvalidSignature
}
// Add vote.
voteSet.votes[valIndex] = vote
voteSet.votesBitArray.SetIndex(valIndex, true)