# Source code for qface.generator


# Copyright (c) Pelagicore AB 2016

from jinja2 import Environment, Template, Undefined, StrictUndefined
from jinja2 import FileSystemLoader, PackageLoader, ChoiceLoader
from jinja2 import TemplateSyntaxError, TemplateNotFound, TemplateError
from path import Path
from antlr4 import InputStream, FileStream, CommonTokenStream, ParseTreeWalker
from antlr4.error import DiagnosticErrorListener, ErrorListener
import shelve
import logging
import hashlib
import yaml
import click
import sys
import os

import antlr4.atn.ATNDeserializer
if (antlr4.atn.ATNDeserializer.SERIALIZED_VERSION == 3):
    from .idl.parser.TLexer import TLexer
    from .idl.parser.TParser import TParser
    from .idl.parser.TListener import TListener
elif (antlr4.atn.ATNDeserializer.SERIALIZED_VERSION == 4):
    from .idl.parser.T4Lexer import T4Lexer as TLexer
    from .idl.parser.T4Parser import T4Parser as TParser
    from .idl.parser.T4Listener import T4Listener as TListener
from .idl.profile import EProfile
from .idl.domain import System
from .idl.listener import DomainListener
from .filters import get_filters

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

logger = logging.getLogger(__name__)


def merge(a, b):
    """Merge mapping ``b`` into mapping ``a`` in place and return ``a``.

    When both sides hold a dict under the same key the two dicts are merged
    recursively; in every other case the value from ``b`` replaces whatever
    ``a`` had for that key.
    """
    for name in b:
        incoming = b[name]
        existing = a.get(name)
        both_are_dicts = isinstance(existing, dict) and isinstance(incoming, dict)
        if both_are_dicts:
            # descend instead of overwriting so sibling keys in ``a`` survive
            merge(existing, incoming)
            continue
        a[name] = incoming
    return a


class TestableUndefined(StrictUndefined):
    """Strict undefined type that may still be used in a truth test.

    Inherits from ``StrictUndefined`` and only overrides the boolean
    conversion, so that an undefined value can be checked in an ``if``
    statement of a template.
    """

    def __bool__(self):
        """An undefined value is always falsy."""
        return False


class ReportingErrorListener(ErrorListener.ErrorListener):
    """Parser error listener that reports problems on the console.

    Syntax errors are printed together with the document name and position
    and then raised as ``ValueError``; the remaining callbacks only print a
    short notice.
    """

    def __init__(self, document):
        # name of the document being parsed; used as prefix of error messages
        self.document = document

    def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
        """Print ``<document>:<line>:<column> <msg>`` in red and raise ``ValueError``."""
        report = '{0}:{1}:{2} {3}'.format(self.document, line, column, msg)
        click.secho(report, fg='red')
        raise ValueError(report)

    def reportAmbiguity(self, recognizer, dfa, startIndex, stopIndex, exact, ambigAlts, configs):
        """Print a notice that the parser reported an ambiguity."""
        click.secho('ambiguity', fg='red')

    def reportAttemptingFullContext(self, recognizer, dfa, startIndex, stopIndex, conflictingAlts, configs):
        """Print a notice that the parser reported a full-context attempt."""
        click.secho('reportAttemptingFullContext', fg='red')

    def reportContextSensitivity(self, recognizer, dfa, startIndex, stopIndex, prediction, configs):
        """Print a notice that the parser reported a context sensitivity."""
        click.secho('reportContextSensitivity', fg='red')


