feat: Modify grpc gateway to be concurrent (#11234)

Current grpc happens to be concurrent, while the grpc gateway itself is not, since it always uses abci query. Therefore, as the current queries are not concurrent, throughput has the room for improvement. This PR changes the grpc gateway so that when server is ran by a node daemon, it directly calls grpc to make queries concurrent. Any services that uses grpc gateway could improve throughput by fundamental amount, which has been tested and ensured in the process of running an Osmosis node using the current chagnes.

The code base has the following changes:
- GRPCClient field has been added to Client Context.
- The `Invoke` method in Client Context would use ABCI query when GRPCClient field is set to nil, otherwise use the GRPC Client to return results that have used grpc.
- If GRPC is set to enable in `startInProcess`, it sets the GRPC Client field in Client Context.
This commit is contained in:
JungHwan Tony Yun 2022-03-09 22:09:36 +09:00 committed by GitHub
parent dc0927071f
commit 5356a86825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 1 deletions

View File

@ -70,6 +70,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#11240](https://github.com/cosmos/cosmos-sdk/pull/11240) Replace various modules `ModuleCdc` with the global `legacy.Cdc`
* [#11179](https://github.com/cosmos/cosmos-sdk/pull/11179) Add state rollback command.
* [\#10794](https://github.com/cosmos/cosmos-sdk/pull/10794) ADR-040: Add State Sync to V2 Store
* [\#11234](https://github.com/cosmos/cosmos-sdk/pull/11234) Add `GRPCClient` field to Client Context. If `GRPCClient` field is set to nil, the `Invoke` method would use ABCI query, otherwise use gprc.
### API Breaking Changes

View File

@ -9,6 +9,8 @@ import (
"sigs.k8s.io/yaml"
"google.golang.org/grpc"
"github.com/gogo/protobuf/proto"
rpcclient "github.com/tendermint/tendermint/rpc/client"
@ -23,6 +25,7 @@ import (
type Context struct {
FromAddress sdk.AccAddress
Client rpcclient.Client
GRPCClient *grpc.ClientConn
ChainID string
Codec codec.Codec
InterfaceRegistry codectypes.InterfaceRegistry
@ -128,6 +131,13 @@ func (ctx Context) WithClient(client rpcclient.Client) Context {
return ctx
}
// WithGRPCClient returns a copy of the context with an updated GRPC client
// instance.
func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context {
ctx.GRPCClient = grpcClient
return ctx
}
// WithUseLedger returns a copy of the context with an updated UseLedger flag.
func (ctx Context) WithUseLedger(useLedger bool) Context {
ctx.UseLedger = useLedger

View File

@ -32,7 +32,8 @@ var fallBackCodec = codec.NewProtoCodec(failingInterfaceRegistry{})
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.
// 2-1. or we are querying for state, in which case we call grpc if grpc client set.
// 2-2. or we are querying for state, in which case we call ABCI's Query if grpc client not set.
// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
@ -55,6 +56,12 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}
if ctx.GRPCClient != nil {
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
}
// Case 2-2. Querying state via abci query.
reqBz, err := ctx.gRPCCodec().Marshal(req)
if err != nil {
return err

View File

@ -4,6 +4,7 @@ package server
import (
"fmt"
"net"
"net/http"
"os"
"runtime/pprof"
@ -18,6 +19,7 @@ import (
"github.com/tendermint/tendermint/rpc/client/local"
tmtypes "github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
@ -296,6 +298,25 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
WithHomeDir(home).
WithChainID(genDoc.ChainID)
if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec())),
)
if err != nil {
return err
}
clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
}
apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
app.RegisterAPIRoutes(apiSrv, config.API)
errCh := make(chan error)