#!/usr/bin/env python
# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus. If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################
"""
event.py:
"""
import sys
import weakref
import threading
import time
from collections.abc import Sequence
import taurus.core
from .object import Object
# Public names of this module, i.e. what ``from <module> import *`` exports.
# Helpers defined in this file but not listed here are considered internal.
__all__ = [
"BoundMethodWeakref",
"CallableRef",
"EventGenerator",
"ConfigEventGenerator",
"ListEventGenerator",
"EventListener",
"AttributeEventWait",
"AttributeEventIterator",
]
# Markup language used by the docstrings of this module.
__docformat__ = "restructuredtext"
[docs]
class BoundMethodWeakref(object):
    """Weak reference to a bound method.

    Weak references to methods don't work by themselves, so the underlying
    function and the instance it is bound to are weakly referenced
    separately and the bound method is rebuilt each time the reference is
    called.
    """

    def __init__(self, bound_method, del_cb=None):
        """Creates the weak reference.

        :param bound_method: the bound method to be weakly referenced
        :type bound_method: callable
        :param del_cb: callable notified (with this object as argument)
                       when the function or the instance is collected.
                       Default is None meaning no notification.
        :type del_cb: callable object or None
        """
        # the finalizer is only installed when a notification was requested
        finalizer = self._deleted if del_cb else del_cb
        self.func_ref = weakref.ref(bound_method.__func__, finalizer)
        self.obj_ref = weakref.ref(bound_method.__self__, finalizer)
        if finalizer:
            # the user callback is itself only weakly referenced
            self.del_cb = CallableRef(del_cb)
        self.already_deleted = 0

    def _deleted(self, obj):
        """Finalizer shared by both inner weak references. It forwards the
        notification to the user callback (if it is still alive)"""
        if self.already_deleted:
            return
        notify = self.del_cb()
        if notify is None:
            return
        notify(self)
        self.already_deleted = 1

    def __call__(self):
        """Returns the bound method, or None if the function or the instance
        does not exist anymore"""
        instance = self.obj_ref()
        if instance is None:
            return None
        function = self.func_ref()
        if function is None:
            return None
        return function.__get__(instance)

    def __hash__(self):
        return id(self)

    def __eq__(self, other):
        same_kind = hasattr(other, "func_ref") and hasattr(other, "obj_ref")
        if not same_kind:
            return False
        mine = (self.func_ref, self.obj_ref)
        theirs = (other.func_ref, other.obj_ref)
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __repr__(self):
        return "BoundMethodWeakRef of %s.%s" % (
            self.obj_ref(),
            self.func_ref(),
        )
[docs]
def CallableRef(object, del_cb=None):
    """This function returns a callable weak reference to a callable object.
    Object can be a callable object, a function or a method.

    Bound methods (callables exposing both a non-None ``__self__`` and a
    ``__func__``) are wrapped in a :class:`BoundMethodWeakref`. Any other
    callable gets a plain :func:`weakref.ref`.

    :param object: a callable object
    :type object: callable object
    :param del_cb: callback function. Default is None meaning no callback.
    :type del_cb: callable object or None
    :return: a weak reference for the given callable
    :rtype: taurus.core.util.BoundMethodWeakref or weakref.ref
    """
    im_self = None
    if hasattr(object, "__self__"):
        im_self = object.__self__
    elif hasattr(object, "im_self"):
        im_self = object.im_self
    # BoundMethodWeakref needs `__func__`. Callables such as builtin
    # functions have a `__self__` but no `__func__`, so they must be handled
    # as ordinary objects instead of failing with AttributeError.
    if im_self is not None and hasattr(object, "__func__"):
        return BoundMethodWeakref(object, del_cb)
    return weakref.ref(object, del_cb)
