Source code for sodiumfrp.time

""" Utilities for working with continuous time. """

from collections import namedtuple
from threading import Condition, Event, Thread
import time
from typing import Callable, List, Optional, Set, Tuple, TypeVar

from sodiumfrp import Cell, CellSink, Stream, StreamSink, Transaction

A = TypeVar("A")
B = TypeVar("B")

Timer = namedtuple("Timer", ["time", "streams"])

[docs]class TimerSystem: def __init__(self) -> None: self._time_ms: CellSink[int] = CellSink(int(_now_ms())) self._timers: List[Timer] = [] # WARNING: running a transaction with _timers_changed locked # may cause a deadlock self._timers_changed = Condition() self._stop = Event() self._released = False worker = Thread(target=self._thread) worker.daemon = True worker.start() self._unregister_hook = Transaction.on_start( self._on_transaction_start) def __del__(self) -> None: if not self._released: self.release() def __enter__(self) -> "TimerSystem": return self def __exit__(self, *args) -> None: self.release()
[docs] def release(self) -> None: self._released = True self._stop.set() self._unregister_hook() with self._timers_changed: # Unlock the thread and let it exit the infinite loop self._timers_changed.notify_all()
def _thread(self) -> None: """ Timers are executed at the beginning of a transaction. If there are not transactions, no timers will be executed. Thus, we need a separate thread that would generate transactions firing timers. """ while not self._stop.is_set(): trigger_timers = False with self._timers_changed: timeout = None if self._timers: earliest_timer = self._timers[0] now = _now_ms() if earliest_timer.time > now: timeout = (earliest_timer.time - now) * 0.001 else: trigger_timers = True if not trigger_timers: self._timers_changed.wait(timeout) if trigger_timers: # Start an empty transaction to trigger all passed timers # through the transaction start hook Transaction.run(lambda: None) def _on_transaction_start(self) -> None: """ This should be used as a :meth:`Transaction.on_start() <sodiumfrp.transaction.Transaction.start>` hook. At the beginning of each transaction, update `self._time_ms` and execute all timers that should've been fired by this time. """ trans_time = int(_now_ms()) time_cell_updated = False with self._timers_changed: while self._timers: timer = self._timers[0] timer_time = timer.time if timer_time > trans_time: break # Update time and execute timers inside the same transaction # to guarantee simultaneity of events def timer_transaction() -> None: self._time_ms.send(timer_time) for stream in timer.streams: stream.send(timer_time) del self._timers[0] # Don't need to notify _thread(). It will wake up and # skip to the next available timer on its own. Transaction.run(timer_transaction) if timer_time == trans_time: time_cell_updated = True if not time_cell_updated: self._time_ms.send(trans_time)
[docs] def time_ms(self) -> Cell[int]: """ Returns a cell that represents current time (in milliseconds). """ return self._time_ms
[docs] def at(self, alarm_time: Cell[Optional[int]]) -> Stream[int]: """ Returns a stream that fires events at the points in time indicated by the value of the input cell. If the value of the cell is `None` or less than the current time, then no events will be fired and the timer set by the previous value of the cell will be canceled. """ alarm_stream: StreamSink[int] = StreamSink() current_alarm_time_value: List[Optional[int]] = [None] def handler(alarm_time_value: Optional[int]) -> None: if current_alarm_time_value[0] is not None: if alarm_time_value != current_alarm_time_value[0]: self._cancel_timer( current_alarm_time_value[0], alarm_stream) if alarm_time_value is not None: if alarm_time_value > self._time_ms.sample(): self._set_timer(alarm_time_value, alarm_stream) current_alarm_time_value[0] = alarm_time_value else: current_alarm_time_value[0] = None else: current_alarm_time_value[0] = None listener = alarm_time.listen(handler) return alarm_stream.add_cleanup(listener)
def _cancel_timer(self, time_: int, stream: StreamSink[int]) -> None: with self._timers_changed: timer, index = _bisect(self._timers, time_, lambda t: t.time) if timer is not None: timer.streams.discard(stream) if len(timer.streams) == 0: del self._timers[index] self._timers_changed.notify_all() def _set_timer(self, time_: int, stream: StreamSink[int]) -> None: with self._timers_changed: timer, index = _bisect(self._timers, time_, lambda t: t.time) if timer is not None: timer.streams.add(stream) else: self._timers.insert(index, Timer(time_, {stream})) self._timers_changed.notify_all()
def _now_ms() -> float: return time.monotonic() * 1000 def _bisect( a: List[A], x: B, key: Callable[[A],B]) -> Tuple[Optional[A], int]: length = len(a) lo = 0 hi = length while lo < hi: mid = (lo + hi) // 2 if key(a[mid]) < x: lo = mid + 1 else: hi = mid index = lo if index < length and key(a[index]) == x: return a[index], index else: return None, index