Source code for sodiumfrp.primitives

from threading import RLock
import traceback
from typing import \
    Any, \
    Callable, \
    Generic, \
    Iterable, \
    List, \
    Optional, \
    Sequence, \
    Set, \
    Tuple, \
    TypeVar, \
    Union
import weakref

from sodiumfrp.lazy import Lazy
from sodiumfrp.listener import Listener
from sodiumfrp.node import NODE_NULL, Node, Target
from sodiumfrp.transaction import Transaction
from sodiumfrp.typing import Handler, TransactionHandler
from sodiumfrp.unit import Unit, UNIT

# Generic type parameters used by the annotations of the classes and
# functions defined in this module.
A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
S = TypeVar("S")
T = TypeVar("T")


class Stream(Generic[A]):
    """
    Represents a stream of discrete events/firings containing values of
    type `A`.
    """

    # Strong references to the listeners handed out by listen(), so that
    # they stay registered until unlisten() is called explicitly.
    _keep_listeners_alive: Set[Listener] = set()
    # Guards every access to _keep_listeners_alive.
    _keep_listeners_alive_lock: RLock = RLock()

    def __init__(self,
            node: Node,
            finalizers: List[Listener],
            firings: List[A]) -> None:
        """ Shouldn't be constructed explicitly. """
        self._node = node
        self._finalizers = finalizers
        self._firings = firings

    @staticmethod
    def never() -> "StreamWithSend[A]":
        """ A stream that never fires. """
        return StreamWithSend()

    def listen(self, handler: Handler[A]) -> Listener:
        """
        Listen for events/firings on this stream. This is the observer
        pattern. The returned :class:`~sodiumfrp.listener.Listener` has
        an :meth:`~sodiumfrp.listener.Listener.unlisten` method to cause
        the listener to be removed. This is an **operational** mechanism
        for interfacing between the world of I/O and FRP.

        :param handler: The handler to execute when there's a new value.
            You should make no assumptions about what thread you are
            called on, and the handler should not block. You are not
            allowed to use :meth:`CellSink.send` or
            :meth:`StreamSink.send` in the handler. An exception will be
            thrown, because you are not meant to use this to create your
            own primitives.
        """
        l0 = self.listen_weak(handler)

        def unlisten(_self: Listener) -> None:
            l0.unlisten()
            with Stream._keep_listeners_alive_lock:
                # discard() instead of remove(): the inner listener is
                # safe to unlisten repeatedly, so a second call must not
                # fail with KeyError here either.
                Stream._keep_listeners_alive.discard(_self)

        l = Listener(unlisten)
        with Stream._keep_listeners_alive_lock:
            Stream._keep_listeners_alive.add(l)
        return l

    def _listen(self,
            target: Node,
            action: TransactionHandler[A]) -> Listener:
        """
        Register `action` against `target` via :meth:`_listen_internal`,
        run through `Transaction._apply` and without suppressing earlier
        firings.
        """
        return Transaction._apply(
            lambda trans1: self._listen_internal(
                target, trans1, action, False))

    def listen_weak(self, action: Handler[A]) -> Listener:
        """
        A variant of :meth:`listen` that will deregister the listener
        automatically if the listener is garbage collected. With
        :meth:`listen`, the listener is only deregistered if
        :meth:`Listener.unlisten() <sodiumfrp.listener.Listener.unlisten>`
        is called explicitly.

        This method should be used for listeners that are to be passed
        to :meth:`Stream.add_cleanup` to ensure that things don't get
        kept alive when they shouldn't.
        """
        return self._listen(NODE_NULL, lambda trans2, a: action(a))

    def _listen_internal(self,
            target: Node,
            trans: Transaction,
            action: TransactionHandler[A],
            suppress_earlier_firings: bool) -> Listener:
        """
        Link `action` to this stream's node and return a listener that
        undoes the link.

        Unless `suppress_earlier_firings` is set, values already recorded
        in `self._firings` are replayed to `action` through
        `trans._prioritized`.
        """
        node_target_: List[Target] = [None]
        with Transaction._listeners_lock:
            if self._node._link_to(action, target, node_target_):
                trans._to_regen = True
        node_target = node_target_[0]
        firings = self._firings.copy()
        if (not suppress_earlier_firings) and (len(firings) > 0):
            def handler(trans2: Optional[Transaction]) -> None:
                for firing in firings:
                    Transaction.in_callback += 1
                    try:
                        action(trans2, firing)
                    except:
                        # Don't allow transactions to interfere with
                        # Sodium internals.
                        _print_stack()
                    finally:
                        Transaction.in_callback -= 1
            trans._prioritized(target, handler)

        captures: List[Any] = [
            # It's essential that we keep the listener alive while
            # the caller holds the Listener, so that the finalizer
            # doesn't get triggered.
            self,
            # It's also essential that we keep the action alive, since
            # the node uses a weak reference.
            action,
            node_target
        ]

        def unlisten(_self: Listener) -> None:
            event, _, linked_target = captures
            with Transaction._listeners_lock:
                if event is not None:
                    event._node._unlink_to(linked_target)
                # Release captured references
                captures[0] = None
                captures[1] = None
                captures[2] = None

        return Listener(unlisten)

    def map(self, f: Callable[[A], B]) -> "Stream[B]":
        """
        Transform the stream's event values according to the supplied
        function, so the returned Stream's event values reflect the value
        of the function applied to the input Stream's event values.

        :param f: Function to apply to convert the values. It may
            construct FRP logic or use :meth:`Cell.sample` in which case
            it is equivalent to :meth:`Stream.snapshot`'ing the cell.
            Apart from this the function must be **referentially
            transparent**.
        """
        out: StreamWithSend[B] = StreamWithSend()
        l = self._listen(out._node,
            lambda trans2, a: out._send(trans2, f(a)))
        return out._unsafe_add_cleanup(l)

    def map_to(self, b: B) -> "Stream[B]":
        """
        Returns a stream that outputs `b` for each event of the source
        stream.
        """
        return self.map(lambda _: b)

    def starmap(self, f: Callable[..., T]) -> "Stream[T]":
        """
        Like :meth:`Stream.map`, except that the elements of the stream
        are iterables that are unpacked as arguments when passing to
        the mapping function.
        """
        return self.map(lambda t: f(*t))

    def hold(self, init_value: A) -> "Cell[A]":
        """
        Create a cell with the specified initial value, that is updated
        by this stream's event values.

        There is an implicit delay: state updates caused by event firings
        don't become visible as the cell's current value as viewed by
        :meth:`Stream.snapshot` until the following transaction. To put
        this another way, :meth:`Stream.snapshot` always sees the value
        of a cell as it was before any state changes from the current
        transaction.
        """
        # No isinstance(self, StreamWithSend) check: Cell only calls
        # _listen_internal() on the stream, which every Stream provides,
        # and hold_lazy() accepts any Stream as well.
        return Transaction._apply(
            lambda trans: Cell(self, init_value))

    def hold_lazy(self, init_value: Lazy[A]) -> "Cell[A]":
        """
        A variant of :meth:`hold` with an initial value captured by
        :meth:`Cell.sample_lazy`.
        """
        return Transaction._apply(
            lambda trans: self._hold_lazy(trans, init_value))

    def _hold_lazy(self,
            _: Transaction,
            init_value: Lazy[A]) -> "Cell[A]":
        """ Build the :class:`LazyCell` behind :meth:`hold_lazy`. """
        return LazyCell(self, init_value)

    def snapshot(self,
            *args: Union["Cell", Callable[..., T]]) -> "Stream[T]":
        """
        Return a stream whose events are the result of the combination,
        using the specified function, of the input stream's event value
        and the values of the provided cells at the time of the event.

        The combining function is optional and, if it is present, must be
        provided as the last argument. If the function is omitted,
        the default combining function will be used, which ignores
        the stream's value and returns the values of the provided cells
        as a tuple. If only one cell is provided, the default combining
        function returns just the value of this cell.

        There is an implicit delay: state updates caused by event firings
        being held with :meth:`Stream.hold` don't become visible as
        the cell's current value until the following transaction. To put
        this another way, :meth:`Stream.snapshot` always sees the value
        of a cell as it was before any state changes from the current
        transaction.

        :raises RuntimeError: if no arguments, or only a function, were
            passed.
        :raises TypeError: if an argument expected to be a cell is not
            a :class:`Cell`.
        """
        def parse_args() -> Tuple[Tuple, Callable]:
            if len(args) == 0:
                raise RuntimeError(
                    "Stream.snapshot() expects one or more arguments")

            def check_cell_types(cells: Iterable[Cell]) -> None:
                for index, cell in enumerate(cells):
                    if not isinstance(cell, Cell):
                        raise TypeError(
                            f"Argument {index} must be of type Cell. "
                            f"Actual type: {type(cell)}")

            if callable(args[-1]):
                # Explicit combining function given as the last argument.
                cells = args[:-1]
                if len(cells) == 0:
                    raise RuntimeError("No cells were provided")
                check_cell_types(cells)
                f = args[-1]
                combine = lambda a: \
                    f(a, *map(lambda c: c._sample_no_trans(), cells))
                return cells, combine
            else:
                # Default combining function: drop the event value.
                cells = args
                check_cell_types(cells)
                if len(cells) == 1:
                    cell = cells[0]
                    combine = lambda a: cell._sample_no_trans()
                else:
                    combine = lambda a: \
                        tuple(map(lambda c: c._sample_no_trans(), cells))
                return cells, combine

        cells, combine = parse_args()
        out: StreamWithSend[T] = StreamWithSend()
        l = self._listen(out._node,
            lambda trans2, a: out._send(trans2, combine(a)))
        return out._unsafe_add_cleanup(l)

    def or_else(self, s: "Stream[A]") -> "Stream[A]":
        """
        Variant of :meth:`merge_with` that merges two streams and will
        drop an event in the simultaneous case.

        In the case where two events are simultaneous (i.e. both within
        the same transaction), the event from `this` will take
        precedence, and the event from `s` will be dropped. If you want
        to specify your own combining function, use :meth:`merge_with`.
        `s1.or_else(s2)` is equivalent to
        `s1.merge_with(s2, lambda l, r: l)`.

        The name `or_else` is used instead of `merge_with` to make it
        really clear that care should be taken, because events can be
        dropped.
        """
        return self.merge_with(s, lambda left, right: left)

    @staticmethod
    def _merge(ea: "Stream[A]", eb: "Stream[A]") -> "Stream[A]":
        """
        Forward the events of both `ea` and `eb` into one new stream,
        without combining simultaneous events.
        """
        out: StreamWithSend[A] = StreamWithSend()
        left = Node(0)
        right = out._node
        node_target_: List[Target] = [None]
        left._link_to(lambda _: None, right, node_target_)
        node_target = node_target_[0]
        h = out._send
        l1 = ea._listen(left, h)
        l2 = eb._listen(right, h)

        def unlisten(_self: Listener) -> None:
            left._unlink_to(node_target)

        return out \
            ._unsafe_add_cleanup(l1) \
            ._unsafe_add_cleanup(l2) \
            ._unsafe_add_cleanup(Listener(unlisten))

    def merge_with(self,
            s: "Stream[A]",
            f: Callable[[A, A], A]) -> "Stream[A]":
        """
        Merge two streams of the same type into one, so that events on
        either input appear on the returned stream.

        If the events are simultaneous (that is, one event from this and
        one from `s` occurring in the same transaction), combine them
        into one using the specified combining function so that
        the returned stream is guaranteed only ever to have one event per
        transaction. The event from `self` will appear at the left input
        of the combining function, and the event from `s` will appear at
        the right.

        :param f: Function to combine the values. It may construct FRP
            logic or use :meth:`Cell.sample`. Apart from this
            the function must be **referentially transparent**.
        """
        return Transaction._apply(
            lambda trans: Stream._merge(self, s)._coalesce(trans, f))

    @staticmethod
    def or_else_(*streams: "Stream[A]") -> "Stream[A]":
        """
        Variant of :meth:`or_else` that merges a collection of streams.
        """
        return Stream.merge(lambda left, right: left, *streams)

    @staticmethod
    def merge(
            f: Callable[[A, A], A],
            *streams: "Stream[A]") -> "Stream[A]":
        """
        Variant of :meth:`merge_with` that merges multiple streams.
        """
        return Stream._merge_many(streams, 0, len(streams), f)

    @staticmethod
    def _merge_many(
            streams: Sequence["Stream[A]"],
            start: int,
            end: int,
            f: Callable[[A, A], A]) -> "Stream[A]":
        """
        Merge `streams[start:end]` by recursively halving the range and
        joining the halves with :meth:`merge_with`.
        """
        length = end - start
        if length == 0:
            return Stream.never()
        elif length == 1:
            return streams[start]
        elif length == 2:
            left = streams[start]
            right = streams[start + 1]
            return left.merge_with(right, f)
        else:
            mid = (start + end) // 2
            left = Stream._merge_many(streams, start, mid, f)
            right = Stream._merge_many(streams, mid, end, f)
            return left.merge_with(right, f)

    def _coalesce(self,
            trans1: Transaction,
            f: Callable[[A, A], A]) -> "Stream[A]":
        """
        Return a stream fed from this one through a
        :class:`CoalesceHandler` built around `f`.
        """
        out: StreamWithSend[A] = StreamWithSend()
        h = CoalesceHandler(f, weakref.ref(out))
        l = self._listen_internal(out._node, trans1, h, False)
        return out._unsafe_add_cleanup(l)

    def _last_firing_only(self, trans: Transaction) -> "Stream[A]":
        """
        Clean up the output by discarding any firing other than the last
        one.
        """
        return self._coalesce(trans, lambda first, second: second)

    def filter(self, predicate: Callable[[A], bool]) -> "Stream[A]":
        """
        Return a stream that only outputs events for which the predicate
        returns `True`.
        """
        out: StreamWithSend[A] = StreamWithSend()

        def handler(trans2: Transaction, a: A) -> None:
            if predicate(a):
                out._send(trans2, a)

        l = self._listen(out._node, handler)
        return out._unsafe_add_cleanup(l)

    def gate(self, c: "Cell[bool]") -> "Stream[A]":
        """
        Return a stream that only outputs events from the input stream
        when the specified cell's value is `True`.
        """
        # Private marker for "blocked", so that any real event value
        # (including None) can pass through.
        skip = object()
        return self \
            .snapshot(c, lambda a, pred: a if pred else skip) \
            .filter(lambda x: x is not skip)

    def collect(self,
            init_state: S,
            f: Callable[[A, S], Tuple[B, S]]) -> "Stream[B]":
        """
        Transform an event with a generalized state loop (a Mealy
        machine). The function is passed the input and the old state and
        returns the output value and the new state, in this order.

        :param f: Function to apply to update the state. It may construct
            FRP logic or use :meth:`Cell.sample` in which case it is
            equivalent to :meth:`Stream.snapshot`'ing the cell. Apart
            from this the function must be **referentially transparent**.
        """
        return self.collect_lazy(Lazy(lambda: init_state), f)

    def collect_lazy(self,
            init_state: Lazy[S],
            f: Callable[[A, S], Tuple[B, S]]) -> "Stream[B]":
        """
        A variant of :meth:`collect` that takes an initial state returned
        by :meth:`Cell.sample_lazy`.
        """
        def handler() -> Stream[B]:
            ea = self
            es: StreamLoop[S] = StreamLoop()
            s = es.hold_lazy(init_state)
            ebs = ea.snapshot(s, f)
            eb = ebs.map(lambda bs: bs[0])
            es_out = ebs.map(lambda bs: bs[1])
            es.loop(es_out)
            return eb

        return Transaction.run(handler)

    def accum(self,
            init_state: S,
            f: Callable[[A, S], S]) -> "Cell[S]":
        """
        Accumulate on input event, outputting the new state each time.

        :param f: Function to apply to update the state. It may construct
            FRP logic or use :meth:`Cell.sample` in which case it is
            equivalent to :meth:`Stream.snapshot`'ing the cell. Apart
            from this the function must be **referentially transparent**.
        """
        return self.accum_lazy(Lazy(lambda: init_state), f)

    def accum_lazy(self,
            init_state: Lazy[S],
            f: Callable[[A, S], S]) -> "Cell[S]":
        """
        A variant of :meth:`accum` that takes an initial state returned
        by :meth:`Cell.sample_lazy`.
        """
        def handler() -> Cell[S]:
            ea = self
            es: StreamLoop[S] = StreamLoop()
            s = es.hold_lazy(init_state)
            es_out = ea.snapshot(s, f)
            es.loop(es_out)
            return es_out.hold_lazy(init_state)

        return Transaction.run(handler)

    def once(self) -> "Stream[A]":
        """
        Return a stream that outputs only one value: the next event of
        the input stream, starting from the transaction in which `once()`
        was invoked.
        """
        # This is a bit long-winded but it's efficient because it
        # deregisters the listener.
        la: List[Optional[Listener]] = [None]
        out: StreamWithSend[A] = StreamWithSend()

        def handler(trans: Transaction, a: A) -> None:
            if la[0] is not None:
                out._send(trans, a)
                la[0].unlisten()
                la[0] = None

        la[0] = self._listen(out._node, handler)
        return out._unsafe_add_cleanup(la[0])

    def calm(self) -> "Stream[A]":
        """
        Returns a stream that ignores repeating values produced by
        the source stream.
        """
        # Seed the comparison with a fresh object rather than None, so
        # that a first event whose value is None is not mistaken for
        # a repetition of the initial state.
        return self._calm(object())

    def _calm(self, init) -> "Stream[A]":
        """
        Drop every event that compares equal (`==`) to the previous one,
        with `init` standing in for the value before the first event.
        """
        # Private "nothing to emit" marker (same approach as gate()), so
        # that events carrying None are not filtered out by accident.
        skip = object()

        def reducer(new, old):
            return (skip, old) if new == old else (new, new)

        return self.collect(init, reducer).filter(lambda x: x is not skip)

    def _unsafe_add_cleanup(self, cleanup: Listener) -> "Stream[A]":
        """
        This is not thread-safe, so one of these two conditions must
        apply:

        1. We are within a transaction, since in the current
           implementation a transaction locks out all other threads.
        2. The object on which this is being called has not yet been
           returned from the method where it was created, so it can't
           be shared between threads.
        """
        self._finalizers.append(cleanup)
        return self

    def add_cleanup(self, cleanup: Listener) -> "Stream[A]":
        """
        Attach a listener to this stream so that its
        :meth:`Listener.unlisten() <sodiumfrp.listener.Listener.unlisten>`
        is invoked when this stream is garbage collected. Useful for
        functions that initiate I/O, returning the result of it through
        a stream.

        You must use this only with listeners returned by
        :meth:`listen_weak()` so that things don't get kept alive when
        they shouldn't.
        """
        def helper() -> Stream[A]:
            fs_new = self._finalizers.copy()
            fs_new.append(cleanup)
            return Stream(self._node, fs_new, self._firings)

        return Transaction.run(helper)

    def __del__(self) -> None:
        """ Unlisten every registered finalizer. """
        for l in self._finalizers:
            l.unlisten()
