cloud-foundation-fabric/tools/state_iam.py

179 lines
6.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'Parse and output IAM bindings from Terraform state file.'
import collections
import json
import itertools
import re
import sys
import click
2022-02-14 03:13:42 -08:00
FIELDS = ('authoritative', 'resource_type', 'resource_id', 'role',
'member_type', 'member_id', 'conditions')
ORG_IDS = {}
RESOURCE_SORT = {'organization': 0, 'folder': 1, 'project': 2}
RESOURCE_TYPE_RE = re.compile(r'^google_([^_]+)_iam_([^_]+)$')
Binding = collections.namedtuple('Binding', ' '.join(FIELDS))
def _org_id(resource_id):
if resource_id not in ORG_IDS:
ORG_IDS[resource_id] = f'[org_id #{len(ORG_IDS)}]'
return ORG_IDS[resource_id]
def get_bindings(resources, prefix=None, folders=None):
'Parse resources and return bindings.'
org_ids = {}
for r in resources:
m = RESOURCE_TYPE_RE.match(r['type'])
if not m:
continue
resource_type = m.group(1)
authoritative = m.group(2) == 'binding'
for i in r.get('instances'):
attrs = i['attributes']
conditions = ' '.join(c['title'] for c in attrs.get('condition', []))
if resource_type == 'organization':
resource_id = _org_id(attrs['org_id'])
else:
resource_id = attrs[resource_type]
if prefix and resource_id.startswith(prefix):
resource_id = resource_id[len(prefix) + 1:]
role = attrs['role']
if role.startswith('organizations/'):
org_id = role.split('/')[1]
role = role.replace(org_id, _org_id(org_id))
members = attrs['members'] if authoritative else [attrs['member']]
if resource_type == 'folder' and folders:
resource_id = folders.get(resource_id, resource_id)
for member in members:
member_type, _, member_id = member.partition(':')
if member_type == 'user':
continue
Initial MVP for CI/CD (#608) * preliminary support for wif in stage 0 * IAM wif role * IAM wif role TODO * add support for external SA IAM to SA module * add name output to SA module * separate cicd SA * tfdoc * GITLAB principal (untested) * make GCS name output static * outputs bucket * fix stage 1 test * tweak outputs * tfdoc * move wif_pool to automation variable * add support for top-level and repository providers * add missing boilerplate * fix branchless principal * initial workflow * symlink provider template in stages * remove service accounts from stage 0 cicd tfvars * add cicd interface variable to resman stage * fix cicd variable in resman stage * better condition on outputs_location * fix last change * change outputs_location type * revert outputs_location change * split outputs in stage 0 * update ci/cd temporary notes * rename additive IAM resource in SA module * split outputs in stage 1 * remove unused locals * fix stage 1 tests * tfdoc * Upload action files to outputs_bucket * Fix tests and README * rename template, streamline outputs * local templates and gcs output for all stage 2 * add workflows to local output files * Use lowercase WIF providers everywhere * Bring back suffix for workflow files * Remove unused files * Update READMEs * preliminary CI/CD implementation for stage 1 * fix stage 1 * stage 1 cicd * tfdoc * fix tests * readme and links for cicd and wif * refactor wif providers * refactor cicd for stage 1 * fix stage 1 * wif org policies * split identity provider configuration from cicd * add type attribute to cicd repositories * valid cicd repositories have a workflow template * refactor stage 01 * fix stage 01 tests * minimal CI/CD documentation * better check_links error reporting * fix links * Added Gitlab specific configurations Set the default issuer_uri for Gitlab. Added allowed audiences to OIDC configuration. * Fixed TF formatting in identity providers. * Changing identity provider audience to null Changing identity provider audience to default to null. * add instructions for renaming workflows * address Julio's comments Co-authored-by: Julio Castillo <jccb@google.com> Co-authored-by: alexmeissner <alexmeissner@google.com>
2022-04-11 23:17:27 -07:00
try:
member_id, member_domain = member_id.split('@', 1)
except ValueError:
if member_type == 'domain':
member_id = 'GCP organization domain'
member_domain = ''
# raise SystemExit(f'Cannot parse binding {member_id}')
# Handle Cloud Services Service Account
if member_domain == 'cloudservices.gserviceaccount.com':
member_id = "PROJECT_CLOUD_SERVICES"
2023-05-13 20:51:46 -07:00
# Handle Cloud Service Identity Service Account
if re.match("^service-\d{8}", member_id):
member_id = "SERVICE_IDENTITY_" + member_domain.split(".", 1)[0]
2023-05-13 20:51:46 -07:00
# Handle BQ Cloud Service Identity Service Account
if re.match("^bq-\d{8}", member_id):
member_id = "IDENTITY_" + member_domain.split(".", 1)[0]
2022-04-03 15:45:27 -07:00
resource_type_output = "Service Identity - " + resource_type
else:
resource_type_output = resource_type
if prefix and member_id.startswith(prefix):
member_id = member_id[len(prefix) + 1:]
yield Binding(authoritative, resource_type_output, resource_id, role,
member_type, member_id, conditions)
def get_folders(resources):
'Parse resources and return folder id, name tuples.'
folders = {}
for r in resources:
if r['type'] != 'google_folder':
continue
for i in r['instances']:
folder_id = i['attributes']['id']
folder_name = i['attributes']['display_name']
if folder_name not in folders:
folders[folder_name] = []
folders[folder_name].append(folder_id)
for name, ids in folders.items():
for i, folder_id in enumerate(ids):
if len(ids) == 1:
yield folder_id, name
else:
yield folder_id, f'{name} [#{i}]'
def output_csv(bindings):
'Output bindings in CSV format.'
print(','.join(FIELDS))
for b in bindings:
print(','.join(str(getattr(b, f)) for f in FIELDS))
def output_principals(bindings):
'Output bindings in Markdown format by principals.'
resource_grouper = itertools.groupby(
bindings, key=lambda b: (b.resource_type, b.resource_id))
print('# IAM bindings reference')
print('\nLegend: <code>+</code> additive, <code>•</code> conditional.')
for resource, resource_groups in resource_grouper:
resource_type, resource_name = resource
print(f'\n## {resource_type.title()} <i>{resource_name.lower()}</i>\n')
principal_grouper = itertools.groupby(
resource_groups, key=lambda b: (b.member_type, b.member_id))
print('| members | roles |')
print('|---|---|')
for principal, principal_groups in principal_grouper:
roles = []
for b in principal_groups:
additive = '<code>+</code>' if not b.authoritative else ''
conditions = '<code>•</code>' if b.conditions else ''
if b.role.startswith('organizations/'):
roles.append(f'{b.role} {additive}{conditions}')
else:
2022-02-14 03:13:42 -08:00
url = ('https://cloud.google.com/iam/docs/understanding-roles#'
f'{b.role.replace("roles/", "")}')
roles.append(f'[{b.role}]({url}) {additive}{conditions}')
2022-02-14 03:13:42 -08:00
print(f'|<b>{principal[1]}</b><br><small><i>{principal[0]}</i></small>|'
f'{"<br>".join(roles)}|')
@click.command()
@click.argument('state-file', type=click.File('r'), default=sys.stdin)
2022-02-14 03:13:42 -08:00
@click.option('--format', type=click.Choice(['csv', 'principals', 'raw']),
default='raw')
@click.option('--prefix', default=None)
def main(state_file, format, prefix=None):
'Output IAM bindings parsed from Terraform state file or standard input.'
with state_file:
data = json.load(state_file)
resources = data.get('resources', [])
folders = dict(get_folders(resources))
bindings = get_bindings(resources, prefix=prefix, folders=folders)
2022-02-14 03:13:42 -08:00
bindings = sorted(
bindings, key=lambda b: (
RESOURCE_SORT.get(b.resource_type, 99),
b.resource_id,
b.member_type,
b.member_id,
))
if format == 'raw':
for b in bindings:
print(b)
else:
func = globals().get(f'output_{format}')
if not func:
raise SystemExit('Unknown format.')
func(bindings)
if __name__ == '__main__':
main()