Merge pull request #271 from tendermint/release-0.7.1

Release 0.7.1
This commit is contained in:
Ethan Buchman 2016-09-10 21:14:44 -04:00 committed by GitHub
commit e36e79a474
53 changed files with 1153 additions and 369 deletions

View File

@ -44,7 +44,7 @@ type BlockchainReactor struct {
sw *p2p.Switch
state *sm.State
proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
store *BlockStore
pool *BlockPool
fastSync bool
@ -55,7 +55,7 @@ type BlockchainReactor struct {
evsw *events.EventSwitch
}
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor {
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
}
@ -231,19 +231,22 @@ FOR_LOOP:
break SYNC_LOOP
} else {
bcR.pool.PopRequest()
// TODO: use ApplyBlock instead of Exec/Commit/SetAppHash/Save
err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
if err != nil {
// TODO This is bad, are we zombie?
PanicQ(Fmt("Failed to process committed block: %v", err))
PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
// NOTE: we could improve performance if we
// didn't make the app commit to disk every block
// ... but we would need a way to get the hash without it persisting
res := bcR.proxyAppConn.CommitSync()
if res.IsErr() {
// TODO Handle gracefully.
PanicQ(Fmt("Failed to commit block at application: %v", res))
}
/*
err = bcR.proxyAppConn.CommitSync()
if err != nil {
// TODO Handle gracefully.
PanicQ(Fmt("Failed to commit block at application: %v", err))
}
*/
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
bcR.state.AppHash = res.Data
bcR.state.Save()
}
}

View File

@ -8,10 +8,6 @@ machine:
hosts:
circlehost: 127.0.0.1
localhost: 127.0.0.1
pre:
- curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | sudo bash -s -- $DOCKER_VERSION
services:
- docker
checkout:
post:
@ -23,7 +19,10 @@ checkout:
dependencies:
override:
- echo $MACH_PREFIX $GOPATH $REPO $DOCKER_VERSION $DOCKER_MACHINE_VERSION
- curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | sudo bash -s -- $DOCKER_VERSION
- sudo curl -sSL -o /usr/bin/docker-machine https://github.com/docker/machine/releases/download/v$DOCKER_MACHINE_VERSION/docker-machine-linux-x86_64; sudo chmod 0755 /usr/bin/docker-machine
- sudo start docker
- go version
- docker version
- docker-machine version

View File

@ -22,10 +22,7 @@ func reset_priv_validator() {
privValidatorFile := config.GetString("priv_validator_file")
if _, err := os.Stat(privValidatorFile); err == nil {
privValidator = types.LoadPrivValidator(privValidatorFile)
privValidator.LastHeight = 0
privValidator.LastRound = 0
privValidator.LastStep = 0
privValidator.Save()
privValidator.Reset()
log.Notice("Reset PrivValidator", "file", privValidatorFile)
} else {
privValidator = types.GenPrivValidator()

View File

@ -55,7 +55,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:46658")
mapConfig.SetDefault("tmsp", "socket")
mapConfig.SetDefault("moniker", "anonymous")
mapConfig.SetDefault("node_laddr", "0.0.0.0:46656")
mapConfig.SetDefault("node_laddr", "tcp://0.0.0.0:46656")
mapConfig.SetDefault("seeds", "")
// mapConfig.SetDefault("seeds", "goldenalchemist.chaintest.net:46656")
mapConfig.SetDefault("fast_sync", true)
@ -65,11 +65,12 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("db_backend", "leveldb")
mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "info")
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657")
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)
@ -92,12 +93,12 @@ var defaultConfigTmpl = `# This is a TOML config file.
proxy_app = "tcp://127.0.0.1:46658"
moniker = "__MONIKER__"
node_laddr = "0.0.0.0:46656"
node_laddr = "tcp://0.0.0.0:46656"
seeds = ""
fast_sync = true
db_backend = "leveldb"
log_level = "notice"
rpc_laddr = "0.0.0.0:46657"
rpc_laddr = "tcp://0.0.0.0:46657"
`
func defaultConfig(moniker string) (defaultConfig string) {

View File

@ -70,7 +70,7 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("proxy_app", "dummy")
mapConfig.SetDefault("tmsp", "socket")
mapConfig.SetDefault("moniker", "anonymous")
mapConfig.SetDefault("node_laddr", "0.0.0.0:36656")
mapConfig.SetDefault("node_laddr", "tcp://0.0.0.0:36656")
mapConfig.SetDefault("fast_sync", false)
mapConfig.SetDefault("skip_upnp", true)
mapConfig.SetDefault("addrbook_file", rootDir+"/addrbook.json")
@ -78,21 +78,22 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("db_backend", "memdb")
mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "debug")
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657")
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)
mapConfig.SetDefault("timeout_propose", 3000)
mapConfig.SetDefault("timeout_propose_delta", 1000)
mapConfig.SetDefault("timeout_prevote", 2000)
mapConfig.SetDefault("timeout_prevote_delta", 1000)
mapConfig.SetDefault("timeout_precommit", 2000)
mapConfig.SetDefault("timeout_precommit_delta", 1000)
mapConfig.SetDefault("timeout_commit", 1000)
mapConfig.SetDefault("timeout_propose", 2000)
mapConfig.SetDefault("timeout_propose_delta", 500)
mapConfig.SetDefault("timeout_prevote", 1000)
mapConfig.SetDefault("timeout_prevote_delta", 500)
mapConfig.SetDefault("timeout_precommit", 1000)
mapConfig.SetDefault("timeout_precommit_delta", 500)
mapConfig.SetDefault("timeout_commit", 100)
mapConfig.SetDefault("mempool_recheck", true)
mapConfig.SetDefault("mempool_recheck_empty", true)
mapConfig.SetDefault("mempool_broadcast", true)
@ -105,12 +106,12 @@ var defaultConfigTmpl = `# This is a TOML config file.
proxy_app = "dummy"
moniker = "__MONIKER__"
node_laddr = "0.0.0.0:36656"
node_laddr = "tcp://0.0.0.0:36656"
seeds = ""
fast_sync = false
db_backend = "memdb"
log_level = "debug"
rpc_laddr = "0.0.0.0:36657"
rpc_laddr = "tcp://0.0.0.0:36657"
`
func defaultConfig(moniker string) (defaultConfig string) {

View File

@ -1,4 +1,18 @@
The core consensus algorithm.
# The core consensus algorithm.
* state.go - The state machine as detailed in the whitepaper
* reactor.go - A reactor that connects the state machine to the gossip network
# Go-routine summary
The reactor runs 2 go-routines for each added peer: gossipDataRoutine and gossipVotesRoutine.
The consensus state runs two persistent go-routines: timeoutRoutine and receiveRoutine.
Go-routines are also started to trigger timeouts and to avoid blocking when the internalMsgQueue is really backed up.
# Replay/WAL
A write-ahead log is used to record all messages processed by the receiveRoutine,
which amounts to all inputs to the consensus state machine:
messages from peers, messages from ourselves, and timeouts.
They can be played back deterministically at startup or using the replay console.

View File

@ -316,8 +316,9 @@ func fixedConsensusState() *ConsensusState {
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
return newConsensusState(state, privValidator, counter.NewCounterApplication(true))
privValidator.Reset()
cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true))
return cs
}
func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {

View File

@ -140,6 +140,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)

View File