class Generator(object):
    """Manages the templates and applies your context data"""
    strict = False
    """ enables strict code generation """

    def __init__(self, search_path, context=None, force=False):
        """Set up the template environment.

        :param search_path: directory (or directories) handed to the
            ``FileSystemLoader``; templates shipped in the ``qface`` package
            are used as fallback
        :param context: dictionary with the base context data. It is stored
            by reference, not copied. A new dictionary is created when
            omitted, so instances never share a default context.
        :param force: when true, documents are always written
        """
        loader = ChoiceLoader([
            FileSystemLoader(search_path),
            PackageLoader('qface')
        ])
        self.env = Environment(
            loader=loader,
            trim_blocks=True,
            lstrip_blocks=True,
        )
        self.env.filters.update(get_filters())
        self._destination = Path()
        self._path = Path()
        self._source = ''
        self.context = {} if context is None else context
        self.force = force

    @property
    def destination(self):
        """destination prefix for generator write"""
        return self._destination

    @destination.setter
    def destination(self, dst):
        self._destination = dst

    @property
    def resolved_path(self):
        """destination joined with the current path prefix"""
        return self.destination / self.path

    @property
    def path(self):
        """path prefix below the destination; assigned values are rendered
        as a template first, empty values are ignored"""
        return self._path

    @path.setter
    def path(self, path):
        if not path:
            return
        self._path = Path(self.apply(path))

    @property
    def source(self):
        """source prefix for template lookup"""
        return self._source

    @source.setter
    def source(self, source):
        # empty values are ignored and keep the current prefix
        if source:
            self._source = source

    @property
    def filters(self):
        """filters of the template environment; assigning a dictionary
        adds its entries to the existing filters"""
        return self.env.filters

    @filters.setter
    def filters(self, filters):
        self.env.filters.update(filters)

    def get_template(self, name):
        """Retrieves a single template file from the template loader

        A name starting with ``/`` is looked up without the source prefix,
        otherwise the source prefix (unless it is ``None``) is prepended.
        """
        source = name
        if name and name[0] == '/':
            source = name[1:]
        elif self.source is not None:
            source = '/'.join((self.source, name))
        return self.env.get_template(source)

    def render(self, name, context):
        """Returns the rendered text from a single template file from the
        template loader using the given context data"""
        # the undefined handling is chosen on every call, so a change of
        # Generator.strict takes effect for the next render
        if Generator.strict:
            self.env.undefined = TestableUndefined
        else:
            self.env.undefined = Undefined
        template = self.get_template(name)
        return template.render(context)

    def apply(self, template, context=None):
        """Return the rendered text of a template instance

        :param template: template source text
        :param context: optional dictionary with context data. It is
            updated in place with the generator context (entries of the
            generator context win) before the template is rendered.
        """
        if context is None:
            # a fresh dictionary per call; a shared default would carry
            # entries over between calls and between generators
            context = {}
        context.update(self.context)
        return self.env.from_string(template).render(context)

    def write(self, file_path, template, context=None, preserve=False, force=False):
        """Using a template file name it renders a template
        into a file given a context

        :param file_path: target path, may contain template expressions
        :param template: name of the template file
        :param context: context data; the generator context is used when
            empty or omitted
        :param preserve: do not overwrite an already existing file
        :param force: write the file even if the content did not change

        Template errors are printed to stderr. In strict mode the process
        exits with status 1 after an error.
        """
        if not file_path or not template:
            click.secho('source or target missing for document')
            return
        if not context:
            context = self.context
        message = None
        try:
            self._write(file_path, template, context, preserve, force)
        except TemplateSyntaxError as exc:
            message = '{0}:{1}: error: {2}'.format(exc.filename, exc.lineno, exc.message)
        except TemplateNotFound as exc:
            message = '{0}: error: Template not found'.format(exc.name)
        except TemplateError as exc:
            # report the innermost frame of the traceback as error location
            exc_tb = sys.exc_info()[2]
            while exc_tb.tb_next is not None:
                exc_tb = exc_tb.tb_next
            message = '{0}:{1}: error: {2}'.format(exc_tb.tb_frame.f_code.co_filename, exc_tb.tb_lineno, exc.message)
        if message is None:
            return
        click.secho(message, fg='red', err=True)
        if Generator.strict:
            sys.exit(1)

    def _write(self, file_path: Path, template: str, context: dict, preserve: bool = False, force: bool = False):
        """Render ``template`` and store the result below the resolved path.

        The file is only written when its content would change or when
        forced (by the argument or by the generator). With ``preserve`` an
        existing file is left untouched unless forced.
        """
        force = self.force or force
        path = self.resolved_path / Path(self.apply(file_path, context))
        if path.parent:
            path.parent.makedirs_p()
        logger.info('write {0}'.format(path))
        data = self.render(template, context)
        if not (self._has_different_content(data, path) or force):
            return
        if path.exists() and preserve and not force:
            click.secho('preserve: {0}'.format(path), fg='blue')
            return
        click.secho('create: {0}'.format(path), fg='blue')
        # the context manager closes (and thereby flushes) the file
        with path.open('w', encoding='utf-8') as handle:
            handle.write(data)

    def _has_different_content(self, data, path):
        """Return True if ``path`` is missing or its md5 hash differs from
        the hash of the utf-8 encoded ``data``"""
        if not path.exists():
            return True
        data_hash = hashlib.new('md5', data.encode('utf-8')).digest()
        path_hash = path.read_hash('md5')
        return data_hash != path_hash

    def register_filter(self, name, callback):
        """Register your custom template filter"""
        self.env.filters[name] = callback
