Source code for terraformtestinglib.terraformtestinglib

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: terraformtestinglib.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for terraformtestinglib.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import copy
import glob
import logging
import os
import platform
import re
import warnings
from ast import literal_eval
from collections import namedtuple

import hcl
from colorama import init

from .terraformtestinglibexceptions import MissingVariable
from .utils import RecursiveDictionary

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-05-24'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''terraformtestinglib'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs]def warning_on_one_line(message, category, filename, lineno, line=None): # pylint: disable=unused-argument """Warning formating method.""" return '\n\n%s:%s\n\n' % (category.__name__, message)
warnings.formatwarning = warning_on_one_line warnings.simplefilter('always', PendingDeprecationWarning) if platform.platform().lower() == 'windows': init(convert=True) else: init() HclFileResource = namedtuple('HclFileResource', ('filename', 'resource_type', 'resource_name', 'data'))
[docs]class HclView: # pylint: disable=too-many-instance-attributes """Object representing the global view of hcl resources along with any global variables.""" def __init__(self, hcl_resources, global_variables=None, raise_on_missing_variable=True, environment_variables=None): if environment_variables and not isinstance(environment_variables, dict): raise ValueError('Environment variables provided are not in a valid dictionary.') self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') self.state = RecursiveDictionary() self._raise_on_missing_variable = raise_on_missing_variable self._environment_variables = environment_variables if environment_variables else {} if global_variables and isinstance(global_variables, dict): self.state.update({'variable': global_variables}) for hcl_resource in hcl_resources: self._add_hcl_resource(hcl_resource) self.resources = self._interpolate_state(copy.deepcopy(self.state.get('resource', {}))) self.data = self._interpolate_state(copy.deepcopy(self.state.get('data', {}))) self.terraform = self._interpolate_state(copy.deepcopy(self.state.get('terraform', {}))) self.provider = self._interpolate_state(copy.deepcopy(self.state.get('provider', {}))) def _add_hcl_resource(self, data): self.state.update(self._filter_empty_variables(data)) @staticmethod def _filter_empty_variables(data): if 'variable' not in data.keys(): return data data['variable'] = {key: value.get('default') for key, value in data.get('variable').items() if value} return data def _interpolate_state(self, state): output = {} for resources_type, resources_entries in state.items(): if not isinstance(resources_entries, dict): entry = {resources_type: resources_entries} output[resources_type] = entry continue entry = {} for resource_name, resource_data in resources_entries.items(): try: counter = resource_data.get('count') except AttributeError: counter = None if counter: for number in range(int(self._interpolate_variable(counter))): name = f'resource_name.{number}' data = self._interpolate_counter(copy.deepcopy(resource_data), str(number)) entry[name] = data else: entry[resource_name] = resource_data output[resources_type] = entry state = self._interpolate_value(output) return state def _interpolate_value(self, data): output = {} for key, value in data.items(): if isinstance(key, str): key = self._interpolate_variable(key) if isinstance(value, str): value = self._interpolate_variable(value) elif isinstance(value, dict): value = self._interpolate_value(value) output[key] = value return output def _interpolate_counter(self, data, number): output = {} for key, value in data.items(): key = key.replace('count.index', number) if isinstance(value, str): value = value.replace('count.index', number) if isinstance(value, dict): value = self._interpolate_counter(value, number) output[key] = value return output @staticmethod def _interpolate_format(value): match = re.search(r'\(.*\)', value) # look for '(' ending in ')' pattern if match: contents = match.group(0)[1:-1] value, argument = contents.split(',') argument = eval(argument, {"__builtins__": {}}) # pylint: disable=eval-used value = eval(' % '.join([value, str(argument)]), {"__builtins__": {}}) # pylint: disable=eval-used return value def _interpolate_length(self, value): # look for '(' ending in ')' pattern match = re.search(r'\(.*\)', value) if match: return len(self.get_variable_value(match.group(0).strip())) return value def _interpolate_variable(self, value): # if its a number pass through if isinstance(value, (int, float)): return value # look for '${' ending in '}' pattern, stop at the first } and performing multiple matches for match in re.finditer(r'\${.+?\}', value): regex = match.group(0) if regex.startswith('${var.'): interpolated_value = self.get_variable_value(regex) if interpolated_value == regex: self._logger.error('Could not interpolate variable "%s" maybe not set in variables?', value) value = value.replace(regex, str(interpolated_value)) elif '${format(' in regex: value = self._interpolate_format(value) elif '${length(' in regex: value = self._interpolate_length(value) return value
[docs] def get_variable_value(self, variable): """Retrieves the value of a variable from the global view of variables. Args: variable (): The variable to look for Raises: MissingValue : If the value does not exist Returns: value (str): The value retrieved """ initial_value = variable permutations = ('${var.', '(var.') if variable.startswith(permutations): variable_name = variable.split('var.')[1] variable_name = variable_name.split('}')[0] variable_name = variable_name.split(')')[0] # look for '[' ending in ']' pattern match = re.search(r'\[.*\]', variable_name) if match: name = variable_name.split('[')[0] variable = self.state.get('variable', {}).get(name, variable) if isinstance(variable, dict): key = literal_eval(match.group(0))[0] variable = variable.get(key) elif isinstance(variable, list): index = literal_eval(match.group(0))[0] variable = variable[index] else: # we look into the variables set in the state and if nothing is there # we look into the provided environment variables variable = self.state.get('variable', {}).get(variable_name, self._environment_variables.get(variable_name, variable)) if variable == initial_value and self._raise_on_missing_variable: raise MissingVariable(initial_value) return variable
[docs] def get_resource_data_by_type(self, resource_type, resource_name): """Retrieves the data of a resource from the global hcl state based on its type. Args: resource_type (basestring): The resource type to retrieve the data for resource_name (basestring): The resource name to retrieve the data for Returns: Interpolated data (dict) for the provided resource name and resource type """ return self.resources.get(resource_type, {}).get(resource_name)
[docs] def get_counter_resource_data_by_type(self, resource_type, resource_name): """Retrieves the data of a resource from the global hcl state based on its type that has a count. Args: resource_type (basestring): The resource type to retrieve the data for resource_name (basestring): The resource name to retrieve the data for Returns: Original non interpolated data (dict) for the provided resource name and resource type """ return [data for resource, data in self.resources.get(resource_type, {}).items() if resource.startswith(resource_name)]
[docs]class Parser: # pylint: disable=too-few-public-methods """Manages the parsing of terraform files and creating the global hcl view from them.""" def __init__(self, configuration_path, global_variables_file_path=None, raise_on_missing_variable=True, environment_variables=None): self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') file_resources, hcl_resources = self._parse_path(configuration_path) self.hcl_view = HclView(file_resources, self._get_global_variables(global_variables_file_path), raise_on_missing_variable, environment_variables) self.file_resources = file_resources self.hcl_resources = hcl_resources def _get_global_variables(self, global_variables_file): if not global_variables_file: return {} try: global_variables_file_path = os.path.expanduser(global_variables_file) with open(global_variables_file_path, 'r') as globals_file: global_variables = hcl.load(globals_file) except ValueError: self._logger.warning('Could not parse %s for resources', global_variables_file) global_variables = {} return global_variables def _parse_path(self, path): hcl_resources = [] file_resources = [] path = os.path.expanduser(os.path.join(path, '*.tf')) for tf_file_path in glob.glob(path): _, _, filename = tf_file_path.rpartition(os.path.sep) try: self._logger.debug('Trying to load file :%s', tf_file_path) with open(tf_file_path, 'r') as terraform_file: data = hcl.load(terraform_file) file_resources.append(data) for resource_type, resource in data.get('resource', {}).items(): for resource_name, resource_data in resource.items(): hcl_resources.append(HclFileResource(filename, resource_type, resource_name, resource_data)) except ValueError: self._logger.debug('Could not parse %s for resources', filename) return file_resources, hcl_resources