# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common fixtures."""

import collections
import contextlib
import glob
import os
import shutil
import tempfile

from pathlib import Path

import pytest
import tftest
import yaml

_REPO_ROOT = Path(__file__).parents[1]
PlanSummary = collections.namedtuple('PlanSummary', 'values counts outputs')


@contextlib.contextmanager
def _prepare_root_module(path):
  """Context manager to prepare a terraform module to be tested.

  If the TFTEST_COPY environment variable is set, `path` is copied to
  a temporary directory (created next to `path`), leaving out a few
  terraform files (e.g. terraform.tfvars) to ensure a clean test
  environment, and the temporary path is yielded. The temporary
  directory is removed when the context exits.

  Otherwise, `path` is simply yielded untouched.
  """
  if os.environ.get('TFTEST_COPY'):
    # if the TFTEST_COPY is set, create temp dir and copy the root
    # module there
    with tempfile.TemporaryDirectory(dir=path.parent) as tmp_path:
      tmp_path = Path(tmp_path)

      # if we're copying the module, we might as well ignore files and
      # directories that are automatically read by terraform. Useful
      # to avoid surprises if, for example, you have an active fast
      # deployment with links to configs
      ignore_patterns = shutil.ignore_patterns(
          '*.auto.tfvars', '*.auto.tfvars.json', '[0-9]-*-providers.tf',
          'terraform.tfstate*', '.terraform.lock.hcl', 'terraform.tfvars',
          '.terraform')

      # the temp dir already exists, hence dirs_exist_ok
      shutil.copytree(path, tmp_path, dirs_exist_ok=True,
                      ignore=ignore_patterns)

      # use the repository-wide lockfile, when there is one
      lockfile = _REPO_ROOT / 'tools' / 'lockfile' / '.terraform.lock.hcl'
      if lockfile.exists():
        shutil.copy(lockfile, tmp_path / '.terraform.lock.hcl')

      yield tmp_path
  else:
    # if TFTEST_COPY is not set, just return the same path
    yield path


def plan_summary(module_path, basedir, tf_var_files=None, extra_files=None,
                 **tf_vars):
  """Run a Terraform plan on the module located at `module_path`.

  - module_path: terraform root module to run. Can be an absolute
    path or relative to the root of the repository
  - basedir: directory root to use for relative paths in
    tf_var_files. Can be a `pathlib.Path` or a string.
  - tf_var_files: set of terraform variable files (tfvars) to pass
    in to terraform
  - extra_files: glob patterns, evaluated relative to `module_path`,
    of additional files passed to the tftest setup
  - tf_vars: any other keyword argument is passed to the plan as a
    terraform variable

  Returns a PlanSummary object containing 3 attributes:
  - values: dictionary where the keys are terraform plan addresses
    and values are the JSON representation (converted to python
    types) of the attribute values of the resource.
  - counts: dictionary where the keys are the terraform resource
    types and the values are the number of times that type appears
    in the plan. The special keys `modules` and `resources` hold the
    total number of child modules and resources.
  - outputs: dictionary of the modules outputs that can be
    determined at plan time.

  Consult [1] for more details on the structure of values and outputs

  [1] https://developer.hashicorp.com/terraform/internals/json-format
  """
  # make the module_path relative to the root of the repo while still
  # supporting absolute paths
  module_path = _REPO_ROOT / module_path
  # accept plain strings as well as Path objects
  basedir = Path(basedir)
  with _prepare_root_module(module_path) as test_path:
    binary = os.environ.get('TERRAFORM', 'terraform')
    tf = tftest.TerraformTest(test_path, binary=binary)
    extra_files = [(module_path / filename).resolve()
                   for x in extra_files or []
                   for filename in glob.glob(x, root_dir=module_path)]
    tf.setup(extra_files=extra_files, upgrade=True)
    tf_var_files = [(basedir / x).resolve() for x in tf_var_files or []]
    plan = tf.plan(output=True, tf_var_file=tf_var_files, tf_vars=tf_vars)

    # compute resource type counts and address->values map with a
    # breadth-first walk of the planned module tree
    values = {}
    counts = collections.defaultdict(int)
    counts['modules'] = counts['resources'] = 0
    q = collections.deque([plan.root_module])
    while q:
      e = q.popleft()

      if 'type' in e:
        counts[e['type']] += 1
      if 'values' in e:
        values[e['address']] = e['values']

      for x in e.get('resources', []):
        counts['resources'] += 1
        q.append(x)
      for x in e.get('child_modules', []):
        counts['modules'] += 1
        q.append(x)

    # extract planned outputs
    outputs = plan.get('planned_values', {}).get('outputs', {})

    # force the destruction of the tftest object, otherwise pytest
    # will complain about unraisable exceptions caused by the context
    # manager deleting temporary files, including the extra_files that
    # tftest tries to remove on cleanup
    del tf

    return PlanSummary(values, dict(counts), outputs)


@pytest.fixture(name='plan_summary')
def plan_summary_fixture(request):
  """Return a function to generate a PlanSummary.

  In the returned function `basedir` becomes optional and it defaults
  to the directory of the calling test.
  """

  def inner(module_path, basedir=None, tf_var_files=None, extra_files=None,
            **tf_vars):
    if basedir is None:
      basedir = Path(request.fspath).parent
    return plan_summary(module_path=module_path, basedir=basedir,
                        tf_var_files=tf_var_files, extra_files=extra_files,
                        **tf_vars)

  return inner