# Reimplementation of the BoundMethodWeakref class that avoids keeping a hard
# reference in the event callbacks.
# Related to PyTango issue #185: "Keeping references to event callbacks after
# unsubscribe_event".
class _BoundMethodWeakrefWithCall(BoundMethodWeakref):
    """BoundMethodWeakref variant which, when called, invokes the referenced
    method (forwarding the arguments) instead of returning it."""

    def __init__(self, bound_method, del_cb=None):
        """Reimplementation of __init__ method"""
        super().__init__(bound_method, del_cb=del_cb)
        # expose the name of the wrapped function as this object's name
        self.__name__ = self.func_ref().__name__

    def __call__(self, *args, **kwargs):
        """Retrieve references and call callback with arguments"""
        instance = self.obj_ref()
        if instance is None:
            return None
        function = self.func_ref()
        if function is None:
            return None
        return function(instance, *args, **kwargs)
class EventStack(object):
    """internal usage event stack

    Events are handed out in the same order in which they were pushed.
    When `history` is enabled, every event handed out by :meth:`getNext` is
    additionally stored in a list of already read events.
    """

    def __init__(self, history=True):
        self.unread_stack = []
        self.read_stack = []
        self.history = history

    def isEmpty(self):
        """Tells if there are no unread events"""
        return not len(self.unread_stack)

    def push(self, event):
        """Stores a new unread event"""
        self.unread_stack.append(event)

    def getNext(self):
        """Returns the oldest unread event, or None if there is none"""
        if self.isEmpty():
            return None
        oldest = self.unread_stack.pop(0)
        if self.history:
            self.read_stack.append(oldest)
        return oldest

    def getAllUnread(self):
        """Returns all unread events and forgets them"""
        pending, self.unread_stack = self.unread_stack, []
        return pending

    def getAllRead(self):
        """Returns all recorded read events and forgets them"""
        consumed, self.read_stack = self.read_stack, []
        return consumed
