#!/usr/bin/env python
# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus. If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################
"""
This module provides the :class:`TaurusFactory` base class that any valid
Factory in Taurus must inherit.
The Factory objects are the basic block for building and interacting with a
given scheme in Taurus. They provide Taurus model objects (TaurusAuthority,
TaurusDevice or TaurusAttribute) for a given taurus model name.
Taurus model naming is URI based (see <https://tools.ietf.org/html/rfc3986>)
All the standard components of an URI (scheme, authority, path, query and
fragment) may be part of a model name, and they are separated as follows:
<scheme>:<authority><path>?<query>#<fragment>
The following are some points to consider when using and/or implementing
schemes based on this Abstract class:
- It is strongly recommended that the scheme component is always present
explicitly in the model name, although a default scheme can be defined in
:mod:`taurus.tauruscustomsettings` so that model names which do not
explicit the scheme can be auto-completed.
- The authority component (if present on a given name) must always begin by
a double slash ('//').
(see <https://tools.ietf.org/html/rfc3986#section-3.2>)
- The path component, if present, must start by a single slash ('/') (see
<https://tools.ietf.org/html/rfc3986#section-3.3>)
"""
import atexit
from weakref import WeakValueDictionary
from .taurusbasetypes import TaurusElementType
from .taurusauthority import TaurusAuthority
from .taurusdevice import TaurusDevice
from .taurusattribute import TaurusAttribute
from .taurusconfiguration import TaurusConfiguration, TaurusConfigurationProxy
from .taurusexception import TaurusException
from taurus.core.tauruspollingtimer import TaurusPollingTimer
__all__ = ["TaurusFactory"]
__docformat__ = "restructuredtext"
[docs]
class TaurusFactory(object):
"""The base class for valid Factories in Taurus."""
schemes = () # reimplement in derived classes to declare supported schemes
caseSensitive = True # reimplement if your scheme is case insensitive
elementTypesMap = None # reimplement in derived classes to profit from
# generic implementations of getAuthority,
# getDevice, getAttribute, findObjectClass, etc.
# see findObjectClass for more details
DefaultPollingPeriod = 3000
def __init__(self):
atexit.register(self.cleanUp)
self._polling_period = self.DefaultPollingPeriod
self.polling_timers = {}
self._polling_enabled = True
self._attrs = WeakValueDictionary()
self._devs = WeakValueDictionary()
self._auths = WeakValueDictionary()
from . import taurusmanager
manager = taurusmanager.TaurusManager()
self._serialization_mode = manager.getSerializationMode()
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# API for cleanUp at exit
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def cleanUp(self):
"""Reimplement if you need to execute code on program execution exit.
Default implementation does nothing.
"""
pass
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# API for serialization
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def setSerializationMode(self, mode):
"""Sets the serialization mode for the system.
:param mode: the new serialization mode
:type mode: TaurusSerializationMode
"""
self._serialization_mode = mode
[docs]
def getSerializationMode(self):
"""Gives the serialization operation mode.
:return: the current serialization mode
:rtype: TaurusSerializationMode
"""
return self._serialization_mode
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# API to get objects. Generic implementation. You may want to reimplement
# it in your scheme factory
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def getAuthority(self, name=None):
"""Obtain the model object corresponding to the given authority name.
If the corresponding authority already exists, the existing instance
is returned. Otherwise a new instance is stored and returned.
:param name: authority name
:type name: str
:return: a taurus.core.taurusauthority.TaurusAuthority object
:raises: :TaurusException: if the given name is invalid.
"""
v = self.getAuthorityNameValidator()
if not v.isValid(name):
msg = "Invalid {scheme} authority name '{name}'".format(
scheme=self.schemes[0], name=name
)
raise TaurusException(msg)
fullname, _, _ = v.getNames(name)
auth = self._auths.get(fullname)
if auth is not None:
return auth
cls = self.elementTypesMap[TaurusElementType.Authority]
auth = cls(name=fullname)
self._auths[fullname] = auth
return auth
[docs]
def getDevice(self, name, **kw):
"""Obtain the model object corresponding to the given device name.
If the corresponding device already exists, the existing instance
is returned. Otherwise a new instance is stored and returned.
:param name: device name
:type name: str
:return: a taurus.core.taurusdevice.TaurusDevice object
:raises: :TaurusException: if the given name is invalid.
"""
v = self.getDeviceNameValidator()
if not v.isValid(name):
msg = "Invalid {scheme} device name '{name}'".format(
scheme=self.schemes[0], name=name
)
raise TaurusException(msg)
fullname, _, _ = v.getNames(name)
dev = self._devs.get(fullname)
if dev is not None:
return dev
try:
# this works if the authority name is present in the dev full name
# (which in principle should always be the case)
authname = v.getUriGroups(fullname)["authority"]
auth = self.getAuthority(authname)
except Exception:
self.debug('Cannot get device parent from name "%s"', fullname)
auth = None
cls = self.elementTypesMap[TaurusElementType.Device]
dev = cls(name=fullname, parent=auth)
self._devs[fullname] = dev
return dev
[docs]
def getAttribute(self, name):
"""Obtain the model object corresponding to the given attribute name.
If the corresponding attribute already exists, the existing instance
is returned. Otherwise a new instance is stored and returned.
:param name: attribute name
:type name: str
:return: a taurus.core.taurusattribute.TaurusAttribute object
:raises: :TaurusException: if the given name is invalid.
"""
v = self.getAttributeNameValidator()
if not v.isValid(name):
msg = "Invalid {scheme} attribute name '{name}'".format(
scheme=self.schemes[0], name=name
)
raise TaurusException(msg)
fullname, _, _ = v.getNames(name)
attr = self._attrs.get(fullname)
if attr is not None:
return attr
try:
# this works only if the devname is present in the attr full name
# (not all schemes are constructed in this way)
devname = v.getUriGroups(fullname)["devname"]
dev = self.getDevice(devname)
except Exception:
self.debug('Cannot get attribute parent from name "%s"', fullname)
dev = None
cls = self.elementTypesMap[TaurusElementType.Attribute]
attr = cls(name=fullname, parent=dev)
self._attrs[fullname] = attr
return attr
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# Methods that must be implemented by the specific Factory
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def getAuthorityNameValidator(self):
raise NotImplementedError(
"getAuthorityNameValidator cannot be called"
" for abstract TaurusFactory"
)
[docs]
def getDeviceNameValidator(self):
raise NotImplementedError(
"getDeviceNameValidator cannot be called"
" for abstract TaurusFactory"
)
[docs]
def getAttributeNameValidator(self):
raise NotImplementedError(
"getAttributeNameValidator cannot be called"
" for abstract TaurusFactory"
)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# Factory extension API
# Override the following methods if you need to provide special classes for
# special object types
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def registerAttributeClass(self, attr_name, attr_klass):
pass
[docs]
def unregisterAttributeClass(self, attr_name):
pass
[docs]
def registerDeviceClass(self, dev_klass_name, dev_klass):
pass
[docs]
def unregisterDeviceClass(self, dev_klass_name):
pass
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
# Generic methods
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs]
def supportsScheme(self, scheme):
"""Returns whether the given scheme is supported by this factory
:param scheme: the name of the schem to be checked
:type scheme: str
:return: True if the scheme is supported (False otherwise)
:rtype: bool
"""
return scheme in self.schemes
[docs]
def findObject(self, absolute_name):
"""Must give an absolute name"""
if not absolute_name:
return None
obj = None
cls = self.findObjectClass(absolute_name)
if cls:
obj = self.getObject(cls, absolute_name)
return obj
[docs]
def getObject(self, cls, name):
t4_msg = "The TaurusConfiguration classes are deprecated in tep14"
if issubclass(cls, TaurusAuthority):
return self.getAuthority(name)
elif issubclass(cls, TaurusDevice):
return self.getDevice(name)
elif issubclass(cls, TaurusAttribute):
return self.getAttribute(name)
# For backward compatibility
elif issubclass(cls, TaurusConfiguration):
self.deprecated(
dep="TaurusConfiguration",
alt="TaurusAttribute",
rel="4.0",
dbg_msg=t4_msg,
)
return self.getAttribute(name)
elif issubclass(cls, TaurusConfigurationProxy):
self.deprecated(
dep="TaurusConfigurationProxy",
alt="TaurusAttribute",
rel="4.0",
dbg_msg=t4_msg,
)
return self.getAttribute(name)
else:
return None
[docs]
def changeDefaultPollingPeriod(self, period):
if period > 0:
self._polling_period = period
[docs]
def getDefaultPollingPeriod(self):
return self._polling_period
[docs]
def isPollingEnabled(self):
"""Tells if the Taurus polling is enabled
:return: whether or not the polling is enabled
:rtype: bool
"""
return self._polling_enabled
[docs]
def disablePolling(self):
"""Disable the application tango polling"""
if not self.isPollingEnabled():
return
self._polling_enabled = False
for period, timer in self.polling_timers.items():
timer.stop()
[docs]
def enablePolling(self):
"""Enable the application tango polling"""
if self.isPollingEnabled():
return
for period, timer in self.polling_timers.items():
timer.start()
self._polling_enabled = True
[docs]
def addAttributeToPolling(self, attribute, period, unsubscribe_evts=False):
"""Activates the polling (client side) for the given attribute with the
given period (seconds).
:param attribute: the attribute to be added
:type attribute: taurus.core.taurusattribute.TaurusAttribute
:param period: polling period (in seconds)
:type period: float
:param unsubscribe_evts: whether or not to unsubscribe from events
:type unsubscribe_evts: bool
"""
tmr = self.polling_timers.get(period)
if tmr is None:
tmr = TaurusPollingTimer(period)
self.polling_timers[period] = tmr
tmr.addAttribute(attribute, self.isPollingEnabled())
[docs]
def removeAttributeFromPolling(self, attribute):
"""Deactivate the polling (client side) for the given attribute. If the
polling of the attribute was not previously enabled, nothing happens.
:param attribute: the attribute to be removed
:type attribute: taurus.core.taurusattribute.TaurusAttribute
"""
timers = dict(self.polling_timers)
for period, timer in timers.items():
timer.removeAttribute(attribute)
if not timer.dev_dict and period in self.polling_timers:
del self.polling_timers[period]
def __str__(self):
return "{0}()".format(self.__class__.__name__)
def __repr__(self):
return "{0}(schemes={1})".format(
self.__class__.__name__, ", ".join(self.schemes)
)
[docs]
def getValidTypesForName(self, name, strict=None):
"""
Returns a list of all Taurus element types for which `name` is a valid
model name (while in many cases a name may only be valid for one
element type, this is not necessarily true in general)
In this base implementation, name is checked first for Attribute, then
for Device and finally for Authority, and the return value is sorted in
that same order.
If a given schema requires a different ordering, reimplement this
method
:param name: taurus model name
:type name: str
:return: where element can be one of: `Attribute`, `Device` or
`Authority`
:rtype: list<TaurusElementType.element>
"""
ret = []
if self.getAttributeNameValidator().isValid(name, strict=strict):
ret.append(TaurusElementType.Attribute)
if self.getDeviceNameValidator().isValid(name, strict=strict):
ret.append(TaurusElementType.Device)
if self.getAuthorityNameValidator().isValid(name, strict=strict):
ret.append(TaurusElementType.Authority)
return ret
[docs]
def getValidatorFromName(self, name):
"""
Obtain the validator object corresponding to the given model
name. If the model name is not valid for any TaurusModel class,
it returns None
"""
modeltypes = self.getValidTypesForName(name)
if not modeltypes:
return None
return self.elementTypesMap[modeltypes[0]].getNameValidator()
[docs]
def findObjectClass(self, absolute_name):
"""
Obtain the class object corresponding to the given name.
Note, this generic implementation expects that derived classes provide
a an attribute called elementTypesMap consisting in a dictionary whose
keys are TaurusElementTypes and whose values are the corresponding
specific object classes. e.g., the FooFactory should provide::
class FooFactory(TaurusFactory):
elementTypesMap = {TaurusElementType.Authority: FooAuthority,
TaurusElementType.Device: FooDevice,
TaurusElementType.Attribute: FooAttribute,
}
(...)
:param absolute_name: the object absolute name string
:type absolute_name: str
:return: a TaurusModel class derived type or None if the name is not
valid
:rtype: taurus.core.taurusmodel.TaurusModel or None
"""
try:
elementTypesMap = self.elementTypesMap
except AttributeError:
msg = (
"generic findObjectClass called but %s does "
+ "not define elementTypesMap."
) % self.__class__.__name__
raise RuntimeError(msg)
for t in self.getValidTypesForName(absolute_name):
ret = elementTypesMap.get(t, None)
if ret is not None:
return ret
return None