update grpc broadcast tx

This commit is contained in:
Ethan Buchman 2016-11-30 17:28:41 -05:00
parent 2ef695da97
commit b74a97a4f6
7 changed files with 98 additions and 50 deletions

View File

@ -1274,7 +1274,10 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// Execute and commit the block, and update the mempool.
// All calls to the proxyAppConn should come here.
// NOTE: the block.AppHash wont reflect these txs until the next block
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
if err != nil {
// TODO!
}
fail.Fail() // XXX

View File

@ -40,7 +40,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
}
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
// or if we timeout waiting for tx to commit
// or if we timeout waiting for tx to commit.
// If CheckTx or AppendTx fail, no error will be returned, but the returned result
// will contain a non-OK TMSP code.
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
@ -55,6 +57,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
checkTxResCh <- res
})
if err != nil {
log.Error("err", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
checkTxRes := <-checkTxResCh
@ -69,16 +72,23 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// Wait for the tx to be included in a block,
// timeout after something reasonable.
timer := time.NewTimer(60 * 5 * time.Second)
// TODO: configureable?
timer := time.NewTimer(60 * 2 * time.Second)
select {
case appendTxRes := <-appendTxResCh:
// The tx was included in a block.
appendTxR := appendTxRes.GetAppendTx()
appendTxR := &tmsp.ResponseAppendTx{
Code: appendTxRes.Code,
Data: appendTxRes.Data,
Log: appendTxRes.Log,
}
log.Error("appendtx passed ", "r", appendTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: appendTxR,
}, nil
case <-timer.C:
log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
AppendTx: nil,

View File

@ -44,6 +44,13 @@ func (m *RequestBroadcastTx) String() string { return proto.CompactTe
func (*RequestBroadcastTx) ProtoMessage() {}
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *RequestBroadcastTx) GetTx() []byte {
if m != nil {
return m.Tx
}
return nil
}
type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"`
@ -79,7 +86,7 @@ var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
const _ = grpc.SupportPackageIsVersion4
// Client API for BroadcastAPI service
@ -142,25 +149,26 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
Metadata: "types.proto",
}
func init() { proto.RegisterFile("types.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 223 bytes of a gzipped FileDescriptorProto
// 226 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f,
0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2,
0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16,
0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e,
0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08,
0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61,
0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62,
0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f,
0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14,
0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94,
0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39,
0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4,
0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01,
0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84,
0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb,
0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89,
0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01,
0x00, 0x00,
}

View File

@ -88,11 +88,11 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
event := types.EventDataTx{
Tx: req.GetAppendTx().Tx,
Result: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
Tx: req.GetAppendTx().Tx,
Data: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
}
types.FireEventTx(eventCache, event)
}

View File

@ -1,4 +1,5 @@
#! /bin/bash
set -u
#####################
# counter over socket
@ -7,6 +8,19 @@ TESTNAME=$1
# Send some txs
function getCode() {
R=$1
if [[ "$R" == "{}" ]]; then
# protobuf auto adds `omitempty` to everything so code OK and empty data/log
# will not even show when marshalled into json
# apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ...
echo 0
else
# this wont actually work if theres an error ...
echo "$R" | jq .code
fi
}
function sendTx() {
TX=$1
if [[ "$GRPC_BROADCAST_TX" == "" ]]; then
@ -15,7 +29,10 @@ function sendTx() {
ERROR=`echo $RESPONSE | jq .error`
ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes
else
RESPONSE=`go run grpc_client.go $TX`
if [ ! -f grpc_client ]; then
go build -o grpc_client grpc_client.go
fi
RESPONSE=`./grpc_client $TX`
echo $RESPONSE | jq . &> /dev/null
IS_JSON=$?
if [[ "$IS_JSON" != "0" ]]; then
@ -23,72 +40,81 @@ function sendTx() {
else
ERROR="" # reset
fi
APPEND_TX_RESPONSE=`echo $RESPONSE | jq .append_tx`
APPEND_TX_CODE=`getCode "$APPEND_TX_RESPONSE"`
CHECK_TX_RESPONSE=`echo $RESPONSE | jq .check_tx`
CHECK_TX_CODE=`getCode "$CHECK_TX_RESPONSE"`
if [[ "$RESPONSE" == "{}" ]]; then
# protobuf auto adds `omitempty` to everything so code OK and empty data/log
# will not even show when marshalled into json
# apparently we can use github.com/golang/protobuf/jsonpb to do the marshalling ...
CODE=0
else
# this wont actually work if theres an error ...
CODE=`echo $RESPONSE | jq .code`
fi
#echo "-------"
#echo "TX $TX"
#echo "RESPONSE $RESPONSE"
#echo "CODE $CODE"
#echo "ERROR $ERROR"
#echo "----"
echo "-------"
echo "TX $TX"
echo "RESPONSE $RESPONSE"
echo "CHECK_TX_RESPONSE $CHECK_TX_RESPONSE"
echo "APPEND_TX_RESPONSE $APPEND_TX_RESPONSE"
echo "CHECK_TX_CODE $CHECK_TX_CODE"
echo "APPEND_TX_CODE $APPEND_TX_CODE"
echo "----"
fi
}
echo "... sending tx. expect no error"
# 0 should pass once and get in block, with no error
TX=00
sendTx $TX
if [[ $CODE != 0 ]]; then
if [[ $APPEND_TX_CODE != 0 ]]; then
echo "Got non-zero exit code for $TX. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been included in a block. $ERROR"
exit 1
fi
echo "... sending tx. expect error"
# second time should get rejected by the mempool (return error and non-zero code)
sendTx $TX
if [[ $CODE == 0 ]]; then
echo "CHECKTX CODE: $CHECK_TX_CODE"
if [[ "$CHECK_TX_CODE" == 0 ]]; then
echo "Got zero exit code for $TX. Expected tx to be rejected by mempool. $RESPONSE"
exit 1
fi
if [[ "$ERROR" == "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" == "" ]]; then
echo "Expected to get an error - tx $TX should have been rejected from mempool"
echo "$RESPONSE"
exit 1
fi
echo "... sending tx. expect no error"
# now, TX=01 should pass, with no error
TX=01
sendTx $TX
if [[ $CODE != 0 ]]; then
if [[ $APPEND_TX_CODE != 0 ]]; then
echo "Got non-zero exit code for $TX. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been accepted in block. $ERROR"
exit 1
fi
echo "... sending tx. expect no error, but invalid"
# now, TX=03 should get in a block (passes CheckTx, no error), but is invalid
TX=03
sendTx $TX
if [[ $CODE == 0 ]]; then
if [[ "$CHECK_TX_CODE" != 0 ]]; then
echo "Got non-zero exit code for checktx on $TX. $RESPONSE"
exit 1
fi
if [[ $APPEND_TX_CODE == 0 ]]; then
echo "Got zero exit code for $TX. Should have been bad nonce. $RESPONSE"
exit 1
fi
if [[ "$ERROR" != "" ]]; then
if [[ "$GRPC_BROADCAST_TX" == "" && "$ERROR" != "" ]]; then
echo "Unexpected error. Tx $TX should have been included in a block. $ERROR"
exit 1
fi

View File

@ -83,6 +83,7 @@ function counter_over_grpc_grpc() {
echo "Starting counter and tendermint"
counter --serial --tmsp grpc > /dev/null &
pid_counter=$!
sleep 1
GRPC_PORT=36656
tendermint node --tmsp grpc --grpc_laddr tcp://localhost:$GRPC_PORT > tendermint.log &
pid_tendermint=$!

View File

@ -73,11 +73,11 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx
type EventDataTx struct {
Tx Tx `json:"tx"`
Result []byte `json:"result"`
Log string `json:"log"`
Code tmsp.CodeType `json:"code"`
Error string `json:"error"`
Tx Tx `json:"tx"`
Data []byte `json:"data"`
Log string `json:"log"`
Code tmsp.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now
}
// NOTE: This goes into the replay WAL