class StreamWithSend(Stream[A]):
    """
    A stream with an internal `_send` operation that pushes a value to
    the listeners linked to its node.
    """

    def __init__(self) -> None:
        """ Start with a fresh node, no finalizers and no firings. """
        super().__init__(Node(0), [], [])

    def _send(self, trans: Transaction, a: A) -> None:
        """
        Record `a` as a firing and schedule its delivery to every
        listener currently linked to this stream's node.
        """
        if not self._firings:
            # First firing recorded since the list was last emptied:
            # arrange for it to be emptied again via trans.last().
            def reset_firings() -> None:
                self._firings.clear()
            trans.last(reset_firings)
        self._firings.append(a)

        # Take a snapshot of the listeners under the lock; dispatching
        # happens outside of it.
        with Transaction._listeners_lock:
            targets = set(self._node._listeners)

        def make_dispatch(target: Target) -> Callable[[Transaction], None]:
            # A factory gives each callback its own `target` binding,
            # instead of capturing the loop variable.
            def dispatch(trans2: Transaction) -> None:
                Transaction.in_callback += 1
                try:
                    # Dereference the weak reference
                    callback = target.action()
                    # If it hasn't been GC'ed, call it
                    if callback is not None:
                        callback(trans2, a)
                except:
                    # Don't allow transactions to interfere with Sodium
                    # internals.
                    _print_stack()
                finally:
                    Transaction.in_callback -= 1
            return dispatch

        for target in targets:
            trans._prioritized(target.node, make_dispatch(target))
