Source code for taurus.qt.qtgui.extra_guiqwt.image

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""
Extension of :mod:`guiqwt.image`
"""
__all__ = [
    "TaurusImageItem",
    "TaurusRGBImageItem",
    "TaurusTrend2DItem",
    "TaurusTrend2DScanItem",
    "TaurusEncodedImageItem",
    "TaurusEncodedRGBImageItem",
]

from taurus.core.units import Quantity
from taurus.qt.qtgui.base import TaurusBaseComponent
from taurus.qt.qtcore.util import baseSignal
import taurus.core
from taurus.core.util.containers import ArrayBuffer

from guiqwt.image import ImageItem, RGBImageItem, XYImageItem
from guiqwt.image import INTERP_NEAREST

import numpy


class TaurusBaseImageItem(TaurusBaseComponent):
    """A ImageItem that gets its data from a taurus attribute"""

    dataChanged = baseSignal("dataChanged")

    def setModel(self, model, **kwargs):
        # do the standard stuff
        TaurusBaseComponent.setModel(self, model, **kwargs)
        # ... and fire a fake event for initialization
        try:
            value = self.getModelObj(**kwargs).read()
            self.fireEvent(
                self, taurus.core.taurusbasetypes.TaurusEventType.Change, value
            )
        except Exception:
            pass

    def handleEvent(self, evt_src, evt_type, evt_value):
        if evt_value is None or getattr(evt_value, "rvalue", None) is None:
            self.debug("Ignoring event from %s" % repr(evt_src))
            return
        v = evt_value.rvalue
        if isinstance(v, Quantity):
            v = v.magnitude
            # TODO: units should be used for setting some title in the colorbar
        try:
            v = self.filterData(v)
        except Exception as e:
            self.info("Ignoring event. Reason: %s", e)
            return
        # this is the range of the z axis (color scale)
        lut_range = self.get_lut_range()
        # if the range was not set, make it None (autoscale z axis)
        if lut_range[0] == lut_range[1]:
            lut_range = None
        self.set_data(v, lut_range=lut_range)
        self.dataChanged.emit()
        p = self.plot()

        if p is not None:
            p.update_colormap_axis(self)
            p.replot()

    def filterData(self, data):
        """Reimplement this method if you want to pre-process
        the data that will be passed to set_data.

        It should return something acceptable by :meth:`setData`
        and raise an exception if the data cannot be processed.

        This default implementation casts array types not
        supported by guiqwt to numpy.int32

        See:
          - http://code.google.com/p/guiqwt/issues/detail?id=44
          - https://sourceforge.net/p/tango-cs/bugs/568/
          - https://sourceforge.net/p/tauruslib/tickets/33/
        """
        try:
            dtype = data.dtype
            v = data
        except Exception:
            v = numpy.array(data)  # note that this is potentially expensive
            dtype = v.dtype

        if dtype not in (
            float,
            numpy.double,
            numpy.int32,
            numpy.uint16,
            numpy.int16,
            numpy.uint8,
            numpy.int8,
            bool,
        ):
            # note: numpy.uint32 was not included because of
            # https://sourceforge.net/p/tauruslib/tickets/33/
            try:
                self.debug("casting to numpy.int32")
                v = numpy.int32(v)
            except OverflowError:
                raise OverflowError(
                    (
                        "type %s not supported by guiqwt "
                        + "and cannot be casted to int32"
                    )
                    % repr(v.dtype)
                )

        return v


class TaurusEncodedBaseImageItem(TaurusBaseImageItem):
    """A ImageItem that gets its data from a taurus DevEncoded attribute"""

    def setModel(self, model, **kwargs):
        # do the standard stuff
        TaurusBaseComponent.setModel(self, model, **kwargs)
        # ... and fire a fake event for initialization
        try:
            fmt, value = self.codec.decode(self.getModelObj(**kwargs).read())
            self.fireEvent(
                self, taurus.core.taurusbasetypes.TaurusEventType.Change, value
            )
        except Exception:
            pass

    def filterData(self, data):
        """reimplementation to decode data using the DevEncoded codecs"""
        if type(data) == tuple:
            from taurus.core.util.codecs import CodecFactory

            codec = CodecFactory().getCodec(data[0])

            try:
                fmt, decoded_data = codec.decode(data)
            except Exception as e:
                self.info("Decoder error: %s", e)
                raise e

            try:
                dtype = decoded_data.dtype
                v = decoded_data
            except Exception:
                # note that this is potentially expensive
                v = numpy.array(decoded_data)
                dtype = v.dtype

            if dtype not in (
                float,
                numpy.double,
                numpy.int32,
                numpy.uint16,
                numpy.int16,
                numpy.uint8,
                numpy.int8,
                bool,
            ):
                # note: numpy.uint32 was not included because of
                # https://sourceforge.net/p/tauruslib/tickets/33/
                try:
                    self.debug("casting to numpy.int32")
                    v = numpy.int32(v)
                except OverflowError:
                    raise OverflowError(
                        (
                            "type %s not supported by guiqwt"
                            + " and cannot be casted to int32"
                        )
                        % repr(v.dtype)
                    )

            return v
        else:
            raise ValueError(
                (
                    "Unexpected data type (%s) for "
                    + "DevEncoded attribute (tuple expected)"
                )
                % type(data)
            )


[docs] class TaurusImageItem(ImageItem, TaurusBaseImageItem): """A ImageItem that gets its data from a taurus attribute""" def __init__(self, param=None): ImageItem.__init__(self, numpy.zeros((1, 1)), param=param) TaurusBaseImageItem.__init__(self, self.__class__.__name__)
[docs] class TaurusEncodedImageItem(ImageItem, TaurusEncodedBaseImageItem): """A ImageItem that gets its data from a DevEncoded attribute""" def __init__(self, param=None): ImageItem.__init__(self, numpy.zeros((1, 1)), param=param) TaurusEncodedBaseImageItem.__init__(self, self.__class__.__name__)
class TaurusXYImageItem(XYImageItem, TaurusBaseImageItem): """A XYImageItem that gets its data from a taurus attribute""" def __init__(self, param=None): XYImageItem.__init__( self, numpy.arange(2), numpy.arange(2), numpy.zeros((2, 2)), param=param, ) TaurusBaseImageItem.__init__(self, self.__class__.__name__)
[docs] class TaurusRGBImageItem(RGBImageItem, TaurusBaseImageItem): """A RGBImageItem that gets its data from a taurus attribute""" def __init__(self, param=None): RGBImageItem.__init__(self, numpy.zeros((1, 1, 3)), param=param) TaurusBaseImageItem.__init__(self, self.__class__.__name__)
[docs] def set_data(self, data, lut_range=None, **kwargs): """dummy reimplementation to accept the lut_range kwarg (just ignoring it) """ return RGBImageItem.set_data(self, data, **kwargs)
[docs] class TaurusEncodedRGBImageItem(RGBImageItem, TaurusEncodedBaseImageItem): """A RGBImageItem that gets its data from a DevEncoded attribute""" def __init__(self, param=None): RGBImageItem.__init__(self, numpy.zeros((1, 1, 3)), param=param) TaurusEncodedBaseImageItem.__init__(self, self.__class__.__name__)
[docs] def set_data(self, data, lut_range=None, **kwargs): """dummy reimplementation to accept the lut_range kwarg (just ignoring it) """ return RGBImageItem.set_data(self, data, **kwargs)
[docs] class TaurusTrend2DItem(XYImageItem, TaurusBaseComponent): """ A XYImageItem that is constructed by stacking 1D arrays from events from a Taurus 1D attribute """ scrollRequested = baseSignal("scrollRequested", object, object, object) dataChanged = baseSignal("dataChanged") def __init__(self, param=None, buffersize=512, stackMode="datetime"): """ :param param: param to be passed to XYImageItem constructor :param buffersize: size of the stack :type buffersize: int :param stackMode: can be 'datetime', 'timedelta' or 'event' :type stackMode: str """ XYImageItem.__init__( self, numpy.arange(2), numpy.arange(2), numpy.zeros((2, 2)), param=param, ) TaurusBaseComponent.__init__(self, self.__class__.__name__) self.maxBufferSize = buffersize self._yValues = None self._xBuffer = None self._zBuffer = None self.stackMode = stackMode self.set_interpolation(INTERP_NEAREST) self.__timeOffset = None # Config properties self.registerConfigProperty( self.get_lut_range, self.set_lut_range, "lut_range" ) self.registerConfigProperty( self._get_interpolation_cfg, self._set_interpolation_cfg, "interpolation", ) self.registerConfigProperty( self.get_color_map_name, self.set_color_map, "color_map" ) def _get_interpolation_cfg(self): ret = self.get_interpolation() if len(ret) == 2: ret = (ret[0], len(ret[1])) return ret def _set_interpolation_cfg(self, interpolate_cfg): self.set_interpolation(*interpolate_cfg)
[docs] def setBufferSize(self, buffersize): """sets the size of the stack :param buffersize: size of the stack :type buffersize: int """ self.maxBufferSize = buffersize try: if self._xBuffer is not None: self._xBuffer.setMaxSize(buffersize) if self._zBuffer is not None: self._zBuffer.setMaxSize(buffersize) except ValueError: self.info( "buffer downsizing requested. " + "Current contents will be discarded" ) self._xBuffer = None self._zBuffer = None
[docs] def setModel(self, model, **kwargs): # do the standard stuff TaurusBaseComponent.setModel(self, model, **kwargs) # ... and fire a fake event for initialization try: value = self.getModelObj(**kwargs).read() self.fireEvent( self, taurus.core.taurusbasetypes.TaurusEventType.Change, value ) except Exception: pass
[docs] def handleEvent(self, evt_src, evt_type, evt_value): if evt_value is None or getattr(evt_value, "rvalue", None) is None: self.debug("Ignoring event from %s" % repr(evt_src)) return plot = self.plot() if plot is None: return # initialization ySize = len(evt_value.rvalue) if self._yValues is None: self._yValues = numpy.arange(ySize, dtype="d") if self._xBuffer is None: self._xBuffer = ArrayBuffer( numpy.zeros(min(128, self.maxBufferSize), dtype="d"), maxSize=self.maxBufferSize, ) if self._zBuffer is None: self._zBuffer = ArrayBuffer( numpy.zeros((min(128, self.maxBufferSize), ySize), dtype="d"), maxSize=self.maxBufferSize, ) return # check that new data is compatible with previous data if ySize != self._yValues.size: self.info( "Incompatible shape in data from event " + "(orig=%i, current=%i). Ignoring", self._yValues.size, ySize, ) return # update x values if self.stackMode == "datetime": x = evt_value.time.totime() if self.__timeOffset is None: self.__timeOffset = x plot.set_axis_title("bottom", "Time") plot.set_axis_unit("bottom", "") elif self.stackMode == "deltatime": try: x = evt_value.time.totime() - self.__timeOffset except TypeError: # self.__timeOffset has not been initialized self.__timeOffset = evt_value.time.totime() x = 0 plot.set_axis_title( "bottom", "Time since %s" % evt_value.time.isoformat() ) plot.set_axis_unit("bottom", "") elif self.stackMode == "event": try: step = 1 x = self._xBuffer[-1] + step except IndexError: # this will happen when the x buffer is empty x = 0 plot.set_axis_title("bottom", "Event #") plot.set_axis_unit("bottom", "") else: raise ValueError("Unsupported stack mode %s" % self.stackMode) if len(self._xBuffer) and x <= self._xBuffer[-1]: self.info("Ignoring event (non-increasing x value)") return self._xBuffer.append(x) # update z rvalue = evt_value.rvalue if isinstance(evt_value.rvalue, Quantity): rvalue = evt_value.rvalue.magnitude # TODO: units should be checked for coherence with previous values self._zBuffer.append(rvalue) # check if there is enough data to start plotting if len(self._xBuffer) < 2: self.info("waiting for at least 2 values to start plotting") return x = self._xBuffer.contents() y = self._yValues z = self._zBuffer.contents().transpose() # Use previous LUT range (z axis range), or set to None (autoscale) # if it is uninitialized lut_range = self.get_lut_range() if lut_range[0] == lut_range[1]: lut_range = None # update the plot data self.set_data(z, lut_range=lut_range) self.set_xy(x, y) # signal data changed and replot self.dataChanged.emit() if plot is not None: value = x[-1] axis = self.xAxis() xmin, xmax = plot.get_axis_limits(axis) if value > xmax or value < xmin: self.scrollRequested.emit(plot, axis, value) plot.update_colormap_axis(self) plot.replot()
[docs] class TaurusTrend2DScanItem(TaurusTrend2DItem): _xDataKey = "point_nb" def __init__(self, channelKey, xDataKey, door, param=None, buffersize=512): TaurusTrend2DItem.__init__( self, param=param, buffersize=buffersize, stackMode=None ) self._channelKey = channelKey self._xDataKey = xDataKey self.connectWithQDoor(door)
[docs] def scanDataReceived(self, packet): """packet is a dict with {type:str, "data":object} and the accepted types are: data_desc, record_data, record_end and the data objects are: seq<ColumnDesc.Todict()>, record.data dict and dict , respectively """ if packet is None: self.debug("Ignoring empty scan data packet") return id, packet = packet pcktype = packet.get("type", "__UNKNOWN_PCK_TYPE__") if pcktype == "data_desc": self._dataDescReceived(packet["data"]) elif pcktype == "record_data": self._scanLineReceived(packet["data"]) elif pcktype == "record_end": pass else: self.debug("Ignoring packet of type %s" % repr(pcktype))
[docs] def clearTrend(self): self._yValues = None self._xBuffer = None self._zBuffer = None
def _dataDescReceived(self, datadesc): """prepares the plot according to the info in the datadesc dictionary """ self.clearTrend() # decide which data to use for x if self._xDataKey is None or self._xDataKey == "<mov>": self._autoXDataKey = datadesc["ref_moveables"][0] elif self._xDataKey == "<idx>": self._autoXDataKey = "point_nb" else: self._autoXDataKey = self._xDataKey # set the x axis columndesc = datadesc.get("column_desc", []) xinfo = {"min_value": None, "max_value": None} for e in columndesc: if e["label"] == self._autoXDataKey: xinfo = e break plot = self.plot() plot.set_axis_title("bottom", self._autoXDataKey) xmin, xmax = xinfo.get("min_value"), xinfo.get("max_value") if xmin is None or xmax is None: pass # @todo: autoscale if any limit is unknown else: plot.set_axis_limits("bottom", xmin, xmax) def _scanLineReceived(self, recordData): """Receives a recordData dictionary and updates the curves associated to it .. seealso:: <Sardana>/MacroServer/scan/scandata.py:Record.data """ # obtain the x value try: xval = recordData[self._autoXDataKey] except KeyError: self.warning( 'Cannot find data "%s" in the current scan record. Ignoring', self._autoXDataKey, ) return if not numpy.isscalar(xval): self.warning( 'Data for "%s" is of type "%s". ' + "Cannot use it for the X values. Ignoring", self._autoXDataKey, type(xval), ) return # obtain y value try: chval = recordData[self._channelKey] except KeyError: self.warning( 'Cannot find data "%s" in the current scan record. Ignoring', self._channelKey, ) if chval.shape != self._yValues.shape: self.warning( 'Incompatible shape of "%s" (%s). Ignoring', self._channelKey, repr(chval.shape), ) return # initialization if self._yValues is None: self._yValues = numpy.arange(chval.size, dtype="d") if self._xBuffer is None: self._xBuffer = ArrayBuffer( numpy.zeros(min(16, self.maxBufferSize), dtype="d"), maxSize=self.maxBufferSize, ) if self._zBuffer is None: self._zBuffer = ArrayBuffer( numpy.zeros( (min(16, self.maxBufferSize), chval.size), dtype="d" ), maxSize=self.maxBufferSize, ) # update x self._xBuffer.append(xval) # update z self._zBuffer.append(chval) # check if there is enough data to start plotting if len(self._xBuffer) < 2: self.info("waiting for at least 2 values to start plotting") return x = self._xBuffer.contents() y = self._yValues z = self._zBuffer.contents().transpose() # update the plot data lut_range = ( self.get_lut_range() ) # this is the range of the z axis (color scale) if lut_range[0] == lut_range[1]: # if the range was not set, make it None (autoscale z axis) lut_range = None self.set_data(z, lut_range=lut_range) self.set_xy(x, y) # signal data changed and replot self.dataChanged.emit() plot = self.plot() if plot is not None: value = x[-1] axis = self.xAxis() xmin, xmax = plot.get_axis_limits(axis) if value > xmax or value < xmin: self.scrollRequested.emit(plot, axis, value) plot.update_colormap_axis(self) plot.replot()
[docs] def connectWithQDoor(self, doorname): """connects this TaurusTrend2DScanItem to a QDoor :param doorname: the QDoor name :type doorname: str """ qdoor = taurus.Device(doorname) qdoor.recordDataUpdated.connect(self.scanDataReceived)
[docs] def getModel(self, **kwargs): return self.__model
[docs] def setModel(self, model, **kwargs): self.__model = model
def taurusImageMain(): from guiqwt.tools import ( # noqa: F401 RectangleTool, EllipseTool, HRangeTool, PlaceAxesTool, MultiLineTool, FreeFormTool, SegmentTool, CircleTool, AnnotatedRectangleTool, AnnotatedEllipseTool, AnnotatedSegmentTool, AnnotatedCircleTool, LabelTool, AnnotatedPointTool, ObliqueRectangleTool, AnnotatedObliqueRectangleTool, ) try: # guiqwt replaced Annotated*CursorTool by *CursorTool from guiqwt.tools import AnnotatedVCursorTool, AnnotatedHCursorTool VCursorTool, HCursorTool = AnnotatedVCursorTool, AnnotatedHCursorTool except ImportError: from guiqwt.tools import VCursorTool, HCursorTool from taurus.qt.qtgui.extra_guiqwt.tools import TaurusImageChooserTool from guiqwt.plot import ImageDialog from taurus.qt.qtgui.extra_guiqwt.builder import make from taurus.qt.qtgui.application import TaurusApplication import taurus.core.util.argparse parser = taurus.core.util.argparse.get_taurus_parser() parser.set_usage("%prog [options] [<model1> [<model2>] ...]") parser.set_description("a taurus application for plotting 2D data sets") app = TaurusApplication( cmd_line_parser=parser, app_name="taurusimage", app_version=taurus.Release.version, ) args = app.get_command_line_args() # create a dialog with a plot and add the images win = ImageDialog( edit=False, toolbar=True, wintitle="Taurus Image", options=dict(show_xsection=False, show_ysection=False), ) # add tools for toolklass in ( TaurusImageChooserTool, LabelTool, HRangeTool, MultiLineTool, FreeFormTool, PlaceAxesTool, AnnotatedObliqueRectangleTool, AnnotatedEllipseTool, AnnotatedSegmentTool, AnnotatedPointTool, VCursorTool, HCursorTool, ): win.add_tool(toolklass) # add images from given models plot = win.get_plot() for m in args: img = make.image(taurusmodel=m) plot.add_item(img) # IMPORTANT: connect the cross section plots to the taurusimage so that # they are updated when the taurus data changes img.dataChanged.connect(win.update_cross_sections) win.exec_() def test1(): """Adapted from guiqwt cross_section.py example""" from guiqwt.plot import ImageDialog from taurus.qt.qtgui.extra_guiqwt.builder import make from taurus.qt.qtgui.application import TaurusApplication _ = TaurusApplication(cmd_line_parser=None) # define a taurus image # model1 = 'sys/tg_test/1/short_image_ro' # model1 = 'sys/tg_test/1/long64_image_ro' model1 = "sys/tg_test/1/ulong_image_ro" taurusimage = make.image(taurusmodel=model1) # create a dialog with a plot and add the images win = ImageDialog( edit=False, toolbar=True, wintitle="Taurus Cross sections test", options=dict(show_xsection=False, show_ysection=False), ) from taurus.qt.qtgui.extra_guiqwt.tools import TaurusImageChooserTool win.add_tool(TaurusImageChooserTool) plot = win.get_plot() plot.add_item(taurusimage) # plot.add_item(taurusxyimage) # plot.add_item(image) # plot.add_item(taurusrgbimage) # win.get_itemlist_panel().show() # IMPORTANT: connect the cross section plots to the taurusimage so that # they are updated when the taurus data changes # taurusimage.dataChanged.connect(win.update_cross_sections) win.exec_() if __name__ == "__main__": test1() # taurusImageMain()