Integrate gRPC queries into BaseApp (#6335)

* Rename GRPCRouter to GRPCQueryRouter, add to BaseApp

* Rename field

* add comment

* Update baseapp/abci.go

* Update baseapp/abci.go

Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>

* Address review feedback

* Improve code reuse

* Fix errors

Co-authored-by: Federico Kunze <31522760+fedekunze@users.noreply.github.com>
Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
Aaron Craelius 2020-06-04 14:48:06 -04:00 committed by GitHub
parent b1f483fe24
commit c1355d0b45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 62 deletions

View File

@ -294,6 +294,12 @@ func (app *BaseApp) halt() {
// Query implements the ABCI interface. It delegates to CommitMultiStore if it // Query implements the ABCI interface. It delegates to CommitMultiStore if it
// implements Queryable. // implements Queryable.
func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery { func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
// handle gRPC routes first rather than calling splitPath because '/' characters
// are used as part of gRPC paths
if grpcHandler := app.grpcQueryRouter.Route(req.Path); grpcHandler != nil {
return app.handleQueryGRPC(grpcHandler, req)
}
path := splitPath(req.Path) path := splitPath(req.Path)
if len(path) == 0 { if len(path) == 0 {
sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "no query path provided")) sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "no query path provided"))
@ -317,6 +323,53 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "unknown query path")) return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "unknown query path"))
} }
func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery {
ctx, err := app.createQueryContext(req)
if err != nil {
return sdkerrors.QueryResult(err)
}
res, err := handler(ctx, req)
if err != nil {
res = sdkerrors.QueryResult(err)
res.Height = req.Height
return res
}
return res
}
func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, error) {
// when a client did not provide a query height, manually inject the latest
if req.Height == 0 {
req.Height = app.LastBlockHeight()
}
if req.Height <= 1 && req.Prove {
return sdk.Context{},
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
)
}
cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
if err != nil {
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
)
}
// cache wrap the commit-multistore for safety
ctx := sdk.NewContext(
cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger,
).WithMinGasPrices(app.minGasPrices)
return ctx, nil
}
func handleQueryApp(app *BaseApp, path []string, req abci.RequestQuery) abci.ResponseQuery { func handleQueryApp(app *BaseApp, path []string, req abci.RequestQuery) abci.ResponseQuery {
if len(path) >= 2 { if len(path) >= 2 {
switch path[1] { switch path[1] {
@ -439,48 +492,20 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) abci.
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1])) return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1]))
} }
// when a client did not provide a query height, manually inject the latest ctx, err := app.createQueryContext(req)
if req.Height == 0 {
req.Height = app.LastBlockHeight()
}
if req.Height <= 1 && req.Prove {
return sdkerrors.QueryResult(
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
),
)
}
cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
if err != nil { if err != nil {
return sdkerrors.QueryResult( return sdkerrors.QueryResult(err)
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
),
)
} }
// cache wrap the commit-multistore for safety
ctx := sdk.NewContext(
cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger,
).WithMinGasPrices(app.minGasPrices)
// Passes the rest of the path as an argument to the querier. // Passes the rest of the path as an argument to the querier.
// //
// For example, in the path "custom/gov/proposal/test", the gov querier gets // For example, in the path "custom/gov/proposal/test", the gov querier gets
// []string{"proposal", "test"} as the path. // []string{"proposal", "test"} as the path.
resBytes, err := querier(ctx, path[2:], req) resBytes, err := querier(ctx, path[2:], req)
if err != nil { if err != nil {
space, code, log := sdkerrors.ABCIInfo(err, false) res := sdkerrors.QueryResult(err)
return abci.ResponseQuery{ res.Height = req.Height
Code: code, return res
Codespace: space,
Log: log,
Height: req.Height,
}
} }
return abci.ResponseQuery{ return abci.ResponseQuery{

View File

@ -6,6 +6,8 @@ import (
"runtime/debug" "runtime/debug"
"strings" "strings"
"github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -41,14 +43,15 @@ type (
// BaseApp reflects the ABCI application implementation. // BaseApp reflects the ABCI application implementation.
type BaseApp struct { // nolint: maligned type BaseApp struct { // nolint: maligned
// initialized on creation // initialized on creation
logger log.Logger logger log.Logger
name string // application name from abci.Info name string // application name from abci.Info
db dbm.DB // common DB backend db dbm.DB // common DB backend
cms sdk.CommitMultiStore // Main (uncached) state cms sdk.CommitMultiStore // Main (uncached) state
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader() storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
router sdk.Router // handle any kind of message router sdk.Router // handle any kind of message
queryRouter sdk.QueryRouter // router for redirecting query calls queryRouter sdk.QueryRouter // router for redirecting query calls
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx grpcQueryRouter *GRPCQueryRouter // router for redirecting gRPC query calls
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx
anteHandler sdk.AnteHandler // ante handler for fee and auth anteHandler sdk.AnteHandler // ante handler for fee and auth
initChainer sdk.InitChainer // initialize state with validators and state blob initChainer sdk.InitChainer // initialize state with validators and state blob
@ -101,15 +104,16 @@ func NewBaseApp(
name string, logger log.Logger, db dbm.DB, txDecoder sdk.TxDecoder, options ...func(*BaseApp), name string, logger log.Logger, db dbm.DB, txDecoder sdk.TxDecoder, options ...func(*BaseApp),
) *BaseApp { ) *BaseApp {
app := &BaseApp{ app := &BaseApp{
logger: logger, logger: logger,
name: name, name: name,
db: db, db: db,
cms: store.NewCommitMultiStore(db), cms: store.NewCommitMultiStore(db),
storeLoader: DefaultStoreLoader, storeLoader: DefaultStoreLoader,
router: NewRouter(), router: NewRouter(),
queryRouter: NewQueryRouter(), queryRouter: NewQueryRouter(),
txDecoder: txDecoder, grpcQueryRouter: NewGRPCQueryRouter(),
fauxMerkleMode: false, txDecoder: txDecoder,
fauxMerkleMode: false,
} }
for _, option := range options { for _, option := range options {
@ -282,6 +286,9 @@ func (app *BaseApp) Router() sdk.Router {
// QueryRouter returns the QueryRouter of a BaseApp. // QueryRouter returns the QueryRouter of a BaseApp.
func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter }
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() grpc.Server { return app.grpcQueryRouter }
// Seal seals a BaseApp. It prohibits any further modifications to a BaseApp. // Seal seals a BaseApp. It prohibits any further modifications to a BaseApp.
func (app *BaseApp) Seal() { app.sealed = true } func (app *BaseApp) Seal() { app.sealed = true }

View File

@ -10,6 +10,8 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/cosmos/cosmos-sdk/codec/testdata"
"github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -1506,6 +1508,40 @@ func TestQuery(t *testing.T) {
require.Equal(t, value, res.Value) require.Equal(t, value, res.Value)
} }
func TestGRPCQuery(t *testing.T) {
grpcQueryOpt := func(bapp *BaseApp) {
testdata.RegisterTestServiceServer(
bapp.GRPCQueryRouter(),
testServer{},
)
}
app := setupBaseApp(t, grpcQueryOpt)
app.InitChain(abci.RequestInitChain{})
header := abci.Header{Height: app.LastBlockHeight() + 1}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
app.Commit()
req := testdata.SayHelloRequest{Name: "foo"}
reqBz, err := req.Marshal()
require.NoError(t, err)
reqQuery := abci.RequestQuery{
Data: reqBz,
Path: "/cosmos_sdk.codec.v1.TestService/SayHello",
}
resQuery := app.Query(reqQuery)
require.Equal(t, abci.CodeTypeOK, resQuery.Code, resQuery)
var res testdata.SayHelloResponse
err = res.Unmarshal(resQuery.Value)
require.NoError(t, err)
require.Equal(t, "Hello foo!", res.Greeting)
}
// Test p2p filter queries // Test p2p filter queries
func TestP2PQuery(t *testing.T) { func TestP2PQuery(t *testing.T) {
addrPeerFilterOpt := func(bapp *BaseApp) { addrPeerFilterOpt := func(bapp *BaseApp) {

View File

@ -14,16 +14,16 @@ import (
var protoCodec = encoding.GetCodec(proto.Name) var protoCodec = encoding.GetCodec(proto.Name)
// GRPCRouter routes ABCI Query requests to GRPC handlers // GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCRouter struct { type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler routes map[string]GRPCQueryHandler
} }
var _ gogogrpc.Server var _ gogogrpc.Server
// NewGRPCRouter creates a new GRPCRouter // NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCRouter() *GRPCRouter { func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCRouter{ return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{}, routes: map[string]GRPCQueryHandler{},
} }
} }
@ -34,7 +34,7 @@ type GRPCQueryHandler = func(ctx sdk.Context, req abci.RequestQuery) (abci.Respo
// Route returns the GRPCQueryHandler for a given query route path or nil // Route returns the GRPCQueryHandler for a given query route path or nil
// if not found // if not found
func (qrt *GRPCRouter) Route(path string) GRPCQueryHandler { func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
handler, found := qrt.routes[path] handler, found := qrt.routes[path]
if !found { if !found {
return nil return nil
@ -44,7 +44,7 @@ func (qrt *GRPCRouter) Route(path string) GRPCQueryHandler {
// RegisterService implements the gRPC Server.RegisterService method. sd is a gRPC // RegisterService implements the gRPC Server.RegisterService method. sd is a gRPC
// service description, handler is an object which implements that gRPC service // service description, handler is an object which implements that gRPC service
func (qrt *GRPCRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) { func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
// adds a top-level query handler based on the gRPC service name // adds a top-level query handler based on the gRPC service name
for _, method := range sd.Methods { for _, method := range sd.Methods {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName) fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)

View File

@ -16,14 +16,14 @@ import (
// interfaces needed to register a query service server and create a query // interfaces needed to register a query service server and create a query
// service client. // service client.
type QueryServiceTestHelper struct { type QueryServiceTestHelper struct {
*GRPCRouter *GRPCQueryRouter
ctx sdk.Context ctx sdk.Context
} }
// NewQueryServerTestHelper creates a new QueryServiceTestHelper that wraps // NewQueryServerTestHelper creates a new QueryServiceTestHelper that wraps
// the provided sdk.Context // the provided sdk.Context
func NewQueryServerTestHelper(ctx sdk.Context) *QueryServiceTestHelper { func NewQueryServerTestHelper(ctx sdk.Context) *QueryServiceTestHelper {
return &QueryServiceTestHelper{GRPCRouter: NewGRPCRouter(), ctx: ctx} return &QueryServiceTestHelper{GRPCQueryRouter: NewGRPCQueryRouter(), ctx: ctx}
} }
// Invoke implements the grpc ClientConn.Invoke method // Invoke implements the grpc ClientConn.Invoke method

View File

@ -25,11 +25,11 @@ func (e testServer) SayHello(_ context.Context, request *testdata.SayHelloReques
var _ testdata.TestServiceServer = testServer{} var _ testdata.TestServiceServer = testServer{}
func TestGRPCRouter(t *testing.T) { func TestGRPCRouter(t *testing.T) {
qr := NewGRPCRouter() qr := NewGRPCQueryRouter()
testdata.RegisterTestServiceServer(qr, testServer{}) testdata.RegisterTestServiceServer(qr, testServer{})
helper := &QueryServiceTestHelper{ helper := &QueryServiceTestHelper{
GRPCRouter: qr, GRPCQueryRouter: qr,
ctx: sdk.Context{}, ctx: sdk.Context{},
} }
client := testdata.NewTestServiceClient(helper) client := testdata.NewTestServiceClient(helper)