class StreamSink(StreamWithSend[A]):
    """
    A stream that allows values to be pushed into it, acting as
    an interface between the world of I/O and the world of FRP. Code that
    exports `StreamSinks` for read-only use should downcast to
    :class:`Stream`.
    """

    def __init__(self, f: Callable[[A, A], A] = None) -> None:
        """
        Construct a `StreamSink`.

        The provided function combines values that were sent to
        the stream during the same transaction into a single event. If
        the function is `None`, :meth:`send` throws an exception when it
        is called more than once per transaction. The combining function
        should be **associative**.

        :param f: Function to combine the values. It may construct FRP
            logic or use :meth:`Cell.sample`. Apart from this
            the function must be **referentially transparent**.
        """
        super().__init__()

        def reject_second_send(left: A, right: A) -> A:
            raise RuntimeError("send() called more than once per "
                "transaction, which isn't allowed. Did you want to combine "
                "the events? Then pass a combining function to your "
                "StreamSink constructor.")

        if f is not None:
            combiner = f
        else:
            combiner = reject_second_send
        # The handler gets a weak reference to this sink.
        self._coalescer = CoalesceHandler(combiner, weakref.ref(self))

    def send(self, a: A) -> None:
        """
        Send a value to be made available to consumers of the stream.

        `send()` may not be used inside handlers registered with
        :meth:`Stream.listen` or :meth:`Cell.listen`. An exception will
        be thrown, because `StreamSink` is for interfacing I/O to FRP
        only. You are not meant to use this to define your own
        primitives.

        :param a: value to push into the stream.
        :raises RuntimeError: if called from inside a Sodium callback.
        """
        def push(trans: Transaction) -> None:
            inside_callback = trans.in_callback > 0
            if inside_callback:
                raise RuntimeError("You are not allowed to use send() "
                    "inside a Sodium callback")
            self._coalescer(trans, a)

        Transaction._run_handler(push)
