tendermint/server/server.go

164 lines
4.3 KiB
Go
Raw Normal View History

2015-11-02 07:39:53 -08:00
package server
import (
2015-11-08 15:18:58 -08:00
"bufio"
2015-11-02 07:39:53 -08:00
"fmt"
2015-11-30 17:56:36 -08:00
"io"
2015-11-02 07:39:53 -08:00
"net"
"strings"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types"
)
// var maxNumberConnections = 2
2015-11-29 00:44:08 -08:00
2015-11-02 07:39:53 -08:00
func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
parts := strings.SplitN(protoAddr, "://", 2)
proto, addr := parts[0], parts[1]
ln, err := net.Listen(proto, addr)
if err != nil {
return nil, err
}
// A goroutine to accept a connection.
go func() {
// semaphore := make(chan struct{}, maxNumberConnections)
2015-11-29 00:44:08 -08:00
2015-11-02 07:39:53 -08:00
for {
// semaphore <- struct{}{}
2015-11-29 00:44:08 -08:00
2015-11-02 07:39:53 -08:00
// Accept a connection
fmt.Println("Waiting for new connection...")
2015-11-02 07:39:53 -08:00
conn, err := ln.Accept()
if err != nil {
Exit("Failed to accept connection")
} else {
fmt.Println("Accepted a new connection")
}
appContext := app.Open()
closeConn := make(chan error, 2) // Push to signal connection closed
2015-11-02 07:39:53 -08:00
responses := make(chan types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them
go handleRequests(appContext, closeConn, conn, responses)
2015-11-02 07:39:53 -08:00
// Pull responses from 'responses' and write them to conn.
go handleResponses(closeConn, responses, conn)
2015-11-02 07:39:53 -08:00
2015-11-29 00:44:08 -08:00
go func() {
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// Close the AppContext
err = appContext.Close()
if err != nil {
fmt.Printf("Error in closing app context: %v\n", err)
}
// <-semaphore
2015-11-29 00:44:08 -08:00
}()
2015-11-02 07:39:53 -08:00
}
}()
return ln, nil
}
// Read requests from conn and deal with them
func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
2015-11-02 07:39:53 -08:00
var count int
2015-11-08 15:18:58 -08:00
var bufReader = bufio.NewReader(conn)
2015-11-02 07:39:53 -08:00
for {
2015-11-10 12:49:07 -08:00
var n int
2015-11-02 07:39:53 -08:00
var err error
var req types.Request
2015-12-20 09:16:05 -08:00
wire.ReadBinaryPtrLengthPrefixed(&req, bufReader, 0, &n, &err)
2015-11-02 07:39:53 -08:00
if err != nil {
2015-11-30 17:56:36 -08:00
if err == io.EOF {
closeConn <- fmt.Errorf("Connection closed by client")
} else {
closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
}
2015-11-02 07:39:53 -08:00
return
}
count++
handleRequest(appC, req, responses)
2015-11-02 07:39:53 -08:00
}
}
func handleRequest(appC types.AppContext, req types.Request, responses chan<- types.Response) {
2015-11-02 07:39:53 -08:00
switch req := req.(type) {
case types.RequestEcho:
msg := appC.Echo(req.Message)
responses <- types.ResponseEcho{msg}
2015-11-08 15:18:58 -08:00
case types.RequestFlush:
responses <- types.ResponseFlush{}
2015-11-09 18:17:00 -08:00
case types.RequestInfo:
data := appC.Info()
2015-11-09 18:17:00 -08:00
responses <- types.ResponseInfo{data}
2015-11-27 10:14:46 -08:00
case types.RequestSetOption:
retCode := appC.SetOption(req.Key, req.Value)
2015-11-27 10:14:46 -08:00
responses <- types.ResponseSetOption{retCode}
2015-11-02 07:39:53 -08:00
case types.RequestAppendTx:
events, retCode := appC.AppendTx(req.TxBytes)
2015-11-02 07:39:53 -08:00
responses <- types.ResponseAppendTx{retCode}
for _, event := range events {
responses <- types.ResponseEvent{event}
}
case types.RequestGetHash:
hash, retCode := appC.GetHash()
2015-11-02 07:39:53 -08:00
responses <- types.ResponseGetHash{retCode, hash}
case types.RequestCommit:
retCode := appC.Commit()
2015-11-02 07:39:53 -08:00
responses <- types.ResponseCommit{retCode}
case types.RequestRollback:
retCode := appC.Rollback()
2015-11-02 07:39:53 -08:00
responses <- types.ResponseRollback{retCode}
case types.RequestAddListener:
retCode := appC.AddListener(req.EventKey)
2015-11-02 07:39:53 -08:00
responses <- types.ResponseAddListener{retCode}
case types.RequestRemListener:
retCode := appC.RemListener(req.EventKey)
2015-11-02 07:39:53 -08:00
responses <- types.ResponseRemListener{retCode}
default:
responses <- types.ResponseException{"Unknown request"}
}
}
// Pull responses from 'responses' and write them to conn.
func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) {
2015-11-02 07:39:53 -08:00
var count int
2015-11-08 15:18:58 -08:00
var bufWriter = bufio.NewWriter(conn)
2015-11-02 07:39:53 -08:00
for {
var res = <-responses
2015-11-10 12:49:07 -08:00
var n int
2015-11-02 07:39:53 -08:00
var err error
2015-12-20 09:16:05 -08:00
wire.WriteBinaryLengthPrefixed(res, bufWriter, &n, &err)
2015-11-02 07:39:53 -08:00
if err != nil {
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
2015-11-02 07:39:53 -08:00
return
}
2015-11-08 15:18:58 -08:00
if _, ok := res.(types.ResponseFlush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
2015-11-08 15:18:58 -08:00
return
}
}
2015-11-02 07:39:53 -08:00
count++
}
}