Merge pull request #255 from tendermint/broadcast_tx_grpc

broadcast_tx via grpc
This commit is contained in:
Ethan Buchman 2016-12-02 00:31:40 -05:00 committed by GitHub
commit dc436e72f9
20 changed files with 496 additions and 55 deletions

View File

@ -16,6 +16,7 @@ func parseFlags(config cfg.Config, args []string) {
fastSync bool
skipUPNP bool
rpcLaddr string
grpcLaddr string
logLevel string
proxyApp string
tmspTransport string
@ -30,6 +31,7 @@ func parseFlags(config cfg.Config, args []string) {
flags.BoolVar(&fastSync, "fast_sync", config.GetBool("fast_sync"), "Fast blockchain syncing")
flags.BoolVar(&skipUPNP, "skip_upnp", config.GetBool("skip_upnp"), "Skip UPNP configuration")
flags.StringVar(&rpcLaddr, "rpc_laddr", config.GetString("rpc_laddr"), "RPC listen address. Port required")
flags.StringVar(&grpcLaddr, "grpc_laddr", config.GetString("grpc_laddr"), "GRPC listen address (BroadcastTx only). Port required")
flags.StringVar(&logLevel, "log_level", config.GetString("log_level"), "Log level")
flags.StringVar(&proxyApp, "proxy_app", config.GetString("proxy_app"),
"Proxy app address, or 'nilapp' or 'dummy' for local testing.")
@ -47,6 +49,7 @@ func parseFlags(config cfg.Config, args []string) {
config.Set("fast_sync", fastSync)
config.Set("skip_upnp", skipUPNP)
config.Set("rpc_laddr", rpcLaddr)
config.Set("grpc_laddr", grpcLaddr)
config.Set("log_level", logLevel)
config.Set("proxy_app", proxyApp)
config.Set("tmsp", tmspTransport)

View File

@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "info")
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657")
mapConfig.SetDefault("grpc_laddr", "")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")

View File

@ -80,6 +80,7 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "debug")
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657")
mapConfig.SetDefault("grpc_laddr", "tcp://0.0.0.0:36658")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")

View File

@ -1274,7 +1274,10 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// Execute and commit the block, and update the mempool.
// All calls to the proxyAppConn should come here.
// NOTE: the block.AppHash wont reflect these txs until the next block
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
if err != nil {
// TODO!
}
fail.Fail() // XXX

View File

@ -21,6 +21,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
rpccore "github.com/tendermint/tendermint/rpc/core"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@ -218,6 +219,17 @@ func (n *Node) StartRPC() ([]net.Listener, error) {
}
listeners[i] = listener
}
// we expose a simplified api over grpc for convenience to app devs
grpcListenAddr := n.config.GetString("grpc_laddr")
if grpcListenAddr != "" {
listener, err := grpccore.StartGRPCServer(grpcListenAddr)
if err != nil {
return nil, err
}
listeners = append(listeners, listener)
}
return listeners, nil
}

View File