class StreamLoop(StreamWithSend[A]):
    """
    A forward reference for a `Stream` equivalent to the `Stream` that is
    referenced.
    """

    def __init__(self) -> None:
        """
        Make an instance of `StreamLoop`.

        :raises RuntimeError: if there is no current transaction.
        """
        super().__init__()
        self._assigned = False
        current = Transaction.get_current_transaction()
        if current is None:
            raise RuntimeError("StreamLoop/CellLoop must be used within "
                "an explicit transaction")

    def loop(self, ea_out: Stream[A]) -> None:
        """
        Resolve the loop to specify what the `StreamLoop` was a forward
        reference to. It must be invoked inside the same transaction as
        the place where the `StreamLoop` is used. This requires you to
        create an explicit transaction with
        :meth:`Transaction.run() <sodiumfrp.transaction.Transaction.run>`.

        :raises RuntimeError: if the loop has already been resolved.
        """
        if self._assigned:
            raise RuntimeError("StreamLoop looped more than once")
        self._assigned = True

        def connect() -> Stream[A]:
            # Forward every event of `ea_out` into this stream and keep
            # the resulting listener as a cleanup of this stream.
            forwarding = ea_out._listen(self._node, self._send)
            return self._unsafe_add_cleanup(forwarding)

        Transaction.run(connect)
