shared-block-ring

A single-consumer single-producer queue on a block device
IN THIS PACKAGE
Module type Shared_block . S . CONSUMER
include RING
type t
type disk
type item
type error = [
| `Retry
| `Suspended
| `Msg of string
]
type 'a result = ( 'a, error ) Result.result
val pp_error : Format.formatter -> error -> unit
val open_error : 'a result -> ( 'a, [> error ] ) Result.result
val error_to_msg : 'a result -> ( 'a, msg ) Result.result
val attach : ?queue:string -> ?client:string -> disk:disk -> unit -> t result Lwt.t

attach queue client blockdevice attaches to a previously-created shared ring on top of blockdevice.

val detach : t -> unit Lwt.t

detach t frees all resources associated with t. Attempts to use t after a detach will result in an `Error _

val state : t -> [ `Running | `Suspended ] result Lwt.t

state t () queries the current state of the ring. If the result is `Suspended then the producer has acknowledged and will nolonger produce items. Clients which support suspend/resume should arrange to call this function periodically.

val debug_info : t -> (string * string) list result Lwt.t

debug_info t returns a list of key=value pairs which may be useful for debugging. Nothing should be assumed about the keys or the values; they should only be printed or logged.

type position

The position within a stream

val sexp_of_position : position -> Sexplib0.Sexp.t
include COMPARABLE with type t := position
val compare : position -> position -> [ `LessThan | `Equal | `GreaterThan ]

Compare two items

val advance : t:t -> position:position -> unit -> unit result Lwt.t

advance t position exposes the item associated with position to the Consumer so it can be popped.

val suspend : t -> unit result Lwt.t

suspend t signals that the producer should stop pushing items. Note this function returns before the producer has acknowledged. The result `Retry means that a previous call to resume has not been acknowledged; the client should retry.

val resume : t -> unit result Lwt.t

resume t signals that a producer may again start pushing items. This call does not wait for an acknowledgement from the producer. Note it is not an error to resume an already-resumed queue. The result `Retry means that a previous call to suspend has not been acknowledged; the client should retry.

val pop : t:t -> ?from:position -> unit -> (position * item) result Lwt.t

peek t ?position () returns a pair (position, item) where item is the next item on the ring after from. Repeated calls to pop will return the same item. To indicate that the item has been processed, call advance position. `Retry means there is no item available at the moment and the client should try again later.

val fold : f:( item -> 'a -> 'a ) -> t:t -> ?from:position -> init:'a -> unit -> (position * 'a) result Lwt.t

peek_all f t ?position init () folds f across all the values that can be immediately peeked from the ring. If any of the fold operations fail then the whole operation fails. The successful result includes the final position which can be used to consume all the items at once.