tendermint/client/grpc_client.go

341 lines
9.5 KiB
Go
Raw Normal View History

2017-01-12 12:47:55 -08:00
package abcicli
2016-05-18 15:30:38 -07:00
import (
2017-01-16 22:59:46 -08:00
"fmt"
2016-05-18 15:30:38 -07:00
"net"
"sync"
"time"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
2017-01-12 12:47:55 -08:00
"github.com/tendermint/abci/types"
common "github.com/tendermint/go-common"
2016-05-18 15:30:38 -07:00
)
2016-05-24 18:51:28 -07:00
// A stripped copy of the remoteClient that makes
// synchronous calls using grpc
2016-05-18 15:30:38 -07:00
type grpcClient struct {
common.BaseService
2016-05-18 15:30:38 -07:00
mustConnect bool
2017-01-12 12:47:55 -08:00
client types.ABCIApplicationClient
2016-05-18 15:30:38 -07:00
mtx sync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
}
func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
}
cli.BaseService = *common.NewBaseService(nil, "grpcClient", cli)
2016-05-18 15:30:38 -07:00
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err
}
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
return common.Connect(addr)
2016-05-18 15:30:38 -07:00
}
2016-05-24 18:51:28 -07:00
func (cli *grpcClient) OnStart() error {
2016-10-28 12:06:40 -07:00
cli.BaseService.OnStart()
2016-05-18 15:30:38 -07:00
RETRY_LOOP:
2016-05-18 15:30:38 -07:00
for {
2016-05-24 18:51:28 -07:00
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
2016-05-18 15:30:38 -07:00
if cli.mustConnect {
2016-05-24 18:51:28 -07:00
return err
2016-05-18 15:30:38 -07:00
}
2017-01-17 00:26:32 -08:00
log.Warn(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr))
time.Sleep(time.Second * 3)
continue RETRY_LOOP
2016-05-18 15:30:38 -07:00
}
2017-01-12 12:47:55 -08:00
client := types.NewABCIApplicationClient(conn)
ENSURE_CONNECTED:
for {
_, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
if err == nil {
break ENSURE_CONNECTED
}
time.Sleep(time.Second)
}
cli.client = client
2016-05-24 18:51:28 -07:00
return nil
2016-05-18 15:30:38 -07:00
}
}
func (cli *grpcClient) OnStop() {
2016-10-28 12:06:40 -07:00
cli.BaseService.OnStop()
2016-05-18 15:30:38 -07:00
cli.mtx.Lock()
defer cli.mtx.Unlock()
// TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
/*if cli.conn != nil {
cli.conn.Close()
}*/
2016-05-18 15:30:38 -07:00
}
func (cli *grpcClient) StopForError(err error) {
cli.mtx.Lock()
if !cli.IsRunning() {
return
}
2016-05-18 15:30:38 -07:00
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
2017-01-16 22:59:46 -08:00
log.Warn(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error()))
2016-05-18 15:30:38 -07:00
cli.Stop()
}
// Error returns the error recorded by StopForError, or nil if none has
// been recorded. The field is read under the client mutex.
func (cli *grpcClient) Error() error {
	cli.mtx.Lock()
	recorded := cli.err
	cli.mtx.Unlock()
	return recorded
}
// SetResponseCallback installs resCb as the listener for all responses,
// replacing any previous one. The field is written under the client mutex.
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
	cli.mtx.Lock()
	cli.resCb = resCb
	cli.mtx.Unlock()
}
2016-05-18 15:30:38 -07:00
//----------------------------------------
// GRPC calls are synchronous, but some callbacks expect to be called asynchronously
// (e.g. the mempool expects to be able to lock to remove bad txs from cache).
// To accommodate this, we finish each call in its own goroutine,
// which is expensive, but easy - if you want something better, use the socket protocol!
// Maybe one day, if people really want it, we will use grpc streams,
// but hopefully not :D
func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
req := types.ToRequestEcho(msg)
res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
}
func (cli *grpcClient) FlushAsync() *ReqRes {
req := types.ToRequestFlush()
res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
}
func (cli *grpcClient) InfoAsync() *ReqRes {
req := types.ToRequestInfo()
res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
}
func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
req := types.ToRequestSetOption(key, value)
res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
}
2017-01-12 12:27:08 -08:00
func (cli *grpcClient) DeliverTxAsync(tx []byte) *ReqRes {
req := types.ToRequestDeliverTx(tx)
res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
2017-01-12 12:27:08 -08:00
return cli.finishAsyncCall(req, &types.Response{&types.Response_DeliverTx{res}})
2016-05-18 15:30:38 -07:00
}
func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
req := types.ToRequestCheckTx(tx)
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
}
func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
req := types.ToRequestQuery(query)
res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
}
func (cli *grpcClient) CommitAsync() *ReqRes {
req := types.ToRequestCommit()
res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
}
func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
req := types.ToRequestInitChain(validators)
res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
}
2016-09-09 20:01:53 -07:00
func (cli *grpcClient) BeginBlockAsync(hash []byte, header *types.Header) *ReqRes {
req := types.ToRequestBeginBlock(hash, header)
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
}
func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
req := types.ToRequestEndBlock(height)
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
2016-05-18 15:30:38 -07:00
if err != nil {
2016-09-08 18:41:35 -07:00
cli.StopForError(err)
2016-05-18 15:30:38 -07:00
}
return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
}
2016-05-24 18:51:28 -07:00
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
2016-06-22 11:25:18 -07:00
reqres.SetDone() // so reqRes.SetCallback will run the callback
2016-05-24 18:51:28 -07:00
2016-07-06 13:57:02 -07:00
// go routine for callbacks
go func() {
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
2016-05-24 18:51:28 -07:00
2016-07-06 13:57:02 -07:00
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
}()
2016-05-24 18:51:28 -07:00
return reqres
}
2016-08-17 20:27:49 -07:00
func (cli *grpcClient) checkErrGetResult() types.Result {
2016-09-08 18:41:35 -07:00
if err := cli.Error(); err != nil {
// StopForError should already have been called if error is set
return types.ErrInternalError.SetLog(err.Error())
}
2016-08-23 23:11:31 -07:00
return types.Result{}
}
2016-05-18 15:30:38 -07:00
//----------------------------------------
func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
reqres := cli.EchoAsync(msg)
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
}
resp := reqres.Response.GetEcho()
return types.NewResultOK([]byte(resp.Message), LOG)
2016-05-18 15:30:38 -07:00
}
// FlushSync is a no-op for the gRPC client: it performs no RPC and always
// returns nil.
func (cli *grpcClient) FlushSync() error {
return nil
}
2016-12-26 17:44:36 -08:00
func (cli *grpcClient) InfoSync() (resInfo types.ResponseInfo, err error) {
reqres := cli.InfoAsync()
2016-12-26 17:44:36 -08:00
if err = cli.Error(); err != nil {
return resInfo, err
}
2017-01-17 00:26:32 -08:00
if info := reqres.Response.GetInfo(); info != nil {
return *info, nil
}
2017-01-17 00:26:32 -08:00
return resInfo, nil
2016-05-18 15:30:38 -07:00
}
func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
reqres := cli.SetOptionAsync(key, value)
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
2016-05-18 15:30:38 -07:00
}
resp := reqres.Response.GetSetOption()
return types.Result{Code: OK, Data: nil, Log: resp.Log}
}
2017-01-12 12:27:08 -08:00
func (cli *grpcClient) DeliverTxSync(tx []byte) (res types.Result) {
reqres := cli.DeliverTxAsync(tx)
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
2016-05-18 15:30:38 -07:00
}
2017-01-12 12:27:08 -08:00
resp := reqres.Response.GetDeliverTx()
2016-05-18 15:30:38 -07:00
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
reqres := cli.CheckTxAsync(tx)
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
2016-05-18 15:30:38 -07:00
}
resp := reqres.Response.GetCheckTx()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
reqres := cli.QueryAsync(query)
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
2016-05-18 15:30:38 -07:00
}
resp := reqres.Response.GetQuery()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *grpcClient) CommitSync() (res types.Result) {
reqres := cli.CommitAsync()
2016-08-17 20:27:49 -07:00
if res := cli.checkErrGetResult(); res.IsErr() {
return res
2016-05-18 15:30:38 -07:00
}
resp := reqres.Response.GetCommit()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
}
func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
cli.InitChainAsync(validators)
2016-09-08 18:41:35 -07:00
return cli.Error()
2016-05-18 15:30:38 -07:00
}
2016-09-09 20:01:53 -07:00
func (cli *grpcClient) BeginBlockSync(hash []byte, header *types.Header) (err error) {
cli.BeginBlockAsync(hash, header)
2016-09-08 18:41:35 -07:00
return cli.Error()
2016-05-18 15:30:38 -07:00
}
2016-12-26 22:12:32 -08:00
func (cli *grpcClient) EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error) {
2016-05-18 15:30:38 -07:00
reqres := cli.EndBlockAsync(height)
2016-09-08 18:41:35 -07:00
if err := cli.Error(); err != nil {
2016-12-26 22:12:32 -08:00
return resEndBlock, err
}
2017-01-17 00:26:32 -08:00
if blk := reqres.Response.GetEndBlock(); blk != nil {
return *blk, nil
2016-05-18 15:30:38 -07:00
}
2017-01-17 00:26:32 -08:00
return resEndBlock, nil
2016-05-18 15:30:38 -07:00
}