class CoalesceHandler(Generic[A]):
    """
    Transaction handler that folds the values it receives with `f` and
    sends the folded value to the output stream from a callback scheduled
    through `_prioritized`.
    """

    def __init__(self,
            f: Callable[[A, A], A],
            # Pass a weak reference here, to prevent circular references
            # in Stream._coalesce() and StreamSink.__init__().
            out: "weakref.ReferenceType[StreamWithSend[A]]") -> None:
        self._f = f
        self._out = out
        # True while a value is accumulated and waiting to be sent.
        self._accum_valid = False
        self._accum: A = None

    def __call__(self, trans1: Transaction, a: A) -> None:
        """
        Accept the value `a`: combine it with the pending value if there
        is one, otherwise schedule the send and make `a` the pending
        value.
        """
        if self._accum_valid:
            self._accum = self._f(self._accum, a)
            return
        trans1._prioritized(self._out()._node, self._flush)
        self._accum = a
        self._accum_valid = True

    def _flush(self, trans2: Optional[Transaction]) -> None:
        """ Send the accumulated value and reset the accumulator. """
        self._out()._send(trans2, self._accum)
        self._accum_valid = False
        self._accum = None
class Cell(Generic[A]):
    """ Represents a value of type `A` that changes over time. """

    @staticmethod
    def constant(value: A) -> "Cell[A]":
        """
        Returns a cell that never changes, always holding the `value`.
        """
        return Cell(Stream.never(), value)

    def __init__(self,
            stream: StreamWithSend[A],
            init_value: A) -> None:
        """ Shouldn't be constructed explicitly. """
        self._stream = stream
        self._value = init_value
        # Update received from the stream and not yet copied into
        # _value; None means that no update is pending.
        self._value_update: A = None
        self._cleanup: Listener = None
        self._lazy_init_value: Lazy[A] = None  # Used by LazyCell

        def handler(trans1: Transaction) -> None:
            def handler2(trans2: Transaction, a: A) -> None:
                if self._value_update is None:
                    def run() -> None:
                        self._value = self._value_update
                        self._lazy_init_value = None
                        self._value_update = None
                    # Copy the update into _value through trans2.last().
                    trans2.last(run)
                self._value_update = a

            self._cleanup = stream._listen_internal(
                NODE_NULL, trans1, handler2, False)

        Transaction._run_handler(handler)

    def sample(self) -> A:
        """
        Sample the cell's current value.

        It may be used inside the functions passed to primitives that
        apply them to :class:`Stream` including :meth:`Stream.map` (in
        which case it is equivalent to snapshotting the cell),
        :meth:`Stream.snapshot`, :meth:`Stream.filter` and
        :meth:`Stream.merge`. It should generally be avoided in favour of
        :meth:`listen` so you don't miss any updates, but in many
        circumstances it makes sense.
        """
        return Transaction._apply(lambda trans: self._sample_no_trans())

    def sample_lazy(self) -> Lazy[A]:
        """
        A variant of :meth:`sample` that works with :class:`CellLoop`
        when they haven't been looped yet. It should be used in any code
        that's general enough that it could be passed
        a :class:`CellLoop`.
        """
        return Transaction._apply(lambda trans: self._sample_lazy(trans))

    def _sample_lazy(self, trans: Transaction) -> Lazy[A]:
        """
        Return a :class:`Lazy` whose value is captured by a callback
        registered with `trans.last`; if it is forced before that
        callback has run, it samples the cell directly.
        """
        class LazySample:
            def __init__(self,
                    cell: Cell[A],
                    has_value: bool,
                    value: A) -> None:
                self.cell = cell
                self.has_value = has_value
                self.value = value

        # has_value starts as a real boolean (it was None before).
        s = LazySample(cell=self, has_value=False, value=None)

        def handler() -> None:
            s.value = self._value_update \
                if self._value_update is not None \
                else self._sample_no_trans()
            s.has_value = True
            # The cell is no longer needed once the value is captured.
            s.cell = None

        trans.last(handler)
        return Lazy(lambda: s.value if s.has_value else s.cell.sample())

    def _sample_no_trans(self) -> A:
        """ Return the stored value. """
        return self._value

    def _updates(self) -> Stream[A]:
        """ Return the stream that updates this cell. """
        return self._stream

    def _value_stream(self, trans1: Transaction) -> Stream[A]:
        """
        Return a stream that fires the cell's sampled value once (through
        a spark scheduled on `trans1`), merged with the cell's updates;
        an update wins over the sampled value when both occur together.
        """
        s_spark: StreamWithSend[Unit] = StreamWithSend()
        trans1._prioritized(
            s_spark._node,
            lambda trans2: s_spark._send(trans2, UNIT))
        s_initial: Stream[A] = s_spark.snapshot(self)
        return s_initial.merge_with(
            self._updates(), lambda left, right: right)

    def map(self, f: Callable[[A], B]) -> "Cell[B]":
        """
        Transform the cell's value according to the supplied function, so
        the returned cell always reflects the value of the function
        applied to the input cell's value.
        """
        return Transaction._apply(
            lambda trans: self \
                ._updates() \
                .map(f) \
                ._hold_lazy(trans, self._sample_lazy(trans).lift(f))
        )

    def starmap(self, f: Callable[..., T]) -> "Cell[T]":
        """
        Like :meth:`Cell.map`, except that the value of the cell is
        an iterable that is unpacked as arguments when passing to
        the mapping function.
        """
        return self.map(lambda t: f(*t))

    # TODO lift over multiple cells at once
    def lift(self,
            b: "Cell[B]",
            f: Callable[[A, B], C]) -> "Cell[C]":
        """
        Lift a binary function into cells, so the returned cell always
        reflects the specified function applied to the input cells'
        values.

        :param f: Function to apply. It must be **referentially
            transparent**.
        """
        bf = self.map(lambda aa: lambda bb: f(aa, bb))
        return Cell.apply(bf, b)

    @staticmethod
    def apply(
            bf: "Cell[Callable[[A],B]]",
            ba: "Cell[A]") -> "Cell[B]":
        """
        Apply a value inside a cell to a function inside a cell. This is
        the primitive for all function lifting.
        """
        def handler(trans0: Transaction) -> "Cell[B]":
            out: StreamWithSend[B] = StreamWithSend()

            class ApplyHandler:
                def __init__(self) -> None:
                    self.f: Callable[[A], B] = None
                    self.f_present = False
                    self.a: A = None
                    self.a_present = False

                def run(self, trans1: Transaction) -> None:
                    trans1._prioritized(out._node,
                        lambda trans2: out._send(trans2, self.f(self.a)))

            out_target = out._node
            in_target = Node(0)
            node_target_: List[Target] = [None]
            in_target._link_to(lambda _: None, out_target, node_target_)
            node_target = node_target_[0]
            h = ApplyHandler()

            def handler_f(
                    trans1: Transaction,
                    f: Callable[[A], B]) -> None:
                h.f = f
                h.f_present = True
                # Only fire once both the function and the argument
                # have been seen.
                if h.a_present:
                    h.run(trans1)

            l1 = bf._value_stream(trans0)._listen(in_target, handler_f)

            def handler_a(trans1: Transaction, a: A) -> None:
                h.a = a
                h.a_present = True
                if h.f_present:
                    h.run(trans1)

            l2 = ba._value_stream(trans0)._listen(in_target, handler_a)

            return out \
                ._last_firing_only(trans0) \
                ._unsafe_add_cleanup(l1) \
                ._unsafe_add_cleanup(l2) \
                ._unsafe_add_cleanup(
                    Listener(lambda _: in_target._unlink_to(node_target))) \
                .hold_lazy(
                    Lazy(
                        lambda: bf._sample_no_trans()(ba._sample_no_trans())
                    )
                )

        return Transaction._apply(handler)

    def switch(self) -> "Cell":
        """
        Do either :meth:`switch_stream` or :meth:`switch_cell` depending
        on the type of the value of the cell.

        :raises TypeError: if the cell holds neither a :class:`Stream`
            nor a :class:`Cell`.
        """
        # Go through sample() instead of reading _value directly: a lazy
        # cell (e.g. the result of map()) keeps _value as None until it
        # is sampled for the first time.
        value = self.sample()
        if isinstance(value, Stream):
            return Cell.switch_stream(self)
        elif isinstance(value, Cell):
            return Cell.switch_cell(self)
        else:
            raise TypeError(
                "Can't apply Cell.switch() to a Cell holding value of type "
                f"{type(value)}. Type of the value must be either "
                "Stream or Cell.")

    @staticmethod
    def switch_cell(bba: "Cell[Cell[A]]") -> "Cell[A]":
        """
        Unwrap a cell inside another cell to give a time-varying cell
        implementation.
        """
        def helper(trans0: Transaction) -> "Cell[A]":
            za = bba.sample_lazy().lift(lambda ba: ba.sample())
            out: StreamWithSend[A] = StreamWithSend()

            class _Handler:
                def __init__(self) -> None:
                    self.current_listener: Listener = None

                def __call__(self,
                        trans2: Transaction,
                        ba: Cell[A]) -> None:
                    # Note: If any switch takes place during
                    # a transaction, then the _last_firing_only() below
                    # will always cause a sample to be fetched from
                    # the one we just switched to. So anything from
                    # the old input cell that might have happened during
                    # this transaction will be suppressed.
                    if self.current_listener is not None:
                        self.current_listener.unlisten()
                    self.current_listener = ba \
                        ._value_stream(trans2) \
                        ._listen_internal(
                            out._node, trans2, out._send, False)

                def __del__(self) -> None:
                    if self.current_listener is not None:
                        self.current_listener.unlisten()

            l1 = bba._value_stream(trans0)._listen(out._node, _Handler())
            return out \
                ._last_firing_only(trans0) \
                ._unsafe_add_cleanup(l1) \
                .hold_lazy(za)

        return Transaction._apply(helper)

    @staticmethod
    def switch_stream(bea: "Cell[Stream[A]]") -> Stream[A]:
        """
        Unwrap a stream inside a cell to give a time-varying stream
        implementation.
        """
        return Transaction._apply(
            lambda trans: Cell._switch_stream(trans, bea))

    @staticmethod
    def _switch_stream(
            trans1: Transaction,
            bea: "Cell[Stream[A]]") -> Stream[A]:
        """
        Implementation of :meth:`switch_stream` for the transaction
        `trans1`.
        """
        out: StreamWithSend[A] = StreamWithSend()
        h2 = out._send

        class _Handler:
            def __init__(self) -> None:
                # Start by listening to the stream currently held.
                self.current_listener = bea \
                    ._sample_no_trans() \
                    ._listen_internal(out._node, trans1, h2, False)

            def __call__(self,
                    trans2: Transaction,
                    ea: Stream[A]) -> None:
                def run() -> None:
                    if self.current_listener is not None:
                        self.current_listener.unlisten()
                    # The earlier firings of the new stream are
                    # suppressed (last argument is True).
                    self.current_listener = ea._listen_internal(
                        out._node, trans2, h2, True)

                trans2.last(run)

            def __del__(self) -> None:
                if self.current_listener is not None:
                    self.current_listener.unlisten()

        h1 = _Handler()
        l1 = bea._updates()._listen_internal(out._node, trans1, h1, False)
        return out._unsafe_add_cleanup(l1)

    def calm(self) -> "Cell[A]":
        """
        Returns a cell that holds the same value as the source cell, but
        ignores updates that don't change value of the cell.
        """
        init = self.sample()
        return self._updates()._calm(init).hold(init)

    def __del__(self) -> None:
        """ Unlisten from the update stream, if a listener was set. """
        # getattr(): __init__ may never have run, e.g. when a subclass
        # constructor raises while still evaluating the arguments of
        # super().__init__(); the attribute is missing in that case.
        cleanup = getattr(self, "_cleanup", None)
        if cleanup is not None:
            cleanup.unlisten()

    def listen(self, action: Callable[[A], None]) -> Listener:
        """
        Listen for updates to the value of this cell. This is
        the observer pattern. The returned
        :class:`~sodiumfrp.listener.Listener` has
        an :meth:`~sodiumfrp.listener.Listener.unlisten` method to cause
        the listener to be removed. This is an **operational** mechanism
        for interfacing between the world of I/O and FRP.

        :param action: the handler to execute when there's a new value.
            You should make no assumptions about what thread you are
            called on, and the handler should not block. You are not
            allowed to use :meth:`CellSink.send` or
            :meth:`StreamSink.send` in the handler. An exception will be
            thrown, because you are not meant to use this to create your
            own primitives.
        """
        return Transaction._apply(
            lambda trans: self._value_stream(trans).listen(action))