@ -19,6 +19,8 @@ import (
)
// unmarshal and apply a single message to the consensus state
// as if it were received in receiveRoutine
// NOTE: receiveRoutine should not be running
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
var err error
var msg ConsensusLogMessage
@ -31,7 +33,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step)
log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
@ -53,41 +55,37 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
log.Notice("Proposal", "height", p.Height, "round", p.Round, "header",
log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
case *BlockPartMessage:
log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
case *VoteMessage:
v := msg.Vote
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type,
log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey)
}
// internal or from peer
if m.PeerKey == "" {
cs.internalMsgQueue <- m
} else {
cs.peerMsgQueue <- m
}
cs.handleMsg(m, cs.RoundState)
case timeoutInfo:
log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.tockChan <- m
log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// replay only those messages since the last block
// replay only those messages since the last block.
// timeoutRoutine should run concurrently to read off tickChan
func (cs *ConsensusState) catchupReplay(height int) error {
if cs.wal == nil {
log.Warn("consensus msg log is nil")
return nil
}
if !cs.wal.exists {
// new wal, nothing to catchup on
if !cs.wal.Exists() {
return nil
}
// set replayMode
cs.replayMode = true
defer func() { cs.replayMode = false }()
// starting from end of file,
// read messages until a new height is found
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
@ -142,7 +140,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return err
}
}
log.Info("Done catchup replay")
log.Notice("Done catchup replay")
return nil
}
@ -255,8 +253,9 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
}
func (cs *ConsensusState) startForReplay() {
// don't want to start full cs
cs.BaseService.OnStart()
go cs.receiveRoutine(0)
// since we replay tocks we just ignore ticks
go func() {
for {

View File

@ -1,12 +1,16 @@
package consensus
import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
)
@ -56,38 +60,37 @@ var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254
{"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]}
`
func TestReplayCatchup(t *testing.T) {
// map lines in the above wal to privVal step
var mapPrivValStep = map[int]int8{
0: 0,
1: 0,
2: 1,
3: 1,
4: 1,
5: 2,
6: 2,
7: 3,
}
func writeWAL(log string) string {
fmt.Println("writing", log)
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
if err != nil {
panic(err)
}
name := f.Name()
_, err = f.WriteString(testLog)
_, err = f.WriteString(log)
if err != nil {
panic(err)
}
name := f.Name()
f.Close()
return name
}
cs := fixedConsensusState()
// we've already precommitted on the first block
// without replay catchup we would be halted here forever
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = 3 // precommit
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0)
// start timeout and receive routines
cs.startRoutines(0)
// open wal and run catchup messages
openWAL(t, cs, name)
if err := cs.catchupReplay(cs.Height); err != nil {
panic(Fmt("Error on catchup replay %v", err))
}
after := time.After(time.Second * 15)
func waitForBlock(newBlockCh chan interface{}) {
after := time.After(time.Second * 10)
select {
case <-newBlockCh:
case <-after:
@ -95,12 +98,106 @@ func TestReplayCatchup(t *testing.T) {
}
}
func openWAL(t *testing.T, cs *ConsensusState, file string) {
// open the wal
wal, err := NewWAL(file, config.GetBool("cswal_light"))
if err != nil {
panic(err)
}
wal.exists = true
cs.wal = wal
func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) {
cs.config.Set("cswal", fileName)
cs.Start()
// Wait to make a new block.
// This is just a signal that we haven't halted; its not something contained in the WAL itself.
// Assuming the consensus state is running, replay of any WAL, including the empty one,
// should eventually be followed by a new block, or else something is wrong
waitForBlock(newBlockCh)
cs.Stop()
}
func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
fmt.Println("-------------------------------------")
log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines))
lineStep := nLines
if crashAfter {
lineStep -= 1
}
split := strings.Split(testLog, "\n")
lastMsg := split[nLines]
// we write those lines up to (not including) one with the signature
fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
cs := fixedConsensusState()
// set the last step according to when we crashed vs the wal
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = mapPrivValStep[lineStep]
fmt.Println("LAST STEP", cs.privValidator.LastStep)
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
return cs, newBlockCh, lastMsg, fileName
}
//-----------------------------------------------
// Test the log at every iteration, and set the privVal last step
// as if the log was written after signing, before the crash
func TestReplayCrashAfterWrite(t *testing.T) {
split := strings.Split(testLog, "\n")
for i := 0; i < len(split)-1; i++ {
cs, newBlockCh, _, f := setupReplayTest(i+1, true)
runReplayTest(t, cs, f, newBlockCh)
}
}
//-----------------------------------------------
// Test the log as if we crashed after signing but before writing.
// This relies on privValidator.LastSignature being set
func TestReplayCrashBeforeWritePropose(t *testing.T) {
cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
cs.privValidator.LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, f, newBlockCh)
}
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh)
}
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh)
}

View File

@ -215,7 +215,7 @@ type ConsensusState struct {
QuitService
config cfg.Config
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnConsensus
blockStore *bc.BlockStore
mempool *mempl.Mempool
privValidator *types.PrivValidator
@ -233,12 +233,13 @@ type ConsensusState struct {
evsw *events.EventSwitch
wal *WAL
wal *WAL
replayMode bool // so we don't log signing errors during replay
nSteps int // used for testing to limit the number of transitions the state makes
}
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
config: config,
proxyAppConn: proxyAppConn,
@ -298,8 +299,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
func (cs *ConsensusState) OnStart() error {
cs.QuitService.OnStart()
// start timeout and receive routines
cs.startRoutines(0)
err := cs.OpenWAL(cs.config.GetString("cswal"))
if err != nil {
return err
}
// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
go cs.timeoutRoutine()
// we may have lost some votes if the process crashed
// reload from consensus log to catchup
@ -308,8 +318,12 @@ func (cs *ConsensusState) OnStart() error {
// let's go for it anyways, maybe we're fine
}
// now start the receiveRoutine
go cs.receiveRoutine(0)
// schedule the first round!
cs.scheduleRound0(cs.Height)
// use GetRoundState so we don't race the receiveRoutine for access
cs.scheduleRound0(cs.GetRoundState())
return nil
}
@ -407,13 +421,13 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
}
// enterNewRound(height, 0) at cs.StartTime.
func (cs *ConsensusState) scheduleRound0(height int) {
func (cs *ConsensusState) scheduleRound0(rs *RoundState) {
//log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
sleepDuration := cs.StartTime.Sub(time.Now())
sleepDuration := rs.StartTime.Sub(time.Now())
if sleepDuration < time.Duration(0) {
sleepDuration = time.Duration(0)
}
cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight)
cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight)
}
// Attempt to schedule a timeout by sending timeoutInfo on the tickChan.
@ -432,7 +446,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
// be processed out of order.
// TODO: use CList here for strict determinism and
// attempt push to internalMsgQueue in receiveRoutine
log.Debug("Internal msg queue is full. Using a go-routine")
log.Warn("Internal msg queue is full. Using a go-routine")
go func() { cs.internalMsgQueue <- mi }()
}
}
@ -843,7 +857,9 @@ func (cs *ConsensusState) decideProposal(height, round int) {
log.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
log.Debug(Fmt("Signed proposal block: %v", block))
} else {
log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err)
if !cs.replayMode {
log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
}
}
@ -1254,7 +1270,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// cs.StartTime is already set.
// Schedule Round0 to start soon.
cs.scheduleRound0(height + 1)
cs.scheduleRound0(&cs.RoundState)
// By here,
// * cs.Height has been increment to height+1
@ -1270,9 +1286,6 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo
cs.mempool.Lock()
defer cs.mempool.Unlock()
// flush out any CheckTx that have already started
cs.proxyAppConn.FlushSync()
// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()
if res.IsErr() {
@ -1502,8 +1515,9 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet
return vote, err
}
// signs the vote, publishes on internalMsgQueue
// sign the vote and publish on internalMsgQueue
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) {
return nil
}
@ -1515,7 +1529,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
log.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
return vote
} else {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
if !cs.replayMode {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
}
return nil
}
}

View File

@ -236,7 +236,7 @@ func TestFullRound1(t *testing.T) {
cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 0)
propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1)
newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1)
@ -249,6 +249,8 @@ func TestFullRound1(t *testing.T) {
propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote
// NOTE: voteChan cap of 0 ensures we can complete this
// before consensus can move to the next height (and cause a race condition)
validatePrevote(t, cs, round, vss[0], propBlockHash)
<-voteCh // wait for precommit

View File

@ -60,6 +60,14 @@ func NewWAL(file string, light bool) (*WAL, error) {
}, nil
}
func (wal *WAL) Exists() bool {
if wal == nil {
log.Warn("consensus msg log is nil")
return false
}
return wal.exists
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
if wal != nil {

66
glide.lock generated
View File

@ -1,64 +1,66 @@
hash: d87a1fe0061d41c1e6ec78d405d54ae321e75f4bff22b38d19d3255bbd17f21e
updated: 2016-06-11T18:38:47.019992204-07:00
updated: 2016-09-10T18:02:24.023038691-04:00
imports:
- name: github.com/btcsuite/btcd
version: ff4ada0b0e1ebffa3f9c15cadc96ab0d08a11034
version: 2ef82e7db35dc8c499fa9091d768dc99bbaff893
subpackages:
- btcec
- name: github.com/btcsuite/fastsha256
version: 302ad4db268b46f9ebda3078f6f7397f96047735
version: 637e656429416087660c84436a2a035d69d54e2e
- name: github.com/BurntSushi/toml
version: f0aeabca5a127c4078abb8c8d64298b147264b55
version: 99064174e013895bbd9b025c31100bd1d9b590ca
- name: github.com/go-stack/stack
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
- name: github.com/gogo/protobuf
version: 318371cbef6bab80e8d1c69b470fffa79eebfb54
version: a11c89fbb0ad4acfa8abc4a4d5f7e27c477169b1
subpackages:
- proto
- name: github.com/golang/protobuf
version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67
version: 1f49d83d9aa00e6ce4fc8258c71cc7786aec968a
subpackages:
- proto
- name: github.com/golang/snappy
version: d9eb7a3d35ec988b8585d4a0068e462c27d28380
- name: github.com/gorilla/websocket
version: a68708917c6a4f06314ab4e52493cc61359c9d42
version: a69d25be2fe2923a97c2af6849b2f52426f68fc0
- name: github.com/mattn/go-colorable
version: 9056b7a9f2d1f2d96498d6d146acd1f9d5ed3d59
version: ed8eb9e318d7a84ce5915b495b7d35e0cfe7b5a8
- name: github.com/mattn/go-isatty
version: 56b76bdf51f7708750eac80fa38b952bb9f32639
version: 66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8
- name: github.com/spf13/pflag
version: 367864438f1b1a3c7db4da06a2f55b144e6784e0
version: 6fd2ff4ff8dfcdf5556fbdc0ac0284408274b1a7
- name: github.com/syndtr/goleveldb
version: fa5b5c78794bc5c18f330361059f871ae8c2b9d6
version: 6ae1797c0b42b9323fc27ff7dcf568df88f2f33d
subpackages:
- leveldb
- leveldb/errors
- leveldb/opt
- leveldb/cache
- leveldb/comparer
- leveldb/errors
- leveldb/filter
- leveldb/iterator
- leveldb/journal
- leveldb/memdb
- leveldb/opt
- leveldb/storage
- leveldb/table
- leveldb/util
- name: github.com/tendermint/ed25519
version: 1f52c6f8b8a5c7908aff4497c186af344b428925
subpackages:
- extra25519
- edwards25519
- extra25519
- name: github.com/tendermint/flowcontrol
version: 84d9671090430e8ec80e35b339907e0579b999eb
- name: github.com/tendermint/go-clist
version: 3baa390bbaf7634251c42ad69a8682e7e3990552
- name: github.com/tendermint/go-common
version: dee6622bf7f811d3ba8638a3f5ffaf8d679aa9d9
version: 47e06734f6ee488cc2e61550a38642025e1d4227
subpackages:
- test
- name: github.com/tendermint/go-config
version: e64b424499acd0eb9856b88e10c0dff41628c0d6
- name: github.com/tendermint/go-crypto
version: 41cfb7b677f4e16cdfd22b6ce0946c89919fbc7b
version: 4b11d62bdb324027ea01554e5767b71174680ba0
- name: github.com/tendermint/go-db
version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5
- name: github.com/tendermint/go-events
@ -68,11 +70,11 @@ imports:
- name: github.com/tendermint/go-merkle
version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8
- name: github.com/tendermint/go-p2p
version: 929cf433b9c8e987af5f7f3ca3ce717e1e3eda53
version: f508f3f20b5bb36f03d3bc83647b7a92425139d1
subpackages:
- upnp
- name: github.com/tendermint/go-rpc
version: dea910cd3e71bbfaf1973fd7ba295f0ee515a25f
version: 479510be0e80dd9e5d6b1f941adad168df0af85f
subpackages:
- client
- server
@ -84,38 +86,40 @@ imports:
subpackages:
- term
- name: github.com/tendermint/tmsp
version: ba11348508939e9d273cdc1cc476c5c611e14e66
version: ead192adbbbf85ac581cf775b18ae70d59f86457
subpackages:
- client
- example/counter
- example/dummy
- example/nil
- server
- types
- name: golang.org/x/crypto
version: 77f4136a99ffb5ecdbdd0226bd5cb146cf56bc0e
version: aa2481cbfe81d911eb62b642b7a6b5ec58bbea71
subpackages:
- ripemd160
- curve25519
- nacl/box
- nacl/secretbox
- openpgp/armor
- curve25519
- salsa20/salsa
- poly1305
- openpgp/errors
- poly1305
- ripemd160
- salsa20/salsa
- name: golang.org/x/net
version: 3f122ce3dbbe488b7e6a8bdb26f41edec852a40b
version: cfe3c2a7525b50c3d707256e371c90938cfef98a
subpackages:
- context
- http2
- trace
- http2/hpack
- lex/httplex
- internal/timeseries
- lex/httplex
- trace
- name: golang.org/x/sys
version: 7f918dd405547ecb864d14a8ecbbfe205b5f930f
version: 30de6d19a3bd89a5f38ae4028e23aaa5582648af
subpackages:
- unix
- name: google.golang.org/grpc
version: daeb9cc0f2607997cce611a1458e71b981ce5986
version: 28707e14b1d2b2f5da81474dea2790d71e526987
subpackages:
- codes
- credentials
@ -123,6 +127,6 @@ imports:
- internal
- metadata
- naming
- transport
- peer
devImports: []
- transport
testImports: []

View File

@ -49,7 +49,7 @@ type Mempool struct {
config cfg.Config
proxyMtx sync.Mutex
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
@ -63,7 +63,7 @@ type Mempool struct {
cacheList *list.List // to remove oldest tx when cache gets too big
}
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool {
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool {
mempool := &Mempool{
config: config,
proxyAppConn: proxyAppConn,

View File

@ -6,7 +6,6 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"
. "github.com/tendermint/go-common"
@ -26,9 +25,6 @@ import (
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
"github.com/tendermint/tmsp/example/nil"
)
import _ "net/http/pprof"
@ -45,9 +41,17 @@ type Node struct {
privValidator *types.PrivValidator
genesisDoc *types.GenesisDoc
privKey crypto.PrivKeyEd25519
proxyApp proxy.AppConns
}
func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
func NewNodeDefault(config cfg.Config) *Node {
// Get PrivValidator
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
return NewNode(config, privValidator, proxy.DefaultClientCreator(config))
}
func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
@ -61,12 +65,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// Get State
state := getState(config, stateDB)
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
// Create the proxyApp, which houses three connections:
// query, consensus, and mempool
proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
// add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID)
@ -93,14 +94,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
}
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
// Make MempoolReactor
mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())
mempoolReactor := mempl.NewMempoolReactor(config, mempool)
// Make ConsensusReactor
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
if privValidator != nil {
consensusReactor.SetPrivValidator(privValidator)
@ -118,6 +119,27 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
// filter peers by addr or pubkey with a tmsp query.
// if the query return code is OK, add peer
// XXX: query format subject to change
if config.GetBool("filter_peers") {
// NOTE: addr is ip:port
sw.SetAddrFilter(func(addr net.Addr) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String())))
if res.IsOK() {
return nil
}
return res
})
sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/pubkey/%X", pubkey.Bytes())))
if res.IsOK() {
return nil
}
return res
})
}
// add the event switch to all services
// they should all satisfy events.Eventable
SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
@ -125,6 +147,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// run the profile server
profileHost := config.GetString("prof_laddr")
if profileHost != "" {
go func() {
log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
}()
@ -142,6 +165,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
privValidator: privValidator,
genesisDoc: state.GenesisDoc,
privKey: privKey,
proxyApp: proxyApp,
}
}
@ -185,6 +209,7 @@ func (n *Node) StartRPC() ([]net.Listener, error) {
rpccore.SetSwitch(n.sw)
rpccore.SetPrivValidator(n.privValidator)
rpccore.SetGenesisDoc(n.genesisDoc)
rpccore.SetProxyAppQuery(n.proxyApp.Query())
listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
@ -270,40 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
return nodeInfo
}
// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing)
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
case "dummy":
app := dummy.NewDummyApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
default:
// Run forever in a loop
remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
proxyAppConn = remoteApp
}
// Check the hash
res := proxyAppConn.CommitSync()
if res.IsErr() {
PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
}
if !bytes.Equal(hash, res.Data) {
log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
}
return proxyAppConn
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
@ -317,9 +308,12 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
//------------------------------------------------------------------------------
// Users wishing to use an external signer for their validators
// Users wishing to:
// * use an external signer for their validators
// * supply an in-proc tmsp app
// should fork tendermint/tendermint and implement RunNode to
// load their custom priv validator and call NewNode(privVal, getProxyFunc)
// call NewNode with their custom priv validator and/or custom
// proxy.ClientCreator interface
func RunNode(config cfg.Config) {
// Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file")
@ -342,13 +336,11 @@ func RunNode(config cfg.Config) {
}
}
// Get PrivValidator
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
n := NewNodeDefault(config)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)
err := n.Start()
if err != nil {
@ -400,10 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
// add the chainid to the global config
config.Set("chain_id", state.ChainID)
@ -415,9 +404,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
Exit(Fmt("Failed to start event switch: %v", err))
}
mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusState.SetEventSwitch(eventSwitch)
return consensusState
}
@ -448,3 +437,13 @@ func RunReplay(config cfg.Config) {
}
log.Notice("Replay run successfully")
}
// Defaults to tcp
func ProtocolAndAddress(listenAddr string) (string, string) {
protocol, address := "tcp", listenAddr
parts := strings.SplitN(address, "://", 2)
if len(parts) == 2 {
protocol, address = parts[0], parts[1]
}
return protocol, address
}

View File

@ -6,19 +6,15 @@ import (
"github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/types"
)
func TestNodeStartStop(t *testing.T) {
config := tendermint_test.ResetConfig("node_node_test")
// Get PrivValidator
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
n := NewNodeDefault(config)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)
n.Start()
log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())

View File

@ -2,8 +2,139 @@ package proxy
import (
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/types"
)
type AppConn interface {
tmspcli.Client
//----------------------------------------------------------------------------------------
// Enforce which tmsp msgs can be sent on a connection at the type level
type AppConnConsensus interface {
SetResponseCallback(tmspcli.Callback)
Error() error
InitChainSync(validators []*types.Validator) (err error)
BeginBlockSync(height uint64) (err error)
AppendTxAsync(tx []byte) *tmspcli.ReqRes
EndBlockSync(height uint64) (changedValidators []*types.Validator, err error)
CommitSync() (res types.Result)
}
type AppConnMempool interface {
SetResponseCallback(tmspcli.Callback)
Error() error
CheckTxAsync(tx []byte) *tmspcli.ReqRes
FlushAsync() *tmspcli.ReqRes
FlushSync() error
}
type AppConnQuery interface {
Error() error
EchoSync(string) (res types.Result)
InfoSync() (res types.Result)
QuerySync(tx []byte) (res types.Result)
// SetOptionSync(key string, value string) (res types.Result)
}
//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of tmspcli.Client)
type appConnConsensus struct {
appConn tmspcli.Client
}
func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
return &appConnConsensus{
appConn: appConn,
}
}
func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
return app.appConn.InitChainSync(validators)
}
func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) {
return app.appConn.BeginBlockSync(height)
}
func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.AppendTxAsync(tx)
}
func (app *appConnConsensus) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) {
return app.appConn.EndBlockSync(height)
}
func (app *appConnConsensus) CommitSync() (res types.Result) {
return app.appConn.CommitSync()
}
//------------------------------------------------
// Implements AppConnMempool (subset of tmspcli.Client)
type appConnMempool struct {
appConn tmspcli.Client
}
func NewAppConnMempool(appConn tmspcli.Client) *appConnMempool {
return &appConnMempool{
appConn: appConn,
}
}
func (app *appConnMempool) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}
func (app *appConnMempool) FlushAsync() *tmspcli.ReqRes {
return app.appConn.FlushAsync()
}
func (app *appConnMempool) FlushSync() error {
return app.appConn.FlushSync()
}
func (app *appConnMempool) CheckTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.CheckTxAsync(tx)
}
//------------------------------------------------
// Implements AppConnQuery (subset of tmspcli.Client)
type appConnQuery struct {
appConn tmspcli.Client
}
func NewAppConnQuery(appConn tmspcli.Client) *appConnQuery {
return &appConnQuery{
appConn: appConn,
}
}
func (app *appConnQuery) Error() error {
return app.appConn.Error()
}
func (app *appConnQuery) EchoSync(msg string) (res types.Result) {
return app.appConn.EchoSync(msg)
}
func (app *appConnQuery) InfoSync() (res types.Result) {
return app.appConn.InfoSync()
}
func (app *appConnQuery) QuerySync(tx []byte) (res types.Result) {
return app.appConn.QuerySync(tx)
}

View File

@ -5,14 +5,47 @@ import (
"testing"
. "github.com/tendermint/go-common"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
"github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types"
)
//----------------------------------------
type AppConnTest interface {
EchoAsync(string) *tmspcli.ReqRes
FlushSync() error
InfoSync() (res types.Result)
}
type appConnTest struct {
appConn tmspcli.Client
}
func NewAppConnTest(appConn tmspcli.Client) AppConnTest {
return &appConnTest{appConn}
}
func (app *appConnTest) EchoAsync(msg string) *tmspcli.ReqRes {
return app.appConn.EchoAsync(msg)
}
func (app *appConnTest) FlushSync() error {
return app.appConn.FlushSync()
}
func (app *appConnTest) InfoSync() types.Result {
return app.appConn.InfoSync()
}
//----------------------------------------
var SOCKET = "socket"
func TestEcho(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
@ -21,12 +54,12 @@ func TestEcho(t *testing.T) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := clientCreator.NewTMSPClient()
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
proxy := NewAppConnTest(cli)
t.Log("Connected")
for i := 0; i < 1000; i++ {
proxy.EchoAsync(Fmt("echo-%v", i))
@ -37,6 +70,7 @@ func TestEcho(t *testing.T) {
func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil {
@ -44,12 +78,12 @@ func BenchmarkEcho(b *testing.B) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := clientCreator.NewTMSPClient()
if err != nil {
Exit(err.Error())
} else {
b.Log("Connected")
}
proxy := NewAppConnTest(cli)
b.Log("Connected")
echoString := strings.Repeat(" ", 200)
b.StartTimer() // Start benchmarking tests
@ -65,6 +99,7 @@ func BenchmarkEcho(b *testing.B) {
func TestInfo(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil {
@ -72,12 +107,13 @@ func TestInfo(t *testing.T) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := clientCreator.NewTMSPClient()
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
proxy := NewAppConnTest(cli)
t.Log("Connected")
res := proxy.InfoSync()
if res.IsErr() {
t.Errorf("Unexpected error: %v", err)

81
proxy/client.go Normal file
View File

@ -0,0 +1,81 @@
package proxy
import (
"fmt"
"sync"
cfg "github.com/tendermint/go-config"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
nilapp "github.com/tendermint/tmsp/example/nil"
"github.com/tendermint/tmsp/types"
)
// NewTMSPClient returns newly connected client
type ClientCreator interface {
NewTMSPClient() (tmspcli.Client, error)
}
//----------------------------------------------------
// local proxy uses a mutex on an in-proc app
type localClientCreator struct {
mtx *sync.Mutex
app types.Application
}
func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{
mtx: new(sync.Mutex),
app: app,
}
}
func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) {
return tmspcli.NewLocalClient(l.mtx, l.app), nil
}
//---------------------------------------------------------------
// remote proxy opens new connections to an external app process
type remoteClientCreator struct {
addr string
transport string
mustConnect bool
}
func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator {
return &remoteClientCreator{
addr: addr,
transport: transport,
mustConnect: mustConnect,
}
}
func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) {
// Run forever in a loop
fmt.Println("ADDR", r.addr, r.transport)
remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect)
if err != nil {
return nil, fmt.Errorf("Failed to connect to proxy: %v", err)
}
return remoteApp, nil
}
//-----------------------------------------------------------------
// default
func DefaultClientCreator(config cfg.Config) ClientCreator {
addr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
switch addr {
case "dummy":
return NewLocalClientCreator(dummy.NewDummyApplication())
case "nilapp":
return NewLocalClientCreator(nilapp.NewNilApplication())
default:
mustConnect := true
return NewRemoteClientCreator(addr, transport, mustConnect)
}
}

94
proxy/multi_app_conn.go Normal file
View File

@ -0,0 +1,94 @@
package proxy
import (
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
)
// Tendermint's interface to the application consists of multiple connections
type AppConns interface {
Mempool() AppConnMempool
Consensus() AppConnConsensus
Query() AppConnQuery
}
func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns {
return NewMultiAppConn(config, clientCreator, state, blockStore)
}
// a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying tmsp clients, ensuring they reboot together
type multiAppConn struct {
QuitService
config cfg.Config
state State
blockStore BlockStore
mempoolConn *appConnMempool
consensusConn *appConnConsensus
queryConn *appConnQuery
clientCreator ClientCreator
}
// Make all necessary tmsp connections to the application
func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn {
multiAppConn := &multiAppConn{
config: config,
state: state,
blockStore: blockStore,
clientCreator: clientCreator,
}
multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn)
multiAppConn.Start()
return multiAppConn
}
// Returns the mempool connection
func (app *multiAppConn) Mempool() AppConnMempool {
return app.mempoolConn
}
// Returns the consensus Connection
func (app *multiAppConn) Consensus() AppConnConsensus {
return app.consensusConn
}
func (app *multiAppConn) Query() AppConnQuery {
return app.queryConn
}
func (app *multiAppConn) OnStart() error {
app.QuitService.OnStart()
// query connection
querycli, err := app.clientCreator.NewTMSPClient()
if err != nil {
return err
}
app.queryConn = NewAppConnQuery(querycli)
// mempool connection
memcli, err := app.clientCreator.NewTMSPClient()
if err != nil {
return err
}
app.mempoolConn = NewAppConnMempool(memcli)
// consensus connection
concli, err := app.clientCreator.NewTMSPClient()
if err != nil {
return err
}
app.consensusConn = NewAppConnConsensus(concli)
// TODO: handshake
// TODO: replay blocks
// TODO: (on restart) replay mempool
return nil
}

View File

@ -1,23 +0,0 @@
package proxy
import (
tmspcli "github.com/tendermint/tmsp/client"
)
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
type remoteAppConn struct {
tmspcli.Client
}
func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) {
client, err := tmspcli.NewClient(addr, transport, false)
if err != nil {
return nil, err
}
appConn := &remoteAppConn{
Client: client,
}
return appConn, nil
}

9
proxy/state.go Normal file
View File

@ -0,0 +1,9 @@
package proxy
type State interface {
// TODO
}
type BlockStore interface {
// TODO
}

View File

@ -11,6 +11,8 @@ func Validators() (*ctypes.ResultValidators, error) {
var blockHeight int
var validators []*types.Validator
// XXX: this is racy.
// Either use state.LoadState(db) or make state atomic (see #165)
state := consensusState.GetState()
blockHeight = state.LastBlockHeight
state.Validators.Iterate(func(index int, val *types.Validator) bool {

View File

@ -20,6 +20,6 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) {
log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().RemoveListener(event)
wsCtx.GetEventSwitch().RemoveListenerForEvent(event, wsCtx.GetRemoteAddr())
return &ctypes.ResultUnsubscribe{}, nil
}

View File

@ -8,6 +8,7 @@ import (
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@ -19,6 +20,7 @@ var mempoolReactor *mempl.MempoolReactor
var p2pSwitch *p2p.Switch
var privValidator *types.PrivValidator
var genDoc *types.GenesisDoc // cache the genesis structure
var proxyAppQuery proxy.AppConnQuery
var config cfg.Config = nil
@ -57,3 +59,7 @@ func SetPrivValidator(pv *types.PrivValidator) {
func SetGenesisDoc(doc *types.GenesisDoc) {
genDoc = doc
}
func SetProxyAppQuery(appConn proxy.AppConnQuery) {
proxyAppQuery = appConn
}

View File

@ -25,6 +25,9 @@ var Routes = map[string]*rpc.RPCFunc{
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
"tmsp_query": rpc.NewRPCFunc(TMSPQueryResult, "query"),
"tmsp_info": rpc.NewRPCFunc(TMSPInfoResult, ""),
"unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""),
"unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"),
"unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"),
@ -152,6 +155,22 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
}
}
func TMSPQueryResult(query []byte) (ctypes.TMResult, error) {
if r, err := TMSPQuery(query); err != nil {
return nil, err
} else {
return r, nil
}
}
func TMSPInfoResult() (ctypes.TMResult, error) {
if r, err := TMSPInfo(); err != nil {
return nil, err
} else {
return r, nil
}
}
func UnsafeFlushMempoolResult() (ctypes.TMResult, error) {
if r, err := UnsafeFlushMempool(); err != nil {
return nil, err

17
rpc/core/tmsp.go Normal file
View File

@ -0,0 +1,17 @@
package core
import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
//-----------------------------------------------------------------------------
func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) {
res := proxyAppQuery.QuerySync(query)
return &ctypes.ResultTMSPQuery{res}, nil
}
func TMSPInfo() (*ctypes.ResultTMSPInfo, error) {
res := proxyAppQuery.InfoSync()
return &ctypes.ResultTMSPInfo{res}, nil
}

View File

@ -68,6 +68,14 @@ type ResultUnconfirmedTxs struct {
Txs []types.Tx `json:"txs"`
}
type ResultTMSPInfo struct {
Result tmsp.Result `json:"result"`
}
type ResultTMSPQuery struct {
Result tmsp.Result `json:"result"`
}
type ResultUnsafeFlushMempool struct{}
type ResultUnsafeSetConfig struct{}
@ -107,6 +115,10 @@ const (
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
// 0x7 bytes are for querying the application
ResultTypeTMSPQuery = byte(0x70)
ResultTypeTMSPInfo = byte(0x71)
// 0x8 bytes are for events
ResultTypeSubscribe = byte(0x80)
ResultTypeUnsubscribe = byte(0x81)
@ -145,4 +157,6 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler},
wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile},
wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool},
wire.ConcreteType{&ResultTMSPQuery{}, ResultTypeTMSPQuery},
wire.ConcreteType{&ResultTMSPInfo{}, ResultTypeTMSPInfo},
)

View File

@ -2,9 +2,12 @@ package rpctest
import (
"bytes"
"crypto/rand"
crand "crypto/rand"
"fmt"
"math/rand"
"strings"
"testing"
"time"
. "github.com/tendermint/go-common"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -14,6 +17,7 @@ import (
//--------------------------------------------------------------------------------
// Test the HTTP client
// These tests assume the dummy app
//--------------------------------------------------------------------------------
//--------------------------------------------------------------------------------
@ -49,20 +53,22 @@ func testStatus(t *testing.T, statusI interface{}) {
//--------------------------------------------------------------------------------
// broadcast tx sync
func testTx() []byte {
buf := make([]byte, 16)
_, err := rand.Read(buf)
// random bytes (excluding byte('='))
func randBytes() []byte {
n := rand.Intn(10) + 2
buf := make([]byte, n)
_, err := crand.Read(buf)
if err != nil {
panic(err)
}
return buf
return bytes.Replace(buf, []byte("="), []byte{100}, -1)
}
func TestURIBroadcastTxSync(t *testing.T) {
config.Set("block_size", 0)
defer config.Set("block_size", -1)
tmResult := new(ctypes.TMResult)
tx := testTx()
tx := randBytes()
_, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult)
if err != nil {
panic(err)
@ -74,7 +80,7 @@ func TestJSONBroadcastTxSync(t *testing.T) {
config.Set("block_size", 0)
defer config.Set("block_size", -1)
tmResult := new(ctypes.TMResult)
tx := testTx()
tx := randBytes()
_, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult)
if err != nil {
panic(err)
@ -95,18 +101,73 @@ func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) {
txs := mem.Reap(1)
if !bytes.Equal(txs[0], tx) {
panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx))
panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], tx))
}
mem.Flush()
}
//--------------------------------------------------------------------------------
// query
func testTxKV() ([]byte, []byte, []byte) {
k := randBytes()
v := randBytes()
return k, v, []byte(Fmt("%s=%s", k, v))
}
func sendTx() ([]byte, []byte) {
tmResult := new(ctypes.TMResult)
k, v, tx := testTxKV()
_, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult)
if err != nil {
panic(err)
}
fmt.Println("SENT TX", tx)
fmt.Printf("SENT TX %X\n", tx)
fmt.Printf("k %X; v %X", k, v)
return k, v
}
func TestURITMSPQuery(t *testing.T) {
k, v := sendTx()
time.Sleep(time.Second)
tmResult := new(ctypes.TMResult)
_, err := clientURI.Call("tmsp_query", map[string]interface{}{"query": Fmt("%X", k)}, tmResult)
if err != nil {
panic(err)
}
testTMSPQuery(t, tmResult, v)
}
func TestJSONTMSPQuery(t *testing.T) {
k, v := sendTx()
tmResult := new(ctypes.TMResult)
_, err := clientJSON.Call("tmsp_query", []interface{}{Fmt("%X", k)}, tmResult)
if err != nil {
panic(err)
}
testTMSPQuery(t, tmResult, v)
}
func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) {
tmRes := statusI.(*ctypes.TMResult)
query := (*tmRes).(*ctypes.ResultTMSPQuery)
if query.Result.IsErr() {
panic(Fmt("Query returned an err: %v", query))
}
// XXX: specific to value returned by the dummy
if !strings.Contains(string(query.Result.Data), "exists=true") {
panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data))
}
}
//--------------------------------------------------------------------------------
// broadcast tx commit
func TestURIBroadcastTxCommit(t *testing.T) {
tmResult := new(ctypes.TMResult)
tx := testTx()
tx := randBytes()
_, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
if err != nil {
panic(err)
@ -116,7 +177,7 @@ func TestURIBroadcastTxCommit(t *testing.T) {
func TestJSONBroadcastTxCommit(t *testing.T) {
tmResult := new(ctypes.TMResult)
tx := testTx()
tx := randBytes()
_, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult)
if err != nil {
panic(err)

View File

@ -13,7 +13,6 @@ import (
"github.com/tendermint/tendermint/config/tendermint_test"
nm "github.com/tendermint/tendermint/node"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
// global variables for use across all tests
@ -52,10 +51,9 @@ func init() {
// create a new node and sleep forever
func newNode(ready chan struct{}) {
// Create & start node
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
node = nm.NewNode(config, privValidator, nm.GetProxyApp)
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true)
node = nm.NewNodeDefault(config)
protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, true)
node.AddListener(l)
node.Start()

View File

@ -1,7 +1,8 @@
#! /bin/bash
set -euo pipefail
GLIDE=$1
LIB=$2
LIB=$1
GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock
cat $GLIDE | grep -A1 $LIB | grep -v $LIB | awk '{print $2}'

View File

@ -2,12 +2,13 @@
# for every github.com/tendermint dependency, warn is if its not synced with origin/master
GLIDE=$1
GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock
# make list of libs
LIBS=($(grep "github.com/tendermint" $GLIDE | awk '{print $3}'))
UPTODATE=true
for lib in "${LIBS[@]}"; do
# get vendored commit
VENDORED=`grep -A1 $lib $GLIDE | grep -v $lib | awk '{print $2}'`
@ -18,6 +19,7 @@ for lib in "${LIBS[@]}"; do
cd $PWD
if [[ "$VENDORED" != "$MASTER" ]]; then
UPTODATE=false
echo ""
if [[ "$VENDORED" != "$HEAD" ]]; then
echo "Vendored version of $lib differs from origin/master and HEAD"
@ -32,3 +34,7 @@ for lib in "${LIBS[@]}"; do
fi
done
if [[ "$UPTODATE" == "true" ]]; then
echo "All vendored versions up to date"
fi

View File

@ -4,10 +4,11 @@ IFS=$'\n\t'
# script to update the given dependency in the glide.lock file with the checked out branch on the local host
GLIDE=$1
LIB=$2
LIB=$1
OLD_COMMIT=`bash scripts/glide/parse.sh $GLIDE $LIB`
GLIDE=$GOPATH/src/github.com/tendermint/tendermint/glide.lock
OLD_COMMIT=`bash scripts/glide/parse.sh $LIB`
PWD=`pwd`
cd $GOPATH/src/github.com/tendermint/$LIB

View File

@ -2,6 +2,7 @@
go get github.com/tendermint/tmsp/...
# get the tmsp commit used by tendermint
COMMIT=`bash scripts/glide/parse.sh $(pwd)/glide.lock tmsp`
cd $GOPATH/src/github.com/tendermint/tmsp

View File

@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error {
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error {
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Validate the block.
err := s.validateBlock(block)
@ -55,7 +55,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn
// Executes block's transactions on proxyAppConn.
// TODO: Generate a bitmap or otherwise store tx validity in state.
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error {
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
var validTxs, invalidTxs = 0, 0

View File

@ -1,23 +0,0 @@
# Pull base image.
FROM golang:1.6
# Grab deps (jq, hexdump)
RUN apt-get update && \
apt-get install -y --no-install-recommends \
jq bsdmainutils && \
rm -rf /var/lib/apt/lists/*
ENV TENDERMINT_ORG $GOPATH/src/github.com/tendermint/
RUN mkdir -p $TENDERMINT_ORG
COPY . $TENDERMINT_ORG/tendermint
WORKDIR $TENDERMINT_ORG/tendermint
RUN make get_vendor_deps
RUN go install ./cmd/tendermint
RUN bash scripts/install_tmsp_apps.sh
EXPOSE 46656
EXPOSE 46657

View File

@ -13,10 +13,15 @@ TESTNAME=$1
# store key value pair
KEY="abcd"
VALUE="dcba"
curl 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\"
curl -s 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\"
echo $?
echo ""
###########################
# test using the tmsp-cli
###########################
# we should be able to look up the key
RESPONSE=`tmsp-cli query $KEY`
@ -40,4 +45,34 @@ if [[ $? == 0 ]]; then
fi
set -e
#############################
# test using the /tmsp_query
#############################
# we should be able to look up the key
RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"`
RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
set +e
A=`echo $RESPONSE | grep exists=true`
if [[ $? != 0 ]]; then
echo "Failed to find 'exists=true' for $KEY. Response:"
echo "$RESPONSE"
exit 1
fi
set -e
# we should not be able to look up the value
RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"`
RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
set +e
A=`echo $RESPONSE | grep exists=true`
if [[ $? == 0 ]]; then
echo "Found 'exists=true' for $VALUE when we should not have. Response:"
echo "$RESPONSE"
exit 1
fi
set -e
echo "Passed Test: $TESTNAME"

22
test/docker/Dockerfile Normal file
View File

@ -0,0 +1,22 @@
# Pull base image.
FROM golang:1.6
# Grab deps (jq, hexdump, xxd, killall)
RUN apt-get update && \
apt-get install -y --no-install-recommends \
jq bsdmainutils vim-common psmisc
ENV REPO $GOPATH/src/github.com/tendermint/tendermint
WORKDIR $REPO
ADD glide.yaml glide.yaml
ADD glide.lock glide.lock
ADD Makefile Makefile
RUN make get_vendor_deps
COPY . $REPO
RUN go install ./cmd/tendermint
RUN bash scripts/install_tmsp_apps.sh
EXPOSE 46656
EXPOSE 46657

3
test/docker/build.sh Normal file
View File

@ -0,0 +1,3 @@
#! /bin/bash
docker build -t tester -f ./test/docker/Dockerfile .

9
test/docker/update.sh Normal file
View File

@ -0,0 +1,9 @@
#! /bin/bash
# update the `tester` image by copying in the latest tendermint binary
docker run --name builder tester true
docker cp $GOPATH/bin/tendermint builder:/go/bin/tendermint
docker commit builder tester
docker rm -vf builder

View File

@ -1,6 +1,8 @@
#! /bin/bash
set -eu
# start a testnet and benchmark throughput using mintnet+netmon via the network_testing repo
DATACENTER=single
VALSETSIZE=4
BLOCKSIZE=8092

View File

@ -1,9 +1,20 @@
#! /bin/bash
###################################################################
# wait for all peers to come online
# for each peer:
# wait to have 3 peers
# wait to be at height > 1
# send a tx, wait for commit
# assert app hash on every peer reflects the post tx state
###################################################################
N=4
# wait for everyone to come online
echo "Waiting for nodes to come online"
for i in `seq 1 4`; do
addr="172.57.0.$((100+$i)):46657"
for i in `seq 1 $N`; do
addr=$(test/p2p/ip.sh $i):46657
curl -s $addr/status > /dev/null
ERR=$?
while [ "$ERR" != 0 ]; do
@ -16,8 +27,8 @@ done
echo ""
# run the test on each of them
for i in `seq 1 4`; do
addr="172.57.0.$((100+$i)):46657"
for i in `seq 1 $N`; do
addr=$(test/p2p/ip.sh $i):46657
# - assert everyone has 3 other peers
N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'`
@ -61,9 +72,10 @@ for i in `seq 1 4`; do
fi
# check we get the same new hash on all other nodes
for j in `seq 1 4`; do
for j in `seq 1 $N`; do
if [[ "$i" != "$j" ]]; then
HASH3=`curl -s 172.57.0.$((100+$j)):46657/status | jq .result[1].latest_app_hash`
addrJ=$(test/p2p/ip.sh $j):46657
HASH3=`curl -s $addrJ/status | jq .result[1].latest_app_hash`
if [[ "$HASH2" != "$HASH3" ]]; then
echo "App hash for node $j doesn't match. Got $HASH3, expected $HASH2"
@ -77,3 +89,4 @@ done
echo ""
echo "PASS"
echo ""

View File

@ -1,4 +1,5 @@
#! /bin/bash
# clean everything
docker rm -vf $(docker ps -aq)
docker network rm local_testnet

View File

@ -3,14 +3,16 @@ set -eu
DOCKER_IMAGE=$1
NETWORK_NAME=$2
CMD=$3
ID=$3
CMD=$4
echo "starting test client container with CMD=$CMD"
# run the test container on the local network
docker run -t \
--rm \
-v $GOPATH/src/github.com/tendermint/tendermint/test/p2p/:/go/src/github.com/tendermint/tendermint/test/p2p \
--net=$NETWORK_NAME \
--ip=172.57.0.99 \
--name test_container \
--ip=$(test/p2p/ip.sh "-1") \
--name test_container_$ID \
--entrypoint bash \
$DOCKER_IMAGE $CMD

View File

@ -0,0 +1,44 @@
#! /bin/bash
set -eu
set -o pipefail
###############################################################
# for each peer:
# kill peer
# bring it back online via fast sync
# check app hash
###############################################################
ID=$1
addr=$(test/p2p/ip.sh $ID):46657
peerID=$(( $(($ID % 4)) + 1 )) # 1->2 ... 3->4 ... 4->1
peer_addr=$(test/p2p/ip.sh $peerID):46657
# get another peer's height
h1=`curl -s $peer_addr/status | jq .result[1].latest_block_height`
# get another peer's state
root1=`curl -s $peer_addr/status | jq .result[1].latest_app_hash`
echo "Other peer is on height $h1 with state $root1"
echo "Waiting for peer $ID to catch up"
# wait for it to sync to past its previous height
set +e
set +o pipefail
h2="0"
while [[ "$h2" -lt "$(($h1+3))" ]]; do
sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
echo "... $h2"
done
# check the app hash
root2=`curl -s $addr/status | jq .result[1].latest_app_hash`
if [[ "$root1" != "$root2" ]]; then
echo "App hash after fast sync does not match. Got $root2; expected $root1"
exit 1
fi
echo "... fast sync successful"

7
test/p2p/ip.sh Executable file
View File

@ -0,0 +1,7 @@
#! /bin/bash
set -eu
ID=$1
echo "172.57.0.$((100+$ID))"

View File

@ -10,19 +10,12 @@ cd $GOPATH/src/github.com/tendermint/tendermint
docker network create --driver bridge --subnet 172.57.0.0/16 $NETWORK_NAME
N=4
seeds="172.57.0.101:46656"
seeds="$(test/p2p/ip.sh 1):46656"
for i in `seq 2 $N`; do
seeds="$seeds,172.57.0.$((100+$i)):46656"
seeds="$seeds,$(test/p2p/ip.sh $i):46656"
done
echo "Seeds: $seeds"
for i in `seq 1 $N`; do
# start tendermint container
docker run -d \
--net=$NETWORK_NAME \
--ip=172.57.0.$((100+$i)) \
--name local_testnet_$i \
--entrypoint tendermint \
-e TMROOT=/go/src/github.com/tendermint/tendermint/test/p2p/data/mach$i/core \
$DOCKER_IMAGE node --seeds $seeds --proxy_app=dummy
bash test/p2p/peer.sh $DOCKER_IMAGE $NETWORK_NAME $i $seeds
done

23
test/p2p/peer.sh Normal file
View File

@ -0,0 +1,23 @@
#! /bin/bash
set -eu
DOCKER_IMAGE=$1
NETWORK_NAME=$2
ID=$3
set +u
SEEDS=$4
set -u
if [[ "$SEEDS" != "" ]]; then
SEEDS=" --seeds $SEEDS "
fi
echo "starting tendermint peer ID=$ID"
# start tendermint container on the network
docker run -d \
--net=$NETWORK_NAME \
--ip=$(test/p2p/ip.sh $ID) \
--name local_testnet_$ID \
--entrypoint tendermint \
-e TMROOT=/go/src/github.com/tendermint/tendermint/test/p2p/data/mach$ID/core \
$DOCKER_IMAGE node $SEEDS --proxy_app=dummy

View File

@ -1,10 +1,38 @@
#! /bin/bash
set -eu
DOCKER_IMAGE=$1
NETWORK_NAME=local_testnet
cd $GOPATH/src/github.com/tendermint/tendermint
# start the testnet on a local network
bash test/p2p/local_testnet.sh $DOCKER_IMAGE $NETWORK_NAME
# run the test
bash test/p2p/test_client.sh $DOCKER_IMAGE $NETWORK_NAME test/p2p/run_test.sh
# test atomic broadcast
bash test/p2p/client.sh $DOCKER_IMAGE $NETWORK_NAME ab test/p2p/atomic_broadcast/test.sh
# test fast sync (from current state of network)
# run it on each of them
N=4
for i in `seq 1 $N`; do
echo "Testing fasysync on node $i"
# kill peer
set +e # circle sigh :(
docker rm -vf local_testnet_$i
set -e
# restart peer - should have an empty blockchain
SEEDS="$(test/p2p/ip.sh 1):46656"
for j in `seq 2 $N`; do
SEEDS="$SEEDS,$(test/p2p/ip.sh $j):46656"
done
bash test/p2p/peer.sh $DOCKER_IMAGE $NETWORK_NAME $i $SEEDS
bash test/p2p/client.sh $DOCKER_IMAGE $NETWORK_NAME fs_$i "test/p2p/fast_sync/test.sh $i"
done
echo ""
echo "PASS"
echo ""

View File

@ -1,20 +1,21 @@
#! /bin/bash
set -eu
# Top Level Testing Script
# See the github.com/tendermint/tendermint/test/README.md
echo ""
echo "* building docker file"
docker build -t tester -f ./test/Dockerfile .
echo "* building docker image"
bash ./test/docker/build.sh
echo ""
echo "* running go tests and app tests"
echo "* running go tests and app tests in docker container"
docker run -t tester bash test/run_test.sh
# test basic network connectivity
# by starting a local testnet and checking peers connect and make blocks
echo ""
echo "* running basic peer tests"
echo "* running p2p tests on a local docker network"
bash test/p2p/test.sh tester
# only run the cloud benchmark for releases

View File

@ -35,11 +35,13 @@ func voteToStep(vote *Vote) int8 {
}
type PrivValidator struct {
Address []byte `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
LastHeight int `json:"last_height"`
LastRound int `json:"last_round"`
LastStep int8 `json:"last_step"`
Address []byte `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
LastHeight int `json:"last_height"`
LastRound int `json:"last_round"`
LastStep int8 `json:"last_step"`
LastSignature crypto.Signature `json:"last_signature"` // so we dont lose signatures
LastSignBytes []byte `json:"last_signbytes"` // so we dont lose signatures
// PrivKey should be empty if a Signer other than the default is being used.
PrivKey crypto.PrivKey `json:"priv_key"`
@ -85,14 +87,16 @@ func GenPrivValidator() *PrivValidator {
pubKey := crypto.PubKeyEd25519(*pubKeyBytes)
privKey := crypto.PrivKeyEd25519(*privKeyBytes)
return &PrivValidator{
Address: pubKey.Address(),
PubKey: pubKey,
PrivKey: privKey,
LastHeight: 0,
LastRound: 0,
LastStep: stepNone,
filePath: "",
Signer: NewDefaultSigner(privKey),
Address: pubKey.Address(),
PubKey: pubKey,
PrivKey: privKey,
LastHeight: 0,
LastRound: 0,
LastStep: stepNone,
LastSignature: nil,
LastSignBytes: nil,
filePath: "",
Signer: NewDefaultSigner(privKey),
}
}
@ -149,56 +153,84 @@ func (privVal *PrivValidator) save() {
}
}
// NOTE: Unsafe!
func (privVal *PrivValidator) Reset() {
privVal.LastHeight = 0
privVal.LastRound = 0
privVal.LastStep = 0
privVal.LastSignature = nil
privVal.LastSignBytes = nil
privVal.Save()
}
func (privVal *PrivValidator) SignVote(chainID string, vote *Vote) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
// If height regression, panic
if privVal.LastHeight > vote.Height {
return errors.New("Height regression in SignVote")
signature, err := privVal.signBytesHRS(vote.Height, vote.Round, voteToStep(vote), SignBytes(chainID, vote))
if err != nil {
return errors.New(Fmt("Error signing vote: %v", err))
}
// More cases for when the height matches
if privVal.LastHeight == vote.Height {
// If round regression, panic
if privVal.LastRound > vote.Round {
return errors.New("Round regression in SignVote")
}
// If step regression, panic
if privVal.LastRound == vote.Round && privVal.LastStep > voteToStep(vote) {
return errors.New("Step regression in SignVote")
}
}
// Persist height/round/step
privVal.LastHeight = vote.Height
privVal.LastRound = vote.Round
privVal.LastStep = voteToStep(vote)
privVal.save()
// Sign
vote.Signature = privVal.Sign(SignBytes(chainID, vote)).(crypto.SignatureEd25519)
vote.Signature = signature.(crypto.SignatureEd25519)
return nil
}
func (privVal *PrivValidator) SignProposal(chainID string, proposal *Proposal) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
if privVal.LastHeight < proposal.Height ||
privVal.LastHeight == proposal.Height && privVal.LastRound < proposal.Round ||
privVal.LastHeight == 0 && privVal.LastRound == 0 && privVal.LastStep == stepNone {
// Persist height/round/step
privVal.LastHeight = proposal.Height
privVal.LastRound = proposal.Round
privVal.LastStep = stepPropose
privVal.save()
// Sign
proposal.Signature = privVal.Sign(SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
return nil
} else {
return errors.New(fmt.Sprintf("Attempt of duplicate signing of proposal: Height %v, Round %v", proposal.Height, proposal.Round))
signature, err := privVal.signBytesHRS(proposal.Height, proposal.Round, stepPropose, SignBytes(chainID, proposal))
if err != nil {
return errors.New(Fmt("Error signing proposal: %v", err))
}
proposal.Signature = signature.(crypto.SignatureEd25519)
return nil
}
// check if there's a regression. Else sign and write the hrs+signature to disk
func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signBytes []byte) (crypto.Signature, error) {
// If height regression, err
if privVal.LastHeight > height {
return nil, errors.New("Height regression")
}
// More cases for when the height matches
if privVal.LastHeight == height {
// If round regression, err
if privVal.LastRound > round {
return nil, errors.New("Round regression")
}
// If step regression, err
if privVal.LastRound == round {
if privVal.LastStep > step {
return nil, errors.New("Step regression")
} else if privVal.LastStep == step {
if privVal.LastSignBytes != nil {
if privVal.LastSignature == nil {
PanicSanity("privVal: LastSignature is nil but LastSignBytes is not!")
}
// so we dont sign a conflicting vote or proposal
// NOTE: proposals are non-deterministic (include time),
// so we can actually lose them, but will still never sign conflicting ones
if bytes.Equal(privVal.LastSignBytes, signBytes) {
log.Notice("Using privVal.LastSignature", "sig", privVal.LastSignature)
return privVal.LastSignature, nil
}
}
return nil, errors.New("Step regression")
}
}
}
// Sign
signature := privVal.Sign(signBytes)
// Persist height/round/step
privVal.LastHeight = height
privVal.LastRound = round
privVal.LastStep = step
privVal.LastSignature = signature
privVal.LastSignBytes = signBytes
privVal.save()
return signature, nil
}
func (privVal *PrivValidator) String() string {

View File

@ -2,6 +2,6 @@ package version
const Maj = "0"
const Min = "7" // tmsp useability (protobuf, unix); optimizations; broadcast_tx_commit
const Fix = "0"
const Fix = "1" // query conn, peer filter, fast sync fix
const Version = Maj + "." + Min + "." + Fix