permissions: Reduced number of go routines being called for various events. Code clean up.

This commit is contained in:
vsmk98 2019-04-02 14:13:23 +08:00
parent 5a97fc4b99
commit 40fd7001cd
1 changed files with 95 additions and 243 deletions

View File

@ -190,13 +190,13 @@ func (p *PermissionCtrl) Start() error {
go p.manageOrgPermissions()
// monitor org level node management events
p.manageNodePermissions()
go p.manageNodePermissions()
// monitor org level role management events
p.manageRolePermissions()
go p.manageRolePermissions()
// monitor org level account management events
p.manageAccountPermissions()
go p.manageAccountPermissions()
return nil
}
@ -251,165 +251,80 @@ func (p *PermissionCtrl) manageOrgPermissions() {
for {
select {
case evtPendingApproval = <-chPendingApproval:
log.Info("AJ-newOrgPendingApproval", "node", evtPendingApproval.OrgId)
types.OrgInfoMap.UpsertOrg(evtPendingApproval.OrgId, types.OrgStatus(evtPendingApproval.Type.Uint64()))
log.Info("AJ-newOrgPendingApproval cached updated for ", "orgid", evtPendingApproval.OrgId)
case evtOrgApproved = <-chOrgApproved:
log.Info("AJ-newOrgPendingApproval", "node", evtOrgApproved.OrgId)
types.OrgInfoMap.UpsertOrg(evtOrgApproved.OrgId, types.OrgApproved)
log.Info("AJ-newOrgPendingApproval cached updated for ", "orgid", evtOrgApproved.OrgId)
case evtOrgSuspended = <-chOrgSuspended:
log.Info("AJ-newOrgPendingApproval", "node", evtOrgSuspended.OrgId)
types.OrgInfoMap.UpsertOrg(evtOrgSuspended.OrgId, types.OrgSuspended)
log.Info("AJ-newOrgPendingApproval cached updated for ", "orgid", evtOrgSuspended.OrgId)
case evtOrgReactivated = <-chOrgReactivated:
log.Info("AJ-newOrgPendingApproval", "node", evtOrgReactivated.OrgId)
types.OrgInfoMap.UpsertOrg(evtOrgReactivated.OrgId, types.OrgApproved)
log.Info("AJ-newOrgPendingApproval cached updated for ", "orgid", evtOrgReactivated.OrgId)
}
}
}
// Manages node addition, decavtivation and activation from network
// Monitors node management events and updates cache accordingly
func (p *PermissionCtrl) manageNodePermissions() {
chNodeApproved := make(chan *pbind.NodeManagerNodeApproved, 1)
chNodeProposed := make(chan *pbind.NodeManagerNodeProposed, 1)
chNodeDeactivated := make(chan *pbind.NodeManagerNodeDeactivated, 1)
chNodeActivated := make(chan *pbind.NodeManagerNodeActivated, 1)
chNodeBlacklisted := make(chan *pbind.NodeManagerNodeBlacklisted)
go p.monitorNewNodeAdd()
go p.monitorNewNodePendingApproval()
//monitor for nodes deletion via smart contract
go p.monitorNodeDeactivation()
//monitor for nodes activation from deactivation status
go p.monitorNodeActivation()
//monitor for nodes blacklisting via smart contract
go p.monitorNodeBlacklisting()
}
// Listens on the channel for new node approval via smart contract and
// adds the same into permissioned-nodes.json
func (p *PermissionCtrl) monitorNewNodeAdd() {
log.Info("AJ-new node approved event monitor started...")
ch := make(chan *pbind.NodeManagerNodeApproved, 1)
var evtNodeApproved *pbind.NodeManagerNodeApproved
var evtNodeProposed *pbind.NodeManagerNodeProposed
var evtNodeDeactivated *pbind.NodeManagerNodeDeactivated
var evtNodeActivated *pbind.NodeManagerNodeActivated
var evtNodeBlacklisted *pbind.NodeManagerNodeBlacklisted
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.NodeManagerNodeApproved
_, err := p.permNode.NodeManagerFilterer.WatchNodeApproved(opts, ch)
if err != nil {
log.Info("Failed WatchNodeApproved: %v", err)
if _, err := p.permNode.NodeManagerFilterer.WatchNodeApproved(opts, chNodeApproved); err != nil {
log.Info("Failed WatchNodeApproved", "error", err)
}
for {
log.Info("AJ-new node approved waiting for events...")
select {
case evt = <-ch:
log.Info("AJ-newNodeApproved", "node", evt.EnodeId)
p.updatePermissionedNodes(evt.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evt.OrgId, evt.EnodeId, types.NodeApproved)
log.Info("AJ-newNodeApproved cached updated for ", "enode", evt.EnodeId)
}
if _, err := p.permNode.NodeManagerFilterer.WatchNodeProposed(opts, chNodeProposed); err != nil {
log.Info("Failed WatchNodeProposed", "error", err)
}
}
func (p *PermissionCtrl) monitorNewNodePendingApproval() {
log.Info("AJ-new node proposed event monitor started...")
ch := make(chan *pbind.NodeManagerNodeProposed, 1)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.NodeManagerNodeProposed
_, err := p.permNode.NodeManagerFilterer.WatchNodeProposed(opts, ch)
if err != nil {
log.Info("Failed WatchNodeProposed: %v", err)
if _, err := p.permNode.NodeManagerFilterer.WatchNodeDeactivated(opts, chNodeDeactivated); err != nil {
log.Info("Failed NodeDeactivated", "error", err)
}
for {
log.Info("AJ-new node proposed waiting for events...")
select {
case evt = <-ch:
log.Info("AJ-newNodeProposed", "node", evt.EnodeId)
p.updatePermissionedNodes(evt.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evt.OrgId, evt.EnodeId, types.NodePendingApproval)
log.Info("AJ-newNodeProposed cached updated for ", "enode", evt.EnodeId)
}
if _, err := p.permNode.NodeManagerFilterer.WatchNodeActivated(opts, chNodeActivated); err != nil {
log.Info("Failed WatchNodeActivated", "error", err)
}
}
// Listens on the channel for new node deactivation via smart contract
// and removes the same from permissioned-nodes.json
func (p *PermissionCtrl) monitorNodeDeactivation() {
ch := make(chan *pbind.NodeManagerNodeDeactivated)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.NodeManagerNodeDeactivated
_, err := p.permNode.NodeManagerFilterer.WatchNodeDeactivated(opts, ch)
if err != nil {
log.Info("Failed NodeDeactivated: %v", err)
if _, err := p.permNode.NodeManagerFilterer.WatchNodeBlacklisted(opts, chNodeBlacklisted); err != nil {
log.Info("Failed NodeBlacklisting", "error", err)
}
for {
select {
case evt = <-ch:
p.updatePermissionedNodes(evt.EnodeId, NodeDelete)
types.NodeInfoMap.UpsertNode(evt.OrgId, evt.EnodeId, types.NodeDeactivated)
log.Info("AJ-NodeDeactivated cached updated for ", "enode", evt.EnodeId)
case evtNodeApproved = <-chNodeApproved:
p.updatePermissionedNodes(evtNodeApproved.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evtNodeApproved.OrgId, evtNodeApproved.EnodeId, types.NodeApproved)
case evtNodeProposed = <-chNodeProposed:
p.updatePermissionedNodes(evtNodeProposed.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evtNodeProposed.OrgId, evtNodeProposed.EnodeId, types.NodePendingApproval)
case evtNodeDeactivated = <-chNodeDeactivated:
p.updatePermissionedNodes(evtNodeDeactivated.EnodeId, NodeDelete)
types.NodeInfoMap.UpsertNode(evtNodeDeactivated.OrgId, evtNodeDeactivated.EnodeId, types.NodeDeactivated)
case evtNodeActivated = <-chNodeActivated:
p.updatePermissionedNodes(evtNodeActivated.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evtNodeActivated.OrgId, evtNodeActivated.EnodeId, types.NodeActivated)
case evtNodeBlacklisted = <-chNodeBlacklisted:
p.updatePermissionedNodes(evtNodeBlacklisted.EnodeId, NodeDelete)
p.updateDisallowedNodes(evtNodeBlacklisted.EnodeId)
types.NodeInfoMap.UpsertNode(evtNodeBlacklisted.OrgId, evtNodeBlacklisted.EnodeId, types.NodeBlackListed)
}
}
}
// Listnes on the channel for any node activation via smart contract
// and adds the same permissioned-nodes.json
func (p *PermissionCtrl) monitorNodeActivation() {
ch := make(chan *pbind.NodeManagerNodeActivated, 1)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.NodeManagerNodeActivated
_, err := p.permNode.NodeManagerFilterer.WatchNodeActivated(opts, ch)
if err != nil {
log.Info("Failed WatchNodeActivated: %v", err)
}
for {
select {
case evt = <-ch:
p.updatePermissionedNodes(evt.EnodeId, NodeAdd)
types.NodeInfoMap.UpsertNode(evt.OrgId, evt.EnodeId, types.NodeActivated)
log.Info("AJ-newNodeActivated cached updated for ", "enode", evt.EnodeId)
}
}
}
// Listens on the channel for node blacklisting via smart contract and
// adds the same into disallowed-nodes.json
func (p *PermissionCtrl) monitorNodeBlacklisting() {
ch := make(chan *pbind.NodeManagerNodeBlacklisted)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.NodeManagerNodeBlacklisted
_, err := p.permNode.NodeManagerFilterer.WatchNodeBlacklisted(opts, ch)
if err != nil {
log.Info("Failed NodeBlacklisting: %v", err)
}
for {
select {
case evt = <-ch:
log.Info("AJ-nodeBlackListed", "event", evt)
p.updatePermissionedNodes(evt.EnodeId, NodeDelete)
p.updateDisallowedNodes(evt.EnodeId)
types.NodeInfoMap.UpsertNode(evt.OrgId, evt.EnodeId, types.NodeBlackListed)
log.Info("AJ-newNodeABlacklisted cached updated for ", "enode", evt.EnodeId)
}
}
}
@ -510,14 +425,35 @@ func (p *PermissionCtrl) updateDisallowedNodes(url string) {
p.disconnectNode(url)
}
// Manages account level permissions update
// Monitors account access related events and updates the cache accordingly
func (p *PermissionCtrl) manageAccountPermissions() {
if !p.permissionedMode {
return
chAccessModified := make(chan *pbind.AcctManagerAccountAccessModified)
chAccessRevoked := make(chan *pbind.AcctManagerAccountAccessRevoked)
var evtAccessModified *pbind.AcctManagerAccountAccessModified
var evtAccessRevoked *pbind.AcctManagerAccountAccessRevoked
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
if _, err := p.permAcct.AcctManagerFilterer.WatchAccountAccessModified(opts, chAccessModified); err != nil {
log.Info("Failed NewNodeProposed", "error", err)
}
if _, err := p.permAcct.AcctManagerFilterer.WatchAccountAccessRevoked(opts, chAccessRevoked); err != nil {
log.Info("Failed NewNodeProposed", "error", err)
}
for {
select {
case evtAccessModified = <-chAccessModified:
types.AcctInfoMap.UpsertAccount(evtAccessModified.OrgId, evtAccessModified.RoleId, evtAccessModified.Address, evtAccessModified.OrgAdmin, types.AcctStatus(int(evtAccessModified.Status.Uint64())))
case evtAccessRevoked = <-chAccessRevoked:
types.AcctInfoMap.UpsertAccount(evtAccessRevoked.OrgId, evtAccessRevoked.RoleId, evtAccessRevoked.Address, evtAccessRevoked.OrgAdmin, types.AcctActive)
}
}
go p.monitorAccountPermissionsAccessModified()
go p.monitorAccountPermissionsAccessRevoked()
return
}
// populates the nodes list from permissioned-nodes.json into the permissions smart contract
@ -549,55 +485,6 @@ func (p *PermissionCtrl) populatePermissionedNodes() error {
return nil
}
// Monitors permissions changes at acount level and uodate the account permissions cache
func (p *PermissionCtrl) monitorAccountPermissionsAccessModified() {
ch := make(chan *pbind.AcctManagerAccountAccessModified)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.AcctManagerAccountAccessModified
_, err := p.permAcct.AcctManagerFilterer.WatchAccountAccessModified(opts, ch)
if err != nil {
log.Info("AJ-Failed NewNodeProposed: %v", err)
}
for {
select {
case evt = <-ch:
log.Info("AJ-AccountAccessModified", "address", evt.Address, "role", evt.RoleId)
types.AcctInfoMap.UpsertAccount(evt.OrgId, evt.RoleId, evt.Address, evt.OrgAdmin, types.AcctStatus(int(evt.Status.Uint64())))
log.Info("AJ-AccountAccessModified cached updated for ", "acct", evt.Address)
}
}
}
func (p *PermissionCtrl) monitorAccountPermissionsAccessRevoked() {
ch := make(chan *pbind.AcctManagerAccountAccessRevoked)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.AcctManagerAccountAccessRevoked
_, err := p.permAcct.AcctManagerFilterer.WatchAccountAccessRevoked(opts, ch)
if err != nil {
log.Info("AJ-Failed NewNodeProposed: %v", err)
}
for {
select {
case evt = <-ch:
log.Info("AJ-AccountAccessModified", "address", evt.Address, "role", evt.RoleId)
types.AcctInfoMap.UpsertAccount(evt.OrgId, evt.RoleId, evt.Address, evt.OrgAdmin, types.AcctActive)
log.Info("AJ-AccountAccessModified cached updated for ", "acct", evt.Address)
}
}
}
// Disconnect the node from the network
func (p *PermissionCtrl) disconnectNode(enodeId string) {
if p.isRaft {
@ -623,15 +510,6 @@ func (p *PermissionCtrl) disconnectNode(enodeId string) {
}
}
// helper function to format EnodeId
func (p *PermissionCtrl) formatEnodeId(enodeId, ipAddrPort, discPort, raftPort string) string {
newEnodeId := "enode://" + enodeId + "@" + ipAddrPort + "?discport=" + discPort
if p.isRaft {
newEnodeId += "&raftport=" + raftPort
}
return newEnodeId
}
// Thus function checks if the its the initial network boot up status and if no
// populates permissioning model with details from permission-config.json
func (p *PermissionCtrl) populateInitPermissions() error {
@ -663,11 +541,8 @@ func (p *PermissionCtrl) populateInitPermissions() error {
} else {
//populate orgs, nodes, roles and accounts from contract
p.populateOrgsFromContract(auth)
p.populateNodesFromContract(auth)
p.populateRolesFromContract(auth)
p.populateAccountsFromContract(auth)
}
@ -706,6 +581,7 @@ func (p *PermissionCtrl) bootupNetwork(permInterfSession *pbind.PermInterfaceSes
return nil
}
// populates the account access details from contract into cache
func (p *PermissionCtrl) populateAccountsFromContract(auth *bind.TransactOpts) {
//populate accounts
permAcctSession := &pbind.AcctManagerSession{
@ -725,6 +601,7 @@ func (p *PermissionCtrl) populateAccountsFromContract(auth *bind.TransactOpts) {
}
}
// populates the role details from contract into cache
func (p *PermissionCtrl) populateRolesFromContract(auth *bind.TransactOpts) {
//populate roles
permRoleSession := &pbind.RoleManagerSession{
@ -744,6 +621,7 @@ func (p *PermissionCtrl) populateRolesFromContract(auth *bind.TransactOpts) {
}
}
// populates the node details from contract into cache
func (p *PermissionCtrl) populateNodesFromContract(auth *bind.TransactOpts) {
//populate nodes
permNodeSession := &pbind.NodeManagerSession{
@ -764,6 +642,7 @@ func (p *PermissionCtrl) populateNodesFromContract(auth *bind.TransactOpts) {
}
}
// populates the org details from contract into cache
func (p *PermissionCtrl) populateOrgsFromContract(auth *bind.TransactOpts) {
//populate orgs
permOrgSession := &pbind.OrgManagerSession{
@ -830,64 +709,37 @@ func (p *PermissionCtrl) updateNetworkStatus(permissionsSession *pbind.PermInter
return nil
}
// monitors role management related events and updated cache
func (p *PermissionCtrl) manageRolePermissions() {
if p.permissionedMode {
log.Info("AJ-manage role start")
//monitor for new nodes addition via smart contract
go p.monitorNewRoleAdd()
go p.monitorNewRoleRemove()
}
}
chRoleCreated := make(chan *pbind.RoleManagerRoleCreated, 1)
chRoleRevoked := make(chan *pbind.RoleManagerRoleRevoked, 1)
func (p *PermissionCtrl) monitorNewRoleAdd() {
log.Info("AJ-new role added event monitor started...")
ch := make(chan *pbind.RoleManagerRoleCreated, 1)
var evtRoleCreated *pbind.RoleManagerRoleCreated
var evtRoleRevoked *pbind.RoleManagerRoleRevoked
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.RoleManagerRoleCreated
_, err := p.permRole.RoleManagerFilterer.WatchRoleCreated(opts, ch)
if err != nil {
if _, err := p.permRole.RoleManagerFilterer.WatchRoleCreated(opts, chRoleCreated); err != nil {
log.Info("Failed WatchRoleCreated: %v", err)
}
for {
log.Info("AJ-new role created waiting for events...")
select {
case evt = <-ch:
log.Info("AJ-newRoleCreated", "org", evt.OrgId, "role", evt.RoleId)
types.RoleInfoMap.UpsertRole(evt.OrgId, evt.RoleId, evt.IsVoter, types.AccessType(int(evt.BaseAccess.Uint64())), true)
log.Info("AJ-newRoleCreated cached updated for ", "orgid", evt.OrgId, "role", evt.RoleId)
}
}
}
func (p *PermissionCtrl) monitorNewRoleRemove() {
log.Info("AJ-new role remove event monitor started...")
ch := make(chan *pbind.RoleManagerRoleRevoked, 1)
opts := &bind.WatchOpts{}
var blockNumber uint64 = 1
opts.Start = &blockNumber
var evt *pbind.RoleManagerRoleRevoked
_, err := p.permRole.RoleManagerFilterer.WatchRoleRevoked(opts, ch)
if err != nil {
if _, err := p.permRole.RoleManagerFilterer.WatchRoleRevoked(opts, chRoleRevoked); err != nil {
log.Info("Failed WatchRoleRemoved: %v", err)
}
for {
log.Info("AJ-new role removed waiting for events...")
select {
case evt = <-ch:
log.Info("AJ-newRoleRemoved", "org", evt.OrgId, "role", evt.RoleId)
if r := types.RoleInfoMap.GetRole(evt.OrgId, evt.RoleId); r != nil {
types.RoleInfoMap.UpsertRole(evt.OrgId, evt.RoleId, r.IsVoter, r.Access, false)
log.Info("AJ-newRoleRemoved cached updated for ", "orgid", evt.OrgId, "role", evt.RoleId)
} else {
log.Error("AJ-revoke role - cache is missing role", "org", evt.OrgId, "role", evt.RoleId)
}
for {
select {
case evtRoleCreated = <-chRoleCreated:
types.RoleInfoMap.UpsertRole(evtRoleCreated.OrgId, evtRoleCreated.RoleId, evtRoleCreated.IsVoter, types.AccessType(int(evtRoleCreated.BaseAccess.Uint64())), true)
case evtRoleRevoked = <-chRoleRevoked:
if r := types.RoleInfoMap.GetRole(evtRoleRevoked.OrgId, evtRoleRevoked.RoleId); r != nil {
types.RoleInfoMap.UpsertRole(evtRoleRevoked.OrgId, evtRoleRevoked.RoleId, r.IsVoter, r.Access, false)
} else {
log.Error("Revoke role - cache is missing role", "org", evtRoleRevoked.OrgId, "role", evtRoleRevoked.RoleId)
}
}
}
}