Source code for terraformtestinglib.linting.linting

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: linting.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for linting.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import logging
import os
import re
import warnings

import yaml
from yaml.parser import ParserError
from schema import SchemaError

from terraformtestinglib.terraformtestinglib import Parser
from terraformtestinglib.configuration import NAMING_SCHEMA, POSITIONING_SCHEMA, DISASTER_RECOVERY_FILENAME
from terraformtestinglib.terraformtestinglibexceptions import InvalidNaming, InvalidPositioning
from terraformtestinglib.utils import RuleError, ResourceError, FilenameError, ConfigurationError

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-05-24'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''linting'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs]class Stack(Parser): """Manages a stack as a collection of resources that can be checked for name convention.""" def __init__(self, # pylint: disable=too-many-arguments configuration_path, naming_file_path, positioning_file_path=None, global_variables_file_path=None, file_to_skip_for_positioning=None, raise_on_missing_variable=True, environment_variables=None): super(Stack, self).__init__(configuration_path, global_variables_file_path, raise_on_missing_variable, environment_variables) self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') self.path = configuration_path self.rules_set = self._get_naming_rules(naming_file_path) self.positioning = self._get_positioning_rules(positioning_file_path) self.positioning_skip_file = file_to_skip_for_positioning self.resources = self._get_resources() self._errors = [] @staticmethod def _get_naming_rules(rules_file): try: rules_path_file = os.path.expanduser(rules_file) with open(rules_path_file, 'r') as rules_file_handle: rules = yaml.load(rules_file_handle.read(), Loader=yaml.FullLoader) rules_file_handle.close() rules = NAMING_SCHEMA.validate(rules) except IOError: raise InvalidNaming('Could not load naming file') except ParserError: raise InvalidNaming('Unable to parse yaml file. Please check that it is valid.') except SchemaError as error: raise InvalidNaming(error) return RuleSet(rules) @staticmethod def _get_positioning_rules(positioning_file): if positioning_file is None: return None try: positioning_path_file = os.path.expanduser(positioning_file) with open(positioning_path_file, 'r') as positioning_file_handle: positioning = yaml.load(positioning_file_handle.read(), Loader=yaml.FullLoader) positioning_file_handle.close() positioning = POSITIONING_SCHEMA.validate(positioning) except IOError: raise InvalidPositioning('Could not load positioning file') except ParserError: raise InvalidPositioning('Unable to parse yaml file. Please check that it is valid.') except SchemaError as error: raise InvalidPositioning(error) return positioning def _get_resources(self): resources = [] for hcl_resource in self.hcl_resources: if hcl_resource.data.get('count'): for data in self.hcl_view.get_counter_resource_data_by_type(hcl_resource.resource_type, hcl_resource.resource_name): resources.append(self._instantiate_resource(hcl_resource.filename, hcl_resource.resource_type, hcl_resource.resource_name, data, hcl_resource.data)) else: resources.append(self._instantiate_resource(hcl_resource.filename, hcl_resource.resource_type, hcl_resource.resource_name, self.hcl_view.get_resource_data_by_type( hcl_resource.resource_type, hcl_resource.resource_name), hcl_resource.data)) return resources def _instantiate_resource(self, filename, resource_type, name, data, original_data): # pylint: disable=too-many-arguments resource = LintingResource(filename, resource_type, name, data, original_data) resource.register_rules_set(self.rules_set) if filename == self.positioning_skip_file: resource.register_positioning_set(None) else: resource.register_positioning_set(self.positioning) return resource
[docs] def validate(self): """Validates all the resources of the stack. Returns: None """ self._errors = [] for resource in self.resources: resource.validate() for error in resource.errors: self._errors.append(error)
@property def errors(self): """The errors of the validation of the resources of the stack. Returns: errors (ResourceError|FilenameError) : list of possible linting errors """ return self._errors
[docs]class LintingResource: # pylint: disable=too-many-instance-attributes """Manages a resource and provides validation capabilities..""" def __init__(self, filename, resource_type, name, data, original_data): # pylint: disable=too-many-arguments self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') self.filename = filename self.name = name self.type = resource_type self.data = data self.original_data = original_data self.rules_set = None self.positioning_set = None self.errors = None def __getattr__(self, value): return self.data.get(value)
[docs] def register_rules_set(self, rules_set): """Registers the set of rules with the Resource. Args: rules_set (dict): A dictionary with the rules for the naming convention Returns: None """ self.rules_set = rules_set
[docs] def register_positioning_set(self, positioning_set): """Registers the set of rules with the Resource. Args: positioning_set (dict): A dictionary with the rules for the positioning convention Returns: None """ self.positioning_set = positioning_set
def _get_entity_desired_filename(self, entity): target = next((filename for filename, entities in self.positioning_set.items() if entity in entities), 'unknown') return target
[docs] def validate(self): """Validates the resource according to the appropriate rule. Returns: True upon completion """ self.errors = [] validate_positioning = True if not self.rules_set: self._logger.warning('No rules set!') return True if self.positioning_set is None: message = ('Skipping resource positioning due to positioning file not been provided or ' 'being skipped.') self._logger.info(message) validate_positioning = False elif os.environ.get('SKIP_POSITIONING'): self._logger.info('Skipping resource positioning due to global environment setting.') validate_positioning = False self._logger.debug('Resource type %s', self.type) self._validate_naming() if self.filename == DISASTER_RECOVERY_FILENAME: self._logger.info('Not validating entries for "%s"', DISASTER_RECOVERY_FILENAME) return True if validate_positioning: self._validate_positioning() return True
def _is_check_skipped(self, tag_name, deprecated_tag_name=None): skip_check = False try: check_tag = tag_name deprecated_tag = deprecated_tag_name tags = self.tags or {} if tags.get(deprecated_tag): message = f'The tag "{deprecated_tag}" is deprecated. Please use "{check_tag}". Resource: {self.name}' warnings.warn(message, PendingDeprecationWarning) check_tag = deprecated_tag skip_check = tags.get(check_tag, False) except IndexError: self._logger.error(('Weird error with no or broken resources ' 'found %s for resource %s' % self.data, self.name)) except AttributeError: self._logger.exception('Multiple tags entry found on resource %s', self.name) return skip_check def _validate_positioning(self): self._logger.debug('Resource name %s', self.name) if self._is_check_skipped('skip-positioning', 'skip_positioning'): self._logger.warning('Skipping resource %s positioning checking ' 'due to user overriding tag.', self.name) else: full_desired_filename = self._get_entity_desired_filename(self.type) desired_filename, _, _ = full_desired_filename.rpartition('.tf') file_name, _, _ = self.filename.rpartition('.') if not re.match(desired_filename, file_name): self.errors.append(FilenameError(self.filename, self.name, full_desired_filename)) self._logger.error('Filename positioning not followed on file %s for resource ' '%s. Should be in a file matching %s.tf .', self.filename, self.name, desired_filename) return True def _validate_naming(self): self._logger.debug('Resource name %s', self.name) if self._is_check_skipped('skip-linting', 'skip_linting'): self._logger.warning('Skipping resource %s naming checking ' 'due to user overriding tag.', self.name) else: rule = self.rules_set.get_rule_for_resource(self.type) if rule: self._logger.debug('Found matching rule "%s"', rule.regex) rule.validate(self.type, self.name, self.data, self.original_data) for error in rule.errors: if isinstance(error, ConfigurationError): self._logger.error('Invalid configuration found on file %s for resource ' '%s with type %s . Invalid value found :%s', self.filename, error.entity, error.field, error.value) else: self._logger.error('Naming convention not followed on file %s for resource ' '%s with type %s. Regex not matched :%s. Value :%s', self.filename, error.entity, error.field, error.regex, error.value) self.errors.append(ResourceError(self.filename, *error)) else: self._logger.debug('No matching rule found') return True
[docs]class RuleSet: # pylint: disable=too-few-public-methods """Manages the rules as a group and can search them by name.""" def __init__(self, rules): self._rules = rules
[docs] def get_rule_for_resource(self, resource_name): """Retrieves the rule for the resource name. Args: resource_name (basestring): The resource type to retrieve the rule for Returns: The rule corresponding with the resource type if found, None otherwise """ return next((Rule(rule) for rule in self._rules if rule.get('resource') == resource_name), None)
[docs]class Rule: """Handles the rule object providing validation capabilities.""" def __init__(self, data): self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') self.data = data self.regex = self.data.get('regex') self._errors = [] @property def errors(self): """List of errors found. Returns: The errors found """ return self._errors @errors.setter def errors(self, error): self._errors.append(error)
[docs] def validate(self, resource_type, resource_name, resource_data, original_data): """Validates the given resource based on the ruleset. Args: resource_type (basestring): The type of the resource resource_name (basestring): The name of the resource resource_data (dict): The interpolated data of the resource original_data (dict): The original data of the resource, before the interpolation Returns: True on successful validation, False otherwise """ if not self.regex: return True self._validate_name(resource_type, resource_name) self._validate_values(resource_type, resource_name, resource_data, original_data) return True if not self.errors else False # pylint: disable=simplifiable-if-expression
def _validate_name(self, resource_type, resource_name): rule = re.compile(self.regex) if not re.match(rule, resource_name): self.errors = RuleError(resource_type, resource_name, 'id', self.regex, resource_name, None) def _validate_values(self, resource_type, resource_name, resource_data, original_data): for field in self.data.get('fields', []): regex = field.get('regex') if not regex: continue rule = re.compile(regex) original_value, key_name = self._get_value_from_resource(original_data, field.get('value')) value, key_name = self._get_value_from_resource(resource_data, field.get('value')) original_value = original_value if original_value != value else None rule_arguments = [resource_type, resource_name, field.get('value'), regex, value, original_value] if isinstance(value, list): for entry in value: # since there multiple occurances of the key we need to iterate all of them # and only use each ones value, that is why we overwrite the value for each iteration rule_arguments[4] = entry.get(key_name) self._match_rule_to_value(regex, rule, entry.get(key_name), rule_arguments) else: self._match_rule_to_value(regex, rule, value, rule_arguments) def _match_rule_to_value(self, regex, rule, value, rule_arguments): try: if not re.match(rule, value): self.errors = RuleError(*rule_arguments) except TypeError: self._logger.error('Error matching for regex, values passed were, rule:%s value:%s', regex, value) def _get_value_from_resource(self, resource, value): path = value.split('.') for entry in path: try: field = resource.get(entry) except AttributeError: self._logger.error('Error getting field %s, failed for path %s', value, path) resource = field # if the resource is a list it means that there are multiple entries for the same key # so it needs to be handled on the calling code if isinstance(resource, list): # we need to get the key name of the value that is a list. # That should be the one after we just iterated over. keys_path = value.split('.') return resource, keys_path.pop(keys_path.index(entry) + 1) return resource, None