package async_extra

  1. Overview
  2. Docs
Legend:
Library
Module
Module type
Parameter
Class
Class type

An Async extension of Core_kernel.Bus. Functions that share the same name and types as those in Core_kernel.Bus are direct calls to the same.

include module type of struct include Core_kernel.Bus end
module Callback_arity = Core_kernel.Bus.Callback_arity

Callback_arity states the type of callbacks stored in a bus. Using Callback_arity is an implementation technique that allows callbacks to be defined as ordinary n-ary curried functions, instead of forcing n-ary-variadic callbacks to use tuples. This also avoids extra allocation.

type ('callback, 'phantom) t = ('callback, 'phantom) Core_kernel.Bus.t
include sig ... end
val sexp_of_t : ('callback -> Sexplib.Sexp.t) -> ('phantom -> Sexplib.Sexp.t) -> ('callback, 'phantom) t -> Sexplib.Sexp.t
type ('callback, 'phantom) bus = ('callback, 'phantom) t
module Read_write = Core_kernel.Bus.Read_write
module Read_only = Core_kernel.Bus.Read_only
val read_only : ('callback, _) t -> 'callback Read_only.t
val create : ?name:Core_kernel.Info.t -> Core_kernel.Source_code_position.t -> 'callback Callback_arity.t -> allow_subscription_after_first_write:Base.Bool.t -> on_callback_raise:(Core_kernel.Error.t -> Base.Unit.t) -> 'callback Read_write.t

In create [%here] ArityN ~allow_subscription_after_first_write ~on_callback_raise, [%here] is stored in the resulting bus, and contained in %sexp_of: t, which can help with debugging. If allow_subscription_after_first_write is false, then subscribe_exn will raise if it is called after write has been called the first time. If a callback raises, on_callback_raise is called with an error containing the exception. If on_callback_raise raises, then the exception is raised to write and the bus is closed.

val callback_arity : ('callback, _) t -> 'callback Callback_arity.t
val num_subscribers : (_, _) t -> Base.Int.t
val is_closed : (_, _) t -> Base.Bool.t
val close : 'callback Read_write.t -> Base.Unit.t

close disallows future writes -- once close t is called, all further calls to write t will raise. close is idempotent. If close is called from within a callback, the current message will still be sent to all subscribed callbacks that have not yet seen it before the close takes effect.

val write : 'callback Read_write.t -> 'callback

write calls all callbacks currently subscribed to the bus, with no guarantee on the order in which they will be called. write is fast and non-allocating, though the callbacks themselves may allocate.

Calling write t from within a callback on t or if is_closed t will raise.

module Subscriber = Core_kernel.Bus.Subscriber
val subscribe_exn : ?on_callback_raise:(Core_kernel.Error.t -> Base.Unit.t) -> 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> 'callback Subscriber.t

subscribe_exn t [%here] ~f adds the callback f to the set of t's subscribers, and returns a Subscriber.t that can later be used to unsubscribe. [%here] is stored in the Subscriber.t, and contained in %sexp_of: Subscriber.t, which can help with debugging. If subscribe_exn t is called by a callback in t, i.e. during write t, the subscription takes effect for the next write, but does not affect the current write. subscribe_exn takes time proportional to the number of callbacks.

If on_callback_raise is supplied, then it will be called by write whenever f raises; only if that subsequently raises will t's on_callback_raise be called. If on_callback_raise is not supplied, then t's on_callback_raise will be called whenever f raises.

val iter_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> f:'callback -> Base.Unit.t

iter_exn t [%here] ~f is ignore (subscribe_exn t [%here] ~callback:f). This captures the common usage in which one never wants to unsubscribe from a bus.

module Fold_arity = Core_kernel.Bus.Fold_arity
val fold_exn : 'callback Read_only.t -> Core_kernel.Source_code_position.t -> ('callback, 'f, 's) Fold_arity.t -> init:'s -> f:'f -> Base.Unit.t

fold_exn t [%here] arity ~init ~f folds over the bus events, threading a state value to every call. It is otherwise similar to iter_exn.

val unsubscribe : 'callback Read_only.t -> 'callback Subscriber.t -> Base.Unit.t

unsubscribe t subscriber removes the callback corresponding to subscriber from t. unsubscribe never raises and is idempotent. As with subscribe_exn, unsubscribe t during write t takes effect after the current write finishes. Also like subscribe_exn, unsubscribe takes time proportional to the number of callbacks.

pipe1_exn t returns a pipe of updates from t. If the pipe is closed you will be unsubscribed. pipe1_exn raises in the same circumstances as subscribe_exn.

module First_arity : sig ... end

first_exn here t arity ~f returns a deferred that becomes determined with value r when the first event is published to t where f returns Some r. first_exn then unsubscribes from t, ensuring that f is never called again after it returns Some. first_exn raises if it can't subscribe to the bus, i.e. if subscribe_exn raises. If f raises, then first_exn raises to the monitor in effect when first_exn was called. first_exn takes time proportional to the number of bus subscribers.

If stop is provided and becomes determined, f will not be called again, it will unsubscribe from the bus, and the deferred that was returned by first_exn will never become determined.