class CellSink(Cell[A]):
    """
    A cell that allows values to be pushed into it, acting as
    an interface between the world of I/O and the world of FRP. Code that
    exports `CellSinks` for read-only use should downcast to
    :class:`~sodiumfrp.primitives.Cell`.
    """

    def __init__(self,
            init_value: A,
            f: Callable[[A,A],A] = None) -> None:
        """
        Construct a writable cell with the specified initial value.

        If multiple values are sent in the same transaction,
        the specified function is used to combine them. If the function
        isn't provided, :meth:`send` throws an exception when called
        multiple times from the same transaction.
        """
        # The cell is backed by a StreamSink, which does the combining.
        sink = StreamSink(f)
        super().__init__(sink, init_value)

    def send(self, a: A) -> None:
        """
        Send a value, modifying the value of the cell.

        `send()` may not be used inside handlers registered with
        :meth:`Stream.listen` or :meth:`Cell.listen`. An exception will
        be thrown, because `CellSink` is for interfacing I/O to FRP only.
        You are not meant to use this to define your own primitives.

        :param a: value to push into the cell.
        """
        sink = self._stream
        sink.send(a)
class LazyCell(Cell[A]):
    """
    A cell whose initial value comes from a :class:`Lazy` that is
    evaluated the first time the cell is sampled.
    """

    def __init__(self,
            event: Stream[A],
            lazy_init_value: Lazy[A]) -> None:
        # The base class starts without a value; it is produced on
        # demand by _sample_no_trans().
        super().__init__(event, None)
        self._lazy_init_value = lazy_init_value

    def _sample_no_trans(self) -> A:
        """
        Return the stored value, first evaluating the lazy initial value
        when no value is stored yet and a lazy one is available.
        """
        if self._value is not None or self._lazy_init_value is None:
            return self._value
        pending = self._lazy_init_value
        self._value = pending.get()
        self._lazy_init_value = None
        return self._value
