Source code for taurus.core.tango.tangodatabase

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""This module contains the Taurus Tango authority (database) class and the
helper classes used to cache the information it holds.
"""

from collections.abc import Mapping

import os
import weakref

from PyTango import Database, DeviceProxy, DevFailed, ApiUtil
from taurus import Device
from taurus.core.taurusbasetypes import TaurusDevState, TaurusEventType
from taurus.core.taurusauthority import TaurusAuthority
from taurus.core.util.containers import CaselessDict
from taurus.core.util.log import taurus4_deprecation
from taurus.core.util.fqdn import fqdn_no_alias
from taurus.core.tango.tangovalidator import TangoAuthorityNameValidator


__docformat__ = "restructuredtext"

# Sentinel alias value: TangoAuthority.getElementAlias() treats an alias whose
# lower-cased text equals this string as "no alias" and returns None for it.
InvalidAlias = "nada"


class TangoInfo(object):
    """Base class holding the naming information of a cached element.

    The owning container is stored through a weak reference, so an info
    object never keeps its container alive.

    :param container: object that owns this info (must be weak-referenceable)
    :param name: short name of the element
    :param full_name: full name of the element
    """

    def __init__(self, container, name=None, full_name=None):
        self._container = weakref.ref(container)
        self._name = name
        self._full_name = full_name

    def container(self):
        """Return the owning container (None once it was garbage collected)."""
        resolve = self._container
        return resolve()

    def name(self):
        """Return the name given at construction time."""
        return self._name

    def fullName(self):
        """Return the full name given at construction time."""
        return self._full_name

    def __str__(self):
        class_name = self.__class__.__name__
        return "%s(%s)" % (class_name, self.name())

    # The textual representation doubles as the debugging representation.
    __repr__ = __str__
class TangoAttrInfo(TangoInfo):
    """Information about a single attribute of a device.

    Attribute lookups that cannot be resolved on this object are forwarded
    to the wrapped ``info`` object.

    :param container: object that owns this info
    :param name: attribute name
    :param full_name: full attribute name
    :param device: device info object owning the attribute (held weakly)
    :param info: attribute information object to which unknown attribute
                 lookups are delegated
    """

    def __init__(
        self, container, name=None, full_name=None, device=None, info=None
    ):
        super(TangoAttrInfo, self).__init__(
            container, name=name, full_name=full_name
        )
        self._info = info
        # weakref.ref(None) raises TypeError, so the default "no device"
        # case is stored as a plain None instead of a weak reference.
        self._device = None if device is None else weakref.ref(device)

    def device(self):
        """Return the owning device info.

        :return: the device, or None if none was given or it was collected
        """
        ref = self._device
        return None if ref is None else ref()

    def info(self):
        """Return the wrapped attribute information object."""
        return self._info

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup failed. If ``_info``
        # itself is missing (partially initialised instance), delegating
        # would re-enter this method forever, so fail explicitly instead.
        if name == "_info":
            raise AttributeError(name)
        return getattr(self._info, name)
class TangoDevClassInfo(TangoInfo):
    """Information about a device class and the devices that belong to it.

    :param container: object that owns this info
    :param name: class name
    :param full_name: full class name
    """

    def __init__(self, container, name=None, full_name=None):
        super(TangoDevClassInfo, self).__init__(
            container, name=name, full_name=full_name
        )
        self._devices = CaselessDict()
        # Lazily computed sorted list of device names (None = not computed)
        self._device_name_list = None

    def devices(self):
        """Return the case-insensitive dict mapping device name -> device."""
        return self._devices

    def addDevice(self, dev):
        """Register a device under its own name.

        :param dev: device info object exposing ``name()``
        """
        self._devices[dev.name()] = dev
        # The cached sorted list no longer reflects the registered devices
        self._device_name_list = None

    def getDeviceNames(self):
        """Return the sorted list of names of the registered devices.

        The list is cached until the next call to :meth:`addDevice`.
        """
        if getattr(self, "_device_name_list", None) is None:
            self._device_name_list = sorted(
                dev.name() for dev in self._devices.values()
            )
        return self._device_name_list
class TangoDevInfo(TangoInfo):
    """Information about a single device.

    :param container: object that owns this info; :meth:`getDeviceProxy`
                      reads its ``db`` attribute
    :param name: device name; it is split on "/" into domain, family and
                 member, so it must contain exactly three components
    :param full_name: full device name
    :param alias: device alias (or None)
    :param server: server info object the device belongs to (held weakly)
    :param klass: class info object the device belongs to (held weakly)
    :param exported: exported flag; anything accepted by ``int()``
    :param host: host information of the device
    """

    def __init__(
        self,
        container,
        name=None,
        full_name=None,
        alias=None,
        server=None,
        klass=None,
        exported=False,
        host=None,
    ):
        super(TangoDevInfo, self).__init__(
            container, name=name, full_name=full_name
        )
        self._alias = alias
        # weakref.ref(None) raises TypeError: keep a plain None when the
        # (default) missing server/class is given.
        self._server = None if server is None else weakref.ref(server)
        self._klass = None if klass is None else weakref.ref(klass)
        # ``exported`` may arrive as a string such as "0"/"1"
        self._exported = bool(int(exported))
        self._alive = None
        self._state = None
        self._host = host
        name = str(name)
        self._domain, self._family, self._member = list(
            map(str.upper, name.split("/", 2))
        )
        self._attributes = None
        # Re-entrancy guard used by alive()
        self._alivePending = False

    def domain(self):
        """Return the upper-cased domain part of the device name."""
        return self._domain

    def family(self):
        """Return the upper-cased family part of the device name."""
        return self._family

    def member(self):
        """Return the upper-cased member part of the device name."""
        return self._member

    def alias(self):
        """Return the device alias (None if it has none)."""
        return self._alias

    def server(self):
        """Return the server info object (None if unset or collected)."""
        ref = self._server
        return None if ref is None else ref()

    def klass(self):
        """Return the class info object (None if unset or collected)."""
        ref = self._klass
        return None if ref is None else ref()

    def exported(self):
        """Return the exported flag as a bool."""
        return self._exported

    def alive(self):
        """Tell whether the device answers a ``state()`` request.

        The result of the first check is cached. While a check is in
        progress, nested calls return False.

        :return: True if the device answered, False otherwise
        :rtype: bool
        """
        if self._alive is None:
            if self._alivePending:
                return False
            self._alivePending = True
            try:
                dev = self.getDeviceProxy()
                # dev is None when no proxy could be obtained; the resulting
                # AttributeError is handled as "not alive" like any failure
                _ = dev.state()
                self._alive = True
            except Exception:
                self._alive = False
            finally:
                # always release the guard, even on non-Exception errors
                self._alivePending = False
        return self._alive

    def state(self):
        """Overwrite state so it doesn't call 'alive()' since it can take
        a long time for devices that are declared as exported but are in
        fact not running (crashed, network error, power cut, etc)
        """
        if self._state is not None:
            return self._state
        exported = self.exported()
        if exported:
            self._state = TaurusDevState.Ready
        else:
            self._state = TaurusDevState.NotReady
        return self._state

    def host(self):
        """Return the host information given at construction time."""
        return self._host

    def attributes(self):
        """Return the attribute info list, querying it if empty or unset."""
        if self._attributes is None or len(self._attributes) == 0:
            self.refreshAttributes()
        return self._attributes

    def getAttribute(self, attrname):
        """Return the info object of the attribute with the given name.

        The comparison is case-insensitive on both sides, because the
        stored attribute names are lower case when filled by
        :meth:`refreshAttributes` but keep their original case when filled
        through :meth:`setAttributes` by other code.

        :param attrname: attribute name
        :type attrname: str
        :return: the matching attribute info or None if not found
        """
        attrname = attrname.lower()
        for a in self.attributes():
            if a.name().lower() == attrname:
                return a
        return None

    def setAttributes(self, attributes):
        """Replace the stored attribute info list."""
        self._attributes = attributes

    @taurus4_deprecation(alt="getDeviceProxy()")
    def getHWObj(self):
        return self.getDeviceProxy()

    def getDeviceProxy(self):
        """Return the device proxy obtained through the database factory.

        :return: the proxy, or None if it could not be obtained (any
                 exception raised while getting it is swallowed)
        """
        db = self.container().db
        name = self.name()
        full_name = db.getFullName() + "/" + name
        dev = None
        try:
            dev = db.factory().getDevice(full_name).getDeviceProxy()
        except Exception:
            pass
        return dev

    def refreshAttributes(self):
        """Query the device for its attributes and store them sorted by name.

        On failure the state is set to NotReady and an empty list is stored.
        """
        attrs = []
        try:
            dev = self.getDeviceProxy()
            if dev is None:
                raise DevFailed()  # @todo: is this the right exception?
            attr_info_list = dev.attribute_list_query_ex()
            for attr_info in attr_info_list:
                full_name = "%s/%s" % (self.fullName(), attr_info.name)
                attr_obj = TangoAttrInfo(
                    self.container(),
                    name=attr_info.name.lower(),
                    full_name=full_name.lower(),
                    device=self,
                    info=attr_info,
                )
                attrs.append(attr_obj)
            attrs = sorted(attrs, key=lambda attr: attr.name())
        except DevFailed:
            self._state = TaurusDevState.NotReady
        self.setAttributes(attrs)
class TangoServInfo(TangoInfo):
    """Information about a device server and the devices it runs.

    :param container: object that owns this info
    :param name: server name in the form ``<server name>/<instance>``
    :param full_name: full server name
    """

    def __init__(self, container, name=None, full_name=None):
        super(TangoServInfo, self).__init__(
            container, name=name, full_name=full_name
        )
        self._devices = {}
        self._exported = False
        self._alive = None
        self._host = ""
        self._server_name, self._server_instance = name.split("/", 1)
        # Re-entrancy guard used by alive()
        self._alivePending = False
        # Lazily computed sorted name lists (None = not computed)
        self._device_name_list = None
        self._klass_name_list = None

    def devices(self):
        """Return the dict mapping device name -> device info."""
        return self._devices

    def getDeviceNames(self):
        """Return the sorted list of names of the registered devices.

        The list is cached until the next call to :meth:`addDevice`.
        """
        if getattr(self, "_device_name_list", None) is None:
            self._device_name_list = sorted(
                dev.name() for dev in self._devices.values()
            )
        return self._device_name_list

    def getClassNames(self):
        """Return the sorted list of class names of the registered devices.

        The list is cached until the next call to :meth:`addDevice`.
        """
        if getattr(self, "_klass_name_list", None) is None:
            klasses = {dev.klass() for dev in self._devices.values()}
            self._klass_name_list = sorted(k.name() for k in klasses)
        return self._klass_name_list

    def exported(self):
        """Return True if any registered device was exported."""
        return self._exported

    def state(self):
        """Return Ready if the server is exported, NotReady otherwise."""
        exported = self.exported()
        if exported:
            return TaurusDevState.Ready
        return TaurusDevState.NotReady

    def host(self):
        """Return the host of the most recently added device."""
        return self._host

    def serverName(self):
        """Return the part of the name before the first "/"."""
        return self._server_name

    def serverInstance(self):
        """Return the part of the name after the first "/"."""
        return self._server_instance

    def addDevice(self, dev):
        """Register a device and update the exported flag and host from it.

        :param dev: device info exposing ``name()``, ``exported()``, ``host()``
        """
        self._exported |= dev.exported()
        self._host = dev.host()
        self._devices[dev.name()] = dev
        # The cached sorted lists no longer reflect the registered devices
        self._device_name_list = None
        self._klass_name_list = None

    def alive(self):
        """Tell whether all registered devices are alive.

        The result of the first check is cached. While a check is in
        progress, nested calls return False. Checking stops at the first
        device that is not alive.

        :return: True if every device reported alive (or there are none)
        :rtype: bool
        """
        if self._alive is None:
            if self._alivePending:
                return False
            try:
                self._alivePending = True
                alive = True
                for d in self.devices().values():
                    alive = d.alive()
                    if not alive:
                        break
                self._alive = alive
            except Exception as e:
                # report through the taurus log instead of printing to stdout
                from taurus import warning

                warning(
                    "Error checking if server %s is alive: %s"
                    % (self.name(), e)
                )
                self._alive = False
            finally:
                # always release the guard, even on non-Exception errors
                self._alivePending = False
        return self._alive
class TangoDatabaseCache(object):
    """Snapshot of the devices, servers, classes and aliases of a database.

    The snapshot is built by :meth:`refresh`, which is called on
    construction. The database object is held through a weak reference.

    :param db: the database (authority) object to read from
    """

    def __init__(self, db):
        self._db = weakref.ref(db)
        self._device_tree = None
        self._server_tree = None
        self._servers = None
        self._server_name_list = None
        self._devices = None
        self._device_name_list = None
        self._klasses = None
        self._klass_name_list = None
        self._aliases = None
        self._alias_name_list = None
        self.refresh()

    @property
    def db(self):
        """The database object (None once it was garbage collected)."""
        return self._db()

    def _resetNameLists(self):
        """Drop the lazily computed sorted name lists.

        They are derived from the device/server/class/alias dicts, so they
        must be recomputed whenever those dicts are rebuilt.
        """
        self._device_name_list = None
        self._server_name_list = None
        self._klass_name_list = None
        self._alias_name_list = None

    def refresh(self):
        """Rebuild the whole cache from the database.

        Rows whose device name does not have exactly two "/" or whose
        server name does not have exactly one "/" are ignored.
        """
        db = self.db
        db_dev_name = "/".join((db.getFullName(), db.dev_name()))
        if hasattr(Device(db_dev_name), "DbMySqlSelect"):
            # optimization in case the db exposes a MySQL select API
            query = (
                "SELECT name, alias, exported, host, server, class "
                + "FROM device"
            )
            r = db.command_inout("DbMySqlSelect", query)
            row_nb, column_nb = r[0][-2:]
            data = r[1]
            assert row_nb == len(data) // column_nb
        else:
            # fallback using tango commands (slow but works with sqlite DB)
            # see http://sf.net/p/tauruslib/tickets/148/
            data = []
            all_alias = {}
            all_devs = db.get_device_name("*", "*")
            all_exported = db.get_device_exported("*")
            for k in db.get_device_alias_list("*"):  # Time intensive!!
                all_alias[db.get_device_alias(k)] = k
            for d in all_devs:  # Very time intensive!!
                _info = db.command_inout("DbGetDeviceInfo", d)[1]
                # only the name, server and host fields of the reply are used
                name, server, host = _info[0], _info[3], _info[4]
                klass = db.get_class_for_device(d)
                alias = all_alias.get(d, "")
                exported = str(int(d in all_exported))
                data.extend((name, alias, exported, host, server, klass))
            column_nb = 6  # len ((name, alias, exported, host, server, klass))

        CD = CaselessDict
        dev_dict, serv_dict, klass_dict, alias_dict = CD(), {}, {}, CD()

        # ``data`` is a flat sequence: every ``column_nb`` items form a row
        for i in range(0, len(data), column_nb):
            name, alias, exported, host, server, klass = data[
                i : i + column_nb
            ]
            if name.count("/") != 2:
                continue  # invalid/corrupted entry: just ignore it
            if server.count("/") != 1:
                continue  # invalid/corrupted entry: just ignore it
            if not alias:
                alias = None

            # Create the server/class info only the first time it is seen
            si = serv_dict.get(server)
            if si is None:
                si = serv_dict[server] = TangoServInfo(
                    self, name=server, full_name=server
                )
            dc = klass_dict.get(klass)
            if dc is None:
                dc = klass_dict[klass] = TangoDevClassInfo(
                    self, name=klass, full_name=klass
                )

            full_name = "%s/%s" % (db.getFullName(), name)
            dev_dict[name] = di = TangoDevInfo(
                self,
                name=name,
                full_name=full_name,
                alias=alias,
                server=si,
                klass=dc,
                exported=exported,
                host=host,
            )

            si.addDevice(di)
            dc.addDevice(di)
            if alias is not None:
                alias_dict[alias] = di

        self._devices = dev_dict
        self._device_tree = TangoDevTree(dev_dict)
        self._server_tree = TangoServerTree(serv_dict)
        self._servers = serv_dict
        self._klasses = klass_dict
        self._aliases = alias_dict
        # Without this, the name lists computed before the refresh would
        # keep being returned by the get*Names() methods.
        self._resetNameLists()

    def refreshAttributes(self, device):
        """Query the attributes of a device and store them on it.

        The attribute infos are sorted case-insensitively by name. If the
        query raises ``DevFailed`` an empty list is stored.

        :param device: device info exposing ``name()`` and ``setAttributes()``
        """
        attrs = []
        try:
            db = self.db
            name = device.name()
            full_name = db.getFullName() + "/" + name
            taurus_dev = db.factory().getDevice(
                full_name, create_if_needed=False
            )
            if taurus_dev is None:
                dev = DeviceProxy(full_name)
            else:
                dev = taurus_dev.getDeviceProxy()
            attr_info_list = dev.attribute_list_query_ex()
            for attr_info in attr_info_list:
                full_attr_name = "%s/%s" % (full_name, attr_info.name)
                attr_obj = TangoAttrInfo(
                    self,
                    name=attr_info.name,
                    full_name=full_attr_name,
                    device=device,
                    info=attr_info,
                )
                attrs.append(attr_obj)
            attrs = sorted(attrs, key=lambda attr: attr.name().lower())
        except DevFailed:
            pass
        device.setAttributes(attrs)

    def getDevice(self, name):
        """Returns a :class:`TangoDevInfo` object with information
        about the given device name

        :param name: the device name
        :type name: str
        :return: information about the device
        :rtype: TangoDevInfo
        """
        return self._devices.get(name)

    def getDeviceNames(self):
        """Returns a list of registered device names

        :return: a sequence with all registered device names
        :rtype: sequence<str>
        """
        if self._device_name_list is None:
            self._device_name_list = sorted(
                dev.name() for dev in self.devices().values()
            )
        return self._device_name_list

    def getAliasNames(self):
        """Returns a list of registered device aliases

        :return: a sequence with all registered device aliases
        :rtype: sequence<str>
        """
        if self._alias_name_list is None:
            self._alias_name_list = sorted(
                dev.alias() for dev in self.aliases().values()
            )
        return self._alias_name_list

    def getServerNames(self):
        """Returns a list of registered server names

        :return: a sequence with all registered server names
        :rtype: sequence<str>
        """
        if self._server_name_list is None:
            self._server_name_list = sorted(
                serv.name() for serv in self.servers().values()
            )
        return self._server_name_list

    def getClassNames(self):
        """Returns a list of registered device classes

        :return: a sequence with all registered device classes
        :rtype: sequence<str>
        """
        if self._klass_name_list is None:
            self._klass_name_list = sorted(
                klass.name() for klass in self.klasses().values()
            )
        return self._klass_name_list

    def deviceTree(self):
        """Returns a tree container with all devices in three levels:
        domain, family and member

        :return: a tree containing all devices
        :rtype: TangoDevTree
        """
        return self._device_tree

    def serverTree(self):
        """Returns a tree container with all servers in two levels:
        server name and server instance

        :return: a tree containing all servers
        :rtype: TangoServerTree
        """
        return self._server_tree

    def servers(self):
        """Return the dict mapping server name -> server info."""
        return self._servers

    def devices(self):
        """Return the dict mapping device name -> device info."""
        return self._devices

    def klasses(self):
        """Return the dict mapping class name -> class info."""
        return self._klasses

    def aliases(self):
        """Return the dict mapping alias -> device info."""
        return self._aliases

    def getDeviceDomainNames(self):
        """Return the list of domain keys of the device tree."""
        return list(self._device_tree.keys())

    def getDeviceFamilyNames(self, domain):
        """Return the family keys under a domain ([] if domain is unknown)."""
        families = self._device_tree.get(domain)
        if families is None:
            return []
        return list(families.keys())

    def getDeviceMemberNames(self, domain, family):
        """Return the member keys under a domain/family.

        Returns an empty list if the domain or the family is unknown.
        """
        families = self._device_tree.get(domain)
        if families is None:
            return []
        members = families.get(family)
        if members is None:
            return []
        return list(members.keys())

    def getDomainDevices(self, domain):
        """Delegate to the device tree's ``getDomainDevices``."""
        return self.deviceTree().getDomainDevices(domain)

    def getFamilyDevices(self, domain, family):
        """Delegate to the device tree's ``getFamilyDevices``."""
        return self.deviceTree().getFamilyDevices(domain, family)

    def getServerNameInstances(self, serverName):
        """Delegate to the server tree's ``getServerNameInstances``."""
        return self.serverTree().getServerNameInstances(serverName)
