From 0bd4061cf6496df6796d456f21fc6df28d20e48f Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 2 Nov 2015 07:39:53 -0800 Subject: [PATCH] initial commit --- README.md | 63 ++++++++++++++++++ cmd/cli.go | 127 +++++++++++++++++++++++++++++++++++ example/main.go | 84 +++++++++++++++++++++++ example/main_test.go | 52 +++++++++++++++ server/server.go | 136 +++++++++++++++++++++++++++++++++++++ types/application.go | 31 +++++++++ types/events.go | 14 ++++ types/messages.go | 155 +++++++++++++++++++++++++++++++++++++++++++ types/retcode.go | 12 ++++ 9 files changed, 674 insertions(+) create mode 100644 README.md create mode 100644 cmd/cli.go create mode 100644 example/main.go create mode 100644 example/main_test.go create mode 100644 server/server.go create mode 100644 types/application.go create mode 100644 types/events.go create mode 100644 types/messages.go create mode 100644 types/retcode.go diff --git a/README.md b/README.md new file mode 100644 index 00000000..2e5b5652 --- /dev/null +++ b/README.md @@ -0,0 +1,63 @@ +# Tendermint Streaming Protocol (TMSP) + +**TMSP** is a socket protocol, which means applications can be written in any programming language. +TMSP is an asynchronous streaming protocol: message responses are written back asynchronously to the platform. + +*Applications must be deterministic.* + +## Message types + +#### AppendTx + * __Arguments__: + * `TxBytes ([]byte)` + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Append and run a transaction. The transaction may or may not be final. + +#### GetHash + * __Returns__: + * `RetCode (int8)` + * `Hash ([]byte)` + * __Usage__:
+ Return a Merkle root hash of the application state + +#### Commit + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Finalize all appended transactions + +#### Rollback + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Roll back to the last commit + +#### SetEventsMode + * __Arguments__: + * `EventsMode (int8)`: + * `EventsModeOff (0)`: Events are not reported. Used for mempool. + * `EventsModeCached (1)`: Events are cached. + * `EventsModeOn (2)`: Flush cache and report events. + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Set event reporting mode for future transactions + +#### AddListener + * __Arguments__: + * `EventKey (string)` + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Add event listener callback for events with given key. + +#### RemoveListener + * __Arguments__: + * `EventKey (string)` + * __Returns__: + * `RetCode (int8)` + * __Usage__:
+ Remove event listener callback for events with given key. + diff --git a/cmd/cli.go b/cmd/cli.go new file mode 100644 index 00000000..2885b991 --- /dev/null +++ b/cmd/cli.go @@ -0,0 +1,127 @@ +package main + +import ( + "fmt" + "net" + "os" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tmsp/types" + + "github.com/codegangsta/cli" +) + +func main() { + app := cli.NewApp() + app.Name = "cli" + app.Usage = "cli [command] [args...]" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "address", + Value: "tcp://127.0.0.1:8080", + Usage: "address of application socket", + }, + } + app.Commands = []cli.Command{ + { + Name: "append_tx", + Usage: "Append a new tx to application", + Action: func(c *cli.Context) { + cmdAppendTx(c) + }, + }, + { + Name: "get_hash", + Usage: "Get application Merkle root hash", + Action: func(c *cli.Context) { + cmdGetHash(c) + }, + }, + { + Name: "commit", + Usage: "Commit the application state", + Action: func(c *cli.Context) { + cmdCommit(c) + }, + }, + { + Name: "rollback", + Usage: "Roll back the application state to the latest commit", + Action: func(c *cli.Context) { + cmdRollback(c) + }, + }, + } + app.Run(os.Args) + +} + +//-------------------------------------------------------------------------------- + +// Append a new tx to application +func cmdAppendTx(c *cli.Context) { + args := c.Args() // Args to AppendTx + conn, err := Connect(c.GlobalString("address")) + if err != nil { + Exit(err.Error()) + } + res, err := write(conn, types.RequestAppendTx{[]byte(args[0])}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Sent tx:", args[0], "response:", res) +} + +// Get application Merkle root hash +func cmdGetHash(c *cli.Context) { + conn, err := Connect(c.GlobalString("address")) + if err != nil { + Exit(err.Error()) + } + res, err := write(conn, types.RequestGetHash{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Got hash:", Fmt("%X", res.(types.ResponseGetHash).Hash)) +} + +// Commit the application state +func cmdCommit(c *cli.Context) { + conn, err := Connect(c.GlobalString("address")) + if err != nil { + Exit(err.Error()) + } + _, err = write(conn, types.RequestCommit{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Committed.") +} + +// Roll back the application state to the latest commit +func cmdRollback(c *cli.Context) { + conn, err := Connect(c.GlobalString("address")) + if err != nil { + Exit(err.Error()) + } + _, err = write(conn, types.RequestRollback{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Rolled back.") +} + +//-------------------------------------------------------------------------------- + +func write(conn net.Conn, req types.Request) (types.Response, error) { + var n int64 + var err error + wire.WriteBinary(req, conn, &n, &err) + if err != nil { + return nil, err + } + var res types.Response + wire.ReadBinaryPtr(&res, conn, &n, &err) + return res, err +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 00000000..bf6f06d9 --- /dev/null +++ b/example/main.go @@ -0,0 +1,84 @@ +package main + +import ( + . "github.com/tendermint/go-common" + "github.com/tendermint/go-merkle" + "github.com/tendermint/go-wire" + "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" +) + +func main() { + + // Start the listener + _, err := server.StartListener("tcp://127.0.0.1:8080", &DummyApplication{}) + if err != nil { + Exit(err.Error()) + } + + // Wait forever + TrapSignal(func() { + // Cleanup + }) + +} + +//-------------------------------------------------------------------------------- + +type DummyApplication struct { + state merkle.Tree + lastCommitState merkle.Tree +} + +func NewDummyApplication() *DummyApplication { + state := merkle.NewIAVLTree( + wire.BasicCodec, + wire.BasicCodec, + 0, + nil, + ) + return &DummyApplication{ + state: state, + lastCommitState: state, + } +} + +func (dapp *DummyApplication) Echo(message string) (types.RetCode, string) { + return 0, message +} + +func (dapp *DummyApplication) AppendTx(tx []byte) types.RetCode { + dapp.state.Set(tx, tx) + return 0 +} + +func (dapp *DummyApplication) GetHash() ([]byte, types.RetCode) { + hash := dapp.state.Hash() + return hash, 0 +} + +func (dapp *DummyApplication) Commit() types.RetCode { + dapp.lastCommitState = dapp.state.Copy() + return 0 +} + +func (dapp *DummyApplication) Rollback() types.RetCode { + dapp.state = dapp.lastCommitState.Copy() + return 0 +} + +func (dapp *DummyApplication) SetEventsMode(mode types.EventsMode) types.RetCode { + return 0 +} + +func (dapp *DummyApplication) AddListener(key string) types.RetCode { + return 0 +} + +func (dapp *DummyApplication) RemListener(key string) types.RetCode { + return 0 +} + +func (dapp *DummyApplication) GetEvents() []types.Event { + return nil +} diff --git a/example/main_test.go b/example/main_test.go new file mode 100644 index 00000000..0a5b53d6 --- /dev/null +++ b/example/main_test.go @@ -0,0 +1,52 @@ +package main + +import ( + // "fmt" + "testing" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" +) + +func TestStream(t *testing.T) { + + // Start the listener + _, err := server.StartListener("tcp://127.0.0.1:8080", NewDummyApplication()) + if err != nil { + Exit(err.Error()) + } + + // Connect to the socket + conn, err := Connect("tcp://127.0.0.1:8080") + if err != nil { + Exit(err.Error()) + } + + // Read response data + go func() { + for { + var n int64 + var err error + var res types.Response + wire.ReadBinaryPtr(&res, conn, &n, &err) + if err != nil { + Exit(err.Error()) + } + // fmt.Println("Read", n) + } + }() + + // Write requests + for { + var n int64 + var err error + var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")} + wire.WriteBinary(req, conn, &n, &err) + if err != nil { + Exit(err.Error()) + } + // fmt.Println("Wrote", n) + } +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 00000000..2b22300e --- /dev/null +++ b/server/server.go @@ -0,0 +1,136 @@ +package server + +import ( + "fmt" + "net" + "reflect" + "strings" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tmsp/types" +) + +func StartListener(protoAddr string, app types.Application) (net.Listener, error) { + parts := strings.SplitN(protoAddr, "://", 2) + proto, addr := parts[0], parts[1] + ln, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + // A goroutine to accept a connection. + go func() { + + for { + // Accept a connection + conn, err := ln.Accept() + if err != nil { + Exit("Failed to accept connection") + } else { + fmt.Println("Accepted a new connection") + } + connClosed := make(chan struct{}, 2) // Push to signal connection closed + responses := make(chan types.Response, 1000) // A channel to buffer responses + + // Read requests from conn and deal with them + go handleRequests(app, connClosed, conn, responses) + // Pull responses from 'responses' and write them to conn. + go handleResponses(connClosed, responses, conn) + + // Wait until connection is closed + <-connClosed + fmt.Println("Connection was closed. Waiting for new connection...") + } + + }() + + return ln, nil +} + +// Read requests from conn and deal with them +func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) { + var count int + for { + var n int64 + var err error + var req types.Request + wire.ReadBinaryPtr(&req, conn, &n, &err) + + if err != nil { + fmt.Println(err.Error()) + connClosed <- struct{}{} + return + } + + count++ + if count%1000 == 0 { + fmt.Println("Received request", reflect.TypeOf(req), req, n, err, count) + } + + handleRequest(app, req, responses) + } +} + +func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { + switch req := req.(type) { + case types.RequestEcho: + retCode, msg := app.Echo(req.Message) + responses <- types.ResponseEcho{retCode, msg} + case types.RequestAppendTx: + retCode := app.AppendTx(req.TxBytes) + responses <- types.ResponseAppendTx{retCode} + events := app.GetEvents() + for _, event := range events { + responses <- types.ResponseEvent{event} + } + case types.RequestGetHash: + hash, retCode := app.GetHash() + responses <- types.ResponseGetHash{retCode, hash} + case types.RequestCommit: + retCode := app.Commit() + responses <- types.ResponseCommit{retCode} + case types.RequestRollback: + retCode := app.Rollback() + responses <- types.ResponseRollback{retCode} + case types.RequestSetEventsMode: + retCode := app.SetEventsMode(req.EventsMode) + responses <- types.ResponseSetEventsMode{retCode} + if req.EventsMode == types.EventsModeOn { + events := app.GetEvents() + for _, event := range events { + responses <- types.ResponseEvent{event} + } + } + case types.RequestAddListener: + retCode := app.AddListener(req.EventKey) + responses <- types.ResponseAddListener{retCode} + case types.RequestRemListener: + retCode := app.RemListener(req.EventKey) + responses <- types.ResponseRemListener{retCode} + default: + responses <- types.ResponseException{"Unknown request"} + } +} + +// Pull responses from 'responses' and write them to conn. +func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) { + var count int + for { + var res = <-responses + var n int64 + var err error + wire.WriteBinary(res, conn, &n, &err) + + if err != nil { + fmt.Println(err.Error()) + connClosed <- struct{}{} + return + } + + count++ + if count%1000 == 0 { + fmt.Println("Sent response", reflect.TypeOf(res), res, n, err, count) + } + } +} diff --git a/types/application.go b/types/application.go new file mode 100644 index 00000000..2596f445 --- /dev/null +++ b/types/application.go @@ -0,0 +1,31 @@ +package types + +type Application interface { + + // Echo a message + Echo(message string) (RetCode, string) + + // Append a tx, which may or may not get committed + AppendTx(tx []byte) RetCode + + // Return the application Merkle root hash + GetHash() ([]byte, RetCode) + + // Set commit checkpoint + Commit() RetCode + + // Rollback to the latest commit + Rollback() RetCode + + // Set events reporting mode + SetEventsMode(mode EventsMode) RetCode + + // Add event listener + AddListener(key string) RetCode + + // Remove event listener + RemListener(key string) RetCode + + // Get all events + GetEvents() []Event +} diff --git a/types/events.go b/types/events.go new file mode 100644 index 00000000..8a7fa237 --- /dev/null +++ b/types/events.go @@ -0,0 +1,14 @@ +package types + +type EventsMode int8 + +const ( + EventsModeOff = EventsMode(0) + EventsModeCached = EventsMode(1) + EventsModeOn = EventsMode(2) +) + +type Event struct { + Key string + TxBytes []byte +} diff --git a/types/messages.go b/types/messages.go new file mode 100644 index 00000000..da4731ae --- /dev/null +++ b/types/messages.go @@ -0,0 +1,155 @@ +package types + +import "github.com/tendermint/go-wire" + +const ( + requestTypeEcho = byte(0x01) + requestTypeAppendTx = byte(0x02) + requestTypeGetHash = byte(0x03) + requestTypeCommit = byte(0x04) + requestTypeRollback = byte(0x05) + requestTypeSetEventsMode = byte(0x06) + requestTypeAddListener = byte(0x07) + requestTypeRemListener = byte(0x08) + + responseTypeEcho = byte(0x11) + responseTypeAppendTx = byte(0x12) + responseTypeGetHash = byte(0x13) + responseTypeCommit = byte(0x14) + responseTypeRollback = byte(0x15) + responseTypeSetEventsMode = byte(0x16) + responseTypeAddListener = byte(0x17) + responseTypeRemListener = byte(0x18) + + responseTypeException = byte(0x20) + responseTypeEvent = byte(0x21) +) + +//---------------------------------------- + +type RequestEcho struct { + Message string +} + +type RequestAppendTx struct { + TxBytes []byte +} + +type RequestGetHash struct { +} + +type RequestCommit struct { +} + +type RequestRollback struct { +} + +type RequestSetEventsMode struct { + EventsMode +} + +type RequestAddListener struct { + EventKey string +} + +type RequestRemListener struct { + EventKey string +} + +type Request interface { + AssertRequestType() +} + +func (_ RequestEcho) AssertRequestType() {} +func (_ RequestAppendTx) AssertRequestType() {} +func (_ RequestGetHash) AssertRequestType() {} +func (_ RequestCommit) AssertRequestType() {} +func (_ RequestRollback) AssertRequestType() {} +func (_ RequestSetEventsMode) AssertRequestType() {} +func (_ RequestAddListener) AssertRequestType() {} +func (_ RequestRemListener) AssertRequestType() {} + +var _ = wire.RegisterInterface( + struct{ Request }{}, + wire.ConcreteType{RequestEcho{}, requestTypeEcho}, + wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx}, + wire.ConcreteType{RequestGetHash{}, requestTypeGetHash}, + wire.ConcreteType{RequestCommit{}, requestTypeCommit}, + wire.ConcreteType{RequestRollback{}, requestTypeRollback}, + wire.ConcreteType{RequestSetEventsMode{}, requestTypeSetEventsMode}, + wire.ConcreteType{RequestAddListener{}, requestTypeAddListener}, + wire.ConcreteType{RequestRemListener{}, requestTypeRemListener}, +) + +//---------------------------------------- + +type ResponseEcho struct { + RetCode + Message string +} + +type ResponseAppendTx struct { + RetCode +} + +type ResponseGetHash struct { + RetCode + Hash []byte +} + +type ResponseCommit struct { + RetCode +} + +type ResponseRollback struct { + RetCode +} + +type ResponseSetEventsMode struct { + RetCode +} + +type ResponseAddListener struct { + RetCode +} + +type ResponseRemListener struct { + RetCode +} + +type ResponseException struct { + Error string +} + +type ResponseEvent struct { + Event +} + +type Response interface { + AssertResponseType() +} + +func (_ ResponseEcho) AssertResponseType() {} +func (_ ResponseAppendTx) AssertResponseType() {} +func (_ ResponseGetHash) AssertResponseType() {} +func (_ ResponseCommit) AssertResponseType() {} +func (_ ResponseRollback) AssertResponseType() {} +func (_ ResponseSetEventsMode) AssertResponseType() {} +func (_ ResponseAddListener) AssertResponseType() {} +func (_ ResponseRemListener) AssertResponseType() {} +func (_ ResponseException) AssertResponseType() {} +func (_ ResponseEvent) AssertResponseType() {} + +var _ = wire.RegisterInterface( + struct{ Response }{}, + wire.ConcreteType{ResponseEcho{}, responseTypeEcho}, + wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx}, + wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash}, + wire.ConcreteType{ResponseCommit{}, responseTypeCommit}, + wire.ConcreteType{ResponseRollback{}, responseTypeRollback}, + wire.ConcreteType{ResponseSetEventsMode{}, responseTypeSetEventsMode}, + wire.ConcreteType{ResponseAddListener{}, responseTypeAddListener}, + wire.ConcreteType{ResponseRemListener{}, responseTypeRemListener}, + wire.ConcreteType{ResponseException{}, responseTypeException}, + wire.ConcreteType{ResponseEvent{}, responseTypeEvent}, +) diff --git a/types/retcode.go b/types/retcode.go new file mode 100644 index 00000000..bccd9a4c --- /dev/null +++ b/types/retcode.go @@ -0,0 +1,12 @@ +package types + +type RetCode int + +// Reserved return codes +const ( + RetCodeOK = RetCode(0) + RetCodeInternalError = RetCode(1) + RetCodeUnauthorized = RetCode(2) + RetCodeInsufficientFees = RetCode(3) + RetCodeUnknownRequest = RetCode(4) +)