Source code for taurus.core.taurusmanager

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""This module contains the taurus base manager class
"""

import os
import atexit
import pkg_resources

from .util.singleton import Singleton
from .util.log import Logger, taurus4_deprecation
from .util.threadpool import ThreadPool
from .taurusbasetypes import (
    OperationMode,
    ManagerState,
    TaurusSerializationMode,
)
from .taurusauthority import TaurusAuthority
from .taurusdevice import TaurusDevice
from .taurusattribute import TaurusAttribute
from .taurusexception import TaurusException
from .taurusfactory import TaurusFactory
from taurus import core, tauruscustomsettings, getSchemeFromName


__all__ = ["TaurusManager"]

__docformat__ = "restructuredtext"


[docs] class TaurusManager(Singleton, Logger): """A :class:`taurus.core.util.singleton.Singleton` class designed to provide Taurus management. Example:: >>> import taurus.core.taurusmanager >>> manager = taurus.core.taurusmanager.TaurusManager() >>> print(manager == taurus.core.taurusmanager.TaurusManager()) True """ PLUGIN_KEY = "__taurus_plugin__" DefaultSerializationMode = TaurusSerializationMode.Concurrent default_scheme = getattr(tauruscustomsettings, "DEFAULT_SCHEME", "tango") def __init__(self): """Initialization. Nothing to be done here for now.""" pass
[docs] def init(self, *args, **kwargs): """Singleton instance initialization. For internal usage only. Do **NOT** call this method directly """ self._state = ManagerState.UNINITIALIZED self.call__init__(Logger) self.reInit() atexit.register(self.cleanUp)
[docs] def reInit(self): """Reinitialization""" if self._state == ManagerState.INITED: return self.trace("reInit()") this_path = os.path.abspath(__file__) self._this_path = os.path.dirname(this_path) self._serialization_mode = self.DefaultSerializationMode self._thread_pool = ThreadPool( name="TaurusTP", parent=self, Psize=5, Qsize=1000 ) self._sthread_pool = ThreadPool( name="TaurusTSP", parent=self, Psize=1, Qsize=0 ) self._plugins = None self._initial_default_scheme = self.default_scheme self._state = ManagerState.INITED
[docs] def cleanUp(self): """Cleanup""" if self._state == ManagerState.CLEANED: return self.trace("cleanUp()") if self._plugins is None: return self.trace("[TaurusManager] cleanUp") self._plugins = None self._thread_pool.join() self._thread_pool = None self._sthread_pool.join() self._sthread_pool = None self._state = ManagerState.CLEANED
[docs] def addJob(self, job, callback=None, *args, **kw): """Deprecated. Wrapper of enqueueJob. See enqueueJob documentation.""" self.deprecated(dep="addJob", alt="enqueueJob", rel="4.3.2") self.enqueueJob(job, callback=callback, job_args=args, job_kwargs=kw)
[docs] def enqueueJob( self, job, callback=None, job_args=(), job_kwargs=None, serialization_mode=None, ): """Enqueue a job (callable) to the queue. The new job will be processed by a separate thread :param job: a callable object :type job: callable :param callback: called after the job has been processed :type callback: callable :param job_args: positional arguments passed to the job :type job_args: sequence :param job_kwargs: keyword arguments passed to the job :type job_kwargs: dict :param serialization_mode: serialization mode :type serialization_mode: TaurusSerializationMode """ if job_kwargs is None: job_kwargs = {} if serialization_mode is None: serialization_mode = self._serialization_mode if serialization_mode == TaurusSerializationMode.Concurrent: if not hasattr(self, "_thread_pool") or self._thread_pool is None: self.info("Job cannot be processed.") self.debug( "The requested job cannot be processed. " + "Make sure this manager is initialized" ) return self._thread_pool.add(job, callback, *job_args, **job_kwargs) elif serialization_mode == TaurusSerializationMode.Serial: if ( not hasattr(self, "_sthread_pool") or self._sthread_pool is None ): self.info("Job cannot be processed.") self.debug( "The requested job cannot be processed. " + "Make sure this manager is initialized" ) return self._sthread_pool.add(job, callback, *job_args, **job_kwargs) else: raise TaurusException( "{} serialization mode not supported".format( serialization_mode ) )
[docs] def setSerializationMode(self, mode): """Sets the serialization mode for the system. :param mode: the new serialization mode :type mode: TaurusSerializationMode """ self._serialization_mode = mode
[docs] def getSerializationMode(self): """Gives the serialization operation mode. :return: the current serialization mode :rtype: TaurusSerializationMode """ return self._serialization_mode
[docs] def setOperationMode(self, mode): """Deprecated. Sets the operation mode for the system. :param mode: the new operation mode :type mode: OperationMode """ dep = "setOperationMode" rel = "Taurus4" dbg_msg = "Don't use this method" msg = "%s is deprecated (from %s). %s" % (dep, rel, dbg_msg) self.deprecated(msg)
[docs] def getOperationMode(self): """Deprecated. Gives the current operation mode. :return: the current operation mode :rtype: OperationMode """ dep = "getOperationMode" rel = "Taurus4" dbg_msg = "Don't use this method" msg = "%s is deprecated (from %s). %s" % (dep, rel, dbg_msg) self.deprecated(msg) return OperationMode.ONLINE
[docs] def getDefaultFactory(self): """Gives the default factory. :return: the default taurus factory :rtype: taurus.core.taurusfactory.TaurusFactory """ return self.getPlugins().get(self.default_scheme, None)
[docs] def getPlugins(self): """Gives the information about the existing plugins :return: the list of plugins :rtype: dict<str, class taurus.core.taurusfactory.TaurusFactory> """ if self._plugins is None: self._plugins = self._build_plugins() return self._plugins
[docs] def getFactory(self, scheme=None): """Gives the factory class object supporting the given scheme :param scheme: the scheme. If None the default scheme is used :type scheme: str or None :return: the factory class object for the given scheme or None if a proper factory is not found :rtype: taurus.core.taurusfactory.TaurusFactory or None """ if scheme is None: return self.getDefaultFactory() return self.getPlugins().get(scheme)
[docs] def getObject(self, cls, name): """Gives the object for the given class with the given name :param cls: object class :type cls: taurus.core.taurusmodel.TaurusModel :param name: the object name :type name: str :return: a taurus model object :rtype: taurus.core.taurusmodel.TaurusModel or None """ factory = self._get_factory(name) if factory is None: return return factory.getObject(cls, name)
[docs] def findObject(self, absolute_name): """Finds the object with the given name :param absolute_name: the object name :type absolute_name: str :return: the taurus model object or None if no suitable name found :rtype: taurus.core.taurusmodel.TaurusModel or None """ factory = self._get_factory(absolute_name) if factory is None: return return factory.findObject(absolute_name)
[docs] def findObjectClass(self, absolute_name): """Finds the object class for the given object name :param absolute_name: the object name :type absolute_name: str :return: the taurus model class object or None if no suitable name found :rtype: class taurus.core.taurusmodel.TaurusModel or None """ factory = self._get_factory(absolute_name) if factory is None: return return factory.findObjectClass(absolute_name)
[docs] def getAuthority(self, name): """Returns a database object for the given name :param name: database name :type name: str :return: the authority for the given name :rtype: taurus.core.taurusauthority.TaurusAuthority """ return self.getObject(TaurusAuthority, name)
[docs] def getDatabase(self, name): """Deprecated. Use getAuthority instead""" self.warning("getDatabase is deprecated. Use getAuthority instead") return self.getAuthority(self, name)
[docs] def getDevice(self, name): """Returns a device object for the given name :param name: device name :type name: str :return: the device for the given name :rtype: taurus.core.taurusdevice.TaurusDevice """ return self.getObject(TaurusDevice, name)
[docs] def getAttribute(self, name): """Returns a attribute object for the given name :param name: attribute name :type name: str :return: the attribute for the given name :rtype: taurus.core.taurusattribute.TaurusAttribute """ return self.getObject(TaurusAttribute, name)
@taurus4_deprecation(alt="getAttribute") def getConfiguration(self, name): """Returns a configuration object for the given name :param name: configuration name :type name: str :return: the configuration for the given name :rtype: taurus.core.taurusconfiguration.TaurusConfiguration """ return self.getAttribute(name) def _get_factory(self, name): scheme = self.getScheme(name) if scheme is None: return try: return self.getPlugins()[scheme]() except Exception: raise TaurusException('Invalid scheme "%s"' % scheme)
[docs] def getScheme(self, name): """Returns the scheme name for a given model name :param name: model name :type name: str :return: scheme name :rtype: str """ return getSchemeFromName(name, implicit=True)
def _get_schema(self, name): raise DeprecationWarning( "_get_schema is deprecated. Use getScheme instead" ) def _build_plugins(self): plugin_classes = self._get_plugin_classes() plugins = {} for plugin_class in plugin_classes: schemes = list(plugin_class.schemes) for scheme in schemes: if scheme in plugins: k = plugins[scheme] self.warning( "Conflicting plugins: %s and %s both implement " "scheme %s. Will keep using %s" % ( k.__name__, plugin_class.__name__, scheme, k.__name__, ) ) else: plugins[scheme] = plugin_class return plugins
[docs] def buildPlugins(self): """Returns the current valid plugins :return: plugins :rtype: dic """ return self._build_plugins()
def _get_plugin_classes(self): elems = os.listdir(self._this_path) dirs = [] for elem in elems: if elem.startswith(".") or elem.startswith("_"): continue elem = os.path.join(self._this_path, elem) if not os.path.isdir(elem): continue plugin_file = os.path.join(elem, self.PLUGIN_KEY) if not os.path.exists(plugin_file): continue if not os.path.exists(os.path.join(elem, "__init__.py")): continue dirs.append(elem) plugins = [] full_module_names = [ "taurus.core.%s" % d.split(os.path.sep)[-1] for d in dirs ] full_module_names.extend( getattr(tauruscustomsettings, "EXTRA_SCHEME_MODULES", []) ) full_module_names.extend(getattr(core, "PLUGIN_SCHEME_MODULES", [])) # --------------------------------------------------------------------- # Note: this is an experimental feature introduced in v 4.5.0a # It may be removed or changed in future releases # Discover the taurus.core.schemes plugins schemes_ep = pkg_resources.iter_entry_points("taurus.core.schemes") full_module_names.extend([p.name for p in schemes_ep]) # --------------------------------------------------------------------- for full_module_name in full_module_names: try: m = __import__(full_module_name, fromlist=["*"], level=0) except Exception: self.debug("Failed to inspect %s" % (full_module_name)) continue for s in m.__dict__.values(): plugin = None try: if issubclass(s, TaurusFactory) and issubclass( s, Singleton ): if hasattr(s, "schemes"): schemes = getattr(s, "schemes") if len(schemes): plugin = s else: scheme = self._find_scheme(s) if scheme is not None: s.schemes = (scheme,) plugin = s except Exception: pass if plugin is not None: self.debug("Found plugin %s" % plugin.__name__) plugins.append(plugin) return plugins def _find_scheme(self, factory_class): class_name = factory_class.__name__ for i in range(1, len(class_name)): if class_name[i].isupper(): return class_name[:i].lower()
[docs] def applyPendingOperations(self, ops): """Executes the given operations :param ops: the sequence of operations :type ops: sequence<taurus.core.taurusoperation.TaurusOperation> """ for o in ops: o.execute()
[docs] def changeDefaultPollingPeriod(self, period): plugin_classes = self._get_plugin_classes() for plugin_class in plugin_classes: scheme = plugin_class.schemes[0] self.getFactory(scheme)().changeDefaultPollingPeriod(period)
def __str__name__(self, name): return "{0}({1})".format(self.__class__.__name__, name) def __str__(self): return self.__str__name__("") def __repr__(self): return self.__str__name__("")
if __name__ == "__main__": manager = TaurusManager() print(manager.getPlugins())