Convert TMSP to use Protobuf

This commit is contained in:
Jae Kwon 2016-01-30 19:36:33 -08:00
parent 6132ad7d6e
commit 2936c68339
12 changed files with 392 additions and 341 deletions

View File

@ -1,6 +1,9 @@
.PHONY: all test get_deps .PHONY: all test get_deps
all: test install all: protoc test install
protoc:
protoc --go_out=. types/*.proto
install: get_deps install: get_deps
go install github.com/tendermint/tmsp/cmd/... go install github.com/tendermint/tmsp/cmd/...

View File

@ -6,18 +6,16 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"reflect"
"sync" "sync"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/tmsp/types"
tmsp "github.com/tendermint/tmsp/types"
) )
const maxResponseSize = 1048576 // 1MB TODO make configurable const maxResponseSize = 1048576 // 1MB TODO make configurable
const flushThrottleMS = 20 // Don't wait longer than... const flushThrottleMS = 20 // Don't wait longer than...
type Callback func(tmsp.Request, tmsp.Response) type Callback func(*types.Request, *types.Response)
// This is goroutine-safe, but users should beware that // This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced // the application in general is not meant to be interfaced
@ -34,7 +32,7 @@ type TMSPClient struct {
bufWriter *bufio.Writer bufWriter *bufio.Writer
err error err error
reqSent *list.List reqSent *list.List
resCb func(tmsp.Request, tmsp.Response) resCb func(*types.Request, *types.Response)
} }
func NewTMSPClient(conn net.Conn, bufferSize int) *TMSPClient { func NewTMSPClient(conn net.Conn, bufferSize int) *TMSPClient {
@ -91,12 +89,10 @@ func (cli *TMSPClient) Error() error {
func (cli *TMSPClient) sendRequestsRoutine() { func (cli *TMSPClient) sendRequestsRoutine() {
for { for {
var n int
var err error
select { select {
case <-cli.flushTimer.Ch: case <-cli.flushTimer.Ch:
select { select {
case cli.reqQueue <- newReqRes(tmsp.RequestFlush{}): case cli.reqQueue <- newReqRes(types.RequestFlush()):
default: default:
// Probably will fill the buffer, or retry later. // Probably will fill the buffer, or retry later.
} }
@ -104,13 +100,13 @@ func (cli *TMSPClient) sendRequestsRoutine() {
return return
case reqres := <-cli.reqQueue: case reqres := <-cli.reqQueue:
cli.willSendReq(reqres) cli.willSendReq(reqres)
wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, cli.bufWriter, &n, &err) // Length prefix err := types.WriteMessage(reqres.Request, cli.bufWriter)
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(err)
return return
} }
// log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
if _, ok := reqres.Request.(tmsp.RequestFlush); ok { if reqres.Request.Type == types.RequestTypeFlush {
err = cli.bufWriter.Flush() err = cli.bufWriter.Flush()
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(err)
@ -124,16 +120,14 @@ func (cli *TMSPClient) sendRequestsRoutine() {
func (cli *TMSPClient) recvResponseRoutine() { func (cli *TMSPClient) recvResponseRoutine() {
r := bufio.NewReader(cli.conn) // Buffer reads r := bufio.NewReader(cli.conn) // Buffer reads
for { for {
var res tmsp.Response var res = &types.Response{}
var n int err := types.ReadMessage(r, res)
var err error
wire.ReadBinaryPtrLengthPrefixed(&res, r, maxResponseSize, &n, &err)
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(err)
return return
} }
switch res := res.(type) { switch res.Type {
case tmsp.ResponseException: case types.ResponseTypeException:
// XXX After setting cli.err, release waiters (e.g. reqres.Done()) // XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.StopForError(errors.New(res.Error)) cli.StopForError(errors.New(res.Error))
default: default:
@ -152,19 +146,19 @@ func (cli *TMSPClient) willSendReq(reqres *reqRes) {
cli.reqSent.PushBack(reqres) cli.reqSent.PushBack(reqres)
} }
func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error { func (cli *TMSPClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
// Get the first reqRes // Get the first reqRes
next := cli.reqSent.Front() next := cli.reqSent.Front()
if next == nil { if next == nil {
return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res)) return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type)
} }
reqres := next.Value.(*reqRes) reqres := next.Value.(*reqRes)
if !resMatchesReq(reqres.Request, res) { if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("Unexpected result type %v when response to %v expected", return fmt.Errorf("Unexpected result type %v when response to %v expected",
reflect.TypeOf(res), reflect.TypeOf(reqres.Request)) res.Type, reqres.Request.Type)
} }
reqres.Response = res // Set response reqres.Response = res // Set response
@ -182,99 +176,99 @@ func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error {
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) EchoAsync(msg string) { func (cli *TMSPClient) EchoAsync(msg string) {
cli.queueRequest(tmsp.RequestEcho{msg}) cli.queueRequest(types.RequestEcho(msg))
} }
func (cli *TMSPClient) FlushAsync() { func (cli *TMSPClient) FlushAsync() {
cli.queueRequest(tmsp.RequestFlush{}) cli.queueRequest(types.RequestFlush())
} }
func (cli *TMSPClient) SetOptionAsync(key string, value string) { func (cli *TMSPClient) SetOptionAsync(key string, value string) {
cli.queueRequest(tmsp.RequestSetOption{key, value}) cli.queueRequest(types.RequestSetOption(key, value))
} }
func (cli *TMSPClient) AppendTxAsync(tx []byte) { func (cli *TMSPClient) AppendTxAsync(tx []byte) {
cli.queueRequest(tmsp.RequestAppendTx{tx}) cli.queueRequest(types.RequestAppendTx(tx))
} }
func (cli *TMSPClient) CheckTxAsync(tx []byte) { func (cli *TMSPClient) CheckTxAsync(tx []byte) {
cli.queueRequest(tmsp.RequestCheckTx{tx}) cli.queueRequest(types.RequestCheckTx(tx))
} }
func (cli *TMSPClient) GetHashAsync() { func (cli *TMSPClient) GetHashAsync() {
cli.queueRequest(tmsp.RequestGetHash{}) cli.queueRequest(types.RequestGetHash())
} }
func (cli *TMSPClient) QueryAsync(query []byte) { func (cli *TMSPClient) QueryAsync(query []byte) {
cli.queueRequest(tmsp.RequestQuery{query}) cli.queueRequest(types.RequestQuery(query))
} }
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) InfoSync() (info string, err error) { func (cli *TMSPClient) InfoSync() (info string, err error) {
reqres := cli.queueRequest(tmsp.RequestInfo{}) reqres := cli.queueRequest(types.RequestInfo())
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
return "", cli.err return "", cli.err
} }
return reqres.Response.(tmsp.ResponseInfo).Info, nil return string(reqres.Response.Data), nil
} }
func (cli *TMSPClient) FlushSync() error { func (cli *TMSPClient) FlushSync() error {
cli.queueRequest(tmsp.RequestFlush{}).Wait() cli.queueRequest(types.RequestFlush()).Wait()
return cli.err return cli.err
} }
func (cli *TMSPClient) AppendTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { func (cli *TMSPClient) AppendTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) {
reqres := cli.queueRequest(tmsp.RequestAppendTx{tx}) reqres := cli.queueRequest(types.RequestAppendTx(tx))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
return tmsp.RetCodeInternalError, nil, "", cli.err return types.RetCodeInternalError, nil, "", cli.err
} }
res := reqres.Response.(tmsp.ResponseAppendTx) res := reqres.Response
return res.Code, res.Result, res.Log, nil return types.RetCode(res.Code), res.Data, res.Log, nil
} }
func (cli *TMSPClient) CheckTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { func (cli *TMSPClient) CheckTxSync(tx []byte) (code types.RetCode, result []byte, log string, err error) {
reqres := cli.queueRequest(tmsp.RequestCheckTx{tx}) reqres := cli.queueRequest(types.RequestCheckTx(tx))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
return tmsp.RetCodeInternalError, nil, "", cli.err return types.RetCodeInternalError, nil, "", cli.err
} }
res := reqres.Response.(tmsp.ResponseCheckTx) res := reqres.Response
return res.Code, res.Result, res.Log, nil return types.RetCode(res.Code), res.Data, res.Log, nil
} }
func (cli *TMSPClient) GetHashSync() (hash []byte, log string, err error) { func (cli *TMSPClient) GetHashSync() (hash []byte, log string, err error) {
reqres := cli.queueRequest(tmsp.RequestGetHash{}) reqres := cli.queueRequest(types.RequestGetHash())
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
return nil, "", cli.err return nil, "", cli.err
} }
res := reqres.Response.(tmsp.ResponseGetHash) res := reqres.Response
return res.Hash, res.Log, nil return res.Data, res.Log, nil
} }
func (cli *TMSPClient) QuerySync(query []byte) (result []byte, log string, err error) { func (cli *TMSPClient) QuerySync(query []byte) (result []byte, log string, err error) {
reqres := cli.queueRequest(tmsp.RequestQuery{query}) reqres := cli.queueRequest(types.RequestQuery(query))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
return nil, "", cli.err return nil, "", cli.err
} }
res := reqres.Response.(tmsp.ResponseQuery) res := reqres.Response
return res.Result, res.Log, nil return res.Data, res.Log, nil
} }
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) queueRequest(req tmsp.Request) *reqRes { func (cli *TMSPClient) queueRequest(req *types.Request) *reqRes {
reqres := newReqRes(req) reqres := newReqRes(req)
// TODO: set cli.err if reqQueue times out // TODO: set cli.err if reqQueue times out
cli.reqQueue <- reqres cli.reqQueue <- reqres
// Maybe auto-flush, or unset auto-flush // Maybe auto-flush, or unset auto-flush
switch req.(type) { switch req.Type {
case tmsp.RequestFlush: case types.RequestTypeFlush:
cli.flushTimer.Unset() cli.flushTimer.Unset()
default: default:
cli.flushTimer.Set() cli.flushTimer.Set()
@ -285,37 +279,17 @@ func (cli *TMSPClient) queueRequest(req tmsp.Request) *reqRes {
//---------------------------------------- //----------------------------------------
func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) { func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
switch req.(type) { return req.Type+0x10 == res.Type
case tmsp.RequestEcho:
_, ok = res.(tmsp.ResponseEcho)
case tmsp.RequestFlush:
_, ok = res.(tmsp.ResponseFlush)
case tmsp.RequestInfo:
_, ok = res.(tmsp.ResponseInfo)
case tmsp.RequestSetOption:
_, ok = res.(tmsp.ResponseSetOption)
case tmsp.RequestAppendTx:
_, ok = res.(tmsp.ResponseAppendTx)
case tmsp.RequestCheckTx:
_, ok = res.(tmsp.ResponseCheckTx)
case tmsp.RequestGetHash:
_, ok = res.(tmsp.ResponseGetHash)
case tmsp.RequestQuery:
_, ok = res.(tmsp.ResponseQuery)
default:
return false
}
return
} }
type reqRes struct { type reqRes struct {
tmsp.Request *types.Request
*sync.WaitGroup *sync.WaitGroup
tmsp.Response // Not set atomically, so be sure to use WaitGroup. *types.Response // Not set atomically, so be sure to use WaitGroup.
} }
func newReqRes(req tmsp.Request) *reqRes { func newReqRes(req *types.Request) *reqRes {
return &reqRes{ return &reqRes{
Request: req, Request: req,
WaitGroup: waitGroup1(), WaitGroup: waitGroup1(),

View File

@ -8,14 +8,11 @@ import (
"io" "io"
"net" "net"
"os" "os"
"reflect"
"strings" "strings"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/types"
) )
// connection is a global variable so it can be reused by the console // connection is a global variable so it can be reused by the console
@ -162,7 +159,7 @@ func cmdEcho(c *cli.Context) {
fmt.Println("echo takes 1 argument") fmt.Println("echo takes 1 argument")
return return
} }
res, err := makeRequest(conn, types.RequestEcho{args[0]}) res, err := makeRequest(conn, types.RequestEcho(args[0]))
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -172,7 +169,7 @@ func cmdEcho(c *cli.Context) {
// Get some info from the application // Get some info from the application
func cmdInfo(c *cli.Context) { func cmdInfo(c *cli.Context) {
res, err := makeRequest(conn, types.RequestInfo{}) res, err := makeRequest(conn, types.RequestInfo())
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -187,7 +184,7 @@ func cmdSetOption(c *cli.Context) {
fmt.Println("set_option takes 2 arguments (key, value)") fmt.Println("set_option takes 2 arguments (key, value)")
return return
} }
_, err := makeRequest(conn, types.RequestSetOption{args[0], args[1]}) _, err := makeRequest(conn, types.RequestSetOption(args[0], args[1]))
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -213,7 +210,7 @@ func cmdAppendTx(c *cli.Context) {
} }
} }
res, err := makeRequest(conn, types.RequestAppendTx{tx}) res, err := makeRequest(conn, types.RequestAppendTx(tx))
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -239,7 +236,7 @@ func cmdCheckTx(c *cli.Context) {
} }
} }
res, err := makeRequest(conn, types.RequestCheckTx{tx}) res, err := makeRequest(conn, types.RequestCheckTx(tx))
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -249,12 +246,12 @@ func cmdCheckTx(c *cli.Context) {
// Get application Merkle root hash // Get application Merkle root hash
func cmdGetHash(c *cli.Context) { func cmdGetHash(c *cli.Context) {
res, err := makeRequest(conn, types.RequestGetHash{}) res, err := makeRequest(conn, types.RequestGetHash())
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
} }
fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash) fmt.Printf("%X\n", res.Data)
} }
// Query application state // Query application state
@ -275,7 +272,7 @@ func cmdQuery(c *cli.Context) {
} }
} }
res, err := makeRequest(conn, types.RequestQuery{query}) res, err := makeRequest(conn, types.RequestQuery(query))
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return return
@ -285,37 +282,35 @@ func cmdQuery(c *cli.Context) {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) {
var n int
var err error
// Write desired request // Write desired request
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, conn, &n, &err) err := types.WriteMessage(req, conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Write flush request // Write flush request
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err) err = types.WriteMessage(types.RequestFlush(), conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Read desired response // Read desired response
var res types.Response var res = &types.Response{}
wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) err = types.ReadMessage(conn, res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Read flush response // Read flush response
var resFlush types.Response var resFlush = &types.Response{}
wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err) err = types.ReadMessage(conn, resFlush)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, ok := resFlush.(types.ResponseFlush); !ok { if resFlush.Type != types.ResponseTypeFlush {
return nil, errors.New(Fmt("Expected types.ResponseFlush but got %v instead", reflect.TypeOf(resFlush))) return nil, errors.New(Fmt("Expected types.ResponseTypesFlush but got %v instead", resFlush.Type))
} }
return res, nil return res, nil

View File

@ -1,12 +1,10 @@
package example package example
import ( import (
"reflect"
"testing" "testing"
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/server" "github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/types"
) )
@ -32,19 +30,18 @@ func TestStream(t *testing.T) {
go func() { go func() {
counter := 0 counter := 0
for { for {
var n int
var err error var res = &types.Response{}
var res types.Response err := types.ReadMessage(conn, res)
wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
// Process response // Process response
switch res := res.(type) { switch res.Type {
case types.ResponseAppendTx: case types.ResponseTypeAppendTx:
counter += 1 counter += 1
if res.Code != types.RetCodeOK { if types.RetCode(res.Code) != types.RetCodeOK {
t.Error("AppendTx failed with ret_code", res.Code) t.Error("AppendTx failed with ret_code", res.Code)
} }
if counter > numAppendTxs { if counter > numAppendTxs {
@ -57,10 +54,10 @@ func TestStream(t *testing.T) {
close(done) close(done)
}() }()
} }
case types.ResponseFlush: case types.ResponseTypeFlush:
// ignore // ignore
default: default:
t.Error("Unexpected response type", reflect.TypeOf(res)) t.Error("Unexpected response type", res.Type)
} }
} }
}() }()
@ -68,10 +65,8 @@ func TestStream(t *testing.T) {
// Write requests // Write requests
for counter := 0; counter < numAppendTxs; counter++ { for counter := 0; counter < numAppendTxs; counter++ {
// Send request // Send request
var n int var req = types.RequestAppendTx([]byte("test"))
var err error err := types.WriteMessage(req, conn)
var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")}
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, conn, &n, &err)
if err != nil { if err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
@ -79,7 +74,7 @@ func TestStream(t *testing.T) {
// Sometimes send flush messages // Sometimes send flush messages
if counter%123 == 0 { if counter%123 == 0 {
t.Log("flush") t.Log("flush")
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err) err := types.WriteMessage(types.RequestFlush(), conn)
if err != nil { if err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
@ -87,8 +82,7 @@ func TestStream(t *testing.T) {
} }
// Send final flush message // Send final flush message
var n int err = types.WriteMessage(types.RequestFlush(), conn)
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{types.RequestFlush{}}, conn, &n, &err)
if err != nil { if err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }

View File

@ -9,7 +9,6 @@ import (
"sync" "sync"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/types"
) )
@ -40,8 +39,8 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
fmt.Println("Accepted a new connection") fmt.Println("Accepted a new connection")
} }
closeConn := make(chan error, 2) // Push to signal connection closed closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan types.Response, 1000) // A channel to buffer responses responses := make(chan *types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them // Read requests from conn and deal with them
go handleRequests(&mtx, app, closeConn, conn, responses) go handleRequests(&mtx, app, closeConn, conn, responses)
@ -73,14 +72,13 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
} }
// Read requests from conn and deal with them // Read requests from conn and deal with them
func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- types.Response) { func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
var count int var count int
var bufReader = bufio.NewReader(conn) var bufReader = bufio.NewReader(conn)
for { for {
var n int
var err error var req = &types.Request{}
var req types.Request err := types.ReadMessage(bufReader, req)
wire.ReadBinaryPtrLengthPrefixed(&req, bufReader, 0, &n, &err)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
closeConn <- fmt.Errorf("Connection closed by client") closeConn <- fmt.Errorf("Connection closed by client")
@ -96,49 +94,47 @@ func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error
} }
} }
func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { func handleRequest(app types.Application, req *types.Request, responses chan<- *types.Response) {
switch req := req.(type) { switch req.Type {
case types.RequestEcho: case types.RequestTypeEcho:
responses <- types.ResponseEcho{req.Message} responses <- types.ResponseEcho(string(req.Data))
case types.RequestFlush: case types.RequestTypeFlush:
responses <- types.ResponseFlush{} responses <- types.ResponseFlush()
case types.RequestInfo: case types.RequestTypeInfo:
data := app.Info() data := app.Info()
responses <- types.ResponseInfo{data} responses <- types.ResponseInfo(data)
case types.RequestSetOption: case types.RequestTypeSetOption:
logStr := app.SetOption(req.Key, req.Value) logStr := app.SetOption(req.Key, req.Value)
responses <- types.ResponseSetOption{logStr} responses <- types.ResponseSetOption(logStr)
case types.RequestAppendTx: case types.RequestTypeAppendTx:
code, result, logStr := app.AppendTx(req.TxBytes) code, result, logStr := app.AppendTx(req.Data)
responses <- types.ResponseAppendTx{code, result, logStr} responses <- types.ResponseAppendTx(code, result, logStr)
case types.RequestCheckTx: case types.RequestTypeCheckTx:
code, result, logStr := app.CheckTx(req.TxBytes) code, result, logStr := app.CheckTx(req.Data)
responses <- types.ResponseCheckTx{code, result, logStr} responses <- types.ResponseCheckTx(code, result, logStr)
case types.RequestGetHash: case types.RequestTypeGetHash:
hash, logStr := app.GetHash() hash, logStr := app.GetHash()
responses <- types.ResponseGetHash{hash, logStr} responses <- types.ResponseGetHash(hash, logStr)
case types.RequestQuery: case types.RequestTypeQuery:
result, logStr := app.Query(req.QueryBytes) result, logStr := app.Query(req.Data)
responses <- types.ResponseQuery{result, logStr} responses <- types.ResponseQuery(result, logStr)
default: default:
responses <- types.ResponseException{"Unknown request"} responses <- types.ResponseException("Unknown request")
} }
} }
// Pull responses from 'responses' and write them to conn. // Pull responses from 'responses' and write them to conn.
func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) { func handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
var count int var count int
var bufWriter = bufio.NewWriter(conn) var bufWriter = bufio.NewWriter(conn)
for { for {
var res = <-responses var res = <-responses
var n int err := types.WriteMessage(res, bufWriter)
var err error
wire.WriteBinaryLengthPrefixed(struct{ types.Response }{res}, bufWriter, &n, &err)
if err != nil { if err != nil {
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
return return
} }
if _, ok := res.(types.ResponseFlush); ok { if res.Type == types.ResponseTypeFlush {
err = bufWriter.Flush() err = bufWriter.Flush()
if err != nil { if err != nil {
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())

View File

@ -6,7 +6,6 @@ import (
//"encoding/hex" //"encoding/hex"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/types"
) )
@ -21,10 +20,8 @@ func main() {
go func() { go func() {
counter := 0 counter := 0
for { for {
var res types.Response var res = &types.Response{}
var n int err := types.ReadMessage(conn, res)
var err error
wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
@ -39,10 +36,9 @@ func main() {
counter := 0 counter := 0
for i := 0; ; i++ { for i := 0; ; i++ {
var bufWriter = bufio.NewWriter(conn) var bufWriter = bufio.NewWriter(conn)
var req types.Request = types.RequestEcho{"foobar"} var req = types.RequestEcho("foobar")
var n int
var err error err := types.WriteMessage(req, bufWriter)
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, bufWriter, &n, &err)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
@ -50,6 +46,7 @@ func main() {
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
counter += 1 counter += 1
if counter%1000 == 0 { if counter%1000 == 0 {
fmt.Println("Write", counter) fmt.Println("Write", counter)

View File

@ -5,11 +5,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"reflect"
//"encoding/hex" //"encoding/hex"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/types"
) )
@ -23,7 +21,7 @@ func main() {
// Make a bunch of requests // Make a bunch of requests
counter := 0 counter := 0
for i := 0; ; i++ { for i := 0; ; i++ {
req := types.RequestEcho{"foobar"} req := types.RequestEcho("foobar")
_, err := makeRequest(conn, req) _, err := makeRequest(conn, req)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
@ -35,35 +33,32 @@ func main() {
} }
} }
func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) {
var bufWriter = bufio.NewWriter(conn) var bufWriter = bufio.NewWriter(conn)
var n int
var err error
// Write desired request // Write desired request
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, bufWriter, &n, &err) err := types.WriteMessage(req, bufWriter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
bufWriter.Write([]byte{0x01, 0x01, types.RequestTypeFlush}) // Write flush msg err = types.WriteMessage(types.RequestFlush(), bufWriter)
err = bufWriter.Flush()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Read desired response // Read desired response
var res types.Response var res = &types.Response{}
wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) err = types.ReadMessage(conn, res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var resFlush types.Response // Read flush msg var resFlush = &types.Response{}
wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err) err = types.ReadMessage(conn, resFlush)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, ok := resFlush.(types.ResponseFlush); !ok { if resFlush.Type != types.ResponseTypeFlush {
return nil, errors.New(Fmt("Expected flush response but got something else", reflect.TypeOf(resFlush))) return nil, errors.New(Fmt("Expected flush response but got something else: %v", resFlush.Type))
} }
return res, nil return res, nil

View File

@ -1,44 +0,0 @@
package benchmarks
import (
"bytes"
"testing"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types"
)
func BenchmarkRequestWire(b *testing.B) {
b.StopTimer()
var bz = make([]byte, 1024)
copy(bz, []byte{1, 9, 0x01, 1, 6, 34, 34, 34, 34, 34, 34})
var buf = bytes.NewBuffer(bz)
var req types.Request
b.StartTimer()
for i := 0; i < b.N; i++ {
{
buf = bytes.NewBuffer(bz)
var n int
var err error
wire.ReadBinaryPtrLengthPrefixed(&req, buf, 0, &n, &err)
if err != nil {
Exit(err.Error())
return
}
}
{
buf = bytes.NewBuffer(bz)
var n int
var err error
wire.WriteBinaryLengthPrefixed(struct{ types.Request }{req}, buf, &n, &err)
if err != nil {
Exit(err.Error())
return
}
}
}
}

View File

@ -1,155 +1,180 @@
package types package types
import "github.com/tendermint/go-wire" import (
"io"
"github.com/golang/protobuf/proto"
"github.com/tendermint/go-wire"
)
const ( const (
RequestTypeEcho = byte(0x01) RequestTypeEcho = uint32(0x01)
RequestTypeFlush = byte(0x02) RequestTypeFlush = uint32(0x02)
RequestTypeInfo = byte(0x03) RequestTypeInfo = uint32(0x03)
RequestTypeSetOption = byte(0x04) RequestTypeSetOption = uint32(0x04)
// reserved for GetOption = byte(0x05) // reserved for GetOption = uint32(0x05)
ResponseTypeException = byte(0x10) ResponseTypeException = uint32(0x10)
ResponseTypeEcho = byte(0x11) ResponseTypeEcho = uint32(0x11)
ResponseTypeFlush = byte(0x12) ResponseTypeFlush = uint32(0x12)
ResponseTypeInfo = byte(0x13) ResponseTypeInfo = uint32(0x13)
ResponseTypeSetOption = byte(0x14) ResponseTypeSetOption = uint32(0x14)
// reserved for GetOption = byte(0x15) // reserved for GetOption = uint32(0x15)
RequestTypeAppendTx = byte(0x21) RequestTypeAppendTx = uint32(0x21)
RequestTypeCheckTx = byte(0x22) RequestTypeCheckTx = uint32(0x22)
RequestTypeGetHash = byte(0x23) RequestTypeGetHash = uint32(0x23)
RequestTypeQuery = byte(0x24) RequestTypeQuery = uint32(0x24)
ResponseTypeAppendTx = byte(0x31) ResponseTypeAppendTx = uint32(0x31)
ResponseTypeCheckTx = byte(0x32) ResponseTypeCheckTx = uint32(0x32)
ResponseTypeGetHash = byte(0x33) ResponseTypeGetHash = uint32(0x33)
ResponseTypeQuery = byte(0x34) ResponseTypeQuery = uint32(0x34)
) )
func RequestEcho(message string) *Request {
return &Request{
Type: RequestTypeEcho,
Data: []byte(message),
}
}
func RequestFlush() *Request {
return &Request{
Type: RequestTypeFlush,
}
}
func RequestInfo() *Request {
return &Request{
Type: RequestTypeInfo,
}
}
func RequestSetOption(key string, value string) *Request {
return &Request{
Type: RequestTypeSetOption,
Key: key,
Value: value,
}
}
func RequestAppendTx(txBytes []byte) *Request {
return &Request{
Type: RequestTypeAppendTx,
Data: txBytes,
}
}
func RequestCheckTx(txBytes []byte) *Request {
return &Request{
Type: RequestTypeCheckTx,
Data: txBytes,
}
}
func RequestGetHash() *Request {
return &Request{
Type: RequestTypeGetHash,
}
}
func RequestQuery(queryBytes []byte) *Request {
return &Request{
Type: RequestTypeQuery,
Data: queryBytes,
}
}
//---------------------------------------- //----------------------------------------
type RequestEcho struct { func ResponseException(errStr string) *Response {
Message string return &Response{
Type: ResponseTypeException,
Error: errStr,
}
} }
type RequestFlush struct { func ResponseEcho(message string) *Response {
return &Response{
Type: ResponseTypeEcho,
Data: []byte(message),
}
} }
type RequestInfo struct { func ResponseFlush() *Response {
return &Response{
Type: ResponseTypeFlush,
}
} }
type RequestSetOption struct { func ResponseInfo(info string) *Response {
Key string return &Response{
Value string Type: ResponseTypeInfo,
Data: []byte(info),
}
} }
type RequestAppendTx struct { func ResponseSetOption(log string) *Response {
TxBytes []byte return &Response{
Type: ResponseTypeSetOption,
Log: log,
}
} }
type RequestCheckTx struct { func ResponseAppendTx(code RetCode, result []byte, log string) *Response {
TxBytes []byte return &Response{
Type: ResponseTypeAppendTx,
Data: result,
Log: log,
}
} }
type RequestGetHash struct { func ResponseCheckTx(code RetCode, result []byte, log string) *Response {
return &Response{
Type: ResponseTypeCheckTx,
Data: result,
Log: log,
}
} }
type RequestQuery struct { func ResponseGetHash(hash []byte, log string) *Response {
QueryBytes []byte return &Response{
Type: ResponseTypeGetHash,
Data: hash,
Log: log,
}
} }
type Request interface { func ResponseQuery(result []byte, log string) *Response {
AssertRequestType() return &Response{
Type: ResponseTypeQuery,
Data: result,
Log: log,
}
} }
func (_ RequestEcho) AssertRequestType() {}
func (_ RequestFlush) AssertRequestType() {}
func (_ RequestInfo) AssertRequestType() {}
func (_ RequestSetOption) AssertRequestType() {}
func (_ RequestAppendTx) AssertRequestType() {}
func (_ RequestCheckTx) AssertRequestType() {}
func (_ RequestGetHash) AssertRequestType() {}
func (_ RequestQuery) AssertRequestType() {}
var _ = wire.RegisterInterface(
struct{ Request }{},
wire.ConcreteType{RequestEcho{}, RequestTypeEcho},
wire.ConcreteType{RequestFlush{}, RequestTypeFlush},
wire.ConcreteType{RequestInfo{}, RequestTypeInfo},
wire.ConcreteType{RequestSetOption{}, RequestTypeSetOption},
wire.ConcreteType{RequestAppendTx{}, RequestTypeAppendTx},
wire.ConcreteType{RequestCheckTx{}, RequestTypeCheckTx},
wire.ConcreteType{RequestGetHash{}, RequestTypeGetHash},
wire.ConcreteType{RequestQuery{}, RequestTypeQuery},
)
//---------------------------------------- //----------------------------------------
type ResponseException struct { // Write proto message, length delimited
Error string func WriteMessage(msg proto.Message, w io.Writer) error {
bz, err := proto.Marshal(msg)
if err != nil {
return err
}
var n int
wire.WriteByteSlice(bz, w, &n, &err)
return err
} }
type ResponseEcho struct { // Read proto message, length delimited
Message string func ReadMessage(r io.Reader, msg proto.Message) error {
var n int
var err error
bz := wire.ReadByteSlice(r, 0, &n, &err)
if err != nil {
return err
}
err = proto.Unmarshal(bz, msg)
return err
} }
type ResponseFlush struct {
}
type ResponseInfo struct {
Info string
}
type ResponseSetOption struct {
Log string
}
type ResponseAppendTx struct {
Code RetCode
Result []byte
Log string
}
type ResponseCheckTx struct {
Code RetCode
Result []byte
Log string
}
type ResponseGetHash struct {
Hash []byte
Log string
}
type ResponseQuery struct {
Result []byte
Log string
}
type Response interface {
AssertResponseType()
}
func (_ ResponseEcho) AssertResponseType() {}
func (_ ResponseFlush) AssertResponseType() {}
func (_ ResponseInfo) AssertResponseType() {}
func (_ ResponseSetOption) AssertResponseType() {}
func (_ ResponseAppendTx) AssertResponseType() {}
func (_ ResponseCheckTx) AssertResponseType() {}
func (_ ResponseGetHash) AssertResponseType() {}
func (_ ResponseException) AssertResponseType() {}
func (_ ResponseQuery) AssertResponseType() {}
var _ = wire.RegisterInterface(
struct{ Response }{},
wire.ConcreteType{ResponseEcho{}, ResponseTypeEcho},
wire.ConcreteType{ResponseFlush{}, ResponseTypeFlush},
wire.ConcreteType{ResponseInfo{}, ResponseTypeInfo},
wire.ConcreteType{ResponseSetOption{}, ResponseTypeSetOption},
wire.ConcreteType{ResponseAppendTx{}, ResponseTypeAppendTx},
wire.ConcreteType{ResponseCheckTx{}, ResponseTypeCheckTx},
wire.ConcreteType{ResponseGetHash{}, ResponseTypeGetHash},
wire.ConcreteType{ResponseException{}, ResponseTypeException},
wire.ConcreteType{ResponseQuery{}, ResponseTypeQuery},
)

View File

@ -1,6 +1,6 @@
package types package types
type RetCode int type RetCode uint
// Reserved return codes // Reserved return codes
const ( const (

69
types/types.pb.go Normal file
View File

@ -0,0 +1,69 @@
// Code generated by protoc-gen-go.
// source: types/types.proto
// DO NOT EDIT!
/*
Package types is a generated protocol buffer package.
It is generated from these files:
types/types.proto
It has these top-level messages:
Request
Response
*/
package types
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Request struct {
Type uint32 `protobuf:"varint,1,opt,name=type" json:"type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Key string `protobuf:"bytes,3,opt,name=key" json:"key,omitempty"`
Value string `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type Response struct {
Type uint32 `protobuf:"varint,1,opt,name=type" json:"type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Code uint32 `protobuf:"varint,3,opt,name=code" json:"code,omitempty"`
Error string `protobuf:"bytes,4,opt,name=error" json:"error,omitempty"`
Log string `protobuf:"bytes,5,opt,name=log" json:"log,omitempty"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func init() {
proto.RegisterType((*Request)(nil), "types.Request")
proto.RegisterType((*Response)(nil), "types.Response")
}
var fileDescriptor0 = []byte{
// 165 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x8f, 0xb1, 0xae, 0xc2, 0x30,
0x0c, 0x45, 0xd5, 0xd7, 0xe6, 0x01, 0x16, 0x48, 0x10, 0x31, 0x64, 0x44, 0x9d, 0x98, 0x60, 0xe0,
0x4f, 0xb2, 0x31, 0x06, 0x6a, 0x31, 0x50, 0xd5, 0x21, 0x49, 0x91, 0xfa, 0xf7, 0xd8, 0xae, 0xd8,
0x59, 0xa2, 0x73, 0x8f, 0xa2, 0xab, 0x6b, 0xd8, 0x95, 0x29, 0x62, 0x3e, 0xeb, 0x7b, 0x8a, 0x89,
0x0a, 0x59, 0xa3, 0xa1, 0xbd, 0xc2, 0xc2, 0xe3, 0x6b, 0xc4, 0x5c, 0xac, 0x85, 0x46, 0x9c, 0xab,
0x0e, 0xd5, 0x71, 0xe3, 0x95, 0xc5, 0x75, 0xa1, 0x04, 0xf7, 0xc7, 0x6e, 0xed, 0x95, 0xed, 0x16,
0xea, 0x27, 0x4e, 0xae, 0x66, 0xb5, 0xf2, 0x82, 0x76, 0x0f, 0xe6, 0x1d, 0xfa, 0x11, 0x5d, 0xa3,
0x6e, 0x0e, 0xed, 0x00, 0x4b, 0x8f, 0x39, 0xd2, 0x90, 0xf1, 0xe7, 0x6e, 0x76, 0x77, 0xea, 0x50,
0xcb, 0xf9, 0x9f, 0xb0, 0xb4, 0x63, 0x4a, 0x94, 0xbe, 0xed, 0x1a, 0x64, 0x45, 0x4f, 0x0f, 0x67,
0xe6, 0x15, 0x8c, 0xb7, 0x7f, 0x3d, 0xec, 0xf2, 0x09, 0x00, 0x00, 0xff, 0xff, 0xce, 0x9d, 0x3d,
0x4f, 0xed, 0x00, 0x00, 0x00,
}

47
types/types.proto Normal file
View File

@ -0,0 +1,47 @@
syntax = "proto3";
package types;
//----------------------------------------
// Message types
/*
RequestTypeEcho = 0x01;
RequestTypeFlush = 0x02;
RequestTypeInfo = 0x03;
RequestTypeSetOption = 0x04;
RequestTypeAppendTx = 0x21;
RequestTypeCheckTx = 0x22;
RequestTypeGetHash = 0x23;
RequestTypeQuery = 0x24;
ResponseTypeEcho = 0x11;
ResponseTypeFlush = 0x12;
ResponseTypeInfo = 0x13;
ResponseTypeSetOption = 0x14;
ResponseTypeAppendTx = 0x31;
ResponseTypeCheckTx = 0x32;
ResponseTypeGetHash = 0x33;
ResponseTypeQuery = 0x34;
*/
//----------------------------------------
// Request types
message Request {
uint32 type = 1;
bytes data = 2;
string key = 3;
string value = 4;
}
//----------------------------------------
// Response types
message Response {
uint32 type = 1;
bytes data = 2;
uint32 code = 3;
string error = 4;
string log = 5;
}