remote_client -> socket_client; use logger

This commit is contained in:
Ethan Buchman 2016-08-10 17:49:15 -04:00
parent 58bacfb04e
commit d3bdb49aae
3 changed files with 51 additions and 43 deletions

View File

@ -1,7 +1,6 @@
package tmspcli
import (
"fmt"
"net"
"sync"
"time"
@ -50,7 +49,7 @@ RETRY_LOOP:
if cli.mustConnect {
return err
} else {
fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)
log.Warn(Fmt("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
time.Sleep(time.Second * 3)
continue RETRY_LOOP
}
@ -75,7 +74,7 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) {
func (cli *grpcClient) StopForError(err error) {
cli.mtx.Lock()
fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error())
log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error()))
if cli.err == nil {
cli.err = err
}

7
client/log.go Normal file
View File

@ -0,0 +1,7 @@
package tmspcli
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "tmspcli")

View File

@ -26,7 +26,7 @@ const flushThrottleMS = 20 // Don't wait longer than...
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
type remoteClient struct {
type socketClient struct {
QuitService
sync.Mutex // [EB]: is this even used?
@ -42,22 +42,22 @@ type remoteClient struct {
resCb func(*types.Request, *types.Response) // listens to all callbacks
}
func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) {
cli := &remoteClient{
func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
cli := &socketClient{
reqQueue: make(chan *ReqRes, reqQueueSize),
flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
flushTimer: NewThrottleTimer("socketClient", flushThrottleMS),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
cli.QuitService = *NewQuitService(nil, "socketClient", cli)
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err
}
func (cli *remoteClient) OnStart() error {
func (cli *socketClient) OnStart() error {
cli.QuitService.OnStart()
RETRY_LOOP:
for {
@ -66,7 +66,7 @@ RETRY_LOOP:
if cli.mustConnect {
return err
} else {
fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr))
time.Sleep(time.Second * 3)
continue RETRY_LOOP
}
@ -78,7 +78,7 @@ RETRY_LOOP:
return nil // never happens
}
func (cli *remoteClient) OnStop() {
func (cli *socketClient) OnStop() {
cli.QuitService.OnStop()
if cli.conn != nil {
cli.conn.Close()
@ -87,15 +87,15 @@ func (cli *remoteClient) OnStop() {
// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *remoteClient) SetResponseCallback(resCb Callback) {
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
}
func (cli *remoteClient) StopForError(err error) {
func (cli *socketClient) StopForError(err error) {
cli.mtx.Lock()
fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error())
log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error()))
if cli.err == nil {
cli.err = err
}
@ -103,7 +103,7 @@ func (cli *remoteClient) StopForError(err error) {
cli.Stop()
}
func (cli *remoteClient) Error() error {
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
@ -111,13 +111,13 @@ func (cli *remoteClient) Error() error {
//----------------------------------------
func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
w := bufio.NewWriter(conn)
for {
select {
case <-cli.flushTimer.Ch:
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ?
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
@ -142,7 +142,7 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
}
}
func (cli *remoteClient) recvResponseRoutine(conn net.Conn) {
func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
r := bufio.NewReader(conn) // Buffer reads
for {
var res = &types.Response{}
@ -165,13 +165,13 @@ func (cli *remoteClient) recvResponseRoutine(conn net.Conn) {
}
}
func (cli *remoteClient) willSendReq(reqres *ReqRes) {
func (cli *socketClient) willSendReq(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.reqSent.PushBack(reqres)
}
func (cli *remoteClient) didRecvResponse(res *types.Response) error {
func (cli *socketClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
@ -205,53 +205,53 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error {
//----------------------------------------
func (cli *remoteClient) EchoAsync(msg string) *ReqRes {
func (cli *socketClient) EchoAsync(msg string) *ReqRes {
return cli.queueRequest(types.ToRequestEcho(msg))
}
func (cli *remoteClient) FlushAsync() *ReqRes {
func (cli *socketClient) FlushAsync() *ReqRes {
return cli.queueRequest(types.ToRequestFlush())
}
func (cli *remoteClient) InfoAsync() *ReqRes {
func (cli *socketClient) InfoAsync() *ReqRes {
return cli.queueRequest(types.ToRequestInfo())
}
func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes {
func (cli *socketClient) SetOptionAsync(key string, value string) *ReqRes {
return cli.queueRequest(types.ToRequestSetOption(key, value))
}
func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes {
func (cli *socketClient) AppendTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.ToRequestAppendTx(tx))
}
func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes {
func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.ToRequestCheckTx(tx))
}
func (cli *remoteClient) QueryAsync(query []byte) *ReqRes {
func (cli *socketClient) QueryAsync(query []byte) *ReqRes {
return cli.queueRequest(types.ToRequestQuery(query))
}
func (cli *remoteClient) CommitAsync() *ReqRes {
func (cli *socketClient) CommitAsync() *ReqRes {
return cli.queueRequest(types.ToRequestCommit())
}
func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes {
func (cli *socketClient) InitChainAsync(validators []*types.Validator) *ReqRes {
return cli.queueRequest(types.ToRequestInitChain(validators))
}
func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes {
func (cli *socketClient) BeginBlockAsync(height uint64) *ReqRes {
return cli.queueRequest(types.ToRequestBeginBlock(height))
}
func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes {
func (cli *socketClient) EndBlockAsync(height uint64) *ReqRes {
return cli.queueRequest(types.ToRequestEndBlock(height))
}
//----------------------------------------
func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
func (cli *socketClient) EchoSync(msg string) (res types.Result) {
reqres := cli.queueRequest(types.ToRequestEcho(msg))
cli.FlushSync()
if cli.err != nil {
@ -261,12 +261,14 @@ func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG}
}
func (cli *remoteClient) FlushSync() error {
func (cli *socketClient) FlushSync() error {
log.Warn("FlushSync")
cli.queueRequest(types.ToRequestFlush()).Wait()
log.Warn("Done FlushSync")
return cli.err
}
func (cli *remoteClient) InfoSync() (res types.Result) {
func (cli *socketClient) InfoSync() (res types.Result) {
reqres := cli.queueRequest(types.ToRequestInfo())
cli.FlushSync()
if cli.err != nil {
@ -276,7 +278,7 @@ func (cli *remoteClient) InfoSync() (res types.Result) {
return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG}
}
func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) {
func (cli *socketClient) SetOptionSync(key string, value string) (res types.Result) {
reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
cli.FlushSync()
if cli.err != nil {
@ -286,7 +288,7 @@ func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Resu
return types.Result{Code: OK, Data: nil, Log: resp.Log}
}
func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
func (cli *socketClient) AppendTxSync(tx []byte) (res types.Result) {
reqres := cli.queueRequest(types.ToRequestAppendTx(tx))
cli.FlushSync()
if cli.err != nil {
@ -296,7 +298,7 @@ func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
func (cli *socketClient) CheckTxSync(tx []byte) (res types.Result) {
reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
cli.FlushSync()
if cli.err != nil {
@ -306,7 +308,7 @@ func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
func (cli *socketClient) QuerySync(query []byte) (res types.Result) {
reqres := cli.queueRequest(types.ToRequestQuery(query))
cli.FlushSync()
if cli.err != nil {
@ -316,7 +318,7 @@ func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *remoteClient) CommitSync() (res types.Result) {
func (cli *socketClient) CommitSync() (res types.Result) {
reqres := cli.queueRequest(types.ToRequestCommit())
cli.FlushSync()
if cli.err != nil {
@ -326,7 +328,7 @@ func (cli *remoteClient) CommitSync() (res types.Result) {
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) {
func (cli *socketClient) InitChainSync(validators []*types.Validator) (err error) {
cli.queueRequest(types.ToRequestInitChain(validators))
cli.FlushSync()
if cli.err != nil {
@ -335,7 +337,7 @@ func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error
return nil
}
func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
func (cli *socketClient) BeginBlockSync(height uint64) (err error) {
cli.queueRequest(types.ToRequestBeginBlock(height))
cli.FlushSync()
if cli.err != nil {
@ -344,7 +346,7 @@ func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
return nil
}
func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
reqres := cli.queueRequest(types.ToRequestEndBlock(height))
cli.FlushSync()
if cli.err != nil {
@ -355,7 +357,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida
//----------------------------------------
func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
reqres := NewReqRes(req)
// TODO: set cli.err if reqQueue times out