class CellLoop(LazyCell[A]):
    """
    A forward reference for a :class:`Cell` equivalent to the `Cell` that
    is referenced.
    """

    def __init__(self) -> None:
        """ Make an instance of `CellLoop`. """
        super().__init__(StreamLoop(), None)

    def loop(self, a_out: Cell[A]) -> None:
        """
        Resolve the loop to specify what the `CellLoop` was a forward
        reference to. It must be invoked inside the same transaction as
        the place where the `CellLoop` is used. This requires you to
        create an explicit transaction with
        :meth:`Transaction.run() <sodiumfrp.transaction.Transaction.run>`.
        """
        def resolve(trans: Transaction) -> Unit:
            updates = a_out._updates()
            self._stream.loop(updates)
            # The initial value is taken lazily from the target cell.
            self._lazy_init_value = a_out._sample_lazy(trans)
            return UNIT

        Transaction._apply(resolve)

    def _sample_no_trans(self) -> A:
        """
        Sample the cell.

        :raises RuntimeError: if the loop has not been resolved yet.
        """
        looped = self._stream._assigned
        if not looped:
            raise RuntimeError("CellLoop sampled before it was looped")
        return super()._sample_no_trans()
def _print_stack() -> None: """ Print full stack trace. format_exc() shows only part of it. """ stack = traceback.format_stack()[:-2] exc = traceback.format_exc() stack_lines = "".join(stack).split("\n")[:-1] exc_lines = exc.split("\n") trace = "\n".join(exc_lines[:1] + stack_lines + exc_lines[1:]) print(trace)