def plan_validator(module_path, inventory_paths, basedir, tf_var_files=None,
                   extra_files=None, **tf_vars):
  """Run a plan on `module_path` and compare it to YAML inventories.

  - module_path, basedir, tf_var_files, extra_files, tf_vars: passed
    through to `plan_summary`
  - inventory_paths: a single path or a list of paths, relative to
    `basedir`, of the YAML inventories to check. Each inventory may
    contain any of the `values`, `counts` and `outputs` keys; only
    what is listed in the inventory is checked against the plan.
    Inventories must live inside the repository, since their path is
    reported relative to the repository root.

  Returns the PlanSummary of the plan. Raises AssertionError on the
  first mismatch, and Exception if an inventory cannot be read or
  parsed.
  """
  # accept plain strings as well as Path objects
  basedir = Path(basedir)
  summary = plan_summary(module_path=module_path, tf_var_files=tf_var_files,
                         extra_files=extra_files, basedir=basedir, **tf_vars)

  # allow a single string for inventory_paths
  if not isinstance(inventory_paths, list):
    inventory_paths = [inventory_paths]

  for path in inventory_paths:
    # allow tfvars and inventory to be relative to the caller
    path = basedir / path
    relative_path = path.relative_to(_REPO_ROOT)
    try:
      inventory = yaml.safe_load(path.read_text())
    except (OSError, yaml.YAMLError) as e:
      # chain the original error so its traceback is not lost
      raise Exception(f'cannot read test inventory {path}: {e}') from e

    # don't fail if the inventory is empty
    inventory = inventory or {}

    # If you add additional asserts to this function:
    # - put the values coming from the plan on the left side of
    #   any comparison operators
    # - put the values coming from user's inventory the right
    #   side of any comparison operators.
    # - include a descriptive error message to the assert

    if 'values' in inventory:
      validate_plan_object(inventory['values'], summary.values, relative_path,
                           "")

    if 'counts' in inventory:
      expected_counts = inventory['counts']
      for type_, expected_count in expected_counts.items():
        assert type_ in summary.counts, \
            f'{relative_path}: module does not create any resources of type `{type_}`'
        plan_count = summary.counts[type_]
        assert plan_count == expected_count, \
            f'{relative_path}: count of {type_} resources failed. Got {plan_count}, expected {expected_count}'

    if 'outputs' in inventory:
      expected_outputs = inventory['outputs']
      for output_name, expected_output in expected_outputs.items():
        assert output_name in summary.outputs, \
            f'{relative_path}: module does not output `{output_name}`'
        output = summary.outputs[output_name]
        # an output without a `value` key is not treated as an error
        # by itself: it is compared as the '__missing__' placeholder,
        # so inventories can explicitly expect it
        plan_output = output.get('value', '__missing__')
        assert plan_output == expected_output, \
            f'{relative_path}: output {output_name} failed. Got `{plan_output}`, expected `{expected_output}`'

  return summary


def validate_plan_object(expected_value, plan_value, relative_path,
                         relative_address):
  """Validate that plan object matches inventory.

  1. Verify each address in the user's inventory exists in the plan
  2. For those addresses that exist on both the user's inventory and
     the plan output, ensure the set of keys on the inventory are a
     subset of the keys in the plan, and compare their values by
     equality
  3. For lists, verify that they have the same length and check
     whether its members are equal (according to this function)

  - expected_value: object coming from the inventory
  - plan_value: object coming from the plan
  - relative_path: inventory path, only used in error messages
  - relative_address: address of the object being compared, only
    used in error messages

  Raises AssertionError on the first mismatch.
  """
  # dictionaries / objects
  if isinstance(expected_value, dict) and isinstance(plan_value, dict):
    for k, v in expected_value.items():
      assert k in plan_value, \
          f'{relative_path}: {k} is not a valid address in the plan'
      validate_plan_object(v, plan_value[k], relative_path,
                           f'{relative_address}.{k}')

  # lists
  elif isinstance(expected_value, list) and isinstance(plan_value, list):
    assert len(plan_value) == len(expected_value), \
        f'{relative_path}: {relative_address} has different length. Got {plan_value}, expected {expected_value}'

    for i, (exp, actual) in enumerate(zip(expected_value, plan_value)):
      validate_plan_object(exp, actual, relative_path,
                           f'{relative_address}[{i}]')

  # all other objects (including mismatched container types)
  else:
    assert plan_value == expected_value, \
        f'{relative_path}: {relative_address} failed. Got `{plan_value}`, expected `{expected_value}`'


@pytest.fixture(name='plan_validator')
def plan_validator_fixture(request):
  """Return a function to build a PlanSummary and compare it to a YAML
  inventory.

  In the returned function `basedir` becomes optional and it defaults
  to the directory of the calling test.
  """

  def inner(module_path, inventory_paths, basedir=None, tf_var_files=None,
            extra_files=None, **tf_vars):
    if basedir is None:
      basedir = Path(request.fspath).parent
    # extra_files is forwarded explicitly, mirroring the plan_summary
    # fixture, instead of relying on it slipping through **tf_vars
    return plan_validator(module_path=module_path,
                          inventory_paths=inventory_paths, basedir=basedir,
                          tf_var_files=tf_var_files, extra_files=extra_files,
                          **tf_vars)

  return inner


# @pytest.fixture
# def repo_root():
#   'Return a pathlib.Path to the root of the repository'
#   return Path(__file__).parents[1]