sodiumfrp package

Submodules

sodiumfrp.primitives module

class sodiumfrp.primitives.Cell(stream: StreamWithSend[A], init_value: A)[source]

Bases: Generic[A]

Represents a value of type A that changes over time.

__init__(stream: StreamWithSend[A], init_value: A) None[source]

Shouldn’t be constructed explicitly.

static apply(bf: Cell[Callable[[A], B]], ba: Cell[A]) Cell[B][source]

Apply a value inside a cell to a function inside a cell. This is the primitive for all function lifting.

calm() Cell[A][source]

Returns a cell that holds the same value as the source cell, but ignores updates that don’t change value of the cell.

static constant(value: A) Cell[A][source]

Returns a cell that never changes, always holding the value.

lift(b: Cell[B], f: Callable[[A, B], C]) Cell[C][source]

Lift a binary function into cells, so the returned cell always reflects the specified function applied to the input cells’ values.

Parameters:

f – Function to apply. It must be referentially transparent.

listen(action: Callable[[A], None]) Listener[source]

Listen for updates to the value of this cell. This is the observer pattern. The returned Listener has an unlisten() method to cause the listener to be removed. This is an operational mechanism for interfacing between the world of I/O and for FRP.

Parameters:

action – the handler to execute when there’s a new value. You should make no assumptions about what thread you are called on, and the handler should not block. You are not allowed to use CellSink.send() or StreamSink.send() in the handler. An exception will be thrown, because you are not meant to use this to create your own primitives.

map(f: Callable[[A], B]) Cell[B][source]

Transform the cell’s value according to the supplied function, so the returned cell always reflects the value of the function applied to the input cell’s value.

sample() A[source]

Sample the cell’s current value.

It may be used inside the functions passed to primitives that apply them to Stream including Stream.map() (in which case it is equivalent to snapshotting the cell), Stream.snapshot(), Stream.filter() and Stream.merge().

It should generally be avoided in favour of listen() so you don’t miss any updates, but in many circumstances it makes sense.

sample_lazy() Lazy[A][source]

A variant of sample() that works with CellLoop when they haven’t been looped yet. It should be used in any code that’s general enough that it could be passed a CellLoop.

starmap(f: Callable[[...], T]) Cell[T][source]

Like Cell.map(), except that the value of the cell is an iterable that is unpacked as arguments when passing to the mapping function.

switch() Cell[source]

Do either switch_stream() or switch_cell() depending on the type of the value of the cell.

static switch_cell(bba: Cell[Cell[A]]) Cell[A][source]

Unwrap a cell inside another cell to give a time-varying cell implementation.

static switch_stream(bea: Cell[Stream[A]]) Stream[A][source]

Unwrap a stream inside a cell to give a time-varying stream implementation.

class sodiumfrp.primitives.CellLoop[source]

Bases: LazyCell[A]

A forward reference for a Cell equivalent to the Cell that is referenced.

__init__() None[source]

Make an instance of CellLoop.

loop(a_out: Cell[A]) None[source]

Resolve the loop to specify what the CellLoop was a forward reference to. It must be invoked inside the same transaction as the place where the CellLoop is used. This requires you to create an explicit transaction with Transaction.run().

class sodiumfrp.primitives.CellSink(init_value: A, f: Optional[Callable[[A, A], A]] = None)[source]

Bases: Cell[A]

A cell that allows values to be pushed into it, acting as an interface between the world of I/O and the world of FRP. Code that exports CellSinks for read-only use should downcast to Cell.

__init__(init_value: A, f: Optional[Callable[[A, A], A]] = None) None[source]

Construct a writable cell with the specified initial value. If multiple values are sent in the same transaction, the specified function is used to combine them. If the function isn’t provided, send() throws an exception when called multiple times from the same transaction.

send(a: A) None[source]

Send a value, modifying the value of the cell. send() may not be used inside handlers registered with Stream.listen() or Cell.listen(). An exception will be thrown, because CellSink is for interfacing I/O to FRP only. You are not meant to use this to define your own primitives.

Parameters:

a – value to push into the cell.

class sodiumfrp.primitives.Stream(node: Node, finalizers: List[Listener], firings: List[A])[source]

Bases: Generic[A]

Represents a stream of discrete events/firings containing values of type A.

__init__(node: Node, finalizers: List[Listener], firings: List[A]) None[source]

Shouldn’t be constructed explicitly.

accum(init_state: S, f: Callable[[A, S], S]) Cell[S][source]

Accumulate on input event, outputting the new state each time.

Parameters:

f – Function to apply to update the state. It may construct FRP logic or use Cell.sample() in which case it is equivalent to Stream.snapshot()’ing the cell. Apart from this the function must be referentially transparent.

accum_lazy(init_state: Lazy[S], f: Callable[[A, S], S]) Cell[S][source]

A variant of accum() that takes an initial state returned by Cell.sample_lazy().

add_cleanup(cleanup: Listener) Stream[A][source]

Attach a listener to this stream so that its Listener.unlisten() is invoked when this stream is garbage collected. Useful for functions that initiate I/O, returning the result of it through a stream.