[docs]
class EventGenerator(Object):
    """Base class capable of firing events"""

    # maximum time (seconds) of each individual wait done by waitEvent
    # before the unblocking condition and the timeout are checked again
    WaitTimeout = 0.1

    def __init__(self, name, events_active=True):
        """Event generator constructor.

        :param name: the event generator name
        :type name: str
        :param events_active: generate events (default is True)
        :type events_active: bool
        :return: a new EventGenerator
        :rtype: EventGenerator
        """
        self.call__init__(Object)
        self.event_name = name
        # list of (weak reference to callback, data) pairs
        self.cb_list = []
        self.last_val = None
        self.first_event_val = None
        self.events_active = events_active
        self.cond = threading.Condition()
        # event stacks of the waitEvent calls which are currently blocked
        self.wait_list = []

    def lock(self):
        """Locks this event generator"""
        self.cond.acquire()

    def unlock(self):
        """Unlocks this event generator. Nothing happens if the lock is not
        owned by the calling thread."""
        if self.cond._is_owned():
            self.cond.release()

    def subscribeEvent(self, cb, data=None, with_first_event=True):
        """Subscribes to the event

        :param cb: a callable object
        :type cb: callable
        :param data: extra data to send each time an event is triggered on the
                     given callback. Default is None.
        :type data: object
        :param with_first_event: whether call the callback with the first event
                                 value (the most recent value) during the
                                 subscription process. Default is True.
        :type with_first_event: bool
        :raises RuntimeError: if events are not active or if the same
                              (callback, data) pair is already registered
        """
        if not self.events_active:
            raise RuntimeError(
                "%s does not have events/polling active" % self.event_name
            )
        # only a weak reference is kept. unsubscribeDeletedEvent removes the
        # entry when the callback is garbage collected
        cb_ref = CallableRef(cb, self.unsubscribeDeletedEvent)
        self.lock()
        try:
            if (cb_ref, data) in self.cb_list:
                raise RuntimeError(
                    "Callback %s(%s) already reg. on %s"
                    % (cb, data, self.event_name)
                )
            self.cb_list.append((cb_ref, data))
            if with_first_event:
                cb(data, self.first_event_val)
        finally:
            self.unlock()

    def unsubscribeDeletedEvent(self, cb_ref):
        """for internal usage only"""
        self.lock()
        try:
            # identity (not equality) is used on purpose: only the entries
            # holding exactly the given reference object are dropped.
            # The list is modified in place.
            self.cb_list[:] = [
                pair for pair in self.cb_list if pair[0] is not cb_ref
            ]
        finally:
            self.unlock()

    def unsubscribeEvent(self, cb, data=None):
        """Unsubscribes the given callback from the event. If the callback is
        not a listener for this event a debug message is generated and nothing
        happens.

        :param cb: a callable object
        :type cb: callable
        :param data: extra data to send each time an event is triggered on the
                     given callback. Default is None
        :type data: object
        """
        cb_ref = CallableRef(cb)
        self.lock()
        try:
            try:
                self.cb_list.remove((cb_ref, data))
            except ValueError:
                self.debug(
                    "Trying to unsubscribe: %s is not a listener of %s"
                    % (str(cb_ref), self.event_name)
                )
        finally:
            self.unlock()

    def isSubscribed(self, cb, data=None):
        """Determines if the given callback is registered for this event.

        :param cb: a callable object
        :type cb: callable
        :param data: extra data to send each time an event is triggered on the
                     given callback. Default is None
        :type data: object
        :return: True if callback is registered or False otherwise
        :rtype: bool
        """
        # the reference is only used for the comparison, so there is no need
        # to attach a deletion callback to it
        cb_ref = CallableRef(cb)
        return (cb_ref, data) in self.cb_list

    def setEventsActive(self, events_active):
        """(De)activates events on this event generator.

        :param events_active: activate/deactivate events
        :type events_active: bool
        """
        self.events_active = events_active

    def getEventsActive(self):
        """Determines if events are active

        :return: True if events are active or False otherwise
        :rtype: bool
        """
        return self.events_active

    def fireEvent(self, val=None, event_val=None):
        """Fires an event.

        :param val: event value
        :type val: object
        :param event_val: value passed to the callbacks and to the blocked
                          waitEvent calls. Default is None meaning use `val`
        :type event_val: object
        """
        self.lock()
        try:
            self.last_val = val
            self.cond.notify_all()
            if event_val is None:
                event_val = val
            self.first_event_val = event_val
            for stack in self.wait_list:
                stack.push(event_val)
            # Iterate over a snapshot: the lock is reentrant, so a callback
            # (or the finalizer of a collected callback) may change cb_list
            # while the event is being delivered.
            for cb_ref, data in list(self.cb_list):
                cb = cb_ref()
                if cb is not None:
                    cb(data, event_val)
        finally:
            self.unlock()

    def waitEvent(
        self, val=None, equal=True, any=False, timeout=None, stack=None
    ):
        """Waits for an event to occur

        :param val: event value
        :type val: object
        :param equal: check for equality. Default is True
        :type equal: bool
        :param any: if True unblock after first event, not matter what value it
                    has. Default is False.
        :type any: bool
        :param timeout: maximum time to wait (seconds). Default is None meaning
                        wait forever (a value of 0 is handled like None).
        :type timeout: float
        :param stack: For internal usage only.
        :return: the value of the event that unblocked the wait
        :rtype: object
        :raises RuntimeError: if events are not active
        """
        if not self.events_active:
            raise RuntimeError(
                "%s does not have events/polling active" % self.event_name
            )
        self.lock()
        try:
            t0 = time.time()
            timeout_expired = False
            stack = stack or EventStack(history=False)
            while True:
                # events pushed into the stack while blocked take precedence
                # over the last known value
                curr_val = self.last_val
                avail = not stack.isEmpty()
                if avail:
                    curr_val = stack.getNext()
                if any:
                    block = not avail
                elif equal:
                    block = val != curr_val
                else:
                    block = val == curr_val
                if timeout:
                    timeout_expired = time.time() - t0 > timeout
                if not block or timeout_expired:
                    break
                self.wait_list.append(stack)
                try:
                    self.cond.wait(self.WaitTimeout)
                finally:
                    # never leave the stack registered, even if the wait
                    # was interrupted by an exception
                    self.wait_list.remove(stack)
            val = curr_val
        finally:
            self.unlock()
        return val

    def read(self):
        """Read the last event

        :return: the last event value
        :rtype: object
        """
        return self.last_val
