Source code for taurus.core.util.codecs

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################
"""
This module contains a list of codecs for the DEV_ENCODED attribute type.
All codecs are based on the pair *format, data*. The format is a string
containing the codec signature and data is a sequence of bytes (string)
containing the encoded data.

This module contains a list of codecs capable of decoding several codings like
bz2, zip and json.

The :class:`CodecFactory` class allows you to get a codec object for a given
format and also to register new codecs.
The :class:`CodecPipeline` is a special codec that is able to code/decode a
sequence of codecs. This way you can have codecs 'inside' codecs.

Example::

    >>> from taurus.core.util.codecs import CodecFactory
    >>> cf = CodecFactory()
    >>> json_codec = cf.getCodec('json')
    >>> bz2_json_codec = cf.getCodec('bz2_json')
    >>> data = range(100000)
    >>> f1, enc_d1 = json_codec.encode(('', data))
    >>> f2, enc_d2 = bz2_json_codec.encode(('', data))
    >>> print(len(enc_d1), len(enc_d2))
    688890 123511
    >>>
    >>> f1, dec_d1 = json_codec.decode((f1, enc_d1))
    >>> f2, dec_d2 = bz2_json_codec.decode((f2, enc_d2))

A Taurus related example::

    >>> # automatically get the data from a DEV_ENCODED attribute
    >>> import taurus
    >>> from taurus.core.util.codecs import CodecFactory
    >>> cf = CodecFactory()
    >>> devenc_attr = taurus.Attribute('a/b/c/devenc_attr')
    >>> v = devenc_attr.read()
    >>> codec = CodecFactory().getCodec(v.format)
    >>> f, d = codec.decode((v.format, v.value))
"""

import copy

# need by VideoImageCodec
import struct
import sys
import numpy

from .singleton import Singleton
from .log import Logger
from .containers import CaselessDict

__all__ = [
    "Codec",
    "NullCodec",
    "ZIPCodec",
    "BZ2Codec",
    "JSONCodec",
    "Utf8Codec",
    "FunctionCodec",
    "PlotCodec",
    "CodecPipeline",
    "CodecFactory",
]

__docformat__ = "restructuredtext"