class TangoDevTree(CaselessDict):
    """Case-insensitive three-level tree of devices: domain/family/member.

    :param other: optional mapping (its values are used) or sequence of
                  device info objects used to fill the tree
    """

    def __init__(self, other=None):
        super(TangoDevTree, self).__init__()
        # domain -> {device name -> device info}
        self._devices = CaselessDict()
        if other is not None:
            self._update(other)

    def _update(self, other):
        """Add every device found in a mapping's values or in a sequence.

        A device that cannot be added is reported with ``print`` and skipped.

        :raises Exception: if ``other`` cannot be iterated
        """
        try:
            if isinstance(other, Mapping):
                other = list(other.values())
            for dev in other:
                try:
                    self.addDevice(dev)
                except Exception as e:
                    print(e)
        except Exception:
            raise Exception(
                "Must give dict<obj, TangoDevInfo> or sequence<TangoDevInfo>"
            )

    def addDevice(self, dev_info):
        """Insert a device under its domain, family and member.

        :param dev_info: device info exposing ``domain()``, ``family()``,
                         ``member()`` and ``name()``
        """
        domain, family, member = (
            dev_info.domain(),
            dev_info.family(),
            dev_info.member(),
        )

        families = self[domain] = self.get(domain, CaselessDict())
        devs = self._devices[domain] = self._devices.get(
            domain, CaselessDict()
        )
        devs[dev_info.name()] = dev_info

        families[family] = members = families.get(family, CaselessDict())

        members[member] = dev_info

    def getDomainDevices(self, domain):
        """Returns all devices under the given domain. Returns empty list if
        the domain doesn't exist or doesn't contain any devices
        """
        return list(self._devices.get(domain, {}).values())

    def getFamilyDevices(self, domain, family):
        """Returns all devices under the given domain/family. Returns empty
        list if the domain/family doesn't exist or doesn't contain any devices
        """
        families = self.get(domain)
        if families is None:
            # unknown domain: honour the documented "empty list" contract
            return []
        return list(families.get(family, {}).values())