@ -39,16 +39,11 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
}, nil
}
// CONTRACT: returns error==nil iff the tx is included in a block.
//
// If CheckTx fails, return with the response from CheckTx AND an error.
// Else, block until the tx is included in a block,
// and return the result of AppendTx (with no error).
// Even if AppendTx fails, so long as the tx is included in a block this function
// will not return an error - it is the caller's responsibility to check res.Code.
// The function times out after five minutes and returns the result of CheckTx and an error.
// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!)
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
// or if we timeout waiting for tx to commit.
// If CheckTx or AppendTx fail, no error will be returned, but the returned result
// will contain a non-OK TMSP code.
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
appendTxResCh := make(chan types.EventDataTx, 1)
@ -62,38 +57,41 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
checkTxResCh <- res
})
if err != nil {
log.Error("err", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()
if r := checkTxR; r.Code != tmsp.CodeType_OK {
if checkTxR.Code != tmsp.CodeType_OK {
// CheckTx failed!
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
}, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
}, nil
}
// Wait for the tx to be included in a block,
// timeout after something reasonable.
timer := time.NewTimer(60 * 5 * time.Second)
// TODO: configureable?
timer := time.NewTimer(60 * 2 * time.Second)
select {
case appendTxRes := <-appendTxResCh:
// The tx was included in a block.
// NOTE we don't return an error regardless of the AppendTx code;
// clients must check this to see if they need to send a new tx!
return &ctypes.ResultBroadcastTx{
appendTxR := &tmsp.ResponseAppendTx{
Code: appendTxRes.Code,
Data: appendTxRes.Result,
Data: appendTxRes.Data,
Log: appendTxRes.Log,
}
log.Error("appendtx passed ", "r", appendTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: appendTxR,
}, nil
case <-timer.C:
r := checkTxR
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}

View File

@ -63,6 +63,11 @@ type ResultBroadcastTx struct {
Log string `json:"log"`
}
type ResultBroadcastTxCommit struct {
CheckTx *tmsp.ResponseCheckTx `json:"check_tx"`
AppendTx *tmsp.ResponseAppendTx `json:"append_tx"`
}
type ResultUnconfirmedTxs struct {
N int `json:"n_txs"`
Txs []types.Tx `json:"txs"`
@ -115,8 +120,9 @@ const (
ResultTypeDumpConsensusState = byte(0x41)
// 0x6 bytes are for txs / the application
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTxCommit = byte(0x62)
// 0x7 bytes are for querying the application
ResultTypeTMSPQuery = byte(0x70)
@ -151,6 +157,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultValidators{}, ResultTypeValidators},
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},

18
rpc/grpc/api.go Normal file
View File

@ -0,0 +1,18 @@
package core_grpc
import (
core "github.com/tendermint/tendermint/rpc/core"
context "golang.org/x/net/context"
)
type broadcastAPI struct {
}
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
res, err := core.BroadcastTxCommit(req.Tx)
if err != nil {
return nil, err
}
return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil
}

44
rpc/grpc/client_server.go Normal file
View File

@ -0,0 +1,44 @@
package core_grpc
import (
"fmt"
"net"
"strings"
"time"
"google.golang.org/grpc"
. "github.com/tendermint/go-common"
)
// Start the grpcServer in a go routine
func StartGRPCServer(protoAddr string) (net.Listener, error) {
parts := strings.SplitN(protoAddr, "://", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr)
}
proto, addr := parts[0], parts[1]
ln, err := net.Listen(proto, addr)
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer()
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
go grpcServer.Serve(ln)
return ln, nil
}
// Start the client by dialing the server
func StartGRPCClient(protoAddr string) BroadcastAPIClient {
conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
panic(err)
}
return NewBroadcastAPIClient(conn)
}
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
return Connect(addr)
}

3
rpc/grpc/compile.sh Normal file
View File

@ -0,0 +1,3 @@
#! /bin/bash
protoc --go_out=plugins=grpc:. -I $GOPATH/src/ -I . types.proto

174
rpc/grpc/types.pb.go Normal file
View File

