400 lines
16 KiB
Python
400 lines
16 KiB
Python
# Copyright 2023 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Common fixtures."""
|
|
|
|
import collections
|
|
import contextlib
|
|
import glob
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import tftest
|
|
import yaml
|
|
|
|
# Root of the repository: the parent of the directory containing this file.
_REPO_ROOT = Path(__file__).parents[1]

# Result of `plan_summary`: resource values keyed by plan address, resource
# counts keyed by type, and the planned outputs of the module.
PlanSummary = collections.namedtuple('PlanSummary',
                                     ['values', 'counts', 'outputs'])
|
|
|
|
|
|
@contextlib.contextmanager
def _prepare_root_module(path):
    """Yield the directory from which a terraform root module is tested.

    When the TFTEST_COPY environment variable is set, the module at
    `path` is copied into a temporary directory created next to it,
    leaving out the files terraform reads automatically (tfvars, state,
    lock file, provider overrides, `.terraform`). The copy is yielded
    and removed again when the context exits.

    When TFTEST_COPY is not set, `path` itself is yielded, unless it
    contains any of those files, in which case RuntimeError is raised.
    """
    # files and directories that terraform picks up on its own; leaving
    # them out avoids surprises if, for example, an active fast
    # deployment with links to configs lives in the module directory
    skip_names = shutil.ignore_patterns(
        '*.auto.tfvars',
        '*.auto.tfvars.json',
        '[0-9]-*-providers.tf',
        'terraform.tfstate*',
        '.terraform.lock.hcl',
        'terraform.tfvars',
        '.terraform',
    )

    if not os.environ.get('TFTEST_COPY'):
        # running in place: refuse to continue when leftovers that could
        # silently alter the test results are present
        leftovers = skip_names(path, os.listdir(path=path))
        if leftovers:
            raise RuntimeError(
                f'Test in path {path} contains {", ".join(leftovers)} which may affect '
                f'test results. Please run tests with TFTEST_COPY=1 environment variable'
            )
        yield path
        return

    # copy mode: work on a clean copy created alongside the original
    with tempfile.TemporaryDirectory(dir=path.parent) as tmp_name:
        workdir = Path(tmp_name)
        shutil.copytree(path, workdir, dirs_exist_ok=True, ignore=skip_names)

        # use the lock file kept under tools/lockfile when it exists
        repo_lockfile = _REPO_ROOT / 'tools' / 'lockfile' / '.terraform.lock.hcl'
        if repo_lockfile.exists():
            shutil.copy(repo_lockfile, workdir / '.terraform.lock.hcl')

        yield workdir
|
|
|
|
|
|
def plan_summary(module_path, basedir, tf_var_files=None, extra_files=None,
                 **tf_vars):
    """
    Run a Terraform plan on the module located at `module_path`.

    - module_path: terraform root module to run. Can be an absolute
      path or relative to the root of the repository

    - basedir: directory root to use for relative paths in
      tf_var_files.

    - tf_var_files: set of terraform variable files (tfvars) to pass
      in to terraform

    - extra_files: glob patterns, expanded relative to the module
      directory, of additional files handed to `tf.setup`

    - tf_vars: any other keyword argument is forwarded to `tf.plan` as
      a terraform variable

    Returns a PlanSummary object containing 3 attributes:
    - values: dictionary where the keys are terraform plan addresses
      and values are the JSON representation (converted to python
      types) of the attribute values of the resource.

    - counts: dictionary where the keys are the terraform resource
      types and the values are the number of times that type appears
      in the plan. The `resources` and `modules` keys are always
      present and hold the total number of resources and child modules.

    - outputs: dictionary of the module's outputs that can be
      determined at plan time.

    Consult [1] for more details on the structure of values and outputs

    [1] https://developer.hashicorp.com/terraform/internals/json-format
    """
    # joining with an absolute path yields that path unchanged, so both
    # repo-relative and absolute module paths are supported
    module_path = _REPO_ROOT / module_path
    with _prepare_root_module(module_path) as test_path:
        tf = tftest.TerraformTest(
            test_path, binary=os.environ.get('TERRAFORM', 'terraform'))

        # expand the glob patterns against the original module directory
        resolved_extra_files = []
        for pattern in extra_files or []:
            for match in glob.glob(pattern, root_dir=module_path):
                resolved_extra_files.append((module_path / match).resolve())
        tf.setup(extra_files=resolved_extra_files, upgrade=True)

        var_files = [(basedir / name).resolve() for name in tf_var_files or []]
        plan = tf.plan(output=True, tf_var_file=var_files, tf_vars=tf_vars)

        # breadth-first walk of the module tree, collecting the resource
        # type counts and the address->values map
        values = {}
        counts = collections.defaultdict(int, modules=0, resources=0)
        pending = collections.deque([plan.root_module])
        while pending:
            node = pending.popleft()

            if 'type' in node:
                counts[node['type']] += 1
            if 'values' in node:
                values[node['address']] = node['values']

            resources = node.get('resources', [])
            counts['resources'] += len(resources)
            pending.extend(resources)

            child_modules = node.get('child_modules', [])
            counts['modules'] += len(child_modules)
            pending.extend(child_modules)

        # extract planned outputs
        planned_values = plan.get('planned_values', {})
        outputs = planned_values.get('outputs', {})

        # drop the tftest object while the module directory still exists:
        # otherwise pytest complains about unraisable exceptions when the
        # context manager deletes the temporary files, including the
        # extra_files that tftest tries to remove on cleanup
        del tf

        return PlanSummary(values, dict(counts), outputs)