class TangoServerTree(dict):
    """Two-level tree of servers: server name / server instance.

    :param other: optional mapping (its values are used) or sequence of
                  server info objects used to fill the tree
    """

    def __init__(self, other=None):
        super(TangoServerTree, self).__init__()
        if other is not None:
            self._update(other)

    def _update(self, other):
        """Add every server found in a mapping's values or in a sequence.

        A server that cannot be added is reported with ``print`` and skipped.

        :raises Exception: if ``other`` cannot be iterated
        """
        try:
            if isinstance(other, Mapping):
                other = list(other.values())
            for serv in other:
                try:
                    self.addServer(serv)
                except Exception as e:
                    print(e)
        except Exception:
            raise Exception(
                "Must give dict<obj, TangoServInfo> or sequence<TangoServInfo>"
            )

    def addServer(self, serv_info):
        """Insert a server under its server name and instance.

        :param serv_info: server info exposing ``serverName()`` and
                          ``serverInstance()``
        """
        serverName, serverInstance = (
            serv_info.serverName(),
            serv_info.serverInstance(),
        )

        serverInstances = self[serverName] = self.get(serverName, {})

        serverInstances[serverInstance] = serv_info

    def getServerNameInstances(self, serverName):
        """Returns all servers under the given serverName. Returns empty list
        if the server name doesn't exist or doesn't contain any instances
        """
        return list(self.get(serverName, {}).values())