You must use this only with listeners returned by listen_weak() so that things don’t get kept alive when they shouldn’t.

calm() Stream[A][source]

Returns a stream that ignores repeating values produced by the source stream.

collect(init_state: S, f: Callable[[A, S], Tuple[B, S]]) Stream[B][source]

Transform an event with a generalized state loop (a Mealy machine). The function is passed the input and the old state and returns the new state and output value.

Parameters:

f – Function to apply to update the state. It may construct FRP logic or use Cell.sample() in which case it is equivalent to Stream.snapshot()’ing the cell. Apart from this the function must be referentially transparent.

collect_lazy(init_state: Lazy[S], f: Callable[[A, S], Tuple[B, S]]) Stream[B][source]

A variant of collect() that takes an initial state returned by Cell.sample_lazy().

filter(predicate: Callable[[A], bool]) Stream[A][source]

Return a stream that only outputs events for which the predicate returns True.

gate(c: Cell[bool]) Stream[A][source]

Return a stream that only outputs events from the input stream when the specified cell’s value is True.

hold(init_value: A) Cell[A][source]

Create a cell with the specified initial value, that is updated by this stream’s event values.

There is an implicit delay: state updates caused by event firings don’t become visible as the cell’s current value as viewed by Stream.snapshot() until the following transaction. To put this another way, Stream.snapshot() always sees the value of a cell as it was before any state changes from the current transaction.

hold_lazy(init_value: Lazy[A]) Cell[A][source]

A variant of hold() with an initial value captured by Cell.sample_lazy().

listen(handler: Callable[[A], None]) Listener[source]

Listen for events/firings on this stream. This is the observer pattern. The returned Listener has an unlisten() method to cause the listener to be removed. This is an operational mechanism is for interfacing between the world of I/O and for FRP.

Parameters:

handler – The handler to execute when there’s a new value. You should make no assumptions about what thread you are called on, and the handler should not block. You are not allowed to use CellSink.send() or StreamSink.send() in the handler. An exception will be thrown, because you are not meant to use this to create your own primitives.

listen_weak(action: Callable[[A], None]) Listener[source]

A variant of listen() that will deregister the listener automatically if the listener is garbage collected. With listen(), the listener is only deregistered if Listener.unlisten() is called explicitly.

This method should be used for listeners that are to be passed to Stream.add_cleanup() to ensure that things don’t get kept alive when they shouldn’t.

map(f: Callable[[A], B]) Stream[B][source]

Transform the stream’s event values according to the supplied function, so the returned Stream’s event values reflect the value of the function applied to the input Stream’s event values.

Parameters:

f – Function to apply to convert the values. It may construct FRP logic or use Cell.sample() in which case it is equivalent to Stream.snapshot()’ing the cell. Apart from this the function must be referentially transparent.

map_to(b: B) Stream[B][source]

Returns a stream that outputs b for each event of the source stream.

static merge(f: Callable[[A, A], A], *streams: Stream[A]) Stream[A][source]

Variant of merge_with() that merges multiple streams.

merge_with(s: Stream[A], f: Callable[[A, A], A]) Stream[A][source]

Merge two streams of the same type into one, so that events on either input appear on the returned stream.

If the events are simultaneous (that is, one event from this and one from s occurring in the same transaction), combine them into one using the specified combining function so that the returned stream is guaranteed only ever to have one event per transaction. The event from self will appear at the left input of the combining function, and the event from s will appear at the right.

Parameters:

f – Function to combine the values. It may construct FRP logic or use Cell.sample(). Apart from this the function must be referentially transparent.

static never() StreamWithSend[A][source]

A stream that never fires.

once() Stream[A][source]

Return a stream that outputs only one value: the next event of the input stream, starting from the transaction in which once() was invoked.

or_else(s: Stream[A]) Stream[A][source]

Variant of merge_with() that merges two streams and will drop an event in the simultaneous case.

In the case where two events are simultaneous (i.e. both within the same transaction), the event from this will take precedence, and the event from s will be dropped. If you want to specify your own combining function, use merge_with(). s1.or_else(s2) is equivalent to s1.merge_with(s2, lambda l, r: l).

The name or_else is used instead of merge_with to make it really clear that care should be taken, because events can be dropped.

static or_else_(*streams: Stream[A]) Stream[A][source]

Variant of or_else() that merges a collection of streams.

snapshot(*args: Union[Cell, Callable[[...], T]]) Stream[T][source]

Return a stream whose events are the result of the combination, using the specified function, of the input stream’s event value and the values of the provided cells at the time of the event.

The comibining function is optional and, if it is present, must be provided as the last argument. If the function is omitted, the default combining function will be used, which ignores the stream’s value and returns the values of the provided cells as a tuple. If only one cell provided, the default combining function returns just the value of this cell.

There is an implicit delay: state updates caused by event firings being held with Stream.hold() don’t become visible as the cell’s current value until the following transaction. To put this another way, Stream.snapshot() always sees the value of a cell as it was before any state changes from the current transaction.

starmap(f: Callable[[...], T]) Stream[T][source]