[docs]
class EventListener(object):
    """A class that listens for an event with a specific value

    Note: Since this class stores for each event value the last timestamp when
    it occurred, it should only be used for events for which the event value
    domain (possible values) is limited and well known (ex: an enum)
    """

    def __init__(self):
        self.last_val = None
        self.cond = threading.Condition()
        # a set implemented as a dictionary
        # dict<object, float>
        # key - event value
        # value - timestamp of last event with that value
        self.event_set = {}
        # This class does not define `attr` itself, so an unconditional
        # access made the constructor fail with AttributeError. Register
        # only if a subclass provides the attribute.
        attr = getattr(self, "attr", None)
        if attr is not None:
            attr.addListener(self)

    def lock(self):
        """Locks this event listener"""
        self.cond.acquire()

    def unlock(self):
        """Unlocks this event listener. Nothing happens if the lock is not
        owned by the calling thread."""
        if self.cond._is_owned():
            self.cond.release()

    def clearEventSet(self):
        "Clears the internal event buffer"
        self.event_set.clear()

    def fireEvent(self, v):
        """Notifies that a given event has arrived
        This function is protected inside with the object's lock. Do NOT call
        this function when you have the lock acquired on this object.

        :param v: event value
        :type v: object
        """
        t = time.time()
        self.lock()
        try:
            self.last_val = v
            self.event_set[v] = t
            self.cond.notify_all()
        finally:
            self.unlock()

    def waitEvent(self, val, after=0, equal=True):
        """Wait for an event with the given value. You MUST protect this
        function with this object's lock before calling this method and always
        unlock it afterward, of course::

            from taurus.core.util.event import EventListener

            class MyEvtListener(EventListener):
                # Your specific listener code here
                pass

            evt_listener = MyEvtListener()
            try:
                evt_listener.lock()
                t = time.time()
                go()
                evt_listener.waitEvent(Stop, t)
            finally:
                evt_listener.unlock()

        :param val: value to compare
        :type val: object
        :param after: timestamp. wait for events coming after the given time.
                      default value is 0 meaning any event after Jan 1, 1970
        :type after: float
        :param equal: compare for equality. equal=True means an event with the
                      given value, equal=False means any event which has a
                      different value
        :type equal: bool
        """
        s = self.event_set
        while True:
            if equal:
                t = s.get(val)
                if t is not None and t >= after:
                    return
            else:
                for v, t in list(s.items()):
                    if v == val:
                        continue
                    if t >= after:
                        return
            # no matching event so far: block until fireEvent notifies
            self.cond.wait()
[docs]
class ConfigEventGenerator(EventGenerator):
    """Manage configuration events"""

    def fireEvent(self, val, event_val=None):
        """Fires a configuration event by delegating to
        :meth:`EventGenerator.fireEvent`. Here `val` has no default value.

        :param val: event value
        :type val: object
        :param event_val: forwarded unchanged to the base implementation
        :type event_val: object
        """
        EventGenerator.fireEvent(self, val=val, event_val=event_val)
[docs]
class ListEventGenerator(EventGenerator):
    """Manage list events, detecting changes in the list"""

    def __init__(self, name, events_active=True):
        self.call__init__(EventGenerator, name, events_active)
        self.last_val = []
        # (current elements, removed elements, added elements)
        self.first_event_val = ([], [], [])

    def fireEvent(self, val):
        """Notifies that a given event has arrived
        This function is protected inside with the object's lock. Do NOT call
        this function when you have the lock acquired on this object.

        The callbacks receive a tuple with three lists: the new elements,
        the elements removed and the elements added with respect to the
        previous event.

        :param val: event value
        :type val: object
        """
        # if attribute is not alive and last time was also not alive then
        # skip the event propagation
        if val is None:
            if not len(self.last_val):
                return
            val = []
        try:
            self.lock()
            current = list(val)
            previous = self.last_val
            removed = [item for item in previous if item not in current]
            added = [item for item in current if item not in previous]
            EventGenerator.fireEvent(self, current, (current, removed, added))
            # value stored for future subscribers: every element is
            # reported as added
            self.first_event_val = (current, [], current)
        finally:
            self.unlock()