def get_home():
    """
    Find user's home directory if possible. Otherwise raise error.

    The expansion of "~" is used when it is an existing directory.
    Otherwise the HOME, USERPROFILE and TMP environment variables are
    tried in that order, stopping at the first one naming a directory.

    :return: user's home directory
    :rtype: str
    :raises RuntimeError: if no non-empty candidate path was found

    New in PyTango 7.2.0
    """
    path = ""
    try:
        path = os.path.expanduser("~")
    except Exception:
        pass
    if not os.path.isdir(path):
        for evar in ("HOME", "USERPROFILE", "TMP"):
            candidate = os.environ.get(evar)
            if candidate is None:
                continue
            # keep the last defined value even if it is not a directory
            path = candidate
            if os.path.isdir(path):
                break
    if path:
        return path
    raise RuntimeError("please define environment variable $HOME")


def get_env_var(env_var_name):
    """
    Returns the value for the given environment name
    A backup method for old Tango/PyTango versions which don't implement
    :meth:`PyTango.ApiUtil.get_env_var`

    Search order:

        * a real environ var
        * HOME/.tangorc
        * /etc/tangorc (posix only)

    Each existing file is searched in turn, so a variable missing from
    HOME/.tangorc is still looked up in /etc/tangorc.

    :param env_var_name: the environment variable name
    :type env_var_name: str
    :return: the value for the given environment name, or None if not found
    :rtype: str
    """
    if env_var_name in os.environ:
        return os.environ[env_var_name]

    candidates = [os.path.join(get_home(), ".tangorc")]
    if os.name == "posix":
        candidates.append("/etc/tangorc")

    for fname in candidates:
        if not os.path.exists(fname):
            continue
        with open(fname) as f:
            for line in f:
                # drop comments and surrounding blanks
                strippedline = line.split("#", 1)[0].strip()
                if not strippedline:
                    # empty line
                    continue
                tup = strippedline.split("=", 1)
                if len(tup) != 2:
                    # illegal line!
                    continue
                key, val = (part.strip() for part in tup)
                if key == env_var_name:
                    return val
    return None