@ -0,0 +1,174 @@
// Code generated by protoc-gen-go.
// source: types.proto
// DO NOT EDIT!
/*
Package core_grpc is a generated protocol buffer package.
It is generated from these files:
types.proto
It has these top-level messages:
RequestBroadcastTx
ResponseBroadcastTx
*/
package core_grpc
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import types "github.com/tendermint/tmsp/types"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type RequestBroadcastTx struct {
Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"`
}
func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} }
func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*RequestBroadcastTx) ProtoMessage() {}
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *RequestBroadcastTx) GetTx() []byte {
if m != nil {
return m.Tx
}
return nil
}
type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"`
}
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*ResponseBroadcastTx) ProtoMessage() {}
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx {
if m != nil {
return m.CheckTx
}
return nil
}
func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx {
if m != nil {
return m.AppendTx
}
return nil
}
func init() {
proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx")
proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for BroadcastAPI service
type BroadcastAPIClient interface {
BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error)
}
type broadcastAPIClient struct {
cc *grpc.ClientConn
}
func NewBroadcastAPIClient(cc *grpc.ClientConn) BroadcastAPIClient {
return &broadcastAPIClient{cc}
}
func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) {
out := new(ResponseBroadcastTx)
err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for BroadcastAPI service
type BroadcastAPIServer interface {
BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error)
}
func RegisterBroadcastAPIServer(s *grpc.Server, srv BroadcastAPIServer) {
s.RegisterService(&_BroadcastAPI_serviceDesc, srv)
}
func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestBroadcastTx)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BroadcastAPIServer).BroadcastTx(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/core_grpc.BroadcastAPI/BroadcastTx",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BroadcastAPIServer).BroadcastTx(ctx, req.(*RequestBroadcastTx))
}
return interceptor(ctx, in, info, handler)
}
var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
ServiceName: "core_grpc.BroadcastAPI",
HandlerType: (*BroadcastAPIServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "BroadcastTx",
Handler: _BroadcastAPI_BroadcastTx_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "types.proto",
}
func init() { proto.RegisterFile("types.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 226 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f,
0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2,
0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94,
0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39,
0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4,
0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01,
0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84,
0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb,
0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89,
0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01,
0x00, 0x00,
}

29
rpc/grpc/types.proto Normal file
View File

@ -0,0 +1,29 @@
syntax = "proto3";
package core_grpc;
import "github.com/tendermint/tmsp/types/types.proto";
//----------------------------------------
// Message types
//----------------------------------------
// Request types
message RequestBroadcastTx {
bytes tx = 1;
}
//----------------------------------------
// Response types
message ResponseBroadcastTx{
types.ResponseCheckTx check_tx = 1;
types.ResponseAppendTx append_tx = 2;
}
//----------------------------------------
// Service Definition
service BroadcastAPI {
rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ;
}

View File

@ -193,9 +193,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) {
func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) {
tmRes := resI.(*ctypes.TMResult)
res := (*tmRes).(*ctypes.ResultBroadcastTx)
if res.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log))
res := (*tmRes).(*ctypes.ResultBroadcastTxCommit)
checkTx := res.CheckTx
if checkTx.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log))
}
appendTx := res.AppendTx
if appendTx.Code != tmsp.CodeType_OK {
panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log))
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 0 {

24
rpc/test/grpc_test.go Normal file
View File

@ -0,0 +1,24 @@
package rpctest
import (
"testing"
"golang.org/x/net/context"
"github.com/tendermint/tendermint/rpc/grpc"
)
//-------------------------------------------
func TestBroadcastTx(t *testing.T) {
res, err := clientGRPC.BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{[]byte("this is a tx")})
if err != nil {
t.Fatal(err)
}
if res.CheckTx.Code != 0 {
t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code)
}
if res.AppendTx.Code != 0 {
t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code)
}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/config/tendermint_test"
nm "github.com/tendermint/tendermint/node"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/grpc"
)
// global variables for use across all tests
@ -24,8 +25,10 @@ var (
requestAddr string
websocketAddr string
websocketEndpoint string
grpcAddr string
clientURI *client.ClientURI
clientJSON *client.ClientJSONRPC
clientGRPC core_grpc.BroadcastAPIClient
)
// initialize config and create new node
@ -33,12 +36,14 @@ func init() {
config = tendermint_test.ResetConfig("rpc_test_client_test")
chainID = config.GetString("chain_id")
rpcAddr = config.GetString("rpc_laddr")
grpcAddr = config.GetString("grpc_laddr")
requestAddr = rpcAddr
websocketAddr = rpcAddr
websocketEndpoint = "/websocket"
clientURI = client.NewClientURI(requestAddr)
clientJSON = client.NewClientJSONRPC(requestAddr)
clientGRPC = core_grpc.StartGRPCClient(grpcAddr)
// TODO: change consensus/state.go timeouts to be shorter
@ -59,6 +64,8 @@ func newNode(ready chan struct{}) {
// Run the RPC server.
node.StartRPC()
time.Sleep(time.Second)
ready <- struct{}{}
// Sleep forever

View File

@ -88,11 +88,11 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
event := types.EventDataTx{
Tx: req.GetAppendTx().Tx,
Result: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
Tx: req.GetAppendTx().Tx,
Data: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
}
types.FireEventTx(eventCache, event)
}

View File

@ -1,4 +1,5 @@
#! /bin/bash
set -u
#####################
# counter over socket
@ -7,61 +8,113 @@ TESTNAME=$1
# Send some txs
function getCode() {
R=$1
if [[ "$R" == "{}" ]]; then
# protobuf auto adds `omitempty` to everything so code OK and empty data/log
# will not even show when marshalled into json
# apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ...
echo 0
else
# this wont actually work if theres an error ...
echo "$R" | jq .code
fi
}
function sendTx() {
TX=$1
RESPONSE=`curl -s localhost:46657/broadcast_tx_commit?tx=\"$TX\"`
CODE=`echo $RESPONSE | jq .result[1].code`
ERROR=`echo $RESPONSE | jq .error`
ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes
if [[ "$GRPC_BROADCAST_TX" == "" ]]; then
RESPONSE=`curl -s localhost:46657/broadcast_tx_commit?tx=\"$TX\"`
CODE=`echo $RESPONSE | jq .result[1].code`
ERROR=`echo $RESPONSE | jq .error`
ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes
else
if [ ! -f grpc_client ]; then
go build -o grpc_client grpc_client.go
fi
RESPONSE=`./grpc_client $TX`
echo $RESPONSE | jq . &> /dev/null
IS_JSON=$?
if [[ "$IS_JSON" != "0" ]]; then
ERROR="$RESPONSE"
else
ERROR="" # reset
fi
APPEND_TX_RESPONSE=`echo $RESPONSE | jq .append_tx`
APPEND_TX_CODE=`getCode "$APPEND_TX_RESPONSE"`
CHECK_TX_RESPONSE=`echo $RESPONSE | jq .check_tx`
CHECK_TX_CODE=`getCode "$CHECK_TX_RESPONSE"`
echo "-------"
echo "TX $TX"
echo "RESPONSE $RESPONSE"
echo "CHECK_TX_RESPONSE $CHECK_TX_RESPONSE"
echo "APPEND_TX_RESPONSE $APPEND_TX_RESPONSE"
echo "CHECK_TX_CODE $CHECK_TX_CODE"
echo "APPEND_TX_CODE $APPEND_TX_CODE"
echo "----"
fi
}
echo "... sending tx. expect no error"
# 0 should pass once and get in block, with no error
TX=00
sendTx $TX
if [[ $CODE != 0 ]]; then
if [[ $APPEND_TX_CODE != 0 ]]; then
echo "Got non-zero exit code for $TX. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been included in a block. $ERROR"
exit 1
fi
echo "... sending tx. expect error"
# second time should get rejected by the mempool (return error and non-zero code)
sendTx $TX
if [[ $CODE == 0 ]]; then
echo "CHECKTX CODE: $CHECK_TX_CODE"
if [[ "$CHECK_TX_CODE" == 0 ]]; then
echo "Got zero exit code for $TX. Expected tx to be rejected by mempool. $RESPONSE"
exit 1
fi
if [[ "$ERROR" == "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" == "" ]]; then
echo "Expected to get an error - tx $TX should have been rejected from mempool"
echo "$RESPONSE"
exit 1
fi
echo "... sending tx. expect no error"
# now, TX=01 should pass, with no error
TX=01
sendTx $TX
if [[ $CODE != 0 ]]; then
if [[ $APPEND_TX_CODE != 0 ]]; then
echo "Got non-zero exit code for $TX. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been accepted in block. $ERROR"
exit 1
fi
echo "... sending tx. expect no error, but invalid"
# now, TX=03 should get in a block (passes CheckTx, no error), but is invalid
TX=03
sendTx $TX
if [[ $CODE == 0 ]]; then
if [[ "$CHECK_TX_CODE" != 0 ]]; then
echo "Got non-zero exit code for checktx on $TX. $RESPONSE"
exit 1
fi
if [[ $APPEND_TX_CODE == 0 ]]; then
echo "Got zero exit code for $TX. Should have been bad nonce. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been included in a block. $ERROR"
exit 1
fi

36
test/app/grpc_client.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"encoding/hex"
"fmt"
"os"
"golang.org/x/net/context"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/grpc"
)
var grpcAddr = "tcp://localhost:36656"
func main() {
args := os.Args
if len(args) == 1 {
fmt.Println("Must enter a transaction to send (hex)")
os.Exit(1)
}
tx := args[1]
txBytes, err := hex.DecodeString(tx)
if err != nil {
fmt.Println("Invalid hex", err)
os.Exit(1)
}
clientGRPC := core_grpc.StartGRPCClient(grpcAddr)
res, err := clientGRPC.BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{txBytes})
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(string(wire.JSONBytes(res)))
}