[docs]
class AttributeEventWait(object):
    """Class designed to connect to a
    :class:`taurus.core.taurusattribute.TaurusAttribute` and fire events or
    wait for a certain event."""

    def __init__(self, attr=None):
        self._last_val = None
        self._attr = None
        self._cond = threading.Condition()
        # dict<event value, timestamp of the last event with that value>
        self._event_set = {}
        if attr is not None:
            self.connect(attr)

    def connect(self, attr):
        """Connect to the given attribute

        :param attr: the attribute to connect to
        :type attr: taurus.core.taurusattribute.TaurusAttribute
        """
        needAdd = True
        if self._attr is not None:
            if attr == self._attr:
                # already listening to this attribute
                needAdd = False
            else:
                self._attr.removeListener(self)
        self.clearEventSet()
        self._last_val = None
        self._attr = attr
        if needAdd:
            self._attr.addListener(self)

    def disconnect(self):
        """Disconnects from the attribute. If not connected nothing happens."""
        self.clearEventSet()
        # same "is connected" test as in connect(): identity with None, not
        # the truth value of the attribute object
        if self._attr is not None:
            self._attr.removeListener(self)
        self._attr = None
        self._last_val = None

    def lock(self):
        """Locks this event listener"""
        self._cond.acquire()

    def _getLockOwnerName(self):
        """Best-effort lookup of the name of the thread holding the lock.

        :return: the thread name or "<unknown>" if it cannot be determined
        :rtype: str
        """
        # NOTE: the threading module has no public API to query the owner of
        # a lock. Private attributes are probed defensively and may simply
        # not exist, in which case the owner is reported as unknown.
        lock = getattr(self._cond, "_lock", None)
        owner = getattr(lock, "_owner", None)
        if owner is not None:
            for th in threading.enumerate():
                if th.ident == owner:
                    return th.name
        return "<unknown>"

    def unlock(self):
        """Unlocks this event listener. If the lock is not owned by the
        calling thread a warning is printed and nothing else happens."""
        if self._cond._is_owned():
            self._cond.release()
        else:
            curr_th = threading.current_thread()
            print(
                "WARNING: Thread %s trying to unlock condition previously "
                "locked by thread %s"
                % (curr_th.name, self._getLockOwnerName())
            )

    def clearEventSet(self):
        "Clears the internal event buffer"
        self._event_set.clear()
        self._last_val = None

    def eventReceived(self, s, t, v):
        """Event listener method for the underlying attribute. Do not call this
        method. It will be called internally when the attribute generates
        an event.
        """
        if t == taurus.core.taurusbasetypes.TaurusEventType.Config:
            return
        elif t == taurus.core.taurusbasetypes.TaurusEventType.Error:
            # error events are recorded as a None value
            self.fireEvent(None)
        else:
            self.fireEvent(v.rvalue)

    def fireEvent(self, v):
        """Notifies that a given event has arrived
        This function is protected inside with the object's lock. Do NOT call
        this function when you have the lock acquired on this object.

        :param v: event value
        :type v: object
        """
        t = time.time()
        self.lock()
        try:
            self._last_val = v
            self._event_set[v] = t
            self._cond.notify_all()
        finally:
            self.unlock()

    def getLastRecordedEvent(self):
        """returns the value of the last recorded event or None if no event has
        been received or the last event was an error event

        :return: the last event value to be recorded
        :rtype: object
        """
        return self._last_val

    def getRecordedEvents(self):
        """Returns a reference to the internal dictionary used to store the
        internal events. Modify the return dictionary at your own risk!

        :return: reference to the internal event dictionary
        :rtype: dict
        """
        return self._event_set

    def getRecordedEvent(self, v):
        """Returns the recorded local timestamp for the event with the
        given value or None if no event with the given value has been recorded.

        :param v: event value
        :type v: object
        :return: local timestamp for the event or None if no event has been
                 recorded
        :rtype: float
        """
        return self._event_set.get(v)

    def _isEventRecorded(self, values, after, equal, ignore_values):
        """Tells if an event matching the given conditions was recorded.
        The parameters have the same meaning as in :meth:`waitForEvent`.

        :return: True if a matching event with a timestamp not older than
                 `after` is recorded or False otherwise
        :rtype: bool
        """
        s = self._event_set
        if ignore_values:
            return any(t >= after for t in s.values())
        if equal:
            for value in values:
                t = s.get(value)
                if t is not None and t >= after:
                    return True
            return False
        return any(t >= after for v, t in s.items() if v not in values)

    def waitEvent(
        self, val, after=0, equal=True, timeout=None, retries=-1, any=False
    ):
        """Wait for an event with the given value.

        Deprecated in favour of :meth:`waitForEvent`.

        :param val: value to compare
        :type val: object
        :param after: timestamp. wait for events coming after the given time.
                      default value is 0 meaning any event after Jan 1, 1970
        :type after: float
        :param equal: compare for equality. equal=True means an event with the
                      given value, equal=False means any event which has a
                      different value
        :type equal: bool
        :param timeout: maximum time to wait (seconds). Default is None meaning
                        wait forever.
        :type timeout: float
        :param retries: number of maximum retries of max timeout to attempts.
                        Default is -1 meaning infinite number of retries.
                        0 means no wait. Positive number is obvious.
        :param any: if any is True ignore 'val' parameter and accept any event.
                    If False (default), check with given 'val' parameter
        :type any: bool
        """
        from taurus.core.util.log import deprecated

        deprecated(
            dep="`AttributeEventWait.waitEvent()`",
            alt="`AttributeEventWait.waitForEvent()`",
            rel="4.9.0.dev",
        )
        if retries == 0:
            return
        if timeout is None:
            # if waiting forever it doesn't make sense to retry
            retries = 1
        if after is None:
            after = 0
        s = self._event_set
        self.lock()
        try:
            # increase the retries by one just because of how the loop is done
            if retries > 0:
                retries += 1
            while retries != 0:
                if any:
                    for v, t in s.items():
                        if t >= after:
                            return
                if equal:
                    t = s.get(val)
                    if (t is not None) and (t >= after):
                        return
                else:
                    for v, t in s.items():
                        if v == val:
                            continue
                        if t >= after:
                            return
                self._cond.wait(timeout)
                retries -= 1
        except Exception as e:
            sys.stderr.write(
                "AttributeEventWait: Caught exception while waiting: %s\n"
                % str(e)
            )
            # bare raise: propagate the very same exception and traceback
            raise
        finally:
            self.unlock()

    def waitForEvent(
        self,
        values,
        after=0,
        equal=True,
        timeout=None,
        reactivity=None,
        ignore_values=False,
    ):
        """Wait for an event that matches some given conditions.

        :param values: values to compare with the received event value
        :type values: seq<object>
        :param after: timestamp. wait for events coming after the given time.
                      default value is 0 meaning any event after Jan 1, 1970
        :type after: float
        :param equal: determines how the comparison is made. If True, the wait
                      will finish when the event matches any of the given
                      values. If False, the wait will finish when the event
                      differs from all the given values.
        :type equal: bool
        :param timeout: maximum time to wait (seconds). Default is None meaning
                        wait forever.
        :type timeout: float
        :param reactivity: setting reactivity (seconds) guarantees reactions to
                           aborts coming from other threads in at most the
                           reactivity time. Default is None meaning no
                           reactivity within timeout.
        :type reactivity: float
        :param ignore_values: if True ignore 'values' parameter and accept any
                              event. If False (default), check with given
                              'values' parameter
        :type ignore_values: bool
        :return: The return value is `True` unless a given timeout expired, in
                 which case it is `False`.
        :rtype: bool
        """
        if timeout is not None and reactivity is not None:
            assert timeout > reactivity
        if timeout is None:
            timeout = float("inf")
        if reactivity is None:
            reactivity = timeout
        if after is None:
            after = 0
        start_time = time.time()
        result = False
        self.lock()
        try:
            while True:
                result = self._isEventRecorded(
                    values, after, equal, ignore_values
                )
                if result:
                    break
                elapsed_time = time.time() - start_time
                if elapsed_time > timeout:
                    break
                # never sleep longer than the reactivity nor past the timeout
                left_time = timeout - elapsed_time
                wait_timeout = min(reactivity, left_time)
                if wait_timeout == float("inf"):
                    # Condition.wait expects None to block without timeout
                    wait_timeout = None
                self._cond.wait(wait_timeout)
        finally:
            self.unlock()
        return result