|
|
|
|
|
|
@pytest.fixture(name='plan_summary')
def plan_summary_fixture(request):
    """Expose `plan_summary` to tests as a fixture.

    The function returned takes the same arguments as `plan_summary`,
    except that `basedir` is optional: when omitted, the directory of
    the calling test file is used.
    """

    def inner(module_path, basedir=None, tf_var_files=None, extra_files=None,
              **tf_vars):
        # fall back to the directory of the requesting test module
        test_dir = Path(request.fspath).parent if basedir is None else basedir
        return plan_summary(module_path=module_path, basedir=test_dir,
                            tf_var_files=tf_var_files,
                            extra_files=extra_files, **tf_vars)

    return inner
|
|
|
|
|
|
def plan_validator(module_path, inventory_paths, basedir, tf_var_files=None,
                   extra_files=None, **tf_vars):
    """
    Plan the module at `module_path` and check it against YAML inventories.

    - module_path: terraform root module to plan, forwarded to
      `plan_summary`

    - inventory_paths: a single inventory path (string or path object)
      or an iterable of them; each one is joined to `basedir`

    - basedir: directory used to resolve `inventory_paths` and
      `tf_var_files`. The resulting inventory paths have to be located
      under the repository root, as they are reported relative to it.

    - tf_var_files, extra_files, tf_vars: forwarded to `plan_summary`

    Each inventory may define any of the `values`, `counts` and
    `outputs` keys, which are compared with the matching attribute of
    the plan summary. An empty inventory is accepted.

    Returns the PlanSummary of the plan. Raises AssertionError when the
    plan does not match an inventory, and Exception (chained to the
    original error) when an inventory cannot be read or parsed.
    """
    summary = plan_summary(module_path=module_path, tf_var_files=tf_var_files,
                           extra_files=extra_files, basedir=basedir, **tf_vars)

    # allow a single path (string or path object) for inventory_paths;
    # anything else is treated as an iterable of paths
    if isinstance(inventory_paths, (str, os.PathLike)):
        inventory_paths = [inventory_paths]

    for path in inventory_paths:
        # allow tfvars and inventory to be relative to the caller
        path = basedir / path
        relative_path = path.relative_to(_REPO_ROOT)
        try:
            inventory = yaml.safe_load(path.read_text())
        except (OSError, yaml.YAMLError) as e:
            # chain the original error so its traceback is not lost
            raise Exception(f'cannot read test inventory {path}: {e}') from e

        # don't fail if the inventory is empty
        inventory = inventory or {}

        # If you add additional asserts to this function:
        # - put the values coming from the plan on the left side of
        #   any comparison operators
        # - put the values coming from the user's inventory on the right
        #   side of any comparison operators.
        # - include a descriptive error message in the assert

        if 'values' in inventory:
            validate_plan_object(inventory['values'], summary.values,
                                 relative_path, "")

        if 'counts' in inventory:
            expected_counts = inventory['counts']
            for type_, expected_count in expected_counts.items():
                assert type_ in summary.counts, \
                    f'{relative_path}: module does not create any resources of type `{type_}`'
                plan_count = summary.counts[type_]
                assert plan_count == expected_count, \
                    f'{relative_path}: count of {type_} resources failed. Got {plan_count}, expected {expected_count}'

        if 'outputs' in inventory:
            expected_outputs = inventory['outputs']
            for output_name, expected_output in expected_outputs.items():
                assert output_name in summary.outputs, \
                    f'{relative_path}: module does not output `{output_name}`'
                output = summary.outputs[output_name]
                # an output without a `value` key is compared as the
                # `__missing__` placeholder instead of failing outright
                plan_output = output.get('value', '__missing__')
                assert plan_output == expected_output, \
                    f'{relative_path}: output {output_name} failed. Got `{plan_output}`, expected `{expected_output}`'

    return summary