class RuleGenerator(Generator):
    """Generates documents based on a rule YAML document"""

    def __init__(self, search_path: str, destination: Path, context: dict = None, features: set = None, force=False):
        """Create a generator writing below ``destination``.

        :param search_path: template search path, passed to ``Generator``
        :param destination: destination prefix for all written documents
        :param context: base context data; it is extended in place with the
            keys ``dst``, ``project`` and ``features``. A new dictionary is
            created when omitted.
        :param features: set of enabled features, matched against the
            ``when`` conditions of the rules. A new empty set is created
            when omitted.
        :param force: when true, documents are always written
        """
        # fresh objects per instance; shared defaults would leak the
        # context of one generator into the next one
        context = {} if context is None else context
        features = set() if features is None else features
        super().__init__(search_path, context, force)
        self.context.update({
            'dst': destination,
            'project': Path(destination).name,
            'features': features,
        })
        self.destination = destination
        self.features = features

    def process_rules(self, path: Path, system: System):
        """writes the templates read from the rules document"""
        self.context.update({
            'system': system,
        })
        document = FileSystem.load_yaml(path, required=True)
        for module, rules in document.items():
            click.secho('process: {0}'.format(module), fg='green')
            self._process_rules(rules, system)

    def _process_rules(self, rules: dict, system: System):
        """ process a set of rules for a target """
        # reset the template source; done on the attribute because the
        # ``source`` setter ignores empty values
        self._source = None
        if not self._shall_proceed(rules):
            return
        # ``or {}`` also covers a key that is present but set to null
        self.context.update(rules.get('context') or {})
        self.path = rules.get('path', '')
        self.source = rules.get('source', None)
        self._process_rule(rules.get('system', None), {'system': system})
        for module in system.modules:
            self._process_rule(rules.get('module', None), {'module': module})
            for interface in module.interfaces:
                self._process_rule(rules.get('interface', None), {'interface': interface})
            for struct in module.structs:
                self._process_rule(rules.get('struct', None), {'struct': struct})
            for enum in module.enums:
                self._process_rule(rules.get('enum', None), {'enum': enum})

    def _process_rule(self, rule: dict, context: dict):
        """ process a single rule

        Entries listed under ``documents`` are written normally, entries
        listed under ``preserve`` are written with ``preserve=True``.
        """
        if not rule or not self._shall_proceed(rule):
            return
        self.context.update(context)
        self.context.update(rule.get('context') or {})
        self.path = rule.get('path', None)
        self.source = rule.get('source', None)
        for entry in rule.get('documents') or []:
            target, source = self._resolve_rule_document(entry)
            self.write(target, source)
        for entry in rule.get('preserve') or []:
            target, source = self._resolve_rule_document(entry)
            self.write(target, source, preserve=True)

    def _shall_proceed(self, obj):
        """Return True if ``obj`` has no ``when`` condition or at least one
        of its conditions is an enabled feature"""
        conditions = obj.get('when', [])
        if not conditions:
            return True
        if not isinstance(conditions, list):
            conditions = [conditions]
        return bool(self.features.intersection(conditions))

    def _resolve_rule_document(self, entry):
        """Return a ``(target, source)`` pair for a document entry.

        A dictionary entry yields its first key/value pair, any other entry
        is used for both target and source.
        """
        if isinstance(entry, dict):
            return next(iter(entry.items()))
        return (entry, entry)
