From a4f57e164b4fbd137c797d8b8852553c4be627b2 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 8 Feb 2016 00:48:58 -0800 Subject: [PATCH] BroadcastTx results --- Makefile | 2 +- mempool/mempool.go | 10 ++++++++-- mempool/mempool_test.go | 4 ++-- mempool/reactor.go | 9 +++++---- node/node.go | 6 +----- proxy/app_conn.go | 12 ++++++------ proxy/local_app_conn.go | 18 ++++++++++++------ proxy/remote_app_conn.go | 14 ++++++++------ proxy/remote_app_conn_test.go | 34 ++++++++++------------------------ rpc/core/blocks.go | 4 ++-- rpc/core/consensus.go | 4 ++-- rpc/core/mempool.go | 30 ++++++++++++++++++++++++------ rpc/core/routes.go | 33 +++++++++++++++++++++------------ rpc/core/types/responses.go | 24 ++++++++++++++---------- 14 files changed, 116 insertions(+), 88 deletions(-) diff --git a/Makefile b/Makefile index 303756f4..54b1cd74 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: get_deps build all list_deps install -all: test install +all: get_deps install test TMROOT = $${TMROOT:-$$HOME/.tendermint} define NEWLINE diff --git a/mempool/mempool.go b/mempool/mempool.go index 53eaa287..edd18846 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -75,7 +75,9 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement { // Try a new transaction in the mempool. // Potentially blocking if we're blocking on Update() or Reap(). -func (mem *Mempool) CheckTx(tx types.Tx) (err error) { +// cb: A callback from the CheckTx command. +// It gets called from another goroutine. +func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -96,7 +98,11 @@ func (mem *Mempool) CheckTx(tx types.Tx) (err error) { if err = mem.proxyAppConn.Error(); err != nil { return err } - mem.proxyAppConn.CheckTxAsync(tx) + reqRes := mem.proxyAppConn.CheckTxAsync(tx) + if cb != nil { + reqRes.SetCallback(cb) + } + return nil } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index f1fb3eaa..ac241f29 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -27,7 +27,7 @@ func TestSerialReap(t *testing.T) { // This will succeed txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mempool.CheckTx(txBytes) + err := mempool.CheckTx(txBytes, nil) if err != nil { t.Fatal("Error after CheckTx: %v", err) } @@ -35,7 +35,7 @@ func TestSerialReap(t *testing.T) { // This will fail because not serial (incrementing) // However, error should still be nil. // It just won't show up on Reap(). - err = mempool.CheckTx(txBytes) + err = mempool.CheckTx(txBytes, nil) if err != nil { t.Fatal("Error after CheckTx: %v", err) } diff --git a/mempool/reactor.go b/mempool/reactor.go index 51be9bea..9a8649fd 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -8,10 +8,11 @@ import ( "github.com/tendermint/go-clist" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) const ( @@ -67,7 +68,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: - err := memR.Mempool.CheckTx(msg.Tx) + err := memR.Mempool.CheckTx(msg.Tx, nil) if err != nil { // Bad, seen, or conflicting tx. log.Info("Could not add tx", "tx", msg.Tx) @@ -82,8 +83,8 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } // Just an alias for CheckTx since broadcasting happens in peer routines -func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { - return memR.Mempool.CheckTx(tx) +func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*tmsp.Response)) error { + return memR.Mempool.CheckTx(tx, cb) } type PeerState interface { diff --git a/node/node.go b/node/node.go index a0631805..7072068e 100644 --- a/node/node.go +++ b/node/node.go @@ -234,13 +234,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { mtx := new(sync.Mutex) proxyAppConn = proxy.NewLocalAppConn(mtx, app) } else { - proxyConn, err := Connect(addr) + remoteApp, err := proxy.NewRemoteAppConn(addr) if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) - remoteApp.Start() - proxyAppConn = remoteApp } @@ -274,7 +271,6 @@ func getState() *sm.State { // should fork tendermint/tendermint and implement RunNode to // load their custom priv validator and call NewNode(privVal) func RunNode() { - // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") if !FileExists(genDocFile) { diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 1cba8812..391254fe 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -8,12 +8,12 @@ type AppConn interface { SetResponseCallback(tmspcli.Callback) Error() error - EchoAsync(msg string) - FlushAsync() - AppendTxAsync(tx []byte) - CheckTxAsync(tx []byte) - GetHashAsync() - SetOptionAsync(key string, value string) + EchoAsync(msg string) *tmspcli.ReqRes + FlushAsync() *tmspcli.ReqRes + AppendTxAsync(tx []byte) *tmspcli.ReqRes + CheckTxAsync(tx []byte) *tmspcli.ReqRes + GetHashAsync() *tmspcli.ReqRes + SetOptionAsync(key string, value string) *tmspcli.ReqRes InfoSync() (info string, err error) FlushSync() error diff --git a/proxy/local_app_conn.go b/proxy/local_app_conn.go index b982c062..713f7f38 100644 --- a/proxy/local_app_conn.go +++ b/proxy/local_app_conn.go @@ -30,18 +30,20 @@ func (app *localAppConn) Error() error { return nil } -func (app *localAppConn) EchoAsync(msg string) { +func (app *localAppConn) EchoAsync(msg string) *tmspcli.ReqRes { app.Callback( tmsp.RequestEcho(msg), tmsp.ResponseEcho(msg), ) + return nil // TODO maybe create a ReqRes } -func (app *localAppConn) FlushAsync() { +func (app *localAppConn) FlushAsync() *tmspcli.ReqRes { // Do nothing + return nil // TODO maybe create a ReqRes } -func (app *localAppConn) SetOptionAsync(key string, value string) { +func (app *localAppConn) SetOptionAsync(key string, value string) *tmspcli.ReqRes { app.mtx.Lock() log := app.Application.SetOption(key, value) app.mtx.Unlock() @@ -49,9 +51,10 @@ func (app *localAppConn) SetOptionAsync(key string, value string) { tmsp.RequestSetOption(key, value), tmsp.ResponseSetOption(log), ) + return nil // TODO maybe create a ReqRes } -func (app *localAppConn) AppendTxAsync(tx []byte) { +func (app *localAppConn) AppendTxAsync(tx []byte) *tmspcli.ReqRes { app.mtx.Lock() code, result, log := app.Application.AppendTx(tx) app.mtx.Unlock() @@ -59,9 +62,10 @@ func (app *localAppConn) AppendTxAsync(tx []byte) { tmsp.RequestAppendTx(tx), tmsp.ResponseAppendTx(code, result, log), ) + return nil // TODO maybe create a ReqRes } -func (app *localAppConn) CheckTxAsync(tx []byte) { +func (app *localAppConn) CheckTxAsync(tx []byte) *tmspcli.ReqRes { app.mtx.Lock() code, result, log := app.Application.CheckTx(tx) app.mtx.Unlock() @@ -69,9 +73,10 @@ func (app *localAppConn) CheckTxAsync(tx []byte) { tmsp.RequestCheckTx(tx), tmsp.ResponseCheckTx(code, result, log), ) + return nil // TODO maybe create a ReqRes } -func (app *localAppConn) GetHashAsync() { +func (app *localAppConn) GetHashAsync() *tmspcli.ReqRes { app.mtx.Lock() hash, log := app.Application.GetHash() app.mtx.Unlock() @@ -79,6 +84,7 @@ func (app *localAppConn) GetHashAsync() { tmsp.RequestGetHash(), tmsp.ResponseGetHash(hash, log), ) + return nil // TODO maybe create a ReqRes } func (app *localAppConn) InfoSync() (info string, err error) { diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go index 4f750498..35200943 100644 --- a/proxy/remote_app_conn.go +++ b/proxy/remote_app_conn.go @@ -1,8 +1,6 @@ package proxy import ( - "net" - tmspcli "github.com/tendermint/tmsp/client" ) @@ -13,9 +11,13 @@ type remoteAppConn struct { *tmspcli.TMSPClient } -func NewRemoteAppConn(conn net.Conn, bufferSize int) *remoteAppConn { - app := &remoteAppConn{ - TMSPClient: tmspcli.NewTMSPClient(conn, bufferSize), +func NewRemoteAppConn(addr string) (*remoteAppConn, error) { + client, err := tmspcli.NewTMSPClient(addr) + if err != nil { + return nil, err } - return app + appConn := &remoteAppConn{ + TMSPClient: client, + } + return appConn, nil } diff --git a/proxy/remote_app_conn_test.go b/proxy/remote_app_conn_test.go index 2ece5695..14d01693 100644 --- a/proxy/remote_app_conn_test.go +++ b/proxy/remote_app_conn_test.go @@ -1,63 +1,51 @@ package proxy import ( - "bytes" "strings" "testing" . "github.com/tendermint/go-common" - "github.com/tendermint/go-logio" "github.com/tendermint/tmsp/example/golang" "github.com/tendermint/tmsp/server" ) func TestEcho(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + + // Start server _, err := server.StartListener(sockPath, example.NewDummyApplication()) if err != nil { Exit(err.Error()) } - conn, err := Connect(sockPath) + // Start client + proxy, err := NewRemoteAppConn(sockPath) if err != nil { Exit(err.Error()) } else { t.Log("Connected") } - logBuffer := bytes.NewBuffer(nil) - logConn := logio.NewLoggedConn(conn, logBuffer) - proxy := NewRemoteAppConn(logConn, 10) - proxy.SetResponseCallback(nil) - proxy.Start() - for i := 0; i < 1000; i++ { proxy.EchoAsync(Fmt("echo-%v", i)) } proxy.FlushSync() - - /* - if t.Failed() { - logio.PrintReader(logBuffer) - } - */ } func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + // Start server _, err := server.StartListener(sockPath, example.NewDummyApplication()) if err != nil { Exit(err.Error()) } - conn, err := Connect(sockPath) + // Start client + proxy, err := NewRemoteAppConn(sockPath) if err != nil { Exit(err.Error()) } else { b.Log("Connected") } - - proxy := NewRemoteAppConn(conn, 10) - proxy.Start() echoString := strings.Repeat(" ", 200) b.StartTimer() // Start benchmarking tests @@ -73,20 +61,18 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + // Start server _, err := server.StartListener(sockPath, example.NewDummyApplication()) if err != nil { Exit(err.Error()) } - conn, err := Connect(sockPath) + // Start client + proxy, err := NewRemoteAppConn(sockPath) if err != nil { Exit(err.Error()) } else { t.Log("Connected") } - - logBuffer := bytes.NewBuffer(nil) - logConn := logio.NewLoggedConn(conn, logBuffer) - proxy := NewRemoteAppConn(logConn, 10) proxy.Start() data, err := proxy.InfoSync() if err != nil { diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index a04de832..fd5e2ff6 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -31,7 +31,7 @@ func BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, err //----------------------------------------------------------------------------- -func GetBlock(height int) (*ctypes.ResultGetBlock, error) { +func Block(height int) (*ctypes.ResultBlock, error) { if height == 0 { return nil, fmt.Errorf("Height must be greater than 0") } @@ -41,5 +41,5 @@ func GetBlock(height int) (*ctypes.ResultGetBlock, error) { blockMeta := blockStore.LoadBlockMeta(height) block := blockStore.LoadBlock(height) - return &ctypes.ResultGetBlock{blockMeta, block}, nil + return &ctypes.ResultBlock{blockMeta, block}, nil } diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index e4bfb8c3..92ea4edf 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -7,7 +7,7 @@ import ( "github.com/tendermint/tendermint/types" ) -func ListValidators() (*ctypes.ResultListValidators, error) { +func Validators() (*ctypes.ResultValidators, error) { var blockHeight int var validators []*types.Validator @@ -18,7 +18,7 @@ func ListValidators() (*ctypes.ResultListValidators, error) { return false }) - return &ctypes.ResultListValidators{blockHeight, validators}, nil + return &ctypes.ResultValidators{blockHeight, validators}, nil } func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 8df0cc01..3e4d81b1 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -4,20 +4,38 @@ import ( "fmt" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) //----------------------------------------------------------------------------- -// Note: tx must be signed -func BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := mempoolReactor.BroadcastTx(tx) +// NOTE: tx must be signed +func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + err := mempoolReactor.BroadcastTx(tx, nil) if err != nil { return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } return &ctypes.ResultBroadcastTx{}, nil } -func ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, error) { - txs, err := mempoolReactor.Mempool.Reap() - return &ctypes.ResultListUnconfirmedTxs{len(txs), txs}, err +// Note: tx must be signed +func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + resCh := make(chan *tmsp.Response) + err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + resCh <- res + }) + if err != nil { + return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + } + res := <-resCh + return &ctypes.ResultBroadcastTx{ + Code: res.Code, + Data: res.Data, + Log: res.Log, + }, nil +} + +func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { + txs, err := mempoolReactor.Mempool.Reap() + return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, err } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 4a8d3800..8122039b 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -15,11 +15,12 @@ var Routes = map[string]*rpc.RPCFunc{ "dial_seeds": rpc.NewRPCFunc(DialSeedsResult, "seeds"), "blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"), "genesis": rpc.NewRPCFunc(GenesisResult, ""), - "get_block": rpc.NewRPCFunc(GetBlockResult, "height"), - "list_validators": rpc.NewRPCFunc(ListValidatorsResult, ""), + "block": rpc.NewRPCFunc(BlockResult, "height"), + "validators": rpc.NewRPCFunc(ValidatorsResult, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), - "broadcast_tx": rpc.NewRPCFunc(BroadcastTxResult, "tx"), - "list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxsResult, ""), + "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), + "broadcast_tx_asyn": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), + "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), // subscribe/unsubscribe are reserved for websocket events. } @@ -79,16 +80,16 @@ func GenesisResult() (ctypes.TMResult, error) { } } -func GetBlockResult(height int) (ctypes.TMResult, error) { - if r, err := GetBlock(height); err != nil { +func BlockResult(height int) (ctypes.TMResult, error) { + if r, err := Block(height); err != nil { return nil, err } else { return r, nil } } -func ListValidatorsResult() (ctypes.TMResult, error) { - if r, err := ListValidators(); err != nil { +func ValidatorsResult() (ctypes.TMResult, error) { + if r, err := Validators(); err != nil { return nil, err } else { return r, nil @@ -103,16 +104,24 @@ func DumpConsensusStateResult() (ctypes.TMResult, error) { } } -func ListUnconfirmedTxsResult() (ctypes.TMResult, error) { - if r, err := ListUnconfirmedTxs(); err != nil { +func UnconfirmedTxsResult() (ctypes.TMResult, error) { + if r, err := UnconfirmedTxs(); err != nil { return nil, err } else { return r, nil } } -func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTx(tx); err != nil { +func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { + if r, err := BroadcastTxSync(tx); err != nil { + return nil, err + } else { + return r, nil + } +} + +func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { + if r, err := BroadcastTxAsync(tx); err != nil { return nil, err } else { return r, nil diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index e6fa6e06..b56d701b 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) type ResultBlockchainInfo struct { @@ -17,7 +18,7 @@ type ResultGenesis struct { Genesis *types.GenesisDoc `json:"genesis"` } -type ResultGetBlock struct { +type ResultBlock struct { BlockMeta *types.BlockMeta `json:"block_meta"` Block *types.Block `json:"block"` } @@ -46,7 +47,7 @@ type Peer struct { ConnectionStatus p2p.ConnectionStatus `json:"connection_status"` } -type ResultListValidators struct { +type ResultValidators struct { BlockHeight int `json:"block_height"` Validators []*types.Validator `json:"validators"` } @@ -57,9 +58,12 @@ type ResultDumpConsensusState struct { } type ResultBroadcastTx struct { + Code tmsp.CodeType `json:"code"` + Data []byte `json:"data"` + Log string `json:"log"` } -type ResultListUnconfirmedTxs struct { +type ResultUnconfirmedTxs struct { N int `json:"n_txs"` Txs []types.Tx `json:"txs"` } @@ -82,7 +86,7 @@ const ( // 0x0 bytes are for the blockchain ResultTypeGenesis = byte(0x01) ResultTypeBlockchainInfo = byte(0x02) - ResultTypeGetBlock = byte(0x03) + ResultTypeBlock = byte(0x03) // 0x2 bytes are for the network ResultTypeStatus = byte(0x20) @@ -90,12 +94,12 @@ const ( ResultTypeDialSeeds = byte(0x22) // 0x4 bytes are for the consensus - ResultTypeListValidators = byte(0x40) + ResultTypeValidators = byte(0x40) ResultTypeDumpConsensusState = byte(0x41) // 0x6 bytes are for txs / the application - ResultTypeBroadcastTx = byte(0x60) - ResultTypeListUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTx = byte(0x60) + ResultTypeUnconfirmedTxs = byte(0x61) // 0x8 bytes are for events ResultTypeSubscribe = byte(0x80) @@ -112,14 +116,14 @@ var _ = wire.RegisterInterface( struct{ TMResult }{}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, - wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, + wire.ConcreteType{&ResultBlock{}, ResultTypeBlock}, wire.ConcreteType{&ResultStatus{}, ResultTypeStatus}, wire.ConcreteType{&ResultNetInfo{}, ResultTypeNetInfo}, wire.ConcreteType{&ResultDialSeeds{}, ResultTypeDialSeeds}, - wire.ConcreteType{&ResultListValidators{}, ResultTypeListValidators}, + wire.ConcreteType{&ResultValidators{}, ResultTypeValidators}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, - wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, + wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},