|
|
|
|
|
|
def validate_plan_object(expected_value, plan_value, relative_path,
                         relative_address):
    """
    Validate that plan object matches inventory

    1. Verify each address in the user's inventory exists in the plan
    2. For those addresses that exist on both the user's inventory and
       the plan output, ensure the set of keys on the inventory are a
       subset of the keys in the plan, and compare their values by
       equality
    3. For lists, verify that they have the same length and check
       whether its members are equal (according to this function)

    - expected_value: value coming from the user's inventory
    - plan_value: matching value coming from the plan
    - relative_path: inventory path, used as prefix of error messages
    - relative_address: address of `plan_value` within the plan, used
      in error messages. Pass an empty string for the top level object.

    Raises AssertionError on the first mismatch found.
    """
    # dictionaries / objects
    if isinstance(expected_value, dict) and isinstance(plan_value, dict):
        for k, v in expected_value.items():
            # full address of this key; no leading dot at the top level
            address = f'{relative_address}.{k}' if relative_address else f'{k}'
            assert k in plan_value, \
                f'{relative_path}: {address} is not a valid address in the plan'
            validate_plan_object(v, plan_value[k], relative_path, address)

    # lists
    elif isinstance(expected_value, list) and isinstance(plan_value, list):
        assert len(plan_value) == len(expected_value), \
            f'{relative_path}: {relative_address} has different length. Got {plan_value}, expected {expected_value}'

        for i, (exp, actual) in enumerate(zip(expected_value, plan_value)):
            validate_plan_object(exp, actual, relative_path,
                                 f'{relative_address}[{i}]')

    # all other objects
    else:
        assert plan_value == expected_value, \
            f'{relative_path}: {relative_address} failed. Got `{plan_value}`, expected `{expected_value}`'
|
|
|
|
|
|
@pytest.fixture(name='plan_validator')
def plan_validator_fixture(request):
    """Return a function to build a PlanSummary and compare it to a YAML inventory.

    In the returned function `basedir` becomes optional and it defaults
    to the directory of the calling test. All other arguments are
    forwarded to `plan_validator` unchanged.
    """

    def inner(module_path, inventory_paths, basedir=None, tf_var_files=None,
              extra_files=None, **tf_vars):
        if basedir is None:
            basedir = Path(request.fspath).parent
        # extra_files is forwarded explicitly, matching the signature of
        # plan_validator and of the plan_summary fixture, instead of
        # travelling inside tf_vars
        return plan_validator(module_path=module_path,
                              inventory_paths=inventory_paths,
                              basedir=basedir, tf_var_files=tf_var_files,
                              extra_files=extra_files, **tf_vars)

    return inner
|
|
|
|
|
|
def get_tfvars_for_e2e():
    """Collect the end-to-end test variables from the environment.

    Every variable `name` is read from the `TFTEST_E2E_<name>`
    environment variable; variables that are not set map to None.
    """
    names = ("billing_account", "group_email", "organization_id", "parent",
             "prefix", "region")
    collected = {}
    for name in names:
        collected[name] = os.environ.get(f"TFTEST_E2E_{name}")
    return collected
|
|
|
|
|
|
def e2e_validator(module_path, extra_files, tf_var_files, basedir=None):
    """Function running apply, plan and destroy to verify the case end to end

    1. Tests whether apply does not return errors
    2. Tests whether plan after apply is empty
    3. Tests whether destroy does not return errors

    - module_path: terraform root module to run. Can be an absolute
      path or relative to the root of the repository

    - extra_files: glob patterns, expanded relative to the module
      directory, of additional files handed to `tf.setup`

    - tf_var_files: terraform variable files, relative to `basedir`

    - basedir: directory root for the relative paths in `tf_var_files`;
      it has to be provided whenever `tf_var_files` is not empty

    Destroy is always attempted, even when apply or one of the checks
    fails. Raises AssertionError if the plan run after apply still
    contains changes.
    """
    module_path = _REPO_ROOT / module_path
    with _prepare_root_module(module_path) as test_path:
        binary = os.environ.get('TERRAFORM', 'terraform')
        tf = tftest.TerraformTest(test_path, binary=binary)
        extra_files = [(module_path / filename).resolve()
                       for x in extra_files or []
                       for filename in glob.glob(x, root_dir=module_path)]
        tf.setup(extra_files=extra_files, upgrade=True)
        tf_var_files = [(basedir / x).resolve() for x in tf_var_files or []]

        # we need only prefix variable to run the example test, all the
        # other are passed in terraform.tfvars file
        prefix = get_tfvars_for_e2e()["prefix"]
        # to allow different tests to create projects (or other globally
        # unique resources) with the same name bump prefix forward on each
        # test execution; the last characters of the xdist worker id keep
        # parallel workers apart
        worker = os.environ.get("PYTEST_XDIST_WORKER", "0")[-2:]
        tf_vars = {"prefix": f'{prefix}-{int(time.time())}{worker}'}
        try:
            tf.apply(tf_var_file=tf_var_files, tf_vars=tf_vars)
            plan = tf.plan(output=True, tf_var_file=tf_var_files,
                           tf_vars=tf_vars)

            # keep every resource whose planned action is not a no-op
            changes = {}
            for resource_name, value in plan.resource_changes.items():
                change = value.get('change', {})
                if change.get('actions') != ['no-op']:
                    changes[resource_name] = change

            # compare before with after to raise more meaningful failure
            # to the user, i.e. one that shows how resource will change
            plan_before_state = {k: v.get('before') for k, v in changes.items()}
            plan_after_state = {k: v.get('after') for k, v in changes.items()}

            assert plan_before_state == plan_after_state, \
                'Plan not empty after apply for values'

            plan_before_sensitive_state = {
                k: v.get('before_sensitive') for k, v in changes.items()
            }
            plan_after_sensitive_state = {
                k: v.get('after_sensitive') for k, v in changes.items()
            }
            assert plan_before_sensitive_state == plan_after_sensitive_state, \
                'Plan not empty after apply for sensitive values'

            # If above did not fail, this should not either, but left as a
            # safety check
            assert changes == {}, \
                f'Plan not empty for following resources: {", ".join(changes.keys())}'
        finally:
            # always clean up what apply may have created
            tf.destroy(tf_var_file=tf_var_files, tf_vars=tf_vars)
