Commit 0e838833 authored by Kirill Smelkov's avatar Kirill Smelkov

time: Turn Ticker and Timer into cdef classes

- cdef their attributes (else accessing any of them raises
  AttributeError), in particular use pychan and sync.Mutex in cimport mode.
- change `with mu` into explicit lock/unlock (since pyx sync.Mutex does
  not support with).
- use INFINITY instead of None for empty dt.
parent 32f34607
......@@ -21,8 +21,12 @@
from __future__ import print_function, absolute_import
from golang import go, chan, select, default, nilchan, panic
from golang import _sync # avoid cycle: context -> time -> sync -> context
from golang cimport pychan
from golang cimport sync
from libc.math cimport INFINITY
from cython cimport final
from golang import go, select, default, nilchan, panic
def pynow(): # -> t
......@@ -63,13 +67,19 @@ def after_func(dt, f): # -> Timer
#
# If the receiver is slow, Ticker does not queue events and skips them.
# Ticking can be canceled via .stop() .
class Ticker(object):
@final
cdef class Ticker:
cdef readonly pychan c
cdef double _dt
cdef sync.Mutex _mu
cdef bint _stop
def __init__(self, dt):
if dt <= 0:
panic("ticker: dt <= 0")
self.c = chan(1) # 1-buffer -- same as in Go
self.c = pychan(1) # 1-buffer -- same as in Go
self._dt = dt
self._mu = _sync.PyMutex()
self._stop = False
go(self._tick)
......@@ -77,28 +87,31 @@ class Ticker(object):
#
# It is guaranteed that ticker channel is empty after stop completes.
def stop(self):
with self._mu:
self._stop = True
self._mu.lock()
self._stop = True
# drain what _tick could have been queued already
while len(self.c) > 0:
self.c.recv()
# drain what _tick could have been queued already
while len(self.c) > 0:
self.c.recv()
self._mu.unlock()
def _tick(self):
while 1:
# XXX adjust for accumulated error δ?
pysleep(self._dt)
with self._mu:
if self._stop:
return
self._mu.lock()
if self._stop:
self._mu.unlock()
return
# send from under ._mu so that .stop can be sure there is no
# ongoing send while it drains the channel.
select(
default,
(self.c.send, pynow()),
)
# send from under ._mu so that .stop can be sure there is no
# ongoing send while it drains the channel.
select(
default,
(self.c.send, pynow()),
)
self._mu.unlock()
# Timer arranges for time event to be sent to .c channel after dt time.
......@@ -107,13 +120,20 @@ class Ticker(object):
#
# If func f is provided - when the timer fires f is called in its own goroutine
# instead of event being sent to channel .c .
class Timer(object):
@final
cdef class Timer:
cdef readonly pychan c
cdef object _f
cdef sync.Mutex _mu
cdef double _dt # +inf - stopped, otherwise - armed
cdef int _ver # current timer was armed by n'th reset
def __init__(self, dt, f=None):
self._f = f
self.c = chan(1) if f is None else nilchan
self._mu = _sync.PyMutex()
self._dt = None # None - stopped, float - armed
self._ver = 0 # current timer was armed by n'th reset
self.c = pychan(1) if f is None else nilchan
self._dt = INFINITY
self._ver = 0
self.reset(dt)
# stop cancels the timer.
......@@ -130,44 +150,51 @@ class Timer(object):
# guaranteed that after stop the function is not running - in such case
# the caller must explicitly synchronize with that function to complete.
def stop(self): # -> canceled
with self._mu:
if self._dt is None:
canceled = False
else:
self._dt = None
self._ver += 1
canceled = True
self._mu.lock()
if self._dt == INFINITY:
canceled = False
else:
self._dt = INFINITY
self._ver += 1
canceled = True
# drain what _fire could have been queued already
while len(self.c) > 0:
self.c.recv()
# drain what _fire could have been queued already
while len(self.c) > 0:
self.c.recv()
return canceled
self._mu.unlock()
return canceled
# reset rearms the timer.
#
# the timer must be either already stopped or expired.
def reset(self, dt):
with self._mu:
if self._dt is not None:
panic("Timer.reset: the timer is armed; must be stopped or expired")
self._dt = dt
self._ver += 1
go(self._fire, dt, self._ver)
self._mu.lock()
if self._dt != INFINITY:
self._mu.unlock()
panic("Timer.reset: the timer is armed; must be stopped or expired")
self._dt = dt
self._ver += 1
go(self._fire, dt, self._ver)
self._mu.unlock()
def _fire(self, dt, ver):
pysleep(dt)
with self._mu:
if self._ver != ver:
return # the timer was stopped/resetted - don't fire it
self._dt = None
# send under ._mu so that .stop can be sure that if it sees
# ._dt = None, there is no ongoing .c send.
if self._f is None:
self.c.send(pynow())
return
self._mu.lock()
if self._ver != ver:
self._mu.unlock()
return # the timer was stopped/resetted - don't fire it
self._dt = INFINITY
# send under ._mu so that .stop can be sure that if it sees
# ._dt = INFINITY, there is no ongoing .c send.
if self._f is None:
self.c.send(pynow())
self._mu.unlock()
return
self._mu.unlock()
# call ._f not from under ._mu not to deadlock e.g. if ._f wants to reset the timer.
self._f()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment