# spawn
# Copyright (C) 2018-2019, Simmovation Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""This module defines the ``SpecificationParser``, which parsers specifications
"""
import json
import logging
from copy import deepcopy
from spawn.errors import SpecFormatError
from ..specification import (
SpecificationModel, SpecificationMetadata, SpecificationNode,
ValueProxyNode, SpecificationNodeFactory
)
from ..specification.combinators import zip_properties, product
from ..specification.evaluators import (
RangeEvaluator, RepeatEvaluator, MultiplyEvaluator,
DivideEvaluator, AddEvaluator, SubtractEvaluator
)
from .value_proxy import ValueProxyParser
from .generators import GeneratorsParser
from .macros import MacrosParser
from .constants import (
COMBINATOR, ZIP, PRODUCT,
RANGE, REPEAT, MULTIPLY, DIVIDE, ADD, SUBTRACT,
PATH, POLICY, GHOST, LITERAL
)
from .value_libraries import ValueLibraries
from ..specification import generator_methods
from ..util.validation import validate_type, validate_file
from ..util.path_builder import PathBuilder
DEFAULT_COMBINATORS = {
ZIP: zip_properties,
PRODUCT: product
}
EVALUATOR_LIB = {
RANGE: RangeEvaluator,
REPEAT: RepeatEvaluator,
MULTIPLY: MultiplyEvaluator,
DIVIDE: DivideEvaluator,
ADD: AddEvaluator,
SUBTRACT: SubtractEvaluator
}
[docs]class SpecificationDescriptionProvider:
"""Abstract base class for implementations that provide the specification description.
"""
[docs] def get(self):
"""Gets the specification description
:returns: A dict representation of the description
:rtype: dict
"""
raise NotImplementedError()
[docs]class SpecificationFileReader(SpecificationDescriptionProvider):
"""Implementation of :class:`SpecificationDescriptionProvider` that reads
the specification from a file
"""
def __init__(self, input_file):
"""Initialises the :class:`SpecificationFileReader`
:param input_file: The input file
:type input_file: path-like
"""
validate_file(input_file, 'input_file')
self._input_file = input_file
[docs] def get(self):
"""Reads the specification description from a file
:returns: A dict representation of the description
:rtype: dict
"""
with open(self._input_file) as input_fp:
return json.load(input_fp)
[docs]class DictSpecificationProvider(SpecificationDescriptionProvider):
"""Implementation of :class:`SpecificationDescriptionProvider` that reads
the specification from a provided dict
"""
def __init__(self, spec):
"""Initialises the :class:`DictSpecificationProvider`
:param spec: The specification
:type spec: dict
"""
validate_type(spec, dict, 'spec')
self._spec = deepcopy(spec)
[docs] def get(self):
"""Gets the specification
:returns: A dict representation of the description
:rtype: dict
"""
return self._spec
[docs]class SpecificationParser:
"""Class for parsing specifications
Given a specification provider, the :class:`SpecificationParser` will get the specification and
produce a tree representation of the nodes.
"""
def __init__(self, plugin_loader):
"""Initialises the :class:`SpecificationParser`
:param provider: The source of the specification description
:type provider: :class:`SpecificationDescriptionProvider`
"""
self._plugin_loader = plugin_loader
plugin_evaluators = self._plugin_loader.load_evaluators()
self._pre_loaded_value_libraries = ValueLibraries(evaluators={**EVALUATOR_LIB, **plugin_evaluators})
[docs] def parse(self, description):
"""Parse the specification description
Reads the metadata from the file and creates any required value libraries,
before initialising a :class:`SpecificationNodeParser` to expand the nodes defined
in the description.
:param description: The specification description
:type description: dict
:returns: An object representing the expanded specification tree.
:rtype: :class:`SpecificationModel`
"""
if 'spec' not in description:
raise SpecFormatError('"spec" node not found in description')
if not isinstance(description['spec'], dict):
raise SpecFormatError('"spec" node should be of type dict')
if not description['spec']:
raise SpecFormatError('"spec" node is empty')
metadata = SpecificationMetadata(
description.get('type'), description.get('creation_time'), description.get('notes')
)
value_libraries = self._pre_loaded_value_libraries.copy()
value_proxy_parser = ValueProxyParser(value_libraries)
self._update_value_libraries(description, value_proxy_parser, value_libraries)
node_parser = SpecificationNodeParser(
value_proxy_parser, self._get_combinators(), default_combinator=PRODUCT
)
root_node = node_parser.parse(description.get('spec'))
root_node.evaluate()
return SpecificationModel(description.get('base_file'), root_node, metadata)
def _update_value_libraries(self, description, value_proxy_parser, value_libraries):
built_in_generators = GeneratorsParser.load_generators_from_module(generator_methods)
plugin_generators = self._plugin_loader.load_generators()
generator_lib = GeneratorsParser({
**built_in_generators, **plugin_generators
}).parse(description.get('generators'))
value_libraries.generators.update(generator_lib)
macros_lib = MacrosParser(value_libraries, value_proxy_parser).parse(description.get('macros'))
value_libraries.macros.update(macros_lib)
@staticmethod
def _get_combinators():
return DEFAULT_COMBINATORS
[docs]class SpecificationNodeParser:
"""Expands the specification nodes, starting at the ``node_spec`` provided
Given a starting `node_spec`, the specification ``node_spec`` parser assesses
child nodes and expands them according to their values.
"""
def __init__(self, value_proxy_parser, combinators=None, default_combinator=None):
"""Initialises the node parser
:param value_proxy_parser: The value proxy parser
:type value_proxy_parser: :class:`ValueProxyParser`
:param combinators: A mapping between combinator names (e.g. zip, product) and combinators.
The default is {}
:type combinators: dict
"""
self._combinators = combinators or {}
self._default_combinator = default_combinator
self._node_factory = SpecificationNodeFactory()
self._value_proxy_parser = value_proxy_parser
[docs] def parse(self, node_spec, parent=None, node_policies=None, ghost_parameters=None):
"""Parse the `node_spec`, and expand its children.
This iterates through a `node_spec` and expands it's children.
:param node_spec: A node specification
:type node_spec: dict
:param parent: A specification node to add the new nodes to
:type parent: :class:`SpecificationNode`
:returns: The expanded `node_spec`
:rtype: :class:`SpecificationNode`
"""
next_node_policies, node_spec = self._get_policies(node_spec)
node_policies = self._merge_policies(node_policies, next_node_policies)
next_ghost_parameters, node_spec = self._get_ghost_parameters(node_spec)
ghost_parameters = {**(ghost_parameters or {}), **next_ghost_parameters}
parent = parent or SpecificationNode.create_root(node_policies.pop(PATH, None))
if node_spec is None or node_spec == {}:
return parent
validate_type(node_spec, dict, 'node_spec')
(name, value), next_node_spec = self._get_next_node(node_spec)
self._parse_value(parent, name, value, next_node_spec, node_policies, ghost_parameters)
return parent
@staticmethod
def _merge_policies(left, right):
if not left:
return right
if not right:
return left
merged = {}
if PATH in left and PATH in right:
merged[PATH] = str(PathBuilder(left[PATH]).join(right[PATH]))
return {**left, **right, **merged}
def _parse_value(self, parent, name, value, next_node_spec, node_policies, ghost_parameters):
logger = logging.getLogger(__name__)
literal, name, value = self._parse_literal(name, value)
# combinator lookup
if self._is_combinator(name):
logger.debug('Parsing "%s" as combinator', name)
self._parse_combinator(parent, name, value, next_node_spec, node_policies, ghost_parameters)
# list expansion
elif isinstance(value, list) and not literal:
logger.debug('Parsing "%s" as list', name)
for val in value:
self._parse_value(parent, name, val, next_node_spec, node_policies, ghost_parameters)
# burrow into object
elif isinstance(value, dict) and not literal:
logger.debug('Parsing "%s" as object', name)
self.parse(value, parent, node_policies=node_policies, ghost_parameters=ghost_parameters)
self.parse(next_node_spec, parent, node_policies=node_policies, ghost_parameters=ghost_parameters)
# rhs prefixed proxies (evaluators and co.) - short form and long form
elif isinstance(value, str) and self._is_value_proxy(value) and not literal:
logger.debug('Parsing "%s" as value proxy', name)
next_parent = ValueProxyNode(
parent, name, self._value_proxy_parser.parse(value),
node_policies.get(PATH, None), ghost_parameters
)
self.parse(next_node_spec, next_parent)
# simple single value
else:
logger.debug('Parsing "%s" as raw value (%s)', name, value)
next_parent = self._node_factory.create(
parent, name, value, node_policies.get(PATH, None), ghost_parameters, literal=literal
)
self.parse(next_node_spec, next_parent)
def _parse_literal(self, name, value):
literal_key = self._is_literal(name)
literal_value = self._is_literal(value)
literal = literal_key or literal_value
if literal_key:
name = self._deliteral(name)
elif literal_value:
value = self._deliteral(value)
return literal, name, value
def _is_value_proxy(self, value):
return self._value_proxy_parser.is_value_proxy(value)
def _is_combinator(self, value):
is_equal = lambda f: value == '{}{}'.format(self._prefix(COMBINATOR), f)
return any(map(is_equal, self._combinators.keys()))
def _get_combinator(self, name):
prefix, combinator = self._parts(name)
if prefix != COMBINATOR:
#pylint: disable=line-too-long
raise ValueError('prefix "{}" does not match combinator prefix "{}"'.format(prefix, COMBINATOR))
if combinator not in self._combinators:
raise ValueError('combinator "{}" not found'.format(combinator))
return self._combinators[combinator]
def _parse_combinator(self, parent, name, value, next_node_spec, node_policies, ghost_parameters):
for node_spec in self._get_combinator(name)(value):
node_spec = {**node_spec, **next_node_spec}
self.parse(node_spec, parent, node_policies, ghost_parameters)
def _get_next_node(self, node_spec):
next_key = list(node_spec.keys())[0]
if len(node_spec) == 1:
return (next_key, node_spec[next_key]), {}
# If the next value is a list (but key is not a list), expand it using the default combinator if possible
if not self._is_combinator(next_key)\
and (isinstance(node_spec[next_key], list) and not self._is_literal(next_key))\
and self._default_combinator:
return ('{}{}'.format(self._prefix(COMBINATOR), self._default_combinator), node_spec), {}
next_node_spec = {k: v for k, v in node_spec.items() if k != next_key}
return (next_key, node_spec[next_key]), next_node_spec
def _get_policies(self, node_spec):
if not node_spec:
return {}, {}
prefix = self._prefix(POLICY)
policies = {k.replace(prefix, ''): v for k, v in node_spec.items() if k.startswith(prefix)}
return policies, {k: v for k, v in node_spec.items() if not k.startswith(prefix)}
def _get_ghost_parameters(self, node_spec):
if not node_spec:
return {}, {}
ghost_parameters = {self._deghost(k): v for k, v in node_spec.items() if self._is_ghost(k)}
return ghost_parameters, {k: v for k, v in node_spec.items() if not self._is_ghost(k)}
@staticmethod
def _is_ghost(prop):
return prop.startswith(GHOST)
@staticmethod
def _deghost(prop):
if not SpecificationNodeParser._is_ghost(prop):
raise ValueError('Cannot deghost a non-ghost property')
return prop[1:]
@staticmethod
def _is_literal(prop):
return isinstance(prop, str) and prop.startswith(LITERAL)
@staticmethod
def _deliteral(prop):
if not SpecificationNodeParser._is_literal(prop):
raise ValueError('Cannot deliteral a non-literal property')
try:
return json.loads(prop[1:])
except json.JSONDecodeError:
return prop[1:]
@staticmethod
def _prefix(name):
return '{}:'.format(name)
@staticmethod
def _parts(value):
if ':' in value:
return value.split(':', 1)
return None, value