diff --git a/README.md b/README.md index 2991b534..284d2af9 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog * __Arguments__: * `TxBytes ([]byte)` * __Returns__: - * `RetCode (int8)` - * `RetData ([]KVPair)` - * `Error (string)` + * `Code (int8)` + * `Result ([]byte)` + * `Log (string)` * __Usage__:
Append and run a transaction. The transaction may or may not be final. @@ -22,15 +22,16 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog * __Arguments__: * `TxBytes ([]byte)` * __Returns__: - * `RetCode (int8)` - * `RetData ([]KVPair)` - * `Error (string)` + * `Code (int8)` + * `Result ([]byte)` + * `Log (string)` * __Usage__:
Validate a transaction. This message should not mutate the state. #### GetHash * __Returns__: * `Hash ([]byte)` + * `Log (string)` * __Usage__:
Return a Merkle root hash of the application state @@ -49,7 +50,7 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog * `Key (string)` * `Value (string)` * __Returns__: - * `Error (string)` + * `Log (string)` * __Usage__:
Set application options. E.g. Key="mode", Value="mempool" for a mempool connection, or Key="mode", Value="consensus" for a consensus connection. Other options are application specific. @@ -59,9 +60,10 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog ### Jan 23th, 2016 * Added CheckTx/Query TMSP message types -* Added RetData/Error fields to AppendTx/CheckTx/SetOption +* Added Result/Log fields to AppendTx/CheckTx/SetOption * Removed Listener messages -* Removed RetCode from ResponseSetOption and ResponseGetHash +* Removed Code from ResponseSetOption and ResponseGetHash +* Made examples BigEndian ### Jan 12th, 2016 diff --git a/client/golang/client.go b/client/client.go similarity index 87% rename from client/golang/client.go rename to client/client.go index 270bb95e..d25348d1 100644 --- a/client/golang/client.go +++ b/client/client.go @@ -134,6 +134,7 @@ func (cli *TMSPClient) recvResponseRoutine() { } switch res := res.(type) { case tmsp.ResponseException: + // XXX After setting cli.err, release waiters (e.g. reqres.Done()) cli.StopForError(errors.New(res.Error)) default: // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) @@ -155,12 +156,6 @@ func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() - // Special logic for events which have no corresponding requests. - if _, ok := res.(tmsp.ResponseEvent); ok && cli.resCb != nil { - cli.resCb(nil, res) - return nil - } - // Get the first reqRes next := cli.reqSent.Front() if next == nil { @@ -210,27 +205,19 @@ func (cli *TMSPClient) GetHashAsync() { cli.queueRequest(tmsp.RequestGetHash{}) } -func (cli *TMSPClient) AddListenerAsync(key string) { - cli.queueRequest(tmsp.RequestAddListener{key}) -} - -func (cli *TMSPClient) RemListenerAsync(key string) { - cli.queueRequest(tmsp.RequestRemListener{key}) -} - func (cli *TMSPClient) QueryAsync(query []byte) { cli.queueRequest(tmsp.RequestQuery{query}) } //---------------------------------------- -func (cli *TMSPClient) InfoSync() (info []string, err error) { +func (cli *TMSPClient) InfoSync() (info string, err error) { reqres := cli.queueRequest(tmsp.RequestInfo{}) cli.FlushSync() if cli.err != nil { - return nil, cli.err + return "", cli.err } - return reqres.Response.(tmsp.ResponseInfo).Data, nil + return reqres.Response.(tmsp.ResponseInfo).Info, nil } func (cli *TMSPClient) FlushSync() error { @@ -238,34 +225,44 @@ func (cli *TMSPClient) FlushSync() error { return cli.err } -func (cli *TMSPClient) AppendTxSync(tx []byte) error { +func (cli *TMSPClient) AppendTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { reqres := cli.queueRequest(tmsp.RequestAppendTx{tx}) cli.FlushSync() if cli.err != nil { - return cli.err + return tmsp.RetCodeInternalError, nil, "", cli.err } res := reqres.Response.(tmsp.ResponseAppendTx) - return res.RetCode.Error() + return res.Code, res.Result, res.Log, nil } -func (cli *TMSPClient) GetHashSync() (hash []byte, err error) { +func (cli *TMSPClient) CheckTxSync(tx []byte) (code tmsp.RetCode, result []byte, log string, err error) { + reqres := cli.queueRequest(tmsp.RequestCheckTx{tx}) + cli.FlushSync() + if cli.err != nil { + return tmsp.RetCodeInternalError, nil, "", cli.err + } + res := reqres.Response.(tmsp.ResponseCheckTx) + return res.Code, res.Result, res.Log, nil +} + +func (cli *TMSPClient) GetHashSync() (hash []byte, log string, err error) { reqres := cli.queueRequest(tmsp.RequestGetHash{}) cli.FlushSync() if cli.err != nil { - return nil, cli.err + return nil, "", cli.err } res := reqres.Response.(tmsp.ResponseGetHash) - return res.Hash, res.RetCode.Error() + return res.Hash, res.Log, nil } -func (cli *TMSPClient) QuerySync(query []byte) (result []byte, err error) { +func (cli *TMSPClient) QuerySync(query []byte) (result []byte, log string, err error) { reqres := cli.queueRequest(tmsp.RequestQuery{query}) cli.FlushSync() if cli.err != nil { - return nil, cli.err + return nil, "", cli.err } res := reqres.Response.(tmsp.ResponseQuery) - return res.Result, res.RetCode.Error() + return res.Result, res.Log, nil } //---------------------------------------- @@ -304,10 +301,6 @@ func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) { _, ok = res.(tmsp.ResponseCheckTx) case tmsp.RequestGetHash: _, ok = res.(tmsp.ResponseGetHash) - case tmsp.RequestAddListener: - _, ok = res.(tmsp.ResponseAddListener) - case tmsp.RequestRemListener: - _, ok = res.(tmsp.ResponseRemListener) case tmsp.RequestQuery: _, ok = res.(tmsp.ResponseQuery) default: diff --git a/example/golang/counter.go b/example/golang/counter.go index 434aa57b..962c0760 100644 --- a/example/golang/counter.go +++ b/example/golang/counter.go @@ -2,6 +2,7 @@ package example import ( "encoding/binary" + "fmt" . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" @@ -17,66 +18,54 @@ func NewCounterApplication(serial bool) *CounterApplication { return &CounterApplication{serial: serial} } -func (app *CounterApplication) Echo(message string) string { - return message +func (app *CounterApplication) Info() string { + return Fmt("hashes:%v, txs:%v", app.hashCount, app.txCount) } -func (app *CounterApplication) Info() []string { - return []string{Fmt("hashes:%v, txs:%v", app.hashCount, app.txCount)} -} - -func (app *CounterApplication) SetOption(key string, value string) types.RetCode { +func (app *CounterApplication) SetOption(key string, value string) (log string) { if key == "serial" && value == "on" { app.serial = true } - return types.RetCodeOK + return "" } -func (app *CounterApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) { +func (app *CounterApplication) AppendTx(tx []byte) (code types.RetCode, result []byte, log string) { if app.serial { tx8 := make([]byte, 8) - copy(tx8, tx) - txValue := binary.LittleEndian.Uint64(tx8) + copy(tx8[len(tx8)-len(tx):], tx) + txValue := binary.BigEndian.Uint64(tx8) if txValue != uint64(app.txCount) { - return nil, types.RetCodeBadNonce + return types.RetCodeBadNonce, nil, fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue) } } app.txCount += 1 - return nil, types.RetCodeOK + return types.RetCodeOK, nil, "" } -func (app *CounterApplication) CheckTx(tx []byte) types.RetCode { +func (app *CounterApplication) CheckTx(tx []byte) (code types.RetCode, result []byte, log string) { if app.serial { tx8 := make([]byte, 8) - copy(tx8, tx) - txValue := binary.LittleEndian.Uint64(tx8) + copy(tx8[len(tx8)-len(tx):], tx) + txValue := binary.BigEndian.Uint64(tx8) if txValue < uint64(app.txCount) { - return types.RetCodeBadNonce + return types.RetCodeBadNonce, nil, fmt.Sprintf("Invalid nonce. Expected >= %v, got %v", app.txCount, txValue) } } - return types.RetCodeOK + return types.RetCodeOK, nil, "" } -func (app *CounterApplication) GetHash() ([]byte, types.RetCode) { +func (app *CounterApplication) GetHash() (hash []byte, log string) { app.hashCount += 1 if app.txCount == 0 { - return nil, types.RetCodeOK + return nil, "" } else { - hash := make([]byte, 32) - binary.LittleEndian.PutUint64(hash, uint64(app.txCount)) - return hash, types.RetCodeOK + hash := make([]byte, 8) + binary.BigEndian.PutUint64(hash, uint64(app.txCount)) + return hash, "" } } -func (app *CounterApplication) AddListener(key string) types.RetCode { - return types.RetCodeOK -} - -func (app *CounterApplication) RemListener(key string) types.RetCode { - return types.RetCodeOK -} - -func (app *CounterApplication) Query(query []byte) ([]byte, types.RetCode) { - return nil, types.RetCodeOK +func (app *CounterApplication) Query(query []byte) (result []byte, log string) { + return nil, fmt.Sprintf("Query is not supported") } diff --git a/example/golang/dummy.go b/example/golang/dummy.go index 899bb495..9835eeb8 100644 --- a/example/golang/dummy.go +++ b/example/golang/dummy.go @@ -18,40 +18,28 @@ func NewDummyApplication() *DummyApplication { return &DummyApplication{state: state} } -func (app *DummyApplication) Echo(message string) string { - return message +func (app *DummyApplication) Info() string { + return Fmt("size:%v", app.state.Size()) } -func (app *DummyApplication) Info() []string { - return []string{Fmt("size:%v", app.state.Size())} +func (app *DummyApplication) SetOption(key string, value string) (log string) { + return "" } -func (app *DummyApplication) SetOption(key string, value string) types.RetCode { - return types.RetCodeOK -} - -func (app *DummyApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) { +func (app *DummyApplication) AppendTx(tx []byte) (code types.RetCode, result []byte, log string) { app.state.Set(tx, tx) - return nil, types.RetCodeOK + return types.RetCodeOK, nil, "" } -func (app *DummyApplication) CheckTx(tx []byte) types.RetCode { - return types.RetCodeOK // all txs are valid +func (app *DummyApplication) CheckTx(tx []byte) (code types.RetCode, result []byte, log string) { + return types.RetCodeOK, nil, "" } -func (app *DummyApplication) GetHash() ([]byte, types.RetCode) { - hash := app.state.Hash() - return hash, types.RetCodeOK +func (app *DummyApplication) GetHash() (hash []byte, log string) { + hash = app.state.Hash() + return hash, "" } -func (app *DummyApplication) AddListener(key string) types.RetCode { - return types.RetCodeOK -} - -func (app *DummyApplication) RemListener(key string) types.RetCode { - return types.RetCodeOK -} - -func (app *DummyApplication) Query(query []byte) ([]byte, types.RetCode) { - return nil, types.RetCodeOK +func (app *DummyApplication) Query(query []byte) (result []byte, log string) { + return nil, "Query not supported" } diff --git a/example/golang/dummy_test.go b/example/golang/dummy_test.go index d0d16ac9..ccf34f3f 100644 --- a/example/golang/dummy_test.go +++ b/example/golang/dummy_test.go @@ -44,8 +44,8 @@ func TestStream(t *testing.T) { switch res := res.(type) { case types.ResponseAppendTx: counter += 1 - if res.RetCode != types.RetCodeOK { - t.Error("AppendTx failed with ret_code", res.RetCode) + if res.Code != types.RetCodeOK { + t.Error("AppendTx failed with ret_code", res.Code) } if counter > numAppendTxs { t.Fatal("Too many AppendTx responses") diff --git a/example/js/.gitignore b/example/js/.gitignore new file mode 100644 index 00000000..3c3629e6 --- /dev/null +++ b/example/js/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/example/js/app.js b/example/js/app.js index bbdf0490..78804e22 100644 --- a/example/js/app.js +++ b/example/js/app.js @@ -1,6 +1,9 @@ -server = require("./server") -wire = require("./wire") -util = require("util") +var server = require("./server"); +var wire = require("js-wire"); +var util = require("util"); +var msg = require("./msgs"); +var types = require("./types"); + function CounterApp(){ this.hashCount = 0; @@ -8,75 +11,65 @@ function CounterApp(){ this.serial = false; }; -CounterApp.prototype.echo = function(msg){ - return {"response": msg, "ret_code":0} +CounterApp.prototype.info = function(cb) { + return cb(util.format("hashes:%d, txs:%d", this.hashCount, this.txCount)); } -CounterApp.prototype.info = function(){ - return {"response": [util.format("hashes:%d, txs:%d", this.hashCount, this.txCount)]} -} - -CounterApp.prototype.set_option = function(key, value){ - if (key == "serial" && value == "on"){ +CounterApp.prototype.set_option = function(cb, key, value) { + if (key == "serial" && value == "on") { this.serial = true; } - return {"ret_code":0} + return cb(""); } -CounterApp.prototype.append_tx = function(txBytes){ +CounterApp.prototype.append_tx = function(cb, txBytes) { if (this.serial) { - txByteArray = new Buffer(txBytes) if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") { - txByteArray = wire.hex2bytes(txBytes.slice(2)); + var hexString = txBytes.toString("ascii", 2); + var hexBytes = new Buffer(hexString, "hex"); + txBytes = hexBytes; } - r = new msg.buffer(txByteArray) - txValue = wire.decode_big_endian(r, txBytes.length) + var txValue = txBytes.readIntBE(0, txBytes.length); if (txValue != this.txCount){ - return {"ret_code":6} + return cb(types.RetCodeInvalidNonce, "", "Nonce is invalid"); } } this.txCount += 1; - return {"ret_code":0} // TODO: return events + return cb(types.RetCodeOK, "", ""); } -CounterApp.prototype.check_tx = function(txBytes){ +CounterApp.prototype.check_tx = function(cb, txBytes) { if (this.serial) { - txByteArray = new Buffer(txBytes) if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") { - txByteArray = wire.hex2bytes(txBytes.slice(2)); + var hexString = txBytes.toString("ascii", 2); + var hexBytes = new Buffer(hexString, "hex"); + txBytes = hexBytes; } - r = new msg.buffer(txByteArray) - txValue = wire.decode_big_endian(r, txBytes.length) + var txValue = txBytes.readIntBE(0, txBytes.length); if (txValue < this.txCount){ - return {"ret_code":6} + return cb(types.RetCodeInvalidNonce, "", "Nonce is too low"); } } - return {"ret_code":0} + this.txCount += 1; + return cb(types.RetCodeOK, "", ""); } -CounterApp.prototype.get_hash = function(){ +CounterApp.prototype.get_hash = function(cb) { this.hashCount += 1; if (this.txCount == 0){ - return {"response": "", "ret_code":0} + return cb("", "Zero tx count; hash is empth"); } - h = wire.encode_big_endian(this.txCount, 8); - h = wire.reverse(h); // TODO - return {"response": h.toString(), "ret_code":0} + var buf = new Buffer(8); + buf.writeIntBE(this.txCount, 0, 8); + cb(buf, ""); } -CounterApp.prototype.add_listener = function(){ - return {"ret_code":0} +CounterApp.prototype.query = function(cb) { + return cb("", "Query not yet supporrted"); } -CounterApp.prototype.rm_listener = function(){ - return {"ret_code":0} -} - -CounterApp.prototype.event = function(){ -} - -console.log("Counter app in Javascript") +console.log("Counter app in Javascript"); var app = new CounterApp(); var appServer = new server.AppServer(app); -appServer.server.listen(46658) +appServer.server.listen(46658); diff --git a/example/js/msgs.js b/example/js/msgs.js index 7993ca85..da3070ed 100644 --- a/example/js/msgs.js +++ b/example/js/msgs.js @@ -1,59 +1,88 @@ -wire = require("./wire") +var wire = require("js-wire"); +var types = require("./types"); + +var readRequestInfo = function(r) { return []; }; +var readRequestSetOption = function(r) { return [r.readString(), r.readString()]; }; +var readRequestAppendTx = function(r) { return [r.readByteArray()]; }; +var readRequestCheckTx = function(r) { return [r.readByteArray()]; }; +var readRequestGetHash = function(r) { return []; }; +var readRequestQuery = function(r) { return [r.readByteArray()]; }; + +var runOnce = function(name, f) { + var ran = false; + return function() { + if (ran) { + console.log("Error: response was already written for "+name); + return + } else { + ran = true; + } + return f.apply(this, arguments); + }; +}; + +var makeWriteResponseInfo = function(w, cb) { return runOnce("info", function(info) { + w.writeUint8(types.ResponseTypeInfo); + w.writeString(info); + cb(w); +});}; +var makeWriteResponseSetOption = function(w, cb) { return runOnce("set_option", function(log) { + w.writeUint8(types.ResponseTypeSetOption); + w.writeString(log); + cb(w); +});}; +var makeWriteResponseAppendTx = function(w, cb) { return runOnce("append_tx", function(code, result, log) { + w.writeUint8(types.ResponseTypeAppendTx); + w.writeUint8(code); + w.writeByteArray(result); + w.writeString(log); + cb(w); +});}; +var makeWriteResponseCheckTx = function(w, cb) { return runOnce("check_tx", function(code, result, log) { + w.writeUint8(types.ResponseTypeCheckTx); + w.writeUint8(code); + w.writeByteArray(result); + w.writeString(log); + cb(w); +});}; +var makeWriteResponseGetHash = function(w, cb) { return runOnce("get_hash", function(hash, log) { + w.writeUint8(types.ResponseTypeGetHash); + w.writeByteArray(hash); + w.writeString(log); + cb(w); +});}; +var makeWriteResponseQuery = function(w, cb) { return runOnce("query", function(result, log) { + w.writeUint8(types.ResponseTypeQuery); + w.writeByteArray(result); + w.writeString(log); + cb(w); +});}; module.exports = { - types : { - 0x01 : "echo", - 0x02 : "flush", - 0x03 : "info", - 0x04 : "set_option", - 0x21 : "append_tx", - 0x22 : "check_tx", - 0x23 : "get_hash", - 0x24 : "add_listener", - 0x25 : "rm_listener", - }, - decoder : RequestDecoder, - buffer: BytesBuffer -} - -function RequestDecoder(buf){ - this.buf= buf -} - -var decode_string = wire.decode_string - -// return nothing, one thing, or a list of things -RequestDecoder.prototype.echo = function(){ return decode_string(this.buf) }; -RequestDecoder.prototype.flush = function(){}; -RequestDecoder.prototype.info = function(){}; -RequestDecoder.prototype.set_option = function(){ return [decode_string(this.buf), decode_string(this.buf)] }; -RequestDecoder.prototype.append_tx = function(){ return decode_string(this.buf)}; -RequestDecoder.prototype.check_tx = function(){ return decode_string(this.buf)}; -RequestDecoder.prototype.get_hash = function(){ }; -RequestDecoder.prototype.add_listener = function(){ }; // TODO -RequestDecoder.prototype.rm_listener = function(){ }; // TODO - -// buffered reader with read(n) method -function BytesBuffer(buf){ - this.buf = buf -} - -BytesBuffer.prototype.read = function(n){ - b = this.buf.slice(0, n) - this.buf = this.buf.slice(n) - return b + types : { + 0x01 : "echo", + 0x02 : "flush", + 0x03 : "info", + 0x04 : "set_option", + 0x21 : "append_tx", + 0x22 : "check_tx", + 0x23 : "get_hash", + 0x24 : "query", + }, + readers : { + "info": readRequestInfo, + "set_option": readRequestSetOption, + "append_tx": readRequestAppendTx, + "check_tx": readRequestCheckTx, + "get_hash": readRequestGetHash, + "query": readRequestQuery, + }, + writerGenerators: { + "info": makeWriteResponseInfo, + "set_option": makeWriteResponseSetOption, + "append_tx": makeWriteResponseAppendTx, + "check_tx": makeWriteResponseCheckTx, + "get_hash": makeWriteResponseGetHash, + "query": makeWriteResponseQuery, + }, }; - -BytesBuffer.prototype.write = function(buf){ - this.buf = Buffer.concat([this.buf, buf]); -}; - - -BytesBuffer.prototype.size = function(){ - return this.buf.length -} - -BytesBuffer.prototype.peek = function(){ - return this.buf[0] -} - diff --git a/example/js/package.json b/example/js/package.json new file mode 100644 index 00000000..ed9bff31 --- /dev/null +++ b/example/js/package.json @@ -0,0 +1,10 @@ +{ + "name": "example", + "version": "0.0.1", + "description": "Example javascript TMSP application", + "main": "index.js", + "dependencies": { + "js-wire": "0.0.2" + } +} + diff --git a/example/js/server.js b/example/js/server.js index ab6e59cf..5ba30ea3 100644 --- a/example/js/server.js +++ b/example/js/server.js @@ -1,126 +1,141 @@ +var net = require("net"); +var wire = require("js-wire"); +var msg = require("./msgs"); +var types = require("./types"); -// Load the TCP Library -net = require('net'); -msg = require('./msgs'); -wire = require("./wire") +var maxWriteBufferLength = 4096; // Any more and flush -// Takes an application and handles tmsp connection +// Takes an application and handles TMSP connection // which invoke methods on the app function AppServer(app){ - // set the app for the socket handler - this.app = app; + // set the app for the socket handler + this.app = app; - // create a server by providing callback for - // accepting new connection and callbacks for - // connection events ('data', 'end', etc.) - this.createServer() + // create a server by providing callback for + // accepting new connection and callbacks for + // connection events ('data', 'end', etc.) + this.createServer(); } +AppServer.prototype.createServer = function() { + var app = this.app; + + // Define the socket handler + this.server = net.createServer(function(socket) { + socket.name = socket.remoteAddress + ":" + socket.remotePort; + console.log("new connection from", socket.name); + + var conn = new Connection(socket, function(msgBytes, cb) { + var r = new wire.Reader(msgBytes); + + // Now we can decode + var typeByte = r.readByte(); + var reqType = msg.types[typeByte]; + + // Special messages. + // NOTE: msgs are length prefixed + if (reqType == "flush") { + var w = new wire.Writer(); + w.writeByte(types.ResponseTypeFlush); + conn.writeMessage(w.getBuffer()); + conn.flush(); + return cb(); + } else if (reqType == "echo") { + var message = r.readString(); + var w = new wire.Writer(); + w.writeByte(types.ResponseTypeEcho); + w.writeString(message); + conn.writeMessage(w.getBuffer()); + return cb(); + } + + // Make callback by wrapping cp + var resCb = msg.writerGenerators[reqType](new wire.Writer(), function(w) { + conn.writeMessage(w.getBuffer()); + return cb(); + }); + + // Decode arguments + var args = msg.readers[reqType](r); + args.unshift(resCb); + + // Call function + var res = app[reqType].apply(app, args); + if (res != undefined) { + console.log("Message handler shouldn't return anything!"); + } + + }); + }); +} + +//---------------------------------------- + +function Connection(socket, msgCb) { + this.socket = socket; + this.recvBuf = new Buffer(0); + this.sendBuf = new Buffer(0); + this.msgCb = msgCb; + this.waitingResult = false; + var conn = this; + + // Handle TMSP requests. + socket.on('data', function(data) { + conn.appendData(data); + }); + socket.on('end', function() { + console.log("connection ended"); + }); +} + +Connection.prototype.appendData = function(bytes) { + var conn = this; + if (bytes.length > 0) { + this.recvBuf = Buffer.concat([this.recvBuf, new Buffer(bytes)]); + } + if (this.waitingResult) { + return; + } + var r = new wire.Reader(this.recvBuf); + var msg; + try { + msg = r.readByteArray(); + } catch(e) { + return; + } + this.recvBuf = r.buf.slice(r.offset); + this.waitingResult = true; + this.socket.pause(); + //try { + this.msgCb(msg, function() { + // This gets called after msg handler is finished with response. + conn.waitingResult = false; + conn.socket.resume(); + if (conn.recvBuf.length > 0) { + conn.appendData(""); + } + }); + //} catch(e) { + // console.log("FATAL ERROR: ", e); + //} +}; + +Connection.prototype.writeMessage = function(msgBytes) { + var msgLength = wire.uvarintSize(msgBytes.length); + var buf = new Buffer(1+msgLength+msgBytes.length); + var w = new wire.Writer(buf); + w.writeByteArray(msgBytes); // TODO technically should be writeVarint + this.sendBuf = Buffer.concat([this.sendBuf, w.getBuffer()]); + if (this.sendBuf.length >= maxWriteBufferLength) { + this.flush(); + } +}; + +Connection.prototype.flush = function() { + var n = this.socket.write(this.sendBuf); + this.sendBuf = new Buffer(0); +} + +//---------------------------------------- + module.exports = { AppServer: AppServer }; - -AppServer.prototype.createServer = function(){ - app = this.app - conns = {} // map sockets to their state - - // define the socket handler - this.server = net.createServer(function(socket){ - socket.name = socket.remoteAddress + ":" + socket.remotePort - console.log("new connection from", socket.name) - - var conn = { - recBuf: new msg.buffer(new Buffer(0)), - resBuf: new msg.buffer(new Buffer(0)), - msgLength: 0, - inProgress: false - } - conns[socket] = conn - - // Handle tmsp requests. - socket.on('data', function (data) { - - if (data.length == 0){ - // TODO err - console.log("empty data!") - return - } - conn = conns[socket] - - // we received data. append it - conn.recBuf.write(data) - - while ( conn.recBuf.size() > 0 ){ - - if (conn.msgLength == 0){ - ll = conn.recBuf.peek(); - if (conn.recBuf.size() < 1 + ll){ - // don't have enough bytes to read length yet - return - } - conn.msgLength = wire.decode_varint(conn.recBuf) - } - - if (conn.recBuf.size() < conn.msgLength) { - // don't have enough to decode the message - return - } - - // now we can decode - typeByte = conn.recBuf.read(1); - resTypeByte = typeByte[0] + 0x10 - reqType = msg.types[typeByte[0]]; - - if (reqType == "flush"){ - // msgs are length prefixed - conn.resBuf.write(wire.encode(1)); - conn.resBuf.write(new Buffer([resTypeByte])) - n = socket.write(conn.resBuf.buf); - conn.msgLength = 0; - conn.resBuf = new msg.buffer(new Buffer(0)); - return - } - - // decode args - decoder = new msg.decoder(conn.recBuf); - args = decoder[reqType](); - - // done decoding - conn.msgLength = 0 - - var res = function(){ - if (args == null){ - return app[reqType](); - } else if (Array.isArray(args)){ - return app[reqType].apply(app, args); - } else { - return app[reqType](args) - } - }() - - - var retCode = res["ret_code"] - var res = res["response"] - - if (retCode != null && retCode != 0){ - console.log("non-zero ret code", retCode) - } - - - if (reqType == "echo" || reqType == "info"){ - enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(res)]); - // length prefixed - conn.resBuf.write(wire.encode(enc.length)); - conn.resBuf.write(enc); - } else { - enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(retCode), wire.encode(res)]); - conn.resBuf.write(wire.encode(enc.length)); - conn.resBuf.write(enc); - } - } - }); - - socket.on('end', function () { - console.log("connection ended") - }); - }) -} - diff --git a/example/js/types.js b/example/js/types.js new file mode 100644 index 00000000..d22d8f21 --- /dev/null +++ b/example/js/types.js @@ -0,0 +1,27 @@ +module.exports = { + RetCodeOK: 0, + RetCodeInternalError: 1, + RetCodeUnauthorized: 2, + RetCodeInsufficientFees: 3, + RetCodeUnknownRequest: 4, + RetCodeEncodingError: 5, + RetCodeNonce: 6, + + RequestTypeEcho: 0x01, + RequestTypeFlush: 0x02, + RequestTypeInfo: 0x03, + RequestTypeSetOption: 0x04, + RequestTypeAppendTx: 0x21, + RequestTypeCheckTx: 0x22, + RequestTypeGetHash: 0x23, + RequestTypeQuery: 0x24, + + ResponseTypeEcho: 0x11, + ResponseTypeFlush: 0x12, + ResponseTypeInfo: 0x13, + ResponseTypeSetOption: 0x14, + ResponseTypeAppendTx: 0x31, + ResponseTypeCheckTx: 0x32, + ResponseTypeGetHash: 0x33, + ResponseTypeQuery: 0x34, +}; diff --git a/example/js/wire.js b/example/js/wire.js deleted file mode 100644 index 42c30867..00000000 --- a/example/js/wire.js +++ /dev/null @@ -1,111 +0,0 @@ -module.exports = { - decode_string: decode_string, - decode_varint: decode_varint, - decode_big_endian: decode_big_endian, - encode_big_endian: encode_big_endian, - encode: encode, - reverse: reverse, -} - -function reverse(buf){ - for (var i = 0; i < buf.length/2; i++){ - a = buf[i]; - b = buf[buf.length-1 - i]; - buf[i] = b; - buf[buf.length-1 - i] = a; - } - return buf -} - -function uvarint_size(i){ - if (i == 0){ - return 0 - } - - for(var j = 1; j < 9; j++) { - if ( i < 1< 0xF0){ negate = true } - if (negate) { size = size - 0xF0 } - i = decode_big_endian(reader, size); - if (negate) { i = i * -1} - return i -} - -function encode_list(l){ - var l2 = l.map(encode); - var buf = new Buffer(encode_varint(l2.length)); - return Buffer.concat([buf, Buffer.concat(l2)]); -} - -function encode(b){ - if (b == null){ - return Buffer(0) - } else if (typeof b == "number"){ - return encode_varint(b) - } else if (typeof b == "string"){ - return encode_string(b) - } else if (Array.isArray(b)){ - return encode_list(b) - } else{ - console.log("UNSUPPORTED TYPE!", typeof b, b) - } -} - - - - - diff --git a/example/python/app.py b/example/python/app.py index 49831ef5..8eda9020 100644 --- a/example/python/app.py +++ b/example/python/app.py @@ -8,6 +8,7 @@ from tmsp.reader import BytesBuffer class CounterApplication(): def __init__(self): + sys.exit("The python example is out of date. Upgrading the Python examples is currently left as an exercise to you.") self.hashCount = 0 self.txCount = 0 self.serial = False diff --git a/example/python3/app.py b/example/python3/app.py index b3616327..01f3de0b 100644 --- a/example/python3/app.py +++ b/example/python3/app.py @@ -8,6 +8,7 @@ from tmsp.reader import BytesBuffer class CounterApplication(): def __init__(self): + sys.exit("The python example is out of date. Upgrading the Python examples is currently left as an exercise to you.") self.hashCount = 0 self.txCount = 0 self.serial = False diff --git a/server/server.go b/server/server.go index f6aa2297..b6d0e3f1 100644 --- a/server/server.go +++ b/server/server.go @@ -99,37 +99,27 @@ func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { switch req := req.(type) { case types.RequestEcho: - msg := app.Echo(req.Message) - responses <- types.ResponseEcho{msg} + responses <- types.ResponseEcho{req.Message} case types.RequestFlush: responses <- types.ResponseFlush{} case types.RequestInfo: data := app.Info() responses <- types.ResponseInfo{data} case types.RequestSetOption: - retCode := app.SetOption(req.Key, req.Value) - responses <- types.ResponseSetOption{retCode} + logstr := app.SetOption(req.Key, req.Value) + responses <- types.ResponseSetOption{logstr} case types.RequestAppendTx: - events, retCode := app.AppendTx(req.TxBytes) - responses <- types.ResponseAppendTx{retCode} - for _, event := range events { - responses <- types.ResponseEvent{event} - } + code, result, logstr := app.AppendTx(req.TxBytes) + responses <- types.ResponseAppendTx{code, result, logstr} case types.RequestCheckTx: - retCode := app.CheckTx(req.TxBytes) - responses <- types.ResponseCheckTx{retCode} + code, result, logstr := app.CheckTx(req.TxBytes) + responses <- types.ResponseCheckTx{code, result, logstr} case types.RequestGetHash: - hash, retCode := app.GetHash() - responses <- types.ResponseGetHash{retCode, hash} - case types.RequestAddListener: - retCode := app.AddListener(req.EventKey) - responses <- types.ResponseAddListener{retCode} - case types.RequestRemListener: - retCode := app.RemListener(req.EventKey) - responses <- types.ResponseRemListener{retCode} + hash, logstr := app.GetHash() + responses <- types.ResponseGetHash{hash, logstr} case types.RequestQuery: - result, retCode := app.Query(req.QueryBytes) - responses <- types.ResponseQuery{retCode, result} + result, logstr := app.Query(req.QueryBytes) + responses <- types.ResponseQuery{result, logstr} default: responses <- types.ResponseException{"Unknown request"} } diff --git a/tests/test.sh b/tests/test.sh old mode 100644 new mode 100755 index 9ee9be13..c99f92dc --- a/tests/test.sh +++ b/tests/test.sh @@ -8,10 +8,11 @@ bash tests/test_dummy.sh # test golang counter bash tests/test_counter.sh +# test js counter +cd example/js +COUNTER_APP="node app.js" bash $ROOT/tests/test_counter.sh + # test python counter -cd example/python +cd ../python COUNTER_APP="python app.py" bash $ROOT/tests/test_counter.sh -# test js counter -cd ../js -COUNTER_APP="node app.js" bash $ROOT/tests/test_counter.sh diff --git a/tests/test_counter.sh b/tests/test_counter.sh old mode 100644 new mode 100755 index 99af772c..53edab3b --- a/tests/test_counter.sh +++ b/tests/test_counter.sh @@ -1,3 +1,12 @@ +#! /bin/bash +function finish { + echo "Cleaning up..." + ps -p $PID > /dev/null + if [[ "$?" == "0" ]]; then + kill -9 $PID + fi +} +trap finish EXIT # so we can test other languages if [[ "$COUNTER_APP" == "" ]]; then @@ -55,21 +64,15 @@ fi HASH1=`echo "$OUTPUT" | tail -n +3 | head -n 1` HASH2=`echo "$OUTPUT" | tail -n +5 | head -n 1` -if [[ "${HASH1:0:2}" != "01" ]]; then +if [[ "${HASH1: -2}" != "01" ]]; then echo "Expected hash to lead with 01. Got $HASH1" exit 1 fi -if [[ "${HASH2:0:2}" != "02" ]]; then +if [[ "${HASH2: -2}" != "02" ]]; then echo "Expected hash to lead with 02. Got $HASH2" exit 1 fi echo "... Pass!" echo "" - -ps -p $PID > /dev/null -if [[ "$?" == "0" ]]; then - kill -9 $PID -fi - diff --git a/tests/test_dummy.sh b/tests/test_dummy.sh index f0209a89..ac8c62ac 100755 --- a/tests/test_dummy.sh +++ b/tests/test_dummy.sh @@ -1,4 +1,12 @@ #! /bin/bash +function finish { + echo "Cleaning up..." + ps -p $PID > /dev/null + if [[ "$?" == "0" ]]; then + kill -9 $PID + fi +} +trap finish EXIT # Make sure the tmsp cli can connect to the dummy echo "Dummy test ..." @@ -51,7 +59,3 @@ if [[ "$HASH1" != "$RESULT_HASH" ]]; then fi echo "... Pass!" echo "" - - -kill $PID -sleep 1 diff --git a/types/application.go b/types/application.go index b9bd1985..cedf1240 100644 --- a/types/application.go +++ b/types/application.go @@ -2,30 +2,21 @@ package types type Application interface { - // Echo a message - Echo(message string) string - // Return application info - Info() []string + Info() (info string) // Set application option (e.g. mode=mempool, mode=consensus) - SetOption(key string, value string) RetCode + SetOption(key string, value string) (log string) // Append a tx - AppendTx(tx []byte) ([]Event, RetCode) + AppendTx(tx []byte) (code RetCode, result []byte, log string) // Validate a tx for the mempool - CheckTx(tx []byte) RetCode + CheckTx(tx []byte) (code RetCode, result []byte, log string) // Return the application Merkle root hash - GetHash() ([]byte, RetCode) - - // Add event listener - AddListener(key string) RetCode - - // Remove event listener - RemListener(key string) RetCode + GetHash() (hash []byte, log string) // Query for state - Query(query []byte) ([]byte, RetCode) + Query(query []byte) (result []byte, log string) } diff --git a/types/events.go b/types/events.go deleted file mode 100644 index d1ba5f1f..00000000 --- a/types/events.go +++ /dev/null @@ -1,6 +0,0 @@ -package types - -type Event struct { - Key string - Data []byte -} diff --git a/types/messages.go b/types/messages.go index 063e2a57..0e05706d 100644 --- a/types/messages.go +++ b/types/messages.go @@ -86,11 +86,6 @@ var _ = wire.RegisterInterface( //---------------------------------------- -type KVPair struct { - Key []byte - Value []byte -} - type ResponseException struct { Error string } @@ -103,32 +98,33 @@ type ResponseFlush struct { } type ResponseInfo struct { - Data []string + Info string } type ResponseSetOption struct { - Error string + Log string } type ResponseAppendTx struct { - RetCode - RetData []KVPair - Error string + Code RetCode + Result []byte + Log string } type ResponseCheckTx struct { - RetCode - RetData []KVPair - Error string + Code RetCode + Result []byte + Log string } type ResponseGetHash struct { Hash []byte + Log string } type ResponseQuery struct { Result []byte - Error string + Log string } type Response interface { diff --git a/types/retcode.go b/types/retcode.go index 8f3ad5bf..05e49a2e 100644 --- a/types/retcode.go +++ b/types/retcode.go @@ -1,9 +1,5 @@ package types -import ( - "errors" -) - type RetCode int // Reserved return codes @@ -17,15 +13,6 @@ const ( RetCodeBadNonce RetCode = 6 ) -func (r RetCode) Error() error { - switch r { - case RetCodeOK: - return nil - default: - return errors.New(r.String()) - } -} - //go:generate stringer -type=RetCode // NOTE: The previous comment generates r.String(). diff --git a/types/retcode_string.go b/types/retcode_string.go index 65457041..0f30688b 100644 --- a/types/retcode_string.go +++ b/types/retcode_string.go @@ -4,9 +4,9 @@ package types import "fmt" -const _RetCode_name = "RetCodeOKRetCodeInternalErrorRetCodeUnauthorizedRetCodeInsufficientFeesRetCodeUnknownRequestRetCodeEncodingErrorRetCodeInvalidNonce" +const _RetCode_name = "RetCodeOKRetCodeInternalErrorRetCodeUnauthorizedRetCodeInsufficientFeesRetCodeUnknownRequestRetCodeEncodingErrorRetCodeBadNonce" -var _RetCode_index = [...]uint8{0, 9, 29, 48, 71, 92, 112, 131} +var _RetCode_index = [...]uint8{0, 9, 29, 48, 71, 92, 112, 127} func (i RetCode) String() string { if i < 0 || i >= RetCode(len(_RetCode_index)-1) {