tendermint/server/socket_server.go

223 lines
5.3 KiB
Go
Raw Normal View History

2016-05-18 15:30:38 -07:00
package server
import (
"bufio"
"fmt"
"io"
"net"
"sync"
2017-01-12 12:47:55 -08:00
"github.com/tendermint/abci/types"
2017-04-21 15:25:13 -07:00
cmn "github.com/tendermint/tmlibs/common"
2016-05-18 15:30:38 -07:00
)
// var maxNumberConnections = 2
type SocketServer struct {
2017-01-23 20:26:17 -08:00
cmn.BaseService
2016-05-18 15:30:38 -07:00
proto string
addr string
listener net.Listener
connsMtx sync.Mutex
conns map[int]net.Conn
nextConnID int
2016-05-18 15:30:38 -07:00
appMtx sync.Mutex
app types.Application
}
func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
2017-10-25 20:04:20 -07:00
proto, addr := cmn.ProtocolAndAddress(protoAddr)
2016-05-18 15:30:38 -07:00
s := &SocketServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
2016-05-18 15:30:38 -07:00
}
2017-01-23 20:26:17 -08:00
s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
return s
2016-05-18 15:30:38 -07:00
}
func (s *SocketServer) OnStart() error {
2017-09-21 12:32:06 -07:00
if err := s.BaseService.OnStart(); err != nil {
return err
}
2016-05-18 15:30:38 -07:00
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}
s.listener = ln
go s.acceptConnectionsRoutine()
return nil
}
func (s *SocketServer) OnStop() {
2016-10-28 12:06:40 -07:00
s.BaseService.OnStop()
2016-05-18 15:30:38 -07:00
s.listener.Close()
s.connsMtx.Lock()
2017-11-14 09:03:23 -08:00
defer s.connsMtx.Unlock()
for id, conn := range s.conns {
delete(s.conns, id)
conn.Close()
}
}
func (s *SocketServer) addConn(conn net.Conn) int {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
connID := s.nextConnID
2017-01-17 00:26:32 -08:00
s.nextConnID++
s.conns[connID] = conn
return connID
}
// deletes conn even if close errs
2017-11-14 09:03:23 -08:00
func (s *SocketServer) rmConn(connID int) error {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
2017-11-14 09:03:23 -08:00
conn, ok := s.conns[connID]
if !ok {
return fmt.Errorf("Connection %d does not exist", connID)
}
delete(s.conns, connID)
return conn.Close()
2016-05-18 15:30:38 -07:00
}
func (s *SocketServer) acceptConnectionsRoutine() {
for {
// Accept a connection
2017-04-27 13:37:18 -07:00
s.Logger.Info("Waiting for new connection...")
2016-05-18 15:30:38 -07:00
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
2017-04-27 13:37:18 -07:00
s.Logger.Error("Failed to accept connection: " + err.Error())
2017-11-14 09:03:23 -08:00
continue
2016-05-18 15:30:38 -07:00
}
2017-11-14 09:03:23 -08:00
s.Logger.Info("Accepted a new connection")
connID := s.addConn(conn)
2016-05-18 15:30:38 -07:00
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
2017-11-14 09:03:23 -08:00
go s.handleResponses(closeConn, conn, responses)
2016-05-18 15:30:38 -07:00
2017-11-14 09:03:23 -08:00
// Wait until signal to close connection
go s.waitForClose(closeConn, connID)
}
}
func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
err := <-closeConn
if err == io.EOF {
s.Logger.Error("Connection was closed by client")
} else if err != nil {
s.Logger.Error("Connection error", "error", err)
} else {
// never happens
s.Logger.Error("Connection was closed.")
}
2016-05-18 15:30:38 -07:00
2017-11-14 09:03:23 -08:00
// Close the connection
if err := s.rmConn(connID); err != nil {
s.Logger.Error("Error in closing connection", "error", err)
2016-05-18 15:30:38 -07:00
}
}
// Read requests from conn and deal with them
func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
2016-05-18 15:30:38 -07:00
} else {
closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
2016-05-18 15:30:38 -07:00
}
return
}
s.appMtx.Lock()
count++
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}
func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
switch r := req.Value.(type) {
case *types.Request_Echo:
responses <- types.ToResponseEcho(r.Echo.Message)
case *types.Request_Flush:
responses <- types.ToResponseFlush()
case *types.Request_Info:
res := s.app.Info(*r.Info)
responses <- types.ToResponseInfo(res)
2016-05-18 15:30:38 -07:00
case *types.Request_SetOption:
res := s.app.SetOption(*r.SetOption)
responses <- types.ToResponseSetOption(res)
2017-01-12 12:27:08 -08:00
case *types.Request_DeliverTx:
res := s.app.DeliverTx(r.DeliverTx.Tx)
responses <- types.ToResponseDeliverTx(res)
2016-05-18 15:30:38 -07:00
case *types.Request_CheckTx:
res := s.app.CheckTx(r.CheckTx.Tx)
responses <- types.ToResponseCheckTx(res)
2016-05-18 15:30:38 -07:00
case *types.Request_Commit:
res := s.app.Commit()
responses <- types.ToResponseCommit(res)
2016-05-18 15:30:38 -07:00
case *types.Request_Query:
res := s.app.Query(*r.Query)
responses <- types.ToResponseQuery(res)
2016-05-18 15:30:38 -07:00
case *types.Request_InitChain:
res := s.app.InitChain(*r.InitChain)
responses <- types.ToResponseInitChain(res)
2016-11-03 16:50:57 -07:00
case *types.Request_BeginBlock:
res := s.app.BeginBlock(*r.BeginBlock)
responses <- types.ToResponseBeginBlock(res)
2016-05-18 15:30:38 -07:00
case *types.Request_EndBlock:
res := s.app.EndBlock(*r.EndBlock)
responses <- types.ToResponseEndBlock(res)
2016-05-18 15:30:38 -07:00
default:
responses <- types.ToResponseException("Unknown request")
}
}
// Pull responses from 'responses' and write them to conn.
2017-11-14 09:03:23 -08:00
func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
2016-05-18 15:30:38 -07:00
var count int
var bufWriter = bufio.NewWriter(conn)
for {
var res = <-responses
err := types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
2016-05-18 15:30:38 -07:00
return
}
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
2016-05-18 15:30:38 -07:00
return
}
}
count++
}
}