Source code for taurus.core.evaluation.evalvalidator

#!/usr/bin/env python
# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

import re
import hashlib
import taurus
from taurus import isValidName, debug
from taurus.core import TaurusElementType

from taurus.core.taurusvalidator import (
    TaurusAttributeNameValidator,
    TaurusDeviceNameValidator,
    TaurusAuthorityNameValidator,
)

__all__ = ["EvaluationDeviceNameValidator", "EvaluationAttributeNameValidator"]

# Pattern for python variables
PY_VAR = r"(?<![\.a-zA-Z0-9_])[a-zA-Z_][a-zA-Z0-9_]*"
PY_VAR_RE = re.compile(PY_VAR)
# Pattern for semicolon-separated <variable>=<value> pairs (in URI paths)
K_EQUALS_V = r"(%s)=([^?#=;]+)" % PY_VAR
K_EQUALS_V_RE = re.compile(K_EQUALS_V)
#
QUOTED_TEXT = "(\".*?\"|'.*?')"
QUOTED_TEXT_RE = re.compile(QUOTED_TEXT)


def _findAllTokensBetweenChars(string, start, end, n=None):
    """Finds the text between (possibly nested) delimiters in a string.
    In case of nested delimiters, only the outermost level is
    returned. It returns a tuple of (idx,token)

    Example::

      _findAllTokensBetweenChars('{foo}bar{zig{zag}}boom', '{', '}')
      --> [(1,'foo'), (9, 'zig{zag}')]


    :param string: the expression to parse
    :type string: str
    :param start: the char delimiting the start of a token
    :type start: str
    :param end: the char delimiting the end of a token
    :type end: str
    :param n: If an int is passed, it sets the maximum number of tokens to be
        found
    :type n: int or None
    :return: ) a list of (idx, token) tuples. The idx is the position of the
        token in `string` (tokens d not include the delimiting chars not
        including the brackets)
    :rtype: list(<int>,<str>
    """
    if start == end:
        raise ValueError("star_char must be different from end_char")
    if string.count(start) != string.count(end):
        raise ValueError(
            'Non-matching delimiters (%i "%s" vs %i "%s")'
            % string.count(start),
            start,
            string.count(end),
            end,
        )
    tokens = []
    idx = 0
    rest = string
    while len(tokens) != n:
        s = rest.find(start)
        if s < 0:
            break
        e = rest.find(end) + 1
        while rest[s:e].count(start) != rest[s:e].count(end):
            ne = rest[e:].find(end)
            e = e + 1 + ne
        tokens.append((idx + s, rest[s + 1 : e - 1]))
        idx += e
        rest = rest[e:]
    return tokens


def _isQuoted(string, substring, idx):
    """returns True if position i of string is in a quoted region"""
    bfr = string[:idx]
    aft = string[idx + len(substring) :]
    if (
        bfr.count('"') % 2
        or aft.count('"') % 2
        or bfr.count("'") % 2
        or aft.count("'") % 2
    ):
        return True
    else:
        return False


def _replacepos(string, old, new, idx):
    """return copy of string where the occurrence of substring `old` at
    position `pos` is replaced by `new`
    """
    if not string[idx:].startswith(old):
        raise Exception("invalid")
    return string[:idx] + new + string[idx + len(old) :]


class EvaluationAuthorityNameValidator(TaurusAuthorityNameValidator):
    """Validator for Evaluation authority names. For now, the only supported
    authority (in strict mode) is "//localhost":
    """

    scheme = "eval"
    authority = "//localhost"
    path = "(?!)"
    query = "(?!)"
    fragment = "(?!)"

    @property
    def nonStrictNamePattern(self):
        """implement in derived classes if a "less strict" pattern is allowed
        (e.g. for backwards-compatibility, "tango://a/b/c" could be an accepted
        device name, even if it breaks RFC3986).
        """
        return r"^(?P<scheme>eval|evaluation)://(db=(?P<dbname>[^?#;]+))$"


