permission: [WIP] start permission service

This commit is contained in:
Trung Nguyen 2019-08-20 09:33:54 -04:00
parent 973caccef5
commit 8a088def7a
No known key found for this signature in database
GPG Key ID: 4636434ED9505EB7
5 changed files with 95 additions and 37 deletions

View File

@ -347,8 +347,12 @@ func startNode(ctx *cli.Context, stack *node.Node) {
//
// checking if permissions is enabled and staring the permissions service
if stack.IsPermissionEnabled() {
if err := permission.StartPermissionService(stack); err != nil {
utils.Fatalf("Unable to start Smart Contract based Permission Service due to %s", err)
var permissionService *permission.PermissionCtrl
if err := stack.Service(&permissionService); err != nil {
utils.Fatalf("Permission service not runnning: %v", err)
}
if err := permissionService.AfterStart(); err != nil {
utils.Fatalf("Permission service post construct failure: %v", err)
}
}

View File

@ -1399,12 +1399,12 @@ func RegisterPermissionService(ctx *cli.Context, stack *node.Node) {
if err := stack.Register(func(sctx *node.ServiceContext) (node.Service, error) {
permissionConfig, err := permission.ParsePermissionConfig(stack.DataDir())
if err != nil {
Fatalf("loading of %s failed due to %v", params.PERMISSION_MODEL_CONFIG, err)
return nil, fmt.Errorf("loading of %s failed due to %v", params.PERMISSION_MODEL_CONFIG, err)
}
// start the permissions management service
pc, err := permission.NewQuorumPermissionCtrl(stack, &permissionConfig)
if err != nil {
Fatalf("Failed to load the permission contracts as given in %s due to %v", params.PERMISSION_MODEL_CONFIG, err)
return nil, fmt.Errorf("failed to load the permission contracts as given in %s due to %v", params.PERMISSION_MODEL_CONFIG, err)
}
return pc, nil
}); err != nil {

View File

@ -293,7 +293,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil
return n.eventmux.Post(rpc.InProcServerReadyEvent{})
}
// stopInProc terminates the in-process RPC endpoint.

View File

@ -12,6 +12,8 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/raft"
"github.com/ethereum/go-ethereum/rpc"
@ -67,11 +69,14 @@ type PermissionCtrl struct {
permRole *pbind.RoleManager
permOrg *pbind.OrgManager
permConfig *types.PermissionConfig
orgChan chan struct{}
nodeChan chan struct{}
roleChan chan struct{}
acctChan chan struct{}
mux sync.Mutex
stopFeed event.Feed
mux sync.Mutex
}
// to signal all watches when service is stopped
type stopEvent struct {
}
// converts local permissions data to global permissions config
@ -164,20 +169,30 @@ func StartPermissionService(stack *node.Node) error {
// Creates the controls structure for permissions
func NewQuorumPermissionCtrl(stack *node.Node, pconfig *types.PermissionConfig) (*PermissionCtrl, error) {
// Create a new ethclient to for interfacing with the contract
return &PermissionCtrl{stack, nil, nil, stack.GetNodeKey(), stack.DataDir(), nil, nil, nil, nil, nil, nil, pconfig, make(chan struct{}), make(chan struct{}), make(chan struct{}), make(chan struct{}), sync.Mutex{}}, nil
return &PermissionCtrl{
node: stack,
key: stack.GetNodeKey(),
dataDir: stack.DataDir(),
permConfig: pconfig,
}, nil
}
// subscribe to ChainHeadEvent in order to capture contract references
//
// this is for a scenario where node is sync-ing blocks containing the permission contracts
func (p *PermissionCtrl) BeforeStart() error {
return nil
}
// setup contract references
func (p *PermissionCtrl) AfterStart() error {
p.eth.APIBackend.Get
}
func (p *PermissionCtrl) InitializeService() error {
clnt, eth, err := CreateEthClient(p.node)
if err != nil {
log.Error("creating eth client failed")
return err
}
waitForSync(eth)
if err != nil {
log.Error("Unable to create ethereum client for permissions check", "err", err)
return err
}
// synchronization might be happening, we need to wait until
// all contracts are available in the local blockchain
if p.permConfig.IsEmpty() {
log.Error("permission-config.json is missing contract address")
@ -230,9 +245,36 @@ func (p *PermissionCtrl) InitializeService() error {
return nil
}
// Starts the node permissioning and event monitoring for permissions
// smart contracts
// Permissioning depends on few things before it can fully start:
// 1. EthService
// 2. Downloader (to sync blocks which contains the transactions containing smart contracts)
// 3. InProc RPC Server
func (p *PermissionCtrl) start(ethereum *eth.Ethereum, srvr *p2p.Server) {
stopChan, stopSubscription := p.subscribeStopEvent()
defer stopSubscription.Unsubscribe()
inProcRPCServerSub := p.node.EventMux().Subscribe(rpc.InProcServerReadyEvent{})
defer inProcRPCServerSub.Unsubscribe()
// wait for dependencies
for {
select {
case <-inProcRPCServerSub.Chan():
break
case <-stopChan:
return
}
}
p.eth = ethereum
client, e := p.node.Attach()
p.ethClnt = ethclient.NewClient(e)
}
func (p *PermissionCtrl) Start(srvr *p2p.Server) error {
var ethereum *eth.Ethereum
if err := p.node.Service(&ethereum); err != nil {
return fmt.Errorf("dependent ethereum service not started")
}
go p.start(ethereum, srvr)
return nil
if p.ethClnt == nil || p.eth == nil {
log.Info("permission service not initialized")
@ -260,31 +302,25 @@ func (p *PermissionCtrl) Start(srvr *p2p.Server) error {
return nil
}
func (s *PermissionCtrl) APIs() []rpc.API {
func (p *PermissionCtrl) APIs() []rpc.API {
log.Info("permission rpc API called")
return []rpc.API{
{
Namespace: "quorumPermission",
Version: "1.0",
Service: NewQuorumControlsAPI(s),
Service: NewQuorumControlsAPI(p),
Public: true,
},
}
}
func (s *PermissionCtrl) Protocols() []p2p.Protocol {
func (p *PermissionCtrl) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
func (p *PermissionCtrl) Stop() error {
if p.eth == nil || p.ethClnt == nil {
return nil
}
log.Info("stopping permission service...")
p.roleChan <- struct{}{}
p.orgChan <- struct{}{}
p.acctChan <- struct{}{}
p.nodeChan <- struct{}{}
p.stopFeed.Send(stopEvent{})
log.Info("stopped permission service")
return nil
}
@ -336,6 +372,8 @@ func (p *PermissionCtrl) manageOrgPermissions() {
log.Info("Failed WatchNodePendingApproval: %v", err)
}
stopChan, stopSubscription := p.subscribeStopEvent()
defer stopSubscription.Unsubscribe()
for {
select {
case evtPendingApproval = <-chPendingApproval:
@ -349,13 +387,19 @@ func (p *PermissionCtrl) manageOrgPermissions() {
case evtOrgReactivated = <-chOrgReactivated:
types.OrgInfoMap.UpsertOrg(evtOrgReactivated.OrgId, evtOrgReactivated.PorgId, evtOrgReactivated.UltParent, evtOrgReactivated.Level, types.OrgApproved)
case <-p.orgChan:
case <-stopChan:
log.Info("quit org contract watch")
return
}
}
}
func (p *PermissionCtrl) subscribeStopEvent() (chan stopEvent, event.Subscription) {
c := make(chan stopEvent)
s := p.stopFeed.Subscribe(c)
return c, s
}
// Monitors node management events and updates cache accordingly
func (p *PermissionCtrl) manageNodePermissions() {
chNodeApproved := make(chan *pbind.NodeManagerNodeApproved, 1)
@ -405,6 +449,8 @@ func (p *PermissionCtrl) manageNodePermissions() {
log.Info("Failed NodeRecoveryCompleted", "error", err)
}
stopChan, stopSubscription := p.subscribeStopEvent()
defer stopSubscription.Unsubscribe()
for {
select {
case evtNodeApproved = <-chNodeApproved:
@ -435,7 +481,7 @@ func (p *PermissionCtrl) manageNodePermissions() {
p.updateDisallowedNodes(evtNodeRecoveryDone.EnodeId, NodeDelete)
p.updatePermissionedNodes(evtNodeRecoveryDone.EnodeId, NodeAdd)
case <-p.nodeChan:
case <-stopChan:
log.Info("quit node contract watch")
return
}
@ -555,6 +601,8 @@ func (p *PermissionCtrl) manageAccountPermissions() {
log.Info("Failed NewNodeProposed", "error", err)
}
stopChan, stopSubscription := p.subscribeStopEvent()
defer stopSubscription.Unsubscribe()
for {
select {
case evtAccessModified = <-chAccessModified:
@ -566,7 +614,7 @@ func (p *PermissionCtrl) manageAccountPermissions() {
case evtStatusChanged = <-chStatusChanged:
ac := types.AcctInfoMap.GetAccount(evtStatusChanged.Account)
types.AcctInfoMap.UpsertAccount(evtStatusChanged.OrgId, ac.RoleId, evtStatusChanged.Account, ac.IsOrgAdmin, types.AcctStatus(int(evtStatusChanged.Status.Uint64())))
case <-p.acctChan:
case <-stopChan:
log.Info("quit account contract watch")
return
}
@ -840,6 +888,8 @@ func (p *PermissionCtrl) manageRolePermissions() {
log.Info("Failed WatchRoleRemoved: %v", err)
}
stopChan, stopSubscription := p.subscribeStopEvent()
defer stopSubscription.Unsubscribe()
for {
select {
case evtRoleCreated = <-chRoleCreated:
@ -851,7 +901,7 @@ func (p *PermissionCtrl) manageRolePermissions() {
} else {
log.Error("Revoke role - cache is missing role", "org", evtRoleRevoked.OrgId, "role", evtRoleRevoked.RoleId)
}
case <-p.roleChan:
case <-stopChan:
log.Info("quit role contract watch")
return
}

View File

@ -21,6 +21,10 @@ import (
"net"
)
type InProcServerReadyEvent struct {
}
// DialInProc attaches an in-process connection to the given RPC server.
func DialInProc(handler *Server) *Client {
initctx := context.Background()