Allow late connecting
This commit is contained in:
parent
6785b9a3b6
commit
83fc4fc3b6
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/tmsp/types"
|
||||
|
@ -25,47 +26,70 @@ type Client struct {
|
|||
QuitService
|
||||
sync.Mutex // [EB]: is this even used?
|
||||
|
||||
reqQueue chan *ReqRes
|
||||
flushTimer *ThrottleTimer
|
||||
reqQueue chan *ReqRes
|
||||
flushTimer *ThrottleTimer
|
||||
mustConnect bool
|
||||
|
||||
mtx sync.Mutex
|
||||
addr string
|
||||
conn net.Conn
|
||||
bufWriter *bufio.Writer
|
||||
err error
|
||||
reqSent *list.List
|
||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||
mtx sync.Mutex
|
||||
addr string
|
||||
conn net.Conn
|
||||
err error
|
||||
reqSent *list.List
|
||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||
}
|
||||
|
||||
func NewClient(addr string) (*Client, error) {
|
||||
conn, err := Connect(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func NewClient(addr string, mustConnect bool) (*Client, error) {
|
||||
cli := &Client{
|
||||
reqQueue: make(chan *ReqRes, reqQueueSize),
|
||||
flushTimer: NewThrottleTimer("Client", flushThrottleMS),
|
||||
reqQueue: make(chan *ReqRes, reqQueueSize),
|
||||
flushTimer: NewThrottleTimer("Client", flushThrottleMS),
|
||||
mustConnect: mustConnect,
|
||||
|
||||
conn: conn,
|
||||
bufWriter: bufio.NewWriter(conn),
|
||||
reqSent: list.New(),
|
||||
resCb: nil,
|
||||
addr: addr,
|
||||
reqSent: list.New(),
|
||||
resCb: nil,
|
||||
}
|
||||
cli.QuitService = *NewQuitService(nil, "Client", cli)
|
||||
cli.Start() // Just start it, it's confusing for callers to remember to start.
|
||||
return cli, nil
|
||||
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
|
||||
if mustConnect {
|
||||
return nil, err
|
||||
} else {
|
||||
return cli, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cli *Client) OnStart() error {
|
||||
func (cli *Client) OnStart() (err error) {
|
||||
cli.QuitService.OnStart()
|
||||
go cli.sendRequestsRoutine()
|
||||
go cli.recvResponseRoutine()
|
||||
return nil
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
RETRY_LOOP:
|
||||
for {
|
||||
conn, err_ := Connect(cli.addr)
|
||||
if err_ != nil {
|
||||
if cli.mustConnect {
|
||||
err = err_ // OnStart() will return this.
|
||||
close(doneCh)
|
||||
return
|
||||
} else {
|
||||
fmt.Printf("tmsp.Client failed to connect to %v. Retrying...\n", cli.addr)
|
||||
time.Sleep(time.Second * 3)
|
||||
continue RETRY_LOOP
|
||||
}
|
||||
}
|
||||
go cli.sendRequestsRoutine(conn)
|
||||
go cli.recvResponseRoutine(conn)
|
||||
close(doneCh) // OnStart() will return no error.
|
||||
return
|
||||
}
|
||||
}()
|
||||
<-doneCh
|
||||
return // err
|
||||
}
|
||||
|
||||
func (cli *Client) OnStop() {
|
||||
cli.QuitService.OnStop()
|
||||
cli.conn.Close()
|
||||
if cli.conn != nil {
|
||||
cli.conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Set listener for all responses
|
||||
|
@ -78,7 +102,7 @@ func (cli *Client) SetResponseCallback(resCb Callback) {
|
|||
|
||||
func (cli *Client) StopForError(err error) {
|
||||
cli.mtx.Lock()
|
||||
// log.Error("Stopping Client for error.", "error", err)
|
||||
fmt.Printf("Stopping tmsp.Client for error: %v\n", err.Error())
|
||||
if cli.err == nil {
|
||||
cli.err = err
|
||||
}
|
||||
|
@ -94,7 +118,8 @@ func (cli *Client) Error() error {
|
|||
|
||||
//----------------------------------------
|
||||
|
||||
func (cli *Client) sendRequestsRoutine() {
|
||||
func (cli *Client) sendRequestsRoutine(conn net.Conn) {
|
||||
w := bufio.NewWriter(conn)
|
||||
for {
|
||||
select {
|
||||
case <-cli.flushTimer.Ch:
|
||||
|
@ -107,14 +132,14 @@ func (cli *Client) sendRequestsRoutine() {
|
|||
return
|
||||
case reqres := <-cli.reqQueue:
|
||||
cli.willSendReq(reqres)
|
||||
err := types.WriteMessage(reqres.Request, cli.bufWriter)
|
||||
err := types.WriteMessage(reqres.Request, w)
|
||||
if err != nil {
|
||||
cli.StopForError(err)
|
||||
return
|
||||
}
|
||||
// log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
|
||||
if reqres.Request.Type == types.MessageType_Flush {
|
||||
err = cli.bufWriter.Flush()
|
||||
err = w.Flush()
|
||||
if err != nil {
|
||||
cli.StopForError(err)
|
||||
return
|
||||
|
@ -124,8 +149,8 @@ func (cli *Client) sendRequestsRoutine() {
|
|||
}
|
||||
}
|
||||
|
||||
func (cli *Client) recvResponseRoutine() {
|
||||
r := bufio.NewReader(cli.conn) // Buffer reads
|
||||
func (cli *Client) recvResponseRoutine(conn net.Conn) {
|
||||
r := bufio.NewReader(conn) // Buffer reads
|
||||
for {
|
||||
var res = &types.Response{}
|
||||
err := types.ReadMessage(r, res)
|
||||
|
|
|
@ -70,7 +70,7 @@ func startApp() *process.Process {
|
|||
|
||||
func startClient() *tmspcli.Client {
|
||||
// Start client
|
||||
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658")
|
||||
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true)
|
||||
if err != nil {
|
||||
panic("connecting to counter_app: " + err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue