From ef93c958532d7a827033a491ee55e711f00f4ecb Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 8 Nov 2015 15:18:58 -0800 Subject: [PATCH] Added new Flush request/response type --- server/server.go | 19 ++++++++++++++++--- types/messages.go | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/server/server.go b/server/server.go index 2b22300e..161e5e0b 100644 --- a/server/server.go +++ b/server/server.go @@ -1,6 +1,7 @@ package server import ( + "bufio" "fmt" "net" "reflect" @@ -51,11 +52,12 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error // Read requests from conn and deal with them func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) { var count int + var bufReader = bufio.NewReader(conn) for { var n int64 var err error var req types.Request - wire.ReadBinaryPtr(&req, conn, &n, &err) + wire.ReadBinaryPtr(&req, bufReader, &n, &err) if err != nil { fmt.Println(err.Error()) @@ -77,6 +79,8 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty case types.RequestEcho: retCode, msg := app.Echo(req.Message) responses <- types.ResponseEcho{retCode, msg} + case types.RequestFlush: + responses <- types.ResponseFlush{} case types.RequestAppendTx: retCode := app.AppendTx(req.TxBytes) responses <- types.ResponseAppendTx{retCode} @@ -116,18 +120,27 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty // Pull responses from 'responses' and write them to conn. func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) { var count int + var bufWriter = bufio.NewWriter(conn) for { var res = <-responses var n int64 var err error - wire.WriteBinary(res, conn, &n, &err) - + wire.WriteBinary(res, bufWriter, &n, &err) if err != nil { fmt.Println(err.Error()) connClosed <- struct{}{} return } + if _, ok := res.(types.ResponseFlush); ok { + err = bufWriter.Flush() + if err != nil { + fmt.Println(err.Error()) + connClosed <- struct{}{} + return + } + } + count++ if count%1000 == 0 { fmt.Println("Sent response", reflect.TypeOf(res), res, n, err, count) diff --git a/types/messages.go b/types/messages.go index da4731ae..70470a2d 100644 --- a/types/messages.go +++ b/types/messages.go @@ -4,22 +4,24 @@ import "github.com/tendermint/go-wire" const ( requestTypeEcho = byte(0x01) - requestTypeAppendTx = byte(0x02) - requestTypeGetHash = byte(0x03) - requestTypeCommit = byte(0x04) - requestTypeRollback = byte(0x05) - requestTypeSetEventsMode = byte(0x06) - requestTypeAddListener = byte(0x07) - requestTypeRemListener = byte(0x08) + requestTypeFlush = byte(0x02) + requestTypeAppendTx = byte(0x03) + requestTypeGetHash = byte(0x04) + requestTypeCommit = byte(0x05) + requestTypeRollback = byte(0x06) + requestTypeSetEventsMode = byte(0x07) + requestTypeAddListener = byte(0x08) + requestTypeRemListener = byte(0x09) responseTypeEcho = byte(0x11) - responseTypeAppendTx = byte(0x12) - responseTypeGetHash = byte(0x13) - responseTypeCommit = byte(0x14) - responseTypeRollback = byte(0x15) - responseTypeSetEventsMode = byte(0x16) - responseTypeAddListener = byte(0x17) - responseTypeRemListener = byte(0x18) + responseTypeFlush = byte(0x12) + responseTypeAppendTx = byte(0x13) + responseTypeGetHash = byte(0x14) + responseTypeCommit = byte(0x15) + responseTypeRollback = byte(0x16) + responseTypeSetEventsMode = byte(0x17) + responseTypeAddListener = byte(0x18) + responseTypeRemListener = byte(0x19) responseTypeException = byte(0x20) responseTypeEvent = byte(0x21) @@ -31,6 +33,9 @@ type RequestEcho struct { Message string } +type RequestFlush struct { +} + type RequestAppendTx struct { TxBytes []byte } @@ -61,6 +66,7 @@ type Request interface { } func (_ RequestEcho) AssertRequestType() {} +func (_ RequestFlush) AssertRequestType() {} func (_ RequestAppendTx) AssertRequestType() {} func (_ RequestGetHash) AssertRequestType() {} func (_ RequestCommit) AssertRequestType() {} @@ -72,6 +78,7 @@ func (_ RequestRemListener) AssertRequestType() {} var _ = wire.RegisterInterface( struct{ Request }{}, wire.ConcreteType{RequestEcho{}, requestTypeEcho}, + wire.ConcreteType{RequestFlush{}, requestTypeFlush}, wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx}, wire.ConcreteType{RequestGetHash{}, requestTypeGetHash}, wire.ConcreteType{RequestCommit{}, requestTypeCommit}, @@ -88,6 +95,9 @@ type ResponseEcho struct { Message string } +type ResponseFlush struct { +} + type ResponseAppendTx struct { RetCode } @@ -130,6 +140,7 @@ type Response interface { } func (_ ResponseEcho) AssertResponseType() {} +func (_ ResponseFlush) AssertResponseType() {} func (_ ResponseAppendTx) AssertResponseType() {} func (_ ResponseGetHash) AssertResponseType() {} func (_ ResponseCommit) AssertResponseType() {} @@ -143,6 +154,7 @@ func (_ ResponseEvent) AssertResponseType() {} var _ = wire.RegisterInterface( struct{ Response }{}, wire.ConcreteType{ResponseEcho{}, responseTypeEcho}, + wire.ConcreteType{ResponseFlush{}, responseTypeFlush}, wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx}, wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash}, wire.ConcreteType{ResponseCommit{}, responseTypeCommit},