class FileSystem(object):
    """QFace helper functions to work with the file system"""
    strict = False
    """ enables strict parsing """

    @staticmethod
    def parse_document(document: Path, system: System = None, profile=EProfile.FULL):
        """Parse a document and report errors on the console.

        :param document: document path to parse
        :param system: system to be used (optional)
        :param profile: language profile handed to the parser
        :returns: the resulting system, or ``None`` when parsing failed

        In strict mode the process exits with status -1 after an error.
        """
        try:
            return FileSystem._parse_document(document, system, profile)
        except FileNotFoundError:
            click.secho('{0}: error: file not found'.format(document), fg='red', err=True)
        except ValueError:
            # The error is already printed in the ErrorHandler in this case
            pass
        except Exception:
            click.secho('Error parsing document {0}'.format(document), fg='red', err=True)
            # keep the details available for debugging without changing
            # the console output
            logger.debug('parsing failed: {0}'.format(document), exc_info=True)
        # only reached after an error, the success path returned above
        if FileSystem.strict:
            sys.exit(-1)

    @staticmethod
    def parse_text(text: str, system: System = None, profile=EProfile.FULL):
        """Parse interface text and return the resulting system.

        :param text: text to parse
        :param system: system to be used (optional)
        :param profile: language profile handed to the parser
        """
        stream = InputStream(text)
        return FileSystem._parse_stream(stream, system, "<TEXT>", profile)

    @staticmethod
    def _parse_document(document: Path, system: System = None, profile=EProfile.FULL):
        """Parses a document and returns the resulting domain system

        Annotations from a ``.yaml`` file next to the document (same name,
        different extension) are merged afterwards.

        :param document: document path to parse
        :param system: system to be used (optional)
        :param profile: language profile handed to the parser
        """
        logger.debug('parse document: {0}'.format(document))
        stream = FileStream(str(document), encoding='utf-8')
        system = FileSystem._parse_stream(stream, system, document, profile)
        FileSystem.merge_annotations(system, document.stripext() + '.yaml')
        return system

    @staticmethod
    def _parse_stream(stream: InputStream, system: System = None, document=None, profile=EProfile.FULL):
        """Run lexer, parser and domain listener on an input stream.

        :param stream: input stream to parse
        :param system: system to be filled; a new one is created if missing
        :param document: document name used in error messages
        :param profile: language profile handed to the domain listener
        :returns: the filled system
        """
        logger.debug('parse stream')
        system = system or System()

        lexer = TLexer(stream)
        tokens = CommonTokenStream(lexer)
        parser = TParser(tokens)
        # replace the default listeners by the reporting listener
        parser.removeErrorListeners()
        parser.addErrorListener(ReportingErrorListener(document))
        tree = parser.documentSymbol()
        walker = ParseTreeWalker()
        walker.walk(DomainListener(system, profile), tree)
        return system

    @staticmethod
    def merge_annotations(system, document):
        """Read a YAML document and for each root symbol identifier
        updates the tag information of that symbol

        :param system: system used to look up the symbols
        :param document: path of the YAML document; nothing happens if the
            file does not exist
        """
        # accept plain strings as well, ``name`` is used below
        document = Path(document)
        if not document.exists():
            return
        meta = FileSystem.load_yaml(document)
        if not meta:
            click.secho('skipping empty: {0}'.format(document.name), fg='blue')
            return
        click.secho('merge: {0}'.format(document.name), fg='blue')
        try:
            for identifier, data in meta.items():
                symbol = system.lookup(identifier)
                if symbol:
                    merge(symbol.tags, data)
        except Exception as e:
            click.secho('Error parsing annotation {0}: {1}'.format(document, e), fg='red', err=True)
            if FileSystem.strict:
                sys.exit(-1)

    @staticmethod
    def parse(input, identifier: str = None, use_cache=False, clear_cache=True, pattern="*.qface", profile=EProfile.FULL):
        """Input can be either a file or directory or a list of files or directory.
        A directory will be parsed recursively. The function returns the resulting system.
        Stores the result of the run in the domain cache named after the identifier.

        :param input: file or directory to parse, or a list/tuple of them;
            relative entries are resolved against the current directory
        :param identifier: identifies the parse run. Used to name the cache
        :param use_cache: load the system from and store it into the
            ``qface.cache`` shelve file
        :param clear_cache: clears the domain cache (defaults to true)
        :param pattern: file name pattern used when walking a directory
        :param profile: language profile handed to the parser
        """
        inputs = input if isinstance(input, (list, tuple)) else [input]
        logger.debug('parse input={0}'.format(inputs))
        identifier = 'system' if not identifier else identifier
        system = System()
        cache = shelve.open('qface.cache') if use_cache else None
        try:
            if cache is not None:
                if clear_cache and identifier in cache:
                    del cache[identifier]
                if identifier in cache:
                    # use the cached domain model
                    system = cache[identifier]
            # NOTE(review): the inputs are parsed even when a cached system
            # was loaded above - confirm that this is intended
            for entry in inputs:
                path = Path.getcwd() / str(entry)
                if path.isfile():
                    FileSystem.parse_document(path, system, profile)
                else:
                    for document in path.walkfiles(pattern):
                        FileSystem.parse_document(document, system, profile)
            if cache is not None:
                cache[identifier] = system
        finally:
            # closing writes the shelf back and releases the cache file
            if cache is not None:
                cache.close()
        return system

    @staticmethod
    def load_yaml(document: Path, required=False):
        """Load a YAML document and return its content.

        :param document: path of the YAML document
        :param required: report a missing document as error
        :returns: the loaded data; an empty dictionary if the document is
            missing, empty or can not be parsed

        In strict mode the process exits with status -1 if a required
        document is missing or if the document can not be parsed.
        """
        document = Path(document)
        if not document.exists():
            if required:
                click.secho('yaml document does not exists: {0}'.format(document), fg='red', err=True)
                if FileSystem.strict:
                    sys.exit(-1)
            return {}
        try:
            # Silence the deprecation warning in newer path.py
            # but keep supporting older versions
            if not hasattr(Path, 'read_text'):
                document.read_text = document.text
            # NOTE(review): the full loader can construct arbitrary python
            # objects - only load trusted documents or switch to a safe
            # loader
            data = yaml.load(document.read_text(), Loader=Loader)
        except yaml.YAMLError as exc:
            error = document
            if hasattr(exc, 'problem_mark'):
                error = '{0}:{1}'.format(error, exc.problem_mark.line+1)
            click.secho('{0}: error: {1}'.format(error, str(exc)), fg='red', err=True)
            if FileSystem.strict:
                sys.exit(-1)
            return {}
        # an empty document loads as None; return a dictionary like the
        # other fallback paths so callers can iterate the result
        return {} if data is None else data