|
|
|
|
|
|
@pytest.fixture(name='e2e_validator')
def e2e_validator_fixture(request):
    """Expose `e2e_validator` to tests as a fixture.

    The function returned takes the same arguments as `e2e_validator`,
    except that when `basedir` is omitted the directory of the calling
    test file is used.
    """

    def inner(module_path: str, extra_files: list, tf_var_files: list,
              basedir: os.PathLike = None):
        # fall back to the directory of the requesting test module
        test_dir = Path(request.fspath).parent if basedir is None else basedir
        return e2e_validator(module_path, extra_files, tf_var_files, test_dir)

    return inner
|
|
|
|
|
|
@pytest.fixture(scope='session', name='e2e_tfvars_path')
def e2e_tfvars_path():
    """Fixture preparing end-to-end test environment

    If TFTEST_E2E_TFVARS_PATH is set in the environment, then assume the
    environment is already provisioned and the necessary variables are
    set in the file that variable points to. This mode cannot be combined
    with more than one pytest-xdist worker and raises RuntimeError if it
    is.

    Otherwise, create a unique test environment (in case of multiple
    workers - as many environments as there are workers), that will be
    injected into each example test instead of
    `tests/examples/variables.tf`. If TFTEST_E2E_SETUP_TFVARS_PATH is
    set, it is passed as a variable file to the setup module. The
    environment is destroyed when the fixture is torn down, and also
    when applying the setup module fails part way through.

    Returns path to tfvars file that contains information about the
    environment to use for the tests.
    """
    if tfvars_path := os.environ.get('TFTEST_E2E_TFVARS_PATH'):
        # no need to set up the project
        if int(os.environ.get('PYTEST_XDIST_WORKER_COUNT', '0')) > 1:
            raise RuntimeError(
                'Setting TFTEST_E2E_TFVARS_PATH is not compatible with running tests in parallel'
            )
        yield tfvars_path
    else:
        with _prepare_root_module(_REPO_ROOT / 'tests' / 'examples_e2e' /
                                  'setup_module') as test_path:
            binary = os.environ.get('TERRAFORM', 'terraform')
            tf = tftest.TerraformTest(test_path, binary=binary)
            tf_vars = get_tfvars_for_e2e()
            # take at most 2 last chars of the worker id for suffix
            tf_vars['suffix'] = os.environ.get("PYTEST_XDIST_WORKER", "0")[-2:]
            tf_vars['timestamp'] = str(int(time.time()))

            # optional variable file for the setup module; None when unset
            tf_vars_file = os.environ.get("TFTEST_E2E_SETUP_TFVARS_PATH")
            tf.setup(upgrade=True)
            try:
                tf.apply(tf_vars=tf_vars, tf_var_file=tf_vars_file)
                yield test_path / "e2e_tests.tfvars"
            finally:
                # destroy even if apply failed half way, so that no
                # partially created environment is left behind
                tf.destroy(tf_vars=tf_vars, tf_var_file=tf_vars_file)
|
|
|
|
|
|
# @pytest.fixture
|
|
# def repo_root():
|
|
# 'Return a pathlib.Path to the root of the repository'
|
|
# return Path(__file__).parents[1]
|