config: cswal_light, mempool_broadcast, mempool_reap

This commit is contained in:
Ethan Buchman 2016-03-01 16:04:19 -05:00
parent 69d906f7dd
commit 3891e4d66d
12 changed files with 51 additions and 7 deletions

View File

@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("timeout_propose", 3000)
@ -77,6 +78,8 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("timeout_precommit_delta", 500)
mapConfig.SetDefault("timeout_commit", 1000)
mapConfig.SetDefault("mempool_recheck", true)
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_reap", true)
return mapConfig
}

View File

@ -91,6 +91,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("timeout_propose", 100)
@ -101,6 +102,8 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("timeout_precommit_delta", 1)
mapConfig.SetDefault("timeout_commit", 1)
mapConfig.SetDefault("mempool_recheck", true)
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_reap", true)
return mapConfig
}

View File

@ -63,7 +63,7 @@ func TestReplayCatchup(t *testing.T) {
func openWAL(t *testing.T, cs *ConsensusState, file string) {
// open the wal
wal, err := NewWAL(file)
wal, err := NewWAL(file, config.GetBool("cswal_light"))
if err != nil {
t.Fatal(err)
}

View File

@ -328,7 +328,7 @@ func (cs *ConsensusState) OnStop() {
func (cs *ConsensusState) OpenWAL(file string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
wal, err := NewWAL(file)
wal, err := NewWAL(file, config.GetBool("cswal_light"))
if err != nil {
return err
}
@ -655,7 +655,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
err = cs.setProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
// if we're the only validator, the enterPrevote may take us through to the next round
_, err = cs.addProposalBlockPart(msg.Height, msg.Part)
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
@ -675,7 +674,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
log.Warn("Unknown msg type", reflect.TypeOf(msg))
}
if err != nil {
log.Error("Error with msg", "type", reflect.TypeOf(msg), "error", err, "msg", msg)
log.Error("Error with msg", "type", reflect.TypeOf(msg), "peer", peerKey, "error", err, "msg", msg)
}
}

View File

@ -39,9 +39,11 @@ type WAL struct {
exists bool // if the file already existed (restarted process)
done chan struct{}
light bool // ignore block parts
}
func NewWAL(file string) (*WAL, error) {
func NewWAL(file string, light bool) (*WAL, error) {
var walExists bool
if _, err := os.Stat(file); err == nil {
walExists = true
@ -54,12 +56,20 @@ func NewWAL(file string) (*WAL, error) {
fp: fp,
exists: walExists,
done: make(chan struct{}),
light: light,
}, nil
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(msg ConsensusLogMessageInterface) {
if wal != nil {
if wal.light {
if m, ok := msg.(msgInfo); ok {
if _, ok := m.Msg.(*BlockPartMessage); ok {
return
}
}
}
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err)

View File

@ -30,7 +30,7 @@ func TestSeek(t *testing.T) {
}
f.Close()
wal, err := NewWAL(path.Join(os.TempDir(), name))
wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light"))
if err != nil {
t.Fatal(err)
}

View File

@ -184,6 +184,10 @@ func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) {
// Get the valid transactions remaining
func (mem *Mempool) Reap() []types.Tx {
if !config.GetBool("mempool_reap") {
return []types.Tx{}
}
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()

View File

@ -101,6 +101,10 @@ type Peer interface {
// TODO: Handle mempool or reactor shutdown?
// As is this routine may block forever if no new txs come in.
func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
if !config.GetBool("mempool_broadcast") {
return
}
var next *clist.CElement
for {
if !memR.IsRunning() {

View File

@ -58,8 +58,9 @@ func NewNode(privValidator *types.PrivValidator) *Node {
proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
// add the chainid to the global config
// add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID)
config.Set("num_vals", state.Validators.Size())
// Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519()

View File

@ -39,3 +39,8 @@ func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
txs := mempoolReactor.Mempool.Reap()
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
}
func TestStartMempool() (*ctypes.ResultTestStartMempool, error) {
config.Set("mempool_reap", true)
return &ctypes.ResultTestStartMempool{}, nil
}

View File

@ -20,6 +20,7 @@ var Routes = map[string]*rpc.RPCFunc{
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"test_start_mempool": rpc.NewRPCFunc(TestStartMempoolResult, ""), // move to test server ?
// subscribe/unsubscribe are reserved for websocket events.
}
@ -126,3 +127,11 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
return r, nil
}
}
func TestStartMempoolResult() (ctypes.TMResult, error) {
if r, err := TestStartMempool(); err != nil {
return nil, err
} else {
return r, nil
}
}

View File

@ -68,6 +68,8 @@ type ResultUnconfirmedTxs struct {
Txs []types.Tx `json:"txs"`
}
type ResultTestStartMempool struct{}
type ResultSubscribe struct {
}
@ -105,6 +107,9 @@ const (
ResultTypeSubscribe = byte(0x80)
ResultTypeUnsubscribe = byte(0x81)
ResultTypeEvent = byte(0x82)
// 0xa bytes for testing
ResultTypeTestStartMempool = byte(0xa0)
)
type TMResult interface {
@ -127,4 +132,5 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},
wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},
wire.ConcreteType{&ResultTestStartMempool{}, ResultTypeTestStartMempool},
)