QuitService->BaseService

This commit is contained in:
Jae Kwon 2016-10-28 12:06:40 -07:00 committed by Ethan Buchman
parent 8374785c1d
commit 7998ba668a
5 changed files with 22 additions and 18 deletions

View File

@ -68,7 +68,11 @@ TMSP requests/responses are simple Protobuf messages. Check out the [schema fil
* `Data ([]byte)`: Result bytes, if any * `Data ([]byte)`: Result bytes, if any
* `Log (string)`: Debug or error message * `Log (string)`: Debug or error message
* __Usage__:<br/> * __Usage__:<br/>
Validate a transaction. This message should not mutate the state. Validate a mempool transaction, prior to broadcasting or proposing. This message should not mutate the main state, but application
developers may want to keep a separate CheckTx state that gets reset upon Commit.
CheckTx can happen interspersed with AppendTx, but they happen on different connections - CheckTx from the mempool connection, and AppendTx from the consensus connection. During Commit, the mempool is locked, so you can reset the mempool state to the latest state after running all those appendtxs, and then the mempool will re run whatever txs it has against that latest mempool stte
Transactions are first run through CheckTx before broadcast to peers in the mempool layer. Transactions are first run through CheckTx before broadcast to peers in the mempool layer.
You can make CheckTx semi-stateful and clear the state upon `Commit` or `BeginBlock`, You can make CheckTx semi-stateful and clear the state upon `Commit` or `BeginBlock`,
to allow for dependent sequences of transactions in the same block. to allow for dependent sequences of transactions in the same block.

View File

@ -15,7 +15,7 @@ import (
// A stripped copy of the remoteClient that makes // A stripped copy of the remoteClient that makes
// synchronous calls using grpc // synchronous calls using grpc
type grpcClient struct { type grpcClient struct {
QuitService BaseService
mustConnect bool mustConnect bool
client types.TMSPApplicationClient client types.TMSPApplicationClient
@ -31,7 +31,7 @@ func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) {
addr: addr, addr: addr,
mustConnect: mustConnect, mustConnect: mustConnect,
} }
cli.QuitService = *NewQuitService(nil, "grpcClient", cli) cli.BaseService = *NewBaseService(nil, "grpcClient", cli)
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err return cli, err
} }
@ -41,7 +41,7 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
} }
func (cli *grpcClient) OnStart() error { func (cli *grpcClient) OnStart() error {
cli.QuitService.OnStart() cli.BaseService.OnStart()
RETRY_LOOP: RETRY_LOOP:
for { for {
@ -73,7 +73,7 @@ RETRY_LOOP:
} }
func (cli *grpcClient) OnStop() { func (cli *grpcClient) OnStop() {
cli.QuitService.OnStop() cli.BaseService.OnStop()
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
// TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close() // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()

View File

@ -27,7 +27,7 @@ const flushThrottleMS = 20 // Don't wait longer than...
// the application in general is not meant to be interfaced // the application in general is not meant to be interfaced
// with concurrent callers. // with concurrent callers.
type socketClient struct { type socketClient struct {
QuitService BaseService
reqQueue chan *ReqRes reqQueue chan *ReqRes
flushTimer *ThrottleTimer flushTimer *ThrottleTimer
@ -52,14 +52,14 @@ func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
reqSent: list.New(), reqSent: list.New(),
resCb: nil, resCb: nil,
} }
cli.QuitService = *NewQuitService(nil, "socketClient", cli) cli.BaseService = *NewBaseService(nil, "socketClient", cli)
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err return cli, err
} }
func (cli *socketClient) OnStart() error { func (cli *socketClient) OnStart() error {
cli.QuitService.OnStart() cli.BaseService.OnStart()
var err error var err error
var conn net.Conn var conn net.Conn
@ -86,7 +86,7 @@ RETRY_LOOP:
} }
func (cli *socketClient) OnStop() { func (cli *socketClient) OnStop() {
cli.QuitService.OnStop() cli.BaseService.OnStop()
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
@ -140,7 +140,7 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
default: default:
// Probably will fill the buffer, or retry later. // Probably will fill the buffer, or retry later.
} }
case <-cli.QuitService.Quit: case <-cli.BaseService.Quit:
return return
case reqres := <-cli.reqQueue: case reqres := <-cli.reqQueue:
cli.willSendReq(reqres) cli.willSendReq(reqres)

View File

@ -13,7 +13,7 @@ import (
// var maxNumberConnections = 2 // var maxNumberConnections = 2
type GRPCServer struct { type GRPCServer struct {
QuitService BaseService
proto string proto string
addr string addr string
@ -32,13 +32,13 @@ func NewGRPCServer(protoAddr string, app types.TMSPApplicationServer) (Service,
listener: nil, listener: nil,
app: app, app: app,
} }
s.QuitService = *NewQuitService(nil, "TMSPServer", s) s.BaseService = *NewBaseService(nil, "TMSPServer", s)
_, err := s.Start() // Just start it _, err := s.Start() // Just start it
return s, err return s, err
} }
func (s *GRPCServer) OnStart() error { func (s *GRPCServer) OnStart() error {
s.QuitService.OnStart() s.BaseService.OnStart()
ln, err := net.Listen(s.proto, s.addr) ln, err := net.Listen(s.proto, s.addr)
if err != nil { if err != nil {
return err return err
@ -51,6 +51,6 @@ func (s *GRPCServer) OnStart() error {
} }
func (s *GRPCServer) OnStop() { func (s *GRPCServer) OnStop() {
s.QuitService.OnStop() s.BaseService.OnStop()
s.server.Stop() s.server.Stop()
} }

View File

@ -15,7 +15,7 @@ import (
// var maxNumberConnections = 2 // var maxNumberConnections = 2
type SocketServer struct { type SocketServer struct {
QuitService BaseService
proto string proto string
addr string addr string
@ -39,13 +39,13 @@ func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
app: app, app: app,
conns: make(map[int]net.Conn), conns: make(map[int]net.Conn),
} }
s.QuitService = *NewQuitService(nil, "TMSPServer", s) s.BaseService = *NewBaseService(nil, "TMSPServer", s)
_, err := s.Start() // Just start it _, err := s.Start() // Just start it
return s, err return s, err
} }
func (s *SocketServer) OnStart() error { func (s *SocketServer) OnStart() error {
s.QuitService.OnStart() s.BaseService.OnStart()
ln, err := net.Listen(s.proto, s.addr) ln, err := net.Listen(s.proto, s.addr)
if err != nil { if err != nil {
return err return err
@ -56,7 +56,7 @@ func (s *SocketServer) OnStart() error {
} }
func (s *SocketServer) OnStop() { func (s *SocketServer) OnStop() {
s.QuitService.OnStop() s.BaseService.OnStop()
s.listener.Close() s.listener.Close()
s.connsMtx.Lock() s.connsMtx.Lock()