Make gRPC requests go through tendermint Query (#8549)
* Make gRPC requests go through tendermint Query * Remove commented code * Dry run in InitChain? * Save type of first run * Add metadata in repsonse * Factorize some code * Fix lint * Update comments * Fix md test * Fix test expected * Don't put RunGRPCQuery as clientCtx method * Update baseapp/grpcserver.go Co-authored-by: Robert Zaremba <robert@zaremba.ch> * Address review comments Co-authored-by: Robert Zaremba <robert@zaremba.ch>
This commit is contained in:
parent
d9d078fbc2
commit
e306e5b6f5
|
@ -2,6 +2,7 @@ package baseapp
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
gogogrpc "github.com/gogo/protobuf/grpc"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
|
@ -12,13 +13,19 @@ import (
|
|||
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
|
||||
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
|
||||
)
|
||||
|
||||
var protoCodec = encoding.GetCodec(proto.Name)
|
||||
|
||||
// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
|
||||
type GRPCQueryRouter struct {
|
||||
routes map[string]GRPCQueryHandler
|
||||
routes map[string]GRPCQueryHandler
|
||||
// returnTypes is a map of FQ method name => its return type. It is used
|
||||
// for cache purposes: the first time a method handler is run, we save its
|
||||
// return type in this map. Then, on subsequent method handler calls, we
|
||||
// decode the ABCI response bytes using the cached return type.
|
||||
returnTypes map[string]reflect.Type
|
||||
interfaceRegistry codectypes.InterfaceRegistry
|
||||
serviceData []serviceData
|
||||
}
|
||||
|
@ -34,7 +41,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
|
|||
// NewGRPCQueryRouter creates a new GRPCQueryRouter
|
||||
func NewGRPCQueryRouter() *GRPCQueryRouter {
|
||||
return &GRPCQueryRouter{
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
returnTypes: map[string]reflect.Type{},
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,8 +97,17 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
|
|||
if qrt.interfaceRegistry != nil {
|
||||
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
// If it's the first time we call this handler, then we save
|
||||
// the return type of the handler in the `returnTypes` map.
|
||||
// The return type will be used for decoding subsequent requests.
|
||||
if _, found := qrt.returnTypes[fqName]; !found {
|
||||
qrt.returnTypes[fqName] = reflect.TypeOf(res)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return abci.ResponseQuery{}, err
|
||||
}
|
||||
|
@ -127,3 +144,16 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
|
|||
reflection.NewReflectionServiceServer(interfaceRegistry),
|
||||
)
|
||||
}
|
||||
|
||||
// returnTypeOf returns the return type of a gRPC method handler. With the way the
|
||||
// `returnTypes` cache map is set up, the return type of a method handler is
|
||||
// guaranteed to be found if it's retrieved **after** the method handler ran at
|
||||
// least once. If not, then a logic error is return.
|
||||
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
|
||||
returnType, found := qrt.returnTypes[method]
|
||||
if !found {
|
||||
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
|
||||
}
|
||||
|
||||
return returnType, nil
|
||||
}
|
||||
|
|
|
@ -2,67 +2,78 @@ package baseapp
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"reflect"
|
||||
|
||||
gogogrpc "github.com/gogo/protobuf/grpc"
|
||||
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
|
||||
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
|
||||
"github.com/cosmos/cosmos-sdk/types/tx"
|
||||
)
|
||||
|
||||
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
|
||||
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }
|
||||
|
||||
// RegisterGRPCServer registers gRPC services directly with the gRPC server.
|
||||
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
|
||||
// Define an interceptor for all gRPC queries: this interceptor will create
|
||||
// a new sdk.Context, and pass it into the query handler.
|
||||
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
// If there's some metadata in the context, retrieve it.
|
||||
md, ok := metadata.FromIncomingContext(grpcCtx)
|
||||
if !ok {
|
||||
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
|
||||
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
|
||||
// Define an interceptor for all gRPC queries: this interceptor will route
|
||||
// the query through the `clientCtx`, which itself queries Tendermint.
|
||||
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
|
||||
// Two things can happen here:
|
||||
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
|
||||
// 2. or we are querying for state, in which case we call ABCI's Query.
|
||||
|
||||
// Case 1. Broadcasting a Tx.
|
||||
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
|
||||
if !ok {
|
||||
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
|
||||
}
|
||||
|
||||
return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
|
||||
}
|
||||
|
||||
// Get height header from the request context, if present.
|
||||
var height int64
|
||||
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
|
||||
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, sdkerrors.Wrapf(
|
||||
sdkerrors.ErrInvalidRequest,
|
||||
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
|
||||
}
|
||||
if err := checkNegativeHeight(height); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Create the sdk.Context. Passing false as 2nd arg, as we can't
|
||||
// actually support proofs with gRPC right now.
|
||||
sdkCtx, err := app.createQueryContext(height, false)
|
||||
// Case 2. Querying state.
|
||||
inMd, _ := metadata.FromIncomingContext(grpcCtx)
|
||||
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attach the sdk.Context into the gRPC's context.Context.
|
||||
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)
|
||||
|
||||
// Add relevant gRPC headers
|
||||
if height == 0 {
|
||||
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
|
||||
// We need to know the return type of the grpc method for
|
||||
// unmarshalling abciRes.Value.
|
||||
//
|
||||
// When we call each method handler for the first time, we save its
|
||||
// return type in the `returnTypes` map (see the method handler in
|
||||
// `grpcrouter.go`). By this time, the method handler has already run
|
||||
// at least once (in the RunGRPCQuery call), so we're sure the
|
||||
// returnType maps is populated for this method. We're retrieving it
|
||||
// for decoding.
|
||||
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
|
||||
grpc.SetHeader(grpcCtx, md)
|
||||
|
||||
return handler(grpcCtx, req)
|
||||
// returnType is a pointer to a struct. Here, we're creating res which
|
||||
// is a new pointer to the underlying struct.
|
||||
res := reflect.New(returnType.Elem()).Interface()
|
||||
|
||||
err = protoCodec.Unmarshal(abciRes.Value, res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Send the metadata header back. The metadata currently includes:
|
||||
// - block height.
|
||||
err = grpc.SendHeader(grpcCtx, outMd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Loop through all services and methods, add the interceptor, and register
|
||||
|
|
|
@ -24,86 +24,54 @@ var _ gogogrpc.ClientConn = Context{}
|
|||
var protoCodec = encoding.GetCodec(proto.Name)
|
||||
|
||||
// Invoke implements the grpc ClientConn.Invoke method
|
||||
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
|
||||
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
|
||||
// Two things can happen here:
|
||||
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
|
||||
// 2. or we are querying for state, in which case we call ABCI's Query.
|
||||
|
||||
// In both cases, we don't allow empty request args (it will panic unexpectedly).
|
||||
if reflect.ValueOf(args).IsNil() {
|
||||
// In both cases, we don't allow empty request req (it will panic unexpectedly).
|
||||
if reflect.ValueOf(req).IsNil() {
|
||||
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
|
||||
}
|
||||
|
||||
// Case 1. Broadcasting a Tx.
|
||||
if isBroadcast(method) {
|
||||
req, ok := args.(*tx.BroadcastTxRequest)
|
||||
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
|
||||
if !ok {
|
||||
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
|
||||
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
|
||||
}
|
||||
res, ok := reply.(*tx.BroadcastTxResponse)
|
||||
resProto, ok := reply.(*tx.BroadcastTxResponse)
|
||||
if !ok {
|
||||
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
|
||||
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
|
||||
}
|
||||
|
||||
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
|
||||
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*res = *broadcastRes
|
||||
*resProto = *broadcastRes
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Case 2. Querying state.
|
||||
reqBz, err := protoCodec.Marshal(args)
|
||||
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
|
||||
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// parse height header
|
||||
md, _ := metadata.FromOutgoingContext(grpcCtx)
|
||||
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
|
||||
height, err := strconv.ParseInt(heights[0], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if height < 0 {
|
||||
return sdkerrors.Wrapf(
|
||||
sdkerrors.ErrInvalidRequest,
|
||||
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
|
||||
}
|
||||
|
||||
ctx = ctx.WithHeight(height)
|
||||
}
|
||||
|
||||
req := abci.RequestQuery{
|
||||
Path: method,
|
||||
Data: reqBz,
|
||||
}
|
||||
|
||||
res, err := ctx.QueryABCI(req)
|
||||
err = protoCodec.Unmarshal(abciRes.Value, reply)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = protoCodec.Unmarshal(res.Value, reply)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create header metadata. For now the headers contain:
|
||||
// - block height
|
||||
// We then parse all the call options, if the call option is a
|
||||
// HeaderCallOption, then we manually set the value of that header to the
|
||||
// metadata.
|
||||
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
|
||||
for _, callOpt := range opts {
|
||||
header, ok := callOpt.(grpc.HeaderCallOption)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
*header.HeaderAddr = md
|
||||
*header.HeaderAddr = outMd
|
||||
}
|
||||
|
||||
if ctx.InterfaceRegistry != nil {
|
||||
|
@ -118,6 +86,47 @@ func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.Ca
|
|||
return nil, fmt.Errorf("streaming rpc not supported")
|
||||
}
|
||||
|
||||
func isBroadcast(method string) bool {
|
||||
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
|
||||
// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
|
||||
// arguments for the gRPC method, and returns the ABCI response. It is used
|
||||
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
|
||||
// gRPC handlers.
|
||||
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
|
||||
reqBz, err := protoCodec.Marshal(req)
|
||||
if err != nil {
|
||||
return abci.ResponseQuery{}, nil, err
|
||||
}
|
||||
|
||||
// parse height header
|
||||
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
|
||||
height, err := strconv.ParseInt(heights[0], 10, 64)
|
||||
if err != nil {
|
||||
return abci.ResponseQuery{}, nil, err
|
||||
}
|
||||
if height < 0 {
|
||||
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
|
||||
sdkerrors.ErrInvalidRequest,
|
||||
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
|
||||
}
|
||||
|
||||
ctx = ctx.WithHeight(height)
|
||||
}
|
||||
|
||||
abciReq := abci.RequestQuery{
|
||||
Path: method,
|
||||
Data: reqBz,
|
||||
}
|
||||
|
||||
abciRes, err := ctx.QueryABCI(abciReq)
|
||||
if err != nil {
|
||||
return abci.ResponseQuery{}, nil, err
|
||||
}
|
||||
|
||||
// Create header metadata. For now the headers contain:
|
||||
// - block height
|
||||
// We then parse all the call options, if the call option is a
|
||||
// HeaderCallOption, then we manually set the value of that header to the
|
||||
// metadata.
|
||||
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))
|
||||
|
||||
return abciRes, md, nil
|
||||
}
|
||||
|
|
|
@ -8,13 +8,14 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/server/types"
|
||||
)
|
||||
|
||||
// StartGRPCServer starts a gRPC server on the given address.
|
||||
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
|
||||
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
|
||||
grpcSrv := grpc.NewServer()
|
||||
app.RegisterGRPCServer(grpcSrv)
|
||||
app.RegisterGRPCServer(clientCtx, grpcSrv)
|
||||
|
||||
// Reflection allows external clients to see what services and methods
|
||||
// the gRPC server exposes.
|
||||
|
|
|
@ -92,7 +92,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
|
|||
grpc.Header(&header),
|
||||
)
|
||||
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
|
||||
s.Require().Equal([]string{"1"}, blockHeight)
|
||||
s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height.
|
||||
}
|
||||
|
||||
func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
|
||||
|
@ -124,8 +124,6 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
|
|||
Events: []string{"message.action=send"},
|
||||
},
|
||||
)
|
||||
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
|
||||
// should not error anymore.
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
||||
|
@ -185,9 +183,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() {
|
|||
value string
|
||||
wantErr string
|
||||
}{
|
||||
{"-1", "height < 0"},
|
||||
{"-1", "\"x-cosmos-block-height\" must be >= 0"},
|
||||
{"9223372036854775808", "value out of range"}, // > max(int64) by 1
|
||||
{"-10", "height < 0"},
|
||||
{"-10", "\"x-cosmos-block-height\" must be >= 0"},
|
||||
{"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64)
|
||||
{"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64
|
||||
}
|
||||
|
|
|
@ -318,7 +318,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
|||
grpcWebSrv *http.Server
|
||||
)
|
||||
if config.GRPC.Enable {
|
||||
grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address)
|
||||
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ type (
|
|||
|
||||
// RegisterGRPCServer registers gRPC services directly with the gRPC
|
||||
// server.
|
||||
RegisterGRPCServer(grpc.Server)
|
||||
RegisterGRPCServer(client.Context, grpc.Server)
|
||||
|
||||
// RegisterTxService registers the gRPC Query service for tx (such as tx
|
||||
// simulation, fetching txs by hash...).
|
||||
|
|
|
@ -93,7 +93,7 @@ func startInProcess(cfg Config, val *Validator) error {
|
|||
}
|
||||
|
||||
if val.AppConfig.GRPC.Enable {
|
||||
grpcSrv, err := servergrpc.StartGRPCServer(app, val.AppConfig.GRPC.Address)
|
||||
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue