Refactor Application to Application & AppContext

This commit is contained in:
Jae Kwon 2015-11-29 14:22:30 -08:00
parent 8b9df7d685
commit 5d994944c6
5 changed files with 175 additions and 61 deletions

View File

@ -66,7 +66,7 @@ func cmdAppendTx(c *cli.Context) {
if err != nil {
Exit(err.Error())
}
res, err := write(conn, types.RequestAppendTx{[]byte(args[0])})
res, err := makeRequest(conn, types.RequestAppendTx{[]byte(args[0])})
if err != nil {
Exit(err.Error())
}
@ -79,7 +79,7 @@ func cmdGetHash(c *cli.Context) {
if err != nil {
Exit(err.Error())
}
res, err := write(conn, types.RequestGetHash{})
res, err := makeRequest(conn, types.RequestGetHash{})
if err != nil {
Exit(err.Error())
}
@ -92,7 +92,7 @@ func cmdCommit(c *cli.Context) {
if err != nil {
Exit(err.Error())
}
_, err = write(conn, types.RequestCommit{})
_, err = makeRequest(conn, types.RequestCommit{})
if err != nil {
Exit(err.Error())
}
@ -105,7 +105,7 @@ func cmdRollback(c *cli.Context) {
if err != nil {
Exit(err.Error())
}
_, err = write(conn, types.RequestRollback{})
_, err = makeRequest(conn, types.RequestRollback{})
if err != nil {
Exit(err.Error())
}
@ -114,21 +114,35 @@ func cmdRollback(c *cli.Context) {
//--------------------------------------------------------------------------------
func write(conn net.Conn, req types.Request) (types.Response, error) {
func makeRequest(conn net.Conn, req types.Request) (types.Response, error) {
var n int
var err error
// Write desired request
wire.WriteBinary(req, conn, &n, &err)
if err != nil {
return nil, err
}
// flush!
// Write flush request
wire.WriteBinary(types.RequestFlush{}, conn, &n, &err)
if err != nil {
return nil, err
}
// Read desired response
var res types.Response
wire.ReadBinaryPtr(&res, conn, 0, &n, &err)
return res, err
if err != nil {
return nil, err
}
// Read flush response
var resFlush types.ResponseFlush
wire.ReadBinaryPtr(&resFlush, conn, 0, &n, &err)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -1,6 +1,8 @@
package example
import (
"sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-merkle"
"github.com/tendermint/go-wire"
@ -8,8 +10,8 @@ import (
)
type DummyApplication struct {
state merkle.Tree
lastCommitState merkle.Tree
mtx sync.Mutex
state merkle.Tree
}
func NewDummyApplication() *DummyApplication {
@ -19,48 +21,77 @@ func NewDummyApplication() *DummyApplication {
0,
nil,
)
return &DummyApplication{
state: state,
lastCommitState: state,
return &DummyApplication{state: state}
}
func (dapp *DummyApplication) Open() types.AppContext {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
return &DummyAppContext{
app: dapp,
state: dapp.state.Copy(),
}
}
func (dapp *DummyApplication) Echo(message string) string {
func (dapp *DummyApplication) commitState(state merkle.Tree) {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
dapp.state = state.Copy()
}
func (dapp *DummyApplication) getState() merkle.Tree {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
return dapp.state.Copy()
}
//--------------------------------------------------------------------------------
type DummyAppContext struct {
app *DummyApplication
state merkle.Tree
}
func (dac *DummyAppContext) Echo(message string) string {
return message
}
func (dapp *DummyApplication) Info() []string {
return []string{Fmt("size:%v", dapp.state.Size())}
func (dac *DummyAppContext) Info() []string {
return []string{Fmt("size:%v", dac.state.Size())}
}
func (dapp *DummyApplication) SetOption(key string, value string) types.RetCode {
func (dac *DummyAppContext) SetOption(key string, value string) types.RetCode {
return 0
}
func (dapp *DummyApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
dapp.state.Set(tx, tx)
func (dac *DummyAppContext) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
dac.state.Set(tx, tx)
return nil, 0
}
func (dapp *DummyApplication) GetHash() ([]byte, types.RetCode) {
hash := dapp.state.Hash()
func (dac *DummyAppContext) GetHash() ([]byte, types.RetCode) {
hash := dac.state.Hash()
return hash, 0
}
func (dapp *DummyApplication) Commit() types.RetCode {
dapp.lastCommitState = dapp.state.Copy()
func (dac *DummyAppContext) Commit() types.RetCode {
dac.app.commitState(dac.state)
return 0
}
func (dapp *DummyApplication) Rollback() types.RetCode {
dapp.state = dapp.lastCommitState.Copy()
func (dac *DummyAppContext) Rollback() types.RetCode {
dac.state = dac.app.getState()
return 0
}
func (dapp *DummyApplication) AddListener(key string) types.RetCode {
func (dac *DummyAppContext) AddListener(key string) types.RetCode {
return 0
}
func (dapp *DummyApplication) RemListener(key string) types.RetCode {
func (dac *DummyAppContext) RemListener(key string) types.RetCode {
return 0
}
func (dac *DummyAppContext) Close() error {
return nil
}

View File

@ -1,8 +1,9 @@
package example
import (
// "fmt"
"reflect"
"testing"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
@ -12,6 +13,8 @@ import (
func TestStream(t *testing.T) {
numAppendTxs := 200000
// Start the listener
_, err := server.StartListener("tcp://127.0.0.1:8080", NewDummyApplication())
if err != nil {
@ -25,7 +28,9 @@ func TestStream(t *testing.T) {
}
// Read response data
done := make(chan struct{})
go func() {
counter := 0
for {
var n int
var err error
@ -34,19 +39,59 @@ func TestStream(t *testing.T) {
if err != nil {
Exit(err.Error())
}
// fmt.Println("Read", n)
// Process response
switch res := res.(type) {
case types.ResponseAppendTx:
counter += 1
if res.RetCode != types.RetCodeOK {
t.Error("AppendTx failed with ret_code", res.RetCode)
}
if counter > numAppendTxs {
t.Fatal("Too many AppendTx responses")
}
t.Log("response", counter)
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
close(done)
}()
}
case types.ResponseFlush:
// ignore
default:
t.Error("Unexpected response type", reflect.TypeOf(res))
}
}
}()
// Write requests
for {
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
var n int
var err error
var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")}
wire.WriteBinary(req, conn, &n, &err)
if err != nil {
Exit(err.Error())
t.Fatal(err.Error())
}
// Sometimes send flush messages
if counter%123 == 0 {
t.Log("flush")
wire.WriteBinary(types.RequestFlush{}, conn, &n, &err)
if err != nil {
t.Fatal(err.Error())
}
}
// fmt.Println("Wrote", n)
}
// Send final flush message
var n int
wire.WriteBinary(types.RequestFlush{}, conn, &n, &err)
if err != nil {
t.Fatal(err.Error())
}
<-done
}

View File

@ -11,7 +11,7 @@ import (
"github.com/tendermint/tmsp/types"
)
var maxNumberConnections = 2
// var maxNumberConnections = 2
func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
parts := strings.SplitN(protoAddr, "://", 2)
@ -23,33 +23,51 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
// A goroutine to accept a connection.
go func() {
semaphore := make(chan struct{}, maxNumberConnections)
// semaphore := make(chan struct{}, maxNumberConnections)
for {
semaphore <- struct{}{}
// semaphore <- struct{}{}
// Accept a connection
fmt.Println("Waiting for new connection...")
conn, err := ln.Accept()
if err != nil {
Exit("Failed to accept connection")
} else {
fmt.Println("Accepted a new connection")
}
connClosed := make(chan struct{}, 2) // Push to signal connection closed
appContext := app.Open()
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them
go handleRequests(app, connClosed, conn, responses)
go handleRequests(appContext, closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go handleResponses(connClosed, responses, conn)
go handleResponses(closeConn, responses, conn)
go func() {
// Wait until connection is closed
<-connClosed
fmt.Println("Connection was closed. Waiting for new connection...")
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
<-semaphore
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// Close the AppContext
err = appContext.Close()
if err != nil {
fmt.Printf("Error in closing app context: %v\n", err)
}
// <-semaphore
}()
}
@ -59,7 +77,7 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
}
// Read requests from conn and deal with them
func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) {
func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
@ -68,48 +86,47 @@ func handleRequests(app types.Application, connClosed chan struct{}, conn net.Co
var req types.Request
wire.ReadBinaryPtr(&req, bufReader, 0, &n, &err)
if err != nil {
fmt.Println("Error in handleRequests:", err.Error())
connClosed <- struct{}{}
closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
return
}
count++
handleRequest(app, req, responses)
handleRequest(appC, req, responses)
}
}
func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) {
func handleRequest(appC types.AppContext, req types.Request, responses chan<- types.Response) {
switch req := req.(type) {
case types.RequestEcho:
msg := app.Echo(req.Message)
msg := appC.Echo(req.Message)
responses <- types.ResponseEcho{msg}
case types.RequestFlush:
responses <- types.ResponseFlush{}
case types.RequestInfo:
data := app.Info()
data := appC.Info()
responses <- types.ResponseInfo{data}
case types.RequestSetOption:
retCode := app.SetOption(req.Key, req.Value)
retCode := appC.SetOption(req.Key, req.Value)
responses <- types.ResponseSetOption{retCode}
case types.RequestAppendTx:
events, retCode := app.AppendTx(req.TxBytes)
events, retCode := appC.AppendTx(req.TxBytes)
responses <- types.ResponseAppendTx{retCode}
for _, event := range events {
responses <- types.ResponseEvent{event}
}
case types.RequestGetHash:
hash, retCode := app.GetHash()
hash, retCode := appC.GetHash()
responses <- types.ResponseGetHash{retCode, hash}
case types.RequestCommit:
retCode := app.Commit()
retCode := appC.Commit()
responses <- types.ResponseCommit{retCode}
case types.RequestRollback:
retCode := app.Rollback()
retCode := appC.Rollback()
responses <- types.ResponseRollback{retCode}
case types.RequestAddListener:
retCode := app.AddListener(req.EventKey)
retCode := appC.AddListener(req.EventKey)
responses <- types.ResponseAddListener{retCode}
case types.RequestRemListener:
retCode := app.RemListener(req.EventKey)
retCode := appC.RemListener(req.EventKey)
responses <- types.ResponseRemListener{retCode}
default:
responses <- types.ResponseException{"Unknown request"}
@ -117,7 +134,7 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty
}
// Pull responses from 'responses' and write them to conn.
func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) {
func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) {
var count int
var bufWriter = bufio.NewWriter(conn)
for {
@ -126,15 +143,13 @@ func handleResponses(connClosed chan struct{}, responses <-chan types.Response,
var err error
wire.WriteBinary(res, bufWriter, &n, &err)
if err != nil {
fmt.Println(err.Error())
connClosed <- struct{}{}
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
return
}
if _, ok := res.(types.ResponseFlush); ok {
err = bufWriter.Flush()
if err != nil {
fmt.Println(err.Error())
connClosed <- struct{}{}
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
return
}
}

View File

@ -2,6 +2,12 @@ package types
type Application interface {
// For new socket connections
Open() AppContext
}
type AppContext interface {
// Echo a message
Echo(message string) string
@ -28,4 +34,7 @@ type Application interface {
// Remove event listener
RemListener(key string) RetCode
// Close this AppContext
Close() error
}