View File

@ -77,6 +77,24 @@ function counter_over_grpc() {
kill -9 $pid_counter $pid_tendermint
}
function counter_over_grpc_grpc() {
rm -rf $TMROOT
tendermint init
echo "Starting counter and tendermint"
counter --serial --tmsp grpc > /dev/null &
pid_counter=$!
sleep 1
GRPC_PORT=36656
tendermint node --tmsp grpc --grpc_laddr tcp://localhost:$GRPC_PORT > tendermint.log &
pid_tendermint=$!
sleep 5
echo "running test"
GRPC_BROADCAST_TX=true bash counter_test.sh "Counter over GRPC via GRPC BroadcastTx"
kill -9 $pid_counter $pid_tendermint
}
cd $GOPATH/src/github.com/tendermint/tendermint/test/app
case "$1" in
@ -92,6 +110,9 @@ case "$1" in
"counter_over_grpc")
counter_over_grpc
;;
"counter_over_grpc_grpc")
counter_over_grpc_grpc
;;
*)
echo "Running all"
dummy_over_socket
@ -101,5 +122,7 @@ case "$1" in
counter_over_socket
echo ""
counter_over_grpc
echo ""
counter_over_grpc_grpc
esac

View File

@ -73,11 +73,11 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx
type EventDataTx struct {
Tx Tx `json:"tx"`
Result []byte `json:"result"`
Log string `json:"log"`
Code tmsp.CodeType `json:"code"`
Error string `json:"error"`
Tx Tx `json:"tx"`
Data []byte `json:"data"`
Log string `json:"log"`
Code tmsp.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now
}
// NOTE: This goes into the replay WAL