Source code for taurus.core.taurusmodel

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""This module contains the base TaurusModel class
"""

import weakref
from collections.abc import Sequence

from .util.log import Logger
from .util.event import (
    CallableRef,
    BoundMethodWeakref,
    _BoundMethodWeakrefWithCall,
)
from .taurusbasetypes import TaurusEventType

__all__ = ["TaurusModel"]

__docformat__ = "restructuredtext"


[docs]class TaurusModel(Logger): _factory = None RegularEvent = ( TaurusEventType.Change, TaurusEventType.Config, TaurusEventType.Periodic, ) def __init__(self, full_name="", parent=None, serializationMode=None): v = self.getNameValidator() self._full_name, self._norm_name, self._simp_name = v.getNames( full_name, self.factory() ) if ( self._full_name is None and self._norm_name and self._simp_name is None ): self.trace("invalid name") name = ( self._simp_name or self._norm_name or self._full_name or "TaurusModel" ) self.call__init__(Logger, name, parent) if serializationMode is None: s_obj = parent if s_obj is None: s_obj = self.factory() serializationMode = s_obj.getSerializationMode() self._serialization_mode = serializationMode self._parentObj = parent self._listeners = [] def __str__name__(self, name): return "{0}({1})".format(self.__class__.__name__, name) def __str__(self): return self.__str__name__(self.getNormalName()) def __repr__(self): return self.__str__name__(self.getFullName())
[docs] def cleanUp(self): self.trace("[TaurusModel] cleanUp") self._parentObj = None self._listeners = None Logger.cleanUp(self)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for Factory access # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] @classmethod def factory(cls): if cls._factory is None: from taurus import Factory cls._factory = Factory(scheme=cls._scheme) return cls._factory
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for naming # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] @classmethod def getTaurusElementType(cls): raise NotImplementedError( "TaurusModel.getTaurusElementType cannot" " be called" )
[docs] def getFullName(self): return self._full_name
[docs] def getNormalName(self): return self._norm_name
[docs] def getSimpleName(self): return self._simp_name
[docs] @classmethod def isValid(cls, *args, **kwargs): return cls.getNameValidator().isValid(*args, **kwargs)
[docs] @classmethod def buildModelName(cls, parent_model, relative_name): raise NotImplementedError( "TaurusModel.buildModelName cannot be called" )
[docs] @classmethod def getNameValidator(cls): raise NotImplementedError( "TaurusModel.getNameValidator cannot be" "called" )
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for hierarchy access # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] def getParentObj(self): return self._parentObj
[docs] def getChildObj(self, child_name): return None # TODO: consider raising NotImplementedError instead
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for serialization # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] def setSerializationMode(self, mode): """Sets the serialization mode for the system. :param mode: the new serialization mode :type mode: TaurusSerializationMode """ self._serialization_mode = mode
[docs] def getSerializationMode(self): """Gives the serialization operation mode. :return: the current serialization mode :rtype: TaurusSerializationMode """ return self._serialization_mode
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for value access # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] def getDisplayDescrObj(self, cache=True): """A brief description of the model. Can be used as tooltip, for example. """ raise NotImplementedError( "TaurusModel.getDisplayDescrObj cannot be" " called" )
[docs] def getDisplayName(self, cache=True, complete=True): full_name = self.getFullName() normal_name = self.getNormalName() simple_name = self.getSimpleName() if simple_name: ret = simple_name if complete: ret += " (" + normal_name.upper() + ")" elif normal_name: ret = normal_name.upper() else: ret = full_name.upper() return ret
[docs] def getFragmentObj(self, fragmentName=None): """Returns a fragment object of the model. A fragment is computed from a model by evaluating the expression `<model>.<fragmentName>` For a simple fragmentName (no dots), this is roughly equivalent to getattr(self, fragmentName) If the fragment cannot be computed, :class:`AttributeError` is raised :param fragmentName: the returned value will correspond to the given fragmentName. If None is passed the defaultFragmentName will be used instead. :type fragmentName: str or None :return: the computed fragment of the modelObj. :rtype: obj """ if fragmentName is None: fragmentName = self.defaultFragmentName try: return eval("obj." + fragmentName, {}, {"obj": self}) except Exception as e: # Note: always raise AttributeError to comply with existing API msg = "Cannot get fragment of {!r}.{!s}: Reason: {!r}" raise AttributeError(msg.format(self, fragmentName, e))
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # API for listeners # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ def _listenerDied(self, weak_listener): if self._listeners is None: return try: self._listeners.remove(weak_listener) except Exception as e: self.debug("Problem removing listener: %r", e) def _getCallableRef(self, listener, cb=None): # return weakref.ref(listener, self._listenerDied) meth = getattr(listener, "eventReceived", None) if meth is not None and hasattr(meth, "__call__"): return weakref.ref(listener, cb) else: return CallableRef(listener, cb)
[docs] def addListener(self, listener): if self._listeners is None or listener is None: return False # TODO: _BoundMethodWeakrefWithCall is used as workaround for # PyTango #185 issue weak_listener = self._getCallableRef( listener, _BoundMethodWeakrefWithCall(self._listenerDied) ) # listener, self._listenerDied) if weak_listener in self._listeners: return False self._listeners.append(weak_listener) return True
[docs] def removeListener(self, listener): if self._listeners is None: return weak_listener = self._getCallableRef(listener) try: self._listeners.remove(weak_listener) except Exception: return False return True
[docs] def forceListening(self): class __DummyListener(object): def eventReceived(self, *args): pass if ( not hasattr(self, "__dummyListener") or self.__dummyListener is None ): self.__dummyListener = __DummyListener() self.addListener(self.__dummyListener)
[docs] def unforceListening(self): if ( hasattr(self, "__dummyListener") and self.__dummyListener is not None ): self.removeListener(self.__dummyListener) self.__dummyListener = None
[docs] def deleteListener(self, listener): self.deprecated("Use removeListener(listener) instead") self.removeListener(listener)
[docs] def hasListeners(self): """returns True if anybody is listening to events from this attribute """ if self._listeners is None: return False return len(self._listeners) > 0
[docs] def fireEvent(self, event_type, event_value, listeners=None): """sends an event to all listeners or a specific one""" if listeners is None: listeners = self._listeners if listeners is None: return if not isinstance(listeners, Sequence): listeners = (listeners,) for listener in listeners: if isinstance(listener, weakref.ref) or isinstance( listener, BoundMethodWeakref ): listener = listener() if listener is None: continue meth = getattr(listener, "eventReceived", None) if meth is not None and hasattr(meth, "__call__"): listener.eventReceived(self, event_type, event_value) elif hasattr(listener, "__call__"): listener(self, event_type, event_value)
[docs] def isWritable(self): return False
@property def name(self): return self._simp_name @property def fullname(self): return self._full_name parentObj = property(fget=getParentObj)