[docs] class EvaluationDeviceNameValidator(TaurusDeviceNameValidator): """Validator for Evaluation device names. Apart from the standard named groups (scheme, authority, path, query and fragment), the following named groups are created: - devname: device name (either _evalname or _evaldotname) - [_evalname]: evaluation instance name (aka non-dotted dev name) - [_evaldotname]: evaluation instance dotted name (if dotted name given) - [_old_devname]: devname without "@". Only in non-strict mode - [_dbname] and [_subst]: unused. Only if non-strict mode Note: brackets on the group name indicate that this group will only contain a string if the URI contains it. """ scheme = "eval" authority = EvaluationAuthorityNameValidator.authority _evaldotname = ( r"((?P<_evalinstname>\w+)=)?" + r"(?P<_evalmodname>(\w+\.)*\w+)\." + r"(?P<_evalclassname>(\w+|\*))" + r'(?P<_evalclassparenths>\(("[^"]*"|\'[^\']*\'|[^\'"/])*?\))?' ) # _evaldotname = r'(?P<_evaldotname>(\w+=)?(\w+\.)+(\w+(\(\))?|\*)))' _evaluatorname = ( r"((?P<_evalname>[^/?#:\.=]+)|(?P<_evaldotname>%s))" % _evaldotname ) devname = r"(?P<devname>@%s)" % _evaluatorname path = r"(?!//)/?%s" % devname query = "(?!)" fragment = "(?!)"
[docs] def getUriGroups(self, name, strict=None): """reimplemented from :class:`TaurusDeviceNameValidator` to provide backwards compatibility with ol syntax """ groups = TaurusDeviceNameValidator.getUriGroups( self, name, strict=strict ) if groups is not None and not groups["__STRICT__"]: _old_devname = groups["_old_devname"] groups["devname"] = "@%s" % _old_devname if "." in _old_devname: groups["_evalname"] = None groups["_evaldotname"] = _old_devname else: groups["_evalname"] = _old_devname groups["_evaldotname"] = None return groups
[docs] def getNames(self, fullname, factory=None): """reimplemented from :class:`TaurusDeviceNameValidator`""" from .evalfactory import EvaluationFactory # TODO: add mechanism to select strict mode instead of hardcoding here groups = self.getUriGroups(fullname) if groups is None: return None authority = groups.get("authority") if authority is None: f_or_fklass = factory or EvaluationFactory groups["authority"] = authority = f_or_fklass.DEFAULT_AUTHORITY complete = "eval:%(authority)s/%(devname)s" % groups normal = "%(devname)s" % groups short = normal.lstrip("@") return complete, normal, short
@property def nonStrictNamePattern(self): """In non-strict mode support old-style eval names""" p = ( r"^(?P<scheme>eval|evaluation)://(db=(?P<_dbname>[^?#;]+);)?" + r"(dev=(?P<_old_devname>%s))" % self._evaluatorname + r"(\?(?!configuration=)(?P<_subst>[^#?]*))?$" ) return p
[docs] class EvaluationAttributeNameValidator(TaurusAttributeNameValidator): """Validator for Evaluation attribute names. Apart from the standard named groups (scheme, authority, path, query and fragment), the following named groups are created: - attrname: attribute name. same as concatenating _subst with _expr - _expr: a mathematical expression - _evalrefs: a list of eval refs found in the name (see :meth:`getRefs`) - [_subst]: a semicolon-separated repetition of key=value (for replacing them in _expr) - [devname]: as in :class:`EvaluationDeviceNameValidator` - [_evalname]: evaluation instance name (aka non-dotted dev name) - [_evaldotname]: evaluator instance dotted name (if dotted name given) - [_old_devname]: devname without "@". Only in non-strict mode - [_dbname] and [_subst]: unused. Only if non-strict mode - [cfgkey] same as fragment (for bck-compat use only) Note: brackets on the group name indicate that this group will only contain a value if the URI contains it. """ scheme = "eval" authority = EvaluationAuthorityNameValidator.authority path = ( r"(?!//)/?(%s/)?" + r"(?P<attrname>(?P<_subst>(%s;)+)?(?P<_expr>[^@?#]+))" ) % (EvaluationDeviceNameValidator.devname, K_EQUALS_V) query = "(?!)" fragment = "(?P<cfgkey>[^# ]*)"
[docs] @staticmethod def expandExpr(expr, substmap): """expands expr by substituting all keys in map by their value. Note that eval references in expr (i.e. text within curly brackets) is not substituted. :param expr: string that may contain symbols defined in symbolMap :type expr: str :param symbolMap: dictionary whose keys (strings) are symbols to be substituted in `expr` and whose values are the corresponding replacements. Alternatively, a string containing a semi-colon separated list of symbol=value pairs can also be passed. :type symbolMap: dict or str """ if isinstance(substmap, str): substmap = dict(K_EQUALS_V_RE.findall(substmap)) ret = expr protected = {} # temporarily replace the text within quotes by hash-based placeholders for s in QUOTED_TEXT_RE.findall(expr): placeholder = hashlib.md5(s.encode("utf-8")).hexdigest() protected[placeholder] = s ret = re.sub(s, placeholder, ret) # Substitute each k by its v in the expr (unless they are in # references) for k, v in substmap.items(): # create a pattern for matching complete word k # unless it is within between curly brackets keyPattern = r"(?<!\w)%s(?!\w)(?![^\{]*\})" % k # substitute matches of keyPattern by their value ret = re.sub(keyPattern, v, ret) # restore the protected strings for placeholder, s in protected.items(): ret = re.sub(placeholder, s, ret) return ret
[docs] @staticmethod def getRefs(expr, ign_quoted=True): """Find the attribute references (strings within brackets) in an eval expression. In case of nested references, only the outermost level is returned. Example: val.getRefs('{foo}bar{zig{zag}}boom') --> ['foo', 'zig{zag}'] :param expr: the expression to parse :type expr: str :param ign_quoted: If True (default) ignore refs within quotes :return: a list of refs (not including the brackets) :rtype: list<str> """ refs = _findAllTokensBetweenChars(expr, "{", "}") if refs and not ign_quoted: _, refs = list(zip(*refs)) return refs ret = [] for i, ref in refs: if not _isQuoted(expr, "{" + ref + "}", i): ret.append(ref) return ret
[docs] @staticmethod def replaceUnquotedRef(string, substring, repl): """Return a copy of string where first non-quoted occurrence of `substring` is replaced by `repl` :param string: string to be used :type string: str :param substring: substring to be replaced :type substring: str :param repl: replacement :type repl: str :return: :rtype: str """ idx = string.find(substring) while _isQuoted(string, substring, idx): idx = string.find(substring, idx + 1) return _replacepos(string, substring, repl, idx)
[docs] def isValid(self, name, matchLevel=None, strict=None): """reimplemented from :class:`TaurusAttributeNameValidator` to do extra check on references validity (recursive) """ # Standard implementation if matchLevel is not None: groups = self._isValidAtLevel(name, matchLevel=matchLevel) else: groups = self.getUriGroups(name, strict=strict) if groups is None: return False # now check the references for ref in groups["_evalrefs"]: if not isValidName( ref, etypes=(TaurusElementType.Attribute,), strict=strict ): debug( '"%s" is invalid because ref "%s" is not a ' + "valid attribute", name, ref, ) return False return True
[docs] def getUriGroups(self, name, strict=None): """reimplemented from :class:`TaurusAttributeNameValidator` to provide backwards compatibility with old syntax """ # mangle refs before matching the pattern to sanitize them refs = self.getRefs(name, ign_quoted=False) refs_dict = {} _name = name for i, ref in enumerate(refs): refs_dict["__EVALREF_%d__" % i] = "{%s}" % ref _name = _name.replace("{%s}" % ref, "{__EVALREF_%d__}" % i, 1) _groups = TaurusAttributeNameValidator.getUriGroups( self, _name, strict=strict ) if _groups is None: return None # create the groups dict with unmangled refs in its values groups = {} for n, g in _groups.items(): if isinstance(g, str): # avoid None or boolean values g = g.format(**refs_dict) groups[n] = g if not groups["__STRICT__"]: # adapt attrname to what would be in strict mode _subst = groups["_subst"] or "" _expr = groups["_expr"] if _subst: groups["attrname"] = "%s;%s" % (_subst.rstrip(";"), _expr) else: groups["attrname"] = _expr # adapt devname to what would be in strict mode old_devname = groups["_old_devname"] if old_devname is None: groups["devname"] = None else: groups["devname"] = "@%s" % old_devname # check that there are not ";" in the expr (ign. quoted text and refs) sanitized_expr = QUOTED_TEXT_RE.sub("", groups["_expr"]) for ref in self.getRefs(sanitized_expr, ign_quoted=False): sanitized_expr = sanitized_expr.replace(ref, "") if ";" in sanitized_expr: return None # add a group containing refs in attrname (ign. those in quoted text) groups["_evalrefs"] = self.getRefs(groups["attrname"], ign_quoted=True) return groups
def _getSimpleNameFromExpression(self, expression): """Get the simple name of an evaluationAttribute from an expression""" name = expression for ref in self.getRefs(expression, ign_quoted=True): manager = taurus.core.TaurusManager() scheme = manager.getScheme(ref) _f = taurus.Factory(scheme) attrNameValidator = _f.getAttributeNameValidator() _, _, simple_name = attrNameValidator.getNames(ref) name = self.replaceUnquotedRef(name, "{%s}" % ref, simple_name) return name def _expandRefNames(self, attrname): """Expand the refs in an eval name to their full names""" name = attrname for ref in self.getRefs(attrname, ign_quoted=True): manager = taurus.core.TaurusManager() scheme = manager.getScheme(ref) _f = taurus.Factory(scheme) attrNameValidator = _f.getAttributeNameValidator() full_name, _, _ = attrNameValidator.getNames(ref) if full_name is None: debug("Cannot expand the fullname of %s" % ref) return None name = self.replaceUnquotedRef( name, "{%s}" % ref, "{%s}" % full_name ) return name
[docs] def getNames(self, fullname, factory=None, fragment=False): """reimplemented from :class:`TaurusDeviceNameValidator`""" from .evalfactory import EvaluationFactory groups = self.getUriGroups(fullname) if groups is None: return None f_or_fklass = factory or EvaluationFactory authority = groups.get("authority") if authority is None: groups["authority"] = authority = f_or_fklass.DEFAULT_AUTHORITY devname = groups.get("devname") if devname is None: groups["devname"] = devname = f_or_fklass.DEFAULT_DEVICE complete = "eval:%s/%s/%s" % (authority, devname, groups["attrname"]) complete = self._expandRefNames(complete) normal = groups["attrname"] if devname != f_or_fklass.DEFAULT_DEVICE: normal = "%s/%s" % (devname, normal) if authority != f_or_fklass.DEFAULT_AUTHORITY: normal = "%s/%s" % (authority, normal) short = self._getSimpleNameFromExpression(groups["_expr"]) # return fragment if requested if fragment: key = groups.get("fragment", None) return complete, normal, short, key return complete, normal, short
@property def nonStrictNamePattern(self): """In non-strict mode support old-style eval config names""" p = ( r"^(?P<scheme>eval|evaluation)://(db=(?P<_dbname>[^?#;]+);)?" + r"(dev=(?P<_old_devname>[^?#;]+);)?" + r"(?P<_expr>[^?#;]+)" + r"(\?(?P<_substquery>(?!configuration=)(?P<_subst>%s(;%s)*)))?" % (K_EQUALS_V, K_EQUALS_V) + r"(\?(?P<query>configuration(=" + "(?P<fragment>(?P<cfgkey>[^#?]*)))?))?$" ) return p
[docs] def getExpandedExpr(self, name): """ Returns the expanded expression from the attribute name URI :param name: eval attribute URI :type name: str :return: the expression (from the name )expanded with any substitution k,v pairs also defined in the name :rtype: str """ groups = self.getUriGroups(name) if groups is None: return None _expr = groups["_expr"] _subst = groups["_subst"] return self.expandExpr(_expr, _subst or {})
[docs] def getAttrName(self, s): # @TODO: Maybe this belongs to the factory, not the validator # TODO: this is pre-tep14 API from the # EvaluationConfigurationNameValidator. Check usage and remove. names = self.getNames(s) if names is None: return None return names[0]
[docs] def getDeviceName(self, name): # @TODO: Maybe this belongs to the factory, not the validator """Obtain the fullname of the device from the attribute name""" from .evalfactory import EvaluationFactory groups = self.getUriGroups(name) if groups is None: return None authority = groups.get("authority") if authority is None: authority = EvaluationFactory.DEFAULT_AUTHORITY devname = groups.get("devname") if devname is None: devname = EvaluationFactory.DEFAULT_DEVICE return "eval:%s/%s" % (authority, devname)
[docs] def getDBName(self, s): # @TODO: Maybe this belongs to the factory, not the validator """returns the full data base name for the given attribute name""" from .evalfactory import EvaluationFactory m = self.name_re.match(s) if m is None: return None dbname = m.group("dbname") or EvaluationFactory.DEFAULT_DATABASE return "eval://db=%s" % dbname
if __name__ == "__main__": pass