Source code for taurus.core.epics.epicsattribute

#!/usr/bin/env python
# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""
Epics module. See __init__.py for more detailed documentation
"""

import numpy
from taurus.core.units import Quantity
from taurus.core.taurusbasetypes import (
    TaurusEventType,
    TaurusAttrValue,
    TaurusTimeVal,
    AttrQuality,
    DataType,
    DataFormat,
)
from taurus.core.taurusattribute import TaurusAttribute
import epics
from epics.ca import ChannelAccessException
from epics import dbr


# map for epics DBR types to Taurus Types.
Dbr2TaurusType = {
    dbr.STRING: DataType.String,
    dbr.INT: DataType.Integer,
    dbr.SHORT: DataType.Integer,
    dbr.FLOAT: DataType.Float,
    # dbr.ENUM: DataType.Integer,  # TODO: Support enum
    dbr.CHAR: DataType.Bytes,
    dbr.LONG: DataType.Integer,
    dbr.DOUBLE: DataType.Float,
    # TODO: if a time type is added to DataType, change the next
    dbr.TIME_STRING: DataType.String,
    dbr.TIME_INT: DataType.Integer,
    dbr.TIME_SHORT: DataType.Integer,
    dbr.TIME_FLOAT: DataType.Float,
    # dbr.TIME_ENUM: DataType.Integer,  # TODO: Support enum
    dbr.TIME_CHAR: DataType.Bytes,
    dbr.TIME_LONG: DataType.Integer,
    dbr.TIME_DOUBLE: DataType.Float,
    dbr.CTRL_STRING: DataType.String,
    dbr.CTRL_INT: DataType.Integer,
    dbr.CTRL_SHORT: DataType.Integer,
    dbr.CTRL_FLOAT: DataType.Float,
    # dbr.CTRL_ENUM: DataType.Integer,  # TODO: Support enum
    dbr.CTRL_CHAR: DataType.Bytes,
    dbr.CTRL_LONG: DataType.Integer,
    dbr.CTRL_DOUBLE: DataType.Float,
}


[docs]class EpicsAttribute(TaurusAttribute): """ A :class:`TaurusAttribute` that gives access to an Epics Process Variable. .. seealso:: :mod:`taurus.core.epics` .. warning:: In most cases this class should not be instantiated directly. Instead it should be done via the :meth:`EpicsFactory.getAttribute` """ # TODO: support non-numerical PVs def __init__(self, name="", parent=None, storeCallback=None): self.call__init__( TaurusAttribute, name, parent, storeCallback=storeCallback ) self._label = self.getSimpleName() self._value = None self.data_format = None self.type = None self._range = [None, None] self._alarm = [None, None] self._warning = [None, None] self.__pv = epics.PV( self.getNormalName(), callback=self.onEpicsEvent, form="ctrl", connection_callback=self.onEpicsConnectionEvent, ) self.__pv.wait_for_connection()
[docs] def getPV(self): """Returns the underlying :obj:`epics.PV` object""" return self.__pv
[docs] def onEpicsEvent(self, **kwargs): """callback for PV changes""" # this is called from the ca thread. # TODO: Consider doing the following on a different thread self._value = self.decode(self.__pv) self.fireEvent(TaurusEventType.Change, self._value)
[docs] def onEpicsConnectionEvent(self, **kwargs): """callback for PV connection changes""" if kwargs["conn"]: self.debug("(re)connected to epics PV") if self._value is not None: self._value.error = None else: self.warning("Connection to epics PV lost") self._value.error = ChannelAccessException( 'PV "%s" not connected' % kwargs["pvname"] ) self.fireEvent(TaurusEventType.Change, self._value)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~ # Necessary to overwrite from TaurusAttribute # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~
[docs] def encode(self, value): """encodes the value passed to the write method into a representation that can be written with :meth:`epics.PV.put` """ if isinstance(value, Quantity): # convert to units of the PV value = value.to(self.__pv.units).magnitude return value
[docs] def decode(self, pv): """Decodes an epics PV object into a TaurusValue, and also updates other properties of the Attribute object """ attr_value = TaurusAttrValue() if not pv.connected: attr_value.error = ChannelAccessException( 'PV "%s" not connected' % pv.pvname ) return attr_value v = pv.value # type try: self.type = Dbr2TaurusType[pv.ftype] except KeyError: raise ValueError('Unsupported epics type "%s"' % pv.type) # writable self.writable = pv.write_access # data_format if numpy.isscalar(v): self.data_format = DataFormat._0D else: self.data_format = DataFormat(len(numpy.shape(v))) # units and limits support if self.type in (DataType.Integer, DataType.Float): v = Quantity(v, pv.units) self._range = self.__decode_limit( pv.lower_ctrl_limit, pv.upper_ctrl_limit ) self._alarm = self.__decode_limit( pv.lower_alarm_limit, pv.upper_alarm_limit ) self._warning = self.__decode_limit( pv.lower_warning_limit, pv.upper_warning_limit ) # rvalue attr_value.rvalue = v # wvalue if pv.write_access: attr_value.wvalue = v # time if pv.timestamp is None: attr_value.time = TaurusTimeVal.now() else: attr_value.time = TaurusTimeVal.fromtimestamp(pv.timestamp) # quality if pv.severity > 0: attr_value.quality = AttrQuality.ATTR_ALARM else: attr_value.quality = AttrQuality.ATTR_VALID return attr_value
def __decode_limit(self, low, high): units = self.__pv.units if low is None or numpy.isnan(low): low = None else: low = Quantity(low, units) if low is None or numpy.isnan(high): high = None else: high = Quantity(high, units) return [low, high]
[docs] def write(self, value, with_read=True): value = self.encode(value) self.__pv.put(value, wait=True) if with_read: return self.read(cache=False)
[docs] def read(self, cache=True): """returns the value of the attribute. :param cache: If True (default), the last calculated value will be returned. If False, the referenced values will be re- read and the transformation string will be re-evaluated :type cache: bool :return: attribute value """ if not cache: self.__pv.get(use_monitor=False) self._value = self.decode(self.__pv) return self._value
[docs] def poll(self): v = self.read(cache=False) self.fireEvent(TaurusEventType.Periodic, v)
[docs] def isUsingEvents(self): return True # TODO: implement this
[docs] def factory(self): from .epicsfactory import EpicsFactory return EpicsFactory()
[docs] @classmethod def getNameValidator(cls): from .epicsvalidator import EpicsAttributeNameValidator return EpicsAttributeNameValidator()
if __name__ == "__main__": a = EpicsAttribute("ca:XXX:a", None) b = EpicsAttribute("ca:XXX:b", None) s = EpicsAttribute("ca:XXX:sum", None) a.write(3.0) b.write(4.0) s.read() print("!$!", s.read(cache=False)) print("a,b,s", a.read().rvalue, b.read().rvalue, s.read().rvalue) print("DF=", a.getDataFormat(), DataFormat.whatis(a.getDataFormat()))