Source code for taurus.core.util.timer

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""This module contains a :class:`Timer` class
"""

__all__ = ["Timer"]

__docformat__ = "restructuredtext"

import time
import threading

from .log import Logger


[docs] class Timer(Logger): """ Timer Object. Interval in seconds (The argument may be a floating point number for subsecond precision). If strict_timing is True, the timer will try to compensate for drifting due to the time it takes to execute function in each loop. """ def __init__( self, interval, function, parent, strict_timing=True, *args, **kwargs ): Logger.__init__(self, "Timer on " + function.__name__, parent) self.__lock = threading.Lock() self.__interval = interval self.__function = function self.__args = args self.__kwargs = kwargs self.__loop = False self.__alive = False self.__start_nb = 0 self.__thread = None self.__strict_timing = strict_timing
[docs] def start(self): """Start Timer Object""" with self.__lock: if not self.__alive: self.debug("Timer::start()") self.__loop = True self.__alive = True self.__start_nb += 1 thread_name = "TimerLoop %d" % self.__start_nb self.__thread = threading.Thread( target=self.__run, name=thread_name ) self.__thread.setDaemon(True) self.__thread.start()
[docs] def stop(self, sync=False): """Stop Timer Object""" self.debug("Timer::stop()") with self.__lock: self.__loop = False if sync and self.__thread is not None: self.__thread.join()
def __run(self): """Private Thread Function""" self.debug("Timer thread starting") next_time = time.time() + self.__interval while self.__loop: self.__function(*self.__args, **self.__kwargs) nap = self.__interval if self.__strict_timing: curr_time = time.time() nap = max(0, next_time - curr_time) if curr_time > next_time: self.warning( "loop function took more than loop interval (%ss)", self.__interval, ) next_time += self.__interval time.sleep(nap) self.__alive = False self.debug("Timer thread ending")