package lwt-pipeline

Pipelines: carry a bundle of inputs through a series of transformations whilst preserving the order of the original input.

type ('a, 'b) step

Steps are the building blocks of a pipeline. A step is essentially a function from one type to another.

val sync : ('a -> 'b) -> ('a, 'b) step

sync f is the function f as a synchronous step. Data that is transformed by a synchronous function is never buffered. Instead it is transformed as it arrives.

val async_s : ('a -> 'b Lwt.t) -> ('a, 'b) step

async_s f is the function f as an asynchronous serial step. At most one unresolved promise created by this function exists at any one time. If data arrives at this step while a previous call is still pending, the data is buffered until the promise created by that call resolves.

val async_p : ('a -> 'b Lwt.t) -> ('a, 'b) step

async_p f is the function f as an asynchronous parallel step. Multiple promises created by this step can be unresolved at the same time. Data might still be buffered if the global limit on unresolved promises is reached.

Error management: these steps are helpers for managing errors through the result type.

val all_ok : ('a, ('a, 'b) result) step

all_ok is sync (fun x -> Ok x) and it is meant to inject all of the available data into the result type.

val map_in_err : ('erra -> 'errb) -> (('a, 'errb) result, 'b) step -> (('a, 'erra) result, 'b) step

map_in_err f s is a step with the same synchronicity as s. On Ok data it acts the same as s, but Error data is modified by f before being handled normally by s.

val map_out_err : ('erra -> 'errb) -> ('a, ('b, 'erra) result) step -> ('a, ('b, 'errb) result) step

map_out_err f s is a step with the same synchronicity as s. On Ok data it acts the same as s, but Error data is modified by f after being handled normally by s.

val with_err : ('a, ('b, 'err) result) step -> (('a, 'err) result, ('b, 'err) result) step

with_err s is a step with the same synchronicity as s. It acts as s on Ok data and it is a no-op on Error data.

val recover : ('err -> 'a) -> (('a, 'err) result, 'a) step

recover f is sync (function | Ok v -> v | Error e -> f e): it maps the Error data onto the same type as the Ok data and exits the result type.
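The error-management combinators above can be chained as in the following minimal sketch. It assumes the library is exposed as a module named Lwt_pipeline and that lwt.unix is linked for Lwt_main.run; the steps parse and halve are hypothetical and exist only for illustration. It also uses cons, nil and run, which are documented further down.

```ocaml
(* Assumption: the package exposes its API as module Lwt_pipeline. *)
open Lwt_pipeline

(* Hypothetical synchronous step: fails on non-numeric input. *)
let parse : (string, (int, string) result) step =
  sync (fun s ->
      match int_of_string_opt s with
      | Some n -> Ok n
      | None -> Error ("not a number: " ^ s))

(* Hypothetical asynchronous step: rejects odd numbers. *)
let halve : (int, (int, string) result) step =
  async_p (fun n ->
      Lwt.return (if n mod 2 = 0 then Ok (n / 2) else Error "odd"))

let pipeline : (string, int) pipe =
  cons parse
  (* with_err: halve only sees Ok data, Error data passes through. *)
  @@ cons (with_err halve)
  (* recover: leave the result type, mapping every error to -1. *)
  @@ cons (recover (fun _msg -> -1))
  @@ nil

(* Assumption: lwt.unix is available to drive the promise. *)
let outputs : int list = Lwt_main.run (run pipeline [ "4"; "x"; "7" ])

let () = assert (outputs = [ 2; -1; -1 ])
```

Here "x" fails in parse and skips halve, whereas "7" parses but is rejected by halve; both end up as -1 after recover.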

val with_key : ('a, 'b) step -> ('key * 'a, 'key * 'b) step

Carrying ID through a pipeline

val init_key : ('a, 'a * 'a) step
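The signatures of with_key and init_key suggest the following usage: init_key pairs each element with a copy of itself as its key, and with_key s applies s to the payload while leaving the key untouched. This reading is inferred from the types alone. The sketch below assumes the module is named Lwt_pipeline and that lwt.unix is linked; it uses cons, nil and run, which are documented further down.

```ocaml
(* Assumption: the package exposes its API as module Lwt_pipeline. *)
open Lwt_pipeline

let keyed : (string, string * int) pipe =
  (* Tag each input with itself as the key. *)
  cons init_key
  (* Transform only the payload; the key is carried alongside. *)
  @@ cons (with_key (sync String.length))
  @@ nil

(* Assumption: lwt.unix is available to drive the promise. *)
let keyed_outputs : (string * int) list =
  Lwt_main.run (run keyed [ "a"; "bcd" ])

let () = assert (keyed_outputs = [ ("a", 1); ("bcd", 3) ])
```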

Pipelines are essentially lists of steps.

Pipeline constructors are akin to list constructors, with less sugar.

type ('i, 'o) pipe
val nil : ('x, 'x) pipe
val cons : ('a, 'b) step -> ('b, 'c) pipe -> ('a, 'c) pipe

The recommended use for building pipelines is: cons (sync f) @@ cons (async_p g) @@ cons (async_p h) @@ nil

val run : ?pool:int -> ('i, 'o) pipe -> 'i list -> 'o list Lwt.t

Core functionality: run ?pool pipe input runs all the elements of input through the steps of pipe. All the while it maintains the following invariants:

  • There are never more than pool unresolved high-level promises at any one time. A high-level promise is one that corresponds to a call to one of the step functions. (Note that each such promise can create additional promises which are not limited by the pool parameter of run.) By default, no limits are imposed.
  • The elements maintain the same ordering all throughout the pipeline. In other words, if x is before y in input, then for any step s, the high-level promise of s for x will be created before the high-level promise of s for y.
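As an illustration of run and its pool parameter, here is a minimal sketch. It assumes the module is named Lwt_pipeline and that lwt.unix is linked for Lwt_main.run; the step functions are placeholders chosen only to make the data flow visible.

```ocaml
(* Assumption: the package exposes its API as module Lwt_pipeline. *)
open Lwt_pipeline

let pipeline : (int, string) pipe =
  (* Synchronous: applied as data arrives, never buffered. *)
  cons (sync (fun x -> x + 1))
  (* Asynchronous serial: one pending call at a time. *)
  @@ cons (async_s (fun x -> Lwt.return (x * 2)))
  (* Asynchronous parallel: several pending calls, bounded by pool. *)
  @@ cons (async_p (fun x -> Lwt.return (string_of_int x)))
  @@ nil

(* At most 2 high-level promises are unresolved at any one time. *)
let outputs : string list =
  Lwt_main.run (run ~pool:2 pipeline [ 1; 2; 3 ])

(* The output order follows the input order. *)
let () = assert (outputs = [ "4"; "6"; "8" ])
```

Omitting ~pool leaves the number of unresolved high-level promises unbounded, as stated above.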

Exception handling: run does not attempt to handle any exception. It is up to the caller to ensure that expected exceptions are wrapped in a result constructor or some such solution. The pipeline library provides some support for result-handling.
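One way to follow this advice is to convert exceptions into result values inside the step function itself. The sketch below uses only the standard library; parse_int is a hypothetical function that could then be wrapped as sync parse_int.

```ocaml
(* Hypothetical step function: int_of_string raises Failure on bad
   input, so the exception is caught and turned into an Error. *)
let parse_int (s : string) : (int, string) result =
  try Ok (int_of_string s)
  with Failure _ -> Error (Printf.sprintf "cannot parse %S" s)

let () =
  assert (parse_int "42" = Ok 42);
  assert (parse_int "x" = Error "cannot parse \"x\"")
```

Exceptions that are not caught this way still escape from run, so only the expected failure cases should be mapped to Error.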

Post-processing: useful for dealing with pipelines built around error-management or id-marking combinators.

val partition_by_error : ('o, 'err) result list -> 'o list * 'err list

partition_by_error sorts the results into distinct sets: successful and error results. Note that the order within each set is preserved, but the interleaving across the sets is lost.
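The behaviour described above can be modelled with the standard library alone. The function partition_by_error_model below is a stand-in with the same signature, not the library's implementation; it relies on List.partition_map and Either, which require OCaml 4.12 or later.

```ocaml
(* Stand-in for partition_by_error, written for illustration only. *)
let partition_by_error_model (rs : ('o, 'err) result list) :
    'o list * 'err list =
  List.partition_map
    (function Ok v -> Either.Left v | Error e -> Either.Right e)
    rs

let oks, errs =
  partition_by_error_model [ Ok 1; Error "a"; Ok 2; Error "b" ]

let () =
  (* Order is kept within each list; the interleaving is lost. *)
  assert (oks = [ 1; 2 ]);
  assert (errs = [ "a"; "b" ])
```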

val index_by_key : ('key * 'o) list -> 'index -> ('key -> 'o -> 'index -> 'index) -> 'index

index_by_key r init folder folds over the results r. It is intended to build maps of results. E.g., index_by_key r Map.empty Map.add.
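In practice Map.empty and Map.add come from an instantiated map module. The sketch below shows this with a hypothetical StringMap and a stand-in index_by_key_model that has the documented signature; the left-to-right fold direction is an assumption, not something the documentation states.

```ocaml
module StringMap = Map.Make (String)

(* Stand-in for index_by_key, written for illustration only.
   Assumption: results are folded from left to right. *)
let index_by_key_model (r : ('key * 'o) list) (init : 'index)
    (folder : 'key -> 'o -> 'index -> 'index) : 'index =
  List.fold_left (fun acc (k, v) -> folder k v acc) init r

(* Keyed results, e.g. as produced by a pipeline using with_key. *)
let results = [ ("a", 1); ("b", 2); ("c", 3) ]

let index : int StringMap.t =
  index_by_key_model results StringMap.empty StringMap.add

let () =
  assert (StringMap.find "b" index = 2);
  assert (StringMap.cardinal index = 3)
```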