[docs]class Codec(Logger): """The base class for all codecs""" def __init__(self): """Constructor""" Logger.__init__(self, self.__class__.__name__) # TODO: similar code exists in encode and decode methods from different # codecs. It should be implemented in this base class 'Codec'.
[docs] def encode(self, data, *args, **kwargs): """encodes the given data. This method is abstract an therefore must be implemented in the subclass. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] :raises: NotImplementedError """ raise NotImplementedError("encode cannot be called on abstract Codec")
[docs] def decode(self, data, *args, **kwargs): """decodes the given data. This method is abstract an therefore must be implemented in the subclass. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] :raises: NotImplementedError """ raise NotImplementedError("decode cannot be called on abstract Codec")
def __str__(self): return "%s()" % self.__class__.__name__ def __repr__(self): return "%s()" % self.__class__.__name__
[docs]class NullCodec(Codec):
[docs] def encode(self, data, *args, **kwargs): """encodes with Null encoder. Just returns the given data :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ format = "null" if len(data[0]): format += "_%s" % data[0] return format, data[1]
[docs] def decode(self, data, *args, **kwargs): """decodes with Null encoder. Just returns the given data :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ if not data[0].startswith("null"): return data format = data[0].partition("_")[2] return format, data[1]
[docs]class ZIPCodec(Codec): """A codec able to encode/decode to/from gzip format. It uses the :mod:`zlib` module Example:: >>> from taurus.core.util.codecs import CodecFactory >>> # first encode something >>> data = 100 * "Hello world\\n" >>> cf = CodecFactory() >>> codec = cf.getCodec('zip') >>> format, encoded_data = codec.encode(("", data)) >>> print(len(data), len(encoded_data)) 1200, 31 >>> format, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data[20]) 'Hello world\\nHello wo' """
[docs] def encode(self, data, *args, **kwargs): """encodes the given data to gzip bytes. The given data **must** be bytes :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import zlib format = "zip" if len(data[0]): format += "_%s" % data[0] return format, zlib.compress(data[1])
[docs] def decode(self, data, *args, **kwargs): """decodes the given data from a gzip bytes. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import zlib if not data[0].startswith("zip"): return data format = data[0].partition("_")[2] return format, zlib.decompress(data[1])
[docs]class BZ2Codec(Codec): """A codec able to encode/decode to/from BZ2 format. It uses the :mod:`bz2` module Example:: >>> from taurus.core.util.codecs import CodecFactory >>> # first encode something >>> data = 100 * "Hello world\\n" >>> cf = CodecFactory() >>> codec = cf.getCodec('bz2') >>> format, encoded_data = codec.encode(("", data)) >>> print(len(data), len(encoded_data)) 1200, 68 >>> format, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data[20]) 'Hello world\\nHello wo' """
[docs] def encode(self, data, *args, **kwargs): """encodes the given data to bz2 bytes. The given data **must** be bytes :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import bz2 format = "bz2" if len(data[0]): format += "_%s" % data[0] return format, bz2.compress(data[1])
[docs] def decode(self, data, *args, **kwargs): """decodes the given data from bz2 bytes. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import bz2 if not data[0].startswith("bz2"): return data format = data[0].partition("_")[2] return format, bz2.decompress(data[1])
class PickleCodec(Codec): """A codec able to encode/decode to/from pickle format. It uses the :mod:`pickle` module. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> codec = cf.getCodec('pickle') >>> >>> # first encode something >>> data = { 'hello' : 'world', 'goodbye' : 1000 } >>> format, encoded_data = codec.encode(("", data)) >>> >>> # now decode it >>> format, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data) {'hello': 'world', 'goodbye': 1000} """ def encode(self, data, *args, **kwargs): """encodes the given data to pickle bytes. The given data **must** be a python object that :mod:`pickle` is able to convert. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import pickle format = "pickle" if len(data[0]): format += "_%s" % data[0] # make it compact by default kwargs["protocol"] = kwargs.get("protocol", pickle.HIGHEST_PROTOCOL) return format, pickle.dumps(data[1], *args, **kwargs) def decode(self, data, *args, **kwargs): """decodes the given data from a pickle string. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import pickle if not data[0].startswith("pickle"): return data format = data[0].partition("_")[2] if isinstance(data[1], memoryview): data = data[0], bytes(data[1]) return format, pickle.loads(data[1])
[docs]class JSONCodec(Codec): """A codec able to encode/decode to/from json format. It uses the :mod:`json` module. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> codec = cf.getCodec('json') >>> >>> # first encode something >>> data = { 'hello' : 'world', 'goodbye' : 1000 } >>> format, encoded_data = codec.encode(("", data)) >>> print(encoded_data) '{"hello": "world", "goodbye": 1000}' >>> >>> # now decode it >>> format, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data) {'hello': 'world', 'goodbye': 1000} """
[docs] def encode(self, data, *args, **kwargs): """encodes the given data to a json string. The given data **must** be a python object that json is able to convert. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import json format = "json" if len(data[0]): format += "_%s" % data[0] # make it compact by default kwargs["separators"] = kwargs.get("separators", (",", ":")) return format, json.dumps(data[1], *args, **kwargs)
[docs] def decode(self, data, *args, **kwargs): """decodes the given data from a json string. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import json if not data[0].startswith("json"): return data format = data[0].partition("_")[2] ensure_ascii = kwargs.pop("ensure_ascii", False) if isinstance(data[1], memoryview): data = data[0], str(data[1]) data = json.loads(data[1]) if ensure_ascii: data = self._transform_ascii(data) return format, data
def _transform_ascii(self, data): if isinstance(data, str): return data.encode("utf-8") elif isinstance(data, dict): return self._transform_dict(data) elif isinstance(data, list): return self._transform_list(data) elif isinstance(data, tuple): return tuple(self._transform_list(data)) else: return data def _transform_list(self, lst): return [self._transform_ascii(item) for item in lst] def _transform_dict(self, dct): newdict = {} for k, v in dct.items(): newdict[self._transform_ascii(k)] = self._transform_ascii(v) return newdict
[docs]class Utf8Codec(Codec): """A codec able to encode/decode utf8 strings to/from bytes. Useful to adapt i/o encodings in a codec pipe. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> codec = cf.getCodec('zip_utf8_json') >>> >>> # first encode something >>> data = { 'hello' : 'world', 'goodbye' : 1000 } >>> format, encoded_data = codec.encode(("", data)) >>> >>> # now decode it >>> _, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data) """
[docs] def encode(self, data, *args, **kwargs): """ Encodes the given utf8 string to bytes. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ format = "utf8" fmt, data = data if len(fmt): format += "_%s" % fmt return format, str(data).encode()
[docs] def decode(self, data, *args, **kwargs): """decodes the given data from a bytes. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ fmt, data = data fmt = fmt.partition("_")[2] return fmt, bytes(data).decode()
class BSONCodec(Codec): """A codec able to encode/decode to/from bson format. It uses the :mod:`bson` module. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> codec = cf.getCodec('bson') >>> >>> # first encode something >>> data = { 'hello' : 'world', 'goodbye' : 1000 } >>> format, encoded_data = codec.encode(("", data)) >>> >>> # now decode it >>> _, decoded_data = codec.decode((format, encoded_data)) >>> print(decoded_data) {'hello': 'world', 'goodbye': 1000} """ def encode(self, data, *args, **kwargs): """encodes the given data to a bson string. The given data **must** be a python object that bson is able to convert. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import bson format = "bson" if len(data[0]): format += "_%s" % data[0] return format, bson.BSON.encode(data[1], *args, **kwargs) def decode(self, data, *args, **kwargs): """decodes the given data from a bson string. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ import bson if not data[0].startswith("bson"): return data format = data[0].partition("_")[2] ensure_ascii = kwargs.pop("ensure_ascii", False) data = data[0], bson.BSON(data[1]) data = self.decode(data[1]) if ensure_ascii: data = self._transform_ascii(data) return format, data def _transform_ascii(self, data): if isinstance(data, str): return data.encode("utf-8") elif isinstance(data, dict): return self._transform_dict(data) elif isinstance(data, list): return self._transform_list(data) elif isinstance(data, tuple): return tuple(self._transform_list(data)) else: return data def _transform_list(self, lst): return [self._transform_ascii(item) for item in lst] def _transform_dict(self, dct): newdict = {} for k, v in dct.items(): newdict[self._transform_ascii(k)] = self._transform_ascii(v) return newdict
[docs]class FunctionCodec(Codec): """A generic function codec""" def __init__(self, func_name): Codec.__init__(self) self._func_name = func_name
[docs] def encode(self, data, *args, **kwargs): format = self._func_name if len(data[0]): format += "_%s" % data[0] return format, {"type": self._func_name, "data": data[1]}
[docs] def decode(self, data, *args, **kwargs): if not data[0].startswith(self._func_name): return data format = data[0].partition("_")[2] return format, data[1]
[docs]class PlotCodec(FunctionCodec): """A specialization of the :class:`FunctionCodec` for plot function""" def __init__(self): FunctionCodec.__init__(self, "plot")
class VideoImageCodec(Codec): """A codec able to encode/decode to/from LImA video_image format. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> import PyTango >>> # first get an image from a LImA device to decode >>> d = PyTango.DeviceProxy(ccdName) >>> data = d.read_attribute('video_last_image').value >>> cf = CodecFactory() >>> codec = cf.getCodec('VIDEO_IMAGE') >>> format,decoded_data = codec.decode(data) >>> # encode it again to check >>> format, encoded_data = codec.encode(("",decoded_data)) >>> # compare images excluding the header: >>> data[1][32:] == encoded_data[32:] >>> # The headers can be different in the frameNumber >>> struct.unpack('!IHHqiiHHHH',data[1][:32]) (1447314767, 1, 0, 6868, 1294, 964, 0, 32, 0, 0) >>> struct.unpack('!IHHqiiHHHH',encoded_data[:32]) (1447314767, 1, 0, -1, 1294, 964, 0, 32, 0, 0) """ VIDEO_HEADER_FORMAT = "!IHHqiiHHHH" def encode(self, data, *args, **kwargs): """encodes the given data to a LImA's video_image. The given data **must** be an numpy.array :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ # TODO: support encoding for colour imgage modes fmt = "videoimage" if len(data[0]): fmt += "_%s" % data[0] # imgMode depends on numpy.array dtype imgMode = self.__getModeId(str(data[1].dtype)) # frameNumber, unknown then -1 height, width = data[1].shape header = self.__packHeader(imgMode, -1, width, height) buffer = data[1].tobytes() return fmt, header + buffer def decode(self, data, *args, **kwargs): """decodes the given data from a LImA's video_image. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ if data[0].startswith("VIDEO_IMAGE"): fixedformat = data[0].replace("VIDEO_IMAGE", "videoimage") _, _, fmt = fixedformat.partition("_") elif data[0].startswith("videoimage"): _, _, fmt = data[0].partition("_") else: return data header = self.__unpackHeader( data[1][: struct.calcsize(self.VIDEO_HEADER_FORMAT)] ) imgBuffer = data[1][struct.calcsize(self.VIDEO_HEADER_FORMAT) :] dtype = self.__getDtypeId(header["imageMode"]) if header["imageMode"] == 6: # RGB24, 3 bytes per pixel rgba = numpy.frombuffer(imgBuffer, dtype) bbuf = rgba[0::3] gbuf = rgba[1::3] rbuf = rgba[2::3] r = rbuf.reshape(header["height"], header["width"]) g = gbuf.reshape(header["height"], header["width"]) b = bbuf.reshape(header["height"], header["width"]) img2D = numpy.dstack((r, g, b)) elif header["imageMode"] == 7: # RGBA 4 bytes per pixel rgba = numpy.frombuffer(imgBuffer, dtype) bbuf = rgba[0::4] gbuf = rgba[1::4] rbuf = rgba[2::4] # abuf = rgba[3::4] r = rbuf.reshape(header["height"], header["width"]) g = gbuf.reshape(header["height"], header["width"]) b = bbuf.reshape(header["height"], header["width"]) # a = abuf.reshape(header['height'],header['width']) img2D = numpy.dstack((r, g, b)) elif header["imageMode"] == 17: # YUV444 3 bytes per pixel yuv = numpy.frombuffer(imgBuffer, dtype) y = yuv[0::3] u = yuv[1::3] v = yuv[2::3] rbuff, gbuff, bbuff = self.__yuv2rgb(y, u, v) # Shape buffers to image size r = rbuff.reshape(header["height"], header["width"]) g = gbuff.reshape(header["height"], header["width"]) b = bbuff.reshape(header["height"], header["width"]) # Build the RGB image img2D = numpy.dstack((r, g, b)) elif header["imageMode"] == 16: # YUV422 4 bytes per 2 pixels yuv = numpy.frombuffer(imgBuffer, dtype) u = yuv[0::4] y1 = yuv[1::4] v = yuv[2::4] y2 = yuv[3::4] r1, g1, b1 = self.__yuv2rgb(y1, u, v) r2, g2, b2 = self.__yuv2rgb(y2, u, v) # Create RGB buffers rbuff = numpy.dstack((r1, r2)).reshape( header["height"] * header["width"] ) gbuff = numpy.dstack((g1, g2)).reshape( header["height"] * header["width"] ) bbuff = numpy.dstack((b1, b2)).reshape( header["height"] * header["width"] ) # Shape buffers to image size r = rbuff.reshape(header["height"], header["width"]) g = gbuff.reshape(header["height"], header["width"]) b = bbuff.reshape(header["height"], header["width"]) # Build the RGB image img2D = numpy.dstack((r, g, b)) elif header["imageMode"] == 15: # YUV411 6 bytes per 4 pixels yuv = numpy.frombuffer(imgBuffer, dtype) u = yuv[0::6] y1 = yuv[1::6] y2 = yuv[2::6] v = yuv[3::6] y3 = yuv[4::6] y4 = yuv[5::6] r1, g1, b1 = self.__yuv2rgb(y1, u, v) r2, g2, b2 = self.__yuv2rgb(y2, u, v) r3, g3, b3 = self.__yuv2rgb(y3, u, v) r4, g4, b4 = self.__yuv2rgb(y4, u, v) # Create RGB buffers rbuff = numpy.dstack((r1, r2, r3, r4)).reshape( header["height"] * header["width"] ) gbuff = numpy.dstack((g1, g2, g3, g4)).reshape( header["height"] * header["width"] ) bbuff = numpy.dstack((b1, b2, b3, b4)).reshape( header["height"] * header["width"] ) # Shape buffers to image size r = rbuff.reshape(header["height"], header["width"]) g = gbuff.reshape(header["height"], header["width"]) b = bbuff.reshape(header["height"], header["width"]) img2D = numpy.dstack((r, g, b)) else: img1D = numpy.frombuffer(imgBuffer, dtype) img2D = img1D.reshape(header["height"], header["width"]) return fmt, img2D def __yuv2rgb(self, y, u, v): """YUV444 to RGB888 conversion""" Cr = v - 128.0 Cb = u - 128.0 R = y + 1.402 * Cr G = y - 0.344 * Cb - 0.714 * Cr B = y + 1.772 * Cb return ( numpy.clip(R, 0, 255), numpy.clip(G, 0, 255), numpy.clip(B, 0, 255), ) def __unpackHeader(self, header): h = struct.unpack(self.VIDEO_HEADER_FORMAT, header) headerDict = {} headerDict["magic"] = h[0] headerDict["headerVersion"] = h[1] headerDict["imageMode"] = h[2] headerDict["frameNumber"] = h[3] headerDict["width"] = h[4] headerDict["height"] = h[5] headerDict["endianness"] = h[6] headerDict["headerSize"] = h[7] headerDict["padding"] = h[8:] return headerDict def __packHeader(self, imgMode, frameNumber, width, height): magic = 0x5644454F version = 1 endian = 0 if sys.byteorder == "little" else 1 hsize = struct.calcsize(self.VIDEO_HEADER_FORMAT) return struct.pack( self.VIDEO_HEADER_FORMAT, magic, version, imgMode, frameNumber, width, height, endian, hsize, 0, 0, ) # padding def __getModeId(self, mode): return { # when encode "uint8": 0, # Core.Y8, "uint16": 1, # Core.Y16, "uint32": 2, # Core.Y32, "uint64": 3, # Core.Y64, # when decode "Y8": 0, # Core.Y8, "Y16": 1, # Core.Y16, "Y32": 2, # Core.Y32, "Y64": 3, # Core.Y64, # TODO: other modes # 'RGB555' : 4, # Core.RGB555, # 'RGB565' : 5, # Core.RGB565, "RGB24": 6, # Core.RGB24, "RGB32": 7, # Core.RGB32, # 'BGR24' : 8, # Core.BGR24, # 'BGR32' : 9, # Core.BGR32, # 'BAYER RG8' : 10, # Core.BAYER_RG8, # 'BAYER RG16' : 11, # Core.BAYER_RG16, # 'BAYER BG8' : 12, # Core.BAYER_BG8, # 'BAYER BG16' : 13, # Core.BAYER_BG16, # 'I420' : 14, # Core.I420, # 'YUV411' : 15, # Core.YUV411, "YUV422": 16, # Core.YUV422, # 'YUV444' : 17, # Core.YUV444 }[mode] def __getFormatId(self, mode): return { 0: "B", 1: "H", 2: "I", 3: "L", # 4:"RGB555" # Core.RGB555, # 5:"RGB565" # Core.RGB565, 6: "RGB24", # Core.RGB24, 7: "RGB32", # Core.RGB32, # 8: "BGR24", # Core.BGR24, # 9: "BGR32" # Core.BGR32, # 10: "BAYER RG8" # Core.BAYER_RG8, # 11: "BAYER RG16" # Core.BAYER_RG16, # 12: "BAYER BG8" # Core.BAYER_BG8, # 13: "BAYER BG16" #: Core.BAYER_BG16, # 14: "I420" # Core.I420, # 15:"YUV411" # Core.YUV411, 16: "YUV422", # Core.YUV422, # 17:"YUV444" # Core.YUV444 }[mode] def __getDtypeId(self, mode): return { 0: "uint8", 1: "uint16", 2: "uint32", 3: "uint64", # "RGB555" : Core.RGB555, # "RGB565" : Core.RGB565, 6: "uint8", # Core.RGB24, 7: "uint8", # Core.RGB32, # "BGR24" : Core.BGR24, # "BGR32" : Core.BGR32, # "BAYER RG8" : Core.BAYER_RG8, # "BAYER RG16" : Core.BAYER_RG16, # "BAYER BG8" : Core.BAYER_BG8, # "BAYER BG16" : Core.BAYER_BG16, # "I420" : Core.I420, # "YUV411" : Core.YUV411, 16: "uint8", # Core.YUV422, # "YUV444" : Core.YUV444 }[mode]
[docs]class CodecPipeline(Codec, list): """The codec class used when encoding/decoding data with multiple encoders Example usage:: >>> from taurus.core.util.codecs import CodecPipeline >>> data = range(100000) >>> codec = CodecPipeline('bz2_json') >>> format, encoded_data = codec.encode(("", data)) # decode it format, decoded_data = codec.decode((format, encoded_data)) print(decoded_data) """ def __init__(self, format): """Constructor. The CodecPipeline object will be created using the :class:`CodecFactory` to search for format(s) given in the format parameter. :param format: a string representing the format. :type format: str """ Codec.__init__(self) list.__init__(self) f = CodecFactory() for i in format.split("_"): codec = f.getCodec(i) self.debug("Appending %s => %s" % (i, codec)) if codec is None: raise TypeError( "Unsupported codec %s (namely %s)" % (format, i) ) self.append(codec) self.debug("Done")
[docs] def encode(self, data, *args, **kwargs): """encodes the given data. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ for codec in reversed(self): data = codec.encode(data, *args, **kwargs) return data
[docs] def decode(self, data, *args, **kwargs): """decodes the given data. :param data: a sequence of two elements where the first item is the encoding format of the second item object :type data: sequence[str, obj] :return: a sequence of two elements where the first item is the encoding format of the second item object :rtype: sequence[str, obj] """ for codec in self: data = codec.decode(data, *args, **kwargs) return data
[docs]class CodecFactory(Singleton, Logger): """The singleton CodecFactory class. To get the singleton object do:: from taurus.core.util.codecs import CodecFactory f = CodecFactory() The :class:`CodecFactory` class allows you to get a codec object for a given format and also to register new codecs. The :class:`CodecPipeline` is a special codec that is able to code/decode a sequence of codecs. This way you can have codecs 'inside' codecs. Example:: >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> json_codec = cf.getCodec('json') >>> bz2_json_codec = cf.getCodec('bz2_json') >>> data = range(100000) >>> f1, enc_d1 = json_codec.encode(('', data)) >>> f2, enc_d2 = bz2_json_codec.encode(('', data)) >>> print(len(enc_d1), len(enc_d2)) 688890 123511 >>> >>> f1, dec_d1 = json_codec.decode((f1, enc_d1)) >>> f2, dec_d2 = bz2_json_codec.decode((f2, enc_d2)) A Taurus related example:: >>> # automatically get the data from a DEV_ENCODED attribute >>> import taurus >>> from taurus.core.util.codecs import CodecFactory >>> cf = CodecFactory() >>> devenc_attr = taurus.Attribute('a/b/c/devenc_attr') >>> v = devenc_attr.read() >>> codec = CodecFactory().getCodec(v.format) >>> f, d = codec.decode((v.format, v.value)) """ #: Default minimum map of registered codecs CODEC_MAP = CaselessDict( { "json": JSONCodec, "utf8": Utf8Codec, "bson": BSONCodec, "bz2": BZ2Codec, "zip": ZIPCodec, "pickle": PickleCodec, "plot": PlotCodec, "VIDEO_IMAGE": VideoImageCodec, # deprecated "videoimage": VideoImageCodec, "null": NullCodec, "none": NullCodec, "": NullCodec, } ) def __init__(self): """Initialization. Nothing to be done here for now.""" pass
[docs] def init(self, *args, **kwargs): """Singleton instance initialization.""" name = self.__class__.__name__ self.call__init__(Logger, name) self._codec_klasses = copy.copy(CodecFactory.CODEC_MAP) # dict<str, Codec> # where: # - key is the codec format # - value is the codec object that supports the format self._codecs = CaselessDict()
[docs] def registerCodec(self, format, klass): """Registers a new codec. If a codec already exists for the given format it is removed. :param format: the codec id :type format: str :param klass: the class that handles the format :type klass: Codec class """ self._codec_klasses[format] = klass # del old codec if exists if format in self._codecs: del self._codecs[format]
[docs] def unregisterCodec(self, format): """Unregisters the given format. If the format does not exist an exception is thrown. :param format: the codec id :type format: str :raises: KeyError """ if format in self._codec_klasses: del self._codec_klasses[format] if format in self._codecs: del self._codecs[format]
[docs] def getCodec(self, format): """Returns the codec object for the given format or None if no suitable codec is found :param format: the codec id :type format: str :return: the codec object for the given format :rtype: Codec or None """ codec = self._codecs.get(format) if codec is None: codec = self._getNewCodec(format) if codec is not None: self._codecs[format] = codec return codec
def _getNewCodec(self, format): klass = self._codec_klasses.get(format) if klass is not None: ret = klass() else: try: ret = CodecPipeline(format) except Exception: ret = self._codec_klasses.get("")() return ret
[docs] def decode(self, data, *args, **kwargs): while len(data[0]): data = self.getCodec(data[0]).decode(data, *args, **kwargs) return data[1]
[docs] def encode(self, format, data, *args, **kwargs): return self.getCodec(format).encode(data, *args, **kwargs)