BroadcastTx results

This commit is contained in:
Jae Kwon 2016-02-08 00:48:58 -08:00
parent baa18fb0f7
commit a4f57e164b
14 changed files with 116 additions and 88 deletions

View File

@ -1,6 +1,6 @@
.PHONY: get_deps build all list_deps install
all: test install
all: get_deps install test
TMROOT = $${TMROOT:-$$HOME/.tendermint}
define NEWLINE

View File

@ -75,7 +75,9 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement {
// Try a new transaction in the mempool.
// Potentially blocking if we're blocking on Update() or Reap().
func (mem *Mempool) CheckTx(tx types.Tx) (err error) {
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@ -96,7 +98,11 @@ func (mem *Mempool) CheckTx(tx types.Tx) (err error) {
if err = mem.proxyAppConn.Error(); err != nil {
return err
}
mem.proxyAppConn.CheckTxAsync(tx)
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
reqRes.SetCallback(cb)
}
return nil
}

View File

@ -27,7 +27,7 @@ func TestSerialReap(t *testing.T) {
// This will succeed
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := mempool.CheckTx(txBytes)
err := mempool.CheckTx(txBytes, nil)
if err != nil {
t.Fatal("Error after CheckTx: %v", err)
}
@ -35,7 +35,7 @@ func TestSerialReap(t *testing.T) {
// This will fail because not serial (incrementing)
// However, error should still be nil.
// It just won't show up on Reap().
err = mempool.CheckTx(txBytes)
err = mempool.CheckTx(txBytes, nil)
if err != nil {
t.Fatal("Error after CheckTx: %v", err)
}

View File

@ -8,10 +8,11 @@ import (
"github.com/tendermint/go-clist"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types"
)
const (
@ -67,7 +68,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *TxMessage:
err := memR.Mempool.CheckTx(msg.Tx)
err := memR.Mempool.CheckTx(msg.Tx, nil)
if err != nil {
// Bad, seen, or conflicting tx.
log.Info("Could not add tx", "tx", msg.Tx)
@ -82,8 +83,8 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
}
// Just an alias for CheckTx since broadcasting happens in peer routines
func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
return memR.Mempool.CheckTx(tx)
func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*tmsp.Response)) error {
return memR.Mempool.CheckTx(tx, cb)
}
type PeerState interface {

View File

@ -234,13 +234,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
mtx := new(sync.Mutex)
proxyAppConn = proxy.NewLocalAppConn(mtx, app)
} else {
proxyConn, err := Connect(addr)
remoteApp, err := proxy.NewRemoteAppConn(addr)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024)
remoteApp.Start()
proxyAppConn = remoteApp
}
@ -274,7 +271,6 @@ func getState() *sm.State {
// should fork tendermint/tendermint and implement RunNode to
// load their custom priv validator and call NewNode(privVal)
func RunNode() {
// Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file")
if !FileExists(genDocFile) {

View File

@ -8,12 +8,12 @@ type AppConn interface {
SetResponseCallback(tmspcli.Callback)
Error() error
EchoAsync(msg string)
FlushAsync()
AppendTxAsync(tx []byte)
CheckTxAsync(tx []byte)
GetHashAsync()
SetOptionAsync(key string, value string)
EchoAsync(msg string) *tmspcli.ReqRes
FlushAsync() *tmspcli.ReqRes
AppendTxAsync(tx []byte) *tmspcli.ReqRes
CheckTxAsync(tx []byte) *tmspcli.ReqRes
GetHashAsync() *tmspcli.ReqRes
SetOptionAsync(key string, value string) *tmspcli.ReqRes
InfoSync() (info string, err error)
FlushSync() error

View File

@ -30,18 +30,20 @@ func (app *localAppConn) Error() error {
return nil
}
func (app *localAppConn) EchoAsync(msg string) {
func (app *localAppConn) EchoAsync(msg string) *tmspcli.ReqRes {
app.Callback(
tmsp.RequestEcho(msg),
tmsp.ResponseEcho(msg),
)
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) FlushAsync() {
func (app *localAppConn) FlushAsync() *tmspcli.ReqRes {
// Do nothing
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) SetOptionAsync(key string, value string) {
func (app *localAppConn) SetOptionAsync(key string, value string) *tmspcli.ReqRes {
app.mtx.Lock()
log := app.Application.SetOption(key, value)
app.mtx.Unlock()
@ -49,9 +51,10 @@ func (app *localAppConn) SetOptionAsync(key string, value string) {
tmsp.RequestSetOption(key, value),
tmsp.ResponseSetOption(log),
)
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) AppendTxAsync(tx []byte) {
func (app *localAppConn) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
app.mtx.Lock()
code, result, log := app.Application.AppendTx(tx)
app.mtx.Unlock()
@ -59,9 +62,10 @@ func (app *localAppConn) AppendTxAsync(tx []byte) {
tmsp.RequestAppendTx(tx),
tmsp.ResponseAppendTx(code, result, log),
)
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) CheckTxAsync(tx []byte) {
func (app *localAppConn) CheckTxAsync(tx []byte) *tmspcli.ReqRes {
app.mtx.Lock()
code, result, log := app.Application.CheckTx(tx)
app.mtx.Unlock()
@ -69,9 +73,10 @@ func (app *localAppConn) CheckTxAsync(tx []byte) {
tmsp.RequestCheckTx(tx),
tmsp.ResponseCheckTx(code, result, log),
)
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) GetHashAsync() {
func (app *localAppConn) GetHashAsync() *tmspcli.ReqRes {
app.mtx.Lock()
hash, log := app.Application.GetHash()
app.mtx.Unlock()
@ -79,6 +84,7 @@ func (app *localAppConn) GetHashAsync() {
tmsp.RequestGetHash(),
tmsp.ResponseGetHash(hash, log),
)
return nil // TODO maybe create a ReqRes
}
func (app *localAppConn) InfoSync() (info string, err error) {

View File

@ -1,8 +1,6 @@
package proxy
import (
"net"
tmspcli "github.com/tendermint/tmsp/client"
)
@ -13,9 +11,13 @@ type remoteAppConn struct {
*tmspcli.TMSPClient
}
func NewRemoteAppConn(conn net.Conn, bufferSize int) *remoteAppConn {
app := &remoteAppConn{
TMSPClient: tmspcli.NewTMSPClient(conn, bufferSize),
func NewRemoteAppConn(addr string) (*remoteAppConn, error) {
client, err := tmspcli.NewTMSPClient(addr)
if err != nil {
return nil, err
}
return app
appConn := &remoteAppConn{
TMSPClient: client,
}
return appConn, nil
}

View File

@ -1,63 +1,51 @@
package proxy
import (
"bytes"
"strings"
"testing"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-logio"
"github.com/tendermint/tmsp/example/golang"
"github.com/tendermint/tmsp/server"
)
func TestEcho(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server
_, err := server.StartListener(sockPath, example.NewDummyApplication())
if err != nil {
Exit(err.Error())
}
conn, err := Connect(sockPath)
// Start client
proxy, err := NewRemoteAppConn(sockPath)
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
logBuffer := bytes.NewBuffer(nil)
logConn := logio.NewLoggedConn(conn, logBuffer)
proxy := NewRemoteAppConn(logConn, 10)
proxy.SetResponseCallback(nil)
proxy.Start()
for i := 0; i < 1000; i++ {
proxy.EchoAsync(Fmt("echo-%v", i))
}
proxy.FlushSync()
/*
if t.Failed() {
logio.PrintReader(logBuffer)
}
*/
}
func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server
_, err := server.StartListener(sockPath, example.NewDummyApplication())
if err != nil {
Exit(err.Error())
}
conn, err := Connect(sockPath)
// Start client
proxy, err := NewRemoteAppConn(sockPath)
if err != nil {
Exit(err.Error())
} else {
b.Log("Connected")
}
proxy := NewRemoteAppConn(conn, 10)
proxy.Start()
echoString := strings.Repeat(" ", 200)
b.StartTimer() // Start benchmarking tests
@ -73,20 +61,18 @@ func BenchmarkEcho(b *testing.B) {
func TestInfo(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server
_, err := server.StartListener(sockPath, example.NewDummyApplication())
if err != nil {
Exit(err.Error())
}
conn, err := Connect(sockPath)
// Start client
proxy, err := NewRemoteAppConn(sockPath)
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
logBuffer := bytes.NewBuffer(nil)
logConn := logio.NewLoggedConn(conn, logBuffer)
proxy := NewRemoteAppConn(logConn, 10)
proxy.Start()
data, err := proxy.InfoSync()
if err != nil {

View File

@ -31,7 +31,7 @@ func BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, err
//-----------------------------------------------------------------------------
func GetBlock(height int) (*ctypes.ResultGetBlock, error) {
func Block(height int) (*ctypes.ResultBlock, error) {
if height == 0 {
return nil, fmt.Errorf("Height must be greater than 0")
}
@ -41,5 +41,5 @@ func GetBlock(height int) (*ctypes.ResultGetBlock, error) {
blockMeta := blockStore.LoadBlockMeta(height)
block := blockStore.LoadBlock(height)
return &ctypes.ResultGetBlock{blockMeta, block}, nil
return &ctypes.ResultBlock{blockMeta, block}, nil
}

View File

@ -7,7 +7,7 @@ import (
"github.com/tendermint/tendermint/types"
)
func ListValidators() (*ctypes.ResultListValidators, error) {
func Validators() (*ctypes.ResultValidators, error) {
var blockHeight int
var validators []*types.Validator
@ -18,7 +18,7 @@ func ListValidators() (*ctypes.ResultListValidators, error) {
return false
})
return &ctypes.ResultListValidators{blockHeight, validators}, nil
return &ctypes.ResultValidators{blockHeight, validators}, nil
}
func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {

View File

@ -4,20 +4,38 @@ import (
"fmt"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types"
)
//-----------------------------------------------------------------------------
// Note: tx must be signed
func BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := mempoolReactor.BroadcastTx(tx)
// NOTE: tx must be signed
func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := mempoolReactor.BroadcastTx(tx, nil)
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
return &ctypes.ResultBroadcastTx{}, nil
}
func ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, error) {
txs, err := mempoolReactor.Mempool.Reap()
return &ctypes.ResultListUnconfirmedTxs{len(txs), txs}, err
// Note: tx must be signed
func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *tmsp.Response)
err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) {
resCh <- res
})
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
res := <-resCh
return &ctypes.ResultBroadcastTx{
Code: res.Code,
Data: res.Data,
Log: res.Log,
}, nil
}
func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
txs, err := mempoolReactor.Mempool.Reap()
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, err
}

View File

@ -15,11 +15,12 @@ var Routes = map[string]*rpc.RPCFunc{
"dial_seeds": rpc.NewRPCFunc(DialSeedsResult, "seeds"),
"blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(GenesisResult, ""),
"get_block": rpc.NewRPCFunc(GetBlockResult, "height"),
"list_validators": rpc.NewRPCFunc(ListValidatorsResult, ""),
"block": rpc.NewRPCFunc(BlockResult, "height"),
"validators": rpc.NewRPCFunc(ValidatorsResult, ""),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""),
"broadcast_tx": rpc.NewRPCFunc(BroadcastTxResult, "tx"),
"list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxsResult, ""),
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"),
"broadcast_tx_asyn": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
// subscribe/unsubscribe are reserved for websocket events.
}
@ -79,16 +80,16 @@ func GenesisResult() (ctypes.TMResult, error) {
}
}
func GetBlockResult(height int) (ctypes.TMResult, error) {
if r, err := GetBlock(height); err != nil {
func BlockResult(height int) (ctypes.TMResult, error) {
if r, err := Block(height); err != nil {
return nil, err
} else {
return r, nil
}
}
func ListValidatorsResult() (ctypes.TMResult, error) {
if r, err := ListValidators(); err != nil {
func ValidatorsResult() (ctypes.TMResult, error) {
if r, err := Validators(); err != nil {
return nil, err
} else {
return r, nil
@ -103,16 +104,24 @@ func DumpConsensusStateResult() (ctypes.TMResult, error) {
}
}
func ListUnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := ListUnconfirmedTxs(); err != nil {
func UnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := UnconfirmedTxs(); err != nil {
return nil, err
} else {
return r, nil
}
}
func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTx(tx); err != nil {
func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxSync(tx); err != nil {
return nil, err
} else {
return r, nil
}
}
func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxAsync(tx); err != nil {
return nil, err
} else {
return r, nil

View File

@ -6,6 +6,7 @@ import (
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types"
)
type ResultBlockchainInfo struct {
@ -17,7 +18,7 @@ type ResultGenesis struct {
Genesis *types.GenesisDoc `json:"genesis"`
}
type ResultGetBlock struct {
type ResultBlock struct {
BlockMeta *types.BlockMeta `json:"block_meta"`
Block *types.Block `json:"block"`
}
@ -46,7 +47,7 @@ type Peer struct {
ConnectionStatus p2p.ConnectionStatus `json:"connection_status"`
}
type ResultListValidators struct {
type ResultValidators struct {
BlockHeight int `json:"block_height"`
Validators []*types.Validator `json:"validators"`
}
@ -57,9 +58,12 @@ type ResultDumpConsensusState struct {
}
type ResultBroadcastTx struct {
Code tmsp.CodeType `json:"code"`
Data []byte `json:"data"`
Log string `json:"log"`
}
type ResultListUnconfirmedTxs struct {
type ResultUnconfirmedTxs struct {
N int `json:"n_txs"`
Txs []types.Tx `json:"txs"`
}
@ -82,7 +86,7 @@ const (
// 0x0 bytes are for the blockchain
ResultTypeGenesis = byte(0x01)
ResultTypeBlockchainInfo = byte(0x02)
ResultTypeGetBlock = byte(0x03)
ResultTypeBlock = byte(0x03)
// 0x2 bytes are for the network
ResultTypeStatus = byte(0x20)
@ -90,12 +94,12 @@ const (
ResultTypeDialSeeds = byte(0x22)
// 0x4 bytes are for the consensus
ResultTypeListValidators = byte(0x40)
ResultTypeValidators = byte(0x40)
ResultTypeDumpConsensusState = byte(0x41)
// 0x6 bytes are for txs / the application
ResultTypeBroadcastTx = byte(0x60)
ResultTypeListUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
// 0x8 bytes are for events
ResultTypeSubscribe = byte(0x80)
@ -112,14 +116,14 @@ var _ = wire.RegisterInterface(
struct{ TMResult }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock},
wire.ConcreteType{&ResultBlock{}, ResultTypeBlock},
wire.ConcreteType{&ResultStatus{}, ResultTypeStatus},
wire.ConcreteType{&ResultNetInfo{}, ResultTypeNetInfo},
wire.ConcreteType{&ResultDialSeeds{}, ResultTypeDialSeeds},
wire.ConcreteType{&ResultListValidators{}, ResultTypeListValidators},
wire.ConcreteType{&ResultValidators{}, ResultTypeValidators},
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},
wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},