class TangoAuthority(TaurusAuthority):
    """Taurus authority backed by a :class:`PyTango.Database`.

    Attributes that are not found on this object are looked up on the
    underlying ``PyTango.Database`` object, so its API can be called
    directly on instances of this class.

    :param host: database host; if ``host`` or ``port`` is None the default
                 Tango host is used to obtain both
    :param port: database port
    :param parent: parent object passed to the base class initializer
    """

    # helper class property that stores a reference to the corresponding
    # factory
    _factory = None
    _scheme = "tango"
    _description = "A Tango Authority"

    def __init__(self, host=None, port=None, parent=None):
        # Bound up-front: the lookup below may fail before assigning it and
        # the fallback branch further down reads it.
        _hp = None
        if host is None or port is None:
            try:
                _hp = TangoAuthority.get_default_tango_host()
                v = TangoAuthorityNameValidator()
                g = v.getUriGroups("tango://{}".format(_hp))
                host = g.get("host", None)
                port = g.get("port", None)
            except Exception:
                from taurus import warning

                warning("Error getting default Tango host")

        # Set host to fqdn
        if host is not None:
            host = fqdn_no_alias(host)
            db = Database(host, port)
            complete_name = "tango://{}:{}".format(host, port)
        else:
            db = Database()
            if not _hp:
                # The default host could not be determined beforehand: name
                # the authority after what the database object reports.
                _hp = "{}:{}".format(db.get_db_host(), db.get_db_port())
            complete_name = "tango://{}".format(_hp)

        self.dbObj = db
        self._dbProxy = None
        self._dbCache = None

        self.call__init__(TaurusAuthority, complete_name, parent)

        try:
            self.get_class_for_device(self.dev_name())
        except Exception:
            # Ok, old tango database.
            self.get_class_for_device = self.__get_class_for_device

    @staticmethod
    def get_default_tango_host():
        """Return the value of TANGO_HOST.

        Uses ``ApiUtil.get_env_var`` when available and the module-level
        ``get_env_var`` otherwise.
        """
        if hasattr(ApiUtil, "get_env_var"):
            f = ApiUtil.get_env_var
        else:
            f = get_env_var
        return f("TANGO_HOST")

    def __get_class_for_device(self, dev_name):
        """Backup method when connecting to tango 5 database device server"""
        # Ok, old tango database.
        serv_name = self.command_inout("DbGetDeviceInfo", dev_name)[1][3]
        devs = self.get_device_class_list(serv_name)
        dev_name_lower = dev_name.lower()
        # ``devs`` is a flat sequence of (device name, class name) pairs
        for idx in range(0, len(devs) - 1, 2):
            if devs[idx].lower() == dev_name_lower:
                return devs[idx + 1]
        return None

    def get_device_attribute_list(self, dev_name, wildcard):
        """Run the ``DbGetDeviceAttributeList`` command for a device.

        :param dev_name: device name
        :param wildcard: attribute name filter
        :return: the command result
        """
        return self.command_inout(
            "DbGetDeviceAttributeList", (dev_name, wildcard)
        )

    # Export the PyTango.Database interface into this object.
    # This way we can call for example get_attribute_property on an object of
    # this class
    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup failed. If ``dbObj``
        # itself is missing (e.g. __init__ failed before assigning it),
        # reading self.dbObj below would re-enter this method forever.
        if name == "dbObj":
            raise AttributeError(name)
        if self.dbObj is not None:
            return getattr(self.dbObj, name)
        return None

    @taurus4_deprecation(alt="getTangoDB")
    def getValueObj(self, cache=True):
        return self.getTangoDB()

    def getTangoDB(self):
        """Return the underlying ``PyTango.Database`` object."""
        return self.dbObj

    @taurus4_deprecation(alt="getFullName")
    def getDisplayValue(self, cache=True):
        return self.getDisplayDescription(cache)

    def addListener(self, listener):
        """Register a listener and send it an initial Change event.

        :param listener: the listener to add
        :return: the value returned by the base class ``addListener``; the
                 event is only fired when that value is true
        """
        ret = TaurusAuthority.addListener(self, listener)
        if not ret:
            return ret
        self.fireEvent(TaurusEventType.Change, self.getFullName(), listener)
        return ret

    # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
    # Query capabilities built on top of a cache
    # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~

    def cache(self):
        """Return the database cache, creating it on first use."""
        if self._dbCache is None:
            self._dbCache = TangoDatabaseCache(self)
        return self._dbCache

    def refreshCache(self):
        """Rebuild the database cache from the database."""
        self.cache().refresh()

    def getDevice(self, name):
        """
        Reimplemented from :class:`TaurusDevice` to use cache and return
        :class:`taurus.core.tango.TangoDevInfo` objects with information
        about the given device name

        :param name: the device name
        :type name: str
        :return: information about the tango device
        :rtype: TangoDevInfo
        """
        return self.cache().getDevice(name)

    def getDeviceNames(self):
        """Returns a list of registered tango device names

        :return: a sequence with all registered tango device names
        :rtype: sequence<str>
        """
        return self.cache().getDeviceNames()

    def getAliasNames(self):
        """Returns a list of registered tango device alias

        :return: a sequence with all registered tango device alias
        :rtype: sequence<str>
        """
        return self.cache().getAliasNames()

    def getServerNames(self):
        """Returns a list of registered tango device servers in format
        <name>/<instance>

        :return: a sequence with all registered tango device servers
        :rtype: sequence<str>
        """
        return self.cache().getServerNames()

    def getClassNames(self):
        """Returns a list of registered tango device classes

        :return: a sequence with all registered tango device classes
        :rtype: sequence<str>
        """
        return self.cache().getClassNames()

    def getDeviceDomainNames(self):
        """Return the device domain names known to the cache."""
        return self.cache().getDeviceDomainNames()

    def getDeviceFamilyNames(self, domain):
        """Return the device family names under the given domain."""
        return self.cache().getDeviceFamilyNames(domain)

    def getDeviceMemberNames(self, domain, family):
        """Return the device member names under the given domain/family."""
        return self.cache().getDeviceMemberNames(domain, family)

    def getDomainDevices(self, domain):
        """Return the devices under the given domain."""
        return self.cache().getDomainDevices(domain)

    def getFamilyDevices(self, domain, family):
        """Return the devices under the given domain/family."""
        return self.cache().getFamilyDevices(domain, family)

    def getServerNameInstances(self, serverName):
        """Return the server instances under the given server name."""
        return self.cache().getServerNameInstances(serverName)

    def deviceTree(self):
        """Returns a tree container with all devices in three levels :
        domain, family and member

        :return: a tree containing all devices
        :rtype: TangoDevTree
        """
        return self.cache().deviceTree()

    def getElementAlias(self, full_name):
        """return the alias of an element from its full name"""
        try:
            alias = self.getTangoDB().get_alias(full_name)
            if alias and alias.lower() == InvalidAlias:
                alias = None
        except Exception:
            alias = None
        return alias

    def getElementFullName(self, alias):
        """return the full name of an element from its alias"""
        try:  # PyTango v>=8.1.0
            return self.getTangoDB().get_device_from_alias(alias)
        except AttributeError:
            try:  # PyTango v<8.1.0
                return self.getTangoDB().get_device_alias(alias)
            except Exception:
                return None
        except Exception:
            return None

    @taurus4_deprecation(alt=".description")
    def getDescription(self, cache=True):
        return self.description
# Declare this alias for backwards compatibility TangoDatabase = TangoAuthority