[docs]
class AttributeEventIterator(object):
    """Internal class. For test purposes"""

    def __init__(self, *attrs):
        self._attrs = None
        self._cond = threading.Condition()
        if len(attrs) > 0:
            self.connect(attrs)

    def connect(self, attrs):
        """Starts listening to the given attribute(s), after disconnecting
        from the ones connected so far.

        :param attrs: an attribute or a sequence of attributes
        """
        if not isinstance(attrs, Sequence):
            attrs = (attrs,)
        self.disconnect()
        self._attrs = attrs
        for attr in self._attrs:
            attr.addListener(self)

    def disconnect(self):
        """Stops listening to the connected attributes. If not connected
        nothing happens."""
        if self._attrs is None:
            return
        # forget the attributes so that a second call does not try to remove
        # the listener again
        attrs, self._attrs = self._attrs, None
        for attr in attrs:
            attr.removeListener(self)

    def lock(self):
        """Locks this event iterator"""
        self._cond.acquire()

    def _getLockOwnerName(self):
        """Best-effort lookup of the name of the thread holding the lock.

        :return: the thread name or "<unknown>" if it cannot be determined
        :rtype: str
        """
        # NOTE: the threading module has no public API to query the owner of
        # a lock. Private attributes are probed defensively and may simply
        # not exist, in which case the owner is reported as unknown.
        lock = getattr(self._cond, "_lock", None)
        owner = getattr(lock, "_owner", None)
        if owner is not None:
            for th in threading.enumerate():
                if th.ident == owner:
                    return th.name
        return "<unknown>"

    def unlock(self):
        """Unlocks this event iterator. If the lock is not owned by the
        calling thread a warning is printed and nothing else happens."""
        if self._cond._is_owned():
            self._cond.release()
        else:
            curr_th = threading.current_thread()
            print(
                "WARNING: Thread %s trying to unlock condition previously "
                "locked by thread %s"
                % (curr_th.name, self._getLockOwnerName())
            )

    def eventReceived(self, s, t, v):
        """Event listener method for the connected attributes. Only change
        and periodic events are taken into account."""
        if t not in (
            taurus.core.taurusbasetypes.TaurusEventType.Change,
            taurus.core.taurusbasetypes.TaurusEventType.Periodic,
        ):
            return
        self.fireEvent(s, v.rvalue)

    def fireEvent(self, s, v):
        """Stores the (source, value) pair as the latest data and wakes up
        the threads blocked in :meth:`events`."""
        self.lock()
        try:
            self._data = s, v
            self._cond.notify_all()
        finally:
            self.unlock()

    def events(self, timeout=1):
        """Generator yielding the latest (source, value) pair after each
        wait on the internal condition.

        :param timeout: maximum time (seconds) of each wait. Default is 1
        :type timeout: float
        """
        # NOTE(review): the pair is yielded after every wait, including the
        # ones that end by timeout, so the same data may be yielded more
        # than once. If no event was ever fired, `_data` does not exist and
        # the resulting AttributeError ends the iteration through the
        # handler below - confirm that this is the intended behaviour.
        # NOTE(review): the lock stays acquired while the generator is
        # suspended in the yield.
        self.lock()
        try:
            while True:
                self._cond.wait(timeout)
                yield self._data
        except Exception as e:
            print("INFO: Caught exception while waiting", str(e))
        finally:
            self.unlock()