Source code for taurus.core.util.containers

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

""" This module contains a set of useful containers that are not part of the
standard python distribution. """

import copy
import time
import weakref
import pickle
import json
import csv
import os
import shutil
from collections.abc import Sequence, Mapping

__all__ = [
    "CaselessList",
    "CaselessDict",
    "CaselessWeakValueDict",
    "LoopList",
    "CircBuf",
    "LIFO",
    "TimedQueue",
    "self_locked",
    "ThreadDict",
    "defaultdict",
    "defaultdict_fromkey",
    "CaselessDefaultDict",
    "DefaultThreadDict",
    "getDictAsTree",
    "ArrayBuffer",
    "dictFromSequence",
]

__docformat__ = "restructuredtext"


[docs]class CaselessList(list): """A case insensitive lists that has some caseless methods. Only allows strings as list members. Most methods that would normally return a list, return a CaselessList. (Except list() and lowercopy()) Sequence Methods implemented are : __contains__, remove, count, index, append, extend, insert, __getitem__, __setitem__, __getslice__, __setslice__ __add__, __radd__, __iadd__, __mul__, __rmul__ Plus Extra methods: findentry, copy , lowercopy, list Inherited methods : __imul__, __len__, __iter__, pop, reverse, sort """ def __init__(self, inlist=[]): list.__init__(self) for entry in inlist: if not isinstance(entry, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s" which is "%s"' % (entry, type(entry)) ) self.append(entry) def __lowerstreq(self, a, b): a = str(a) b = str(b) return a.lower() == b.lower()
[docs] def findentry(self, item): """A caseless way of checking if an item is in the list or not. It returns None or the entry. """ if not isinstance(item, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(item) ) for entry in self: if self.__lowerstreq(item, entry): return entry return None
def __contains__(self, item): """A caseless way of checking if a list has a member in it or not.""" for entry in self: if self.__lowerstreq(item, entry): return True return False
[docs] def remove(self, item): """Remove the first occurence of an item, the caseless way.""" for entry in self: if self.__lowerstreq(item, entry): list.remove(self, entry) return raise ValueError(": list.remove(x): x not in list")
[docs] def copy(self): """Return a CaselessList copy of self.""" return CaselessList(self)
[docs] def list(self): """Return a normal list version of self.""" return list(self)
[docs] def lowercopy(self): """Return a lowercase (list) copy of self.""" return [str(entry).lower() for entry in self]
[docs] def append(self, item): """Adds an item to the list and checks it's a string.""" if not isinstance(item, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(item) ) list.append(self, item)
[docs] def extend(self, item): """Extend the list with another list. Each member of the list must be a string. """ if not isinstance(item, list): raise TypeError( "You can only extend lists with lists. " 'You supplied "%s"' % type(item) ) for entry in item: if not isinstance(entry, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(entry) ) list.append(self, entry)
[docs] def count(self, item): """Counts references to 'item' in a caseless manner. If item is not a string it will always return 0. """ if not isinstance(item, str): return 0 count = 0 for entry in self: if self.__lowerstreq(item, entry): count += 1 return count
[docs] def index(self, item, minindex=0, maxindex=None): """Provide an index of first occurence of item in the list. (or raise a ValueError if item not present) If item is not a string, will raise a TypeError. minindex and maxindex are also optional arguments s.index(x[, i[, j]]) return smallest k such that s[k] == x and i <= k < j """ if maxindex is None: maxindex = len(self) minindex = max(0, minindex) - 1 maxindex = min(len(self), maxindex) if not isinstance(item, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(item) ) index = minindex while index < maxindex: index += 1 if self.__lowerstreq(item, self[index]): return index raise ValueError(": list.index(x): x not in list")
[docs] def insert(self, i, x): """s.insert(i, x) same as s[i:i] = [x] Raises TypeError if x isn't a string. """ if not isinstance(x, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(x) ) list.insert(self, i, x)
def __setitem__(self, index, value): """For setting values in the list. index must be an integer or (extended) slice object. (__setslice__ used for simple slices) If index is an integer then value must be a string. If index is a slice object then value must be a list of strings - with the same length as the slice object requires. """ if isinstance(index, int): if not isinstance(value, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(value) ) list.__setitem__(self, index, value) elif isinstance(index, slice): if not hasattr(value, "__len__"): raise TypeError( "Value given to set slice is not a sequence object." ) for entry in value: if not isinstance(entry, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(entry) ) list.__setitem__(self, index, value) else: raise TypeError("Indexes must be integers or slice objects.") def __setslice__(self, i, j, sequence): """Called to implement assignment to self[i:j].""" for entry in sequence: if not isinstance(entry, str): raise TypeError( "Members of this object must be strings. " 'You supplied "%s"' % type(entry) ) list.__setslice__(self, i, j, sequence) def __getslice__(self, i, j): """Called to implement evaluation of self[i:j]. Although the manual says this method is deprecated - if I don't define it the list one is called. (Which returns a list - this returns a CaselessList) """ return CaselessList(list.__getslice__(self, i, j)) def __getitem__(self, index): """For fetching indexes. If a slice is fetched then the list returned is a CaselessList. """ if not isinstance(index, slice): return list.__getitem__(self, index) else: return CaselessList(list.__getitem__(self, index)) def __add__(self, item): """To add a list, and return a CaselessList. Every element of item must be a string. """ return CaselessList(list.__add__(self, item)) def __radd__(self, item): """To add a list, and return a CaselessList. Every element of item must be a string. """ return CaselessList(list.__add__(self, item)) def __iadd__(self, item): """To add a list in place.""" for entry in item: self.append(entry) def __mul__(self, item): """To multiply itself, and return a CaselessList. Every element of item must be a string. """ return CaselessList(list.__mul__(self, item)) def __rmul__(self, item): """To multiply itself, and return a CaselessList. Every element of item must be a string. """ return CaselessList(list.__rmul__(self, item))
[docs]class CaselessDict(dict): """A case insensitive dictionary. Use this class as a normal dictionary. The keys must be strings """ def __init__(self, other=None): if other: # Doesn't do keyword args if isinstance(other, Mapping): for k, v in other.items(): dict.__setitem__(self, k.lower(), v) else: for k, v in other: dict.__setitem__(self, k.lower(), v) def __getitem__(self, key): return dict.__getitem__(self, key.lower()) def __setitem__(self, key, value): dict.__setitem__(self, key.lower(), value) def __contains__(self, key): return dict.__contains__(self, key.lower())
[docs] def has_key(self, key): """overwritten from :meth:`dict.has_key` (needed for python2)""" return key.lower() in self
[docs] def get(self, key, def_val=None): """overwritten from :meth:`dict.get`""" return dict.get(self, key.lower(), def_val)
[docs] def setdefault(self, key, def_val=None): """overwritten from :meth:`dict.setdefault`""" return dict.setdefault(self, key.lower(), def_val)
[docs] def update(self, other): """overwritten from :meth:`dict.update`""" for k, v in other.items(): dict.__setitem__(self, k.lower(), v)
[docs] def fromkeys(self, iterable, value=None): d = CaselessDict() for k in iterable: dict.__setitem__(d, k.lower(), value) return d
[docs] def pop(self, key, def_val=None): """overwritten from :meth:`dict.pop`""" return dict.pop(self, key.lower(), def_val)
def __delitem__(self, k): dict.__delitem__(self, k.lower())
[docs]class CaselessWeakValueDict(weakref.WeakValueDictionary): def __init__(self, other=None): weakref.WeakValueDictionary.__init__(self, other) if other: # Doesn't do keyword args if isinstance(other, dict): for k, v in other.items(): weakref.WeakValueDictionary.__setitem__(self, k.lower(), v) else: for k, v in other: weakref.WeakValueDictionary.__setitem__(self, k.lower(), v) def __getitem__(self, key): return weakref.WeakValueDictionary.__getitem__(self, key.lower()) def __setitem__(self, key, value): weakref.WeakValueDictionary.__setitem__(self, key.lower(), value) def __contains__(self, key): return weakref.WeakValueDictionary.__contains__(self, key.lower())
[docs] def has_key(self, key): """overwritten from :meth:`weakref.WeakValueDictionary` (needed for python2) """ return key in self
[docs] def get(self, key, def_val=None): """overwritten from :meth:`weakref.WeakValueDictionary.get`""" return weakref.WeakValueDictionary.get(self, key.lower(), def_val)
[docs] def setdefault(self, key, def_val=None): """overwritten from :meth:`weakref.WeakValueDictionary.setdefault`""" return weakref.WeakValueDictionary.setdefault( self, key.lower(), def_val )
[docs] def update(self, other): """overwritten from :meth:`weakref.WeakValueDictionary.update`""" if other: for k, v in other.items(): weakref.WeakValueDictionary.__setitem__(self, k.lower(), v)
[docs] def fromkeys(self, iterable, value=None): d = CaselessWeakValueDict() for k in iterable: weakref.WeakValueDictionary.__setitem__(d, k.lower(), value) return d
[docs] def pop(self, key, def_val=None): """overwritten from :meth:`weakref.WeakValueDictionary.pop`""" return weakref.WeakValueDictionary.pop(self, key.lower(), def_val)
def __delitem__(self, k): weakref.WeakValueDictionary.__delitem__(self, k.lower())
class PersistentDict(dict): """Persistent dictionary with an API compatible with shelve and anydbm. See: http://code.activestate.com/recipes/576642/ (r10) The dict is kept in memory, so the dictionary operations run as fast as a regular dictionary. Write to disk is delayed until close or sync (similar to gdbm's fast mode). Input file format is automatically discovered. Output file format is selectable between pickle, json, and csv. All three serialization formats are backed by fast C implementations. """ def __init__( self, filename, flag="c", mode=None, format="pickle", *args, **kwds ): self.flag = flag # r=readonly, c=create, or n=new self.mode = mode # None or an octal triple like 0644 self.format = format # 'csv', 'json', or 'pickle' self.filename = filename if flag != "n" and os.access(filename, os.R_OK): fileobj = open(filename, "rb" if format == "pickle" else "r") with fileobj: self.load(fileobj) dict.__init__(self, *args, **kwds) def sync(self): "Write dict to disk" if self.flag == "r": return filename = self.filename tempname = filename + ".tmp" fileobj = open(tempname, "wb" if self.format == "pickle" else "w") try: self.dump(fileobj) except Exception: os.remove(tempname) raise finally: fileobj.close() shutil.move(tempname, self.filename) # atomic commit if self.mode is not None: os.chmod(self.filename, self.mode) def close(self): self.sync() def __enter__(self): return self def __exit__(self, *exc_info): self.close() def dump(self, fileobj): if self.format == "csv": csv.writer(fileobj).writerows(list(self.items())) elif self.format == "json": json.dump(self, fileobj, separators=(",", ":")) elif self.format == "pickle": pickle.dump(dict(self), fileobj, 2) else: raise NotImplementedError("Unknown format: " + repr(self.format)) def load(self, fileobj): # try formats from most restrictive to least restrictive for loader in (pickle.load, json.load, csv.reader): fileobj.seek(0) try: return self.update(loader(fileobj)) except Exception: pass raise ValueError("File not in a supported format")
[docs]class LoopList(object): """this class provides an effectively cyclic list. It can be used, e.g., for storing colors or pen properties to be changed automatically in a plot A LoopList stores an internal index to remember the last accessed item in the list. It provides previous(), current() and next() methods that return the previous, current and next items in the list. The method allItems() returns a copy of all items contained in the list. The index can be accessed by setCurrentIndex() and getCurrentIndex() (setCurrentIndex(i) additionally returns new current item). Items can be accessed ***without modifying the current index*** by using llist[i] and llist[i]=x syntax len(llist) returns the **period** of the list. .. note:: only basic methods of lists are implemented for llists. In particular, the following are **not** implemented: - slicing - resizing (append, insert, del,...) - binary operators (+,*,...) ..note:: it can be used for loops, but the loop will be infinite unless other condition is used for exiting it: - for item in llist: print(item) # This is a infinite loop!! - for i in range(len(llist)):print(llist[i]) # not infinite since len(llist) returns the period of the list """ def __init__(self, itemlist=[]): self.setItemList(itemlist)
[docs] def setItemList(self, itemlist): """sets the item list""" self._itemlist = list(itemlist) self._index = 0 self._nitems = len(self._itemlist)
[docs] def current(self): """returns current item""" try: # makes the list effectively cyclic return self._itemlist[self._index % self._nitems] except ZeroDivisionError: raise IndexError("cyclic list is empty")
[docs] def setCurrentIndex(self, index): """sets current index (and returns the corresponding item)""" self._index = index return self.current()
[docs] def getCurrentIndex(self): """returns the current index""" return self._index
def __next__(self): """advances one item in the list and returns it""" self._index += 1 return self.current() next = __next__
[docs] def previous(self): """goes one item back in the list and returns it""" self._index -= 1 return self.current()
[docs] def allItems(self): """returns the items list (one period)""" return self._itemlist
def __getitem__(self, i): try: return self._itemlist[i % self._nitems] except ZeroDivisionError: raise IndexError("cyclic list is empty") def __setitem__(self, i, y): self._itemlist[i % self._nitems] = y def __len__(self): return self._nitems
[docs]class CircBuf(object): """A circular buffer of Python values. Examples:: >>> cb = CircBuf(3) >>> cb.is_empty() 1 >>> cb.put('first') >>> cb.is_empty() 0 >>> cb.put('second') >>> cb.put('third') >>> cb.is_full() 1 >>> cb.put('fourth') >>> cb.get() 'second' >>> cb.get() 'third' >>> cb.get() 'fourth' >>> cb.is_empty() 1 """ def __init__(self, leng): """Construct an empty circular buffer.""" self.buf = leng * [None] self.len = self.g = self.p = 0
[docs] def is_empty(self): """Returns true only if CircBuf has no items.""" return self.len == 0
[docs] def is_full(self): """Returns true only if CircBuf has no space.""" return self.len == len(self.buf)
[docs] def get(self): """Retrieves an item from a non-empty circular buffer.""" result = self.buf[self.g] self.g = (self.g + 1) % len(self.buf) self.len -= 1 return result
[docs] def put(self, item): """Puts an item onto a circular buffer.""" self.buf[self.p] = item self.p = (self.p + 1) % len(self.buf) if self.len == len(self.buf): self.g = self.p else: self.len += 1
[docs]class LIFO(object): def __init__(self, max=10): self._data = [] self._max = max
[docs] def append(self, elem): if self._max == 0: return while len(self._data) >= self._max: self._data.pop(0) self._data.append(elem)
[docs] def extend(self, lst): if self._max == 0: return s = len(lst) if s >= self._max: self._data = lst[s - self._max :] else: while len(self._data) + len(lst) > self._max: self._data.pop(0) self._data.extend(lst)
[docs] def pop(self, index=0): self._data.pop(index)
[docs] def clear(self): self._data = []
[docs] def get(self): return self._data
[docs] def getCopy(self): return copy.copy(self._data)
def __str__(self): return "%s(%s)" % (self.__class__.__name__, str(self._data))
[docs]class TimedQueue(list): """A FIFO that keeps all the values introduced at least for a given time. Applied to some device servers, to force States to be kept at least a minimum time. Previously named as PyTango_utils.device.StateQueue pop(): The value is removed only if delete_time has been reached. at least 1 value is always kept in the list""" def __init__(self, arg=None): """Initializes the list with a sequence or an initial value.""" if arg is None: list.__init__(self) elif isinstance(arg, Sequence): list.__init__(self, arg) else: list.__init__(self) self.append(arg, 1)
[docs] def append(self, obj, keep=15): """Inserts a tuple with (value,insert_time,delete_time=now+keep)""" now = time.time() t = (obj, now, now + keep) list.append(self, t)
[docs] def pop(self, index=0): """Returns the indicated value, or the first one; but removes only if delete_time has been reached. All values are returned at least once. When the queue has only a value, it is not deleted. """ if not self: return None # The list is empty now = time.time() s, t1, t2 = self[index] if now < t2 or len(self) == 1: return s else: return list.pop(self, index)[0]
[docs] def index(self, obj): for i in range(len(self)): if self[i][0] == obj: return i return None
def __contains__(self, obj): for t in self: if t[0] == obj: return True return False pass
[docs]def self_locked(func, reentrant=True): """Decorator to make thread-safe class members Decorator to create thread-safe objects. .. warning:: - With Lock() this decorator should not be used to decorate nested functions; it will cause Deadlock! - With RLock this problem is avoided ... but you should rely more on python threading """ import threading def lock_fun(self, *args, **kwargs): if not hasattr(self, "lock"): if reentrant: setattr(self, "lock", threading.RLock()) else: setattr(self, "lock", threading.Lock()) if not hasattr(self, "trace"): setattr(self, "trace", False) self.lock.acquire() try: if self.trace: print("locked: %s" % self.lock) result = func(self, *args, **kwargs) finally: self.lock.release() if self.trace: print("released: %s" % self.lock) return result return lock_fun
[docs]class ThreadDict(dict): """Thread safe dictionary with redefinable read/write methods and a background thread for hardware update. All methods are thread-safe using *@self_lock* decorator. .. note:: any method decorated in this way CANNOT call other decorated methods! All values of the dictionary will be automatically updated in a separate Thread using read_method provided. Any value overwritten in the dict should launch the write_method. Briefing:: a[2] equals to a[2]=read_method(2) a[2]=1 equals to a[2]=write_method(2,1) """ def __init__( self, other=None, read_method=None, write_method=None, timewait=0.1, threaded=True, ): self.read_method = read_method self.write_method = write_method self.timewait = timewait self.threaded = threaded self._threadkeys = [] self.trace = False self.last_update = 0 self.last_cycle_start = 0 # equals to self.__class__.__base__ or type(self).__bases__[0] self.parent = type(self).mro()[1]
[docs] def tracer(self, text): print(text)
[docs] def start(self): import threading if not self.threaded: return if ( hasattr(self, "_Thread") and self._Thread and self._Thread.is_alive() ): return self.event = threading.Event() self.event.clear() self._Thread = threading.Thread(target=self.run) self._Thread.setDaemon(True) self._Thread.start()
[docs] def stop(self): if self.threaded and hasattr(self, "event"): self.event.set() if hasattr(self, "_Thread"): self._Thread.join()
[docs] def alive(self): if not hasattr(self, "_Thread") or not self._Thread: return False else: return self._Thread.is_alive()
def __del__(self): self.stop() # dict.__del__(self)
[docs] def run(self): while not self.event.isSet(): keys = self.threadkeys() for k in keys: try: # @todo It must be checked wich method for reading is # better for thread_safe if True: self.__getitem__(k, hw=True) # try this alternative in case of deadlock (it could need # extra locks inside read_method) else: if self.read_method: value = self.read_method(k) self.__setitem__(k, value, hw=False) finally: self.event.wait(self.timewait) if self.event.isSet(): break timewait = self.get_timewait() self.event.wait(timewait)
[docs] @self_locked def get_last_update(self): return self.last_update
[docs] @self_locked def set_last_update(self, value): self.last_update = value
[docs] @self_locked def get_last_cycle_start(self): return self.last_cycle_start
[docs] @self_locked def set_last_cycle_start(self, value): self.last_cycle_start = value
[docs] @self_locked def get_timewait(self): return self.timewait
[docs] @self_locked def set_timewait(self, value): self.timewait = value
[docs] @self_locked def append(self, key, value=None): if key not in self: self.parent.__setitem__(self, key, value) if key not in self._threadkeys: self._threadkeys.append(key)
[docs] @self_locked def threadkeys(self): return self._threadkeys[:]
@self_locked def __getitem__(self, key, hw=False): """This method launches a read_method execution if there's no thread on charge of doing that or if the hw flag is set to True. """ import time if (hw or not self.threaded) and self.read_method: dict.__setitem__(self, key, self.read_method(key)) self.last_update = time.time() return dict.__getitem__(self, key) @self_locked def __setitem__(self, key, value, hw=True): """This method launches a write_method execution if the hw flag is not explicitly set to False. """ import time if hw and self.write_method: # It implies that a key will not be added here to read thread! dict.__setitem__(self, key, self.write_method(*[key, value])) else: dict.__setitem__(self, key, value) self.last_update = time.time()
[docs] @self_locked def get(self, key, default=None): import time if not self.threaded and self.read_method: dict.__setitem__(self, key, self.read_method(key)) self.last_update = time.time() if default is False: return dict.get(self, key) else: return dict.get(self, key, default)
# __getitem__ = self_locked(dict.__getitem__) # __setitem__ = self_locked(dict.__setitem__) __delitem__ = self_locked(dict.__delitem__) __contains__ = self_locked(dict.__contains__) __iter__ = self_locked(dict.__iter__) pop = self_locked(dict.pop) # @self_locked # def pop(self,k,d=None): # if k in self.keys(): # self.stop() # d = dict.pop(self,k) # self.start() # return d @self_locked def __str__(self): return ( "{" + ",".join( [ "'" + str(k) + "'" + ":" + "'" + str(v) + "'" for k, v in zip(dict.keys(self), dict.values(self)) ] ) + "}" ) @self_locked def __repr__(self): return ( "{\n" + "\n,".join( [ "'" + str(k) + "'" + ":" + "'" + str(v) + "'" for k, v in zip(dict.keys(self), dict.values(self)) ] ) + "\n}" ) # __str__ = self_locked(dict.__str__) # __repr__ = self_locked(dict.__repr__) # get = self_locked(dict.get) # has_key = self_locked(dict.has_key) update = self_locked(dict.update) copy = self_locked(dict.copy) keys = self_locked(dict.keys) values = self_locked(dict.values) items = self_locked(dict.items)
# iterkeys = self_locked(dict.iterkeys) # itervalues = self_locked(dict.itervalues) # iteritems = self_locked(dict.iteritems) class SortedDict(dict): """This class implements a dictionary that returns keys in the same order they were inserted.""" def __init__(self, other=None): dict.__init__(self) self._keys = [] if other is not None: self.update(other) return def sort(self, key): """This method modifies the sorting of the dictionary overriding the existing sort key. :param key: it can be a sequence containing all the keys already existing in the dictionary or a callable providing a sorting key algorithm. """ if hasattr(key, "__call__"): self._keys = sorted(self._keys, key=key) else: for k in self._keys: if k not in self._keys: raise KeyError(k) self._keys = list(key) return self._keys[:] def __setitem__(self, k, v): if k not in self._keys: self._keys.append(k) dict.__setitem__(self, k, v) def update(self, other): if hasattr(other, "items"): other = other.items() for k, v in other: self.__setitem__(k, v) @staticmethod def fromkeys(S, v=None): return SortedDict((s, v) for s in S) def pop(self, k, d=None): """Removes key and returns its (self[key] or d or None)""" if k not in self: return d self._keys.remove(k) return dict.pop(self, k) def popitem(self): """Removes and returns last key,value pair""" k = self._keys[-1] v = self[k] self.pop(k) return k, v def clear(self): self._keys = [] return dict.clear(self) def keys(self): return self._keys[:] def values(self): return [self[k] for k in self._keys] def items(self): return [(k, self[k]) for k in self._keys] def __iter__(self): return (i for i in self._keys) def iteritems(self): return ((k, self[k]) for k in self._keys) def iterkeys(self): return (i for i in self._keys) def itervalues(self): return (self[k] for k in self._keys) try: from collections import defaultdict except Exception: class defaultdict(dict): def __init__(self, default_factory=None, *a, **kw): if default_factory is not None and not hasattr( default_factory, "__call__" ): raise TypeError("first argument must be callable") dict.__init__(self, *a, **kw) self.default_factory = default_factory def __getitem__(self, key): try: return dict.__getitem__(self, key) except KeyError: return self.__missing__(key) def __missing__(self, key): if self.default_factory is None: raise KeyError(key) self[key] = value = self.default_factory() return value def __reduce__(self): if self.default_factory is None: args = tuple() else: args = (self.default_factory,) return type(self), args, None, None, list(self.items()) def copy(self): return self.__copy__() def __copy__(self): return type(self)(self.default_factory, self) def __deepcopy__(self, memo): import copy return type(self)( self.default_factory, copy.deepcopy(list(self.items())) ) def __repr__(self): return "defaultdict(%s, %s)" % ( self.default_factory, dict.__repr__(self), )
[docs]class defaultdict_fromkey(defaultdict): """Creates a dictionary with a default_factory function that creates new elements using key as argument. Usage : new_dict = defaultdict_fromkey(method) where method like (lambda key: return new_obj(key)) Each time that new_dict[key] is called with a key that doesn't exist, method(key) is used to create the value Copied from PyAlarm device server """ def __missing__(self, key): if self.default_factory is None: raise KeyError(key) self[key] = value = self.default_factory(key) return value
[docs]class CaselessDefaultDict(defaultdict_fromkey, CaselessDict): """a join venture between caseless and default dict This class merges the two previous ones. This declaration equals to:: CaselessDefaultDict = type( 'CaselessDefaultType', (CaselessDict,defaultdict_fromkey), {} ) """ def __getitem__(self, key): return defaultdict_fromkey.__getitem__(self, key.lower()) pass
[docs]class DefaultThreadDict(defaultdict_fromkey, ThreadDict): """a join venture between thread and default dict This class merges the two previous ones. """ # todo: These two classes are not yet well integrated ... # the way a new key is added to the dict must be rewritten # explicitly. def __init__( self, other=None, default_factory=None, read_method=None, write_method=None, timewait=0.1, threaded=True, ): defaultdict_fromkey.__init__(self, default_factory) ThreadDict.__init__( self, other, read_method, write_method, timewait, threaded ) pass
[docs]def getDictAsTree(dct): """This method will print a recursive dict in a tree-like shape:: >>> print(getDictAsTree({'A':{'B':[1,2],'C':[3]}})) """ def add_to_level(lvl, d): lines = [] if isinstance(d, dict): for k, v in d.items(): print('with key "%s"' % k) lines.append([""] * lvl + [str(k)]) lines += add_to_level(lvl + 1, v) elif type(d) in [list, set]: # End of recursion for e in d: if isinstance(e, dict): lines += add_to_level(lvl, e) else: lines.append([""] * lvl + [str(e)]) else: lines.append([""] * lvl + [str(d)]) return lines ls = ["\t".join(line) for line in add_to_level(0, dct)] print("lines are : \n", ls) return "\n".join(ls)
[docs]class ArrayBuffer(object): """A data buffer which internally uses a preallocated numpy.array. An ArrayBuffer will only present the actual contents, not the full internal buffer, so when appending or extending, it behaves as if dynamic reallocation was taking place. The contents of the class:`ArrayBuffer` can be accessed as if it was a numpy array (i.e slicing notation like b[2:3], b[:,2], b[-1],... are all valid). For retrieving the full contents, see :meth:`ArrayBuffer.contents` and :meth:`ArrayBuffer.toArray` On creation, a given initial internal buffer and a maximum size are set. If the maximum size is larger than the original internal buffer, this will be automatically grown in geometrical steps if needed to accommodate the contents, up to the maximum size. Once the contents fill the maximum size, appending or extending the contents will result in older contents being discarded (in a FIFO way) The :meth:`append` and meth:`extend` methods are designed to be cheap (especially if the internal buffer size is already at the maximum size), at the expense of memory usage """ def __init__(self, buffer, maxSize=0): """Creator. :param buffer: a numpy.array suitable to be used as the internal buffer. :type buffer: numpy.array :param maxSize: Maximum size of the internal buffer. The internal buffer length will be allowed to grow up to this value. If maxSize=0 (default), the maximum size will be that of the given buffer :type maxSize: int """ self.__buffer = buffer self.__end = 0 self.__bsize = self.__buffer.shape[0] self.__maxSize = max(maxSize, self.__bsize) def __getitem__(self, i): return self.__buffer[: self.__end].__getitem__(i) def __getslice__(self, i, j): return self.__buffer[: self.__end].__getslice__(i, j) def __len__(self): return self.__end def __repr__(self): return ( "ArrayBuffer with contents = %s" % self.__buffer[: self.__end].__repr__() ) def __str__(self): return self.__buffer[: self.__end].__str__() def __bool__(self): return self.__buffer[: self.__end].__bool__() def __setitem__(self, i, x): self.__buffer[: self.__end].__setitem__(i, x) def __setslice__(self, i, j, a): if i >= self.__end or j > self.__end: raise IndexError() self.__buffer[: self.__end].__setslice__(i, j, a)
[docs] def resizeBuffer(self, newlen): """resizes the internal buffer""" if newlen < self.__end: self.__end = newlen shape = list(self.__buffer.shape) shape[0] = newlen try: self.__buffer.resize(shape) # first try to resize in-place except Exception: import numpy # if not possible, do it by copying self.__buffer = numpy.resize(self.__buffer, shape) self.__bsize = self.__buffer.shape[0]
[docs] def append(self, x): """similar to the append method in a list, except that once the maximum buffer size is reached, elements get discarded on the begginning to keep the size within the limit :param x: element to be appended :type x: scalar .. seealso:: :meth:`extend` """ try: self.__buffer[self.__end] = x except IndexError: if self.__bsize < self.__maxSize: self.resizeBuffer(min(2 * self.__bsize, self.__maxSize)) # recursively call itself again after resizing return self.append(x) self.moveLeft(1) self.__buffer[self.__end] = x self.__end += 1
[docs] def extend(self, a): """similar to the extend method of a list, except that once the maximum buffer size is reached, elements get discarded on the begginning to keep the size within the limit :param a: array of elements to append :type a: numpy.array .. seealso:: :meth:`append`, :meth:`extendLeft` """ newend = self.__end + a.shape[0] if newend < self.__bsize: self.__buffer[self.__end : newend] = a self.__end = newend elif self.__bsize < self.__maxSize: self.resizeBuffer(min(2 * self.__bsize, self.__maxSize)) # recursively call itself again after resizing return self.extend(a) else: self.moveLeft(newend - self.__bsize) self.__buffer[self.__end :] = a[-min(a.shape[0], self.__bsize) :] self.__end = self.__bsize
[docs] def extendLeft(self, a): """Prepends data to the current contents. Note that, contrary to the extent method, no data will be discarded if the maximum size limit is reached. Instead, an exception will be raised. :param a: array of elements to append :type a: numpy.array .. seealso:: :meth:`extend` """ len_a = a.shape[0] newend = self.__end + len_a if newend < self.__bsize: self.__buffer[len_a:newend] = self.__buffer[ 0 : self.__end ] # move the contents to the right self.__buffer[0:len_a] = a self.__end = newend elif newend < self.__maxSize: self.resizeBuffer(min(2 * self.__bsize, self.__maxSize)) # recursively call itself again after resizing return self.extendLeft(a) else: raise ValueError( "Maximum buffer size cannot be exceeded when " + "calling extendLeft" )
[docs] def moveLeft(self, n): """discards n elements from the begginning to make space at the end of the buffer. Moves all elements n positions to the left and the contents size gets decreased by n **Note:** if n is larger or equal than the maximum buffer size, the whole buffer is wiped :param n: :type n: int """ newend = max(0, self.__end - n) self.__buffer[0:newend] = self.__buffer[n : self.__end] self.__end = newend
[docs] def contents(self): """returns the array of the contents that have already been filled. Note that it does not return the full buffer, only those elements that have been already set. It is equivalent to b[:] :return: array of contents :rtype: numpy.array .. seealso:: :meth:`toArray` """ return self.__buffer[: self.__end]
[docs] def toArray(self): """returns a copy of the array of the contents. It is equivalent to ``b.contents.copy()`` :return: copy of array of contents :rtype: numpy.array .. seealso:: :meth:`contents` """ return self.contents().copy()
[docs] def contentsSize(self): """Equivalent to len(b) :return: length of the current contents of the ArrayBuffer (not the maximum size of the buffer) :rtype: int .. seealso:: :meth:`maxSize` """ return self.__end
[docs] def bufferSize(self): """Returns the current size of the internal buffer :return: lcurrent length of the internal buffer :rtype: int .. seealso:: :meth:`contentsSize`, :meth:`maxSize` """ return self.__bsize
[docs] def maxSize(self): """Returns the maximum size of the internal buffer, beyond which the ArrayBuffer starts discarding elements when appending :return: maximum length of the internal buffer :rtype: int .. seealso:: :meth:`contentsSize`, :meth:`append`, :meth:`extend`, :meth:`isFull` """ return self.__maxSize
[docs] def setMaxSize(self, maxSize): """Sets the maximum size of the internal buffer, beyond which the ArrayBuffer starts discarding elements when appending :param maxSize: maximum length of the internal buffer :type maxSize: int .. seealso:: :meth:`contentsSize`, :meth:`append`, :meth:`extend`, :meth:`isFull` """ if maxSize < self.__bsize: raise ValueError( "Cannot set a maximum size below the current buffer size (%i)" % self.__bsize ) self.__maxSize = maxSize
[docs] def isFull(self): """Whether the contents fill the whole of the internal buffer :return: True if the contents fill the maximum size. False otherwise. :rtype: bool .. seealso:: :meth:`maxSize` """ return self.__end >= self.__maxSize
[docs] def remainingSize(self): """returns the remaining free space in the internal buffer (e.g., 0 if it is full) :return: length of the unused space in the internal buffer :rtype: int .. seealso:: :meth:`contentsSize`, :meth:`maxSize`, """ return self.maxSize() - self.contentsSize()
def chunks(c, n): """Generator which yields successive n-sized chunks from c""" for i in range(0, len(c), n): yield c[i : i + n]
[docs]def dictFromSequence(seq): """Translates a sequence into a dictionary by converting each to elements of the sequence (k,v) into a k:v pair in the dictionary :param seq: any sequence object :type seq: sequence :return: dictionary built from the given sequence :rtype: dict """ def _pairwise(iterable): """Utility method used by dictFromSequence""" itnext = iter(iterable).__next__ while True: yield itnext(), itnext() return dict(_pairwise(seq))