Like Stream.map(), except that the elements of the stream are iterables that are unpacked as arguments when passing to the mapping function.

class sodiumfrp.primitives.StreamLoop[source]

Bases: StreamWithSend[A]

A forward reference for a Stream equivalent to the Stream that is referenced.

__init__() None[source]

Make an instance of StreamLoop.

loop(ea_out: Stream[A]) None[source]

Resolve the loop to specify what the StreamLoop was a forward reference to. It must be invoked inside the same transaction as the place where the StreamLoop is used. This requires you to create an explicit transaction with Transaction.run().

class sodiumfrp.primitives.StreamSink(f: Optional[Callable[[A, A], A]] = None)[source]

Bases: StreamWithSend[A]

A stream that allows values to be pushed into it, acting as an interface between the world of I/O and the world of FRP. Code that exports StreamSinks for read-only use should downcast to Stream.

__init__(f: Optional[Callable[[A, A], A]] = None) None[source]

Construct a StreamSink. Use the provided function to combine values, that were sent to the stream during the same transaction, into a single event. If the function is None, send() throws an exception, if it is called more then once per transaction.

The combining function should be associative.

Parameters:

f – Function to combine the values. It may construct FRP logic or use Cell.sample(). Apart from this the function must be referentially transparent.

send(a: A) None[source]

Send a value to be made available to consumers of the stream. send() may not be used inside handlers registered with Stream.listen() or Cell.listen(). An exception will be thrown, because StreamSink is for interfacing I/O to FRP only. You are not meant to use this to define your own primitives.

Parameters:

a – value to push into the cell.

sodiumfrp.operational module

Operational primitives that must be used with care.

sodiumfrp.operational.defer(s: Stream[A]) Stream[A][source]

Push each event onto a new transaction guaranteed to come before the next externally initiated transaction. Same as split() but it works on a single value.

sodiumfrp.operational.split(s: Stream[Iterable[A]]) Stream[A][source]

Push each event in the list onto a newly created transaction guaranteed to come before the next externally initiated transaction. Note that the semantics are such that two different invocations of split() can put events into the same new transaction, so the resulting stream’s events could be simultaneous with events output by split() or defer() invoked elsewhere in the code.

sodiumfrp.operational.updates(c: Cell[A]) Stream[A][source]

A stream that gives the updates/steps for a Cell.

This is an operational primitive, which is not part of the main Sodium API. It breaks the property of non-detectability of cell steps/updates. The rule with this primitive is that you should only use it in functions that do not allow the caller to detect the cell updates.

sodiumfrp.operational.value(c: Cell[A]) Stream[A][source]

A stream that is guaranteed to fire once in the transaction where value() is invoked, giving the current value of the cell, and thereafter behaves like updates(), firing for each update/step of the cell’s value.

This is an operational primitive, which is not part of the main Sodium API. It breaks the property of non-detectability of cell steps/updates. The rule with this primitive is that you should only use it in functions that do not allow the caller to detect the cell updates.

sodiumfrp.transaction module

class sodiumfrp.transaction.Transaction[source]

Bases: object

Functions for controlling transactions.

close() None[source]

Close this transaction. Runs all scheduled actions.

static get_current_transaction() Transaction[source]

Return the current transaction, or None if there isn’t one.

in_callback: int = 0
last(action: Callable[[], None]) None[source]

Add an action to run after all _prioritized() actions.

static on_start(runnable: Callable[[], None]) Callable[[], None][source]

Add a runnable that will be executed whenever a transaction is started. That runnable may start transactions itself, which will not cause the hooks to be run recursively.

The main use case of this is the implementation of a time/alarm system.

Returns a callable which, when called, unregisters the hook.

static post(action: Callable[[], None]) None[source]

Execute the specified code after the current transaction is closed, or immediately if there is no current transaction.

static run(code: Callable[[], T]) T[source]

Run the specified code inside a single transaction, with the contained code returning a value of the parameter type T.

In most cases this is not needed, because the primitives always create their own transaction automatically, but it is needed in some circumstances.

sodiumfrp.time module

Utilities for working with continuous time.

class sodiumfrp.time.TimerSystem[source]

Bases: object

at(alarm_time: Cell[Optional[int]]) Stream[int][source]

Returns a stream that fires events at the points in time indicated by the value of the input cell. If the value of the cell is None or less than the current time, then no events will be fired and the timer set by the previous value of the cell will be canceled.

release() None[source]
time_ms() Cell[int][source]

Returns a cell that represents current time (in milliseconds).

sodiumfrp.listener module

class sodiumfrp.listener.Listener(unlisten: Callable[[Listener], None])[source]

Bases: object

A handle for a listener that was registered with Cell.listen() or Stream.listen().

append(two: Listener) Listener[source]

Combine listeners into one so that invoking unlisten() on the returned listener will unlisten both the inputs.

unlisten() None[source]

Deregister the listener that was registered so it will no longer be called back, allowing associated resources to be garbage collected.

sodiumfrp.unit module

class sodiumfrp.unit.Unit[source]

Bases: object

Module contents